Examples
Learn HORUS through practical examples. All code is production-ready and demonstrates best practices.
Basic Publisher-Subscriber
The foundational pattern in HORUS: one node publishes data, another subscribes.
Publisher Node
use horus::prelude::*;
// Define publisher node
struct SensorNode {
data_pub: Hub<f32>,
counter: f32,
}
impl SensorNode {
fn new() -> HorusResult<Self> {
Ok(Self {
data_pub: Hub::new("sensor_data")?,
counter: 0.0,
})
}
}
impl Node for SensorNode {
fn name(&self) -> &'static str {
"SensorNode"
}
fn init(&mut self, ctx: &mut NodeInfo) -> Result<(), String> {
ctx.log_info("Sensor initialized");
Ok(())
}
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Simulate sensor reading
let reading = self.counter.sin() * 10.0;
// Publish data
self.data_pub.send(reading, ctx).ok();
self.counter += 0.1;
}
}
fn main() -> HorusResult<()> {
let mut scheduler = Scheduler::new();
scheduler.register(Box::new(SensorNode::new()?), 0, Some(true));
scheduler.tick_all()?;
Ok(())
}
Subscriber Node
use horus::prelude::*;
struct ProcessorNode {
data_sub: Hub<f32>,
}
impl ProcessorNode {
fn new() -> HorusResult<Self> {
Ok(Self {
data_sub: Hub::new("sensor_data")?,
})
}
}
impl Node for ProcessorNode {
fn name(&self) -> &'static str {
"ProcessorNode"
}
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(data) = self.data_sub.recv(ctx) {
// Process received data
let processed = data * 2.0;
if let Some(ctx) = ctx {
ctx.log_debug(&format!("Processed: {:.2}", processed));
}
}
}
}
fn main() -> HorusResult<()> {
let mut scheduler = Scheduler::new();
scheduler.register(Box::new(ProcessorNode::new()?), 0, Some(true));
scheduler.tick_all()?;
Ok(())
}
Combined Application
use horus::prelude::*;
// Publisher
struct SensorNode {
data_pub: Hub<f32>,
counter: f32,
}
impl Node for SensorNode {
fn name(&self) -> &'static str { "SensorNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
let reading = self.counter.sin() * 10.0;
self.data_pub.send(reading, ctx).ok();
self.counter += 0.1;
}
}
// Subscriber
struct ProcessorNode {
data_sub: Hub<f32>,
}
impl Node for ProcessorNode {
fn name(&self) -> &'static str { "ProcessorNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(data) = self.data_sub.recv(ctx) {
// Process data
}
}
}
fn main() -> HorusResult<()> {
let mut scheduler = Scheduler::new();
// Register both nodes
scheduler.register(Box::new(SensorNode {
data_pub: Hub::new("sensor_data")?,
counter: 0.0,
}), 0, Some(true));
scheduler.register(Box::new(ProcessorNode {
data_sub: Hub::new("sensor_data")?,
}), 1, Some(true));
// Run both nodes together
scheduler.tick_all()?;
Ok(())
}
Robot Velocity Controller
Control a robot using CmdVel messages.
use horus::prelude::*;
use horus_library::messages::CmdVel;
// Keyboard input velocity commands
struct TeleopNode {
cmd_pub: Hub<CmdVel>,
}
impl Node for TeleopNode {
fn name(&self) -> &'static str { "TeleopNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Simulate keyboard input (w/a/s/d)
let cmd = CmdVel::new(1.0, 0.5); // Forward + turn
self.cmd_pub.send(cmd, ctx).ok();
}
}
// Velocity commands motor control
struct MotorDriverNode {
cmd_sub: Hub<CmdVel>,
}
impl Node for MotorDriverNode {
fn name(&self) -> &'static str { "MotorDriverNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(cmd) = self.cmd_sub.recv(ctx) {
// Convert to motor speeds
let left_speed = cmd.linear - cmd.angular;
let right_speed = cmd.linear + cmd.angular;
// Send to motors
if let Some(ctx) = ctx {
ctx.log_debug(&format!(
"Motors: L={:.2}, R={:.2}",
left_speed, right_speed
));
}
}
}
}
fn main() -> HorusResult<()> {
let mut scheduler = Scheduler::new();
scheduler.register(Box::new(TeleopNode {
cmd_pub: Hub::new("cmd_vel")?,
}), 0, Some(true));
scheduler.register(Box::new(MotorDriverNode {
cmd_sub: Hub::new("cmd_vel")?,
}), 1, Some(true));
scheduler.tick_all()?;
Ok(())
}
Lidar Obstacle Detection
Process laser scan data to detect obstacles.
use horus::prelude::*;
use horus_library::messages::{LaserScan, CmdVel};
// Lidar Scan data
struct LidarNode {
scan_pub: Hub<LaserScan>,
}
impl Node for LidarNode {
fn name(&self) -> &'static str { "LidarNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
let mut scan = LaserScan::new();
// Simulate lidar readings
for i in 0..360 {
scan.ranges[i] = 5.0 + (i as f32 * 0.01).sin();
}
self.scan_pub.send(scan, ctx).ok();
}
}
// Scan data Obstacle detection
struct ObstacleDetector {
scan_sub: Hub<LaserScan>,
cmd_pub: Hub<CmdVel>,
safety_distance: f32,
}
impl Node for ObstacleDetector {
fn name(&self) -> &'static str { "ObstacleDetector" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(scan) = self.scan_sub.recv(ctx) {
// Find minimum distance
if let Some(min_dist) = scan.min_range() {
if min_dist < self.safety_distance {
// Emergency stop!
let stop = CmdVel::zero();
self.cmd_pub.send(stop, ctx).ok();
if let Some(ctx) = ctx {
ctx.log_warning(&format!(
"Obstacle detected at {:.2}m - stopping!",
min_dist
));
}
}
}
}
}
}
fn main() -> HorusResult<()> {
let mut scheduler = Scheduler::new();
scheduler.register(Box::new(LidarNode {
scan_pub: Hub::new("scan")?,
}), 0, Some(true));
// Obstacle detector runs with High priority (1)
scheduler.register(Box::new(ObstacleDetector {
scan_sub: Hub::new("scan")?,
cmd_pub: Hub::new("cmd_vel")?,
safety_distance: 0.5, // 50cm
}), 1, Some(true));
scheduler.tick_all()?;
Ok(())
}
PID Controller
Implement a PID controller node for position tracking.
use horus::prelude::*;
struct PIDController {
setpoint_sub: Hub<f32>, // Desired position
feedback_sub: Hub<f32>, // Current position
output_pub: Hub<f32>, // Control output
kp: f32, // Proportional gain
ki: f32, // Integral gain
kd: f32, // Derivative gain
integral: f32,
last_error: f32,
}
impl PIDController {
fn new(kp: f32, ki: f32, kd: f32) -> HorusResult<Self> {
Ok(Self {
setpoint_sub: Hub::new("setpoint")?,
feedback_sub: Hub::new("feedback")?,
output_pub: Hub::new("control_output")?,
kp, ki, kd,
integral: 0.0,
last_error: 0.0,
})
}
}
impl Node for PIDController {
fn name(&self) -> &'static str { "PIDController" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Get setpoint and current position
let setpoint = self.setpoint_sub.recv(ctx).unwrap_or(0.0);
let feedback = self.feedback_sub.recv(ctx).unwrap_or(0.0);
// Calculate error
let error = setpoint - feedback;
// Integral term
self.integral += error;
// Derivative term
let derivative = error - self.last_error;
// PID output
let output = self.kp * error
+ self.ki * self.integral
+ self.kd * derivative;
// Publish control output
self.output_pub.send(output, ctx).ok();
// Update state
self.last_error = error;
}
}
App (Multi-Node Pipeline)
Chain multiple processing stages together.
use horus::prelude::*;
// Stage 1: Data acquisition
struct SensorNode {
raw_pub: Hub<f32>,
}
impl Node for SensorNode {
fn name(&self) -> &'static str { "SensorNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
let raw_data = 42.0; // Read sensor
self.raw_pub.send(raw_data, ctx).ok();
}
}
// Stage 2: Filtering
struct FilterNode {
raw_sub: Hub<f32>,
filtered_pub: Hub<f32>,
alpha: f32, // Low-pass filter coefficient
filtered_value: f32,
}
impl Node for FilterNode {
fn name(&self) -> &'static str { "FilterNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(raw) = self.raw_sub.recv(ctx) {
// Exponential moving average
self.filtered_value = self.alpha * raw
+ (1.0 - self.alpha) * self.filtered_value;
self.filtered_pub.send(self.filtered_value, ctx).ok();
}
}
}
// Stage 3: Decision making
struct ControllerNode {
filtered_sub: Hub<f32>,
cmd_pub: Hub<f32>,
}
impl Node for ControllerNode {
fn name(&self) -> &'static str { "ControllerNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(value) = self.filtered_sub.recv(ctx) {
let command = if value > 50.0 { 1.0 } else { 0.0 };
self.cmd_pub.send(command, ctx).ok();
}
}
}
fn main() -> HorusResult<()> {
let mut scheduler = Scheduler::new();
// Register pipeline stages
scheduler.register(Box::new(SensorNode {
raw_pub: Hub::new("raw_data")?,
}), 0, Some(true));
scheduler.register(Box::new(FilterNode {
raw_sub: Hub::new("raw_data")?,
filtered_pub: Hub::new("filtered_data")?,
alpha: 0.1,
filtered_value: 0.0,
}), 1, Some(true));
scheduler.register(Box::new(ControllerNode {
filtered_sub: Hub::new("filtered_data")?,
cmd_pub: Hub::new("commands")?,
}), 2, Some(true));
scheduler.tick_all()?;
Ok(())
}
State Machine Node
Implement behavior using state machines.
use horus::prelude::*;
#[derive(Debug, Clone, Copy)]
enum RobotState {
Idle,
Moving,
ObstacleDetected,
Rotating,
}
struct StateMachineNode {
state: RobotState,
obstacle_sub: Hub<bool>,
cmd_pub: Hub<f32>,
rotation_counter: u32,
}
impl Node for StateMachineNode {
fn name(&self) -> &'static str { "StateMachineNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Check for obstacles
let obstacle = self.obstacle_sub.recv(ctx).unwrap_or(false);
// State machine logic
match self.state {
RobotState::Idle => {
if !obstacle {
self.state = RobotState::Moving;
if let Some(ctx) = ctx {
ctx.log_info("Starting to move");
}
}
}
RobotState::Moving => {
if obstacle {
self.state = RobotState::ObstacleDetected;
self.cmd_pub.send(0.0, ctx).ok(); // Stop
} else {
self.cmd_pub.send(1.0, ctx).ok(); // Move forward
}
}
RobotState::ObstacleDetected => {
self.state = RobotState::Rotating;
self.rotation_counter = 0;
if let Some(ctx) = ctx {
ctx.log_warning("Obstacle detected, rotating");
}
}
RobotState::Rotating => {
self.cmd_pub.send(0.5, ctx).ok(); // Rotate
self.rotation_counter += 1;
if self.rotation_counter > 50 {
self.state = RobotState::Moving;
}
}
}
}
}
Priority-Based Safety System
Use priority levels to ensure safety-critical tasks run first.
use horus::prelude::*;
// CRITICAL PRIORITY: Emergency stop
struct EmergencyStopNode {
battery_sub: Hub<f32>,
estop_pub: Hub<bool>,
}
impl Node for EmergencyStopNode {
fn name(&self) -> &'static str { "EmergencyStop" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(battery) = self.battery_sub.recv(ctx) {
if battery < 10.0 {
self.estop_pub.send(true, ctx).ok();
if let Some(ctx) = ctx {
ctx.log_error("CRITICAL: Battery low - emergency stop!");
}
}
}
}
}
// HIGH PRIORITY: Motor controller
struct MotorController {
estop_sub: Hub<bool>,
cmd_sub: Hub<f32>,
}
impl Node for MotorController {
fn name(&self) -> &'static str { "MotorController" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Check emergency stop first
if let Some(true) = self.estop_sub.recv(ctx) {
return; // Don't move if estop active
}
if let Some(cmd) = self.cmd_sub.recv(ctx) {
// Execute motor command
}
}
}
// BACKGROUND PRIORITY: Logging
struct LoggerNode {
data_sub: Hub<f32>,
}
impl Node for LoggerNode {
fn name(&self) -> &'static str { "Logger" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(data) = self.data_sub.recv(ctx) {
// Log to file (slow operation)
}
}
}
fn main() -> HorusResult<()> {
let mut scheduler = Scheduler::new();
// Safety runs FIRST (priority 0 = Critical)
scheduler.register(Box::new(EmergencyStopNode {
battery_sub: Hub::new("battery")?,
estop_pub: Hub::new("estop")?,
}), 0, Some(true));
// Control runs SECOND (priority 1 = High)
scheduler.register(Box::new(MotorController {
estop_sub: Hub::new("estop")?,
cmd_sub: Hub::new("cmd")?,
}), 1, Some(true));
// Logging runs LAST (priority 4 = Background)
scheduler.register(Box::new(LoggerNode {
data_sub: Hub::new("data")?,
}), 4, Some(true));
scheduler.tick_all()?;
Ok(())
}
Python Example
HORUS supports Python with the same performance characteristics.
import horus
class SensorNode(horus.Node):
def __init__(self):
self.data_pub = horus.Hub("sensor_data", float)
self.counter = 0.0
def name(self):
return "PySensorNode"
def tick(self, ctx):
# Read sensor
import math
reading = math.sin(self.counter) * 10.0
# Publish
self.data_pub.send(reading, ctx)
self.counter += 0.1
# Run node
if __name__ == "__main__":
scheduler = horus.Scheduler()
scheduler.register(SensorNode())
scheduler.tick_all()
Testing Pattern
Test nodes in isolation before integration.
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sensor_node() {
let mut node = SensorNode::new();
let mut ctx = NodeInfo::new("TestSensor".to_string(), true);
// Initialize
assert!(node.init(&mut ctx).is_ok());
// Run a few ticks
for _ in 0..10 {
node.tick(Some(&mut ctx));
}
// Verify metrics
assert_eq!(ctx.metrics.total_ticks, 10);
}
#[test]
fn test_pubsub_integration() -> HorusResult<()> {
let mut scheduler = Scheduler::new();
// Create publisher and subscriber
scheduler.register(Box::new(SensorNode::new()?), 0, Some(true));
scheduler.register(Box::new(ProcessorNode::new()?), 1, Some(true));
// Note: Scheduler::tick_all() runs infinitely until Ctrl+C
// For testing, you can use tick_node() to run specific nodes
// or use integration tests with timeouts and stop() method
Ok(())
}
}
Snake Game (Multi-Process Demo)
A complete snake game demonstrating HORUS's real-time IPC capabilities with a visual GUI.
Features
- Real-time IPC: Sub-microsecond message passing between backend and GUI
- Keyboard Input: Arrow keys or WASD controls
- Visual Display: Animated snake with eyes that point in the direction of movement
- Multi-Process Architecture: Backend and GUI run as separate processes
Architecture
Backend (main.rs) - Run with horus run:
- KeyboardInputNode: Captures keyboard input
- JoystickInputNode: Optional gamepad support
- SnakeControlNode: Converts input to snake state
- Publishes to
"snakestate"topic
GUI (snakesim_gui) - Separate binary:
- Subscribes to
"snakestate"topic - Renders 20x20 grid with snake visualization
- Built with eframe/egui (Rust GUI framework)
- Updates every 200ms
Running the Game
# Copy from installed examples
cp -r ~/.horus/cache/horus@0.1.0/examples/snakesim ~/my_snakesim
cd ~/my_snakesim
# Terminal 1: Run backend (keyboard input + game logic)
horus run
# Terminal 2: Run GUI (visual display)
./snakesim_gui
Use Arrow Keys or WASD to control the snake!
Code Structure
The backend is a single-file HORUS project:
use horus::prelude::*;
use horus::library::nodes::{JoystickInputNode, KeyboardInputNode};
#[derive(Clone, Copy, Debug)]
pub struct SnakeState {
pub direction: u32,
}
struct SnakeControlNode {
keyboard_subscriber: Hub<KeyboardInput>,
joystick_subscriber: Hub<JoystickInput>,
snake_publisher: Hub<SnakeState>,
}
impl Node for SnakeControlNode {
fn name(&self) -> &'static str {
"SnakeControlNode"
}
fn tick(&mut self, mut ctx: Option<&mut NodeInfo>) {
// Handle keyboard input
while let Some(input) = self.keyboard_subscriber.recv(ctx.as_deref_mut()) {
if input.pressed {
let direction = match input.code {
38 | 87 => 1, // ArrowUp or W -> Up
40 | 83 => 2, // ArrowDown or S -> Down
37 | 65 => 3, // ArrowLeft or A -> Left
39 | 68 => 4, // ArrowRight or D -> Right
_ => continue,
};
let snake_state = SnakeState { direction };
let _ = self.snake_publisher.send(snake_state, ctx.as_deref_mut());
}
}
}
}
fn main() -> HorusResult<()> {
let mut sched = Scheduler::new().name("SnakeScheduler");
let keyboard_input_node = KeyboardInputNode::new_with_topic("snakeinput")?;
let joystick_input_node = JoystickInputNode::new_with_topic("snakeinput")?;
let snake_control_node = SnakeControlNode::new()?;
sched.register(Box::new(keyboard_input_node), 0, Some(true));
sched.register(Box::new(joystick_input_node), 1, None);
sched.register(Box::new(snake_control_node), 2, Some(true));
sched.tick_node(&["KeyboardInputNode", "JoystickInputNode", "SnakeControlNode"])
}
Performance
The game demonstrates HORUS's real-time capabilities:
- IPC latency: 262ns-1527ns (sub-microsecond!)
- Update rate: 200ms (5 FPS for smooth gameplay)
- Message types: KeyboardInput, JoystickInput, SnakeState
Why Two Processes?
This architecture demonstrates:
- Process isolation: GUI crashes don't affect game logic
- Language flexibility: Could rewrite GUI in any language
- Real-time IPC: Shared memory pub/sub with zero-copy messaging
- Production pattern: Typical robotics architecture (sensor nodes + visualization)
Next Steps
- Read the API Reference for detailed documentation
- Optimize with the Performance Guide
- Learn about Python Bindings and C Bindings
- Explore the Core Concepts for deeper understanding