Examples

Learn HORUS through practical examples. All code is production-ready and demonstrates best practices.

Basic Publisher-Subscriber

The foundational pattern in HORUS: one node publishes data, another subscribes.

Publisher Node

use horus::prelude::*;

// Define publisher node
struct SensorNode {
    data_pub: Hub<f32>,
    counter: f32,
}

impl SensorNode {
    fn new() -> HorusResult<Self> {
        Ok(Self {
            data_pub: Hub::new("sensor_data")?,
            counter: 0.0,
        })
    }
}

impl Node for SensorNode {
    fn name(&self) -> &'static str {
        "SensorNode"
    }

    fn init(&mut self, ctx: &mut NodeInfo) -> Result<(), String> {
        ctx.log_info("Sensor initialized");
        Ok(())
    }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Simulate sensor reading
        let reading = self.counter.sin() * 10.0;

        // Publish data
        self.data_pub.send(reading, ctx).ok();

        self.counter += 0.1;
    }
}

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();
    scheduler.register(Box::new(SensorNode::new()?), 0, Some(true));
    scheduler.tick_all()?;
    Ok(())
}

Subscriber Node

use horus::prelude::*;

struct ProcessorNode {
    data_sub: Hub<f32>,
}

impl ProcessorNode {
    fn new() -> HorusResult<Self> {
        Ok(Self {
            data_sub: Hub::new("sensor_data")?,
        })
    }
}

impl Node for ProcessorNode {
    fn name(&self) -> &'static str {
        "ProcessorNode"
    }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(data) = self.data_sub.recv(ctx) {
            // Process received data
            let processed = data * 2.0;

            if let Some(ctx) = ctx {
                ctx.log_debug(&format!("Processed: {:.2}", processed));
            }
        }
    }
}

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();
    scheduler.register(Box::new(ProcessorNode::new()?), 0, Some(true));
    scheduler.tick_all()?;
    Ok(())
}

Combined Application

use horus::prelude::*;

// Publisher
struct SensorNode {
    data_pub: Hub<f32>,
    counter: f32,
}

impl Node for SensorNode {
    fn name(&self) -> &'static str { "SensorNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        let reading = self.counter.sin() * 10.0;
        self.data_pub.send(reading, ctx).ok();
        self.counter += 0.1;
    }
}

// Subscriber
struct ProcessorNode {
    data_sub: Hub<f32>,
}

impl Node for ProcessorNode {
    fn name(&self) -> &'static str { "ProcessorNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(data) = self.data_sub.recv(ctx) {
            // Process data
        }
    }
}

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();

    // Register both nodes
    scheduler.register(Box::new(SensorNode {
        data_pub: Hub::new("sensor_data")?,
        counter: 0.0,
    }), 0, Some(true));

    scheduler.register(Box::new(ProcessorNode {
        data_sub: Hub::new("sensor_data")?,
    }), 1, Some(true));

    // Run both nodes together
    scheduler.tick_all()?;
    Ok(())
}

Robot Velocity Controller

Control a robot using CmdVel messages.

use horus::prelude::*;
use horus_library::messages::CmdVel;

// Keyboard input  velocity commands
struct TeleopNode {
    cmd_pub: Hub<CmdVel>,
}

impl Node for TeleopNode {
    fn name(&self) -> &'static str { "TeleopNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Simulate keyboard input (w/a/s/d)
        let cmd = CmdVel::new(1.0, 0.5);  // Forward + turn
        self.cmd_pub.send(cmd, ctx).ok();
    }
}

// Velocity commands  motor control
struct MotorDriverNode {
    cmd_sub: Hub<CmdVel>,
}

impl Node for MotorDriverNode {
    fn name(&self) -> &'static str { "MotorDriverNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(cmd) = self.cmd_sub.recv(ctx) {
            // Convert to motor speeds
            let left_speed = cmd.linear - cmd.angular;
            let right_speed = cmd.linear + cmd.angular;

            // Send to motors
            if let Some(ctx) = ctx {
                ctx.log_debug(&format!(
                    "Motors: L={:.2}, R={:.2}",
                    left_speed, right_speed
                ));
            }
        }
    }
}

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();

    scheduler.register(Box::new(TeleopNode {
        cmd_pub: Hub::new("cmd_vel")?,
    }), 0, Some(true));

    scheduler.register(Box::new(MotorDriverNode {
        cmd_sub: Hub::new("cmd_vel")?,
    }), 1, Some(true));

    scheduler.tick_all()?;
    Ok(())
}

Lidar Obstacle Detection

Process laser scan data to detect obstacles.

use horus::prelude::*;
use horus_library::messages::{LaserScan, CmdVel};

// Lidar  Scan data
struct LidarNode {
    scan_pub: Hub<LaserScan>,
}

