Scheduler
The Scheduler is the execution orchestrator in HORUS. It manages the node lifecycle, coordinates priority-based execution, and handles graceful shutdown.
What is the Scheduler?
The Scheduler is responsible for:
Node Registration: Adding nodes to the execution set, each with a priority and a logging flag
Lifecycle Management: Calling init(), tick(), and shutdown() at the right times
Priority-Based Execution: Running nodes in priority order every tick
Signal Handling: Graceful shutdown on Ctrl+C
Performance Monitoring: Tracking execution metrics for all nodes
Error Recovery: Handling node errors without crashing the system
Basic Usage
Creating a Scheduler
use horus::prelude::*;
fn main() -> HorusResult<()> {
let mut scheduler = Scheduler::new();
// Register nodes (priority 0 = highest, logging enabled)
scheduler.register(Box::new(my_node), 0, Some(true));
// Run the scheduler
scheduler.tick_all()?;
Ok(())
}
Registering Nodes
use horus::prelude::*;
let mut scheduler = Scheduler::new();
// Register with priority 2 (Normal - default recommended)
scheduler.register(Box::new(sensor_node), 2, Some(true));
// Register with priority 0 (Critical - highest priority)
scheduler.register(Box::new(safety_node), 0, Some(true));
Running the Scheduler
fn main() -> HorusResult<()> {
let mut scheduler = Scheduler::new();
// Register your nodes
scheduler.register(Box::new(node1), 0, Some(true));
scheduler.register(Box::new(node2), 1, Some(true));
// Start the main loop
scheduler.tick_all()?;
Ok(())
}
Scheduler Architecture
Internal Structure
pub struct Scheduler {
nodes: Vec<RegisteredNode>,
running: Arc<Mutex<bool>>,
tick_rate_fps: u32,
}
struct RegisteredNode {
node: Box<dyn Node>,
context: NodeInfo,
priority: NodePriority,
}
nodes: Vector of registered nodes with their contexts
running: Shared flag (Arc<Mutex<bool>>) that the signal handler clears to request shutdown
tick_rate_fps: Target execution rate (default: 60 FPS)
Execution Flow
1. Initialize all nodes (call init())
2. Sort nodes by priority
3. Main loop:
a. For each node (in priority order):
- Start tick timing
- Call node.tick()
- Record tick metrics
b. Sleep to maintain target FPS
4. On shutdown signal:
a. Set running = false
b. Call shutdown() on all nodes
Priority-Based Execution
Priority Levels
pub enum NodePriority {
Critical = 0, // Highest priority - safety systems
High = 1, // Control loops, actuators
Normal = 2, // Default - sensors, processing
Low = 3, // Non-critical computation
Background = 4, // Logging, monitoring
}
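Since register() takes the priority as a bare integer, casting from the enum documents intent. A quick sanity check of the mapping (the cast type is an assumption about register()'s actual signature):
// Lower discriminant = runs earlier in each tick.
assert!((NodePriority::Critical as u8) < (NodePriority::High as u8));
assert!((NodePriority::Normal as u8) < (NodePriority::Background as u8));
// NodePriority::Critical as u8 == 0, matching the literals used below:
scheduler.register(Box::new(safety_node), NodePriority::Critical as u8, Some(true));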
Execution Order
Nodes execute in priority order every tick:
let mut scheduler = Scheduler::new();
// Execution order: Safety → Controller → Sensor → Logger
scheduler.register(Box::new(safety_monitor), 0, Some(true)); // Runs 1st (Critical)
scheduler.register(Box::new(controller), 1, Some(true)); // Runs 2nd (High)
scheduler.register(Box::new(sensor), 2, Some(true)); // Runs 3rd (Normal)
scheduler.register(Box::new(logger), 4, Some(true)); // Runs 4th (Background)
scheduler.tick_all()?;
Key Point: Lower numeric value = higher priority (Critical = 0 runs first)
Why Priority Matters
Safety First: Critical nodes (emergency stop, collision detection) run before anything else
Real-Time Guarantees: High-priority control loops execute before low-priority logging
Deterministic Behavior: Same execution order every tick
Resource Management: Important tasks get CPU time first
Priority Best Practices
Critical: Safety monitors, emergency stops, watchdogs
scheduler.register(Box::new(emergency_stop), 0, Some(true));
High: Control loops, actuator commands, time-sensitive operations
scheduler.register(Box::new(motor_controller), 1, Some(true));
Normal: Sensors, filters, state estimation (most nodes)
scheduler.register(Box::new(lidar_node), 2, Some(true));
Low: Non-real-time computation, planning, optimization
scheduler.register(Box::new(path_planner), 3, Some(true));
Background: Logging, diagnostics, data recording
scheduler.register(Box::new(data_logger), 4, Some(true));
Lifecycle Management
Initialization Phase
When you call tick_all(), the scheduler first initializes all nodes:
pub fn tick_all(&mut self) -> HorusResult<()> {
// Initialize all nodes
for registered in self.nodes.iter_mut() {
registered.context.state = NodeState::Initializing;
match registered.node.init(&mut registered.context) {
Ok(()) => {
registered.context.state = NodeState::Running;
}
Err(e) => {
eprintln!("Node {} failed to initialize: {}",
registered.context.name, e);
registered.context.state = NodeState::Error;
}
}
}
// Continue to main loop...
}
Initialization Order: All nodes initialize before the main loop starts
Error Handling: If init() fails, node enters Error state and won't run
State Tracking: NodeState transitions Uninitialized → Initializing → Running
Main Execution Loop
The main loop runs at a target FPS (default 60):
while self.is_running() {
// Sort by priority each tick
self.nodes.sort_by_key(|r| r.priority);
for registered in self.nodes.iter_mut() {
if registered.context.state != NodeState::Running {
continue; // Skip non-running nodes
}
registered.context.start_tick(); // Start timing
registered.node.tick(Some(&mut registered.context));
registered.context.record_tick(); // Record metrics
}
// Maintain target FPS
std::thread::sleep(Duration::from_millis(16)); // ~60 FPS
}
Sorting: Nodes are sorted by priority every tick
State Check: Only nodes in Running state execute
Timing: Each tick is timed and recorded in metrics
FPS Control: Sleep maintains target execution rate
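Note that a fixed 16 ms sleep does not account for how long the ticks themselves took. A common refinement, sketched here as an assumption rather than as HORUS's actual implementation, is to sleep only for the remainder of the period:
use std::time::{Duration, Instant};

let period = Duration::from_micros(16_667); // ~60 FPS
while is_running() { // stand-in for the scheduler's running check
    let tick_start = Instant::now();
    // ... run all node ticks in priority order ...
    // Sleep only for what remains of the period, so long ticks
    // do not stretch the cycle further than they already have.
    let elapsed = tick_start.elapsed();
    if elapsed < period {
        std::thread::sleep(period - elapsed);
    }
}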
Shutdown Phase
Graceful shutdown on Ctrl+C:
// Signal handler (set up in tick_all); `running` clones the scheduler's flag
let running = Arc::clone(&self.running);
ctrlc::set_handler(move || {
eprintln!("\nCtrl+C received! Shutting down HORUS scheduler...");
if let Ok(mut r) = running.lock() {
*r = false; // Stop the main loop
}
}).expect("Error setting HORUS signal handler");
// After main loop exits
for registered in self.nodes.iter_mut() {
registered.context.state = NodeState::Stopping;
match registered.node.shutdown(&mut registered.context) {
Ok(()) => {
registered.context.state = NodeState::Stopped;
}
Err(e) => {
eprintln!("Node {} shutdown error: {}",
registered.context.name, e);
}
}
}
Signal Handling: Ctrl+C triggers graceful shutdown
State Transition: Running → Stopping → Stopped
Error Tolerance: Shutdown continues even if individual nodes fail
Tick Rate Control
Default Tick Rate
The scheduler runs at 60 FPS (16ms per tick) by default:
let scheduler = Scheduler::new();
// Runs at ~60 FPS (16ms per tick)
Note: The tick rate is currently hardcoded at approximately 60 FPS. The scheduler sleeps for 16ms between tick cycles to maintain this target rate. If your nodes take longer than 16ms to execute, the actual FPS will be lower.
Tick Rate Considerations
60 FPS (16ms): Default tick rate for most robotics applications
- Suitable for sensor reading, control loops, and general robotics tasks
- Provides good balance between responsiveness and CPU usage
Custom tick rates: If you need a different tick rate for your application, you can modify the sleep duration in the scheduler's main loop. However, this is not currently exposed as a public API.
Key Point: If your nodes take longer than the tick period to execute, you won't hit the target FPS. Monitor your node metrics to ensure tick durations stay within acceptable bounds.
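One way to verify the rate you actually achieve is to measure it from inside a node. A minimal sketch, where last_tick: Option<Instant> is an illustrative field on the node:
use std::time::Instant;

fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    if let Some(last) = self.last_tick {
        let hz = 1.0 / last.elapsed().as_secs_f64();
        if hz < 55.0 {
            if let Some(ctx) = ctx {
                ctx.log_warning(&format!("Effective rate dropped to {:.1} Hz", hz));
            }
        }
    }
    self.last_tick = Some(Instant::now());
}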
Running Modes
Continuous Mode (tick_all)
Run until Ctrl+C:
scheduler.tick_all()?;
This is the most common mode for robotics applications. The scheduler will continuously execute all registered nodes in priority order until a shutdown signal is received.
Node-Specific Execution
Execute specific nodes by name:
scheduler.tick_node(&["SensorNode", "MotorNode"])?;
Useful for:
- Debugging specific nodes in isolation
- Profiling individual nodes
- Testing node behavior independently
- Running only a subset of your system
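For example, a debug harness might register the full system but exercise only the sensing path (the string names are assumed to match the nodes' registered names):
let mut scheduler = Scheduler::new();
scheduler.register(Box::new(sensor_node), 2, Some(true));
scheduler.register(Box::new(motor_node), 1, Some(true));
scheduler.register(Box::new(logger_node), 4, Some(true));
// Only the named nodes execute; the logger never runs.
scheduler.tick_node(&["SensorNode", "MotorNode"])?;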
Signal Handling
Ctrl+C Handling
The scheduler automatically handles Ctrl+C:
// Automatically set up by tick_all()
ctrlc::set_handler(move || {
eprintln!("\nCtrl+C received! Shutting down HORUS scheduler...");
if let Ok(mut r) = running.lock() {
*r = false;
}
}).expect("Error setting HORUS signal handler");
Output:
^C
Ctrl+C received! Shutting down HORUS scheduler...
[Nodes shutting down gracefully...]
Graceful Shutdown
The scheduler ensures clean shutdown:
Stop accepting new ticks: Main loop exits
Call shutdown() on all nodes: Release resources
Wait for cleanup: All nodes shut down before exit
No zombie processes: Shared memory cleaned up
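On the node side, shutdown() is where resources acquired in init() get released. A minimal sketch (the connection field is illustrative, not a HORUS type):
fn shutdown(&mut self, ctx: &mut NodeInfo) -> Result<(), String> {
    ctx.log_info("Releasing hardware connection");
    // Dropping the handle closes it; `connection: Option<Connection>`
    // is an illustrative field.
    self.connection.take();
    Ok(())
}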
Error Handling
Initialization Errors
If a node fails to initialize:
fn init(&mut self, ctx: &mut NodeInfo) -> Result<(), String> {
return Err("Sensor not connected".to_string());
}
Behavior:
- Node enters Error state
- Node will NOT run in main loop
- Other nodes continue normally
- Error message printed to stderr
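A typical fallible init() maps the underlying error into the String the trait expects; open_serial below is a hypothetical helper, not part of HORUS:
fn init(&mut self, ctx: &mut NodeInfo) -> Result<(), String> {
    // open_serial is hypothetical; substitute your device API.
    let port = open_serial("/dev/ttyUSB0")
        .map_err(|e| format!("Sensor not connected: {}", e))?;
    self.port = Some(port);
    ctx.log_info("Sensor connected");
    Ok(())
}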
Runtime Errors
If a node panics during tick():
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
panic!("Something went wrong!");
}
Behavior:
- Entire scheduler stops (panics are not caught)
- Use Result types instead of panics
Better approach:
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
match self.do_something() {
Ok(result) => { /* process */ }
Err(e) => {
if let Some(ctx) = ctx {
ctx.log_error(&format!("Error: {}", e));
}
}
}
}
Shutdown Errors
If shutdown() fails:
fn shutdown(&mut self, ctx: &mut NodeInfo) -> Result<(), String> {
Err("Failed to close connection".to_string())
}
Behavior:
- Error message printed
- Shutdown continues for other nodes
- Scheduler still exits
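If the stderr noise matters, a node can convert expected failures into log entries instead of returning them; close_connection is a hypothetical method:
fn shutdown(&mut self, ctx: &mut NodeInfo) -> Result<(), String> {
    // Best effort: log and continue rather than propagating the error.
    if let Err(e) = self.close_connection() {
        ctx.log_warning(&format!("Connection close failed: {}", e));
    }
    Ok(())
}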
Performance Monitoring
Accessing Node Metrics
The scheduler tracks metrics for all nodes:
let metrics = scheduler.get_metrics();
for node_metrics in metrics {
println!("Node: {}", node_metrics.name);
println!(" Avg tick: {:.2}ms", node_metrics.avg_tick_duration_ms);
println!(" Total ticks: {}", node_metrics.total_ticks);
println!(" CPU usage: {:.2}%", node_metrics.cpu_usage_percent);
}
Built-in Logging
NodeInfo automatically logs timing:
[12:34:56.789] [IPC: 296ns | Tick: 12μs] PublisherNode --PUB--> 'cmd_vel' = 1.5
IPC: Inter-process communication latency (Hub send/recv)
Tick: Total tick execution time
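Whether a node emits these timing lines is controlled per node by the third argument to register() (assuming Some(false) disables what Some(true) enables):
scheduler.register(Box::new(chatty_node), 2, Some(true)); // timing logs on
scheduler.register(Box::new(quiet_node), 2, Some(false)); // timing logs off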
Advanced Usage
Dynamic Node Registration
Add nodes at runtime:
let mut scheduler = Scheduler::new();
// Start with basic nodes
scheduler.register(Box::new(node1), 2, Some(true));
// Later, add more nodes
scheduler.register(Box::new(node2), 2, Some(true));
Note: Nodes registered before tick_all() initialize automatically when it starts. Since tick_all() borrows the scheduler for its entire run, registration in practice happens before the main loop begins.
Conditional Execution
Skip nodes based on conditions:
impl Node for ConditionalNode {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if !self.should_run {
return; // Skip this tick
}
// Execute normally
self.do_work();
}
}
Multi-Scheduler Systems
Run multiple independent schedulers:
use std::thread;
fn main() -> HorusResult<()> {
let mut scheduler1 = Scheduler::new();
scheduler1.register(Box::new(critical_nodes), 0, Some(true));
let mut scheduler2 = Scheduler::new();
scheduler2.register(Box::new(background_nodes), 4, Some(true));
// Run schedulers in separate threads
let handle1 = thread::spawn(move || scheduler1.tick_all());
let handle2 = thread::spawn(move || scheduler2.tick_all());
handle1.join().unwrap()?;
handle2.join().unwrap()?;
Ok(())
}
Use Cases:
- Separate real-time and non-real-time systems
- Isolate critical from non-critical nodes
- Different tick rates for different subsystems (once custom rates are exposed)
Best Practices
Initialize Heavy Resources in init()
fn init(&mut self, ctx: &mut NodeInfo) -> Result<(), String> {
// Pre-allocate buffers
self.buffer = vec![0.0; 10000];
// Open connections
self.connection = connect_to_hardware()?;
Ok(())
}
Keep tick() Fast
Each tick should complete well within the tick period (16ms at the default 60 FPS; under 1ms if you target 1kHz control):
// GOOD: Fast tick
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
let data = self.read_sensor();
self.publisher.send(data, ctx).ok(); // field named `publisher` (`pub` is a reserved keyword)
}
// BAD: Slow tick
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
std::thread::sleep(Duration::from_millis(100)); // Blocks!
}
Use Appropriate Priorities
Don't make everything Critical:
// GOOD: Appropriate priorities
scheduler.register(Box::new(emergency_stop), 0, Some(true)); // Critical
scheduler.register(Box::new(controller), 1, Some(true)); // High
scheduler.register(Box::new(sensor), 2, Some(true)); // Normal
// BAD: Everything is critical
scheduler.register(Box::new(logger), 0, Some(true)); // Wrong!
Handle Errors Gracefully
Don't panic in production code:
// GOOD: Handle errors
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
match self.operation() {
Ok(_) => {},
Err(e) => {
if let Some(ctx) = ctx {
ctx.log_error(&format!("Error: {}", e));
}
}
}
}
// BAD: Panic on error
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
self.operation().unwrap(); // Will crash scheduler!
}
Monitor Performance
Track your node metrics:
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(ctx) = ctx {
if ctx.metrics.avg_tick_duration_ms > 1.0 {
ctx.log_warning("Tick duration exceeding 1ms");
}
}
}
Common Patterns
Layered Architecture
Organize nodes by function with appropriate priorities:
// Layer 1: Safety (Critical = 0)
scheduler.register(Box::new(collision_detector), 0, Some(true));
scheduler.register(Box::new(emergency_stop), 0, Some(true));
// Layer 2: Control (High = 1)
scheduler.register(Box::new(pid_controller), 1, Some(true));
scheduler.register(Box::new(motor_driver), 1, Some(true));
// Layer 3: Sensing (Normal = 2)
scheduler.register(Box::new(lidar_node), 2, Some(true));
scheduler.register(Box::new(camera_node), 2, Some(true));
scheduler.register(Box::new(imu_node), 2, Some(true));
// Layer 4: Processing (Low = 3)
scheduler.register(Box::new(path_planner), 3, Some(true));
// Layer 5: Monitoring (Background = 4)
scheduler.register(Box::new(logger), 4, Some(true));
scheduler.register(Box::new(diagnostics), 4, Some(true));
Feedback Loops
Create control loops with pub/sub:
// Sensor (Normal priority = 2)
scheduler.register(Box::new(encoder_sensor), 2, Some(true)); // Publishes position
// Controller (High priority = 1) - runs before the sensor each tick,
// consuming the position published on the previous tick
scheduler.register(Box::new(position_controller), 1, Some(true));
// Subscribes to position, publishes velocity command
// Actuator (High priority = 1) - equal priorities keep registration order
scheduler.register(Box::new(motor_actuator), 1, Some(true));
// Subscribes to velocity command
Pipeline Processing
Chain nodes for data processing:
// Raw sensor (Normal = 2)
scheduler.register(Box::new(raw_sensor), 2, Some(true)); // Publishes raw data
// Filter (Normal = 2)
scheduler.register(Box::new(kalman_filter), 2, Some(true)); // Subscribes to raw, publishes filtered
// Analyzer (Low = 3)
scheduler.register(Box::new(data_analyzer), 3, Some(true));
// Subscribes to filtered data
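A middle stage in such a pipeline holds both a subscriber and a publisher. A sketch of the filter node's tick, where the field names are illustrative and recv() is an assumption about the Subscriber API (only send() appears elsewhere on this page):
impl Node for KalmanFilterNode {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // recv() returning Option<T> is assumed here.
        if let Some(raw) = self.sub.recv() {
            let filtered = self.apply_filter(raw);
            // send() as used in the fast-tick example above.
            self.publisher.send(filtered, ctx).ok();
        }
    }
}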
Troubleshooting
Scheduler Not Stopping
Ensure tick() returns quickly:
// WRONG: Infinite loop in tick
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
loop { // Never returns!
// ...
}
}
// RIGHT: Return from tick
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Do work
// Return naturally
}
Nodes Not Running
Check initialization:
fn init(&mut self, ctx: &mut NodeInfo) -> Result<(), String> {
ctx.log_info("Initializing...");
// If this fails, node won't run
Ok(())
}
Slow Execution
Profile your nodes:
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(ctx) = ctx {
let avg_ms = ctx.metrics.avg_tick_duration_ms;
if avg_ms > 1.0 {
ctx.log_warning(&format!("Slow tick: {:.2}ms", avg_ms));
}
}
}
Next Steps
- Understand Shared Memory internals
- Learn about Message Types for communication
- Explore Examples for complete applications
- Read the API Reference for detailed documentation