HORUS Python Bindings

A production-ready Python API for the HORUS robotics framework, combining simplicity with advanced features for professional robotics applications.

Why HORUS Python?

  • Zero Boilerplate: Working node in 10 lines
  • Flexible API: Functional style or class inheritance - your choice
  • Production Performance: ~500ns latency (same shared memory as Rust)
  • Per-Node Rate Control: Different nodes at different frequencies (100Hz sensor, 10Hz logger)
  • Message Timestamps: Typed messages include timestamp_ns for timing
  • Typed Messages: Optional type-safe messages from Rust
  • Multiprocess Support: Process isolation and multi-language nodes
  • Pythonic: Feels like native Python, not a foreign function wrapper
  • Rich Ecosystem: Use NumPy, OpenCV, scikit-learn, etc.

Quick Start

Installation

Automatic (Recommended)

Python bindings are automatically installed when you run the HORUS installer:

# From HORUS root directory
./install.sh

The installer will detect Python 3.9+ and automatically build and install the bindings.

Manual Installation

If you prefer to install manually or need to rebuild:

# Install maturin (Python/Rust build tool)
# Option A: Via Cargo (recommended for Ubuntu 24.04+)
cargo install maturin

# Option B: Via pip (if not blocked by PEP 668)
# pip install maturin

# Build and install from source
cd horus_py
maturin develop --release

Requirements:

  • Python 3.9+
  • Rust 1.70+
  • Linux (for shared memory support)

Minimal Example

import horus

def process(node):
    node.send("output", "Hello HORUS!")

node = horus.Node(pubs="output", tick=process, rate=1)
horus.run(node, duration=3)

This minimal example demonstrates functional-style node creation without class boilerplate.


Core API

Creating a Node

def Node(
    name: str = "",              # Node name (auto-generated from tick function if empty)
    subs: str | list = "",       # Topics to subscribe (string, list of strings, or list of message types)
    pubs: str | list = "",       # Topics to publish (string, list of strings, or list of message types)
    tick: Callable = None,       # Function called every tick: tick(node) -> None
    init: Callable = None,       # Optional: called once on startup: init(node) -> None
    shutdown: Callable = None,   # Optional: called on graceful shutdown: shutdown(node) -> None
    rate: int = 30,              # Tick rate in Hz
    order: int = 100,            # Execution priority (lower = earlier)
    budget: float = None,        # Max tick duration in seconds (None = auto from rate)
    deadline: float = None,      # Hard deadline in seconds (None = auto from rate)
    on_miss: str = None,         # "warn", "skip", "safe_mode", "stop" (None = "warn")
) -> Node

Parameters:

  • name — Node identifier. If empty, derived from tick function name.
  • subs/pubs — Topic declarations. Accepts: "topic_name", ["topic1", "topic2"], or typed [CmdVel, LaserScan] (see formats below).
  • tick — Main loop function, called at rate Hz. Receives the node instance.
  • rate — Tick frequency in Hz. Default: 30. Setting rate auto-enables RT scheduling.
  • order — Priority within scheduler tick (0-9: critical, 10-49: sensors, 50-99: processing, 100+: background).

Example:

from horus import Node, CmdVel, LaserScan, Imu

node = Node(
    name="controller",
    pubs=[CmdVel],             # Typed — fast Pod zero-copy (~1.5μs)
    subs=[LaserScan, Imu],     # Auto-named: "scan", "imu"
    tick=control_fn,
    rate=100,
    order=0,
)

Topic declaration formats (determines performance path):

# FAST — typed (Pod zero-copy, ~2.7μs send+recv)
pubs=[CmdVel]                 # auto-name from type: "cmd_vel"
pubs=[CmdVel, Pose2D]         # multiple typed topics
pubs={"motor": CmdVel}        # custom name + type

# GENERIC — string (MessagePack, ~10μs send+recv)
pubs=["data"]                 # GenericMessage — for dicts and custom data
pubs="single_topic"           # shorthand for single string topic

Additional keyword arguments (not shown in the signature above):

  • on_error (callable, optional): Error handler, called if tick raises an exception
  • default_capacity (int, optional): Buffer capacity for auto-created topics (default: 1024)
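Conceptually, on_error wraps the tick call. A simplified sketch of that dispatch (not horus internals, and the handler signature shown here is an assumption):

```python
def run_tick(tick, node, on_error=None):
    """Call a node's tick once, routing exceptions to the on_error handler."""
    try:
        tick(node)
    except Exception as exc:
        if on_error is not None:
            on_error(node, exc)  # handler decides: log, recover, or re-raise
        else:
            raise

# Illustrative usage with stand-in objects (no scheduler involved)
errors = []

def flaky_tick(node):
    raise ValueError("sensor glitch")

run_tick(flaky_tick, node=None, on_error=lambda n, e: errors.append(str(e)))
print(errors)  # ['sensor glitch']
```

With no handler installed, the exception propagates and the node's failure_policy decides what happens next.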

Alternative: Class as State Container

For nodes with complex state, use a plain class and pass its method as the tick function:

import horus

class SensorState:
    def __init__(self):
        self.reading = 0.0

    def tick(self, node):
        self.reading += 0.1
        node.send("temperature", self.reading)

    def init(self, node):
        print("Sensor initialized!")

    def shutdown(self, node):
        print("Sensor shutting down!")

# Use it
state = SensorState()
sensor = horus.Node(name="sensor", tick=state.tick, init=state.init,
                    shutdown=state.shutdown, pubs=["temperature"], rate=10)
horus.run(sensor)

Both patterns work! Use functional style for simplicity or class containers for complex nodes with state.

Node Functions

Your tick function receives the node as a parameter:

def my_tick(node):
    # Check for messages
    if node.has_msg("input"):
        data = node.recv("input")  # Get one message

    # Get all messages
    all_msgs = node.recv_all("input")

    # Send messages
    node.send("output", {"value": 42})

Node Methods:

send

def send(topic: str, data: Any) -> None

Publish a message to a topic. Non-blocking. Overwrites oldest if buffer is full.

Parameters:

  • topic: str — Topic name (must be in the node's pubs list)
  • data: Any — Message to send. Can be: a dict, a typed message (CmdVel, Image, etc.), or any serializable object

node.send("cmd_vel", {"linear": 1.0, "angular": 0.0})  # dict
node.send("cmd_vel", horus.CmdVel(1.0, 0.0))           # typed message

recv

def recv(topic: str) -> Optional[Any]

Receive one message from a topic (FIFO order). Returns None if no messages available.

Parameters:

  • topic: str — Topic name (must be in the node's subs list)

Returns: The message, or None if buffer is empty.

msg = node.recv("scan")
if msg is not None:
    print(f"Got {len(msg.ranges)} ranges")

node.recv_all(topic) -> list

Receive ALL available messages as a list. Drains the buffer completely. Returns an empty list if none available.

Use this for batch processing when you need to handle every message, not just the latest:

def tick(node):
    # Process all queued commands (don't drop any)
    commands = node.recv_all("commands")
    for cmd in commands:
        execute_command(cmd)
    node.log_debug(f"Processed {len(commands)} commands this tick")

node.has_msg(topic) -> bool

Check if at least one unread message is available on the topic without consuming it. The message is buffered internally and returned by the next recv() call.

def tick(node):
    if node.has_msg("emergency_stop"):
        stop = node.recv("emergency_stop")
        node.log_warning("Emergency stop received!")
        node.request_stop()

node.request_stop()

Request the scheduler to shut down gracefully after the current tick completes. Use this to stop execution programmatically from within a node.

def tick(node):
    if horus.tick() >= 1000:
        node.log_info("Reached 1000 ticks, stopping")
        node.request_stop()

    error = check_safety()
    if error:
        node.log_error(f"Safety violation: {error}")
        node.request_stop()

node.publishers() -> List[str]

Returns the list of topic names this node publishes to.

def init(node):
    node.log_info(f"Publishing to: {node.publishers()}")
    node.log_info(f"Subscribing to: {node.subscribers()}")

node.subscribers() -> List[str]

Returns the list of topic names this node subscribes to.

Logging Methods

Method                  Description
node.log_info(msg)      Log an informational message
node.log_warning(msg)   Log a warning message
node.log_error(msg)     Log an error message
node.log_debug(msg)     Log a debug message

Important: Logging only works during init(), tick(), or shutdown() callbacks. Calling a log method outside the scheduler emits a RuntimeWarning and the message is dropped.

def tick(node):
    node.log_info("Processing sensor data")
    node.log_warning("Sensor reading is stale")
    node.log_error("Failed to process data")
    node.log_debug(f"Raw value: {value}")

# Outside scheduler — message is dropped with RuntimeWarning:
node = horus.Node(name="test", tick=tick)
node.log_info("This will be dropped!")  # RuntimeWarning

Node scheduling kwargs (maps 1:1 to Rust NodeBuilder):

Kwarg            Default          Rust equivalent
rate             30               .rate()
order            100              .order()
budget           None             .budget()
deadline         None             .deadline()
on_miss          None (warn)      .on_miss()
failure_policy   None (fatal)     .failure_policy()
compute          False            .compute()
on               None             .on(topic)
priority         None             .priority()
core             None             .core()
watchdog         None             .watchdog()
async tick       auto-detected    .async_io()

:::tip Python Timing Guide
Budget and deadline in Python detect overruns; they do not guarantee timing. Python ticks take milliseconds, not microseconds. Use realistic values:

# Rust node: microsecond budget
Node(tick=motor_ctrl, rate=1000, budget=300 * us)  # 300μs — Rust can do this

# Python ML node: millisecond budget — detects when inference is too slow
Node(tick=run_model, rate=30, budget=50 * ms)  # 50ms — triggers on_miss if exceeded

compute=True is useful when your tick calls C extensions that release the GIL (NumPy, PyTorch, OpenCV) — they run in parallel on the thread pool.

priority/core are for mixed Rust+Python systems — tell the OS to schedule Rust RT nodes before Python, and keep Python off RT cores. :::
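As a sanity check on those numbers: the auto-derived budget is just the tick period (1 / rate). A standalone sketch of that arithmetic, assuming horus's us/ms constants are plain seconds-based multipliers:

```python
# Seconds-based time-unit multipliers (horus exports `us` and `ms` similarly;
# treating them as plain floats is an assumption here)
us = 1e-6
ms = 1e-3

def tick_period_s(rate_hz: float) -> float:
    """Tick period in seconds for a given rate: the natural default budget."""
    return 1.0 / rate_hz

print(tick_period_s(1000) / us)  # ~1000: a 1 kHz node has ~1 ms per tick
print(tick_period_s(30) / ms)    # ~33.3: a 30 Hz node has ~33 ms per tick
```

So a 300 μs budget is plausible for a 1 kHz Rust node, while a 30 Hz Python ML node has a whole ~33 ms period to spend.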

Running Nodes

def run(*nodes: Node, duration: float = None) -> None

Convenience one-liner: creates a Scheduler, adds all nodes, and runs.

Parameters:

  • *nodes: Node — One or more Node instances to run
  • duration: float — Optional. Run for this many seconds, then stop. None = run until Ctrl+C.

# Single node — runs until Ctrl+C
horus.run(node)

# Multiple nodes for 10 seconds
horus.run(node1, node2, node3, duration=10)

Examples

1. Simple Publisher

import horus

def publish_temperature(node):
    node.send("temperature", 25.5)

sensor = horus.Node(
    name="temp_sensor",
    pubs="temperature",
    tick=publish_temperature,
    rate=1  # 1 Hz
)

horus.run(sensor, duration=10)

2. Subscriber

import horus

def display_temperature(node):
    if node.has_msg("temperature"):
        temp = node.recv("temperature")
        print(f"Temperature: {temp}°C")

display = horus.Node(
    name="display",
    subs="temperature",
    tick=display_temperature
)

horus.run(display)

3. Pub/Sub Pipeline

import horus

def publish(node):
    node.send("raw", 42.0)

def process(node):
    if node.has_msg("raw"):
        data = node.recv("raw")
        result = data * 2.0
        node.send("processed", result)

def display(node):
    if node.has_msg("processed"):
        value = node.recv("processed")
        print(f"Result: {value}")

# Create pipeline
publisher = horus.Node("publisher", pubs="raw", tick=publish, rate=1)
processor = horus.Node("processor", subs="raw", pubs="processed", tick=process)
displayer = horus.Node("display", subs="processed", tick=display)

# Run all together
horus.run(publisher, processor, displayer, duration=5)

4. Using Lambda Functions

import horus

# Producer (inline)
producer = horus.Node(
    pubs="numbers",
    tick=lambda n: n.send("numbers", 42),
    rate=1
)

# Transformer (inline)
doubler = horus.Node(
    subs="numbers",
    pubs="doubled",
    tick=lambda n: n.send("doubled", n.recv("numbers") * 2) if n.has_msg("numbers") else None
)

horus.run(producer, doubler, duration=5)

5. Multi-Topic Robot Controller

import horus

def robot_controller(node):
    # Read from multiple sensors
    lidar_data = None
    camera_data = None

    if node.has_msg("lidar"):
        lidar_data = node.recv("lidar")

    if node.has_msg("camera"):
        camera_data = node.recv("camera")

    # Compute commands
    if lidar_data and camera_data:
        cmd = compute_navigation(lidar_data, camera_data)
        node.send("motors", cmd)
        node.send("status", "navigating")

robot = horus.Node(
    name="robot_controller",
    subs=["lidar", "camera"],
    pubs=["motors", "status"],
    tick=robot_controller,
    rate=50  # 50Hz control loop
)

6. Lifecycle Management

import horus

class Context:
    def __init__(self):
        self.count = 0
        self.file = None

ctx = Context()

def init_handler(node):
    print("Starting up!")
    ctx.file = open("data.txt", "w")

def tick_handler(node):
    ctx.count += 1
    data = f"Tick {ctx.count}"
    node.send("data", data)
    ctx.file.write(data + "\n")

def shutdown_handler(node):
    print(f"Processed {ctx.count} messages")
    ctx.file.close()

node = horus.Node(
    pubs="data",
    init=init_handler,
    tick=tick_handler,
    shutdown=shutdown_handler,
    rate=10
)

horus.run(node, duration=5)

Advanced Features (Production-Ready)

HORUS Python includes advanced features that match or exceed ROS2 capabilities while maintaining simplicity.

NodeState

The NodeState enum tracks which lifecycle phase a node is in:

from horus import NodeState

# Values:
NodeState.IDLE       # Created but not yet running
NodeState.RUNNING    # Actively ticking
NodeState.PAUSED     # Temporarily suspended
NodeState.STOPPED    # Clean shutdown complete
NodeState.ERROR      # Error state

NodeState values are strings — you can compare directly:

if node_state == "running":
    print("Node is active")
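Because the values are plain strings, a minimal string-backed stand-in (ours, not the real horus class) behaves the same way:

```python
class NodeState:  # stand-in mirroring the documented values, not the horus class
    IDLE = "idle"
    RUNNING = "running"
    PAUSED = "paused"
    STOPPED = "stopped"
    ERROR = "error"

state = NodeState.RUNNING
print(state == "running")  # True: direct string comparison works
```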

Scheduler

def Scheduler(
    tick_rate: int = 100,        # Global tick rate in Hz
    rt: bool = False,            # Enable real-time scheduling (prefer_rt)
    watchdog_ms: int = 0,        # Watchdog timeout in ms (0 = disabled)
    deterministic: bool = False, # Deterministic execution mode
    verbose: bool = True,        # Enable verbose logging
) -> Scheduler

The Scheduler orchestrates node execution with priority ordering, per-node rate control, and real-time features.

Methods:

  • scheduler.add(node: Node) -> None — Register a node
  • scheduler.run() -> None — Start the main loop (blocks until Ctrl+C)
  • scheduler.run_for(seconds: float) -> None — Run for a duration, then stop
  • scheduler.tick_once() -> None — Execute one tick cycle (for testing)
  • scheduler.stop() -> None — Request graceful shutdown

Creating a Scheduler:

# All config on Node(), scheduler.add() takes only the node
scheduler = horus.Scheduler()
scheduler.add(horus.Node(tick=motor_fn, rate=1000, order=0, budget=200 * horus.us))
scheduler.add(horus.Node(tick=planner_fn, order=5, compute=True))
scheduler.add(horus.Node(tick=telemetry_fn, rate=1, order=10))

Node configuration (kwargs on Node()):

Method                      Description
.order(n)                   Execution priority (lower = runs first)
.rate(hz)                   Node tick rate in Hz — auto-derives budget/deadline, marks as RT
.budget(us)                 Tick budget in microseconds
.on_miss(policy)            "warn", "skip", "safe_mode", or "stop"
.on(topic)                  Event-driven — wakes only when topic has new data
.compute()                  Offload to worker thread pool (planning, ML)
.async_io()                 Run on async executor (network, disk)
.failure_policy(name, ...)  "fatal", "restart", "skip", or "ignore" — optional kwargs: max_retries, backoff_ms, max_failures, cooldown_ms
.build()                    Finalize and register — returns Scheduler
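The failure_policy knobs compose into a simple retry schedule. A sketch under the assumption that backoff_ms is a fixed delay between restart attempts (horus may well space retries differently):

```python
def retry_delays_ms(max_retries: int, backoff_ms: int) -> list:
    """Delay before each restart attempt under a fixed-backoff restart policy."""
    return [backoff_ms] * max_retries

# failure_policy="restart", max_retries=3, backoff_ms=100 would allow three
# restarts, each preceded by a 100 ms pause:
print(retry_delays_ms(3, 100))  # [100, 100, 100]
```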

Adding Nodes:

All configuration (order, rate, budget, etc.) goes on the Node() constructor. scheduler.add() takes only the node:

sensor = horus.Node(name="sensor", tick=sensor_fn, rate=100, order=0)
controller = horus.Node(name="ctrl", tick=ctrl_fn, rate=100, order=1)
logger = horus.Node(name="logger", tick=log_fn, rate=10, order=2)

scheduler.add(sensor)
scheduler.add(controller)
scheduler.add(logger)

Execution:

Method                          Description
scheduler.run()                 Run until Ctrl+C or .stop()
scheduler.run(duration=10.0)    Run for a specific duration, then shut down
scheduler.stop()                Signal graceful shutdown
scheduler.current_tick()        Current tick count

Monitoring:

Method                                Description
scheduler.get_node_stats(name)        Stats dict: total_ticks, errors_count, avg_tick_duration_ms, etc.
scheduler.set_node_rate(name, rate)   Change a node's tick rate at runtime
scheduler.set_tick_budget(name, us)   Update per-node tick budget (microseconds)
scheduler.get_all_nodes()             List all nodes with their configuration
scheduler.get_node_count()            Number of registered nodes
scheduler.has_node(name)              Check if a node is registered
scheduler.get_node_names()            List of registered node names
scheduler.remove_node(name)           Remove a node (returns True if found)
scheduler.status()                    Formatted status string
scheduler.capabilities()              Dict of RT capabilities
scheduler.has_full_rt()               True if all RT features available
scheduler.safety_stats()              Dict of budget overruns, deadline misses, watchdog expirations

Recording & Replay:

Method                            Description
scheduler.is_recording()          Check if recording is active
scheduler.is_replaying()          Check if replaying
scheduler.stop_recording()        Stop recording, returns list of saved file paths
Scheduler.list_recordings()       List available recordings (static method)
Scheduler.delete_recording(name)  Delete a recording (static method)

Context Manager:

The Scheduler supports the with statement for automatic cleanup:

with horus.Scheduler(tick_rate=100) as sched:
    sched.add(horus.Node(tick=sensor_fn, rate=100, order=0))
    sched.add(horus.Node(tick=ctrl_fn, rate=100, order=1))
    sched.run(duration=10.0)
# stop() called automatically on exit, even if an exception occurs

Expanded Method Details:

scheduler.get_node_stats(name) -> dict

Returns a dictionary with detailed statistics for the named node:

stats = scheduler.get_node_stats("motor_ctrl")
print(f"Total ticks: {stats['total_ticks']}")
print(f"Avg tick: {stats.get('avg_tick_duration_ms', 0):.2f} ms")
print(f"Errors: {stats['errors_count']}")

Keys include: name, priority, total_ticks, successful_ticks, failed_ticks, avg_tick_duration_ms, max_tick_duration_ms, errors_count, uptime_seconds.

scheduler.status() -> str

Returns the current scheduler state: "idle", "running", or "stopped".

scheduler.current_tick() -> int

Returns the current tick count (0-indexed).

scheduler.set_node_rate(name, rate)

Change a node's tick rate at runtime. Useful for adaptive control:

# Slow down logging when battery is low
if battery_low:
    scheduler.set_node_rate("logger", 1)  # 1 Hz
else:
    scheduler.set_node_rate("logger", 10)  # 10 Hz

scheduler.run(duration=None)

Start the scheduler tick loop. Blocks until Ctrl+C, stop(), or duration expires.

scheduler.run()              # Run forever (until Ctrl+C)
scheduler.run(duration=30.0) # Run for 30 seconds, then stop

scheduler.stop()

Signal graceful shutdown. All nodes' shutdown() callbacks run before exit.

# From another thread or a node's tick:
scheduler.stop()

scheduler.get_all_nodes() -> List[Dict]

Returns all registered nodes with their configuration.

for node in scheduler.get_all_nodes():
    print(f"{node['name']}: order={node.get('order', '?')}")

scheduler.get_node_count() -> int

Number of registered nodes.

print(f"Running {scheduler.get_node_count()} nodes")

scheduler.has_node(name) -> bool

Check if a node with the given name is registered.

if scheduler.has_node("motor_ctrl"):
    stats = scheduler.get_node_stats("motor_ctrl")

scheduler.get_node_names() -> List[str]

List of all registered node names.

print(f"Nodes: {scheduler.get_node_names()}")

scheduler.remove_node(name) -> bool

Remove a node by name. Returns True if found and removed.

if scheduler.remove_node("debug_logger"):
    print("Debug logger removed")

scheduler.capabilities() -> Dict

Returns a dict of detected RT capabilities (SCHED_FIFO, memory locking, CPU affinity, etc.).

caps = scheduler.capabilities()
print(f"RT priority: {caps.get('max_priority', 'N/A')}")
print(f"Memory lock: {caps.get('memory_locking', False)}")

scheduler.has_full_rt() -> bool

Returns True if all requested RT features are available (no degradations).

if not scheduler.has_full_rt():
    print("Warning: running with degraded RT — check capabilities()")

scheduler.safety_stats() -> Dict

Returns safety monitor statistics: budget overruns, deadline misses, watchdog expirations.

stats = scheduler.safety_stats()
if stats:
    print(f"Deadline misses: {stats.get('deadline_misses', 0)}")
    print(f"Budget overruns: {stats.get('budget_overruns', 0)}")

scheduler.is_recording() -> bool

Check if session recording is currently active.

scheduler.is_replaying() -> bool

Check if the scheduler is replaying a recorded session.

scheduler.stop_recording() -> List[str]

Stop recording and return the list of saved file paths.

if scheduler.is_recording():
    paths = scheduler.stop_recording()
    print(f"Saved recordings: {paths}")

Scheduler.list_recordings() -> List[str]

List available recording sessions (static method).

recordings = horus.Scheduler.list_recordings()
for r in recordings:
    print(f"  {r}")

Scheduler.delete_recording(name) -> bool

Delete a recording by name (static method).

scheduler.tick(node_names) -> None

Execute one tick cycle for the specified nodes only. Essential for deterministic testing — run exactly one tick and verify output.

scheduler = horus.Scheduler(tick_rate=100)
scheduler.add(sensor)
scheduler.add(controller)

# Test: run one tick, check output
scheduler.tick(["Sensor", "Controller"])
stats = scheduler.get_node_stats("Controller")
assert stats["total_ticks"] == 1

scheduler.tick_for(node_names, duration_seconds) -> None

Execute ticks for the specified nodes over a duration. Useful for benchmarking and timed test runs.

# Benchmark sensor processing for 5 seconds
scheduler.tick_for(["SensorProcessor"], 5.0)
stats = scheduler.get_node_stats("SensorProcessor")
print(f"Processed {stats['total_ticks']} ticks in 5s")

scheduler.is_running() -> bool

Check if the scheduler is currently executing its tick loop.

import threading

def monitor_thread(sched):
    while sched.is_running():
        print(f"Tick: {sched.current_tick()}")
        time.sleep(1.0)

t = threading.Thread(target=monitor_thread, args=(scheduler,), daemon=True)
t.start()
scheduler.run(duration=10.0)

scheduler.get_node_info(name) -> Optional[int]

Get the execution order (priority) for a named node. Returns None if the node is not registered.

order = scheduler.get_node_info("motor_ctrl")
if order is not None:
    print(f"motor_ctrl runs at order {order}")

scheduler.degradations() -> List[Dict]

Returns a list of RT feature degradations — features that were requested but couldn't be applied (e.g., SCHED_FIFO unavailable without root).

scheduler = horus.Scheduler(tick_rate=1000, rt=True)
# ... add nodes and run ...

for d in scheduler.degradations():
    print(f"Degraded: {d.get('feature')} — {d.get('reason')}")

Each dict contains feature (what was requested) and reason (why it couldn't be applied).

horus.run() — the simplest way to run nodes:

from horus import Node, run, us

sensor = Node(tick=read_lidar, rate=10, order=0, pubs=["scan"])
ctrl = Node(tick=navigate, rate=30, order=1, subs=["scan"], pubs=["cmd"])
motor = Node(tick=drive, rate=1000, order=2, budget=300*us, subs=["cmd"])

# All scheduler config as kwargs
run(sensor, ctrl, motor, rt=True, watchdog_ms=500)
run(node, duration=10, deterministic=True)

All run() kwargs (maps to Rust Scheduler builder):

Kwarg                 Default          Rust equivalent
duration              None (forever)   .run() / .run_for()
tick_rate             1000.0           .tick_rate()
rt                    False            .prefer_rt()
deterministic         False            .deterministic()
watchdog_ms           0                .watchdog()
blackbox_mb           0                .blackbox()
recording             False            .with_recording()
name                  None             .name()
cores                 None             .cores()
max_deadline_misses   None             .max_deadline_misses()
verbose               False            .verbose()
telemetry             None             .telemetry()

Miss — Deadline Miss Policy

The Miss class defines what happens when a node exceeds its deadline:

from horus import Miss

# Available policies
Miss.WARN        # Log warning and continue (default)
Miss.SKIP        # Skip the node for this tick
Miss.SAFE_MODE   # Call enter_safe_state() on the node
Miss.STOP        # Stop the entire scheduler

Use via the Node constructor:

# Config on Node, then add
motor = horus.Node(name="motor", tick=motor_fn, rate=500, order=0, budget=200 * horus.us, on_miss="safe_mode")
scheduler.add(motor)

Scheduler Configuration

All configuration via constructor kwargs:

from horus import Scheduler, Node

# Development — simple
scheduler = Scheduler()

# Production — watchdog + RT
scheduler = Scheduler(tick_rate=1000, rt=True, watchdog_ms=500)

# With blackbox + telemetry
scheduler = Scheduler(
    tick_rate=1000,
    watchdog_ms=500,
    blackbox_mb=64,
    telemetry="http://localhost:9090",
    verbose=True,
)

# Deterministic mode for simulation/testing
scheduler = Scheduler(tick_rate=100, deterministic=True)

Testing with short runs:

scheduler = Scheduler()
scheduler.add(Node(name="sensor", tick=sensor_fn, rate=100, order=0))
scheduler.add(Node(name="ctrl", tick=ctrl_fn, rate=100, order=1))

# Run for a short duration
scheduler.run(duration=0.1)

Message Timestamps

Timestamps are managed by the Rust Topic backend. Typed messages include a timestamp_ns field for nanosecond-precision timing:

import horus
import time

def control_tick(node):
    if node.has_msg("sensor_data"):
        msg = node.recv("sensor_data")

        # Use message-level timestamps for latency checks
        if hasattr(msg, 'timestamp_ns') and msg.timestamp_ns:
            age_s = (time.time_ns() - msg.timestamp_ns) / 1e9
            if age_s > 0.1:  # More than 100ms old
                node.log_warning(f"Stale data: {age_s*1000:.1f}ms old")
                return

            latency = age_s
            print(f"Latency: {latency*1000:.1f}ms")

        # Process fresh data
        process(msg)

Timestamp access: Use msg.timestamp_ns on typed messages (CmdVel, Pose2D, Imu, etc.) for nanosecond timestamps set by the Rust backend.
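The staleness check above reduces to pure arithmetic on timestamp_ns. Here it is as standalone helpers (the function names are ours, not part of the horus API):

```python
import time

def message_age_s(timestamp_ns: int, now_ns: int = None) -> float:
    """Age of a message in seconds, given its nanosecond timestamp."""
    if now_ns is None:
        now_ns = time.time_ns()
    return (now_ns - timestamp_ns) / 1e9

def is_stale(timestamp_ns: int, max_age_s: float = 0.1, now_ns: int = None) -> bool:
    """True if the message is older than max_age_s (default 100 ms)."""
    return message_age_s(timestamp_ns, now_ns) > max_age_s

# A message stamped 250 ms ago is stale under the 100 ms threshold
now = time.time_ns()
print(is_stale(now - 250_000_000, now_ns=now))  # True
```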

Multiprocess Execution

Run Python nodes in separate processes for isolation and multi-language support:

# Run multiple Python files as separate processes
horus run node1.py node2.py node3.py

# Mix Python and Rust nodes
horus run sensor.rs controller.py visualizer.py

# Mix Rust and Python
horus run lidar_driver.rs planner.py motor_control.rs

All nodes in the same horus run session automatically communicate via shared memory!

Example - Distributed System:

# sensor_node.py
import horus

def sensor_tick(node):
    data = read_lidar()  # Your sensor code
    node.send("lidar_data", data)

sensor = horus.Node(name="lidar", pubs="lidar_data", tick=sensor_tick)
horus.run(sensor)

# controller_node.py
import horus

def control_tick(node):
    if node.has_msg("lidar_data"):
        data = node.recv("lidar_data")
        cmd = compute_control(data)
        node.send("motor_cmd", cmd)

controller = horus.Node(
    name="controller",
    subs="lidar_data",
    pubs="motor_cmd",
    tick=control_tick
)
horus.run(controller)

# Run both in separate processes
horus run sensor_node.py controller_node.py

Benefits:

  • Process isolation: One crash doesn't kill everything
  • Multi-language: Mix Python and Rust nodes in the same application
  • Parallel execution: True multicore utilization
  • Zero configuration: Shared memory IPC automatically set up

Complete Example: All Features Together

import horus
import time

def sensor_tick(node):
    """High-frequency sensor (100Hz)"""
    imu = {"accel_x": 1.0, "accel_y": 0.0, "accel_z": 9.8}
    node.send("imu_data", imu)
    node.log_info("Published IMU data")

def control_tick(node):
    """Medium-frequency control (50Hz)"""
    if node.has_msg("imu_data"):
        imu = node.recv("imu_data")
        cmd = {"linear": 1.0, "angular": 0.0}
        node.send("cmd_vel", cmd)

def logger_tick(node):
    """Low-frequency logging (10Hz)"""
    if node.has_msg("cmd_vel"):
        msg = node.recv("cmd_vel")
        node.log_info(f"Command received: {msg}")

# Create nodes with rate and order configured on the Node
sensor = horus.Node(name="imu", pubs="imu_data", tick=sensor_tick, rate=100, order=0)
controller = horus.Node(name="ctrl", subs="imu_data", pubs="cmd_vel", tick=control_tick, rate=50, order=1)
logger = horus.Node(name="log", subs="cmd_vel", tick=logger_tick, rate=10, order=2)

# Add nodes to scheduler
scheduler = horus.Scheduler()
scheduler.add(sensor)
scheduler.add(controller)
scheduler.add(logger)

scheduler.run(duration=5.0)

# Check statistics
stats = scheduler.get_node_stats("imu")
print(f"Sensor: {stats['total_ticks']} ticks in 5 seconds")

Network Communication

HORUS Python supports network communication for distributed multi-machine systems. Topics and the Router work transparently over the network.

Topic Network Endpoints

Add an endpoint parameter to communicate over the network:

from horus import Topic, CmdVel

# Local (shared memory) - default
local_topic = Topic(CmdVel)

# Network (UDP direct)
network_topic = Topic(CmdVel, endpoint="cmdvel@192.168.1.100:8000")

# Router (TCP broker for WAN/NAT traversal)
router_topic = Topic(CmdVel, endpoint="cmdvel@router")

Endpoint Syntax:

  • "topic" - Local shared memory (~500ns latency)
  • "topic@host:port" - Direct UDP (<50μs latency)
  • "topic@router" - Router broker (auto-discovery on localhost:7777)
  • "topic@192.168.1.100:7777" - Router broker at specific address
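The grammar decomposes mechanically; this illustrative parser (ours, not part of horus) shows how the forms differ. Note that the direct-UDP and router-at-address forms share the same topic@host:port syntax, so which transport applies presumably depends on what is listening at that address:

```python
def parse_endpoint(endpoint: str) -> dict:
    """Split a 'topic[@host:port|@router]' endpoint string into its parts."""
    if "@" not in endpoint:
        return {"topic": endpoint, "transport": "shared_memory"}
    topic, target = endpoint.split("@", 1)
    if target == "router":
        # Bare "@router" means the default broker on localhost:7777
        return {"topic": topic, "transport": "router", "address": "localhost:7777"}
    return {"topic": topic, "transport": "network", "address": target}

print(parse_endpoint("cmd_vel"))
print(parse_endpoint("cmdvel@192.168.1.100:8000"))
print(parse_endpoint("cmdvel@router"))
```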

Topic Methods

Method                       Description
topic.send(msg, node=None)   Send a message. Pass optional node for automatic IPC logging. Returns True.
topic.recv(node=None)        Receive one message. Returns the message or None if empty.
topic.name                   Property: the topic name string
topic.backend_type           Property: the active backend name (e.g., "direct", "spsc_shm")
topic.is_network_topic       Property: True if this topic uses network transport
topic.endpoint               Property: the endpoint string, or None for local topics
topic.stats()                Returns a dict with messages_sent, messages_received, send_failures, recv_failures, is_network, backend
topic.is_generic()           Returns True if this is a generic (string-name) topic

Example:

from horus import Topic, CmdVel

topic = Topic(CmdVel)

# Send and receive typed messages
topic.send(CmdVel(linear=1.0, angular=0.5))
msg = topic.recv()
if msg:
    print(f"linear={msg.linear}, angular={msg.angular}")

# Check topic properties
print(f"Name: {topic.name}")           # "cmd_vel"
print(f"Backend: {topic.backend_type}") # e.g. "mpmc_shm"
print(f"Stats: {topic.stats()}")

Generic Topics

When you create a Topic with a string name (instead of a typed class), you get a generic topic that accepts any JSON-serializable data:

from horus import Topic, CmdVel

# Generic topic (string name = dynamic typing)
topic = Topic("my_topic")

# Typed topic (class = static typing, better performance)
typed_topic = Topic(CmdVel)

Generic topics use the same send() and recv() methods as typed topics, but accept any JSON-serializable Python object. Data is serialized via MessagePack internally.

from horus import Topic

topic = Topic("sensor_data")

# Send dict, list, or any JSON-serializable data
topic.send({"temperature": 25.5, "humidity": 60.0})
topic.send([1.0, 2.0, 3.0, 4.0])
topic.send("status: OK")

# Receive (returns Python object)
msg = topic.recv()  # {"temperature": 25.5, "humidity": 60.0}

# Check if generic
print(topic.is_generic())  # True

Typed vs Generic Performance:

Topic Type               Serialization                        Use Case
Typed (Topic(CmdVel))    Direct field extraction (no serde)   Production, cross-language, high-frequency
Generic (Topic("name"))  Python → JSON → MessagePack          Dynamic schemas, prototyping, Python-only

Automatic Transport Selection

HORUS automatically selects the fastest communication path based on where publishers and subscribers are located. You never need to configure this manually:

from horus import Topic, CmdVel

# Just create a topic — HORUS picks the fastest path automatically:
# Same-thread:     ~3ns  (when pub+sub are in the same node)
# Same-process:    ~18-36ns  (when pub+sub are in different nodes, same process)
# Cross-process:   ~85-167ns  (when pub+sub are in different processes)
topic = Topic("cmd_vel", CmdVel)

Automatic Transport Tiers:

| Scenario | Latency | When it applies |
|---|---|---|
| Same thread | ~3ns | Publisher and subscriber are in the same node |
| Same process (1:1) | ~18ns | One publisher, one subscriber, same process |
| Same process (many:1) | ~26ns | Multiple publishers, one subscriber, same process |
| Same process (many:many) | ~36ns | Multiple publishers and subscribers, same process |
| Cross-process (1:1) | ~85ns | One publisher, one subscriber, different processes |
| Cross-process (many:many) | ~167ns | Multiple publishers and subscribers, different processes |

Router Client (WAN/NAT Traversal)

For communication across networks, through NAT, or for large-scale deployments, use the Router:

from horus import RouterClient, Topic, CmdVel

# Create router client for explicit connection management
router = RouterClient("192.168.1.100", 7777)

# Build endpoints through the router
cmd_endpoint = router.endpoint("cmdvel")  # Returns "cmdvel@192.168.1.100:7777"
pose_endpoint = router.endpoint("pose")

# Use endpoints with Topic
topic = Topic(CmdVel, endpoint=cmd_endpoint)

# Router properties
print(f"Address: {router.address}")        # "192.168.1.100:7777"
print(f"Connected: {router.is_connected}") # True
print(f"Topics: {router.topics}")          # ["cmdvel", "pose"]
print(f"Uptime: {router.uptime_seconds}s")

Helper Functions:

from horus import default_router_endpoint, router_endpoint

# Default router (localhost:7777)
ep1 = default_router_endpoint("cmdvel")  # "cmdvel@router"

# Custom router address
ep2 = router_endpoint("cmdvel", "192.168.1.100", 7777)  # "cmdvel@192.168.1.100:7777"

Router Server (for testing):

from horus import RouterServer

# Start a local router (for development/testing)
server = RouterServer(port=7777)
server.start()

# For production, use CLI instead:
# $ horus router start --port 7777

When to Use What

| Transport | Latency | Use Case |
|---|---|---|
| Same-process (`Topic(CmdVel)`) | ~18-36ns | In-process communication (automatic) |
| Cross-process, 1:1 (`Topic(CmdVel)`) | ~85ns | Same machine, one publisher and one subscriber |
| Cross-process, many:many (`Topic(CmdVel)`) | ~167ns | Same machine, multiple publishers and subscribers |
| Network (`endpoint="topic@host:port"`) | <50μs | Multi-machine on LAN (direct UDP) |
| Router (`endpoint="topic@router"`) | 10-50ms | WAN, NAT traversal, cloud deployments |

Multi-Machine Example

# === ROBOT (192.168.1.50) ===
from horus import Topic, CmdVel, Imu, Odometry

# Local: Critical flight control (ultra-fast)
imu_topic = Topic(Imu)  # ~85ns local shared memory

# Network: Telemetry to ground station
telemetry = Topic(Odometry, endpoint="telem@192.168.1.100:8000")

# Network: Commands from ground station
commands = Topic(CmdVel, endpoint="cmd@0.0.0.0:8001")


# === GROUND STATION (192.168.1.100) ===
from horus import Topic, CmdVel, Odometry

# Receive telemetry from robot
telemetry_sub = Topic(Odometry, endpoint="telem@0.0.0.0:8000")

# Send commands to robot
command_pub = Topic(CmdVel, endpoint="cmd@192.168.1.50:8001")

Integration with Python Ecosystem

NumPy Integration

import horus
import numpy as np

def process_array(node):
    if node.has_msg("raw_data"):
        data = node.recv("raw_data")
        # Convert to NumPy array
        arr = np.array(data)
        # Process with NumPy
        result = np.fft.fft(arr)
        node.send("fft_result", result.tolist())

processor = horus.Node(
    subs="raw_data",
    pubs="fft_result",
    tick=process_array
)

OpenCV Integration

import horus
import cv2
import numpy as np

def process_image(node):
    if node.has_msg("camera"):
        img_data = node.recv("camera")
        # Convert to OpenCV format
        img = np.array(img_data, dtype=np.uint8).reshape((480, 640, 3))

        # Apply OpenCV processing
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)

        # Publish result
        node.send("edges", edges.flatten().tolist())

vision = horus.Node(
    subs="camera",
    pubs="edges",
    tick=process_image,
    rate=30
)

scikit-learn Integration

import horus
from sklearn.linear_model import LinearRegression
import numpy as np

model = LinearRegression()

def train_model(node):
    if node.has_msg("training_data"):
        data = node.recv("training_data")
        X = np.array(data['features'])
        y = np.array(data['labels'])

        # Train model
        model.fit(X, y)
        score = model.score(X, y)

        node.send("model_score", score)

trainer = horus.Node(
    subs="training_data",
    pubs="model_score",
    tick=train_model
)

Advanced Patterns

State Management

import horus

class RobotState:
    def __init__(self):
        self.position = {"x": 0.0, "y": 0.0}
        self.velocity = 0.0
        self.last_update = 0

state = RobotState()

def update_state(node):
    if node.has_msg("velocity"):
        state.velocity = node.recv("velocity")

    if node.has_msg("position"):
        state.position = node.recv("position")

    # Publish combined state
    node.send("robot_state", {
        "pos": state.position,
        "vel": state.velocity
    })

state_manager = horus.Node(
    subs=["velocity", "position"],
    pubs="robot_state",
    tick=update_state
)

Rate Limiting

import horus
import time

class RateLimiter:
    def __init__(self, min_interval):
        self.min_interval = min_interval
        self.last_send = 0

limiter = RateLimiter(min_interval=0.1)  # 100ms minimum

def rate_limited_publish(node):
    current_time = time.time()

    if current_time - limiter.last_send >= limiter.min_interval:
        node.send("output", "data")
        limiter.last_send = current_time

node = horus.Node(
    pubs="output",
    tick=rate_limited_publish,
    rate=100  # Node runs at 100Hz, but publishes at max 10Hz
)

Error Handling

import horus

def safe_processing(node):
    try:
        if node.has_msg("input"):
            data = node.recv("input")
            result = risky_operation(data)
            node.send("output", result)
    except Exception as e:
        node.send("errors", str(e))
        print(f"Error: {e}")

processor = horus.Node(
    subs="input",
    pubs=["output", "errors"],
    tick=safe_processing
)

Performance Tips

1. Use Per-Node Rate Control

# Configure rate and order on the Node, then add to scheduler
sensor = horus.Node(name="sensor", tick=sensor_fn, rate=100, order=0)
controller = horus.Node(name="ctrl", tick=ctrl_fn, rate=50, order=1)
logger = horus.Node(name="logger", tick=log_fn, rate=10, order=2)

scheduler = horus.Scheduler()
scheduler.add(sensor)
scheduler.add(controller)
scheduler.add(logger)

scheduler.run()

# Monitor performance with get_node_stats()
stats = scheduler.get_node_stats("sensor")
print(f"Sensor executed {stats['total_ticks']} ticks")

2. Check Message Freshness

import time

def control_tick(node):
    if node.has_msg("sensor_data"):
        data = node.recv("sensor_data")
        # Use message-level timestamps for staleness checks
        if hasattr(data, 'timestamp_ns') and data.timestamp_ns:
            age_s = (time.time_ns() - data.timestamp_ns) / 1e9
            if age_s > 0.1:
                node.log_warning("Skipping stale sensor data")
                return
        process(data)

3. Use Dicts for Messages

# Send messages as Python dicts (automatically serialized to JSON)
cmd = {"linear": 1.5, "angular": 0.8}
node.send("cmd_vel", cmd)

# For staleness checks, use typed messages with timestamp_ns
# or track send time at the application level
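Tracking send time at the application level can be sketched like this. Note this is our own convention, not a HORUS field: we simply embed a `sent_ns` key in the dict payload ourselves and compute the age on the receiving side.

```python
import time

def make_cmd(linear, angular):
    # Embed our own send timestamp in the payload (app-level convention)
    return {"linear": linear, "angular": angular, "sent_ns": time.time_ns()}

def age_seconds(msg):
    # Age of a message stamped by make_cmd()
    return (time.time_ns() - msg["sent_ns"]) / 1e9

cmd = make_cmd(1.5, 0.8)
# On the receiving side: skip the message if age_seconds(cmd) exceeds a threshold
```

The receiver can then apply the same staleness check shown above for typed messages, just reading `msg["sent_ns"]` instead of `msg.timestamp_ns`.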

4. Batch Processing

# Use node.recv_all() to process all available messages at once
def batch_processor(node):
    messages = node.recv_all("input")
    if messages:
        results = [process(msg) for msg in messages]
        for result in results:
            node.send("output", result)

5. Keep tick() Fast

# GOOD: Fast tick
def good_tick(node):
    if node.has_msg("input"):
        data = node.recv("input")
        result = quick_operation(data)
        node.send("output", result)

# BAD: Slow tick
def bad_tick(node):
    time.sleep(1)  # Don't block!
    data = requests.get("http://api.example.com")  # Don't do I/O!

6. Offload Heavy Processing

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def heavy_processing_node(node):
    if node.has_msg("input"):
        data = node.recv("input")
        # Offload to thread pool
        future = executor.submit(expensive_operation, data)
        # Don't block - check result later or use callback
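One non-blocking way to complete this pattern is to keep submitted futures in a list and drain only the finished ones on later ticks. This is a sketch under our own names (`pending`, `submit_work`, `collect_results`, and the stand-in `expensive_operation` are not HORUS API):

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)
pending = []  # futures submitted on earlier ticks

def expensive_operation(x):
    return x * 2  # stand-in for real work

def submit_work(data):
    # Called from tick(): enqueue work without blocking
    pending.append(executor.submit(expensive_operation, data))

def collect_results():
    # Called from tick(): pop only the futures that already finished
    done = [f for f in pending if f.done()]
    for f in done:
        pending.remove(f)
    return [f.result() for f in done]
```

In the node's tick you would call `submit_work()` when input arrives and publish whatever `collect_results()` returns, so the tick itself never waits on the worker pool.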

7. Use Multiprocess for CPU-Intensive Tasks

# Isolate heavy processing in separate processes
horus run sensor.py heavy_vision.py light_controller.py

# Each node gets its own CPU core

Development

Building from Source

# Debug build (fast compile, slow runtime)
cd horus_py
maturin develop

# Release build (slow compile, fast runtime)
maturin develop --release

# Build wheel for distribution
maturin build --release

Running Tests

# Install test dependencies
pip install pytest

# Run all tests
pytest tests/

# Run specific feature tests
horus run tests/test_rate_control.py    # Phase 1: Per-node rates
horus run tests/test_timestamps.py      # Phase 2: Timestamps
horus run tests/test_typed_messages.py  # Phase 3: Typed messages

# With coverage
pytest --cov=horus tests/

# Test multiprocess execution (Phase 4)
horus run tests/multiprocess_publisher.py tests/multiprocess_subscriber.py

Mock Mode

HORUS Python includes a mock mode for testing without Rust bindings:

# If Rust bindings aren't available, automatically falls back to mock
# You'll see: "Warning: Rust bindings not available. Running in mock mode."

# Use for unit testing Python logic without HORUS running

Debugging Tips

# Check node statistics
scheduler = horus.Scheduler()
scheduler.add(my_node)

stats = scheduler.get_node_stats("my_node")
print(f"Ticks: {stats['total_ticks']}, Errors: {stats['errors_count']}")

# Monitor message timestamps via message-level fields
msg = node.recv("topic")
if msg and hasattr(msg, 'timestamp_ns') and msg.timestamp_ns:
    age = (time.time_ns() - msg.timestamp_ns) / 1e9
    print(f"Message age: {age*1000:.1f}ms")

Interoperability

With Rust Nodes

Important: For cross-language communication, use typed topics by passing a message type to Topic().

Cross-Language with Typed Topics

# Python node with typed topic
from horus import Topic, CmdVel

cmd_topic = Topic(CmdVel)  # Typed topic
cmd_topic.send(CmdVel(linear=1.0, angular=0.5))

// Rust node receives
use horus::prelude::*;

let topic: Topic<CmdVel> = Topic::new("cmd_vel")?;
if let Some(cmd) = topic.recv() {
    println!("Got: linear={}, angular={}", cmd.linear, cmd.angular);
}

Generic Topic (String Topics)

# Generic Topic - for custom topics
from horus import Topic

topic = Topic("my_topic")  # Pass string for generic topic
topic.send({"linear": 1.0, "angular": 0.5})  # Uses JSON serialization

Typed topics: Use Topic(CmdVel), Topic(Pose2D) for cross-language communication. See Python Message Library for details.


Time API

Framework-aware time functions. Use these instead of time.time() — they integrate with deterministic mode and SimClock.

Quick reference:

| Function | Returns | Description |
|---|---|---|
| `horus.now()` | float | Current time in seconds |
| `horus.dt()` | float | Timestep for this tick in seconds |
| `horus.elapsed()` | float | Time since scheduler start |
| `horus.tick()` | int | Current tick number |
| `horus.budget_remaining()` | float | Time left in tick budget |
| `horus.rng_float()` | float | Random float in [0, 1) |
| `horus.timestamp_ns()` | int | Nanosecond timestamp |

horus.now() -> float

Current framework time in seconds.

  • Normal mode: Wall clock (time.time() equivalent)
  • Deterministic mode: Virtual SimClock that advances by fixed dt each tick

def tick(node):
    t = horus.now()
    node.send("timestamp", t)

horus.dt() -> float

Timestep for this tick in seconds. Use this for physics integration instead of measuring elapsed time manually.

  • Normal mode: Actual elapsed time since last tick
  • Deterministic mode: Fixed 1.0 / rate — identical across runs

def tick(node):
    # PID controller using dt() for correct integration
    error = target - current
    integral += error * horus.dt()
    derivative = (error - prev_error) / horus.dt()
    output = kp * error + ki * integral + kd * derivative

horus.elapsed() -> float

Time elapsed since the scheduler started, in seconds.

def tick(node):
    if horus.elapsed() > 30.0:
        node.log_info("Running for 30+ seconds, stabilized")

horus.tick() -> int

Current tick number (0-indexed, increments each scheduler cycle).

def tick(node):
    if horus.tick() % 100 == 0:
        node.log_info(f"Tick {horus.tick()}: system healthy")

horus.budget_remaining() -> float

Time remaining in this tick's budget, in seconds. Returns float('inf') if no budget is configured.

Use this for adaptive quality — do more work when time permits, skip expensive operations when tight.

def tick(node):
    # Always do critical work
    process_sensor_data()

    # Only do expensive work if budget allows
    if horus.budget_remaining() > 0.001:  # >1ms remaining
        run_expensive_optimization()

horus.rng_float() -> float

Random float in [0.0, 1.0).

  • Normal mode: System entropy (non-deterministic)
  • Deterministic mode: Tick-seeded RNG — produces identical sequences across runs

def tick(node):
    # Simulated sensor noise (reproducible in deterministic mode)
    noise = (horus.rng_float() - 0.5) * 0.1
    reading = true_value + noise

horus.timestamp_ns() -> int

Current timestamp in nanoseconds. Use for TransformFrame queries and message timestamping.

def tick(node):
    ts = horus.timestamp_ns()
    transform = tf.lookup("camera", "base_link", ts)

Deterministic Mode

When using horus.run(..., deterministic=True), the time functions switch from wall clock to SimClock:

| Function | Normal Mode | Deterministic Mode |
|---|---|---|
| `now()` | Wall clock | SimClock (virtual) |
| `dt()` | Actual elapsed | Fixed 1/rate |
| `elapsed()` | Real elapsed | Virtual elapsed |
| `rng_float()` | System entropy | Tick-seeded (reproducible) |
| `tick()` | Same | Same |
| `budget_remaining()` | Same | Same |
| `timestamp_ns()` | Real nanoseconds | Virtual nanoseconds |

This ensures identical behavior across runs — critical for simulation, testing, and replay.
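The tick-seeded RNG idea can be illustrated in plain Python. This is a conceptual sketch of the reproducibility property, not the HORUS internals; the seeding formula is our own:

```python
import random

def rng_float(tick, base_seed=0):
    # Seed a fresh generator from the tick number:
    # the same tick yields the same value on every run
    return random.Random(base_seed * 1_000_003 + tick).random()

run_a = [rng_float(t) for t in range(5)]
run_b = [rng_float(t) for t in range(5)]
# Two "runs" over the same ticks produce identical sequences
```

Because the stream depends only on the tick number, a replayed simulation sees exactly the same "random" noise as the original run.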


Runtime Parameters

horus.Params provides dict-like access to runtime configuration stored in .horus/config/params.yaml. Change PID gains, speed limits, and thresholds without recompiling.

Params(path=None)

Create a parameter store.

  • Params() — loads from .horus/config/params.yaml (default)
  • Params("path/to/file.yaml") — loads from explicit path

Methods

| Method | Returns | Description |
|---|---|---|
| `get(key, default=None)` | Any | Get value, return default if missing |
| `params[key]` | Any | Get value, raise KeyError if missing |
| `params[key] = value` | | Set value |
| `has(key)` | bool | Check if key exists |
| `key in params` | bool | Same as `has(key)` |
| `keys()` | List[str] | All parameter names |
| `len(params)` | int | Number of parameters |
| `save()` | | Persist to disk |
| `remove(key)` | bool | Remove a key, returns True if it existed |
| `reset()` | | Reset all parameters to defaults |

Example: Live PID Tuning

import horus

params = horus.Params()

def controller_tick(node):
    # Read gains from params — change at runtime via CLI or monitor
    kp = params.get("pid_kp", 1.0)
    ki = params.get("pid_ki", 0.1)
    kd = params.get("pid_kd", 0.01)
    max_speed = params.get("max_speed", 1.5)

    error = target - current
    output = min(kp * error, max_speed)
    node.send("cmd_vel", output)

controller = horus.Node(name="PIDController", tick=controller_tick,
                        rate=100, pubs=["cmd_vel"])
horus.run(controller)

Set parameters at runtime:

horus param set pid_kp 2.5
horus param set max_speed 0.8
horus param list

Rate Limiter

horus.Rate provides drift-compensated rate limiting for background threads and standalone loops. For nodes, use the rate= constructor kwarg instead.

Rate(hz)

Create a rate limiter targeting hz iterations per second.

Methods

| Method | Returns | Description |
|---|---|---|
| `sleep()` | | Sleep until next cycle; compensates for work time to maintain target rate |
| `actual_hz()` | float | Measured frequency (smoothed average) |
| `target_hz()` | float | Target frequency |
| `period()` | float | Target period in seconds (1/hz) |
| `is_late()` | bool | True if current cycle exceeded the target period |
| `reset()` | | Reset timing (call after a pause to avoid burst catch-up) |
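"Compensates for work time" means the limiter targets absolute deadlines rather than sleeping a fixed interval after each cycle. The idea in plain Python (a sketch of the concept, not the `Rate` internals):

```python
import time

def run_at(hz, n_cycles, work):
    period = 1.0 / hz
    next_deadline = time.monotonic() + period
    for _ in range(n_cycles):
        work()
        # Sleep only for whatever remains of this cycle
        remaining = next_deadline - time.monotonic()
        if remaining > 0:
            time.sleep(remaining)
        next_deadline += period  # absolute schedule: no cumulative drift
```

If `work()` takes 2ms of a 10ms period, the sleep shrinks to ~8ms, so the long-run average stays at the target rate instead of drifting slower.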

Example: Camera Capture Thread

import threading
from horus import Rate, Topic, Image

def camera_loop():
    rate = Rate(30)  # 30 FPS target
    topic = Topic(Image)

    while running:
        frame = capture_camera()
        topic.send(frame)

        if rate.is_late():
            print(f"Camera behind: {rate.actual_hz():.1f} Hz (target {rate.target_hz():.0f})")

        rate.sleep()

thread = threading.Thread(target=camera_loop, daemon=True)
thread.start()

Hardware Drivers

horus.drivers loads hardware connections from horus.toml's [drivers] section.

Module Functions

| Function | Returns | Description |
|---|---|---|
| `drivers.load()` | HardwareSet | Load drivers from horus.toml |
| `drivers.load_from(path)` | HardwareSet | Load from explicit YAML path |
| `drivers.register_driver(name, cls)` | | Register a Python driver class |

HardwareSet

Returned by drivers.load(). Provides typed accessors for each hardware type.

| Method | Returns | Description |
|---|---|---|
| `hw.list()` | List[str] | All driver names |
| `hw.has(name)` | bool | Check if driver exists |
| `name in hw` | bool | Same as `has(name)` |
| `len(hw)` | int | Number of drivers |
| `hw.dynamixel(name)` | DriverParams | Dynamixel servo bus config |
| `hw.serial(name)` | DriverParams | Serial port config |
| `hw.i2c(name)` | DriverParams | I2C bus config |
| `hw.can(name)` | DriverParams | CAN bus config |
| `hw.gpio(name)` | DriverParams | GPIO pin config |

All typed accessors: dynamixel, rplidar, realsense, i2c, serial, can, gpio, pwm, usb, webcam, input, bluetooth, net, ethercat, spi, adc, raw.

DriverParams

Dict-like access to a driver's configuration values.

| Method | Returns | Description |
|---|---|---|
| `params[key]` | Any | Get value (KeyError if missing) |
| `params.get(key)` | Any | Get value (KeyError if missing) |
| `params.get_or(key, default)` | Any | Get with default if missing |
| `params.has(key)` | bool | Check if key exists |
| `params.keys()` | List[str] | All parameter names |

Example

# horus.toml
[drivers]
arm = { type = "dynamixel", port = "/dev/ttyUSB0", baudrate = 1000000 }
lidar = { type = "rplidar", port = "/dev/ttyUSB1" }

import horus

hw = horus.drivers.load()

if hw.has("arm"):
    arm = hw.dynamixel("arm")
    port = arm.get_or("port", "/dev/ttyUSB0")
    baud = arm.get_or("baudrate", 115200)
    print(f"Arm on {port} at {baud}")

print(f"Available drivers: {hw.list()}")

Unit Constants

from horus import us, ms

us  # 1e-6 — microseconds to seconds
ms  # 1e-3 — milliseconds to seconds

# Use with budget/deadline for readability
node = horus.Node(tick=fn, rate=1000, budget=300 * us, deadline=900 * us)

Error Types

from horus import HorusNotFoundError, HorusTransformError, HorusTimeoutError

try:
    tf.tf("missing_frame", "base")
except HorusTransformError as e:
    print(f"Transform failed: {e}")

try:
    tf.wait_for_transform("src", "dst", timeout_sec=1.0)
except HorusTimeoutError:
    print("Timed out waiting for transform")

| Exception | Rust source | Raised when |
|---|---|---|
| `HorusNotFoundError` | NotFound(...) | Missing topic, frame, node |
| `HorusTransformError` | Transform(...) | TF extrapolation, stale data |
| `HorusTimeoutError` | Timeout(...) | Blocking operation timed out |

Other Rust errors map to standard Python exceptions: Config → ValueError, Io → IOError, Memory → MemoryError, etc.




Common Patterns

Producer-Consumer

# Producer
producer = horus.Node(
    pubs="queue",
    tick=lambda n: n.send("queue", generate_work())
)

# Consumer
consumer = horus.Node(
    subs="queue",
    tick=lambda n: process_work(n.recv("queue")) if n.has_msg("queue") else None
)

horus.run(producer, consumer)

Request-Response

def request_node(node):
    node.send("requests", {"id": 1, "query": "data"})

def response_node(node):
    if node.has_msg("requests"):
        req = node.recv("requests")
        response = handle_request(req)
        node.send("responses", response)

req = horus.Node(pubs="requests", tick=request_node)
res = horus.Node(subs="requests", pubs="responses", tick=response_node)
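With multiple requests in flight, responses can be matched back to their requests by id. This is a sketch of one app-level approach; the `id` field and the helper names are our own convention, not a HORUS service API:

```python
import itertools

_ids = itertools.count(1)
pending = {}  # id -> original request, awaiting a response

def make_request(query):
    req = {"id": next(_ids), "query": query}
    pending[req["id"]] = req
    return req

def match_response(resp):
    # Pair a response with the request that caused it (None if unknown)
    return pending.pop(resp["id"], None)

req = make_request("data")
resp = {"id": req["id"], "result": "ok"}
matched = match_response(resp)
```

The requester would call `make_request()` before `node.send("requests", req)`, and `match_response()` on each message received from `"responses"`.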

Periodic Tasks

import time

class PeriodicTask:
    def __init__(self, interval):
        self.interval = interval
        self.last_run = 0

task = PeriodicTask(interval=5.0)  # Every 5 seconds

def periodic_tick(node):
    current = time.time()
    if current - task.last_run >= task.interval:
        node.send("periodic", "task_executed")
        task.last_run = current

node = horus.Node(pubs="periodic", tick=periodic_tick, rate=10)

Troubleshooting

Import Errors

# If you see: ModuleNotFoundError: No module named 'horus'
# Rebuild and install:
cd horus_py
maturin develop --release

Slow Performance

# Use release build (not debug)
maturin develop --release

# Check tick rate isn't too high
node = horus.Node(tick=fn, rate=30)  # 30Hz is reasonable

Memory Issues

# Avoid accumulating data in closures
# BAD:
all_data = []
def bad_tick(node):
    all_data.append(node.recv("input"))  # Memory leak!

# GOOD:
def good_tick(node):
    data = node.recv("input")
    process_and_discard(data)  # Process immediately
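When you do need to keep history, bound it explicitly. A `collections.deque` with `maxlen` drops the oldest entries instead of growing forever (a general Python pattern, not HORUS-specific):

```python
from collections import deque

recent = deque(maxlen=100)  # keeps only the last 100 messages

def buffering_tick(msg):
    recent.append(msg)  # oldest entries are evicted automatically

for i in range(1000):
    buffering_tick(i)
# len(recent) is capped at 100; only the newest messages remain
```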

Monitor Integration and Logging

Current Limitations

Python nodes currently do NOT appear in the HORUS monitor logs.

The Python bindings do not integrate with the Rust logging system:

# Python nodes use standard print() for logging
print("Debug message")  # Visible in console, not in monitor

What this means:

  • Python nodes communicate via shared memory
  • All message passing functionality works
  • Python log messages don't appear in monitor logs
  • Use print() for Python-side debugging

Monitoring Python Nodes

Since Python nodes don't integrate with the monitor logging system, use these alternatives:

  1. Node-level logging methods:

def tick(node):
    node.log_info("Processing sensor data")
    node.log_warning("Sensor reading is stale")
    node.log_error("Failed to process data")
    node.log_debug("Debug information")

# These print to console, not monitor

  2. Manual topic monitoring:
def tick(node):
    if node.has_msg("input"):
        data = node.recv("input")
        print(f"[{node.name}] Received: {data}")
        result = process(data)
        node.send("output", result)
        print(f"[{node.name}] Published: {result}")

  3. Node statistics:
scheduler = horus.Scheduler()
scheduler.add(node)
scheduler.run(duration=10)

# Get stats after running
stats = scheduler.get_node_stats("my_node")
print(f"Ticks: {stats['total_ticks']}")
print(f"Errors: {stats['errors_count']}")

Future Improvements

Monitor integration for Python nodes is planned for a future release. This will include:

  • Full NodeInfo context in Python callbacks
  • LogSummary for Python message types
  • Python node logs visible in the monitor TUI and web dashboard

Custom Exceptions

HORUS defines three custom exception types plus maps internal errors to standard Python exceptions:

from horus import HorusNotFoundError, HorusTransformError, HorusTimeoutError

try:
    result = some_horus_operation()
except HorusNotFoundError:
    print("Resource not found")
except HorusTransformError:
    print("Transform computation failed")
except HorusTimeoutError:
    print("Operation timed out")

Custom exceptions (inherit from Exception):

| Exception | When Raised | Rust Source |
|---|---|---|
| `HorusNotFoundError` | Topic, frame, node, or parent frame not found | HorusError::NotFound |
| `HorusTransformError` | Transform extrapolation or stale data | HorusError::Transform |
| `HorusTimeoutError` | Blocking operation exceeded time limit | HorusError::Timeout |

Standard Python exceptions raised by HORUS operations:

| Python Exception | When Raised | Rust Source |
|---|---|---|
| IOError | File or IPC I/O failures | HorusError::Io |
| MemoryError | Shared memory or pool allocation failures | HorusError::Memory |
| ValueError | Invalid parameters, bad config, parse errors | HorusError::InvalidInput, InvalidDescriptor, Parse, Config |
| TypeError | Serialization/deserialization failures | HorusError::Serialization |
| RuntimeError | Internal or unmapped errors | All other variants |

All exceptions preserve the original Rust error message, so you get full context:

try:
    tf = tf_tree.tf("nonexistent", "world")
except HorusNotFoundError as e:
    print(e)  # "Frame not found: nonexistent"

try:
    img = Image(height=-1, width=640, encoding="rgb8")
except ValueError as e:
    print(e)  # "Invalid input: height must be positive"

Catch hierarchy — order matters when catching:

try:
    result = horus_operation()
except HorusNotFoundError:
    pass  # Specific: missing resource
except HorusTransformError:
    pass  # Specific: TF failure
except HorusTimeoutError:
    pass  # Specific: deadline exceeded
except (ValueError, TypeError):
    pass  # Bad input or serialization
except (IOError, MemoryError):
    pass  # System-level failures
except RuntimeError:
    pass  # Catch-all for internal errors



Remember: With HORUS Python, you focus on what your robot does, not how the framework works!