Node API
The Node trait is the most fundamental type in HORUS. Every component in your robotics system — sensors, controllers, planners, loggers — implements Node. The scheduler calls your trait methods in a defined lifecycle: init() once at startup, tick() repeatedly, and shutdown() once at exit.
Python: Available via `horus.Node(name, tick, rate, pubs, subs)`. See Python API.
use horus::prelude::*;
Quick Reference — Trait Methods
| Method | Required | Default | Description |
|---|---|---|---|
| `name(&self) -> &str` | No | Type name | Unique identifier for this node |
| `init(&mut self) -> Result<()>` | No | `Ok(())` | One-time initialization at startup |
| `tick(&mut self)` | Yes | — | Main execution loop, called every cycle |
| `shutdown(&mut self) -> Result<()>` | No | `Ok(())` | Cleanup on exit |
| `is_safe_state(&self) -> bool` | No | `true` | Safety monitor checks this for recovery |
| `enter_safe_state(&mut self)` | No | no-op | Called by watchdog on critical timeout |
| `on_error(&mut self, error: &str)` | No | logs error | Called when `tick()` panics |
| `publishers(&self) -> Vec<TopicMetadata>` | No | `[]` | Internal: topic discovery |
| `subscribers(&self) -> Vec<TopicMetadata>` | No | `[]` | Internal: topic discovery |
Quick Reference — Supporting Types
| Type | Purpose |
|---|---|
| `NodeState` | Lifecycle state (`Uninitialized`, `Running`, `Stopped`, etc.) |
| `NodeHealthState` | Watchdog health (`Healthy`, `Warning`, `Unhealthy`, `Isolated`) |
| `NodeMetrics` | Performance counters (tick times, message counts, errors) |
| `Miss` | Deadline miss policy (`Warn`, `Skip`, `SafeMode`, `Stop`) |
| `FailurePolicy` | Error recovery strategy (`Fatal`, `Restart`, `Skip`, `Ignore`) |
| `TopicMetadata` | Topic name + type name for monitoring |
Lifecycle
The scheduler manages Node lifecycle in a strict order:
Construction → Registration → Init → Tick Loop → Shutdown
    (you)      (scheduler)    (once)  (repeated)   (once)