impl Node for LidarNode {
    fn name(&self) -> &'static str { "LidarNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        let mut scan = LaserScan::new();

        // Simulate lidar readings
        for i in 0..360 {
            scan.ranges[i] = 5.0 + (i as f32 * 0.01).sin();
        }

        self.scan_pub.send(scan, ctx).ok();
    }
}

// Scan data  Obstacle detection
struct ObstacleDetector {
    scan_sub: Hub<LaserScan>,
    cmd_pub: Hub<CmdVel>,
    safety_distance: f32,
}

impl Node for ObstacleDetector {
    fn name(&self) -> &'static str { "ObstacleDetector" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(scan) = self.scan_sub.recv(ctx) {
            // Find minimum distance
            if let Some(min_dist) = scan.min_range() {
                if min_dist < self.safety_distance {
                    // Emergency stop!
                    let stop = CmdVel::zero();
                    self.cmd_pub.send(stop, ctx).ok();

                    if let Some(ctx) = ctx {
                        ctx.log_warning(&format!(
                            "Obstacle detected at {:.2}m - stopping!",
                            min_dist
                        ));
                    }
                }
            }
        }
    }
}

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();

    scheduler.register(Box::new(LidarNode {
        scan_pub: Hub::new("scan")?,
    }), 0, Some(true));

    // Obstacle detector runs with High priority (1)
    scheduler.register(Box::new(ObstacleDetector {
        scan_sub: Hub::new("scan")?,
        cmd_pub: Hub::new("cmd_vel")?,
        safety_distance: 0.5,  // 50cm
    }), 1, Some(true));

    scheduler.tick_all()?;
    Ok(())
}

PID Controller

Implement a PID controller node for position tracking.

use horus::prelude::*;

struct PIDController {
    setpoint_sub: Hub<f32>,    // Desired position
    feedback_sub: Hub<f32>,    // Current position
    output_pub: Hub<f32>,      // Control output

    kp: f32,  // Proportional gain
    ki: f32,  // Integral gain
    kd: f32,  // Derivative gain

    integral: f32,
    last_error: f32,
}

impl PIDController {
    fn new(kp: f32, ki: f32, kd: f32) -> HorusResult<Self> {
        Ok(Self {
            setpoint_sub: Hub::new("setpoint")?,
            feedback_sub: Hub::new("feedback")?,
            output_pub: Hub::new("control_output")?,
            kp, ki, kd,
            integral: 0.0,
            last_error: 0.0,
        })
    }
}

impl Node for PIDController {
    fn name(&self) -> &'static str { "PIDController" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Get setpoint and current position
        let setpoint = self.setpoint_sub.recv(ctx).unwrap_or(0.0);
        let feedback = self.feedback_sub.recv(ctx).unwrap_or(0.0);

        // Calculate error
        let error = setpoint - feedback;

        // Integral term
        self.integral += error;

        // Derivative term
        let derivative = error - self.last_error;

        // PID output
        let output = self.kp * error
                   + self.ki * self.integral
                   + self.kd * derivative;

        // Publish control output
        self.output_pub.send(output, ctx).ok();

        // Update state
        self.last_error = error;
    }
}

App (Multi-Node Pipeline)

Chain multiple processing stages together.

use horus::prelude::*;

// Stage 1: Data acquisition
struct SensorNode {
    raw_pub: Hub<f32>,
}

impl Node for SensorNode {
    fn name(&self) -> &'static str { "SensorNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        let raw_data = 42.0;  // Read sensor
        self.raw_pub.send(raw_data, ctx).ok();
    }
}

// Stage 2: Filtering
struct FilterNode {
    raw_sub: Hub<f32>,
    filtered_pub: Hub<f32>,
    alpha: f32,  // Low-pass filter coefficient
    filtered_value: f32,
}

impl Node for FilterNode {
    fn name(&self) -> &'static str { "FilterNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(raw) = self.raw_sub.recv(ctx) {
            // Exponential moving average
            self.filtered_value = self.alpha * raw
                                + (1.0 - self.alpha) * self.filtered_value;

            self.filtered_pub.send(self.filtered_value, ctx).ok();
        }
    }
}

// Stage 3: Decision making
struct ControllerNode {
    filtered_sub: Hub<f32>,
    cmd_pub: Hub<f32>,
}

impl Node for ControllerNode {
    fn name(&self) -> &'static str { "ControllerNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(value) = self.filtered_sub.recv(ctx) {
            let command = if value > 50.0 { 1.0 } else { 0.0 };
            self.cmd_pub.send(command, ctx).ok();
        }
    }
}

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();

    // Register pipeline stages
    scheduler.register(Box::new(SensorNode {
        raw_pub: Hub::new("raw_data")?,
    }), 0, Some(true));

    scheduler.register(Box::new(FilterNode {
        raw_sub: Hub::new("raw_data")?,
        filtered_pub: Hub::new("filtered_data")?,
        alpha: 0.1,
        filtered_value: 0.0,
    }), 1, Some(true));

    scheduler.register(Box::new(ControllerNode {
        filtered_sub: Hub::new("filtered_data")?,
        cmd_pub: Hub::new("commands")?,
    }), 2, Some(true));

    scheduler.tick_all()?;
    Ok(())
}

