Scheduler

The Scheduler is the execution orchestrator in HORUS. It manages the node lifecycle, coordinates priority-based execution, and handles graceful shutdown.

What is the Scheduler?

The Scheduler is responsible for:

Node Registration: Adding nodes to the execution set, each with a priority and an optional logging flag

Lifecycle Management: Calling init(), tick(), and shutdown() at the right times

Priority-Based Execution: Running nodes in priority order every tick

Signal Handling: Graceful shutdown on Ctrl+C

Performance Monitoring: Tracking execution metrics for all nodes

Error Recovery: Handling node errors without crashing the system

Basic Usage

Creating a Scheduler

use horus::prelude::*;

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();

    // Register nodes (priority 0 = highest, logging enabled)
    scheduler.register(Box::new(my_node), 0, Some(true));

    // Run the scheduler
    scheduler.tick_all()?;
    Ok(())
}

Registering Nodes

use horus::prelude::*;

let mut scheduler = Scheduler::new();

// Register with priority 2 (Normal - default recommended)
scheduler.register(Box::new(sensor_node), 2, Some(true));

// Register with priority 0 (Critical - highest priority)
scheduler.register(Box::new(safety_node), 0, Some(true));

Running the Scheduler

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new();

    // Register your nodes
    scheduler.register(Box::new(node1), 0, Some(true));
    scheduler.register(Box::new(node2), 1, Some(true));

    // Start the main loop
    scheduler.tick_all()?;
    Ok(())
}

Scheduler Architecture

Internal Structure

pub struct Scheduler {
    nodes: Vec<RegisteredNode>,
    running: Arc<Mutex<bool>>,
    tick_rate_fps: u32,
}

struct RegisteredNode {
    node: Box<dyn Node>,
    context: NodeInfo,
    priority: NodePriority,
}

nodes: Vector of registered nodes with their contexts

running: Shared shutdown flag (an Arc<Mutex<bool>>) that the signal handler clears to stop the loop

tick_rate_fps: Target execution rate (default: 60 FPS)

Execution Flow

1. Initialize all nodes (call init())
2. Sort nodes by priority
3. Main loop:
   a. For each node (in priority order):
      - Start tick timing
      - Call node.tick()
      - Record tick metrics
   b. Sleep to maintain target FPS
4. On shutdown signal:
   a. Set running = false
   b. Call shutdown() on all nodes
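The flow above can be condensed into a framework-free sketch. The `Node` trait and `run` function here are stand-ins for illustration only (the real Scheduler internals are excerpted in the sections that follow):

```rust
// Minimal stand-in for the Node trait, for illustration only.
trait Node {
    fn init(&mut self) -> Result<(), String>;
    fn tick(&mut self, log: &mut Vec<String>);
    fn shutdown(&mut self) -> Result<(), String>;
}

struct Named { name: &'static str }

impl Node for Named {
    fn init(&mut self) -> Result<(), String> { Ok(()) }
    fn tick(&mut self, log: &mut Vec<String>) {
        log.push(format!("tick:{}", self.name));
    }
    fn shutdown(&mut self) -> Result<(), String> { Ok(()) }
}

/// Runs the three phases for `ticks` iterations and returns the call log.
fn run(mut nodes: Vec<(u8, Box<dyn Node>)>, ticks: usize) -> Vec<String> {
    let mut log = Vec::new();
    // Phase 1: initialize every node before the main loop starts.
    for (_, node) in nodes.iter_mut() {
        node.init().expect("init failed");
    }
    // Phase 2: sort by priority (lower value runs first).
    nodes.sort_by_key(|(priority, _)| *priority);
    // Phase 3: main loop — tick each node in priority order.
    for _ in 0..ticks {
        for (_, node) in nodes.iter_mut() {
            node.tick(&mut log);
        }
    }
    // Phase 4: shut every node down, in the same order.
    for (_, node) in nodes.iter_mut() {
        node.shutdown().expect("shutdown failed");
    }
    log
}
```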

Priority-Based Execution

Priority Levels

pub enum NodePriority {
    Critical = 0,   // Highest priority - safety systems
    High = 1,       // Control loops, actuators
    Normal = 2,     // Default - sensors, processing
    Low = 3,        // Non-critical computation
    Background = 4, // Logging, monitoring
}
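Because the enum carries explicit discriminants, the numeric priorities passed to `register` map directly onto the variants. A small sketch (the derives are added here so the variants can be compared; they are not part of the definition above):

```rust
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum NodePriority {
    Critical = 0,   // Highest priority - safety systems
    High = 1,       // Control loops, actuators
    Normal = 2,     // Default - sensors, processing
    Low = 3,        // Non-critical computation
    Background = 4, // Logging, monitoring
}

// An `as` cast exposes the discriminant, so `register(..., 0, ...)`
// is the same priority as NodePriority::Critical.
fn as_number(p: NodePriority) -> u8 {
    p as u8
}
```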

Execution Order

Nodes execute in priority order every tick:

let mut scheduler = Scheduler::new();

// Execution order: Safety → Controller → Sensor → Logger
scheduler.register(Box::new(safety_monitor), 0, Some(true));    // Runs 1st (Critical)
scheduler.register(Box::new(controller), 1, Some(true));         // Runs 2nd (High)
scheduler.register(Box::new(sensor), 2, Some(true));             // Runs 3rd (Normal)
scheduler.register(Box::new(logger), 4, Some(true));             // Runs 4th (Background)

scheduler.tick_all()?;

Key Point: Lower numeric value = higher priority (Critical = 0 runs first)

Why Priority Matters

Safety First: Critical nodes (emergency stop, collision detection) run before anything else

Real-Time Guarantees: High-priority control loops execute before low-priority logging

Deterministic Behavior: Same execution order every tick

Resource Management: Important tasks get CPU time first
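Deterministic ordering follows from how the nodes are sorted: Rust's `sort_by_key` is stable, so nodes sharing a priority keep their registration order. A minimal sketch of the ordering rule:

```rust
/// Orders (priority, name) pairs the way the scheduler orders nodes:
/// lower priority value first; ties keep registration order (stable sort).
fn execution_order(mut nodes: Vec<(u8, &str)>) -> Vec<&str> {
    nodes.sort_by_key(|(priority, _)| *priority);
    nodes.into_iter().map(|(_, name)| name).collect()
}
```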

Priority Best Practices

Critical: Safety monitors, emergency stops, watchdogs

scheduler.register(Box::new(emergency_stop), 0, Some(true));

High: Control loops, actuator commands, time-sensitive operations

scheduler.register(Box::new(motor_controller), 1, Some(true));

Normal: Sensors, filters, state estimation (most nodes)

scheduler.register(Box::new(lidar_node), 2, Some(true));

Low: Non-real-time computation, planning, optimization

scheduler.register(Box::new(path_planner), 3, Some(true));

Background: Logging, diagnostics, data recording

scheduler.register(Box::new(data_logger), 4, Some(true));

Lifecycle Management

Initialization Phase

When you call tick_all(), the scheduler first initializes all nodes:

pub fn tick_all(&mut self) -> HorusResult<()> {
    // Initialize all nodes
    for registered in self.nodes.iter_mut() {
        registered.context.state = NodeState::Initializing;

        match registered.node.init(&mut registered.context) {
            Ok(()) => {
                registered.context.state = NodeState::Running;
            }
            Err(e) => {
                eprintln!("Node {} failed to initialize: {}",
                          registered.context.name, e);
                registered.context.state = NodeState::Error;
            }
        }
    }

    // Continue to main loop...
}

Initialization Order: All nodes initialize before the main loop starts

Error Handling: If init() fails, node enters Error state and won't run

State Tracking: NodeState transitions Uninitialized → Initializing → Running

Main Execution Loop

The main loop runs at a target FPS (default 60):

while self.is_running() {
    // Sort by priority each tick
    self.nodes.sort_by_key(|r| r.priority);

    for registered in self.nodes.iter_mut() {
        if registered.context.state != NodeState::Running {
            continue;  // Skip non-running nodes
        }

        registered.context.start_tick();  // Start timing

        registered.node.tick(Some(&mut registered.context));

        registered.context.record_tick();  // Record metrics
    }

    // Maintain target FPS
    std::thread::sleep(Duration::from_millis(16));  // ~60 FPS
}

Sorting: Nodes are sorted by priority every tick

State Check: Only nodes in Running state execute

Timing: Each tick is timed and recorded in metrics

FPS Control: Sleep maintains target execution rate

Shutdown Phase

Graceful shutdown on Ctrl+C:

// Signal handler (set up in tick_all)
ctrlc::set_handler(move || {
    eprintln!("\nCtrl+C received! Shutting down HORUS scheduler...");
    if let Ok(mut r) = running.lock() {
        *r = false;  // Stop the main loop
    }
}).expect("Error setting HORUS signal handler");

// After main loop exits
for registered in self.nodes.iter_mut() {
    registered.context.state = NodeState::Stopping;

    match registered.node.shutdown(&mut registered.context) {
        Ok(()) => {
            registered.context.state = NodeState::Stopped;
        }
        Err(e) => {
            eprintln!("Node {} shutdown error: {}",
                      registered.context.name, e);
        }
    }
}

Signal Handling: Ctrl+C triggers graceful shutdown

State Transition: Running → Stopping → Stopped

Error Tolerance: Shutdown continues even if individual nodes fail

Tick Rate Control

Default Tick Rate

The scheduler runs at 60 FPS (16ms per tick) by default:

let scheduler = Scheduler::new();
// Runs at ~60 FPS (16ms per tick)

Note: The tick rate is currently hardcoded at approximately 60 FPS. The scheduler sleeps for 16ms between tick cycles to maintain this target rate. If your nodes take longer than 16ms to execute, the actual FPS will be lower.

Tick Rate Considerations

60 FPS (16ms): Default tick rate for most robotics applications

  • Suitable for sensor reading, control loops, and general robotics tasks
  • Provides good balance between responsiveness and CPU usage

Custom tick rates: If you need a different tick rate for your application, you can modify the sleep duration in the scheduler's main loop. However, this is not currently exposed as a public API.

Key Point: If your nodes take longer than the tick period to execute, you won't hit the target FPS. Monitor your node metrics to ensure tick durations stay within acceptable bounds.
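Because the current implementation sleeps a fixed 16ms regardless of how long the tick cycle took, heavy ticks lower the effective rate. A common alternative (a sketch, not the HORUS implementation) subtracts the measured work time from the target period:

```rust
use std::time::Duration;

/// Returns how long to sleep after a tick cycle so the loop approximates
/// the target period; zero when the tick overran its budget.
fn remaining_sleep(period: Duration, tick_duration: Duration) -> Duration {
    // saturating_sub clamps at zero instead of panicking on overrun.
    period.saturating_sub(tick_duration)
}

// In a loop body, one would time the tick with std::time::Instant and then:
// std::thread::sleep(remaining_sleep(Duration::from_millis(16), elapsed));
```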

Running Modes

Continuous Mode (tick_all)

Run until Ctrl+C:

scheduler.tick_all()?;

This is the most common mode for robotics applications. The scheduler will continuously execute all registered nodes in priority order until a shutdown signal is received.

Node-Specific Execution

Execute specific nodes by name:

scheduler.tick_node(&["SensorNode", "MotorNode"])?;

Useful for:

  • Debugging specific nodes in isolation
  • Profiling individual nodes
  • Testing node behavior independently
  • Running only a subset of your system

Signal Handling

Ctrl+C Handling

The scheduler automatically handles Ctrl+C:

// Automatically set up by tick_all()
ctrlc::set_handler(move || {
    eprintln!("\nCtrl+C received! Shutting down HORUS scheduler...");
    if let Ok(mut r) = running.lock() {
        *r = false;
    }
}).expect("Error setting HORUS signal handler");

Output:

^C
Ctrl+C received! Shutting down HORUS scheduler...
[Nodes shutting down gracefully...]

Graceful Shutdown

The scheduler ensures clean shutdown:

Stop accepting new ticks: Main loop exits

Call shutdown() on all nodes: Release resources

Wait for cleanup: All nodes shut down before exit

No zombie processes: Shared memory cleaned up
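A node's `shutdown()` is where owned resources get released. A minimal sketch, using a hypothetical file-logging node (not a HORUS API; the `ctx` parameter is omitted for brevity):

```rust
use std::fs::File;
use std::io::Write;

// Hypothetical node that owns a log file handle.
struct FileLoggerNode {
    file: Option<File>,
}

impl FileLoggerNode {
    // Mirrors the Result<(), String> shutdown signature used on this page.
    fn shutdown(&mut self) -> Result<(), String> {
        if let Some(mut f) = self.file.take() {
            f.flush().map_err(|e| format!("flush failed: {}", e))?;
            // Dropping `f` here closes the file descriptor.
        }
        Ok(()) // Idempotent: a second call finds `file` already None.
    }
}
```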

Error Handling

Initialization Errors

If a node fails to initialize:

fn init(&mut self, ctx: &mut NodeInfo) -> Result<(), String> {
    return Err("Sensor not connected".to_string());
}

Behavior:

  • Node enters Error state
  • Node will NOT run in main loop
  • Other nodes continue normally
  • Error message printed to stderr

Runtime Errors

If a node panics during tick():

fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    panic!("Something went wrong!");
}

Behavior:

  • Entire scheduler stops (panics are not caught)
  • Use Result types instead of panics

Better approach:

fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    match self.do_something() {
        Ok(result) => { /* process */ }
        Err(e) => {
            if let Some(ctx) = ctx {
                ctx.log_error(&format!("Error: {}", e));
            }
        }
    }
}

Shutdown Errors

If shutdown() fails:

fn shutdown(&mut self, ctx: &mut NodeInfo) -> Result<(), String> {
    Err("Failed to close connection".to_string())
}

Behavior:

  • Error message printed
  • Shutdown continues for other nodes
  • Scheduler still exits

Performance Monitoring

Accessing Node Metrics

The scheduler tracks metrics for all nodes:

let metrics = scheduler.get_metrics();

for node_metrics in metrics {
    println!("Node: {}", node_metrics.name);
    println!("  Avg tick: {:.2}ms", node_metrics.avg_tick_duration_ms);
    println!("  Total ticks: {}", node_metrics.total_ticks);
    println!("  CPU usage: {:.2}%", node_metrics.cpu_usage_percent);
}

Built-in Logging

NodeInfo automatically logs timing:

[12:34:56.789] [IPC: 296ns | Tick: 12μs] PublisherNode --PUB--> 'cmd_vel' = 1.5

IPC: Inter-process communication latency (Hub send/recv)

Tick: Total tick execution time

Advanced Usage

Dynamic Node Registration

Add nodes incrementally before starting the main loop:

let mut scheduler = Scheduler::new();

// Start with basic nodes
scheduler.register(Box::new(node1), 2, Some(true));

// Later, add more nodes
scheduler.register(Box::new(node2), 2, Some(true));

Note: Nodes added before tick_all() will initialize automatically.

Conditional Execution

Skip nodes based on conditions:

impl Node for ConditionalNode {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if !self.should_run {
            return;  // Skip this tick
        }

        // Execute normally
        self.do_work();
    }
}

Multi-Scheduler Systems

Run multiple independent schedulers:

use std::thread;

fn main() -> HorusResult<()> {
    let mut scheduler1 = Scheduler::new();
    scheduler1.register(Box::new(critical_nodes), 0, Some(true));

    let mut scheduler2 = Scheduler::new();
    scheduler2.register(Box::new(background_nodes), 4, Some(true));

    // Run schedulers in separate threads
    let handle1 = thread::spawn(move || scheduler1.tick_all());
    let handle2 = thread::spawn(move || scheduler2.tick_all());

    handle1.join().unwrap()?;
    handle2.join().unwrap()?;
    Ok(())
}

Use Cases:

  • Separate real-time and non-real-time systems
  • Isolate critical from non-critical nodes
  • Different tick rates for different subsystems

Best Practices

Initialize Heavy Resources in init()

fn init(&mut self, ctx: &mut NodeInfo) -> Result<(), String> {
    // Pre-allocate buffers
    self.buffer = vec![0.0; 10000];

    // Open connections
    self.connection = connect_to_hardware()?;

    Ok(())
}

Keep tick() Fast

Each tick should complete well within the tick period (e.g. under 1ms for a 1kHz control loop):

// GOOD: Fast tick
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    let data = self.read_sensor();
    self.publisher.send(data, ctx).ok();
}

// BAD: Slow tick
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    std::thread::sleep(Duration::from_millis(100));  // Blocks!
}

Use Appropriate Priorities

Don't make everything Critical:

// GOOD: Appropriate priorities
scheduler.register(Box::new(emergency_stop), 0, Some(true));  // Critical
scheduler.register(Box::new(controller), 1, Some(true));      // High
scheduler.register(Box::new(sensor), 2, Some(true));          // Normal

// BAD: Everything is critical
scheduler.register(Box::new(logger), 0, Some(true));  // Wrong!

Handle Errors Gracefully

Don't panic in production code:

// GOOD: Handle errors
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    match self.operation() {
        Ok(_) => {},
        Err(e) => {
            if let Some(ctx) = ctx {
                ctx.log_error(&format!("Error: {}", e));
            }
        }
    }
}

// BAD: Panic on error
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    self.operation().unwrap();  // Will crash scheduler!
}

Monitor Performance

Track your node metrics:

fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    if let Some(ctx) = ctx {
        if ctx.metrics.avg_tick_duration_ms > 1.0 {
            ctx.log_warning("Tick duration exceeding 1ms");
        }
    }
}

Common Patterns

Layered Architecture

Organize nodes by function with appropriate priorities:

// Layer 1: Safety (Critical = 0)
scheduler.register(Box::new(collision_detector), 0, Some(true));
scheduler.register(Box::new(emergency_stop), 0, Some(true));

// Layer 2: Control (High = 1)
scheduler.register(Box::new(pid_controller), 1, Some(true));
scheduler.register(Box::new(motor_driver), 1, Some(true));

// Layer 3: Sensing (Normal = 2)
scheduler.register(Box::new(lidar_node), 2, Some(true));
scheduler.register(Box::new(camera_node), 2, Some(true));
scheduler.register(Box::new(imu_node), 2, Some(true));

// Layer 4: Processing (Low = 3)
scheduler.register(Box::new(path_planner), 3, Some(true));

// Layer 5: Monitoring (Background = 4)
scheduler.register(Box::new(logger), 4, Some(true));
scheduler.register(Box::new(diagnostics), 4, Some(true));

Feedback Loops

Create control loops with pub/sub:

// Sensor (Normal priority = 2)
scheduler.register(Box::new(encoder_sensor), 2, Some(true));  // Publishes position

// Controller (High priority = 1) - runs after sensor
scheduler.register(Box::new(position_controller), 1, Some(true));
// Subscribes to position, publishes velocity command

// Actuator (High priority = 1)
scheduler.register(Box::new(motor_actuator), 1, Some(true));
// Subscribes to velocity command

Pipeline Processing

Chain nodes for data processing:

// Raw sensor (Normal = 2)
scheduler.register(Box::new(raw_sensor), 2, Some(true));  // Publishes raw data

// Filter (Normal = 2)
scheduler.register(Box::new(kalman_filter), 2, Some(true));  // Subscribes to raw, publishes filtered

// Analyzer (Low = 3)
scheduler.register(Box::new(data_analyzer), 3, Some(true));
// Subscribes to filtered data

Troubleshooting

Scheduler Not Stopping

Ensure tick() returns quickly:

// WRONG: Infinite loop in tick
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    loop {  // Never returns!
        // ...
    }
}

// RIGHT: Return from tick
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    // Do work
    // Return naturally
}

Nodes Not Running

Check initialization:

fn init(&mut self, ctx: &mut NodeInfo) -> Result<(), String> {
    ctx.log_info("Initializing...");
    // If this fails, node won't run
    Ok(())
}

Slow Execution

Profile your nodes:

fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    if let Some(ctx) = ctx {
        let avg_ms = ctx.metrics.avg_tick_duration_ms;
        if avg_ms > 1.0 {
            ctx.log_warning(&format!("Slow tick: {:.2}ms", avg_ms));
        }
    }
}

Next Steps