Scheduler API Reference

The Scheduler orchestrates node execution, managing priorities, lifecycle, and the main tick loop.

Creating a Scheduler

Scheduler::new()

Create a new scheduler.

pub fn new() -> Self

Returns: A new Scheduler instance

Example:

use horus::prelude::*;

let mut scheduler = Scheduler::new();

Registering Nodes

register(node, priority, logging)

Register a node with the scheduler.

pub fn register(
    &mut self,
    node: Box<dyn Node>,
    priority: u32,
    logging_enabled: Option<bool>
) -> &mut Self

Parameters:

  • node: Boxed node implementing the Node trait
  • priority: Execution priority (0 = highest, 4 = lowest)
  • logging_enabled: Optional logging flag. Some(true) enables logging, Some(false) disables it, and None uses the default (false)

Returns: Mutable reference to Self for method chaining

Example:

use horus::prelude::*;

let mut scheduler = Scheduler::new();

// Register with logging enabled
scheduler.register(Box::new(SensorNode::new()?), 0, Some(true));

// Register with logging disabled
scheduler.register(Box::new(ProcessorNode::new()?), 1, Some(false));

// Register with default logging (false)
scheduler.register(Box::new(ActuatorNode::new()?), 2, None);

Priority Levels:

  • 0 - Critical (highest priority)
  • 1 - High
  • 2 - Normal (recommended for most nodes)
  • 3 - Low
  • 4 - Background (lowest priority)
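
Because register() takes the priority as a raw u32, a small wrapper can make call sites self-describing. The following is a minimal sketch; the Priority enum and its methods are hypothetical helpers for illustration and are not part of horus.

```rust
/// Hypothetical named priorities mirroring the documented levels 0..=4.
/// Not part of horus; it only wraps the raw `u32` values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u32)]
pub enum Priority {
    Critical = 0,
    High = 1,
    Normal = 2,
    Low = 3,
    Background = 4,
}

impl Priority {
    /// The raw value to pass as the `priority` argument of `register`.
    pub fn as_u32(self) -> u32 {
        self as u32
    }

    /// Map a raw value back to a level; values above 4 are not documented.
    pub fn from_u32(raw: u32) -> Option<Self> {
        match raw {
            0 => Some(Self::Critical),
            1 => Some(Self::High),
            2 => Some(Self::Normal),
            3 => Some(Self::Low),
            4 => Some(Self::Background),
            _ => None,
        }
    }
}
```

With such a helper, passing Priority::Normal.as_u32() to register() is equivalent to passing 2.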

Method Chaining:

scheduler
    .register(Box::new(sensor), 0, Some(true))
    .register(Box::new(controller), 1, Some(true))
    .register(Box::new(actuator), 2, Some(true));

name(name)

Set the scheduler name (chainable).

pub fn name(self, name: &str) -> Self

Parameters:

  • name: Name for this scheduler instance

Returns: Self for method chaining

Example:

let mut scheduler = Scheduler::new()
    .name("RobotController");

Running the Scheduler

tick_all()

Run all registered nodes continuously until stopped (Ctrl+C).

pub fn tick_all(&mut self) -> HorusResult<()>

Returns: HorusResult<()> (always Ok(()) when stopped gracefully)

Example:

scheduler.tick_all()?;

Behavior:

  • Initializes all nodes (calls init() once)
  • Runs tick loop at ~60 FPS
  • Executes nodes in priority order each tick
  • Handles Ctrl+C gracefully
  • Calls shutdown() on all nodes before exiting
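
The behavior listed above can be pictured with a std-only model. This is a sketch under stated assumptions, not the horus implementation: SketchNode, Recorder, and run_lifecycle are hypothetical names, the nodes are assumed to be already sorted by priority, the loop runs a fixed number of ticks instead of waiting for Ctrl+C, and init() and shutdown() are assumed to visit nodes in the same order as tick().

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for the real `Node` trait from `horus::prelude`.
pub trait SketchNode {
    fn init(&mut self) {}
    fn tick(&mut self);
    fn shutdown(&mut self) {}
}

/// Call `init` once per node, run `max_ticks` ticks, then call `shutdown`.
/// `nodes` is assumed to be already sorted by priority (0 first).
pub fn run_lifecycle(nodes: &mut [Box<dyn SketchNode>], max_ticks: usize) {
    for node in nodes.iter_mut() {
        node.init();
    }
    for _ in 0..max_ticks {
        for node in nodes.iter_mut() {
            node.tick();
        }
        // The real loop sleeps about 16 ms here; omitted to keep the sketch fast.
    }
    for node in nodes.iter_mut() {
        node.shutdown();
    }
}

/// Example node that appends every lifecycle call to a shared log.
pub struct Recorder {
    pub name: &'static str,
    pub log: Rc<RefCell<Vec<String>>>,
}

impl SketchNode for Recorder {
    fn init(&mut self) {
        self.log.borrow_mut().push(format!("init {}", self.name));
    }
    fn tick(&mut self) {
        self.log.borrow_mut().push(format!("tick {}", self.name));
    }
    fn shutdown(&mut self) {
        self.log.borrow_mut().push(format!("shutdown {}", self.name));
    }
}
```

Running two Recorder nodes for two ticks yields one init entry per node, then two rounds of tick entries, then one shutdown entry per node.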

tick_node(node_names)

Run only specific nodes continuously.

pub fn tick_node(&mut self, node_names: &[&str]) -> HorusResult<()>

Parameters:

  • node_names: Slice of node names to run

Returns: HorusResult<()>

Example:

// Run only sensor and processor nodes
scheduler.tick_node(&["SensorNode", "ProcessorNode"])?;

Control Methods

stop()

Stop the scheduler (from another thread).

pub fn stop(&self)

Example:

use std::sync::{Arc, Mutex};
use std::time::Duration;

let scheduler = Arc::new(Mutex::new(Scheduler::new()));

// Clone the handle for another thread
let scheduler_clone = scheduler.clone();

// Stop from another thread after 10 seconds
std::thread::spawn(move || {
    std::thread::sleep(Duration::from_secs(10));
    // lock() blocks while another thread holds the mutex
    scheduler_clone.lock().unwrap().stop();
});

is_running()

Check if the scheduler is currently running.

pub fn is_running(&self) -> bool

Returns: true if scheduler is running, false otherwise

Example:

while scheduler.is_running() {
    // Do something
}

Monitoring Methods

get_node_list()

Get list of all registered node names.

pub fn get_node_list(&self) -> Vec<String>

Returns: Vector of node names

Example:

let nodes = scheduler.get_node_list();
for node_name in nodes {
    println!("Registered node: {}", node_name);
}

get_node_info(name)

Get detailed information about a specific node.

pub fn get_node_info(&self, name: &str) -> Option<HashMap<String, String>>

Parameters:

  • name: Name of the node

Returns: Some(HashMap) with node info, or None if not found

Keys:

  • "name" - Node name
  • "priority" - Execution priority
  • "logging_enabled" - Logging state ("true" or "false")

Example:

if let Some(info) = scheduler.get_node_info("SensorNode") {
    println!("Priority: {}", info.get("priority").unwrap());
    println!("Logging: {}", info.get("logging_enabled").unwrap());
}
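
The example above calls unwrap() on each lookup, which panics if a key is missing. A more defensive reading of the map might look like the sketch below. It assumes the values are stored as a decimal string for "priority" and as "true" or "false" for "logging_enabled"; parse_node_info is a hypothetical helper, not a horus function.

```rust
use std::collections::HashMap;

/// Read `priority` and `logging_enabled` from an info map without panicking.
/// Returns `None` if a key is missing or a value does not parse.
pub fn parse_node_info(info: &HashMap<String, String>) -> Option<(u32, bool)> {
    let priority = info.get("priority")?.parse::<u32>().ok()?;
    let logging = info.get("logging_enabled")?.parse::<bool>().ok()?;
    Some((priority, logging))
}
```

The map returned by get_node_info() could be passed to this helper by reference.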

get_monitoring_summary()

Get summary of all nodes (name and priority).

pub fn get_monitoring_summary(&self) -> Vec<(String, u32)>

Returns: Vector of (node_name, priority) tuples

Example:

let summary = scheduler.get_monitoring_summary();
for (name, priority) in summary {
    println!("{}: priority {}", name, priority);
}
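
For a quick overview of how nodes are spread across priority levels, the summary can be grouped by priority. This is an illustrative sketch; group_by_priority is a hypothetical helper that works on any Vec<(String, u32)> shaped like the return value above.

```rust
use std::collections::BTreeMap;

/// Group node names by priority; keys iterate from highest priority (0) down.
/// Names within a group keep the order they had in `summary`.
pub fn group_by_priority(summary: &[(String, u32)]) -> BTreeMap<u32, Vec<String>> {
    let mut groups: BTreeMap<u32, Vec<String>> = BTreeMap::new();
    for (name, priority) in summary {
        groups.entry(*priority).or_default().push(name.clone());
    }
    groups
}
```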

set_node_logging(name, enabled)

Enable or disable logging for a specific node.

pub fn set_node_logging(&mut self, name: &str, enabled: bool) -> bool

Parameters:

  • name: Node name
  • enabled: true to enable logging, false to disable

Returns: true if node was found, false otherwise

Example:

// Disable logging for SensorNode
scheduler.set_node_logging("SensorNode", false);

// Enable logging for ProcessorNode
scheduler.set_node_logging("ProcessorNode", true);

Complete Example

use horus::prelude::*;

// Define nodes
struct SensorNode {
    data_pub: Hub<f32>,
}

impl SensorNode {
    fn new() -> HorusResult<Self> {
        Ok(Self {
            data_pub: Hub::new("sensor_data")?,
        })
    }
}

impl Node for SensorNode {
    fn name(&self) -> &'static str { "SensorNode" }

    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        self.data_pub.send(42.0, ctx).ok();
    }
}

struct ProcessorNode {
    data_sub: Hub<f32>,
    result_pub: Hub<f32>,
}

impl ProcessorNode {
    fn new() -> HorusResult<Self> {
        Ok(Self {
            data_sub: Hub::new("sensor_data")?,
            result_pub: Hub::new("processed_data")?,
        })
    }
}

impl Node for ProcessorNode {
    fn name(&self) -> &'static str { "ProcessorNode" }

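    fn tick(&mut self, mut ctx: Option<&mut NodeInfo>) {
        // Reborrow ctx for each call; passing the Option by value would move it
        if let Some(data) = self.data_sub.recv(ctx.as_deref_mut()) {
            let result = data * 2.0;
            self.result_pub.send(result, ctx.as_deref_mut()).ok();
        }
    }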
}

fn main() -> HorusResult<()> {
    let mut scheduler = Scheduler::new()
        .name("ExampleScheduler");

    // Register nodes with different priorities
    scheduler
        .register(Box::new(SensorNode::new()?), 0, Some(true))
        .register(Box::new(ProcessorNode::new()?), 1, Some(true));

    // Run until Ctrl+C
    scheduler.tick_all()?;

    Ok(())
}

Execution Order

Nodes execute in priority order each tick:

Tick 1:
   Node with priority 0 (Critical)
   Node with priority 1 (High)
   Node with priority 2 (Normal)
   Node with priority 3 (Low)
   Node with priority 4 (Background)

Sleep 16ms (~60 FPS)

Tick 2:
   Repeat...

Within the same priority: Nodes execute in registration order.
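
The ordering rule (priority first, registration order as the tie-breaker) is what a stable sort on the priority key produces. The sketch below illustrates that rule with the standard library only; execution_order is a hypothetical helper, and whether the scheduler itself uses a sort internally is not stated here.

```rust
/// Return node names in the order one tick would run them.
/// `registrations` lists `(name, priority)` in registration order.
pub fn execution_order(registrations: &[(&str, u32)]) -> Vec<String> {
    let mut ordered: Vec<(&str, u32)> = registrations.to_vec();
    // `sort_by_key` is a stable sort, so equal priorities keep registration order.
    ordered.sort_by_key(|&(_, priority)| priority);
    ordered
        .into_iter()
        .map(|(name, _)| name.to_string())
        .collect()
}
```

For example, registering lidar (2), safety (0), camera (2), and controller (1) in that order gives the execution order safety, controller, lidar, camera.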

Signal Handling

The scheduler automatically handles Ctrl+C (SIGINT):

  1. User presses Ctrl+C
  2. Scheduler stops accepting new ticks
  3. Current tick completes
  4. shutdown() called on all nodes
  5. Cleanup and exit

Force termination: Press Ctrl+C a second time for immediate exit.
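
The graceful-stop sequence can be modeled with a shared flag that is checked only between ticks. The following is a sketch under assumptions, not the scheduler's actual code: run_until_stopped is a hypothetical function, the flag would in practice be cleared by a signal handler (for example one installed with a crate such as ctrlc), and the second-Ctrl+C force exit is not modeled.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Tick until `running` is cleared, then run the shutdown step once.
/// Returns the number of completed ticks.
pub fn run_until_stopped<T, S>(running: Arc<AtomicBool>, mut tick: T, shutdown: S) -> u64
where
    T: FnMut(),
    S: FnOnce(),
{
    let mut ticks = 0;
    // The flag is read only between ticks, so a tick in progress always completes.
    while running.load(Ordering::SeqCst) {
        tick();
        ticks += 1;
    }
    // Runs exactly once, after the last tick has finished.
    shutdown();
    ticks
}
```

Clearing the flag from inside the third tick, for instance, lets that tick finish, skips any further ticks, and then runs the shutdown step.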

Performance

Tick Rate

Default: ~60 FPS (16ms sleep between ticks)

Actual rate depends on:

  • Number of nodes
  • Node tick duration
  • System load

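Estimate the rate from node metrics. This value reflects one node's average tick duration; the loop rate also depends on the 16ms sleep and on the other nodes: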

let metrics = ctx.metrics();
let actual_hz = 1000.0 / metrics.avg_tick_duration_ms;
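
As a worked example of how the factors above combine, the loop period can be approximated as the sleep interval plus the sum of the per-node tick durations. This is a back-of-the-envelope sketch: effective_rate_hz is a hypothetical helper, and it assumes nodes run one after another within a tick with no other overhead.

```rust
/// Estimate loop frequency in Hz from the sleep interval and per-node tick times.
/// Assumes nodes run sequentially, so one period is sleep + sum of tick durations.
pub fn effective_rate_hz(sleep_ms: f64, tick_durations_ms: &[f64]) -> f64 {
    let period_ms = sleep_ms + tick_durations_ms.iter().sum::<f64>();
    1000.0 / period_ms
}
```

With a 16 ms sleep and no measurable tick time this gives 1000 / 16 = 62.5 Hz; two nodes taking 3 ms and 1 ms stretch the period to 20 ms, which lowers the estimate to 50 Hz.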

Best Practices

Keep ticks fast: Each tick should complete in < 16ms (ideally < 1ms)

Use priorities wisely:

  • Critical (0): Safety, emergency stop
  • High (1): Control loops
  • Normal (2): Most nodes
  • Low (3): Logging, monitoring
  • Background (4): Slow periodic tasks

Profile slow nodes:

use std::time::Instant;

fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    let start = Instant::now();

    // Do work...

    if let Some(ctx) = ctx {
        let duration = start.elapsed();
        if duration.as_millis() > 10 {
            ctx.log_warning(&format!("Slow tick: {}ms", duration.as_millis()));
        }
    }
}
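
The same timing pattern can be factored into a small reusable helper so that individual sections of a tick can be measured separately. This is a sketch using only the standard library; timed and is_slow are hypothetical names, and the 10 ms budget mirrors the threshold used above.

```rust
use std::time::{Duration, Instant};

/// Run `work` and return its result together with the elapsed wall-clock time.
pub fn timed<R>(work: impl FnOnce() -> R) -> (R, Duration) {
    let start = Instant::now();
    let result = work();
    (result, start.elapsed())
}

/// True when the measured time exceeds the given budget.
pub fn is_slow(elapsed: Duration, budget: Duration) -> bool {
    elapsed > budget
}
```

Inside tick(), the work could be wrapped in timed(...) and a warning logged only when is_slow reports that the budget was exceeded.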

Registry and Monitoring

The scheduler writes metadata to ~/.horus_registry.json for monitoring tools:

{
  "pid": 12345,
  "scheduler_name": "RobotController",
  "working_dir": "/path/to/project",
  "nodes": [
    {
      "name": "SensorNode",
      "priority": 0,
      "state": "Running",
      "health": "Healthy",
      "publishers": [{"topic": "sensor_data", "type": "f32"}],
      "subscribers": []
    }
  ]
}

This file is:

  • Created when tick_all() starts
  • Updated every 5 seconds with node states
  • Deleted when scheduler stops
  • Used by horus dashboard to monitor the system
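
Since the file exists only while a scheduler is running, a monitoring script can treat a missing file as "no scheduler active". The sketch below reads the file as raw text with the standard library; registry_path and read_registry are hypothetical helpers, the home directory is passed in explicitly instead of expanding ~, and parsing the JSON is left to a JSON library of your choice.

```rust
use std::fs;
use std::path::{Path, PathBuf};

/// Build the registry path under the given home directory.
pub fn registry_path(home: &Path) -> PathBuf {
    home.join(".horus_registry.json")
}

/// Return the registry contents as text, or `None` if the file cannot be read
/// (for example because no scheduler is currently running).
pub fn read_registry(home: &Path) -> Option<String> {
    fs::read_to_string(registry_path(home)).ok()
}
```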

Heartbeats

The scheduler writes node heartbeats to /dev/shm/horus/heartbeats/:

$ cat /dev/shm/horus/heartbeats/SensorNode
{
  "state": "Running",
  "health": "Healthy",
  "tick_count": 3600,
  "target_rate_hz": 60,
  "actual_rate_hz": 59,
  "error_count": 0
}

Heartbeats are:

  • Written after each tick
  • Used for real-time monitoring
  • Cleaned up on shutdown
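
Because each node gets its own heartbeat file, listing the directory shows which nodes are currently reporting. A minimal sketch with the standard library follows; heartbeat_nodes is a hypothetical helper, and it assumes the file name equals the node name as in the SensorNode example above.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// List node names that currently have a heartbeat file in `dir`, sorted by name.
pub fn heartbeat_nodes(dir: &Path) -> io::Result<Vec<String>> {
    let mut names = Vec::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        // Skip subdirectories; only regular files count as heartbeats here.
        if entry.file_type()?.is_file() {
            names.push(entry.file_name().to_string_lossy().into_owned());
        }
    }
    names.sort();
    Ok(names)
}
```

Calling it with Path::new("/dev/shm/horus/heartbeats") returns an error if the directory does not exist, which is the expected state after a clean shutdown.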

Common Patterns

Simple Application

let mut scheduler = Scheduler::new();
scheduler.register(Box::new(my_node), 2, Some(true));
scheduler.tick_all()?;

Multi-Priority System

let mut scheduler = Scheduler::new();

// Critical safety monitor
scheduler.register(Box::new(safety), 0, Some(true));

// Control loop
scheduler.register(Box::new(controller), 1, Some(true));

// Sensors
scheduler.register(Box::new(lidar), 2, Some(true));
scheduler.register(Box::new(camera), 2, Some(true));

// Logging
scheduler.register(Box::new(logger), 4, Some(false));

scheduler.tick_all()?;

Selective Execution

// Run only specific nodes for testing
scheduler.tick_node(&["SensorNode", "ProcessorNode"])?;

Troubleshooting

Nodes not executing

Check registration:

let nodes = scheduler.get_node_list();
println!("Registered nodes: {:?}", nodes);

High CPU usage

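Inspect the suspect node's registration. get_node_info() reports only name, priority, and logging state, so to measure tick durations, time tick() as shown under Best Practices: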

if let Some(info) = scheduler.get_node_info("SlowNode") {
    println!("Node info: {:?}", info);
}

Logs too verbose

Disable logging for specific nodes:

scheduler.set_node_logging("VerboseNode", false);