State Machine Node

Implement behavior using state machines.

use horus::prelude::*;

#[derive(Debug, Clone, Copy)]
enum RobotState {
    Idle,
    Moving,
    ObstacleDetected,
    Rotating,
}

struct StateMachineNode {
    state: RobotState,
    obstacle_sub: Hub<bool>,
    cmd_pub: Hub<f32>,
    rotation_counter: u32,
}

impl Node for StateMachineNode {
    fn name(&self) -> &'static str { "StateMachineNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Check for obstacles
        let obstacle = self.obstacle_sub.recv(ctx).unwrap_or(false);

        // State machine logic
        match self.state {
            RobotState::Idle => {
                if !obstacle {
                    self.state = RobotState::Moving;
                    if let Some(ctx) = ctx {
                        ctx.log_info("Starting to move");
                    }
                }
            }

            RobotState::Moving => {
                if obstacle {
                    self.state = RobotState::ObstacleDetected;
                    self.cmd_pub.send(0.0, ctx).ok();  // Stop
                } else {
                    self.cmd_pub.send(1.0, ctx).ok();  // Move forward
                }
            }

            RobotState::ObstacleDetected => {
                self.state = RobotState::Rotating;
                self.rotation_counter = 0;
                if let Some(ctx) = ctx {
                    ctx.log_warning("Obstacle detected, rotating");
                }
            }

            RobotState::Rotating => {
                self.cmd_pub.send(0.5, ctx).ok();  // Rotate
                self.rotation_counter += 1;

                if self.rotation_counter > 50 {
                    self.state = RobotState::Moving;
                }
            }
        }
    }
}

Priority-Based Safety System

Use priority levels to ensure safety-critical tasks run first.

use horus::prelude::*;

// CRITICAL PRIORITY: Emergency stop
struct EmergencyStopNode {
    battery_sub: Hub<f32>,
    estop_pub: Hub<bool>,
}

impl Node for EmergencyStopNode {
    fn name(&self) -> &'static str { "EmergencyStop" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(battery) = self.battery_sub.recv(ctx) {
            if battery < 10.0 {
                self.estop_pub.send(true, ctx).ok();
                if let Some(ctx) = ctx {
                    ctx.log_error("CRITICAL: Battery low - emergency stop!");
                }
            }
        }
    }
}

// HIGH PRIORITY: Motor controller
struct MotorController {
    estop_sub: Hub<bool>,
    cmd_sub: Hub<f32>,
}

impl Node for MotorController {
    fn name(&self) -> &'static str { "MotorController" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Check emergency stop first
        if let Some(true) = self.estop_sub.recv(ctx) {
            return;  // Don't move if estop active
        }

        if let Some(cmd) = self.cmd_sub.recv(ctx) {
            // Execute motor command
        }
    }
}

// BACKGROUND PRIORITY: Logging
struct LoggerNode {
    data_sub: Hub<f32>,
}

impl Node for LoggerNode {
    fn name(&self) -> &'static str { "Logger" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(data) = self.data_sub.recv(ctx) {
            // Log to file (slow operation)
        }
    }
}

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();

    // Safety runs FIRST (priority 0 = Critical)
    scheduler.register(Box::new(EmergencyStopNode {
        battery_sub: Hub::new("battery")?,
        estop_pub: Hub::new("estop")?,
    }), 0, Some(true));

    // Control runs SECOND (priority 1 = High)
    scheduler.register(Box::new(MotorController {
        estop_sub: Hub::new("estop")?,
        cmd_sub: Hub::new("cmd")?,
    }), 1, Some(true));

    // Logging runs LAST (priority 4 = Background)
    scheduler.register(Box::new(LoggerNode {
        data_sub: Hub::new("data")?,
    }), 4, Some(true));

    scheduler.tick_all()?;
    Ok(())
}

Python Example

HORUS supports Python with the same performance characteristics.

import horus

class SensorNode(horus.Node):
    def __init__(self):
        self.data_pub = horus.Hub("sensor_data", float)
        self.counter = 0.0

    def name(self):
        return "PySensorNode"

    def tick(self, ctx):
        # Read sensor
        import math
        reading = math.sin(self.counter) * 10.0

        # Publish
        self.data_pub.send(reading, ctx)

        self.counter += 0.1

# Run node
if __name__ == "__main__":
    scheduler = horus.Scheduler()
    scheduler.register(SensorNode())
    scheduler.tick_all()

