AI Integration
Integrate computer vision and AI models into your HORUS applications using OpenCV, external Python scripts, or cloud APIs. HORUS's sub-microsecond communication makes it perfect for combining real-time control with AI inference.
Overview
HORUS supports multiple AI integration patterns:
Local Vision Processing (Real-Time)
- OpenCV computer vision (optional feature flag)
- Edge detection, tracking, blob detection
- Camera capture and processing
- 5-50ms typical latency
External AI Services (Background Tasks)
- Python ML models via subprocess or HTTP
- Cloud APIs (OpenAI, Anthropic, etc.)
- Custom inference servers
- 50-5000ms typical latency
Architecture Pattern:
Camera Node ──Hub──> Vision Node ──Hub──> Control Node
   (Fast)              (AI/CV)             (Real-time)
   ~16ms              ~20-50ms               ~1ms
OpenCV Computer Vision
HORUS includes optional OpenCV integration for camera capture and real-time vision processing.
Enabling OpenCV
Add to Cargo.toml:
[dependencies]
horus_library = { version = "0.1.0", features = ["opencv-backend"] }
Using the Camera Node
The built-in CameraNode supports OpenCV:
use horus::prelude::*;
use horus_library::CameraNode;
fn main() -> HorusResult<()> {
let mut scheduler = Scheduler::new();
// Camera node with OpenCV backend
let mut camera = CameraNode::new()?;
camera.set_device_id(0); // Default camera
camera.set_resolution(640, 480);
camera.set_fps(30.0);
camera.set_topic("camera/raw");
scheduler.register(Box::new(camera), 0, Some(true));
scheduler.tick_all()?;
Ok(())
}
Custom OpenCV Processing
use horus::prelude::*;
use opencv::{
    prelude::*,
    core::{count_non_zero, Mat},
    imgproc,
};
#[derive(Clone, Debug)]
pub struct ImageFrame {
pub stamp_nanos: u64,
pub width: u32,
pub height: u32,
pub data: Vec<u8>,
}
pub struct VisionProcessorNode {
camera_sub: Hub<ImageFrame>,
detections_pub: Hub<String>,
}
impl VisionProcessorNode {
pub fn new() -> HorusResult<Self> {
Ok(Self {
camera_sub: Hub::new("camera/raw")?,
detections_pub: Hub::new("vision/detections")?,
})
}
    fn process_frame(&self, frame_data: &[u8], _width: u32, height: u32) -> Option<String> {
        // Wrap the raw buffer in an OpenCV Mat, then reshape to 3-channel image rows
        let mat = Mat::from_slice(frame_data).ok()?;
        let mat = mat.reshape(3, height as i32).ok()?;
        // Convert to grayscale
        let mut gray = Mat::default();
        imgproc::cvt_color(&mat, &mut gray, imgproc::COLOR_BGR2GRAY, 0).ok()?;
        // Simple edge detection
        let mut edges = Mat::default();
        imgproc::canny(&gray, &mut edges, 50.0, 150.0, 3, false).ok()?;
        // Count edge pixels (a crude object-presence metric); count_non_zero lives in core, not imgproc
        let edge_count = count_non_zero(&edges).ok()?;
        Some(format!("edges_detected:{}", edge_count))
    }
}
impl Node for VisionProcessorNode {
fn name(&self) -> &'static str { "VisionProcessorNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(frame) = self.camera_sub.recv(ctx) {
if let Some(result) = self.process_frame(&frame.data, frame.width, frame.height) {
self.detections_pub.send(result, ctx).ok();
}
}
}
}
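Wiring the processor to the built-in camera follows the same scheduler pattern as above; a minimal sketch, assuming the opencv-backend feature is enabled:

use horus::prelude::*;
use horus_library::CameraNode;

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();
    let mut camera = CameraNode::new()?;
    camera.set_topic("camera/raw"); // matches VisionProcessorNode's subscription
    let vision = VisionProcessorNode::new()?;
    scheduler.register(Box::new(camera), 0, Some(true));
    scheduler.register(Box::new(vision), 1, Some(true));
    scheduler.tick_all()?;
    Ok(())
}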
AI Model Inference
HORUS can integrate with various AI model formats and inference engines commonly used in robotics.
ONNX Runtime (Recommended)
ONNX Runtime provides cross-platform inference for models from PyTorch, TensorFlow, scikit-learn, and more.
Add dependencies:
[dependencies]
ort = "2.0" # ONNX Runtime bindings
ndarray = "0.15"
image = "0.24"
YOLOv8 Object Detection Example:
use horus::prelude::*;
use ort::{GraphOptimizationLevel, Session};
use ndarray::{Array, IxDyn};
use image::{DynamicImage, imageops::FilterType};
#[derive(Clone, Debug)]
pub struct Detection {
pub class_id: usize,
pub confidence: f32,
    pub bbox: [f32; 4], // center-x, center-y, width, height (model input scale)
}
pub struct YOLOv8Node {
camera_sub: Hub<ImageFrame>,
detections_pub: Hub<Vec<Detection>>,
session: Session,
}
impl YOLOv8Node {
pub fn new(model_path: &str) -> HorusResult<Self> {
// Load ONNX model
let session = Session::builder()
.map_err(|e| HorusError::Config(format!("ONNX error: {}", e)))?
.with_optimization_level(GraphOptimizationLevel::Level3)
.map_err(|e| HorusError::Config(format!("ONNX error: {}", e)))?
.commit_from_file(model_path)
.map_err(|e| HorusError::Config(format!("Failed to load model: {}", e)))?;
Ok(Self {
camera_sub: Hub::new("camera/raw")?,
detections_pub: Hub::new("vision/detections")?,
session,
})
}
fn preprocess_image(&self, img_data: &[u8], width: u32, height: u32) -> Option<Array<f32, IxDyn>> {
// Resize to model input size (640x640 for YOLOv8)
let img = image::RgbImage::from_raw(width, height, img_data.to_vec())?;
let img = DynamicImage::ImageRgb8(img).resize_exact(640, 640, FilterType::Triangle);
// Convert to CHW format and normalize
let mut input = Array::zeros(IxDyn(&[1, 3, 640, 640]));
for (x, y, pixel) in img.to_rgb8().enumerate_pixels() {
input[[0, 0, y as usize, x as usize]] = pixel[0] as f32 / 255.0;
input[[0, 1, y as usize, x as usize]] = pixel[1] as f32 / 255.0;
input[[0, 2, y as usize, x as usize]] = pixel[2] as f32 / 255.0;
}
Some(input)
}
    fn run_inference(&self, input: Array<f32, IxDyn>) -> Option<Vec<Detection>> {
        // Run model
        let outputs = self.session.run(ort::inputs![input].ok()?).ok()?;
        // Parse YOLOv8 output format
        let output = outputs[0].try_extract_tensor::<f32>().ok()?;
        let output = output.view();
        let mut detections = Vec::new();
        // YOLOv8 output: [batch, 84, 8400] where 84 = 4 bbox coords + 80 class scores
        for i in 0..8400 {
            // Find the highest-scoring class for this candidate box
            let mut max_class = 0;
            let mut max_score = 0.0f32;
            for c in 0..80 {
                let score = output[[0, 4 + c, i]];
                if score > max_score {
                    max_score = score;
                    max_class = c;
                }
            }
            // Threshold on the best class score (YOLOv8 has no separate objectness score)
            if max_score > 0.5 {
                let x = output[[0, 0, i]];
                let y = output[[0, 1, i]];
                let w = output[[0, 2, i]];
                let h = output[[0, 3, i]];
                detections.push(Detection {
                    class_id: max_class,
                    confidence: max_score,
                    bbox: [x, y, w, h], // center-x, center-y, width, height
                });
            }
        }
        // Overlapping boxes still need non-max suppression (see the sketch below)
        Some(detections)
    }
}
impl Node for YOLOv8Node {
fn name(&self) -> &'static str { "YOLOv8Node" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(frame) = self.camera_sub.recv(ctx) {
if let Some(input) = self.preprocess_image(&frame.data, frame.width, frame.height) {
if let Some(detections) = self.run_inference(input) {
self.detections_pub.send(detections, ctx).ok();
}
}
}
}
}
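The raw detections above overlap heavily, since YOLOv8 emits thousands of candidate boxes. A minimal greedy non-max suppression pass (a sketch, assuming the Detection type above with center-format boxes) cleans them up before publishing:

/// Greedy NMS: keep the highest-confidence box, drop any later box that overlaps it.
fn non_max_suppression(mut dets: Vec<Detection>, iou_threshold: f32) -> Vec<Detection> {
    // Sort by confidence, highest first
    dets.sort_by(|a, b| b.confidence.partial_cmp(&a.confidence).unwrap());
    let mut keep: Vec<Detection> = Vec::new();
    for det in dets {
        if keep.iter().all(|k| iou(&k.bbox, &det.bbox) < iou_threshold) {
            keep.push(det);
        }
    }
    keep
}

/// Intersection-over-union for [cx, cy, w, h] boxes.
fn iou(a: &[f32; 4], b: &[f32; 4]) -> f32 {
    let (ax1, ay1) = (a[0] - a[2] / 2.0, a[1] - a[3] / 2.0);
    let (ax2, ay2) = (a[0] + a[2] / 2.0, a[1] + a[3] / 2.0);
    let (bx1, by1) = (b[0] - b[2] / 2.0, b[1] - b[3] / 2.0);
    let (bx2, by2) = (b[0] + b[2] / 2.0, b[1] + b[3] / 2.0);
    let inter_w = (ax2.min(bx2) - ax1.max(bx1)).max(0.0);
    let inter_h = (ay2.min(by2) - ay1.max(by1)).max(0.0);
    let inter = inter_w * inter_h;
    let union = a[2] * a[3] + b[2] * b[3] - inter;
    if union > 0.0 { inter / union } else { 0.0 }
}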
Download YOLOv8 ONNX:
# Export from Ultralytics
pip install ultralytics
python -c "from ultralytics import YOLO; model = YOLO('yolov8n.pt'); model.export(format='onnx')"
TensorFlow Lite (Edge Devices)
For embedded systems and edge devices:
Add dependencies:
[dependencies]
tflite = "0.9"
MobileNet Classification:
use horus::prelude::*;
use tflite::{FlatBufferModel, InterpreterBuilder, ops::builtin::BuiltinOpResolver};
pub struct TFLiteClassifierNode {
camera_sub: Hub<ImageFrame>,
class_pub: Hub<String>,
    interpreter: tflite::Interpreter<'static, BuiltinOpResolver>,
labels: Vec<String>,
}
impl TFLiteClassifierNode {
pub fn new(model_path: &str, labels_path: &str) -> HorusResult<Self> {
// Load TFLite model
let model = FlatBufferModel::build_from_file(model_path)
.map_err(|e| HorusError::Config(format!("TFLite error: {}", e)))?;
let resolver = BuiltinOpResolver::default();
let builder = InterpreterBuilder::new(model, resolver)
.map_err(|e| HorusError::Config(format!("Builder error: {}", e)))?;
let mut interpreter = builder.build()
.map_err(|e| HorusError::Config(format!("Interpreter error: {}", e)))?;
interpreter.allocate_tensors()
.map_err(|e| HorusError::Config(format!("Allocation error: {}", e)))?;
// Load class labels
let labels = std::fs::read_to_string(labels_path)
.map_err(|e| HorusError::Config(format!("Labels error: {}", e)))?
.lines()
.map(String::from)
.collect();
Ok(Self {
camera_sub: Hub::new("camera/raw")?,
class_pub: Hub::new("vision/classification")?,
interpreter,
labels,
})
}
    fn classify(&mut self, img_data: &[u8]) -> Option<String> {
        // Locate the model's input tensor
        let input_index = *self.interpreter.inputs().first()?;
        // ... preprocessing code: resize img_data and copy it into
        //     self.interpreter.tensor_data_mut::<u8>(input_index) ...
        // Run inference
        self.interpreter.invoke().ok()?;
        // Read class scores from the first output tensor
        let output_index = *self.interpreter.outputs().first()?;
        let scores: &[f32] = self.interpreter.tensor_data(output_index).ok()?;
        // Find top class
        let (max_idx, max_score) = scores
            .iter()
            .enumerate()
            .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap())?;
        Some(format!("{}: {:.2}%", self.labels[max_idx], max_score * 100.0))
    }
}
impl Node for TFLiteClassifierNode {
fn name(&self) -> &'static str { "TFLiteClassifierNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(frame) = self.camera_sub.recv(ctx) {
if let Some(result) = self.classify(&frame.data) {
self.class_pub.send(result, ctx).ok();
}
}
}
}
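Construction then just points the node at a model and a labels file (paths here are illustrative):

let classifier = TFLiteClassifierNode::new(
    "models/mobilenet_v1.tflite", // hypothetical model path
    "models/labels.txt",          // one class name per line
)?;
scheduler.register(Box::new(classifier), 1, Some(true));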
Tract (Pure Rust Inference)
Tract is a pure-Rust ONNX/TensorFlow inference engine - no external dependencies!
Add dependencies:
[dependencies]
tract-onnx = "0.21"
Example:
use horus::prelude::*;
use tract_onnx::prelude::*;
pub struct TractInferenceNode {
camera_sub: Hub<ImageFrame>,
output_pub: Hub<String>,
    model: TypedSimplePlan<TypedModel>,
}
impl TractInferenceNode {
pub fn new(model_path: &str) -> HorusResult<Self> {
// Load ONNX model with Tract
let model = tract_onnx::onnx()
.model_for_path(model_path)
.map_err(|e| HorusError::Config(format!("Tract error: {}", e)))?
.into_optimized()
.map_err(|e| HorusError::Config(format!("Optimization error: {}", e)))?
.into_runnable()
.map_err(|e| HorusError::Config(format!("Runnable error: {}", e)))?;
Ok(Self {
camera_sub: Hub::new("camera/raw")?,
output_pub: Hub::new("vision/output")?,
model,
})
}
    fn run_inference(&self, input_data: &[f32]) -> Option<String> {
        // NOTE: a flat 1-D tensor only suits models declared with a matching input
        // shape; image models usually need NCHW input (see the sketch below)
        let input = tract_ndarray::arr1(input_data).into_dyn();
        let result = self.model.run(tvec!(input.into())).ok()?;
        // Process output
        let output = result[0].to_array_view::<f32>().ok()?;
        Some(format!("Inference result: {:?}", output))
    }
}
impl Node for TractInferenceNode {
fn name(&self) -> &'static str { "TractInferenceNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(frame) = self.camera_sub.recv(ctx) {
// Convert frame to input format
let input_data: Vec<f32> = frame.data.iter().map(|&x| x as f32 / 255.0).collect();
if let Some(result) = self.run_inference(&input_data) {
self.output_pub.send(result, ctx).ok();
}
}
}
}
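Most image models expect an NCHW tensor rather than the flat vector used above. A reshaping sketch, following this section's tract_ndarray convention (the normalized HWC input buffer is assumed):

use tract_onnx::prelude::*;

/// Pack a flat, normalized HWC RGB buffer into an NCHW [1, 3, H, W] tensor.
fn to_nchw(pixels: &[f32], h: usize, w: usize) -> Tensor {
    let arr = tract_ndarray::Array4::from_shape_fn((1, 3, h, w), |(_, c, y, x)| {
        pixels[(y * w + x) * 3 + c]
    });
    Tensor::from(arr)
}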
Model Format Comparison
| Format | Use Case | Rust Support | Edge Devices |
|---|---|---|---|
| ONNX | General (PyTorch, TF) | ort crate | ✓ |
| TFLite | Mobile/Edge | tflite crate | ✓✓ |
| Tract | Pure Rust | Native | ✓ |
| CoreML | Apple Silicon | Via Python | ✓ |
| TensorRT | NVIDIA GPUs | Via C++ | ✓✓ |
Popular Models for Robotics
Object Detection:
- YOLOv8 (ONNX) - Real-time detection
- MobileNet SSD (TFLite) - Edge devices
- EfficientDet (ONNX) - High accuracy
Semantic Segmentation:
- DeepLabV3 (ONNX) - Scene understanding
- U-Net (ONNX) - Medical/precision tasks
Pose Estimation:
- MediaPipe (TFLite) - Human pose
- OpenPose (ONNX) - Multi-person
Depth Estimation:
- MiDaS (ONNX) - Monocular depth
- DPT (ONNX) - Dense prediction
Download pre-trained models:
# Hugging Face
huggingface-cli download ultralytics/yolov8n --include "*.onnx"
# TensorFlow Hub
wget "https://tfhub.dev/tensorflow/lite-model/ssd_mobilenet_v1/1/metadata/1?lite-format=tflite"
# ONNX Model Zoo
wget https://github.com/onnx/models/raw/main/vision/object_detection_segmentation/yolov4/model/yolov4.onnx
Large Language Models (LLMs)
Integrate LLMs for natural language commands, task planning, and vision-language reasoning in robotics.
Local LLM Inference
Run LLMs locally using Rust bindings for llama.cpp or Candle.
llama-cpp-rs (Recommended)
Add dependencies:
[dependencies]
llama_cpp_rs = "0.4"
LLM Command Interpreter Node:
use horus::prelude::*;
use llama_cpp_rs::{LlamaModel, LlamaParams, LlamaContext};
pub struct LLMCommandNode {
text_sub: Hub<String>, // Natural language commands
cmd_pub: Hub<String>, // Structured robot commands
context: LlamaContext,
}
impl LLMCommandNode {
pub fn new(model_path: &str) -> HorusResult<Self> {
// Load LLaMA model (Llama 2, Mistral, Phi, etc.)
let params = LlamaParams::default();
let model = LlamaModel::load_from_file(model_path, params)
.map_err(|e| HorusError::Config(format!("Failed to load LLM: {}", e)))?;
let context = model.create_context(LlamaParams::default())
.map_err(|e| HorusError::Config(format!("Context error: {}", e)))?;
Ok(Self {
text_sub: Hub::new("speech/text")?,
cmd_pub: Hub::new("robot/commands")?,
context,
})
}
fn generate_command(&mut self, user_input: &str) -> Option<String> {
let prompt = format!(
"You are a robot command interpreter. Convert natural language to JSON commands.\n\
User: {}\n\
Command (JSON): ",
user_input
);
// Generate response
let output = self.context.predict(
prompt,
64, // max tokens
true, // echo prompt
|token| {
// Token callback
print!("{}", token);
true
}
).ok()?;
// Extract JSON command
Some(output.trim().to_string())
}
}
impl Node for LLMCommandNode {
fn name(&self) -> &'static str { "LLMCommandNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(text) = self.text_sub.recv(ctx) {
if let Some(command) = self.generate_command(&text) {
self.cmd_pub.send(command, ctx).ok();
}
}
}
}
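LLM output is rarely clean JSON; models often wrap it in prose. A small extract-and-parse helper (a sketch using the serde dependencies listed in the Cloud LLM APIs section; the RobotCommand schema is hypothetical) makes downstream consumers more robust:

use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct RobotCommand {
    action: String,
    linear: f32,
    angular: f32,
}

fn parse_command(raw: &str) -> Option<RobotCommand> {
    // Grab the outermost {...} span, since models often add surrounding text
    let start = raw.find('{')?;
    let end = raw.rfind('}')?;
    if end < start {
        return None;
    }
    serde_json::from_str(&raw[start..=end]).ok()
}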
Example usage:
# Download a model (Llama 2, Mistral, Phi-2, etc.)
wget https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.2-GGUF/resolve/main/mistral-7b-instruct-v0.2.Q4_K_M.gguf
# Run with HORUS
horus run robot_with_llm.rs
Supported models:
- Llama 2 (7B, 13B, 70B)
- Mistral 7B
- Phi-2 (2.7B - fast on CPU!)
- Mixtral 8x7B
- CodeLlama
- Any GGUF format model
Candle (Pure Rust)
Candle is Hugging Face's pure-Rust ML framework:
Add dependencies:
[dependencies]
candle-core = "0.3"
candle-nn = "0.3"
candle-transformers = "0.3"
tokenizers = "0.15"
Candle LLM Node:
use horus::prelude::*;
use candle_core::{Device, Tensor};
use candle_transformers::models::llama::{Llama, Config};
use tokenizers::Tokenizer;
pub struct CandleLLMNode {
text_sub: Hub<String>,
response_pub: Hub<String>,
model: Llama,
tokenizer: Tokenizer,
device: Device,
}
impl CandleLLMNode {
pub fn new(model_path: &str, tokenizer_path: &str) -> HorusResult<Self> {
let device = Device::cuda_if_available(0)
.map_err(|e| HorusError::Config(format!("Device error: {}", e)))?;
let config = Config::default();
let model = Llama::load(model_path, &config, &device)
.map_err(|e| HorusError::Config(format!("Model load error: {}", e)))?;
let tokenizer = Tokenizer::from_file(tokenizer_path)
.map_err(|e| HorusError::Config(format!("Tokenizer error: {}", e)))?;
Ok(Self {
text_sub: Hub::new("user/input")?,
response_pub: Hub::new("llm/response")?,
model,
tokenizer,
device,
})
}
fn generate(&self, prompt: &str, max_tokens: usize) -> Option<String> {
// Tokenize input
let encoding = self.tokenizer.encode(prompt, true).ok()?;
let tokens = encoding.get_ids();
let input_tensor = Tensor::new(tokens, &self.device).ok()?
.unsqueeze(0).ok()?;
        // Generate response (simplified: real generation samples tokens iteratively from the logits)
        let output = self.model.forward(&input_tensor, 0).ok()?;
// Decode tokens
let output_tokens: Vec<u32> = output
.to_vec1::<u32>()
.ok()?;
self.tokenizer.decode(&output_tokens, true).ok()
}
}
impl Node for CandleLLMNode {
fn name(&self) -> &'static str { "CandleLLMNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(input) = self.text_sub.recv(ctx) {
if let Some(response) = self.generate(&input, 100) {
self.response_pub.send(response, ctx).ok();
}
}
}
}
Cloud LLM APIs
Use cloud LLMs for complex reasoning and vision-language tasks.
OpenAI GPT-4
Add dependencies:
[dependencies]
reqwest = { version = "0.11", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
GPT-4 Task Planning Node:
use horus::prelude::*;
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
#[derive(Serialize)]
struct ChatRequest {
model: String,
messages: Vec<ChatMessage>,
temperature: f32,
max_tokens: u32,
}
#[derive(Serialize, Deserialize, Clone)]
struct ChatMessage {
role: String,
content: String,
}
#[derive(Deserialize)]
struct ChatResponse {
choices: Vec<ChatChoice>,
}
#[derive(Deserialize)]
struct ChatChoice {
message: ChatMessage,
}
pub struct GPT4PlannerNode {
goal_sub: Hub<String>,
plan_pub: Hub<String>,
client: Client,
api_key: String,
conversation: Vec<ChatMessage>,
}
impl GPT4PlannerNode {
pub fn new(api_key: String) -> HorusResult<Self> {
let system_prompt = ChatMessage {
role: "system".to_string(),
content: "You are a robot task planner. Generate step-by-step plans in JSON format.".to_string(),
};
Ok(Self {
goal_sub: Hub::new("user/goal")?,
plan_pub: Hub::new("robot/plan")?,
client: Client::new(),
api_key,
conversation: vec![system_prompt],
})
}
fn generate_plan(&mut self, goal: &str) -> Option<String> {
// Add user message
self.conversation.push(ChatMessage {
role: "user".to_string(),
content: goal.to_string(),
});
let request = ChatRequest {
model: "gpt-4-turbo-preview".to_string(),
messages: self.conversation.clone(),
temperature: 0.7,
max_tokens: 500,
};
let response = self.client
.post("https://api.openai.com/v1/chat/completions")
.header("Authorization", format!("Bearer {}", self.api_key))
.json(&request)
.send()
.ok()?;
let chat_response: ChatResponse = response.json().ok()?;
        let assistant_message = chat_response.choices.first()?.message.clone();
// Add to conversation history
self.conversation.push(assistant_message.clone());
Some(assistant_message.content)
}
}
impl Node for GPT4PlannerNode {
fn name(&self) -> &'static str { "GPT4PlannerNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(goal) = self.goal_sub.recv(ctx) {
if let Some(plan) = self.generate_plan(&goal) {
self.plan_pub.send(plan, ctx).ok();
}
}
}
}
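The conversation vector above grows without bound, and with it every request's token count and cost. A trimming helper (a sketch; call it at the top of generate_plan) keeps the system prompt plus only the most recent messages:

/// Retain the system prompt (index 0) plus the newest `max_messages` entries.
fn trim_history(conversation: &mut Vec<ChatMessage>, max_messages: usize) {
    if conversation.len() > max_messages + 1 {
        let excess = conversation.len() - (max_messages + 1);
        conversation.drain(1..1 + excess); // never drop the system prompt
    }
}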
Anthropic Claude
Claude for Robot Reasoning:
use horus::prelude::*;
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
#[derive(Serialize)]
struct ClaudeRequest {
model: String,
max_tokens: u32,
messages: Vec<ClaudeMessage>,
}
#[derive(Serialize, Deserialize)]
struct ClaudeMessage {
role: String,
content: String,
}
#[derive(Deserialize)]
struct ClaudeResponse {
content: Vec<ContentBlock>,
}
#[derive(Deserialize)]
struct ContentBlock {
text: String,
}
pub struct ClaudeReasoningNode {
sensor_sub: Hub<String>, // Sensor readings as JSON
decision_pub: Hub<String>,
client: Client,
api_key: String,
}
impl ClaudeReasoningNode {
pub fn new(api_key: String) -> HorusResult<Self> {
Ok(Self {
sensor_sub: Hub::new("sensors/summary")?,
decision_pub: Hub::new("robot/decision")?,
client: Client::new(),
api_key,
})
}
fn reason(&self, sensor_data: &str) -> Option<String> {
let request = ClaudeRequest {
model: "claude-3-opus-20240229".to_string(),
max_tokens: 1024,
messages: vec![ClaudeMessage {
role: "user".to_string(),
content: format!(
"Given these robot sensor readings: {}\n\
What should the robot do next? Respond with a JSON action.",
sensor_data
),
}],
};
let response = self.client
.post("https://api.anthropic.com/v1/messages")
.header("x-api-key", &self.api_key)
.header("anthropic-version", "2023-06-01")
.json(&request)
.send()
.ok()?;
let claude_response: ClaudeResponse = response.json().ok()?;
        claude_response.content.first().map(|c| c.text.clone())
}
}
impl Node for ClaudeReasoningNode {
fn name(&self) -> &'static str { "ClaudeReasoningNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(sensors) = self.sensor_sub.recv(ctx) {
if let Some(decision) = self.reason(&sensors) {
self.decision_pub.send(decision, ctx).ok();
}
}
}
}
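What lands on sensors/summary is up to your summarizer node; a hypothetical publisher showing the kind of compact JSON that works well as LLM input (field names are illustrative):

// Inside a sensor-summary node's setup and tick:
let sensors_pub: Hub<String> = Hub::new("sensors/summary")?;
sensors_pub.send(
    r#"{"lidar_min_m": 0.42, "battery_pct": 78, "bumper_pressed": false}"#.to_string(),
    None,
).ok();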
Vision-Language Models (VLMs)
Combine vision and language for advanced robot perception.
GPT-4 Vision for Scene Understanding
use horus::prelude::*;
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use base64::{Engine as _, engine::general_purpose};
#[derive(Serialize)]
struct VisionMessage {
role: String,
content: Vec<VisionContent>,
}
#[derive(Serialize)]
#[serde(tag = "type", rename_all = "snake_case")] // "image_url", not "imageurl"
enum VisionContent {
Text { text: String },
ImageUrl { image_url: ImageUrl },
}
#[derive(Serialize)]
struct ImageUrl {
url: String,
detail: String,
}
pub struct SceneUnderstandingNode {
camera_sub: Hub<ImageFrame>,
description_pub: Hub<String>,
client: Client,
    api_key: String,
    frame_count: u64,
}
impl SceneUnderstandingNode {
pub fn new(api_key: String) -> HorusResult<Self> {
Ok(Self {
camera_sub: Hub::new("camera/raw")?,
description_pub: Hub::new("scene/description")?,
client: Client::new(),
            api_key,
            frame_count: 0,
})
}
fn understand_scene(&self, image_data: &[u8]) -> Option<String> {
        // Base64-encode the frame (assumes the bytes are already JPEG; see the sketch below)
        let base64_image = general_purpose::STANDARD.encode(image_data);
let data_url = format!("data:image/jpeg;base64,{}", base64_image);
let message = VisionMessage {
role: "user".to_string(),
content: vec![
VisionContent::Text {
text: "Describe this scene for a robot. What objects are present? \
What can the robot interact with? Are there any obstacles?".to_string(),
},
VisionContent::ImageUrl {
image_url: ImageUrl {
url: data_url,
detail: "low".to_string(), // "low" or "high"
},
},
],
};
let request = serde_json::json!({
"model": "gpt-4-vision-preview",
"messages": [message],
"max_tokens": 300
});
let response = self.client
.post("https://api.openai.com/v1/chat/completions")
.header("Authorization", format!("Bearer {}", self.api_key))
.json(&request)
.send()
.ok()?;
let result: serde_json::Value = response.json().ok()?;
Some(result["choices"][0]["message"]["content"].as_str()?.to_string())
}
}
impl Node for SceneUnderstandingNode {
fn name(&self) -> &'static str { "SceneUnderstandingNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(frame) = self.camera_sub.recv(ctx) {
            // Process every 30th frame to reduce API calls
            // (an explicit counter; stamp_nanos % 30 would fire essentially at random)
            self.frame_count += 1;
            if self.frame_count % 30 == 0 {
                if let Some(description) = self.understand_scene(&frame.data) {
                    self.description_pub.send(description, ctx).ok();
                }
            }
        }
}
}
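The data URL above declares image/jpeg, so the frame bytes must actually be JPEG. If your camera publishes raw RGB, encode first; a sketch using the image crate from the ONNX section:

use image::{codecs::jpeg::JpegEncoder, ColorType};

/// Encode a raw RGB8 frame as JPEG before base64-encoding it for the API.
fn encode_jpeg(rgb: &[u8], width: u32, height: u32) -> Option<Vec<u8>> {
    let mut jpeg = Vec::new();
    JpegEncoder::new_with_quality(&mut jpeg, 80)
        .encode(rgb, width, height, ColorType::Rgb8)
        .ok()?;
    Some(jpeg)
}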
Complete Example: Voice-Controlled Robot
use horus::prelude::*;
use horus_library::DifferentialDriveNode;
// Combine speech recognition + LLM + robot control
fn main() -> HorusResult<()> {
let mut scheduler = Scheduler::new();
    // 1. Speech-to-text (user-provided node, e.g. wrapping Whisper or a cloud STT service)
    let speech_node = SpeechRecognitionNode::new()?;
// 2. LLM converts natural language to commands
let api_key = std::env::var("OPENAI_API_KEY")
.expect("Set OPENAI_API_KEY environment variable");
let llm_node = GPT4PlannerNode::new(api_key)?;
// 3. Command parser extracts motor commands
let parser_node = CommandParserNode::new()?;
// 4. Motor control
let drive_node = DifferentialDriveNode::new();
// Register nodes
scheduler.register(Box::new(speech_node), 0, Some(true));
scheduler.register(Box::new(llm_node), 1, Some(true));
scheduler.register(Box::new(parser_node), 2, Some(true));
scheduler.register(Box::new(drive_node), 3, Some(true));
// User says: "Move forward slowly"
// → GPT-4: {"action": "move", "linear": 0.3, "angular": 0.0}
// → Robot moves forward at 0.3 m/s
scheduler.tick_all()?;
Ok(())
}
pub struct CommandParserNode {
llm_sub: Hub<String>,
cmd_pub: Hub<(f32, f32)>,
}
impl CommandParserNode {
pub fn new() -> HorusResult<Self> {
Ok(Self {
llm_sub: Hub::new("robot/plan")?,
cmd_pub: Hub::new("motor/cmd")?,
})
}
}
impl Node for CommandParserNode {
fn name(&self) -> &'static str { "CommandParserNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(json_str) = self.llm_sub.recv(ctx) {
// Parse JSON from LLM
if let Ok(cmd) = serde_json::from_str::<serde_json::Value>(&json_str) {
let linear = cmd["linear"].as_f64().unwrap_or(0.0) as f32;
let angular = cmd["angular"].as_f64().unwrap_or(0.0) as f32;
self.cmd_pub.send((linear, angular), ctx).ok();
}
}
}
}
LLM Performance & Best Practices
Latency Comparison:
Local LLMs (llama.cpp):
- Phi-2 (2.7B): ~100-500ms per response (CPU)
- Mistral 7B: ~500-2000ms (CPU), ~50-200ms (GPU)
- Llama 2 13B: ~1-4s (CPU), ~100-400ms (GPU)
Cloud APIs:
- GPT-4: ~1-5s per request
- Claude: ~1-3s per request
- GPT-4 Vision: ~2-6s per request
Optimization Tips:
- Cache common responses:
use std::collections::HashMap;
struct CachedLLMNode {
cache: HashMap<String, String>,
// ... llm fields
}
impl CachedLLMNode {
fn get_response(&mut self, input: &str) -> String {
if let Some(cached) = self.cache.get(input) {
return cached.clone();
}
let response = self.generate(input);
self.cache.insert(input.to_string(), response.clone());
response
}
}
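In long-running systems, bound the cache (for example with an LRU policy) so memory use stays flat.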
- Throttle requests:
if frame_count % 30 == 0 { // Only every 30 frames
run_llm_inference();
}
- Use smaller models locally:
- Phi-2 (2.7B) is excellent for CPU inference
- Mistral 7B for better quality with GPU
- GPT-4 only for complex reasoning
- Async processing:
use tokio::spawn;
// Note: requires a running Tokio runtime; `llm.generate` stands in for your async client
spawn(async move {
    let response = llm.generate(prompt).await;
    response_hub.send(response, None).ok();
});
Model Recommendations
| Task | Local (CPU) | Local (GPU) | Cloud |
|---|---|---|---|
| Commands | Phi-2 | Mistral 7B | GPT-3.5 |
| Planning | Mistral 7B | Llama 2 13B | GPT-4 |
| Vision+Text | LLaVA | LLaVA 13B | GPT-4V |
| Code Gen | CodeLlama 7B | CodeLlama 13B | GPT-4 |
Python ML Integration
Run Python ML models alongside your HORUS application using subprocess or HTTP.
Pattern 1: Python Script via Subprocess
Python inference script (object_detector.py):
#!/usr/bin/env python3
import sys
import json
# Your ML model here (YOLOv8, TensorFlow, PyTorch, etc.)
def detect_objects(image_path):
# Mock detection for example
return [
{"class": "person", "confidence": 0.95, "bbox": [100, 100, 200, 300]},
{"class": "car", "confidence": 0.87, "bbox": [300, 150, 500, 400]}
]
if __name__ == "__main__":
image_path = sys.argv[1]
detections = detect_objects(image_path)
print(json.dumps(detections))
HORUS node calling Python:
use horus::prelude::*;
use std::process::Command;
use std::fs;
pub struct PythonMLNode {
camera_sub: Hub<ImageFrame>,
detections_pub: Hub<String>,
frame_count: u64,
}
impl PythonMLNode {
pub fn new() -> HorusResult<Self> {
Ok(Self {
camera_sub: Hub::new("camera/raw")?,
detections_pub: Hub::new("ml/detections")?,
frame_count: 0,
})
}
    fn run_inference(&self, image_data: &[u8]) -> Option<String> {
        // Save image temporarily
        let temp_path = format!("/tmp/frame_{}.jpg", self.frame_count);
        fs::write(&temp_path, image_data).ok()?;
        // Run Python script
        let output = Command::new("python3")
            .arg("object_detector.py")
            .arg(&temp_path)
            .output()
            .ok()?;
        // Remove the temp file so /tmp doesn't fill up over a long run
        fs::remove_file(&temp_path).ok();
        // Parse JSON output
        String::from_utf8(output.stdout).ok()
    }
}
impl Node for PythonMLNode {
fn name(&self) -> &'static str { "PythonMLNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(frame) = self.camera_sub.recv(ctx) {
// Run inference every 10th frame (throttle to reduce load)
if self.frame_count % 10 == 0 {
if let Some(detections) = self.run_inference(&frame.data) {
self.detections_pub.send(detections, ctx).ok();
}
}
self.frame_count += 1;
}
}
}
Pattern 2: HTTP Inference Server
Python FastAPI server (inference_server.py):
from fastapi import FastAPI, Request
import uvicorn

app = FastAPI()

# Accept the raw image bytes that the HORUS client POSTs (not a multipart upload)
@app.post("/detect")
async def detect_objects(request: Request):
    image_bytes = await request.body()
    # Your ML inference here (image_bytes holds the frame)
    detections = [{"class": "person", "confidence": 0.95}]
    return {"detections": detections}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
HORUS HTTP client node:
use horus::prelude::*;
use reqwest::blocking::Client;
pub struct HTTPInferenceNode {
camera_sub: Hub<ImageFrame>,
detections_pub: Hub<String>,
client: Client,
}
impl HTTPInferenceNode {
pub fn new() -> HorusResult<Self> {
Ok(Self {
camera_sub: Hub::new("camera/raw")?,
detections_pub: Hub::new("ml/detections")?,
client: Client::new(),
})
}
fn run_inference(&self, image_data: &[u8]) -> Option<String> {
let response = self.client
.post("http://localhost:8000/detect")
.body(image_data.to_vec())
.send()
.ok()?;
response.text().ok()
}
}
impl Node for HTTPInferenceNode {
fn name(&self) -> &'static str { "HTTPInferenceNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(frame) = self.camera_sub.recv(ctx) {
if let Some(detections) = self.run_inference(&frame.data) {
self.detections_pub.send(detections, ctx).ok();
}
}
}
}
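A blocking request with no timeout can stall the whole tick loop if the server hangs. Building the client with a short timeout (a sketch using reqwest's builder; pick a budget that fits your loop) bounds the worst case:

use std::time::Duration;
use reqwest::blocking::Client;

// Fail fast instead of blocking the control loop indefinitely
let client = Client::builder()
    .timeout(Duration::from_millis(250))
    .build()
    .expect("failed to build HTTP client");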
Cloud AI APIs
Integrate cloud AI services like OpenAI Vision, Anthropic Claude, or Google Cloud Vision.
OpenAI Vision Example
Add dependencies:
[dependencies]
reqwest = { version = "0.11", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
base64 = "0.21"
Vision API node:
use horus::prelude::*;
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use base64::{Engine as _, engine::general_purpose};
#[derive(Serialize)]
struct VisionRequest {
model: String,
messages: Vec<Message>,
max_tokens: u32,
}
#[derive(Serialize)]
struct Message {
role: String,
content: Vec<Content>,
}
#[derive(Serialize)]
#[serde(tag = "type", rename_all = "snake_case")] // "image_url", not "imageurl"
enum Content {
Text { text: String },
ImageUrl { image_url: ImageUrl },
}
#[derive(Serialize)]
struct ImageUrl {
url: String,
}
#[derive(Deserialize)]
struct VisionResponse {
choices: Vec<Choice>,
}
#[derive(Deserialize)]
struct Choice {
message: ResponseMessage,
}
#[derive(Deserialize)]
struct ResponseMessage {
content: String,
}
pub struct OpenAIVisionNode {
camera_sub: Hub<ImageFrame>,
description_pub: Hub<String>,
client: Client,
    api_key: String,
    frame_count: u64,
}
impl OpenAIVisionNode {
pub fn new(api_key: String) -> HorusResult<Self> {
Ok(Self {
camera_sub: Hub::new("camera/raw")?,
description_pub: Hub::new("vision/description")?,
client: Client::new(),
            api_key,
            frame_count: 0,
})
}
fn analyze_image(&self, image_data: &[u8]) -> Option<String> {
        // Base64-encode the frame (assumes JPEG bytes; see the encoding sketch in the VLM section)
        let base64_image = general_purpose::STANDARD.encode(image_data);
let data_url = format!("data:image/jpeg;base64,{}", base64_image);
let request = VisionRequest {
model: "gpt-4-vision-preview".to_string(),
messages: vec![Message {
role: "user".to_string(),
content: vec![
Content::Text { text: "What objects do you see?".to_string() },
Content::ImageUrl { image_url: ImageUrl { url: data_url } },
],
}],
max_tokens: 300,
};
let response = self.client
.post("https://api.openai.com/v1/chat/completions")
.header("Authorization", format!("Bearer {}", self.api_key))
.json(&request)
.send()
.ok()?;
let vision_response: VisionResponse = response.json().ok()?;
        vision_response.choices.first().map(|c| c.message.content.clone())
}
}
impl Node for OpenAIVisionNode {
fn name(&self) -> &'static str { "OpenAIVisionNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(frame) = self.camera_sub.recv(ctx) {
            // Only analyze every 30th frame to avoid API costs
            // (an explicit counter; stamp_nanos % 30 would fire essentially at random)
            self.frame_count += 1;
            if self.frame_count % 30 == 0 {
                if let Some(description) = self.analyze_image(&frame.data) {
                    self.description_pub.send(description, ctx).ok();
                }
            }
        }
}
}
Complete Example: Vision-Guided Robot
Combine camera, OpenCV processing, and control:
use horus::prelude::*;
use horus_library::{CameraNode, DifferentialDriveNode};
// Simple vision-based follower
pub struct ObjectFollowerNode {
vision_sub: Hub<String>,
cmd_pub: Hub<(f32, f32)>,
}
impl ObjectFollowerNode {
pub fn new() -> HorusResult<Self> {
Ok(Self {
vision_sub: Hub::new("vision/detections")?,
cmd_pub: Hub::new("motor/cmd")?,
})
}
}
impl Node for ObjectFollowerNode {
fn name(&self) -> &'static str { "ObjectFollowerNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(detection) = self.vision_sub.recv(ctx) {
// Parse edge count from detection string
if let Some(count_str) = detection.strip_prefix("edges_detected:") {
if let Ok(edge_count) = count_str.parse::<i32>() {
// Simple behavior: move forward if edges detected, stop if not
let (linear, angular) = if edge_count > 1000 {
(0.5, 0.0) // Move forward
} else {
(0.0, 0.3) // Turn to search
};
self.cmd_pub.send((linear, angular), ctx).ok();
}
}
}
}
}
fn main() -> HorusResult<()> {
let mut scheduler = Scheduler::new();
// Camera capture (if opencv-backend feature enabled)
let mut camera = CameraNode::new()?;
camera.set_topic("camera/raw");
// Vision processing
let vision = VisionProcessorNode::new()?;
// Decision making
let follower = ObjectFollowerNode::new()?;
// Motor control
let drive = DifferentialDriveNode::new();
// Register in priority order
scheduler.register(Box::new(camera), 0, Some(true)); // Highest priority
scheduler.register(Box::new(vision), 1, Some(true)); // Process images
scheduler.register(Box::new(follower), 2, Some(true)); // Make decisions
scheduler.register(Box::new(drive), 3, Some(true)); // Control motors
scheduler.tick_all()?;
Ok(())
}
Performance Considerations
Latency Budget
Typical robotics control loop at 100Hz (10ms cycle):
Camera: ~16ms (30 FPS)
Vision: ~5-50ms (OpenCV/ML inference)
Decision: ~1μs (HORUS Hub communication)
Control: ~1μs (HORUS Hub communication)
Motors: ~1ms (Hardware actuator delay)
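To verify your own budget, time each stage and flag overruns (a sketch; run_vision_stage stands in for your node's per-tick work):

use std::time::Instant;

let t0 = Instant::now();
run_vision_stage(); // hypothetical: your node's per-tick work
let elapsed = t0.elapsed();
if elapsed.as_millis() > 50 {
    eprintln!("vision stage over budget: {:?}", elapsed);
}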
Optimization Tips
- Throttle AI Processing: Process every Nth frame
if frame_count % 10 == 0 {
run_expensive_inference();
}
- Async Processing: Run ML in separate thread
use std::sync::mpsc;
use std::thread;

let (tx, rx) = mpsc::channel::<Vec<u8>>();

// Worker thread owns the expensive model and publishes results when ready
thread::spawn(move || {
    while let Ok(data) = rx.recv() {
        let result = expensive_ml(data);
        result_hub.send(result, None).ok();
    }
});

// In tick(): hand the frame off without blocking the control loop
tx.send(frame_data).ok();
- Use GPU: OpenCV and ML libraries support CUDA/Metal
// Enables OpenCV's optimized CPU paths (SIMD); GPU acceleration requires
// an OpenCV build with CUDA or OpenCL support
opencv::core::set_use_optimized(true);
Best Practices
Separate Concerns: Keep AI inference in dedicated nodes
- Camera node → Vision node → Decision node → Control node
Handle Failures Gracefully: AI can fail, robots shouldn't crash
if let Some(result) = try_inference(&data) {
control_pub.send(result, ctx).ok();
} else {
// Fallback behavior
control_pub.send(default_safe_command(), ctx).ok();
}
Monitor Performance: Use HORUS dashboard
horus dashboard # Watch node timing and message flow
Next Steps
- Camera Node Reference - Built-in camera capture
- Message Types - Define custom AI result types
- Examples - See complete vision projects
- Performance - Optimize your pipeline