Node API

The Node class is the primary building block of a HORUS application. Every component — sensors, controllers, planners, loggers — is a Node. All configuration happens in the constructor via kwargs — no class inheritance needed.

Rust: Available via impl Node for MyStruct. See Rust Node API.

import horus

def my_tick(node):
    node.send("heartbeat", {"alive": True})

node = horus.Node(name="pinger", pubs=["heartbeat"], tick=my_tick, rate=1)
horus.run(node)

Constructor

horus.Node(
    name="my_node",           # Optional: auto-generated UUID if None
    tick=my_tick_fn,          # Required for useful nodes: tick(node) or async def tick(node)
    rate=30,                  # Hz (must be positive)
    pubs=["cmd_vel"],         # Topics this node publishes to
    subs=["scan"],            # Topics this node subscribes to
    init=my_init_fn,          # Optional: init(node), called once at startup
    shutdown=my_shutdown_fn,  # Optional: shutdown(node), called once at exit
    on_error=my_error_fn,     # Optional: on_error(node, exception)
    order=100,                # Execution priority (lower = earlier)
    budget=300 * horus.us,    # Max tick time (seconds, use horus.us/horus.ms)
    deadline=900 * horus.us,  # Hard deadline per tick
    on_miss="warn",           # "warn", "skip", "safe_mode", "stop"
    failure_policy="fatal",   # "fatal", "restart", "skip", "ignore"
    compute=False,            # Run on thread pool (CPU-bound)
    on="scan",                # Event-driven: tick only when topic receives data
    priority=50,              # OS scheduling priority (1-99, requires RT)
    core=0,                   # Pin to CPU core
    watchdog=0.5,             # Per-node watchdog timeout (seconds)
    default_capacity=1024,    # Ring buffer capacity for auto-created topics
)

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str or None | UUID | Unique node identifier |
| tick | Callable[[Node], None] | None | Main loop function. Can be async def |
| rate | float | 30 | Tick rate in Hz. Must be positive |
| pubs | list, str, dict, or None | None | Publisher topic specs |
| subs | list, str, dict, or None | None | Subscriber topic specs |
| init | Callable or None | None | One-time setup. Can be async def |
| shutdown | Callable or None | None | Cleanup on exit. Can be async def |
| on_error | Callable or None | None | Error handler: on_error(node, exception) |
| order | int | 100 | Execution order (lower = runs first in tick cycle) |
| budget | float or None | None | Max tick execution time in seconds |
| deadline | float or None | None | Hard deadline in seconds |
| on_miss | str or None | None | Deadline miss policy |
| failure_policy | str or None | None | Error recovery strategy |
| compute | bool | False | Run on parallel thread pool |
| on | str or None | None | Event-driven topic trigger |
| priority | int or None | None | OS SCHED_FIFO priority (1-99) |
| core | int or None | None | CPU core pinning |
| watchdog | float or None | None | Per-node watchdog timeout (seconds) |
| default_capacity | int | 1024 | Default ring buffer capacity |

Validation rules:

  • rate must be positive — ValueError raised for 0 or negative
  • name must be unique within a scheduler — duplicate names cause an error at add() time
  • compute=True is mutually exclusive with on="topic" and async def tick
  • budget and deadline are in seconds — use horus.us and horus.ms constants: budget=300 * horus.us
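As a plain-Python sketch of what these checks amount to (the function name, argument names, and error messages below are illustrative, not the real HORUS internals):

```python
def validate_node(name, rate, existing_names, compute=False, on=None,
                  tick_is_async=False):
    """Illustrative restatement of the documented validation rules."""
    if rate <= 0:
        # Raised at construction time for 0 or negative rates
        raise ValueError("rate must be positive")
    if name is not None and name in existing_names:
        # Raised at add() time when a scheduler already holds this name
        raise ValueError(f"duplicate node name: {name}")
    if compute and (on is not None or tick_is_async):
        # compute=True cannot be combined with event triggers or async ticks
        raise ValueError("compute=True is mutually exclusive with on= and async tick")
```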

Lifecycle

The scheduler manages the Node lifecycle in a strict order:

Construction → Registration → Init → Tick Loop → Shutdown
    you         sched.add()    once    repeated     once
  1. Construction — horus.Node(...) creates the node with your callbacks and config. No I/O happens here.
  2. Registration — scheduler.add(node) registers the node. Validates name uniqueness and config.
  3. Initialization — On first run() or tick_once(), the scheduler calls your init(node) callback (lazy). If init raises an exception, the failure_policy determines what happens.
  4. Tick Loop — Each cycle: scheduler calls tick(node). If tick raises, on_error(node, exception) is called, then failure_policy handles recovery.
  5. Shutdown — On Ctrl+C, SIGTERM, duration expiry, or request_stop(): shutdown(node) called on each node in registration order.
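The ordering above can be sketched with a stub scheduler (plain Python, not the real implementation) — note that init is deferred to the first run, not performed at add():

```python
class StubScheduler:
    """Minimal illustration of the documented lifecycle ordering."""
    def __init__(self):
        self.nodes = []
        self.initialized = False

    def add(self, node):
        # Registration only — init is NOT called here (it is lazy)
        self.nodes.append(node)

    def run(self, ticks):
        if not self.initialized:
            for n in self.nodes:        # Lazy init on first run()
                n["init"](n)
            self.initialized = True
        for _ in range(ticks):          # Tick loop
            for n in self.nodes:
                n["tick"](n)
        for n in self.nodes:            # Shutdown in registration order
            n["shutdown"](n)

calls = []
node = {"init": lambda n: calls.append("init"),
        "tick": lambda n: calls.append("tick"),
        "shutdown": lambda n: calls.append("shutdown")}
sched = StubScheduler()
sched.add(node)      # no init here
sched.run(ticks=2)   # init once, two ticks, then shutdown
# calls == ["init", "tick", "tick", "shutdown"]
```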

Key Python differences from Rust:

  • Init is lazy (first run() or tick_once()), not at add() time
  • shutdown() always runs even if init() was never called (returns silently if no shutdown callback)
  • on_error receives a Python exception object, not a string
  • GIL is acquired for each callback, released between ticks

Topic Spec Formats

The pubs and subs parameters accept several formats:

# String — single topic, GenericMessage (~6-50μs)
pubs="cmd_vel"

# List of strings — multiple topics, GenericMessage
pubs=["cmd_vel", "status"]

# Typed class — zero-copy POD (~1.7μs)
pubs=[horus.CmdVel, horus.LaserScan]

# Dict with custom names — typed with explicit topic name
pubs={"cmd": horus.CmdVel, "scan": horus.LaserScan}

# Dict with config — full control
pubs={"cmd": {"type": horus.CmdVel, "capacity": 2048}}

Performance: Typed topics (horus.CmdVel) use zero-copy POD transport (~1.7μs). String topics use GenericMessage with MessagePack serialization (~6-50μs). Use typed topics for anything in a control loop or crossing the Rust/Python boundary.


Methods — Quick Reference

| Method | Signature | Returns | Description |
|---|---|---|---|
| send | send(topic: str, data) -> bool | True on success | Publish data to a topic |
| recv | recv(topic: str) -> Any or None | Message or None | Get latest message (pull-based) |
| recv_all | recv_all(topic: str) -> list | List of messages | Drain all available messages |
| has_msg | has_msg(topic: str) -> bool | bool | Check if a message is available |
| log_info | log_info(msg: str) | None | Log at INFO level |
| log_warning | log_warning(msg: str) | None | Log at WARNING level |
| log_error | log_error(msg: str) | None | Log at ERROR level |
| log_debug | log_debug(msg: str) | None | Log at DEBUG level |
| request_stop | request_stop() | None | Signal scheduler to stop |
| publishers | publishers() -> list[str] | Topic names | List of publisher topic names |
| subscribers | subscribers() -> list[str] | Topic names | List of subscriber topic names |

Method Details

send()

node.send(topic: str, data: Any) -> bool

Publish data to a topic. Returns True on success.

Behavior:

  • If the topic was declared in pubs, uses the pre-created topic
  • If the topic was NOT declared in pubs, auto-creates a GenericMessage topic (works but won't appear in monitoring)
  • For typed topics (pubs=[horus.CmdVel]), data must be the correct type
  • For string topics (pubs=["data"]), data is serialized via MessagePack

Errors:

  • TypeError if data is not serializable (lambda, socket, custom class without __dict__)
  • Never blocks — if the ring buffer is full, the oldest message is dropped

# Typed — zero-copy (~1.7μs)
node.send("cmd_vel", horus.CmdVel(linear=1.0, angular=0.5))

# Dict — serialized (~6-50μs)
node.send("status", {"battery": 0.85, "mode": "auto"})
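The drop-oldest behavior is what a bounded ring buffer gives you; collections.deque with maxlen models it (a sketch of the semantics, not the actual transport):

```python
from collections import deque

ring = deque(maxlen=3)   # capacity 3, like a topic ring buffer

for i in range(5):
    ring.append(i)       # a full buffer silently evicts the oldest entry

print(list(ring))        # [2, 3, 4] — 0 and 1 were dropped; append never blocked
```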

recv()

node.recv(topic: str) -> Any or None

Get the latest message from a topic. Returns None if no messages are available — never blocks.

Behavior:

  • Pull-based: you decide when to check for messages
  • Returns the next unread message from the ring buffer (FIFO order). Use read_latest() on a standalone Topic if you need only the most recent
  • If has_msg() was called and buffered a message, recv() returns that buffered message first
  • Auto-creates topics for undeclared subscriptions (won't appear in monitoring)

def tick(node):
    msg = node.recv("sensor")
    if msg is None:
        return  # No data yet — normal, not an error
    process(msg)

has_msg()

node.has_msg(topic: str) -> bool

Check if messages are available. Internally peeks by calling recv() — if a message exists, it's buffered and returned by the next recv() call.

Important: has_msg() consumes the message into a peek buffer. The next recv() returns the buffered message. This means has_msg() + recv() is safe but costs one recv internally:

# Safe pattern — the peeked message is returned by recv()
if node.has_msg("sensor"):
    data = node.recv("sensor")  # Returns the peeked message
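The peek-buffer mechanics can be modeled in a few lines of plain Python (illustrative only — the real implementation lives in the runtime):

```python
from collections import deque

class PeekChannel:
    """Models has_msg()'s peek buffering over a FIFO queue."""
    def __init__(self):
        self.queue = deque()
        self.peeked = None

    def has_msg(self):
        if self.peeked is None and self.queue:
            self.peeked = self.queue.popleft()   # consume into the peek buffer
        return self.peeked is not None

    def recv(self):
        if self.peeked is not None:              # buffered message returned first
            msg, self.peeked = self.peeked, None
            return msg
        return self.queue.popleft() if self.queue else None

ch = PeekChannel()
ch.queue.append("scan-1")
assert ch.has_msg() is True    # message moved into the peek buffer
assert ch.recv() == "scan-1"   # recv() returns the peeked message, not a new one
assert ch.recv() is None       # empty — never blocks
```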

recv_all()

node.recv_all(topic: str) -> list

Drain all available messages from the ring buffer. Returns an empty list if none. Use when you need to process every message (not just the latest):

# Process all accumulated messages
for msg in node.recv_all("events"):
    handle_event(msg)

log_*()

node.log_info(msg: str)
node.log_warning(msg: str)
node.log_error(msg: str)
node.log_debug(msg: str)

Log messages via the HORUS logging system (backed by hlog! in Rust).

Important: These only work during tick(), init(), and shutdown() callbacks. Calling log outside these callbacks (e.g., at module level) drops the message silently — there's no active logging context.
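The "active context" behavior can be pictured as a guard the scheduler sets around each callback (a plain-Python sketch; the real mechanism is inside the Rust runtime):

```python
records = []
_active = False      # flipped by the scheduler around init/tick/shutdown

def log_info(msg):
    if _active:                       # only records inside a callback context
        records.append(("INFO", msg))
    # outside a callback: silently dropped, no error raised

log_info("module-level")              # dropped — no active context
_active = True                        # scheduler enters tick()
log_info("inside tick")               # recorded
_active = False                       # scheduler leaves tick()

print(records)                        # [('INFO', 'inside tick')]
```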

request_stop()

node.request_stop()

Signal the scheduler to perform a graceful shutdown. All nodes complete their current tick, then shutdown() is called on each node in order.


Properties

| Property | Type | Description |
|---|---|---|
| name | str | Node name |
| rate | float | Tick rate in Hz |
| order | int | Execution order |
| info | NodeInfo | Scheduler-provided metrics (available during tick) |

User state: Assign any attribute in init() (e.g., node.state = {"counter": 0}) — it persists across ticks as a regular Python attribute. Not a built-in property.


Execution Classes

How your Node kwargs map to execution classes:

| Kwargs | Execution Class | Thread |
|---|---|---|
| rate=100 (with budget or deadline) | Rt (auto) | RT thread with SCHED_FIFO |
| compute=True | Compute | Worker thread pool |
| on="topic_name" | Event | Triggered on topic data |
| async def tick | AsyncIo | Async I/O thread pool |
| Default (none of the above) | BestEffort | Main tick thread |

RT is auto-detected: if you set rate with budget or deadline, the node is classified as real-time. No explicit rt=True needed on the node — the scheduler handles promotion.
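The mapping in the table can be written as a small classifier (the function and its argument names are hypothetical; the real logic is inside the scheduler):

```python
def execution_class(rate=None, budget=None, deadline=None,
                    compute=False, on=None, tick_is_async=False):
    """Mirrors the documented kwargs -> execution class table."""
    if compute:
        return "Compute"
    if on is not None:
        return "Event"
    if tick_is_async:
        return "AsyncIo"
    if rate is not None and (budget is not None or deadline is not None):
        return "Rt"            # auto-promoted to real-time, no rt=True needed
    return "BestEffort"

assert execution_class(rate=100, budget=300e-6) == "Rt"
assert execution_class(compute=True) == "Compute"
assert execution_class(on="scan") == "Event"
assert execution_class(tick_is_async=True) == "AsyncIo"
assert execution_class(rate=30) == "BestEffort"
```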


Failure Policies

| String | Behavior | Use For |
|---|---|---|
| "fatal" | First failure stops the scheduler | Motor controllers, safety monitors |
| "restart" | Re-initialize with exponential backoff | Sensor drivers (USB reconnect) |
| "skip" | Tolerate failures with cooldown | Logging, telemetry |
| "ignore" | Swallow failures, keep ticking | Statistics, debug output |

# Safety-critical motor controller
motor = horus.Node(tick=motor_fn, failure_policy="fatal", on_miss="safe_mode")

# Recoverable sensor driver
lidar = horus.Node(tick=lidar_fn, failure_policy="restart")

# Non-critical telemetry
telemetry = horus.Node(tick=upload_fn, failure_policy="ignore")

Examples

Basic Node

import horus

def my_tick(node):
    node.send("heartbeat", {"alive": True, "tick": horus.tick()})

node = horus.Node(name="pinger", pubs=["heartbeat"], tick=my_tick, rate=1)
horus.run(node)

Typed Topics (Zero-Copy)

import horus

def controller_tick(node):
    scan = node.recv("scan")
    if scan:
        cmd = horus.CmdVel(linear=0.5, angular=0.0)
        node.send("cmd_vel", cmd)

controller = horus.Node(
    name="controller",
    pubs=[horus.CmdVel],
    subs=[horus.LaserScan],
    tick=controller_tick,
    rate=50
)
horus.run(controller)

Node with Full Lifecycle

import horus

def my_init(node):
    node.state = {"counter": 0, "errors": 0}
    node.log_info("Initialized")

def my_tick(node):
    node.state["counter"] += 1
    node.send("count", {"value": node.state["counter"]})

def my_shutdown(node):
    node.log_info(f"Shutting down after {node.state['counter']} ticks, {node.state['errors']} errors")

def my_error(node, exception):
    node.state["errors"] += 1
    node.log_error(f"Error #{node.state['errors']}: {exception}")

sensor = horus.Node(
    name="counter",
    init=my_init,
    tick=my_tick,
    shutdown=my_shutdown,
    on_error=my_error,
    rate=10,
    pubs=["count"],
    failure_policy="restart",
)
horus.run(sensor)

Production: Safety-Critical Motor Controller

import horus

def motor_tick(node):
    cmd = node.recv("cmd_vel")
    if cmd is None:
        # SAFETY: no command received — check stale timeout
        node.state["stale_ticks"] += 1
        if node.state["stale_ticks"] > 50:  # 500ms at 100Hz
            node.send("motor", horus.CmdVel(linear=0.0, angular=0.0))
            node.log_warning("Stale command — motors zeroed")
        return

    node.state["stale_ticks"] = 0

    # SAFETY: clamp to safe range
    linear = max(-1.0, min(1.0, cmd.linear))
    angular = max(-2.0, min(2.0, cmd.angular))
    node.send("motor", horus.CmdVel(linear=linear, angular=angular))

def motor_init(node):
    node.state = {"stale_ticks": 0}

def motor_shutdown(node):
    # CRITICAL: zero motors before exit
    node.send("motor", horus.CmdVel(linear=0.0, angular=0.0))
    node.log_info("Motors zeroed")

def motor_error(node, exception):
    node.send("motor", horus.CmdVel(linear=0.0, angular=0.0))
    node.log_error(f"Motor error — zeroed: {exception}")

motor = horus.Node(
    name="motor_ctrl",
    init=motor_init,
    tick=motor_tick,
    shutdown=motor_shutdown,
    on_error=motor_error,
    rate=100,
    order=0,
    budget=500 * horus.us,
    on_miss="safe_mode",
    failure_policy="fatal",
    subs=[horus.CmdVel],
    pubs=["motor"],
)

horus.run(motor, tick_rate=100, rt=True, watchdog_ms=500)

Production: Sensor Node with Reconnection

import horus

hw_connected = [False]

def sensor_init(node):
    try:
        # Open hardware connection
        node.state = {"device": open_hardware("/dev/ttyUSB0")}
        hw_connected[0] = True
    except OSError as e:
        node.log_error(f"Hardware init failed: {e}")
        raise  # Let failure_policy="restart" handle it

def sensor_tick(node):
    reading = node.state["device"].read()
    node.send("sensor.data", horus.Imu(
        accel_x=reading[0], accel_y=reading[1], accel_z=reading[2],
        gyro_x=reading[3], gyro_y=reading[4], gyro_z=reading[5],
    ))

def sensor_shutdown(node):
    if hw_connected[0]:
        node.state["device"].close()

sensor = horus.Node(
    name="imu_driver",
    init=sensor_init,
    tick=sensor_tick,
    shutdown=sensor_shutdown,
    rate=100,
    order=0,
    failure_policy="restart",  # Auto-reconnect on USB disconnect
    pubs=[horus.Imu],
)
horus.run(sensor)

Supporting Types

NodeInfo

Available as node.info during tick/init/shutdown. Provides scheduler-managed metrics:

| Method/Property | Returns | Description |
|---|---|---|
| name | str | Node name |
| state | str | Current lifecycle state |
| tick_count() | int | Total ticks executed |
| error_count() | int | Total errors |
| successful_ticks() | int | Ticks without error |
| failed_ticks() | int | Ticks that raised exceptions |
| avg_tick_duration_ms() | float | Running average tick time |
| get_uptime() | float | Seconds since init |
| get_metrics() | dict | Full metrics snapshot (tick times, message counts, errors) |
| request_stop() | None | Stop the scheduler |
| set_custom_data(key, value) | None | Attach custom metadata |
| get_custom_data(key) | str or None | Read custom metadata |
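A running average like avg_tick_duration_ms() is typically maintained incrementally, one update per tick, rather than by storing every sample; a plain-Python sketch (the class name is illustrative):

```python
class TickStats:
    """Incremental mean — O(1) memory, updated once per tick."""
    def __init__(self):
        self.count = 0
        self.avg_ms = 0.0

    def record(self, duration_ms):
        self.count += 1
        # Standard incremental-mean update: avg += (x - avg) / n
        self.avg_ms += (duration_ms - self.avg_ms) / self.count

stats = TickStats()
for d in (1.0, 2.0, 3.0):
    stats.record(d)
print(stats.avg_ms)   # 2.0
```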

NodeState

String enumeration of node lifecycle states:

| State | Description |
|---|---|
| UNINITIALIZED | Registered but init() not yet called |
| INITIALIZING | init() is running |
| RUNNING | Normal operation — tick() being called |
| STOPPING | shutdown() is running |
| STOPPED | Shutdown complete |
| ERROR | init() or tick() returned an error |
| CRASHED | tick() raised an unhandled exception |

Miss

Deadline miss policies:

| Constant | String | Behavior |
|---|---|---|
| horus.Miss.WARN | "warn" | Log warning, continue normally |
| horus.Miss.SKIP | "skip" | Skip this tick, resume next cycle |
| horus.Miss.SAFE_MODE | "safe_mode" | Enter safe state, continue ticking |
| horus.Miss.STOP | "stop" | Stop the entire scheduler |

Design Decisions

Why kwargs, not class inheritance? Subclassing (class MyNode(horus.Node) with an overridden tick method) requires boilerplate and doesn't work with plain functions or lambdas. The kwargs API (horus.Node(tick=my_fn, rate=30)) is more Pythonic, matches FastAPI/Click patterns, and keeps all configuration in one call.

Why recv() returns data, not a callback? Pull-based reception keeps timing deterministic — your tick controls when data is consumed. Push-based callbacks fire at unpredictable times, making budget compliance harder.

Why has_msg() uses peek buffering? has_msg() internally calls recv() and buffers the result. This avoids a separate "peek" API while keeping the common if node.has_msg("x"): data = node.recv("x") pattern zero-overhead.

Why undeclared topics auto-create? Reduces boilerplate for prototyping. But undeclared topics don't appear in monitoring — always declare in pubs/subs for production.

Why no is_safe_state / enter_safe_state in Python? These are Rust-only Node trait methods requiring mutable borrows that don't map cleanly to Python's callback model. Safety-critical nodes should use Rust. Python nodes use on_miss="safe_mode" and on_error callbacks instead.


Trade-offs

| Choice | Benefit | Cost |
|---|---|---|
| Kwargs over inheritance | Concise, works with lambdas | No IDE auto-complete for tick signature |
| Auto-create topics | Zero boilerplate for prototyping | Undeclared topics invisible to monitoring |
| has_msg() peek buffering | Clean API, no separate peek method | Consumes message on check |
| No is_safe_state / enter_safe_state | Simpler Python API | Safety-critical nodes must use Rust |
| Pull-based recv() | Deterministic timing | Must check every tick (no push notification) |
| GIL acquired per callback | Python works correctly | ~3μs overhead per tick |

See Also