Scheduler API Reference
The Scheduler orchestrates node execution, managing priorities, lifecycle, and the main tick loop.
Creating a Scheduler
Scheduler::new()
Create a new scheduler.
pub fn new() -> Self
Returns: A new Scheduler instance
Example:
use horus::prelude::*;
let mut scheduler = Scheduler::new();
Registering Nodes
register(node, priority, logging)
Register a node with the scheduler.
pub fn register(
&mut self,
node: Box<dyn Node>,
priority: u32,
logging_enabled: Option<bool>
) -> &mut Self
Parameters:
- node: Boxed node implementing the Node trait
- priority: Execution priority (0 = highest, 4 = lowest)
- logging_enabled: Optional logging flag (Some(true), Some(false), or None for the default, false)
Returns: Mutable reference to Self for method chaining
Example:
use horus::prelude::*;
let mut scheduler = Scheduler::new();
// Register with logging enabled
scheduler.register(Box::new(SensorNode::new()?), 0, Some(true));
// Register with logging disabled
scheduler.register(Box::new(ProcessorNode::new()?), 1, Some(false));
// Register with default logging (false)
scheduler.register(Box::new(ActuatorNode::new()?), 2, None);
Priority Levels:
- 0 - Critical (highest priority)
- 1 - High
- 2 - Normal (recommended for most nodes)
- 3 - Low
- 4 - Background (lowest priority)
Method Chaining:
scheduler
.register(Box::new(sensor), 0, Some(true))
.register(Box::new(controller), 1, Some(true))
.register(Box::new(actuator), 2, Some(true));
name(name)
Set the scheduler name (chainable).
pub fn name(self, name: &str) -> Self
Parameters:
name: Name for this scheduler instance
Returns: Self for method chaining
Example:
let mut scheduler = Scheduler::new()
.name("RobotController");
Running the Scheduler
tick_all()
Run all registered nodes continuously until stopped (Ctrl+C).
pub fn tick_all(&mut self) -> HorusResult<()>
Returns: HorusResult<()> (always Ok(()) when stopped gracefully)
Example:
scheduler.tick_all()?;
Behavior:
- Initializes all nodes (calls init() once)
- Runs the tick loop at ~60 FPS
- Executes nodes in priority order each tick
- Handles Ctrl+C gracefully
- Calls shutdown() on all nodes before exiting
tick_node(node_names)
Run only specific nodes continuously.
pub fn tick_node(&mut self, node_names: &[&str]) -> HorusResult<()>
Parameters:
node_names: Slice of node names to run
Returns: HorusResult<()>
Example:
// Run only sensor and processor nodes
scheduler.tick_node(&["SensorNode", "ProcessorNode"])?;
Control Methods
stop()
Stop the scheduler (from another thread).
pub fn stop(&self)
Example:
use std::sync::{Arc, Mutex};
use std::time::Duration;

let scheduler = Arc::new(Mutex::new(Scheduler::new()));
// Clone the Arc handle for another thread
let scheduler_clone = Arc::clone(&scheduler);
// Stop from another thread after 10 seconds
std::thread::spawn(move || {
    std::thread::sleep(Duration::from_secs(10));
    scheduler_clone.lock().unwrap().stop();
});
is_running()
Check if the scheduler is currently running.
pub fn is_running(&self) -> bool
Returns: true if scheduler is running, false otherwise
Example:
while scheduler.is_running() {
// Do something
}
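The stop()/is_running() pair follows a standard shared-flag pattern. As a self-contained sketch of the same idea (plain std Rust; run_until_stopped is illustrative, not part of the horus API):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Tick until the shared flag is cleared; returns how many ticks ran.
fn run_until_stopped(running: Arc<AtomicBool>) -> u64 {
    let mut ticks = 0u64;
    while running.load(Ordering::SeqCst) {
        ticks += 1; // stand-in for real per-tick work
        thread::sleep(Duration::from_millis(5));
    }
    ticks
}

fn main() {
    let running = Arc::new(AtomicBool::new(true));
    let stopper = Arc::clone(&running);
    // A second thread clears the flag, analogous to calling stop()
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        stopper.store(false, Ordering::SeqCst);
    });
    println!("stopped after {} ticks", run_until_stopped(running));
}
```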
Monitoring Methods
get_node_list()
Get list of all registered node names.
pub fn get_node_list(&self) -> Vec<String>
Returns: Vector of node names
Example:
let nodes = scheduler.get_node_list();
for node_name in nodes {
println!("Registered node: {}", node_name);
}
get_node_info(name)
Get detailed information about a specific node.
pub fn get_node_info(&self, name: &str) -> Option<HashMap<String, String>>
Parameters:
name: Name of the node
Returns: Some(HashMap) with node info, or None if not found
Keys:
- "name" - Node name
- "priority" - Execution priority
- "logging_enabled" - Logging state ("true" or "false")
Example:
if let Some(info) = scheduler.get_node_info("SensorNode") {
println!("Priority: {}", info.get("priority").unwrap());
println!("Logging: {}", info.get("logging_enabled").unwrap());
}
get_monitoring_summary()
Get summary of all nodes (name and priority).
pub fn get_monitoring_summary(&self) -> Vec<(String, u32)>
Returns: Vector of (node_name, priority) tuples
Example:
let summary = scheduler.get_monitoring_summary();
for (name, priority) in summary {
println!("{}: priority {}", name, priority);
}
set_node_logging(name, enabled)
Enable or disable logging for a specific node.
pub fn set_node_logging(&mut self, name: &str, enabled: bool) -> bool
Parameters:
- name: Node name
- enabled: true to enable logging, false to disable
Returns: true if node was found, false otherwise
Example:
// Disable logging for SensorNode
scheduler.set_node_logging("SensorNode", false);
// Enable logging for ProcessorNode
scheduler.set_node_logging("ProcessorNode", true);
Complete Example
use horus::prelude::*;
// Define nodes
struct SensorNode {
data_pub: Hub<f32>,
}
impl SensorNode {
fn new() -> HorusResult<Self> {
Ok(Self {
data_pub: Hub::new("sensor_data")?,
})
}
}
impl Node for SensorNode {
fn name(&self) -> &'static str { "SensorNode" }
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
self.data_pub.send(42.0, ctx).ok();
}
}
struct ProcessorNode {
data_sub: Hub<f32>,
result_pub: Hub<f32>,
}
impl ProcessorNode {
fn new() -> HorusResult<Self> {
Ok(Self {
data_sub: Hub::new("sensor_data")?,
result_pub: Hub::new("processed_data")?,
})
}
}
impl Node for ProcessorNode {
fn name(&self) -> &'static str { "ProcessorNode" }
fn tick(&mut self, mut ctx: Option<&mut NodeInfo>) {
    // Reborrow ctx for recv() so it can still be passed to send()
    if let Some(data) = self.data_sub.recv(ctx.as_deref_mut()) {
        let result = data * 2.0;
        self.result_pub.send(result, ctx).ok();
    }
}
}
fn main() -> HorusResult<()> {
let mut scheduler = Scheduler::new()
.name("ExampleScheduler");
// Register nodes with different priorities
scheduler
.register(Box::new(SensorNode::new()?), 0, Some(true))
.register(Box::new(ProcessorNode::new()?), 1, Some(true));
// Run until Ctrl+C
scheduler.tick_all()?;
Ok(())
}
Execution Order
Nodes execute in priority order each tick:
Tick 1:
Node with priority 0 (Critical)
Node with priority 1 (High)
Node with priority 2 (Normal)
Node with priority 3 (Low)
Node with priority 4 (Background)
Sleep 16ms (~60 FPS)
Tick 2:
Repeat...
Within the same priority: Nodes execute in registration order.
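That ordering can be sketched in plain Rust: a stable sort by priority keeps equal-priority nodes in registration order (the node names below are illustrative, not part of any horus API):

```rust
/// Given (name, priority) pairs in registration order, return the
/// order the scheduler would execute them in each tick.
fn execution_order(mut nodes: Vec<(&'static str, u32)>) -> Vec<&'static str> {
    // sort_by_key is stable, so equal priorities keep registration order
    nodes.sort_by_key(|&(_, priority)| priority);
    nodes.into_iter().map(|(name, _)| name).collect()
}
```

For example, registering lidar (priority 2), safety (0), then camera (2) in that order executes as safety, lidar, camera each tick.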
Signal Handling
The scheduler automatically handles Ctrl+C (SIGINT):
- User presses Ctrl+C
- Scheduler stops accepting new ticks
- Current tick completes
- shutdown() called on all nodes
- Cleanup and exit
Force termination: Press Ctrl+C a second time for immediate exit.
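The two-stage behavior (first Ctrl+C stops gracefully, a second exits immediately) can be modeled with an atomic interrupt counter; this is a sketch of the idea, not the scheduler's actual signal handler:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Decide how to react to the nth interrupt: the first requests a
/// graceful stop, any further one forces an immediate exit.
fn on_interrupt(count: &AtomicUsize) -> &'static str {
    match count.fetch_add(1, Ordering::SeqCst) {
        0 => "graceful_stop",
        _ => "force_exit",
    }
}
```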
Performance
Tick Rate
Default: ~60 FPS (16ms sleep between ticks)
Actual rate depends on:
- Number of nodes
- Node tick duration
- System load
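A frame-budget loop of this shape, sleeping only for whatever remains of the 16 ms budget after the tick's own work, is one plausible way such a rate is maintained; the sketch below uses only std and is not the scheduler's actual implementation:

```rust
use std::time::{Duration, Instant};

const FRAME: Duration = Duration::from_millis(16); // ~60 FPS budget

/// Run `n` ticks at roughly the frame budget, sleeping only for
/// the part of each frame the tick's work did not consume.
fn run_ticks(mut tick: impl FnMut(), n: usize) {
    for _ in 0..n {
        let start = Instant::now();
        tick();
        if let Some(rest) = FRAME.checked_sub(start.elapsed()) {
            std::thread::sleep(rest);
        }
    }
}
```

Note that if a tick overruns the budget, the loop simply skips the sleep rather than accumulating extra delay.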
Monitor actual rate using node metrics:
let metrics = ctx.metrics();
let actual_hz = 1000.0 / metrics.avg_tick_duration_ms;
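The same conversion as a tiny standalone helper (the function name is illustrative, not part of the metrics API):

```rust
/// Convert an average tick duration in milliseconds to an effective rate in Hz.
fn tick_rate_hz(avg_tick_duration_ms: f64) -> f64 {
    1000.0 / avg_tick_duration_ms
}
```

An average tick duration of 16 ms corresponds to 62.5 Hz; at 20 ms the loop drops to 50 Hz.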
Best Practices
Keep ticks fast: Each tick should complete in < 16ms (ideally < 1ms)
Use priorities wisely:
- Critical (0): Safety, emergency stop
- High (1): Control loops
- Normal (2): Most nodes
- Low (3): Logging, monitoring
- Background (4): Slow periodic tasks
Profile slow nodes:
use std::time::Instant;

fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    let start = Instant::now();
    // Do work...
    if let Some(ctx) = ctx {
        let duration = start.elapsed();
        if duration.as_millis() > 10 {
            ctx.log_warning(&format!("Slow tick: {}ms", duration.as_millis()));
        }
    }
}
Registry and Monitoring
The scheduler writes metadata to ~/.horus_registry.json for monitoring tools:
{
"pid": 12345,
"scheduler_name": "RobotController",
"working_dir": "/path/to/project",
"nodes": [
{
"name": "SensorNode",
"priority": 0,
"state": "Running",
"health": "Healthy",
"publishers": [{"topic": "sensor_data", "type": "f32"}],
"subscribers": []
}
]
}
This file is:
- Created when tick_all() starts
- Updated every 5 seconds with node states
- Deleted when the scheduler stops
- Used by horus dashboard to monitor the system
Heartbeats
The scheduler writes node heartbeats to /dev/shm/horus/heartbeats/:
$ cat /dev/shm/horus/heartbeats/SensorNode
{
"state": "Running",
"health": "Healthy",
"tick_count": 3600,
"target_rate_hz": 60,
"actual_rate_hz": 59,
"error_count": 0
}
Heartbeats are:
- Written after each tick
- Used for real-time monitoring
- Cleaned up on shutdown
Common Patterns
Simple Application
let mut scheduler = Scheduler::new();
scheduler.register(Box::new(my_node), 2, Some(true));
scheduler.tick_all()?;
Multi-Priority System
let mut scheduler = Scheduler::new();
// Critical safety monitor
scheduler.register(Box::new(safety), 0, Some(true));
// Control loop
scheduler.register(Box::new(controller), 1, Some(true));
// Sensors
scheduler.register(Box::new(lidar), 2, Some(true));
scheduler.register(Box::new(camera), 2, Some(true));
// Logging
scheduler.register(Box::new(logger), 4, Some(false));
scheduler.tick_all()?;
Selective Execution
// Run only specific nodes for testing
scheduler.tick_node(&["SensorNode", "ProcessorNode"])?;
Troubleshooting
Nodes not executing
Check registration:
let nodes = scheduler.get_node_list();
println!("Registered nodes: {:?}", nodes);
High CPU usage
Profile tick durations:
if let Some(info) = scheduler.get_node_info("SlowNode") {
println!("Node info: {:?}", info);
}
Logs too verbose
Disable logging for specific nodes:
scheduler.set_node_logging("VerboseNode", false);
See Also
- Node API Reference - Node implementation
- Hub API Reference - Pub/sub communication
- Core Concepts: Scheduler - Detailed guide