- Construction — You create your node struct with topics, state, and configuration
- Registration — `scheduler.add(node).build()` validates config and registers the node
- Initialization — On first `run()` or `tick_once()`, the scheduler calls `init()` (lazy). Wrapped in `catch_unwind` for panic safety. After init succeeds, `publishers()` and `subscribers()` are called for monitoring
- Tick Loop — Each cycle: check health state, feed watchdog, call `tick()`, measure timing, check budget/deadline. If `tick()` panics, `on_error()` is called
- Shutdown — On Ctrl+C, SIGTERM, or `.stop()`: `shutdown()` is called on each node in order. Wrapped in `catch_unwind`. Timing report printed
Trait Methods
name()
fn name(&self) -> &str
Returns a unique identifier for this node within the scheduler.
Default: Extracts the struct's type name from std::any::type_name::<Self>(), stripping the module path. For my_crate::sensors::ImuReader, the default is "ImuReader".
Override when: You create multiple instances of the same struct, or want a human-readable name for monitoring.
struct ImuReader {
name: String,
// ...
}
impl Node for ImuReader {
fn name(&self) -> &str { &self.name }
fn tick(&mut self) { /* ... */ }
}
Rules:
- Must be unique within a scheduler — duplicate names cause a build error
- Used in `horus node list`, `horus log`, `horus monitor`, and `horus blackbox`
- Must be stable across restarts for recording/replay to match
init()
fn init(&mut self) -> Result<()>
Called once when the scheduler first starts (run() or tick_once()). Use for setup that may fail: opening hardware, connecting to networks, loading calibration files.
Default: Returns Ok(()).
When called: Lazily on first run, not at scheduler.add() time. Wrapped in catch_unwind — panics are caught and handled by the FailurePolicy.
On error: The configured FailurePolicy determines behavior. Fatal (default) stops the scheduler. Restart retries with exponential backoff.
impl Node for LidarDriver {
fn name(&self) -> &str { "lidar" }
fn init(&mut self) -> Result<()> {
self.device = SerialPort::open(&self.port)
.map_err(|e| Error::config(format!("Cannot open {}: {}", self.port, e)))?;
hlog!(info, "Lidar connected on {}", self.port);
Ok(())
}
fn tick(&mut self) { /* read scans */ }
}
Rules:
- Do heavy setup here, not in the struct constructor — keeps `scheduler.add()` fast
- If init fails, `tick()` and `shutdown()` are never called for this node
- Topics created in the constructor (before init) are valid — SHM is allocated on `Topic::new()`
tick()
fn tick(&mut self)
The only required method. Called repeatedly by the scheduler at the configured rate. This is your main execution loop — read sensors, compute control, publish results.
The scheduler wraps every tick in timing measurement, profiling, budget checking, and watchdog feeding. If tick() panics, catch_unwind catches it and routes to on_error().
impl Node for MotorController {
fn name(&self) -> &str { "motor" }
fn tick(&mut self) {
if let Some(cmd) = self.cmd_sub.try_recv() {
let left = cmd.linear - cmd.angular * self.wheel_base / 2.0;
let right = cmd.linear + cmd.angular * self.wheel_base / 2.0;
self.motor_pub.send(MotorCommand {
left_rpm: left * self.rpm_scale,
right_rpm: right * self.rpm_scale,
});
}
}
}
Critical rules for tick():
- Never allocate — avoid `Vec::new()`, `String::from()`, `Box::new()` in the hot path. Pre-allocate in `init()` or the constructor
- Never block on I/O — use `.try_recv()`, not `.recv_blocking()`. Move I/O to `async_io()` nodes
- Never panic on recoverable errors — use `if let Some`/`match`, not `.unwrap()`
- Never call `std::thread::sleep()` — the scheduler handles timing
- Keep it fast — target <10μs for RT nodes. Use `--release` for accurate measurement
shutdown()
fn shutdown(&mut self) -> Result<()>
Called once when the scheduler stops (Ctrl+C, SIGTERM, .stop(), or scope exit). Use for cleanup: zeroing motors, closing files, releasing hardware.
Default: Returns Ok(()).
When called: Nodes shut down in registration order. Each call is wrapped in catch_unwind — a panicking shutdown does not prevent other nodes from shutting down.
impl Node for MotorController {
fn name(&self) -> &str { "motor" }
fn tick(&mut self) { /* ... */ }
fn shutdown(&mut self) -> Result<()> {
// CRITICAL: Zero motors before exit
self.motor_pub.send(MotorCommand {
left_rpm: 0.0,
right_rpm: 0.0,
});
hlog!(info, "Motors zeroed");
Ok(())
}
}
Rules:
- Always zero actuators — if your node controls motors, servos, or any physical output, send a zero/stop command in shutdown
- Never panic — a panic in shutdown is caught but prevents clean resource release
- Don't assume ordering — while nodes shut down in registration order, another node's shutdown may have already released a shared resource
- Keep it fast — the scheduler has a 3-second timeout before detaching
is_safe_state()
fn is_safe_state(&self) -> bool
Called by the safety monitor each tick for nodes in Isolated health state. If this returns true, the node transitions back to Healthy and resumes normal ticking.
Default: Returns true (node is always considered safe).
When to override: Implement for nodes that need recovery verification — check that sensors are reading valid data, that actuators are in a known position, or that communication is restored.
impl Node for ArmController {
fn name(&self) -> &str { "arm" }
fn tick(&mut self) { /* ... */ }
fn is_safe_state(&self) -> bool {
// Only resume if all joints are within limits
self.joints.iter().all(|j| j.position.abs() < j.limit)
}
fn enter_safe_state(&mut self) {
// Freeze all joints
for joint in &mut self.joints {
joint.target_velocity = 0.0;
}
}
}
Rules:
- Called every tick while the node is `Isolated` — keep it fast (no I/O)
- Returning `false` indefinitely keeps the node isolated forever
- The scheduler never calls `tick()` on an `Isolated` node until `is_safe_state()` returns `true`
enter_safe_state()
fn enter_safe_state(&mut self)
Called by the watchdog when a node reaches Critical severity (3x timeout elapsed) or when a Miss::SafeMode deadline miss fires. Transition to a safe configuration — stop motors, hold position, reduce rate.
Default: No-op.
When to override: Any node that controls physical actuators or makes safety-critical decisions.
fn enter_safe_state(&mut self) {
self.motor_pub.send(MotorCommand::zero());
self.in_safe_mode = true;
hlog!(warn, "Entered safe state — motors zeroed");
}
Rules:
- Must be fast — called from the watchdog path, not the normal tick path
- After this is called, the node's health transitions to `Isolated`
- The scheduler calls `is_safe_state()` each tick to check for recovery
- If you set `on_miss(Miss::SafeMode)`, this is called on every deadline miss
on_error()
fn on_error(&mut self, error: &str)
Called when tick() panics. The panic is caught by catch_unwind, and the panic message is passed as error. Override to add custom recovery logic, telemetry, or graceful degradation.
Default: Logs "Node error: {error}" via hlog!(error, ...).
fn on_error(&mut self, error: &str) {
self.error_count += 1;
hlog!(error, "Tick failed ({}x): {}", self.error_count, error);
if self.error_count > 10 {
self.motor_pub.send(MotorCommand::zero());
}
}
Rules:
- Called after `catch_unwind` — the panic has been absorbed, the scheduler continues
- The `FailurePolicy` determines what happens next (retry, skip, fatal stop)
- Do not panic in `on_error()` — it would cause a double-panic
publishers() and subscribers()
fn publishers(&self) -> Vec<TopicMetadata> // #[doc(hidden)]
fn subscribers(&self) -> Vec<TopicMetadata> // #[doc(hidden)]
Internal methods called after init() succeeds. Used by the monitoring system, recording/replay, and the SHM presence system to track which nodes are connected to which topics.
Default: Empty Vec::new().
You typically don't override these. The node! macro auto-generates them. If you implement Node manually and want monitoring to show your topic connections, return the metadata:
fn publishers(&self) -> Vec<TopicMetadata> {
vec![TopicMetadata {
topic_name: "motor.cmd".to_string(),
type_name: "MotorCommand".to_string(),
}]
}
Supporting Types
TopicMetadata
pub struct TopicMetadata {
pub topic_name: String,
pub type_name: String,
}
Describes a topic connection for monitoring. Used by publishers() and subscribers().
NodeState
Lifecycle state of a node, tracked by the scheduler.
| Variant | Description |
|---|---|
| `Uninitialized` | Registered but `init()` not yet called |
| `Initializing` | `init()` is running |
| `Running` | Normal operation — `tick()` being called |
| `Stopping` | `shutdown()` is running |
| `Stopped` | Shutdown complete |
| `Error(String)` | `init()` or `tick()` returned an error |
| `Crashed(String)` | `tick()` panicked |
NodeHealthState
Watchdog health, stored as AtomicU8 for lock-free per-node tracking.
| Variant | Value | Trigger | Behavior |
|---|---|---|---|
| `Healthy` | 0 | Normal operation | Node is ticked every cycle |
| `Warning` | 1 | 1× watchdog timeout | Node still ticks, warning logged |
| `Unhealthy` | 2 | 2× watchdog timeout | Node skipped in tick loop |
| `Isolated` | 3 | 3× timeout on critical node | `enter_safe_state()` called, node skipped until `is_safe_state()` returns `true` |
NodeMetrics
Performance counters collected by the scheduler. Access via scheduler.get_node_metrics("name").
| Field | Type | Description |
|---|---|---|
| `name` | `String` | Node name |
| `order` | `u32` | Execution order |
| `total_ticks` | `u64` | Total tick count |
| `successful_ticks` | `u64` | Ticks completed without error |
| `failed_ticks` | `u64` | Ticks that panicked or errored |
| `avg_tick_duration_ms` | `f64` | Running average tick time |
| `max_tick_duration_ms` | `f64` | Worst-case tick time |
| `min_tick_duration_ms` | `f64` | Best-case tick time |
| `last_tick_duration_ms` | `f64` | Most recent tick time |
| `messages_sent` | `u64` | Total messages published |
| `messages_received` | `u64` | Total messages received |
| `errors_count` | `u64` | Total errors |
| `warnings_count` | `u64` | Total warnings |
| `uptime_seconds` | `f64` | Time since init |
Miss
Deadline miss policy, set via .on_miss() on the node builder.
| Variant | Behavior |
|---|---|
| `Warn` (default) | Log a warning, continue normally |
| `Skip` | Skip this tick, resume next cycle |
| `SafeMode` | Call `enter_safe_state()`, check `is_safe_state()` for recovery |
| `Stop` | Stop the entire scheduler (last resort) |
FailurePolicy
Error recovery strategy, set via .failure_policy() on the node builder.
| Variant | Fields | Behavior |
|---|---|---|
| `Fatal` | — | Node failure stops the scheduler immediately |
| `Restart` | `max_restarts: u32`, `initial_backoff: Duration` | Re-initialize with exponential backoff. Escalates to fatal after max restarts |
| `Skip` | `max_failures: u32`, `cooldown: Duration` | Tolerate failures with cooldown. After max consecutive failures, skip until cooldown expires |
| `Ignore` | — | Log and continue — node keeps ticking regardless of errors |
Production Patterns
Standard Sensor Node
A complete sensor node with topics, error handling, and clean shutdown:
use horus::prelude::*;
struct LidarNode {
scan_pub: Topic<LaserScan>,
device: Option<SerialPort>,
port: String,
}
impl LidarNode {
fn new(port: &str) -> Result<Self> {
Ok(Self {
scan_pub: Topic::new("scan")?,
device: None,
port: port.to_string(),
})
}
}
impl Node for LidarNode {
fn name(&self) -> &str { "lidar" }
fn init(&mut self) -> Result<()> {
self.device = Some(SerialPort::open(&self.port)?);
hlog!(info, "Lidar connected on {}", self.port);
Ok(())
}
fn tick(&mut self) {
if let Some(ref mut dev) = self.device {
if let Ok(scan) = dev.read_scan() {
self.scan_pub.send(scan);
}
}
}
fn shutdown(&mut self) -> Result<()> {
if let Some(ref mut dev) = self.device {
dev.stop_motor()?;
}
hlog!(info, "Lidar stopped");
Ok(())
}
}
fn main() -> Result<()> {
let mut sched = Scheduler::new().tick_rate(10_u64.hz());
sched.add(LidarNode::new("/dev/ttyUSB0")?)
.order(0)
.failure_policy(FailurePolicy::Restart {
max_restarts: 3,
initial_backoff: 1_u64.ms().into(),
})
.build()?;
sched.run()
}
Safety-Critical Node
A motor controller with full safety integration:
use horus::prelude::*;
struct MotorController {
cmd_sub: Topic<CmdVel>,
motor_pub: Topic<MotorCommand>,
safe_mode: bool,
wheel_base: f32,
}
impl MotorController {
fn new() -> Result<Self> {
Ok(Self {
cmd_sub: Topic::new("cmd_vel")?,
motor_pub: Topic::new("motor.cmd")?,
safe_mode: false,
wheel_base: 0.3,
})
}
}
impl Node for MotorController {
fn name(&self) -> &str { "motor_controller" }
fn tick(&mut self) {
if self.safe_mode { return; }
if let Some(cmd) = self.cmd_sub.try_recv() {
self.motor_pub.send(MotorCommand {
left_rpm: (cmd.linear - cmd.angular * self.wheel_base / 2.0) * 100.0,
right_rpm: (cmd.linear + cmd.angular * self.wheel_base / 2.0) * 100.0,
});
}
}
fn shutdown(&mut self) -> Result<()> {
self.motor_pub.send(MotorCommand { left_rpm: 0.0, right_rpm: 0.0 });
Ok(())
}
fn enter_safe_state(&mut self) {
self.motor_pub.send(MotorCommand { left_rpm: 0.0, right_rpm: 0.0 });
self.safe_mode = true;
hlog!(warn, "Safe state — motors zeroed");
}
    fn is_safe_state(&self) -> bool {
        // Simplified: always report safe. A real implementation would check
        // encoder feedback to confirm the motors have actually stopped.
        true
    }
fn on_error(&mut self, error: &str) {
hlog!(error, "Motor error: {}", error);
self.enter_safe_state();
}
}
fn main() -> Result<()> {
let mut sched = Scheduler::new()
.tick_rate(100_u64.hz())
.watchdog(500_u64.ms().into());
sched.add(MotorController::new()?)
.order(1)
.rate(100_u64.hz())
.budget(200_u64.us().into())
.deadline(900_u64.us().into())
.on_miss(Miss::SafeMode)
.build()?;
sched.run()
}
Design Decisions
Why Send but not Sync? Nodes are moved to the scheduler, which may run them on dedicated RT threads. Send is required for this transfer. But nodes are never accessed from multiple threads simultaneously — the scheduler owns exclusive access during tick — so Sync is unnecessary. This lets nodes contain RefCell, UnsafeCell, or any non-Sync state.
Why is tick() the only required method? A minimal node just processes data each cycle. Everything else — init, shutdown, safety, monitoring — has sensible defaults. This keeps the "hello world" implementation to 4 lines while allowing full lifecycle control for production systems.
Why lazy initialization? init() is called on first run(), not at scheduler.add() time. This lets you build the full node graph before any hardware is opened or resources are allocated. It also means tick_once() (used in testing) triggers init on first call, making tests self-contained.
Why catch_unwind on every lifecycle method? A panicking node must not crash the entire robot. The scheduler catches panics in init(), tick(), and shutdown(), routing them through FailurePolicy. This is defense-in-depth — you should still avoid panics, but the system survives them.
Why are publishers() and subscribers() #[doc(hidden)]? These are internal to the scheduler's monitoring and recording systems. Most users never override them — the node! macro generates them automatically. Hiding them reduces API noise while keeping them available for advanced use.
Trade-offs
| Choice | Benefit | Cost |
|---|---|---|
| Trait-based (not struct-based) | Full control over state, zero overhead | More boilerplate than the `node!` macro |
| `&mut self` on tick (not a context param) | Direct field access, no borrowing gymnastics | Node must own its topics and state |
| Default `is_safe_state()` = `true` | Nodes work without safety config | Unsafe nodes silently recover from isolation |
| `catch_unwind` on every method | System survives panics | ~5ns overhead per tick, hides bugs if relied upon |
| Single `tick()` entry point | Simple mental model | Complex nodes need internal state machines |
See Also
- Node Concepts — Usage patterns, lifecycle diagram, communication patterns
- node! Macro — Reduce boilerplate with declarative node definition
- Scheduler API — Node registration, execution classes, runtime management
- Safety Monitor — Graduated watchdog, deadline enforcement, BlackBox
- Topic API — Publishing and subscribing to messages
- Macros Reference — `message!`, `node!`, `hlog!`, and more