AI Integration

Integrate computer vision and AI models into your HORUS applications using OpenCV, external Python scripts, or cloud APIs. HORUS's sub-microsecond communication makes it well suited to combining real-time control with AI inference.

Overview

HORUS supports multiple AI integration patterns:

Local Vision Processing (Real-Time)

  • OpenCV computer vision (optional feature flag)
  • Edge detection, tracking, blob detection
  • Camera capture and processing
  • 5-50ms typical latency

External AI Services (Background Tasks)

  • Python ML models via subprocess or HTTP
  • Cloud APIs (OpenAI, Anthropic, etc.)
  • Custom inference servers
  • 50-5000ms typical latency

Architecture Pattern:

Camera Node ──Hub──> Vision Node ──Hub──> Control Node
   (Fast)            (AI/CV)             (Real-time)
   ~16ms             ~20-50ms            ~1ms
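
The slow stages never block the fast ones: each node publishes to a Hub, and the downstream node picks up whatever result is newest on its own tick. A minimal sketch of a control node built this way (the node name is illustrative; the topic names match the examples later on this page):

use horus::prelude::*;

pub struct FollowLatestDetectionNode {
    detections_sub: Hub<String>,
    cmd_pub: Hub<(f32, f32)>,
    last_detection: Option<String>,
}

impl FollowLatestDetectionNode {
    pub fn new() -> HorusResult<Self> {
        Ok(Self {
            detections_sub: Hub::new("vision/detections")?,
            cmd_pub: Hub::new("motor/cmd")?,
            last_detection: None,
        })
    }
}

impl Node for FollowLatestDetectionNode {
    fn name(&self) -> &'static str { "FollowLatestDetectionNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Take a fresh AI result if one arrived, but never wait for it
        if let Some(d) = self.detections_sub.recv(ctx) {
            self.last_detection = Some(d);
        }

        // Control logic runs every tick on the most recent result available
        let cmd = match &self.last_detection {
            Some(_) => (0.5, 0.0),  // e.g. drive toward the detected object
            None => (0.0, 0.0),     // safe default until the AI stage produces output
        };
        self.cmd_pub.send(cmd, ctx).ok();
    }
}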

OpenCV Computer Vision

HORUS includes optional OpenCV integration for camera capture and real-time vision processing.

Enabling OpenCV

Add to Cargo.toml:

[dependencies]
horus_library = { version = "0.1.0", features = ["opencv-backend"] }

Using the Camera Node

The built-in CameraNode supports OpenCV:

use horus::prelude::*;
use horus_library::CameraNode;

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();

    // Camera node with OpenCV backend
    let mut camera = CameraNode::new()?;
    camera.set_device_id(0);  // Default camera
    camera.set_resolution(640, 480);
    camera.set_fps(30.0);
    camera.set_topic("camera/raw");

    scheduler.register(Box::new(camera), 0, Some(true));
    scheduler.tick_all()?;
    Ok(())
}

Custom OpenCV Processing

use horus::prelude::*;
use opencv::{
    prelude::*,
    core::{self, Mat},
    imgproc,
};

#[derive(Clone, Debug)]
pub struct ImageFrame {
    pub stamp_nanos: u64,
    pub width: u32,
    pub height: u32,
    pub data: Vec<u8>,
}

pub struct VisionProcessorNode {
    camera_sub: Hub<ImageFrame>,
    detections_pub: Hub<String>,
}

impl VisionProcessorNode {
    pub fn new() -> HorusResult<Self> {
        Ok(Self {
            camera_sub: Hub::new("camera/raw")?,
            detections_pub: Hub::new("vision/detections")?,
        })
    }

    fn process_frame(&self, frame_data: &[u8], width: u32, height: u32) -> Option<String> {
        // Wrap the raw BGR bytes in an OpenCV Mat and reshape into 3-channel rows
        let mat = Mat::from_slice(frame_data).ok()?;
        let mat = mat.reshape(3, height as i32).ok()?;

        // Convert to grayscale
        let mut gray = Mat::default();
        imgproc::cvt_color(&mat, &mut gray, imgproc::COLOR_BGR2GRAY, 0).ok()?;

        // Simple edge detection
        let mut edges = Mat::default();
        imgproc::canny(&gray, &mut edges, 50.0, 150.0, 3, false).ok()?;

        // Count edge pixels (simple object-presence metric); count_non_zero lives in core
        let edge_count = core::count_non_zero(&edges).ok()?;

        Some(format!("edges_detected:{}", edge_count))
    }
}

impl Node for VisionProcessorNode {
    fn name(&self) -> &'static str { "VisionProcessorNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(frame) = self.camera_sub.recv(ctx) {
            if let Some(result) = self.process_frame(&frame.data, frame.width, frame.height) {
                self.detections_pub.send(result, ctx).ok();
            }
        }
    }
}

AI Model Inference

HORUS can integrate with various AI model formats and inference engines commonly used in robotics.

ONNX Runtime

ONNX Runtime provides cross-platform inference for models exported from PyTorch, TensorFlow, scikit-learn, and more.

Add dependencies:

[dependencies]
ort = "2.0"  # ONNX Runtime bindings
ndarray = "0.15"
image = "0.24"

YOLOv8 Object Detection Example:

use horus::prelude::*;
use ort::{GraphOptimizationLevel, Session};
use ndarray::{Array, IxDyn};
use image::{DynamicImage, imageops::FilterType};

#[derive(Clone, Debug)]
pub struct Detection {
    pub class_id: usize,
    pub confidence: f32,
    pub bbox: [f32; 4],  // x, y, w, h
}

pub struct YOLOv8Node {
    camera_sub: Hub<ImageFrame>,
    detections_pub: Hub<Vec<Detection>>,
    session: Session,
}

impl YOLOv8Node {
    pub fn new(model_path: &str) -> HorusResult<Self> {
        // Load ONNX model
        let session = Session::builder()
            .map_err(|e| HorusError::Config(format!("ONNX error: {}", e)))?
            .with_optimization_level(GraphOptimizationLevel::Level3)
            .map_err(|e| HorusError::Config(format!("ONNX error: {}", e)))?
            .commit_from_file(model_path)
            .map_err(|e| HorusError::Config(format!("Failed to load model: {}", e)))?;

        Ok(Self {
            camera_sub: Hub::new("camera/raw")?,
            detections_pub: Hub::new("vision/detections")?,
            session,
        })
    }

    fn preprocess_image(&self, img_data: &[u8], width: u32, height: u32) -> Option<Array<f32, IxDyn>> {
        // Resize to model input size (640x640 for YOLOv8)
        let img = image::RgbImage::from_raw(width, height, img_data.to_vec())?;
        let img = DynamicImage::ImageRgb8(img).resize_exact(640, 640, FilterType::Triangle);

        // Convert to CHW format and normalize
        let mut input = Array::zeros(IxDyn(&[1, 3, 640, 640]));
        for (x, y, pixel) in img.to_rgb8().enumerate_pixels() {
            input[[0, 0, y as usize, x as usize]] = pixel[0] as f32 / 255.0;
            input[[0, 1, y as usize, x as usize]] = pixel[1] as f32 / 255.0;
            input[[0, 2, y as usize, x as usize]] = pixel[2] as f32 / 255.0;
        }
        Some(input)
    }

    fn run_inference(&self, input: Array<f32, IxDyn>) -> Option<Vec<Detection>> {
        // Run model
        let outputs = self.session.run(ort::inputs![input].ok()?).ok()?;

        // Parse YOLOv8 output format
        let output = outputs[0].try_extract_tensor::<f32>().ok()?;
        let output = output.view();

        let mut detections = Vec::new();

        // YOLOv8 output: [batch, 84, 8400] where 84 = 4 bbox coords + 80 class scores
        for i in 0..8400 {
            // Find the highest-scoring class for this candidate box
            let mut max_class = 0;
            let mut max_score = 0.0f32;
            for c in 0..80 {
                let score = output[[0, 4 + c, i]];
                if score > max_score {
                    max_score = score;
                    max_class = c;
                }
            }

            // Keep only boxes above the confidence threshold
            if max_score > 0.5 {
                let x = output[[0, 0, i]];
                let y = output[[0, 1, i]];
                let w = output[[0, 2, i]];
                let h = output[[0, 3, i]];

                detections.push(Detection {
                    class_id: max_class,
                    confidence: max_score,
                    bbox: [x, y, w, h],
                });
            }
        }

        // Note: non-maximum suppression (NMS) is omitted here for brevity

        Some(detections)
    }
}

impl Node for YOLOv8Node {
    fn name(&self) -> &'static str { "YOLOv8Node" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(frame) = self.camera_sub.recv(ctx) {
            if let Some(input) = self.preprocess_image(&frame.data, frame.width, frame.height) {
                if let Some(detections) = self.run_inference(input) {
                    self.detections_pub.send(detections, ctx).ok();
                }
            }
        }
    }
}

Download YOLOv8 ONNX:

# Export from Ultralytics
pip install ultralytics
python -c "from ultralytics import YOLO; model = YOLO('yolov8n.pt'); model.export(format='onnx')"

TensorFlow Lite (Edge Devices)

For embedded systems and edge devices:

Add dependencies:

[dependencies]
tflite = "0.9"

MobileNet Classification:

use horus::prelude::*;
use tflite::{FlatBufferModel, InterpreterBuilder, ops::builtin::BuiltinOpResolver};

pub struct TFLiteClassifierNode {
    camera_sub: Hub<ImageFrame>,
    class_pub: Hub<String>,
    interpreter: tflite::Interpreter,
    labels: Vec<String>,
}

impl TFLiteClassifierNode {
    pub fn new(model_path: &str, labels_path: &str) -> HorusResult<Self> {
        // Load TFLite model
        let model = FlatBufferModel::build_from_file(model_path)
            .map_err(|e| HorusError::Config(format!("TFLite error: {}", e)))?;

        let resolver = BuiltinOpResolver::default();
        let builder = InterpreterBuilder::new(model, resolver)
            .map_err(|e| HorusError::Config(format!("Builder error: {}", e)))?;

        let mut interpreter = builder.build()
            .map_err(|e| HorusError::Config(format!("Interpreter error: {}", e)))?;

        interpreter.allocate_tensors()
            .map_err(|e| HorusError::Config(format!("Allocation error: {}", e)))?;

        // Load class labels
        let labels = std::fs::read_to_string(labels_path)
            .map_err(|e| HorusError::Config(format!("Labels error: {}", e)))?
            .lines()
            .map(String::from)
            .collect();

        Ok(Self {
            camera_sub: Hub::new("camera/raw")?,
            class_pub: Hub::new("vision/classification")?,
            interpreter,
            labels,
        })
    }

    fn classify(&mut self, img_data: &[u8]) -> Option<String> {
        // Preprocess and set input tensor
        let input_tensor = self.interpreter.input(0)?;
        // ... preprocessing code ...

        // Run inference
        self.interpreter.invoke().ok()?;

        // Get output
        let output_tensor = self.interpreter.output(0)?;
        let scores = output_tensor.data::<f32>();

        // Find top class
        let (max_idx, max_score) = scores
            .iter()
            .enumerate()
            .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap())?;

        Some(format!("{}: {:.2}%", self.labels[max_idx], max_score * 100.0))
    }
}

impl Node for TFLiteClassifierNode {
    fn name(&self) -> &'static str { "TFLiteClassifierNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(frame) = self.camera_sub.recv(ctx) {
            if let Some(result) = self.classify(&frame.data) {
                self.class_pub.send(result, ctx).ok();
            }
        }
    }
}

Tract (Pure Rust Inference)

Tract is a pure-Rust ONNX/TensorFlow inference engine with no external native dependencies.

Add dependencies:

[dependencies]
tract-onnx = "0.21"

Example:

use horus::prelude::*;
use tract_onnx::prelude::*;

pub struct TractInferenceNode {
    camera_sub: Hub<ImageFrame>,
    output_pub: Hub<String>,
    model: SimplePlan<TypedFact, Box<dyn TypedOp>, Graph<TypedFact, Box<dyn TypedOp>>>,
}

impl TractInferenceNode {
    pub fn new(model_path: &str) -> HorusResult<Self> {
        // Load ONNX model with Tract
        let model = tract_onnx::onnx()
            .model_for_path(model_path)
            .map_err(|e| HorusError::Config(format!("Tract error: {}", e)))?
            .into_optimized()
            .map_err(|e| HorusError::Config(format!("Optimization error: {}", e)))?
            .into_runnable()
            .map_err(|e| HorusError::Config(format!("Runnable error: {}", e)))?;

        Ok(Self {
            camera_sub: Hub::new("camera/raw")?,
            output_pub: Hub::new("vision/output")?,
            model,
        })
    }

    fn run_inference(&self, input_data: &[f32]) -> Option<String> {
        let input = tract_ndarray::arr1(input_data).into_dyn();
        let result = self.model.run(tvec!(input.into())).ok()?;

        // Process output
        let output = result[0].to_array_view::<f32>().ok()?;
        Some(format!("Inference result: {:?}", output))
    }
}

impl Node for TractInferenceNode {
    fn name(&self) -> &'static str { "TractInferenceNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(frame) = self.camera_sub.recv(ctx) {
            // Convert frame to input format
            let input_data: Vec<f32> = frame.data.iter().map(|&x| x as f32 / 255.0).collect();

            if let Some(result) = self.run_inference(&input_data) {
                self.output_pub.send(result, ctx).ok();
            }
        }
    }
}

Model Format Comparison

Format     | Use Case               | Rust Support   | Edge Devices
-----------|------------------------|----------------|-------------
ONNX       | General (PyTorch, TF)  | ort crate      |
TFLite     | Mobile/Edge            | tflite crate   | ✓✓
Tract      | Pure Rust              | Native         |
CoreML     | Apple Silicon          | Via Python     |
TensorRT   | NVIDIA GPUs            | Via C++        | ✓✓
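
If your crate supports more than one of these backends, you can keep each one optional behind a Cargo feature, mirroring the opencv-backend flag shown earlier. A sketch with illustrative (not HORUS-provided) feature and module names:

// Feature names and module names here are hypothetical examples.
#[cfg(feature = "onnx-backend")]
pub mod onnx_detector;      // e.g. wraps the ort-based YOLOv8Node

#[cfg(feature = "tflite-backend")]
pub mod tflite_classifier;  // e.g. wraps the TFLite classifier node

#[cfg(feature = "tract-backend")]
pub mod tract_inference;    // pure-Rust option with no native dependencies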

Popular Pre-Trained Models

Object Detection:

  • YOLOv8 (ONNX) - Real-time detection
  • MobileNet SSD (TFLite) - Edge devices
  • EfficientDet (ONNX) - High accuracy

Semantic Segmentation:

  • DeepLabV3 (ONNX) - Scene understanding
  • U-Net (ONNX) - Medical/precision tasks

Pose Estimation:

  • MediaPipe (TFLite) - Human pose
  • OpenPose (ONNX) - Multi-person

Depth Estimation:

  • MiDaS (ONNX) - Monocular depth
  • DPT (ONNX) - Dense prediction

Download pre-trained models:

# Hugging Face
huggingface-cli download ultralytics/yolov8n --include "*.onnx"

# TensorFlow Hub
wget "https://tfhub.dev/tensorflow/lite-model/ssd_mobilenet_v1/1/metadata/1?lite-format=tflite" -O ssd_mobilenet_v1.tflite

# ONNX Model Zoo
wget https://github.com/onnx/models/raw/main/vision/object_detection_segmentation/yolov4/model/yolov4.onnx

Large Language Models (LLMs)

Integrate LLMs for natural language commands, task planning, and vision-language reasoning in robotics.

Local LLM Inference

Run LLMs locally using Rust bindings for llama.cpp or Candle.

llama-cpp-rs (Recommended)

Add dependencies:

[dependencies]
llama_cpp_rs = "0.4"

LLM Command Interpreter Node:

use horus::prelude::*;
use llama_cpp_rs::{LlamaModel, LlamaParams, LlamaContext};

pub struct LLMCommandNode {
    text_sub: Hub<String>,  // Natural language commands
    cmd_pub: Hub<String>,   // Structured robot commands
    context: LlamaContext,
}

impl LLMCommandNode {
    pub fn new(model_path: &str) -> HorusResult<Self> {
        // Load LLaMA model (Llama 2, Mistral, Phi, etc.)
        let params = LlamaParams::default();
        let model = LlamaModel::load_from_file(model_path, params)
            .map_err(|e| HorusError::Config(format!("Failed to load LLM: {}", e)))?;

        let context = model.create_context(LlamaParams::default())
            .map_err(|e| HorusError::Config(format!("Context error: {}", e)))?;

        Ok(Self {
            text_sub: Hub::new("speech/text")?,
            cmd_pub: Hub::new("robot/commands")?,
            context,
        })
    }

    fn generate_command(&mut self, user_input: &str) -> Option<String> {
        let prompt = format!(
            "You are a robot command interpreter. Convert natural language to JSON commands.\n\
            User: {}\n\
            Command (JSON): ",
            user_input
        );

        // Generate response
        let output = self.context.predict(
            prompt,
            64,  // max tokens
            true,  // echo prompt
            |token| {
                // Token callback
                print!("{}", token);
                true
            }
        ).ok()?;

        // Extract JSON command
        Some(output.trim().to_string())
    }
}

impl Node for LLMCommandNode {
    fn name(&self) -> &'static str { "LLMCommandNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(text) = self.text_sub.recv(ctx) {
            if let Some(command) = self.generate_command(&text) {
                self.cmd_pub.send(command, ctx).ok();
            }
        }
    }
}

Example usage:

# Download a model (Llama 2, Mistral, Phi-2, etc.)
wget https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.2-GGUF/resolve/main/mistral-7b-instruct-v0.2.Q4_K_M.gguf

# Run with HORUS
horus run robot_with_llm.rs

Supported models:

  • Llama 2 (7B, 13B, 70B)
  • Mistral 7B
  • Phi-2 (2.7B - fast on CPU!)
  • Mixtral 8x7B
  • CodeLlama
  • Any GGUF format model
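
Any of these GGUF models can be passed to the LLMCommandNode defined above. A minimal sketch of wiring it into a scheduler (the path assumes the Mistral file downloaded in the snippet above):

use horus::prelude::*;

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();

    // Path assumes the downloaded GGUF file sits in the working directory
    let llm = LLMCommandNode::new("mistral-7b-instruct-v0.2.Q4_K_M.gguf")?;

    scheduler.register(Box::new(llm), 0, Some(true));
    scheduler.tick_all()?;
    Ok(())
}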

Candle (Pure Rust)

Candle is HuggingFace's pure Rust ML framework:

Add dependencies:

[dependencies]
candle-core = "0.3"
candle-nn = "0.3"
candle-transformers = "0.3"
tokenizers = "0.15"

Candle LLM Node:

use horus::prelude::*;
use candle_core::{Device, Tensor};
use candle_transformers::models::llama::{Llama, Config};
use tokenizers::Tokenizer;

pub struct CandleLLMNode {
    text_sub: Hub<String>,
    response_pub: Hub<String>,
    model: Llama,
    tokenizer: Tokenizer,
    device: Device,
}

impl CandleLLMNode {
    pub fn new(model_path: &str, tokenizer_path: &str) -> HorusResult<Self> {
        let device = Device::cuda_if_available(0)
            .map_err(|e| HorusError::Config(format!("Device error: {}", e)))?;

        let config = Config::default();
        let model = Llama::load(model_path, &config, &device)
            .map_err(|e| HorusError::Config(format!("Model load error: {}", e)))?;

        let tokenizer = Tokenizer::from_file(tokenizer_path)
            .map_err(|e| HorusError::Config(format!("Tokenizer error: {}", e)))?;

        Ok(Self {
            text_sub: Hub::new("user/input")?,
            response_pub: Hub::new("llm/response")?,
            model,
            tokenizer,
            device,
        })
    }

    fn generate(&self, prompt: &str, max_tokens: usize) -> Option<String> {
        // Tokenize input
        let encoding = self.tokenizer.encode(prompt, true).ok()?;
        let tokens = encoding.get_ids();

        let input_tensor = Tensor::new(tokens, &self.device).ok()?
            .unsqueeze(0).ok()?;

        // Single forward pass producing logits; a full implementation would
        // sample tokens autoregressively up to max_tokens
        let output = self.model.forward(&input_tensor, 0).ok()?;

        // Decode tokens (simplified; real code would argmax/sample the logits
        // into token ids before decoding)
        let output_tokens: Vec<u32> = output
            .to_vec1::<u32>()
            .ok()?;

        self.tokenizer.decode(&output_tokens, true).ok()
    }
}

impl Node for CandleLLMNode {
    fn name(&self) -> &'static str { "CandleLLMNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(input) = self.text_sub.recv(ctx) {
            if let Some(response) = self.generate(&input, 100) {
                self.response_pub.send(response, ctx).ok();
            }
        }
    }
}

Cloud LLM APIs

Use cloud LLMs for complex reasoning and vision-language tasks.

OpenAI GPT-4

Add dependencies:

[dependencies]
reqwest = { version = "0.11", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

GPT-4 Task Planning Node:

use horus::prelude::*;
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};

#[derive(Serialize)]
struct ChatRequest {
    model: String,
    messages: Vec<ChatMessage>,
    temperature: f32,
    max_tokens: u32,
}

#[derive(Serialize, Deserialize, Clone)]
struct ChatMessage {
    role: String,
    content: String,
}

#[derive(Deserialize)]
struct ChatResponse {
    choices: Vec<ChatChoice>,
}

#[derive(Deserialize)]
struct ChatChoice {
    message: ChatMessage,
}

pub struct GPT4PlannerNode {
    goal_sub: Hub<String>,
    plan_pub: Hub<String>,
    client: Client,
    api_key: String,
    conversation: Vec<ChatMessage>,
}

impl GPT4PlannerNode {
    pub fn new(api_key: String) -> HorusResult<Self> {
        let system_prompt = ChatMessage {
            role: "system".to_string(),
            content: "You are a robot task planner. Generate step-by-step plans in JSON format.".to_string(),
        };

        Ok(Self {
            goal_sub: Hub::new("user/goal")?,
            plan_pub: Hub::new("robot/plan")?,
            client: Client::new(),
            api_key,
            conversation: vec![system_prompt],
        })
    }

    fn generate_plan(&mut self, goal: &str) -> Option<String> {
        // Add user message
        self.conversation.push(ChatMessage {
            role: "user".to_string(),
            content: goal.to_string(),
        });

        let request = ChatRequest {
            model: "gpt-4-turbo-preview".to_string(),
            messages: self.conversation.clone(),
            temperature: 0.7,
            max_tokens: 500,
        };

        let response = self.client
            .post("https://api.openai.com/v1/chat/completions")
            .header("Authorization", format!("Bearer {}", self.api_key))
            .json(&request)
            .send()
            .ok()?;

        let chat_response: ChatResponse = response.json().ok()?;
        let assistant_message = chat_response.choices[0].message.clone();

        // Add to conversation history
        self.conversation.push(assistant_message.clone());

        Some(assistant_message.content)
    }
}

impl Node for GPT4PlannerNode {
    fn name(&self) -> &'static str { "GPT4PlannerNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(goal) = self.goal_sub.recv(ctx) {
            if let Some(plan) = self.generate_plan(&goal) {
                self.plan_pub.send(plan, ctx).ok();
            }
        }
    }
}

Anthropic Claude

Claude for Robot Reasoning:

use horus::prelude::*;
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};

#[derive(Serialize)]
struct ClaudeRequest {
    model: String,
    max_tokens: u32,
    messages: Vec<ClaudeMessage>,
}

#[derive(Serialize, Deserialize)]
struct ClaudeMessage {
    role: String,
    content: String,
}

#[derive(Deserialize)]
struct ClaudeResponse {
    content: Vec<ContentBlock>,
}

#[derive(Deserialize)]
struct ContentBlock {
    text: String,
}

pub struct ClaudeReasoningNode {
    sensor_sub: Hub<String>,  // Sensor readings as JSON
    decision_pub: Hub<String>,
    client: Client,
    api_key: String,
}

impl ClaudeReasoningNode {
    pub fn new(api_key: String) -> HorusResult<Self> {
        Ok(Self {
            sensor_sub: Hub::new("sensors/summary")?,
            decision_pub: Hub::new("robot/decision")?,
            client: Client::new(),
            api_key,
        })
    }

    fn reason(&self, sensor_data: &str) -> Option<String> {
        let request = ClaudeRequest {
            model: "claude-3-opus-20240229".to_string(),
            max_tokens: 1024,
            messages: vec![ClaudeMessage {
                role: "user".to_string(),
                content: format!(
                    "Given these robot sensor readings: {}\n\
                    What should the robot do next? Respond with a JSON action.",
                    sensor_data
                ),
            }],
        };

        let response = self.client
            .post("https://api.anthropic.com/v1/messages")
            .header("x-api-key", &self.api_key)
            .header("anthropic-version", "2023-06-01")
            .json(&request)
            .send()
            .ok()?;

        let claude_response: ClaudeResponse = response.json().ok()?;
        Some(claude_response.content[0].text.clone())
    }
}

impl Node for ClaudeReasoningNode {
    fn name(&self) -> &'static str { "ClaudeReasoningNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(sensors) = self.sensor_sub.recv(ctx) {
            if let Some(decision) = self.reason(&sensors) {
                self.decision_pub.send(decision, ctx).ok();
            }
        }
    }
}

Vision-Language Models (VLMs)

Combine vision and language for advanced robot perception.

GPT-4 Vision for Scene Understanding

use horus::prelude::*;
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use base64::{Engine as _, engine::general_purpose};

#[derive(Serialize)]
struct VisionMessage {
    role: String,
    content: Vec<VisionContent>,
}

#[derive(Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum VisionContent {
    Text { text: String },
    ImageUrl { image_url: ImageUrl },
}

#[derive(Serialize)]
struct ImageUrl {
    url: String,
    detail: String,
}

pub struct SceneUnderstandingNode {
    camera_sub: Hub<ImageFrame>,
    description_pub: Hub<String>,
    client: Client,
    api_key: String,
    frame_count: u64,
}

impl SceneUnderstandingNode {
    pub fn new(api_key: String) -> HorusResult<Self> {
        Ok(Self {
            camera_sub: Hub::new("camera/raw")?,
            description_pub: Hub::new("scene/description")?,
            client: Client::new(),
            api_key,
            frame_count: 0,
        })
    }

    fn understand_scene(&self, image_data: &[u8]) -> Option<String> {
        // Convert to base64 (assumes the frame bytes are already JPEG-encoded;
        // raw pixel buffers would need encoding first, e.g. with the image crate)
        let base64_image = general_purpose::STANDARD.encode(image_data);
        let data_url = format!("data:image/jpeg;base64,{}", base64_image);

        let message = VisionMessage {
            role: "user".to_string(),
            content: vec![
                VisionContent::Text {
                    text: "Describe this scene for a robot. What objects are present? \
                           What can the robot interact with? Are there any obstacles?".to_string(),
                },
                VisionContent::ImageUrl {
                    image_url: ImageUrl {
                        url: data_url,
                        detail: "low".to_string(),  // "low" or "high"
                    },
                },
            ],
        };

        let request = serde_json::json!({
            "model": "gpt-4-vision-preview",
            "messages": [message],
            "max_tokens": 300
        });

        let response = self.client
            .post("https://api.openai.com/v1/chat/completions")
            .header("Authorization", format!("Bearer {}", self.api_key))
            .json(&request)
            .send()
            .ok()?;

        let result: serde_json::Value = response.json().ok()?;
        Some(result["choices"][0]["message"]["content"].as_str()?.to_string())
    }
}

impl Node for SceneUnderstandingNode {
    fn name(&self) -> &'static str { "SceneUnderstandingNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(frame) = self.camera_sub.recv(ctx) {
            // Process every 30th frame to reduce API calls
            if self.frame_count % 30 == 0 {
                if let Some(description) = self.understand_scene(&frame.data) {
                    self.description_pub.send(description, ctx).ok();
                }
            }
            self.frame_count += 1;
        }
    }
}

Complete Example: Voice-Controlled Robot

use horus::prelude::*;
use horus_library::DifferentialDriveNode;

// Combine speech recognition + LLM + robot control
fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();

    // 1. Speech-to-text (user-provided node, e.g. wrapping Whisper or a cloud STT service)
    let speech_node = SpeechRecognitionNode::new()?;

    // 2. LLM converts natural language to commands
    let api_key = std::env::var("OPENAI_API_KEY")
        .expect("Set OPENAI_API_KEY environment variable");
    let llm_node = GPT4PlannerNode::new(api_key)?;

    // 3. Command parser extracts motor commands
    let parser_node = CommandParserNode::new()?;

    // 4. Motor control
    let drive_node = DifferentialDriveNode::new();

    // Register nodes
    scheduler.register(Box::new(speech_node), 0, Some(true));
    scheduler.register(Box::new(llm_node), 1, Some(true));
    scheduler.register(Box::new(parser_node), 2, Some(true));
    scheduler.register(Box::new(drive_node), 3, Some(true));

    // User says: "Move forward slowly"
    // → GPT-4: {"action": "move", "linear": 0.3, "angular": 0.0}
    // → Robot moves forward at 0.3 m/s

    scheduler.tick_all()?;
    Ok(())
}

pub struct CommandParserNode {
    llm_sub: Hub<String>,
    cmd_pub: Hub<(f32, f32)>,
}

impl CommandParserNode {
    pub fn new() -> HorusResult<Self> {
        Ok(Self {
            llm_sub: Hub::new("robot/plan")?,
            cmd_pub: Hub::new("motor/cmd")?,
        })
    }
}

impl Node for CommandParserNode {
    fn name(&self) -> &'static str { "CommandParserNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(json_str) = self.llm_sub.recv(ctx) {
            // Parse JSON from LLM
            if let Ok(cmd) = serde_json::from_str::<serde_json::Value>(&json_str) {
                let linear = cmd["linear"].as_f64().unwrap_or(0.0) as f32;
                let angular = cmd["angular"].as_f64().unwrap_or(0.0) as f32;

                self.cmd_pub.send((linear, angular), ctx).ok();
            }
        }
    }
}

LLM Performance & Best Practices

Latency Comparison:

Local LLMs (llama.cpp):
  - Phi-2 (2.7B):     ~100-500ms per response (CPU)
  - Mistral 7B:       ~500-2000ms (CPU), ~50-200ms (GPU)
  - Llama 2 13B:      ~1-4s (CPU), ~100-400ms (GPU)

Cloud APIs:
  - GPT-4:            ~1-5s per request
  - Claude:           ~1-3s per request
  - GPT-4 Vision:     ~2-6s per request

Optimization Tips:

  1. Cache common responses:
use std::collections::HashMap;

struct CachedLLMNode {
    cache: HashMap<String, String>,
    // ... llm fields
}

impl CachedLLMNode {
    fn get_response(&mut self, input: &str) -> String {
        if let Some(cached) = self.cache.get(input) {
            return cached.clone();
        }

        let response = self.generate(input);
        self.cache.insert(input.to_string(), response.clone());
        response
    }
}
  2. Throttle requests:
if frame_count % 30 == 0 {  // Only every 30 frames
    run_llm_inference();
}
  3. Use smaller models locally:
  • Phi-2 (2.7B) is excellent for CPU inference
  • Mistral 7B for better quality with GPU
  • GPT-4 only for complex reasoning
  4. Async processing:
use tokio::spawn;

spawn(async move {
    let response = llm.generate(prompt).await;
    response_hub.send(response, None).ok();
});

Model Recommendations

Task        | Local (CPU)   | Local (GPU)    | Cloud
------------|---------------|----------------|--------
Commands    | Phi-2         | Mistral 7B     | GPT-3.5
Planning    | Mistral 7B    | Llama 2 13B    | GPT-4
Vision+Text | LLaVA         | LLaVA 13B      | GPT-4V
Code Gen    | CodeLlama 7B  | CodeLlama 13B  | GPT-4

Python ML Integration

Run Python ML models alongside your HORUS application using subprocess or HTTP.

Pattern 1: Python Script via Subprocess

Python inference script (object_detector.py):

#!/usr/bin/env python3
import sys
import json

# Your ML model here (YOLOv8, TensorFlow, PyTorch, etc.)
def detect_objects(image_path):
    # Mock detection for example
    return [
        {"class": "person", "confidence": 0.95, "bbox": [100, 100, 200, 300]},
        {"class": "car", "confidence": 0.87, "bbox": [300, 150, 500, 400]}
    ]

if __name__ == "__main__":
    image_path = sys.argv[1]
    detections = detect_objects(image_path)
    print(json.dumps(detections))

HORUS node calling Python:

use horus::prelude::*;
use std::process::Command;
use std::fs;

pub struct PythonMLNode {
    camera_sub: Hub<ImageFrame>,
    detections_pub: Hub<String>,
    frame_count: u64,
}

impl PythonMLNode {
    pub fn new() -> HorusResult<Self> {
        Ok(Self {
            camera_sub: Hub::new("camera/raw")?,
            detections_pub: Hub::new("ml/detections")?,
            frame_count: 0,
        })
    }

    fn run_inference(&self, image_data: &[u8]) -> Option<String> {
        // Save the frame to a temp file (assumes the frame bytes are already
        // JPEG-encoded; raw pixel buffers would need encoding first)
        let temp_path = format!("/tmp/frame_{}.jpg", self.frame_count);
        fs::write(&temp_path, image_data).ok()?;

        // Run Python script
        let output = Command::new("python3")
            .arg("object_detector.py")
            .arg(&temp_path)
            .output()
            .ok()?;

        // Parse JSON output
        String::from_utf8(output.stdout).ok()
    }
}

impl Node for PythonMLNode {
    fn name(&self) -> &'static str { "PythonMLNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(frame) = self.camera_sub.recv(ctx) {
            // Run inference every 10th frame (throttle to reduce load)
            if self.frame_count % 10 == 0 {
                if let Some(detections) = self.run_inference(&frame.data) {
                    self.detections_pub.send(detections, ctx).ok();
                }
            }
            self.frame_count += 1;
        }
    }
}

Pattern 2: HTTP Inference Server

Python FastAPI server (inference_server.py):

from fastapi import FastAPI, Request
import uvicorn

app = FastAPI()

@app.post("/detect")
async def detect_objects(request: Request):
    image_bytes = await request.body()  # The HORUS client posts raw image bytes
    # Your ML inference here
    detections = [{"class": "person", "confidence": 0.95}]
    return {"detections": detections}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

HORUS HTTP client node:

use horus::prelude::*;
use reqwest::blocking::Client;

pub struct HTTPInferenceNode {
    camera_sub: Hub<ImageFrame>,
    detections_pub: Hub<String>,
    client: Client,
}

impl HTTPInferenceNode {
    pub fn new() -> HorusResult<Self> {
        Ok(Self {
            camera_sub: Hub::new("camera/raw")?,
            detections_pub: Hub::new("ml/detections")?,
            client: Client::new(),
        })
    }

    fn run_inference(&self, image_data: &[u8]) -> Option<String> {
        let response = self.client
            .post("http://localhost:8000/detect")
            .body(image_data.to_vec())
            .send()
            .ok()?;

        response.text().ok()
    }
}

impl Node for HTTPInferenceNode {
    fn name(&self) -> &'static str { "HTTPInferenceNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(frame) = self.camera_sub.recv(ctx) {
            if let Some(detections) = self.run_inference(&frame.data) {
                self.detections_pub.send(detections, ctx).ok();
            }
        }
    }
}

Cloud AI APIs

Integrate cloud AI services like OpenAI Vision, Anthropic Claude, or Google Cloud Vision.

OpenAI Vision Example

Add dependencies:

[dependencies]
reqwest = { version = "0.11", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
base64 = "0.21"

Vision API node:

use horus::prelude::*;
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use base64::{Engine as _, engine::general_purpose};

#[derive(Serialize)]
struct VisionRequest {
    model: String,
    messages: Vec<Message>,
    max_tokens: u32,
}

#[derive(Serialize)]
struct Message {
    role: String,
    content: Vec<Content>,
}

#[derive(Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum Content {
    Text { text: String },
    ImageUrl { image_url: ImageUrl },
}

#[derive(Serialize)]
struct ImageUrl {
    url: String,
}

#[derive(Deserialize)]
struct VisionResponse {
    choices: Vec<Choice>,
}

#[derive(Deserialize)]
struct Choice {
    message: ResponseMessage,
}

#[derive(Deserialize)]
struct ResponseMessage {
    content: String,
}

pub struct OpenAIVisionNode {
    camera_sub: Hub<ImageFrame>,
    description_pub: Hub<String>,
    client: Client,
    api_key: String,
    frame_count: u64,
}

impl OpenAIVisionNode {
    pub fn new(api_key: String) -> HorusResult<Self> {
        Ok(Self {
            camera_sub: Hub::new("camera/raw")?,
            description_pub: Hub::new("vision/description")?,
            client: Client::new(),
            api_key,
            frame_count: 0,
        })
    }

    fn analyze_image(&self, image_data: &[u8]) -> Option<String> {
        // Convert to base64 (assumes JPEG-encoded bytes; raw pixel buffers would
        // need to be JPEG-encoded first, e.g. with the image crate)
        let base64_image = general_purpose::STANDARD.encode(image_data);
        let data_url = format!("data:image/jpeg;base64,{}", base64_image);

        let request = VisionRequest {
            model: "gpt-4-vision-preview".to_string(),
            messages: vec![Message {
                role: "user".to_string(),
                content: vec![
                    Content::Text { text: "What objects do you see?".to_string() },
                    Content::ImageUrl { image_url: ImageUrl { url: data_url } },
                ],
            }],
            max_tokens: 300,
        };

        let response = self.client
            .post("https://api.openai.com/v1/chat/completions")
            .header("Authorization", format!("Bearer {}", self.api_key))
            .json(&request)
            .send()
            .ok()?;

        let vision_response: VisionResponse = response.json().ok()?;
        Some(vision_response.choices[0].message.content.clone())
    }
}

impl Node for OpenAIVisionNode {
    fn name(&self) -> &'static str { "OpenAIVisionNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(frame) = self.camera_sub.recv(ctx) {
            // Only analyze every 30th frame to limit API costs
            if self.frame_count % 30 == 0 {
                if let Some(description) = self.analyze_image(&frame.data) {
                    self.description_pub.send(description, ctx).ok();
                }
            }
            self.frame_count += 1;
        }
    }
}

Complete Example: Vision-Guided Robot

Combine camera, OpenCV processing, and control:

use horus::prelude::*;
use horus_library::{CameraNode, DifferentialDriveNode};

// Simple vision-based follower
pub struct ObjectFollowerNode {
    vision_sub: Hub<String>,
    cmd_pub: Hub<(f32, f32)>,
}

impl ObjectFollowerNode {
    pub fn new() -> HorusResult<Self> {
        Ok(Self {
            vision_sub: Hub::new("vision/detections")?,
            cmd_pub: Hub::new("motor/cmd")?,
        })
    }
}

impl Node for ObjectFollowerNode {
    fn name(&self) -> &'static str { "ObjectFollowerNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(detection) = self.vision_sub.recv(ctx) {
            // Parse edge count from detection string
            if let Some(count_str) = detection.strip_prefix("edges_detected:") {
                if let Ok(edge_count) = count_str.parse::<i32>() {
                    // Simple behavior: move forward if edges detected, stop if not
                    let (linear, angular) = if edge_count > 1000 {
                        (0.5, 0.0)  // Move forward
                    } else {
                        (0.0, 0.3)  // Turn to search
                    };

                    self.cmd_pub.send((linear, angular), ctx).ok();
                }
            }
        }
    }
}

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();

    // Camera capture (if opencv-backend feature enabled)
    let mut camera = CameraNode::new()?;
    camera.set_topic("camera/raw");

    // Vision processing
    let vision = VisionProcessorNode::new()?;

    // Decision making
    let follower = ObjectFollowerNode::new()?;

    // Motor control
    let drive = DifferentialDriveNode::new();

    // Register in priority order
    scheduler.register(Box::new(camera), 0, Some(true));     // Highest priority
    scheduler.register(Box::new(vision), 1, Some(true));     // Process images
    scheduler.register(Box::new(follower), 2, Some(true));   // Make decisions
    scheduler.register(Box::new(drive), 3, Some(true));      // Control motors

    scheduler.tick_all()?;
    Ok(())
}

Performance Considerations

Latency Budget

Typical latencies for the stages feeding a 100Hz (10ms cycle) control loop:

Camera:      ~16ms  (30 FPS)
Vision:      ~5-50ms (OpenCV/ML inference)
Decision:    ~1μs   (HORUS Hub communication)
Control:     ~1μs   (HORUS Hub communication)
Motors:      ~1ms   (Hardware actuator delay)
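
To see how your own stages compare against this budget, one option is to time the work done per tick and warn when it overruns. A small self-contained sketch (the helper and the 10 ms threshold are illustrative, not part of the HORUS API):

use std::time::Instant;

/// Run one unit of per-tick work and warn if it exceeds the given budget.
fn timed_tick<F: FnOnce()>(budget_ms: u128, work: F) {
    let start = Instant::now();
    work();
    let elapsed = start.elapsed();
    if elapsed.as_millis() > budget_ms {
        eprintln!("tick overran {} ms budget: {:?}", budget_ms, elapsed);
    }
}

fn main() {
    // Example: a vision stage that should stay under 10 ms
    timed_tick(10, || {
        // ... inference / parsing / publishing would go here ...
    });
}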

Optimization Tips

  1. Throttle AI Processing: Process every Nth frame
if frame_count % 10 == 0 {
    run_expensive_inference();
}
  2. Async Processing: Run ML in separate thread
use std::sync::mpsc;
let (tx, rx) = mpsc::channel();

// In tick():
tx.send(frame_data).ok();

// Separate thread:
while let Ok(data) = rx.recv() {
    let result = expensive_ml(data);
    result_hub.send(result, None).ok();
}
  3. Use GPU: OpenCV and ML libraries support CUDA/Metal
// Enable OpenCV's optimized (SIMD) code paths; GPU backends such as CUDA or
// OpenCL are enabled via build features, not by this call
opencv::core::set_use_optimized(true);

Best Practices

Separate Concerns: Keep AI inference in dedicated nodes

  • Camera node → Vision node → Decision node → Control node

Handle Failures Gracefully: AI can fail, robots shouldn't crash

if let Some(result) = try_inference(&data) {
    control_pub.send(result, ctx).ok();
} else {
    // Fallback behavior
    control_pub.send(default_safe_command(), ctx).ok();
}
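
Silence is also a failure mode: if the AI stage stops publishing (a crashed Python process, a timed-out API call), the control side should fall back to a safe command instead of acting on stale data. A sketch of a tick-counting watchdog, assuming a 100Hz loop and the (f32, f32) motor command convention used elsewhere on this page:

use horus::prelude::*;

pub struct SafetyWatchdogNode {
    detections_sub: Hub<String>,
    cmd_pub: Hub<(f32, f32)>,
    ticks_since_update: u32,
}

impl SafetyWatchdogNode {
    pub fn new() -> HorusResult<Self> {
        Ok(Self {
            detections_sub: Hub::new("vision/detections")?,
            cmd_pub: Hub::new("motor/cmd")?,
            ticks_since_update: 0,
        })
    }
}

impl Node for SafetyWatchdogNode {
    fn name(&self) -> &'static str { "SafetyWatchdogNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if self.detections_sub.recv(ctx).is_some() {
            self.ticks_since_update = 0;
        } else {
            self.ticks_since_update += 1;
        }

        // No vision output for roughly a second of ticks: command a stop
        if self.ticks_since_update > 100 {
            self.cmd_pub.send((0.0, 0.0), ctx).ok();
        }
    }
}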

Monitor Performance: Use HORUS dashboard

horus dashboard  # Watch node timing and message flow

Next Steps