Node API

The Node trait is the most fundamental type in HORUS. Every component in your robotics system — sensors, controllers, planners, loggers — implements Node. The scheduler calls your trait methods in a defined lifecycle: init() once at startup, tick() repeatedly, and shutdown() once at exit.

Python: Available via horus.Node(name, tick, rate, pubs, subs). See Python API.

use horus::prelude::*;

Quick Reference — Trait Methods

| Method | Required | Default | Description |
|---|---|---|---|
| name(&self) -> &str | No | Type name | Unique identifier for this node |
| init(&mut self) -> Result<()> | No | Ok(()) | One-time initialization at startup |
| tick(&mut self) | Yes | — | Main execution loop, called every cycle |
| shutdown(&mut self) -> Result<()> | No | Ok(()) | Cleanup on exit |
| is_safe_state(&self) -> bool | No | true | Safety monitor checks this for recovery |
| enter_safe_state(&mut self) | No | no-op | Called by watchdog on critical timeout |
| on_error(&mut self, error: &str) | No | logs error | Called when tick() panics |
| publishers(&self) -> Vec<TopicMetadata> | No | [] | Internal: topic discovery |
| subscribers(&self) -> Vec<TopicMetadata> | No | [] | Internal: topic discovery |

Quick Reference — Supporting Types

| Type | Purpose |
|---|---|
| NodeState | Lifecycle state (Uninitialized, Running, Stopped, etc.) |
| NodeHealthState | Watchdog health (Healthy, Warning, Unhealthy, Isolated) |
| NodeMetrics | Performance counters (tick times, message counts, errors) |
| Miss | Deadline miss policy (Warn, Skip, SafeMode, Stop) |
| FailurePolicy | Error recovery strategy (Fatal, Restart, Skip, Ignore) |
| TopicMetadata | Topic name + type name for monitoring |

Lifecycle

The scheduler manages Node lifecycle in a strict order:

Construction → Registration → Init → Tick Loop → Shutdown
     you         scheduler     once    repeated      once
  1. Construction — You create your node struct with topics, state, and configuration
  2. Registration — scheduler.add(node).build() validates config and registers the node
  3. Initialization — On first run() or tick_once(), the scheduler calls init() (lazy). Wrapped in catch_unwind for panic safety. After init succeeds, publishers() and subscribers() are called for monitoring
  4. Tick Loop — Each cycle: check health state, feed watchdog, call tick(), measure timing, check budget/deadline. If tick() panics, on_error() is called
  5. Shutdown — On Ctrl+C, SIGTERM, or .stop(): shutdown() called on each node in order. Wrapped in catch_unwind. Timing report printed
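The lifecycle above can be sketched as a plain Rust trait with default methods. This is a simplified stand-in for illustration, not the real HORUS Node trait or scheduler:

```rust
// Simplified lifecycle sketch: init() once, tick() repeatedly, shutdown() once.
// Only tick() is required; init() and shutdown() have Ok(()) defaults.
trait LifecycleNode {
    fn init(&mut self) -> Result<(), String> { Ok(()) }
    fn tick(&mut self);
    fn shutdown(&mut self) -> Result<(), String> { Ok(()) }
}

struct Counter { ticks: u32 }

impl LifecycleNode for Counter {
    fn tick(&mut self) { self.ticks += 1; }
}

// A toy "scheduler" that drives the lifecycle in the documented order.
fn run_lifecycle(node: &mut dyn LifecycleNode, cycles: u32) -> Result<(), String> {
    node.init()?;            // once, lazily on first run
    for _ in 0..cycles {
        node.tick();         // repeated at the configured rate
    }
    node.shutdown()          // once, on stop
}

fn main() {
    let mut node = Counter { ticks: 0 };
    run_lifecycle(&mut node, 5).unwrap();
    assert_eq!(node.ticks, 5);
    println!("ticks = {}", node.ticks);
}
```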

Trait Methods

name()

fn name(&self) -> &str

Returns a unique identifier for this node within the scheduler.

Default: Extracts the struct's type name from std::any::type_name::<Self>(), stripping the module path. For my_crate::sensors::ImuReader, the default is "ImuReader".
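The default name derivation can be sketched with standard-library tools; this assumes the strip-the-module-path strategy the docs describe:

```rust
// Derive a short name from a type by stripping the module path from
// std::any::type_name, e.g. "crate::sensors::ImuReader" -> "ImuReader".
mod sensors {
    pub struct ImuReader;
}

fn short_type_name<T>() -> &'static str {
    let full = std::any::type_name::<T>();
    full.rsplit("::").next().unwrap_or(full)
}

fn main() {
    let name = short_type_name::<sensors::ImuReader>();
    assert_eq!(name, "ImuReader");
    println!("{}", name);
}
```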

Override when: You create multiple instances of the same struct, or want a human-readable name for monitoring.

struct ImuReader {
    name: String,
    // ...
}

impl Node for ImuReader {
    fn name(&self) -> &str { &self.name }
    fn tick(&mut self) { /* ... */ }
}

Rules:

  • Must be unique within a scheduler — duplicate names cause a build error
  • Used in horus node list, horus log, horus monitor, and horus blackbox
  • Must be stable across restarts for recording/replay to match

init()

fn init(&mut self) -> Result<()>

Called once when the scheduler first starts (run() or tick_once()). Use for setup that may fail: opening hardware, connecting to networks, loading calibration files.

Default: Returns Ok(()).

When called: Lazily on first run, not at scheduler.add() time. Wrapped in catch_unwind — panics are caught and handled by the FailurePolicy.

On error: The configured FailurePolicy determines behavior. Fatal (default) stops the scheduler. Restart retries with exponential backoff.
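Exponential backoff for a Restart policy typically doubles the delay on each attempt. A minimal sketch, assuming doubling from initial_backoff (the exact growth curve HORUS uses may differ):

```rust
use std::time::Duration;

// Backoff for retry N: initial * 2^N, so 100ms, 200ms, 400ms, 800ms, ...
fn backoff_for_attempt(initial: Duration, attempt: u32) -> Duration {
    initial * 2u32.saturating_pow(attempt)
}

fn main() {
    let initial = Duration::from_millis(100);
    let delays: Vec<Duration> = (0..4).map(|a| backoff_for_attempt(initial, a)).collect();
    assert_eq!(delays[0], Duration::from_millis(100));
    assert_eq!(delays[3], Duration::from_millis(800));
    println!("{:?}", delays);
}
```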

impl Node for LidarDriver {
    fn name(&self) -> &str { "lidar" }

    fn init(&mut self) -> Result<()> {
        self.device = SerialPort::open(&self.port)
            .map_err(|e| Error::config(format!("Cannot open {}: {}", self.port, e)))?;
        hlog!(info, "Lidar connected on {}", self.port);
        Ok(())
    }

    fn tick(&mut self) { /* read scans */ }
}

Rules:

  • Do heavy setup here, not in the struct constructor — keeps scheduler.add() fast
  • If init fails, tick() and shutdown() are never called for this node
  • Topics created in the constructor (before init) are valid — SHM is allocated on Topic::new()

tick()

fn tick(&mut self)

The only required method. Called repeatedly by the scheduler at the configured rate. This is your main execution loop — read sensors, compute control, publish results.

The scheduler wraps every tick in timing measurement, profiling, budget checking, and watchdog feeding. If tick() panics, catch_unwind catches it and routes to on_error().

impl Node for MotorController {
    fn name(&self) -> &str { "motor" }

    fn tick(&mut self) {
        if let Some(cmd) = self.cmd_sub.try_recv() {
            let left = cmd.linear - cmd.angular * self.wheel_base / 2.0;
            let right = cmd.linear + cmd.angular * self.wheel_base / 2.0;
            self.motor_pub.send(MotorCommand {
                left_rpm: left * self.rpm_scale,
                right_rpm: right * self.rpm_scale,
            });
        }
    }
}
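The wheel-speed math in the example above is standard differential-drive kinematics. A standalone sketch (the wheel_base value is illustrative):

```rust
// Convert a body twist (linear m/s, angular rad/s) into left/right
// wheel speeds for a differential-drive base.
fn wheel_speeds(linear: f32, angular: f32, wheel_base: f32) -> (f32, f32) {
    let left = linear - angular * wheel_base / 2.0;
    let right = linear + angular * wheel_base / 2.0;
    (left, right)
}

fn main() {
    // Driving straight: both wheels equal.
    assert_eq!(wheel_speeds(1.0, 0.0, 0.3), (1.0, 1.0));
    // Turning in place: wheels equal and opposite.
    let (l, r) = wheel_speeds(0.0, 1.0, 0.3);
    assert_eq!(l, -r);
    println!("l = {l}, r = {r}");
}
```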

Critical rules for tick():

  • Never allocate — avoid Vec::new(), String::from(), Box::new() in the hot path. Pre-allocate in init() or the constructor
  • Never block on I/O — use .try_recv(), not .recv_blocking(). Move I/O to async_io() nodes
  • Never panic on recoverable errors — use if let Some / match, not .unwrap()
  • Never call std::thread::sleep() — the scheduler handles timing
  • Keep it fast — target <10μs for RT nodes. Use --release for accurate measurement
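The "never allocate" rule usually means pre-allocating buffers and reusing them. A sketch of the pattern (the processing itself is a made-up mm-to-m conversion):

```rust
// Pre-allocate once, reuse every tick. Vec::clear() keeps the capacity,
// so the hot path stops allocating after the buffer reaches steady state.
struct ScanProcessor {
    points: Vec<f32>, // pre-allocated in the constructor or init()
}

impl ScanProcessor {
    fn new(max_points: usize) -> Self {
        Self { points: Vec::with_capacity(max_points) }
    }

    fn tick(&mut self, raw: &[f32]) -> f32 {
        self.points.clear(); // keeps capacity, frees nothing
        self.points.extend(raw.iter().map(|p| p * 0.001)); // scale mm -> m
        self.points.iter().sum::<f32>() / self.points.len().max(1) as f32
    }
}

fn main() {
    let mut processor = ScanProcessor::new(1024);
    let cap_before = processor.points.capacity();
    let avg = processor.tick(&[1000.0, 2000.0, 3000.0]);
    assert_eq!(processor.points.capacity(), cap_before); // no reallocation
    assert!((avg - 2.0).abs() < 1e-6);
    println!("avg = {avg}");
}
```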

shutdown()

fn shutdown(&mut self) -> Result<()>

Called once when the scheduler stops (Ctrl+C, SIGTERM, .stop(), or scope exit). Use for cleanup: zeroing motors, closing files, releasing hardware.

Default: Returns Ok(()).

When called: Nodes shut down in registration order. Each call is wrapped in catch_unwind — a panicking shutdown does not prevent other nodes from shutting down.

impl Node for MotorController {
    fn name(&self) -> &str { "motor" }
    fn tick(&mut self) { /* ... */ }

    fn shutdown(&mut self) -> Result<()> {
        // CRITICAL: Zero motors before exit
        self.motor_pub.send(MotorCommand {
            left_rpm: 0.0,
            right_rpm: 0.0,
        });
        hlog!(info, "Motors zeroed");
        Ok(())
    }
}

Rules:

  • Always zero actuators — if your node controls motors, servos, or any physical output, send a zero/stop command in shutdown
  • Never panic — a panic in shutdown is caught but prevents clean resource release
  • Don't assume ordering — while nodes shut down in registration order, another node's shutdown may have already released a shared resource
  • Keep it fast — the scheduler has a 3-second timeout before detaching

is_safe_state()

fn is_safe_state(&self) -> bool

Called by the safety monitor each tick for nodes in Isolated health state. If this returns true, the node transitions back to Healthy and resumes normal ticking.

Default: Returns true (node is always considered safe).

When to override: Implement for nodes that need recovery verification — check that sensors are reading valid data, that actuators are in a known position, or that communication is restored.

impl Node for ArmController {
    fn name(&self) -> &str { "arm" }
    fn tick(&mut self) { /* ... */ }

    fn is_safe_state(&self) -> bool {
        // Only resume if all joints are within limits
        self.joints.iter().all(|j| j.position.abs() < j.limit)
    }

    fn enter_safe_state(&mut self) {
        // Freeze all joints
        for joint in &mut self.joints {
            joint.target_velocity = 0.0;
        }
    }
}

Rules:

  • Called every tick while the node is Isolated — keep it fast (no I/O)
  • Returning false indefinitely keeps the node isolated forever
  • The scheduler never calls tick() on an Isolated node until is_safe_state() returns true

enter_safe_state()

fn enter_safe_state(&mut self)

Called by the watchdog when a node reaches Critical severity (3x timeout elapsed) or when a Miss::SafeMode deadline miss fires. Transition to a safe configuration — stop motors, hold position, reduce rate.

Default: No-op.

When to override: Any node that controls physical actuators or makes safety-critical decisions.

fn enter_safe_state(&mut self) {
    self.motor_pub.send(MotorCommand::zero());
    self.in_safe_mode = true;
    hlog!(warn, "Entered safe state — motors zeroed");
}

Rules:

  • Must be fast — called from the watchdog path, not the normal tick path
  • After this is called, the node's health transitions to Isolated
  • The scheduler calls is_safe_state() each tick to check for recovery
  • If you set on_miss(Miss::SafeMode), this is called on every deadline miss

on_error()

fn on_error(&mut self, error: &str)

Called when tick() panics. The panic is caught by catch_unwind, and the panic message is passed as error. Override to add custom recovery logic, telemetry, or graceful degradation.

Default: Logs "Node error: {error}" via hlog!(error, ...).

fn on_error(&mut self, error: &str) {
    self.error_count += 1;
    hlog!(error, "Tick failed ({}x): {}", self.error_count, error);
    if self.error_count > 10 {
        self.motor_pub.send(MotorCommand::zero());
    }
}

Rules:

  • Called after catch_unwind — the panic has been absorbed, the scheduler continues
  • The FailurePolicy determines what happens next (retry, skip, fatal stop)
  • Do not panic in on_error() — it would cause a double-panic
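Recovering a panic message from a catch_unwind payload takes a downcast, since the payload is a Box<dyn Any>. A standalone sketch of the mechanism (how HORUS itself formats the string may differ):

```rust
use std::panic;

// Extract a readable message from a caught panic payload. Panics raised
// with a string literal carry &str; formatted panics carry String.
fn panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
    if let Some(s) = payload.downcast_ref::<&str>() {
        s.to_string()
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else {
        "unknown panic".to_string()
    }
}

fn main() {
    panic::set_hook(Box::new(|_| {})); // silence the default panic printout
    let result = panic::catch_unwind(|| {
        panic!("sensor returned NaN");
    });
    let msg = panic_message(result.unwrap_err());
    assert_eq!(msg, "sensor returned NaN");
    println!("on_error would receive: {msg}");
}
```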

publishers() and subscribers()

fn publishers(&self) -> Vec<TopicMetadata>   // #[doc(hidden)]
fn subscribers(&self) -> Vec<TopicMetadata>  // #[doc(hidden)]

Internal methods called after init() succeeds. Used by the monitoring system, recording/replay, and the SHM presence system to track which nodes are connected to which topics.

Default: Empty Vec::new().

You typically don't override these. The node! macro auto-generates them. If you implement Node manually and want monitoring to show your topic connections, return the metadata:

fn publishers(&self) -> Vec<TopicMetadata> {
    vec![TopicMetadata {
        topic_name: "motor.cmd".to_string(),
        type_name: "MotorCommand".to_string(),
    }]
}

Supporting Types

TopicMetadata

pub struct TopicMetadata {
    pub topic_name: String,
    pub type_name: String,
}

Describes a topic connection for monitoring. Used by publishers() and subscribers().

NodeState

Lifecycle state of a node, tracked by the scheduler.

| Variant | Description |
|---|---|
| Uninitialized | Registered but init() not yet called |
| Initializing | init() is running |
| Running | Normal operation — tick() being called |
| Stopping | shutdown() is running |
| Stopped | Shutdown complete |
| Error(String) | init() or tick() returned an error |
| Crashed(String) | tick() panicked |

NodeHealthState

Watchdog health, stored as AtomicU8 for lock-free per-node tracking.

| Variant | Value | Trigger | Behavior |
|---|---|---|---|
| Healthy | 0 | Normal operation | Node is ticked every cycle |
| Warning | 1 | 1x watchdog timeout | Node still ticks, warning logged |
| Unhealthy | 2 | 2x watchdog timeout | Node skipped in tick loop |
| Isolated | 3 | 3x timeout on critical node | enter_safe_state() called, node skipped until is_safe_state() returns true |
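The lock-free escalation described above can be sketched with a plain AtomicU8, using the table's numeric values. This is an illustrative stand-in, not the HORUS implementation:

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Health levels from the table: 0 = Healthy .. 3 = Isolated.
const HEALTHY: u8 = 0;
const WARNING: u8 = 1;
const UNHEALTHY: u8 = 2;
const ISOLATED: u8 = 3;

struct HealthCell(AtomicU8);

impl HealthCell {
    fn new() -> Self { Self(AtomicU8::new(HEALTHY)) }

    // Escalate one level per missed watchdog deadline, capped at Isolated.
    fn escalate(&self) -> u8 {
        self.0
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| {
                Some((v + 1).min(ISOLATED))
            })
            .unwrap();
        self.0.load(Ordering::Acquire)
    }
}

fn main() {
    let health = HealthCell::new();
    assert_eq!(health.escalate(), WARNING);
    assert_eq!(health.escalate(), UNHEALTHY);
    assert_eq!(health.escalate(), ISOLATED);
    assert_eq!(health.escalate(), ISOLATED); // capped, stays Isolated
    println!("final = {}", health.0.load(Ordering::Acquire));
}
```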

NodeMetrics

Performance counters collected by the scheduler. Access via scheduler.get_node_metrics("name").

| Field | Type | Description |
|---|---|---|
| name | String | Node name |
| order | u32 | Execution order |
| total_ticks | u64 | Total tick count |
| successful_ticks | u64 | Ticks completed without error |
| failed_ticks | u64 | Ticks that panicked or errored |
| avg_tick_duration_ms | f64 | Running average tick time |
| max_tick_duration_ms | f64 | Worst-case tick time |
| min_tick_duration_ms | f64 | Best-case tick time |
| last_tick_duration_ms | f64 | Most recent tick time |
| messages_sent | u64 | Total messages published |
| messages_received | u64 | Total messages received |
| errors_count | u64 | Total errors |
| warnings_count | u64 | Total warnings |
| uptime_seconds | f64 | Time since init |
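A running average like avg_tick_duration_ms can be maintained incrementally without storing every sample. A sketch of that bookkeeping (field names mirror the table but the update scheme is an assumption):

```rust
// Incremental stats: mean via avg += (x - avg) / n, plus a running max.
struct TickStats {
    total_ticks: u64,
    avg_ms: f64,
    max_ms: f64,
}

impl TickStats {
    fn record(&mut self, duration_ms: f64) {
        self.total_ticks += 1;
        self.avg_ms += (duration_ms - self.avg_ms) / self.total_ticks as f64;
        self.max_ms = self.max_ms.max(duration_ms);
    }
}

fn main() {
    let mut stats = TickStats { total_ticks: 0, avg_ms: 0.0, max_ms: 0.0 };
    for d in [1.0, 2.0, 3.0] {
        stats.record(d);
    }
    assert!((stats.avg_ms - 2.0).abs() < 1e-9);
    assert_eq!(stats.max_ms, 3.0);
    println!("avg = {}, max = {}", stats.avg_ms, stats.max_ms);
}
```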

Miss

Deadline miss policy, set via .on_miss() on the node builder.

| Variant | Behavior |
|---|---|
| Warn (default) | Log a warning, continue normally |
| Skip | Skip this tick, resume next cycle |
| SafeMode | Call enter_safe_state(), check is_safe_state() for recovery |
| Stop | Stop the entire scheduler (last resort) |

FailurePolicy

Error recovery strategy, set via .failure_policy() on the node builder.

| Variant | Fields | Behavior |
|---|---|---|
| Fatal | — | Node failure stops the scheduler immediately |
| Restart | max_restarts: u32, initial_backoff: Duration | Re-initialize with exponential backoff. Escalates to fatal after max restarts |
| Skip | max_failures: u32, cooldown: Duration | Tolerate failures with cooldown. After max consecutive failures, skip until cooldown expires |
| Ignore | — | Log and continue — node keeps ticking regardless of errors |

Production Patterns

Standard Sensor Node

A complete sensor node with topics, error handling, and clean shutdown:

use horus::prelude::*;

struct LidarNode {
    scan_pub: Topic<LaserScan>,
    device: Option<SerialPort>,
    port: String,
}

impl LidarNode {
    fn new(port: &str) -> Result<Self> {
        Ok(Self {
            scan_pub: Topic::new("scan")?,
            device: None,
            port: port.to_string(),
        })
    }
}

impl Node for LidarNode {
    fn name(&self) -> &str { "lidar" }

    fn init(&mut self) -> Result<()> {
        self.device = Some(SerialPort::open(&self.port)?);
        hlog!(info, "Lidar connected on {}", self.port);
        Ok(())
    }

    fn tick(&mut self) {
        if let Some(ref mut dev) = self.device {
            if let Ok(scan) = dev.read_scan() {
                self.scan_pub.send(scan);
            }
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        if let Some(ref mut dev) = self.device {
            dev.stop_motor()?;
        }
        hlog!(info, "Lidar stopped");
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut sched = Scheduler::new().tick_rate(10_u64.hz());
    sched.add(LidarNode::new("/dev/ttyUSB0")?)
        .order(0)
        .failure_policy(FailurePolicy::Restart {
            max_restarts: 3,
            initial_backoff: 1_u64.ms().into(),
        })
        .build()?;
    sched.run()
}

Safety-Critical Node

A motor controller with full safety integration:

use horus::prelude::*;

struct MotorController {
    cmd_sub: Topic<CmdVel>,
    motor_pub: Topic<MotorCommand>,
    safe_mode: bool,
    wheel_base: f32,
}

impl MotorController {
    fn new() -> Result<Self> {
        Ok(Self {
            cmd_sub: Topic::new("cmd_vel")?,
            motor_pub: Topic::new("motor.cmd")?,
            safe_mode: false,
            wheel_base: 0.3,
        })
    }
}

impl Node for MotorController {
    fn name(&self) -> &str { "motor_controller" }

    fn tick(&mut self) {
        if self.safe_mode { return; }
        if let Some(cmd) = self.cmd_sub.try_recv() {
            self.motor_pub.send(MotorCommand {
                left_rpm: (cmd.linear - cmd.angular * self.wheel_base / 2.0) * 100.0,
                right_rpm: (cmd.linear + cmd.angular * self.wheel_base / 2.0) * 100.0,
            });
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        self.motor_pub.send(MotorCommand { left_rpm: 0.0, right_rpm: 0.0 });
        Ok(())
    }

    fn enter_safe_state(&mut self) {
        self.motor_pub.send(MotorCommand { left_rpm: 0.0, right_rpm: 0.0 });
        self.safe_mode = true;
        hlog!(warn, "Safe state — motors zeroed");
    }

    fn is_safe_state(&self) -> bool {
        // Simplified — a real implementation would check encoder feedback
        // to confirm the motors have actually stopped before returning true
        true
    }

    fn on_error(&mut self, error: &str) {
        hlog!(error, "Motor error: {}", error);
        self.enter_safe_state();
    }
}

fn main() -> Result<()> {
    let mut sched = Scheduler::new()
        .tick_rate(100_u64.hz())
        .watchdog(500_u64.ms().into());

    sched.add(MotorController::new()?)
        .order(1)
        .rate(100_u64.hz())
        .budget(200_u64.us().into())
        .deadline(900_u64.us().into())
        .on_miss(Miss::SafeMode)
        .build()?;

    sched.run()
}

Design Decisions

Why Send but not Sync? Nodes are moved to the scheduler, which may run them on dedicated RT threads. Send is required for this transfer. But nodes are never accessed from multiple threads simultaneously — the scheduler owns exclusive access during tick — so Sync is unnecessary. This lets nodes contain RefCell, UnsafeCell, or any non-Sync state.
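The Send-without-Sync point can be demonstrated with the standard library alone. RefCell<T> is Send (when T is Send) but not Sync, so a node holding one can still be moved onto a dedicated thread:

```rust
use std::cell::RefCell;
use std::thread;

// A node-like struct with non-Sync interior mutability. Moving it into
// a thread needs only Send; the thread then has exclusive access.
struct FilterNode {
    history: RefCell<Vec<f32>>,
}

fn main() {
    let node = FilterNode { history: RefCell::new(Vec::new()) };
    let handle = thread::spawn(move || {
        // Exclusive ownership on the "scheduler" thread.
        node.history.borrow_mut().push(1.5);
        node.history.borrow().len()
    });
    assert_eq!(handle.join().unwrap(), 1);
    println!("history length = 1");
}
```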

Why is tick() the only required method? A minimal node just processes data each cycle. Everything else — init, shutdown, safety, monitoring — has sensible defaults. This keeps the "hello world" implementation to 4 lines while allowing full lifecycle control for production systems.

Why lazy initialization? init() is called on first run(), not at scheduler.add() time. This lets you build the full node graph before any hardware is opened or resources are allocated. It also means tick_once() (used in testing) triggers init on first call, making tests self-contained.

Why catch_unwind on every lifecycle method? A panicking node must not crash the entire robot. The scheduler catches panics in init(), tick(), and shutdown(), routing them through FailurePolicy. This is defense-in-depth — you should still avoid panics, but the system survives them.

Why are publishers() and subscribers() #[doc(hidden)]? These are internal to the scheduler's monitoring and recording systems. Most users never override them — the node! macro generates them automatically. Hiding them reduces API noise while keeping them available for advanced use.


Trade-offs

| Choice | Benefit | Cost |
|---|---|---|
| Trait-based (not struct-based) | Full control over state, zero overhead | More boilerplate than the node! macro |
| &mut self on tick (not a context param) | Direct field access, no borrowing gymnastics | Node must own its topics and state |
| Default is_safe_state() = true | Nodes work without safety config | Unsafe nodes silently recover from isolation |
| catch_unwind on every method | System survives panics | ~5ns overhead per tick, hides bugs if relied upon |
| Single tick() entry point | Simple mental model | Complex nodes need internal state machines |

See Also

  • Node Concepts — Usage patterns, lifecycle diagram, communication patterns
  • node! Macro — Reduce boilerplate with declarative node definition
  • Scheduler API — Node registration, execution classes, runtime management
  • Safety Monitor — Graduated watchdog, deadline enforcement, BlackBox
  • Topic API — Publishing and subscribing to messages
  • Macros Reference — message!, node!, hlog!, and more