Testing Pattern

Test nodes in isolation before integration.

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sensor_node() {
        let mut node = SensorNode::new();
        let mut ctx = NodeInfo::new("TestSensor".to_string(), true);

        // Initialize
        assert!(node.init(&mut ctx).is_ok());

        // Run a few ticks
        for _ in 0..10 {
            node.tick(Some(&mut ctx));
        }

        // Verify metrics
        assert_eq!(ctx.metrics.total_ticks, 10);
    }

    #[test]
    fn test_pubsub_integration() -> HorusResult<()> {
        let mut scheduler = Scheduler::new();

        // Create publisher and subscriber
        scheduler.register(Box::new(SensorNode::new()?), 0, Some(true));
        scheduler.register(Box::new(ProcessorNode::new()?), 1, Some(true));

        // Note: Scheduler::tick_all() runs infinitely until Ctrl+C
        // For testing, you can use tick_node() to run specific nodes
        // or use integration tests with timeouts and stop() method

        Ok(())
    }
}

Snake Game (Multi-Process Demo)

A complete snake game demonstrating HORUS's real-time IPC capabilities with a visual GUI.

Features

  • Real-time IPC: Sub-microsecond message passing between backend and GUI
  • Keyboard Input: Arrow keys or WASD controls
  • Visual Display: Animated snake with eyes that point in the direction of movement
  • Multi-Process Architecture: Backend and GUI run as separate processes

Architecture

Backend (main.rs) - Run with horus run:

  • KeyboardInputNode: Captures keyboard input
  • JoystickInputNode: Optional gamepad support
  • SnakeControlNode: Converts input to snake state
  • Publishes to "snakestate" topic

GUI (snakesim_gui) - Separate binary:

  • Subscribes to "snakestate" topic
  • Renders 20x20 grid with snake visualization
  • Built with eframe/egui (Rust GUI framework)
  • Updates every 200ms

Running the Game

# Copy from installed examples
cp -r ~/.horus/cache/horus@0.1.0/examples/snakesim ~/my_snakesim
cd ~/my_snakesim

# Terminal 1: Run backend (keyboard input + game logic)
horus run

# Terminal 2: Run GUI (visual display)
./snakesim_gui

Use Arrow Keys or WASD to control the snake!

Code Structure

The backend is a single-file HORUS project:

use horus::prelude::*;
use horus::library::nodes::{JoystickInputNode, KeyboardInputNode};

#[derive(Clone, Copy, Debug)]
pub struct SnakeState {
    pub direction: u32,
}

struct SnakeControlNode {
    keyboard_subscriber: Hub<KeyboardInput>,
    joystick_subscriber: Hub<JoystickInput>,
    snake_publisher: Hub<SnakeState>,
}

impl Node for SnakeControlNode {
    fn name(&self) -> &'static str {
        "SnakeControlNode"
    }

    fn tick(&mut self, mut ctx: Option<&mut NodeInfo>) {
        // Handle keyboard input
        while let Some(input) = self.keyboard_subscriber.recv(ctx.as_deref_mut()) {
            if input.pressed {
                let direction = match input.code {
                    38 | 87 => 1,  // ArrowUp or W -> Up
                    40 | 83 => 2,  // ArrowDown or S -> Down
                    37 | 65 => 3,  // ArrowLeft or A -> Left
                    39 | 68 => 4,  // ArrowRight or D -> Right
                    _ => continue,
                };

                let snake_state = SnakeState { direction };
                let _ = self.snake_publisher.send(snake_state, ctx.as_deref_mut());
            }
        }
    }
}

fn main() -> HorusResult<()> {
    let mut sched = Scheduler::new().name("SnakeScheduler");

    let keyboard_input_node = KeyboardInputNode::new_with_topic("snakeinput")?;
    let joystick_input_node = JoystickInputNode::new_with_topic("snakeinput")?;
    let snake_control_node = SnakeControlNode::new()?;

    sched.register(Box::new(keyboard_input_node), 0, Some(true));
    sched.register(Box::new(joystick_input_node), 1, None);
    sched.register(Box::new(snake_control_node), 2, Some(true));

    sched.tick_node(&["KeyboardInputNode", "JoystickInputNode", "SnakeControlNode"])
}

Performance

The game demonstrates HORUS's real-time capabilities:

  • IPC latency: 262ns-1527ns (sub-microsecond!)
  • Update rate: 200ms (5 FPS for smooth gameplay)
  • Message types: KeyboardInput, JoystickInput, SnakeState

Why Two Processes?

This architecture demonstrates:

  • Process isolation: GUI crashes don't affect game logic
  • Language flexibility: Could rewrite GUI in any language
  • Real-time IPC: Shared memory pub/sub with zero-copy messaging
  • Production pattern: Typical robotics architecture (sensor nodes + visualization)

Next Steps