Advanced Examples

Advanced HORUS patterns for complex robotics systems. These examples demonstrate priority-based safety systems, multi-process architectures, and cross-language communication.

Prerequisites: Complete Basic Examples first.


1. State Machine Node

Implement complex behavior using state machines - ideal for autonomous robots with multiple operating modes.

File: state_machine.rs

use horus::prelude::*;

#[derive(Debug, Clone, Copy, PartialEq)]
enum RobotState {
    Idle,
    Moving,
    ObstacleDetected,
    Rotating,
    Escaped,
}

struct StateMachineNode {
    state: RobotState,
    obstacle_sub: Topic<bool>,
    cmd_pub: Topic<CmdVel>,
    rotation_counter: u32,
}

impl StateMachineNode {
    fn new() -> Result<Self> {
        Ok(Self {
            state: RobotState::Idle,
            obstacle_sub: Topic::new("obstacle_detected")?,
            cmd_pub: Topic::new("cmd_vel")?,
            rotation_counter: 0,
        })
    }
}

impl Node for StateMachineNode {
    fn name(&self) -> &str { "StateMachineNode" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "State machine initialized - starting in IDLE state");
        Ok(())
    }

    fn tick(&mut self) {
        // Check for obstacles
        let obstacle = self.obstacle_sub.recv().unwrap_or(false);

        // Store previous state for logging
        let prev_state = self.state;

        // State machine logic
        self.state = match self.state {
            RobotState::Idle => {
                if !obstacle {
                    RobotState::Moving
                } else {
                    RobotState::Idle
                }
            }

            RobotState::Moving => {
                if obstacle {
                    self.cmd_pub.send(CmdVel::zero());  // Stop
                    RobotState::ObstacleDetected
                } else {
                    self.cmd_pub.send(CmdVel::new(1.0, 0.0));  // Forward
                    RobotState::Moving
                }
            }

            RobotState::ObstacleDetected => {
                self.rotation_counter = 0;
                RobotState::Rotating
            }

            RobotState::Rotating => {
                self.cmd_pub.send(CmdVel::new(0.0, 0.5));  // Rotate
                self.rotation_counter += 1;

                if self.rotation_counter > 50 {
                    RobotState::Escaped
                } else {
                    RobotState::Rotating
                }
            }

            RobotState::Escaped => {
                RobotState::Moving  // Resume moving
            }
        };

        // Log state transitions
        if self.state != prev_state {
            hlog!(info, "State transition: {:?} -> {:?}",
                prev_state, self.state);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        // Ensure robot is stopped
        self.cmd_pub.send(CmdVel::zero());
        hlog!(info, "State machine shutdown");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(StateMachineNode::new()?).order(0).build()?;
    scheduler.run()?;
    Ok(())
}

Run it:

horus run state_machine.rs

Key Concepts:

  • Enum for states: Idle, Moving, ObstacleDetected, Rotating, Escaped
  • Match expression handles state transitions
  • Each state defines behavior and next state
  • Log state transitions for debugging

2. Priority-Based Safety System

Use node priorities to ensure safety-critical tasks always run first - essential for production robotics.

File: safety_system.rs

use horus::prelude::*;

// CRITICAL PRIORITY: Emergency stop
struct EmergencyStopNode {
    battery_sub: Topic<BatteryState>,
    lidar_sub: Topic<f32>,  // Min obstacle distance
    estop_pub: Topic<bool>,
    estop_active: bool,
}

impl EmergencyStopNode {
    fn new() -> Result<Self> {
        Ok(Self {
            battery_sub: Topic::new("battery_state")?,
            lidar_sub: Topic::new("min_distance")?,
            estop_pub: Topic::new("emergency_stop")?,
            estop_active: false,
        })
    }
}

impl Node for EmergencyStopNode {
    fn name(&self) -> &str { "EmergencyStop" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, " Emergency stop system online - CRITICAL priority");
        Ok(())
    }

    fn tick(&mut self) {
        let mut should_stop = false;

        // Check battery
        if let Some(battery) = self.battery_sub.recv() {
            if battery.is_critical() {  // Below 10%
                should_stop = true;
                hlog!(error, " CRITICAL: Battery at {:.0}% - EMERGENCY STOP!",
                    battery.percentage);
            }
        }

        // Check obstacle distance
        if let Some(min_dist) = self.lidar_sub.recv() {
            if min_dist < 0.2 {  // 20cm
                should_stop = true;
                hlog!(error, " CRITICAL: Obstacle at {:.2}m - EMERGENCY STOP!",
                    min_dist);
            }
        }

        // Publish estop state
        if should_stop != self.estop_active {
            self.estop_pub.send(should_stop);
            self.estop_active = should_stop;
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        // Always activate estop on shutdown
        self.estop_pub.send(true);
        hlog!(warn, "Emergency stop system offline");
        Ok(())
    }
}

// HIGH PRIORITY: Motor controller
struct MotorController {
    estop_sub: Topic<bool>,
    cmd_sub: Topic<CmdVel>,
    motor_pub: Topic<CmdVel>,
    estop_active: bool,
}

impl MotorController {
    fn new() -> Result<Self> {
        Ok(Self {
            estop_sub: Topic::new("emergency_stop")?,
            cmd_sub: Topic::new("cmd_vel_request")?,
            motor_pub: Topic::new("cmd_vel_actual")?,
            estop_active: false,
        })
    }
}

impl Node for MotorController {
    fn name(&self) -> &str { "MotorController" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Motor controller online - HIGH priority");
        Ok(())
    }

    fn tick(&mut self) {
        // Check emergency stop FIRST
        if let Some(estop) = self.estop_sub.recv() {
            if estop != self.estop_active {
                self.estop_active = estop;
                if estop {
                    hlog!(warn, "Motors DISABLED - emergency stop active");
                } else {
                    hlog!(info, "Motors ENABLED - emergency stop cleared");
                }
            }
        }

        // Don't move if estop active
        if self.estop_active {
            self.motor_pub.send(CmdVel::zero());
            return;
        }

        // Process normal commands
        if let Some(cmd) = self.cmd_sub.recv() {
            self.motor_pub.send(cmd);
            hlog!(debug, "Motors: linear={:.2}, angular={:.2}",
                cmd.linear, cmd.angular);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        // Stop motors
        self.motor_pub.send(CmdVel::zero());
        hlog!(info, "Motor controller stopped");
        Ok(())
    }
}

// BACKGROUND PRIORITY: Data logging
struct LoggerNode {
    cmd_sub: Topic<CmdVel>,
    battery_sub: Topic<BatteryState>,
}

impl LoggerNode {
    fn new() -> Result<Self> {
        Ok(Self {
            cmd_sub: Topic::new("cmd_vel_actual")?,
            battery_sub: Topic::new("battery_state")?,
        })
    }
}

impl Node for LoggerNode {
    fn name(&self) -> &str { "Logger" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, " Logger online - BACKGROUND priority");
        Ok(())
    }

    fn tick(&mut self) {
        // Log velocity commands
        if let Some(cmd) = self.cmd_sub.recv() {
            hlog!(debug, "LOG: cmd_vel({:.2}, {:.2})",
                cmd.linear, cmd.angular);
        }

        // Log battery state
        if let Some(battery) = self.battery_sub.recv() {
            hlog!(debug, "LOG: battery({:.1}V, {:.0}%)",
                battery.voltage, battery.percentage);
        }

        // In production: write to file, database, etc.
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Logger stopped");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();

    // Order 0 (Critical): Safety runs FIRST
    scheduler.add(EmergencyStopNode::new()?).order(0).build()?;

    // Order 1 (High): Control runs SECOND
    scheduler.add(MotorController::new()?).order(1).build()?;

    // Order 4 (Background): Logging runs LAST
    scheduler.add(LoggerNode::new()?).order(4).build()?;

    scheduler.run()?;
    Ok(())
}

Run it:

horus run safety_system.rs

Key Concepts:

  • Priority 0 (Critical): Emergency stop - runs first, always
  • Priority 1 (High): Motor control - runs after safety checks
  • Priority 4 (Background): Logging - runs last, non-critical
  • Lower number = higher priority
  • Safety systems should always check estop before acting

3. Python Multi-Process System

Build a complete sensor monitoring system with Python nodes running as independent processes.

Project Structure

mkdir multi_node_system
cd multi_node_system
mkdir nodes

Sensor Node

nodes/sensor.py:

#!/usr/bin/env python3
import horus
import random

def sensor_tick(node):
    # Generate realistic temperature with noise
    temperature = 20.0 + random.random() * 10.0
    node.send("temperature", temperature)
    print(f"Sensor: {temperature:.1f}°C")

sensor = horus.Node(name="SensorNode", tick=sensor_tick, order=0, rate=2,
                    pubs=["temperature"])
horus.run(sensor)

Controller Node

nodes/controller.py:

#!/usr/bin/env python3
import horus

def controller_tick(node):
    temp = node.recv("temperature")
    if temp is not None:
        if temp > 25.0:
            fan_speed = min(100, int((temp - 20) * 10))
            node.send("fan_control", fan_speed)
            print(f"Controller: Fan at {fan_speed}%")
        else:
            node.send("fan_control", 0)
            print(f"Controller: Temperature normal, fan off")

controller = horus.Node(name="ControllerNode", tick=controller_tick,
                        order=1, rate=2,
                        subs=["temperature"], pubs=["fan_control"])
horus.run(controller)

Logger Node

nodes/logger.py:

#!/usr/bin/env python3
import horus
import datetime

def logger_tick(node):
    temp = node.recv("temperature")
    fan = node.recv("fan_control")
    if temp is not None and fan is not None:
        timestamp = datetime.datetime.now().strftime("%H:%M:%S")
        status = "COOLING" if fan > 0 else "NORMAL"
        print(f"Logger [{timestamp}]: {temp:.1f}°C | Fan {fan}% | {status}")

logger = horus.Node(name="LoggerNode", tick=logger_tick, order=2, rate=1,
                    subs=["temperature", "fan_control"])
horus.run(logger)

Run All Nodes Concurrently

# Make scripts executable
chmod +x nodes/*.py

# Run all nodes as separate processes
horus run "nodes/*.py"

Output:

Executing 3 files concurrently:
  1. nodes/controller.py (python)
  2. nodes/logger.py (python)
  3. nodes/sensor.py (python)

Phase 1: Building all files...
Phase 2: Starting all processes...
  Started [controller]
  Started [logger]
  Started [sensor]

All processes running. Press Ctrl+C to stop.

[sensor] Sensor: 23.4°C
[controller] Controller: Fan at 34%
[logger] Logger [15:30:45]: 23.4°C | Fan 34% | COOLING
[sensor] Sensor: 26.8°C
[controller] Controller: Fan at 68%
[sensor] Sensor: 21.2°C
[logger] Logger [15:30:46]: 21.2°C | Fan 12% | COOLING

Key Features:

  • Independent Processes: Each node runs in its own process
  • Shared Memory IPC: Nodes communicate via HORUS shared memory topics
  • Color-Coded Output: Each node has a unique color
  • Graceful Shutdown: Ctrl+C stops all processes cleanly
  • Zero Configuration: No launch files needed

4. Rust + Python Cross-Language System

Mix Rust and Python nodes in the same application.

Rust Sensor Node

nodes/rust_sensor.rs:

use horus::prelude::*;

pub struct TempSensor {
    temp_pub: Topic<f32>,
    counter: f32,
}

impl TempSensor {
    fn new() -> Result<Self> {
        Ok(Self {
            temp_pub: Topic::new("temperature")?,
            counter: 0.0,
        })
    }
}

impl Node for TempSensor {
    fn name(&self) -> &str { "RustTempSensor" }

    fn init(&mut self) -> Result<()> {
        hlog!(info, "Rust sensor online - high performance mode");
        Ok(())
    }

    fn tick(&mut self) {
        // Fast sensor simulation
        let temp = 20.0 + (self.counter.sin() * 5.0);
        self.temp_pub.send(temp);

        hlog!(debug, "Rust: {:.2}°C", temp);

        self.counter += 0.1;
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Rust sensor offline");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(TempSensor::new()?).order(0).build()?;
    scheduler.run()
}

Python Controller Node

nodes/py_controller.py:

#!/usr/bin/env python3
import horus

def py_controller_tick(node):
    temp = node.recv("temperature")
    if temp is not None:
        status = "HOT" if temp > 22.0 else "NORMAL"
        print(f"Python controller: {temp:.2f}°C - {status}")

        command = 1.0 if temp > 22.0 else 0.0
        node.send("actuator_cmd", command)

controller = horus.Node(name="PyController", tick=py_controller_tick,
                        order=0, rate=10,
                        subs=["temperature"], pubs=["actuator_cmd"])
horus.run(controller)

Rust Actuator Node

nodes/rust_actuator.rs:

use horus::prelude::*;

struct Actuator {
    cmd_sub: Topic<f32>,
}

impl Node for Actuator {
    fn name(&self) -> &str { "RustActuator" }

    fn tick(&mut self) {
        if let Some(cmd) = self.cmd_sub.recv() {
            let action = if cmd > 0.5 { "COOLING" } else { "IDLE" };
            hlog!(info, "Actuator: {}", action);
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        hlog!(info, "Actuator stopped");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(Actuator {
        cmd_sub: Topic::new("actuator_cmd")?,
    }).order(0).build()?;
    scheduler.run()
}

Run Mixed System

horus run "nodes/*"

HORUS automatically detects file types and compiles/runs appropriately!

Key Concepts:

  • Rust nodes: High performance, type safety
  • Python nodes: Rapid prototyping, easy scripting
  • Shared memory IPC works across languages
  • Zero-copy message passing
  • Sub-microsecond latency even across language boundaries

5. Advanced Python Features

Per-node rates, timestamp checking, and staleness detection.

File: advanced_python.py

#!/usr/bin/env python3
import horus
from horus import Imu

# 100Hz IMU sensor
def imu_tick(node):
    imu = Imu(
        accel_x=1.0, accel_y=0.0, accel_z=9.8,
        gyro_x=0.0, gyro_y=0.0, gyro_z=0.0
    )
    node.send("imu", imu)

# 50Hz controller
def controller_tick(node):
    imu = node.recv("imu")
    if imu is not None:
        # Process IMU data
        accel_magnitude = (
            imu.accel_x**2 +
            imu.accel_y**2 +
            imu.accel_z**2
        ) ** 0.5

        print(f"Control: IMU magnitude = {accel_magnitude:.2f} m/s²")

        # Send command
        cmd = {"linear": 1.0, "angular": 0.5}
        node.send("cmd_vel", cmd)

# 10Hz logger
def logger_tick(node):
    msg = node.recv("cmd_vel")
    if msg is not None:
        print(f"Logger: Received command: {msg}")

# Create nodes with per-node rates and run
sensor = horus.Node(name="HighFreqSensor", tick=imu_tick, order=0, rate=100,
                    pubs=["imu"])
ctrl = horus.Node(name="Controller", tick=controller_tick, order=1, rate=50,
                  subs=["imu"], pubs=["cmd_vel"])
logger = horus.Node(name="Logger", tick=logger_tick, order=2, rate=10,
                    subs=["cmd_vel"])

horus.run(sensor, ctrl, logger)

Run it:

horus run advanced_python.py

Key Concepts:

  • Per-node rates: Each node runs at different frequency via .rate(Hz)
  • Typed messages: Use Topic(Imu) for cross-language compatible messages
  • Generic messages: Use Topic("name") for Python-only dict/list data
  • Priorities: Lower order number = higher priority, runs first each tick

When to Use Multi-Process vs Single-Process

Multi-Process (Concurrent Execution)

Use when:

  • Nodes need to run independently (like ROS)
  • Want fault isolation (one crash doesn't kill others)
  • Different nodes have vastly different rates
  • Testing distributed system architectures
  • Mixing languages (Rust + Python)

Command:

horus run "nodes/*.rs"
horus run "nodes/*.py"
horus run "nodes/*"  # Mix Rust and Python!

Single-Process

Use when:

  • All nodes in one file
  • Need maximum performance
  • Require deterministic execution order
  • Simpler deployment
  • Embedded systems with limited resources

Command:

horus run main.rs

Performance Notes

Multi-Process IPC Performance

  • Latency: 300ns - 1μs (shared memory)
  • Throughput: Thousands of messages/second
  • Scalability: Tested with 50+ concurrent processes
  • Memory: ~2-5MB overhead per process

Single-Process Performance

  • Latency: 50-200ns (in-memory)
  • Throughput: Millions of messages/second
  • Scalability: Hundreds of nodes in one process
  • Memory: Minimal overhead

Testing Multi-Node Systems

Create test script: test_system.py

#!/usr/bin/env python3
import horus

# Shared test state
fan_commands = []

def mock_sensor_tick(node):
    node.send("temperature", 30.0)  # Hot!

def test_controller_tick(node):
    temp = node.recv("temperature")
    if temp is not None and temp > 25.0:
        fan_speed = min(100, int((temp - 20) * 10))
        fan_commands.append(fan_speed)
        node.send("fan_control", fan_speed)

sensor = horus.Node(name="MockSensor", tick=mock_sensor_tick,
                    pubs=["temperature"])
controller = horus.Node(name="TestController", tick=test_controller_tick,
                        subs=["temperature"], pubs=["fan_control"])

# Use tick_once for testing (runs one tick cycle)
scheduler = horus.Scheduler()
scheduler.add(sensor)
scheduler.add(controller)
scheduler.tick_once()

# Verify
assert len(fan_commands) > 0, "Controller should have sent fan commands"
assert fan_commands[0] == 100, f"Expected fan at 100%, got {fan_commands[0]}%"
print("Test passed!")

Next Steps

Need help?