# Node API
The Node class is the primary building block of a HORUS application. Every component — sensors, controllers, planners, loggers — is a Node. All configuration happens in the constructor via kwargs — no class inheritance needed.
**Rust:** Available via `impl Node for MyStruct`. See Rust Node API.

```python
import horus

def my_tick(node):
    node.send("heartbeat", {"alive": True})

node = horus.Node(name="pinger", pubs=["heartbeat"], tick=my_tick, rate=1)
horus.run(node)
```
## Constructor

```python
horus.Node(
    name="my_node",            # Optional: auto-generated UUID if None
    tick=my_tick_fn,           # Required for useful nodes: tick(node) or async def tick(node)
    rate=30,                   # Hz (must be positive)
    pubs=["cmd_vel"],          # Topics this node publishes to
    subs=["scan"],             # Topics this node subscribes to
    init=my_init_fn,           # Optional: init(node), called once at startup
    shutdown=my_shutdown_fn,   # Optional: shutdown(node), called once at exit
    on_error=my_error_fn,      # Optional: on_error(node, exception)
    order=100,                 # Execution priority (lower = earlier)
    budget=300 * horus.us,     # Max tick time (seconds, use horus.us/horus.ms)
    deadline=900 * horus.us,   # Hard deadline per tick
    on_miss="warn",            # "warn", "skip", "safe_mode", "stop"
    failure_policy="fatal",    # "fatal", "restart", "skip", "ignore"
    compute=False,             # Run on thread pool (CPU-bound)
    on="scan",                 # Event-driven: tick only when topic receives data
    priority=50,               # OS scheduling priority (1-99, requires RT)
    core=0,                    # Pin to CPU core
    watchdog=0.5,              # Per-node watchdog timeout (seconds)
    default_capacity=1024,     # Ring buffer capacity for auto-created topics
)
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` or `None` | UUID | Unique node identifier |
| `tick` | `Callable[[Node], None]` | `None` | Main loop function. Can be `async def` |
| `rate` | `float` | `30` | Tick rate in Hz. Must be positive |
| `pubs` | `list`, `str`, `dict`, or `None` | `None` | Publisher topic specs |
| `subs` | `list`, `str`, `dict`, or `None` | `None` | Subscriber topic specs |
| `init` | `Callable` or `None` | `None` | One-time setup. Can be `async def` |
| `shutdown` | `Callable` or `None` | `None` | Cleanup on exit. Can be `async def` |
| `on_error` | `Callable` or `None` | `None` | Error handler: `on_error(node, exception)` |
| `order` | `int` | `100` | Execution order (lower = runs first in tick cycle) |
| `budget` | `float` or `None` | `None` | Max tick execution time in seconds |
| `deadline` | `float` or `None` | `None` | Hard deadline in seconds |
| `on_miss` | `str` or `None` | `None` | Deadline miss policy |
| `failure_policy` | `str` or `None` | `None` | Error recovery strategy |
| `compute` | `bool` | `False` | Run on parallel thread pool |
| `on` | `str` or `None` | `None` | Event-driven topic trigger |
| `priority` | `int` or `None` | `None` | OS SCHED_FIFO priority (1-99) |
| `core` | `int` or `None` | `None` | CPU core pinning |
| `watchdog` | `float` or `None` | `None` | Per-node watchdog timeout (seconds) |
| `default_capacity` | `int` | `1024` | Default ring buffer capacity |
Validation rules:
- `rate` must be positive — `ValueError` raised for 0 or negative
- `name` must be unique within a scheduler — duplicate names cause an error at `add()` time
- `compute=True` is mutually exclusive with `on="topic"` and `async def tick`
- `budget` and `deadline` are in seconds — use the `horus.us` and `horus.ms` constants: `budget=300 * horus.us`
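The rules above can be sketched as a small validator. This is a hypothetical illustration of what each check rejects, not the actual HORUS implementation (`validate_node_kwargs` is an invented name):

```python
import inspect

def validate_node_kwargs(rate, compute=False, on=None, tick=None):
    """Illustrative sketch of the documented constructor checks."""
    if rate <= 0:
        # rate must be positive
        raise ValueError(f"rate must be positive, got {rate}")
    if compute and on is not None:
        # compute=True cannot be combined with event-driven triggering
        raise ValueError("compute=True is mutually exclusive with on=...")
    if compute and tick is not None and inspect.iscoroutinefunction(tick):
        # compute=True cannot be combined with an async tick
        raise ValueError("compute=True is mutually exclusive with async def tick")

validate_node_kwargs(rate=30)   # passes
try:
    validate_node_kwargs(rate=0)
except ValueError as e:
    print(e)                    # rate must be positive, got 0
```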
## Lifecycle

The scheduler manages the Node lifecycle in a strict order:

```
Construction → Registration → Init → Tick Loop → Shutdown
   (you)       sched.add()   once    repeated     once
```

- **Construction** — `horus.Node(...)` creates the node with your callbacks and config. No I/O happens here.
- **Registration** — `scheduler.add(node)` registers the node. Validates name uniqueness and config.
- **Initialization** — On first `run()` or `tick_once()`, the scheduler calls your `init(node)` callback (lazy). If `init` raises an exception, the `failure_policy` determines what happens.
- **Tick Loop** — Each cycle, the scheduler calls `tick(node)`. If tick raises, `on_error(node, exception)` is called, then `failure_policy` handles recovery.
- **Shutdown** — On Ctrl+C, SIGTERM, duration expiry, or `request_stop()`, `shutdown(node)` is called on each node in registration order.
Key Python differences from Rust:
- Init is lazy (first `run()` or `tick_once()`), not at `add()` time
- `shutdown()` always runs even if `init()` was never called (returns silently if no `shutdown` callback)
- `on_error` receives a Python exception object, not a string
- GIL is acquired for each callback, released between ticks
## Topic Spec Formats

The `pubs` and `subs` parameters accept several formats:

```python
# String — single topic, GenericMessage (~6-50μs)
pubs="cmd_vel"

# List of strings — multiple topics, GenericMessage
pubs=["cmd_vel", "status"]

# Typed class — zero-copy POD (~1.7μs)
pubs=[horus.CmdVel, horus.LaserScan]

# Dict with custom names — typed with explicit topic name
pubs={"cmd": horus.CmdVel, "scan": horus.LaserScan}

# Dict with config — full control
pubs={"cmd": {"type": horus.CmdVel, "capacity": 2048}}
```
Performance: Typed topics (horus.CmdVel) use zero-copy POD transport (~1.7μs). String topics use GenericMessage with MessagePack serialization (~6-50μs). Use typed topics for anything in a control loop or crossing the Rust/Python boundary.
## Methods — Quick Reference

| Method | Signature | Returns | Description |
|---|---|---|---|
| `send` | `send(topic: str, data) -> bool` | `True` on success | Publish data to a topic |
| `recv` | `recv(topic: str) -> Any or None` | Message or `None` | Get latest message (pull-based) |
| `recv_all` | `recv_all(topic: str) -> list` | List of messages | Drain all available messages |
| `has_msg` | `has_msg(topic: str) -> bool` | `bool` | Check if a message is available |
| `log_info` | `log_info(msg: str)` | — | Log at INFO level |
| `log_warning` | `log_warning(msg: str)` | — | Log at WARNING level |
| `log_error` | `log_error(msg: str)` | — | Log at ERROR level |
| `log_debug` | `log_debug(msg: str)` | — | Log at DEBUG level |
| `request_stop` | `request_stop()` | — | Signal scheduler to stop |
| `publishers` | `publishers() -> list[str]` | Topic names | List of publisher topic names |
| `subscribers` | `subscribers() -> list[str]` | Topic names | List of subscriber topic names |
## Method Details

### send()

```python
node.send(topic: str, data: Any) -> bool
```
Publish data to a topic. Returns True on success.
Behavior:
- If the topic was declared in `pubs`, uses the pre-created topic
- If the topic was NOT declared in `pubs`, auto-creates a GenericMessage topic (works but won't appear in monitoring)
- For typed topics (`pubs=[horus.CmdVel]`), `data` must be the correct type
- For string topics (`pubs=["data"]`), `data` is serialized via MessagePack
Errors:
- `TypeError` if `data` is not serializable (lambda, socket, custom class without `__dict__`)
- Never blocks — if the ring buffer is full, the oldest message is dropped
```python
# Typed — zero-copy (~1.7μs)
node.send("cmd_vel", horus.CmdVel(linear=1.0, angular=0.5))

# Dict — serialized (~6-50μs)
node.send("status", {"battery": 0.85, "mode": "auto"})
```
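The drop-oldest, never-blocking behavior noted above can be sketched with a plain `collections.deque` — an illustration of the overwrite semantics only, not the HORUS transport:

```python
from collections import deque

# A bounded deque overwrites its oldest entry when full,
# like a ring buffer with capacity 4.
ring = deque(maxlen=4)
for i in range(6):
    ring.append(i)   # never blocks; a full buffer drops the oldest

print(list(ring))    # [2, 3, 4, 5] — messages 0 and 1 were dropped
```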
### recv()

```python
node.recv(topic: str) -> Any or None
```
Get the latest message from a topic. Returns None if no messages are available — never blocks.
Behavior:
- Pull-based: you decide when to check for messages
- Returns the next unread message from the ring buffer (FIFO order). Use `read_latest()` on a standalone `Topic` if you need only the most recent
- If `has_msg()` was called and buffered a message, `recv()` returns that buffered message first
- Auto-creates topics for undeclared subscriptions (won't appear in monitoring)
```python
def tick(node):
    msg = node.recv("sensor")
    if msg is None:
        return  # No data yet — normal, not an error
    process(msg)
```
### has_msg()

```python
node.has_msg(topic: str) -> bool
```

Check if messages are available. Internally peeks by calling `recv()` — if a message exists, it's buffered and returned by the next `recv()` call.

Important: `has_msg()` consumes the message into a peek buffer. The next `recv()` returns the buffered message. This means `has_msg()` + `recv()` is safe but costs one recv internally:

```python
# Safe pattern — the peeked message is returned by recv()
if node.has_msg("sensor"):
    data = node.recv("sensor")  # Returns the peeked message
```
### recv_all()

```python
node.recv_all(topic: str) -> list
```

Drain all available messages from the ring buffer. Returns an empty list if none. Use when you need to process every message (not just the latest):

```python
# Process all accumulated messages
for msg in node.recv_all("events"):
    handle_event(msg)
```
### log_*()

```python
node.log_info(msg: str)
node.log_warning(msg: str)
node.log_error(msg: str)
node.log_debug(msg: str)
```
Log messages via the HORUS logging system (backed by hlog! in Rust).
Important: These only work during tick(), init(), and shutdown() callbacks. Calling log outside these callbacks (e.g., at module level) drops the message silently — there's no active logging context.
### request_stop()

```python
node.request_stop()
```
Signal the scheduler to perform a graceful shutdown. All nodes complete their current tick, then shutdown() is called on each node in order.
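For example, a node can shut the application down after a fixed amount of work. A minimal sketch (the `ticks_left` counter is illustrative user state, stored as described under Properties):

```python
# Stop the whole scheduler gracefully after 100 ticks.
def countdown_init(node):
    node.state = {"ticks_left": 100}

def countdown_tick(node):
    node.state["ticks_left"] -= 1
    if node.state["ticks_left"] <= 0:
        node.log_info("Done — requesting shutdown")
        node.request_stop()   # all nodes finish their current tick, then shutdown() runs
```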
## Properties

| Property | Type | Description |
|---|---|---|
| `name` | `str` | Node name |
| `rate` | `float` | Tick rate in Hz |
| `order` | `int` | Execution order |
| `info` | `NodeInfo` | Scheduler-provided metrics (available during tick) |
**User state:** Assign any attribute in `init()` (e.g., `node.state = {"counter": 0}`) — it persists across ticks as a regular Python attribute. Not a built-in property.
## Execution Classes

How your Node kwargs map to execution classes:

| Kwargs | Execution Class | Thread |
|---|---|---|
| `rate=100` (with `budget` or `deadline`) | `Rt` (auto) | RT thread with SCHED_FIFO |
| `compute=True` | `Compute` | Worker thread pool |
| `on="topic_name"` | `Event` | Triggered on topic data |
| `async def tick` | `AsyncIo` | Async I/O thread pool |
| Default (none of the above) | `BestEffort` | Main tick thread |
RT is auto-detected: if you set `rate` with `budget` or `deadline`, the node is classified as real-time. No explicit `rt=True` needed on the node — the scheduler handles promotion.
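The mapping in the table can be sketched as a classification function. This is an illustration of the documented rules, not the scheduler's actual dispatch code, and the precedence between conflicting kwargs is an assumption here (the constructor rejects most conflicts anyway):

```python
import inspect

def execution_class(tick=None, rate=None, budget=None, deadline=None,
                    compute=False, on=None):
    """Hypothetical sketch of the kwargs → execution-class mapping."""
    if compute:
        return "Compute"       # worker thread pool
    if on is not None:
        return "Event"         # triggered on topic data
    if tick is not None and inspect.iscoroutinefunction(tick):
        return "AsyncIo"       # async I/O thread pool
    if rate is not None and (budget is not None or deadline is not None):
        return "Rt"            # auto-promoted to real-time
    return "BestEffort"        # main tick thread

print(execution_class(rate=100, budget=500e-6))   # Rt
print(execution_class(compute=True))              # Compute
print(execution_class())                          # BestEffort
```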
## Failure Policies

| String | Behavior | Use For |
|---|---|---|
| `"fatal"` | First failure stops the scheduler | Motor controllers, safety monitors |
| `"restart"` | Re-initialize with exponential backoff | Sensor drivers (USB reconnect) |
| `"skip"` | Tolerate failures with cooldown | Logging, telemetry |
| `"ignore"` | Swallow failures, keep ticking | Statistics, debug output |
```python
# Safety-critical motor controller
motor = horus.Node(tick=motor_fn, failure_policy="fatal", on_miss="safe_mode")

# Recoverable sensor driver
lidar = horus.Node(tick=lidar_fn, failure_policy="restart")

# Non-critical telemetry
telemetry = horus.Node(tick=upload_fn, failure_policy="ignore")
```
## Examples

### Basic Node

```python
import horus

def my_tick(node):
    node.send("heartbeat", {"alive": True, "tick": horus.tick()})

node = horus.Node(name="pinger", pubs=["heartbeat"], tick=my_tick, rate=1)
horus.run(node)
```
### Typed Topics (Zero-Copy)

```python
import horus

def controller_tick(node):
    scan = node.recv("scan")
    if scan is not None:
        cmd = horus.CmdVel(linear=0.5, angular=0.0)
        node.send("cmd_vel", cmd)

controller = horus.Node(
    name="controller",
    pubs=[horus.CmdVel],
    subs=[horus.LaserScan],
    tick=controller_tick,
    rate=50,
)
horus.run(controller)
```
### Node with Full Lifecycle

```python
import horus

def my_init(node):
    node.state = {"counter": 0, "errors": 0}
    node.log_info("Initialized")

def my_tick(node):
    node.state["counter"] += 1
    node.send("count", {"value": node.state["counter"]})

def my_shutdown(node):
    node.log_info(f"Shutting down after {node.state['counter']} ticks, {node.state['errors']} errors")

def my_error(node, exception):
    node.state["errors"] += 1
    node.log_error(f"Error #{node.state['errors']}: {exception}")

sensor = horus.Node(
    name="counter",
    init=my_init,
    tick=my_tick,
    shutdown=my_shutdown,
    on_error=my_error,
    rate=10,
    pubs=["count"],
    failure_policy="restart",
)
horus.run(sensor)
```
### Production: Safety-Critical Motor Controller

```python
import horus

def motor_tick(node):
    cmd = node.recv("cmd_vel")
    if cmd is None:
        # SAFETY: no command received — check stale timeout
        node.state["stale_ticks"] += 1
        if node.state["stale_ticks"] > 50:  # 500ms at 100Hz
            node.send("motor", horus.CmdVel(linear=0.0, angular=0.0))
            node.log_warning("Stale command — motors zeroed")
        return
    node.state["stale_ticks"] = 0

    # SAFETY: clamp to safe range
    linear = max(-1.0, min(1.0, cmd.linear))
    angular = max(-2.0, min(2.0, cmd.angular))
    node.send("motor", horus.CmdVel(linear=linear, angular=angular))

def motor_init(node):
    node.state = {"stale_ticks": 0}

def motor_shutdown(node):
    # CRITICAL: zero motors before exit
    node.send("motor", horus.CmdVel(linear=0.0, angular=0.0))
    node.log_info("Motors zeroed")

def motor_error(node, exception):
    node.send("motor", horus.CmdVel(linear=0.0, angular=0.0))
    node.log_error(f"Motor error — zeroed: {exception}")

motor = horus.Node(
    name="motor_ctrl",
    init=motor_init,
    tick=motor_tick,
    shutdown=motor_shutdown,
    on_error=motor_error,
    rate=100,
    order=0,
    budget=500 * horus.us,
    on_miss="safe_mode",
    failure_policy="fatal",
    subs=[horus.CmdVel],
    pubs=["motor"],
)
horus.run(motor, tick_rate=100, rt=True, watchdog_ms=500)
```
### Production: Sensor Node with Reconnection

```python
import horus

hw_connected = [False]

def sensor_init(node):
    try:
        # Open hardware connection
        node.state = {"device": open_hardware("/dev/ttyUSB0")}
        hw_connected[0] = True
    except OSError as e:
        node.log_error(f"Hardware init failed: {e}")
        raise  # Let failure_policy="restart" handle it

def sensor_tick(node):
    reading = node.state["device"].read()
    node.send("sensor.data", horus.Imu(
        accel_x=reading[0], accel_y=reading[1], accel_z=reading[2],
        gyro_x=reading[3], gyro_y=reading[4], gyro_z=reading[5],
    ))

def sensor_shutdown(node):
    if hw_connected[0]:
        node.state["device"].close()

sensor = horus.Node(
    name="imu_driver",
    init=sensor_init,
    tick=sensor_tick,
    shutdown=sensor_shutdown,
    rate=100,
    order=0,
    failure_policy="restart",  # Auto-reconnect on USB disconnect
    pubs=[horus.Imu],
)
horus.run(sensor)
```
## Supporting Types

### NodeInfo

Available as `node.info` during tick/init/shutdown. Provides scheduler-managed metrics:
| Method/Property | Returns | Description |
|---|---|---|
| `name` | `str` | Node name |
| `state` | `str` | Current lifecycle state |
| `tick_count()` | `int` | Total ticks executed |
| `error_count()` | `int` | Total errors |
| `successful_ticks()` | `int` | Ticks without error |
| `failed_ticks()` | `int` | Ticks that raised exceptions |
| `avg_tick_duration_ms()` | `float` | Running average tick time |
| `get_uptime()` | `float` | Seconds since init |
| `get_metrics()` | `dict` | Full metrics snapshot (tick times, message counts, errors) |
| `request_stop()` | — | Stop the scheduler |
| `set_custom_data(key, value)` | — | Attach custom metadata |
| `get_custom_data(key)` | `str` or `None` | Read custom metadata |
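These metrics can feed a lightweight health report from inside `tick()`. A sketch, assuming a `"health"` topic declared in `pubs` (the topic name and reporting interval are illustrative):

```python
# Publish a metrics snapshot every 100 ticks.
def health_tick(node):
    info = node.info
    if info.tick_count() % 100 == 0:
        node.send("health", {
            "errors": info.error_count(),
            "avg_ms": info.avg_tick_duration_ms(),
            "uptime_s": info.get_uptime(),
        })
```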
### NodeState
String enumeration of node lifecycle states:
| State | Description |
|---|---|
| `UNINITIALIZED` | Registered but `init()` not yet called |
| `INITIALIZING` | `init()` is running |
| `RUNNING` | Normal operation — `tick()` being called |
| `STOPPING` | `shutdown()` is running |
| `STOPPED` | Shutdown complete |
| `ERROR` | `init()` or `tick()` returned an error |
| `CRASHED` | `tick()` raised an unhandled exception |
### Miss
Deadline miss policies:
| Constant | String | Behavior |
|---|---|---|
| `horus.Miss.WARN` | `"warn"` | Log warning, continue normally |
| `horus.Miss.SKIP` | `"skip"` | Skip this tick, resume next cycle |
| `horus.Miss.SAFE_MODE` | `"safe_mode"` | Enter safe state, continue ticking |
| `horus.Miss.STOP` | `"stop"` | Stop the entire scheduler |
## Design Decisions

**Why kwargs, not class inheritance?** `class MyNode(horus.Node)` with an overridden `tick(self)` requires boilerplate and doesn't work with plain functions or lambdas. The kwargs API (`horus.Node(tick=my_fn, rate=30)`) is more Pythonic, matches FastAPI/Click patterns, and puts all config in one call.

**Why does recv() return data, not take a callback?** Pull-based reception keeps timing deterministic — your tick controls when data is consumed. Push-based callbacks fire at unpredictable times, making budget compliance harder.

**Why does has_msg() use peek buffering?** `has_msg()` internally calls `recv()` and buffers the result. This avoids a separate "peek" API while keeping the common `if node.has_msg("x"): data = node.recv("x")` pattern zero-overhead.

**Why do undeclared topics auto-create?** Reduces boilerplate for prototyping. But undeclared topics don't appear in monitoring — always declare in `pubs`/`subs` for production.

**Why no is_safe_state / enter_safe_state in Python?** These are Rust-only `Node` trait methods requiring mutable borrows that don't map cleanly to Python's callback model. Safety-critical nodes should use Rust. Python nodes use `on_miss="safe_mode"` and `on_error` callbacks instead.
## Trade-offs

| Choice | Benefit | Cost |
|---|---|---|
| Kwargs over inheritance | Concise, works with lambdas | No IDE auto-complete for tick signature |
| Auto-create topics | Zero boilerplate for prototyping | Undeclared topics invisible to monitoring |
| `has_msg()` peek buffering | Clean API, no separate peek method | Consumes message on check |
| No `is_safe_state` / `enter_safe_state` | Simpler Python API | Safety-critical nodes must use Rust |
| Pull-based `recv()` | Deterministic timing | Must check every tick (no push notification) |
| GIL acquired per callback | Python works correctly | ~3μs overhead per tick |
## See Also
- Scheduler API — Node orchestration and execution
- Topic API — Standalone pub/sub outside node lifecycle
- Async Nodes — Async I/O patterns and best practices
- Node Lifecycle — Init, tick, shutdown deep dive
- Fault Tolerance — Failure policies in depth
- GIL & Performance — Tick rate ceilings, optimization
- Getting Started (Python) — First Python application
- Rust Node API — Rust equivalent (582 lines)