HORUS Python Bindings
Production-Ready Python API for the HORUS robotics framework - combines simplicity with advanced features for professional robotics applications.
Why HORUS Python?
- Zero Boilerplate: Working node in 10 lines
- Flexible API: Functional style or class inheritance - your choice
- Production Performance: ~500ns latency (same shared memory as Rust)
- Per-Node Rate Control: Different nodes at different frequencies (100Hz sensor, 10Hz logger)
- Message Timestamps: Typed messages include
timestamp_nsfor timing - Typed Messages: Optional type-safe messages from Rust
- Multiprocess Support: Process isolation and multi-language nodes
- Pythonic: Feels like native Python, not a foreign function wrapper
- Rich Ecosystem: Use NumPy, OpenCV, scikit-learn, etc.
Quick Start
Installation
Automatic (Recommended)
Python bindings are automatically installed when you run the HORUS installer:
# From HORUS root directory
./install.sh
The installer will detect Python 3.9+ and automatically build and install the bindings.
Manual Installation
If you prefer to install manually or need to rebuild:
# Install maturin (Python/Rust build tool)
# Option A: Via Cargo (recommended for Ubuntu 24.04+)
cargo install maturin
# Option B: Via pip (if not blocked by PEP 668)
# pip install maturin
# Build and install from source
cd horus_py
maturin develop --release
Requirements:
- Python 3.9+
- Rust 1.70+
- Linux (for shared memory support)
Minimal Example
import horus
def process(node):
node.send("output", "Hello HORUS!")
node = horus.Node(pubs="output", tick=process, rate=1)
horus.run(node, duration=3)
This minimal example demonstrates functional-style node creation without class boilerplate.
Core API
Creating a Node
def Node(
name: str = "", # Node name (auto-generated from tick function if empty)
subs: str | list = "", # Topics to subscribe (string, list of strings, or list of message types)
pubs: str | list = "", # Topics to publish (string, list of strings, or list of message types)
tick: Callable = None, # Function called every tick: tick(node) -> None
init: Callable = None, # Optional: called once on startup: init(node) -> None
shutdown: Callable = None, # Optional: called on graceful shutdown: shutdown(node) -> None
rate: int = 30, # Tick rate in Hz
order: int = 100, # Execution priority (lower = earlier)
budget: float = None, # Max tick duration in seconds (None = auto from rate)
deadline: float = None, # Hard deadline in seconds (None = auto from rate)
on_miss: str = None, # "warn", "skip", "safe_mode", "stop" (None = "warn")
) -> Node
Parameters:
name— Node identifier. If empty, derived from tick function name.subs/pubs— Topic declarations. Accepts:"topic_name",["topic1", "topic2"], or typed[CmdVel, LaserScan](see formats below).tick— Main loop function, called atrateHz. Receives the node instance.rate— Tick frequency in Hz. Default: 30. Setting rate auto-enables RT scheduling.order— Priority within scheduler tick (0-9: critical, 10-49: sensors, 50-99: processing, 100+: background).
Example:
from horus import Node, CmdVel, LaserScan, Imu
node = Node(
name="controller",
pubs=[CmdVel], # Typed — fast Pod zero-copy (~1.5μs)
subs=[LaserScan, Imu], # Auto-named: "scan", "imu"
tick=control_fn,
rate=100,
order=0,
)
Topic declaration formats (determines performance path):
# FAST — typed (Pod zero-copy, ~2.7μs send+recv)
pubs=[CmdVel] # auto-name from type: "cmd_vel"
pubs=[CmdVel, Pose2D] # multiple typed topics
pubs={"motor": CmdVel} # custom name + type
# GENERIC — string (MessagePack, ~10μs send+recv)
pubs=["data"] # GenericMessage — for dicts and custom data
pubs="single_topic" # shorthand for single string topic
Parameters:
name(str, optional): Node name (auto-generated if omitted)pubs: Topics to publish —[CmdVel](typed, fast) or["name"](generic)subs: Topics to subscribe — same formats as pubstick(callable): Function called each cycle, receives(node)as argumentrate(float): Execution rate in Hz (default: 30)init(callable, optional): Setup function, called once at startshutdown(callable, optional): Cleanup function, called once at endon_error(callable, optional): Error handler, called if tick raises an exceptiondefault_capacity(int, optional): Buffer capacity for auto-created topics (default: 1024)
Alternative: Class as State Container
For nodes with complex state, use a plain class and pass its method as the tick function:
import horus
class SensorState:
def __init__(self):
self.reading = 0.0
def tick(self, node):
self.reading += 0.1
node.send("temperature", self.reading)
def init(self, node):
print("Sensor initialized!")
def shutdown(self, node):
print("Sensor shutting down!")
# Use it
state = SensorState()
sensor = horus.Node(name="sensor", tick=state.tick, init=state.init,
shutdown=state.shutdown, pubs=["temperature"], rate=10)
horus.run(sensor)
Both patterns work! Use functional style for simplicity or class containers for complex nodes with state.
Node Functions
Your tick function receives the node as a parameter:
def my_tick(node):
# Check for messages
if node.has_msg("input"):
data = node.recv("input") # Get one message
# Get all messages
all_msgs = node.recv_all("input")
# Send messages
node.send("output", {"value": 42})
Node Methods:
send
def send(topic: str, data: Any) -> None
Publish a message to a topic. Non-blocking. Overwrites oldest if buffer is full.
Parameters:
topic: str— Topic name (must be in the node'spubslist)data: Any— Message to send. Can be: a dict, a typed message (CmdVel,Image, etc.), or any serializable object
node.send("cmd_vel", {"linear": 1.0, "angular": 0.0}) # dict
node.send("cmd_vel", horus.CmdVel(1.0, 0.0)) # typed message
recv
def recv(topic: str) -> Optional[Any]
Receive one message from a topic (FIFO order). Returns None if no messages available.
Parameters:
topic: str— Topic name (must be in the node'ssubslist)
Returns: The message, or None if buffer is empty.
msg = node.recv("scan")
if msg is not None:
print(f"Got {len(msg.ranges)} ranges")
node.recv_all(topic) -> list
Receive ALL available messages as a list. Drains the buffer completely. Returns an empty list if none available.
Use this for batch processing when you need to handle every message, not just the latest:
def tick(node):
# Process all queued commands (don't drop any)
commands = node.recv_all("commands")
for cmd in commands:
execute_command(cmd)
node.log_debug(f"Processed {len(commands)} commands this tick")
node.has_msg(topic) -> bool
Check if at least one unread message is available on the topic without consuming it. The message is buffered internally and returned by the next recv() call.
def tick(node):
if node.has_msg("emergency_stop"):
stop = node.recv("emergency_stop")
node.log_warning("Emergency stop received!")
node.request_stop()
node.request_stop()
Request the scheduler to shut down gracefully after the current tick completes. Use this to stop execution programmatically from within a node.
def tick(node):
if horus.tick() >= 1000:
node.log_info("Reached 1000 ticks, stopping")
node.request_stop()
error = check_safety()
if error:
node.log_error(f"Safety violation: {error}")
node.request_stop()
node.publishers() -> List[str]
Returns the list of topic names this node publishes to.
def init(node):
node.log_info(f"Publishing to: {node.publishers()}")
node.log_info(f"Subscribing to: {node.subscribers()}")
node.subscribers() -> List[str]
Returns the list of topic names this node subscribes to.
Logging Methods
| Method | Description |
|---|---|
node.log_info(msg) | Log an informational message |
node.log_warning(msg) | Log a warning message |
node.log_error(msg) | Log an error message |
node.log_debug(msg) | Log a debug message |
Important: Logging only works during init(), tick(), or shutdown() callbacks. Calling outside the scheduler raises a RuntimeWarning and the message is silently dropped.
def tick(node):
node.log_info("Processing sensor data")
node.log_warning("Sensor reading is stale")
node.log_error("Failed to process data")
node.log_debug(f"Raw value: {value}")
# Outside scheduler — message is dropped with RuntimeWarning:
node = horus.Node(name="test", tick=tick)
node.log_info("This will be dropped!") # RuntimeWarning
Node scheduling kwargs (maps 1:1 to Rust NodeBuilder):
| Kwarg | Default | Rust equivalent |
|---|---|---|
rate | 30 | .rate() |
order | 100 | .order() |
budget | None | .budget() |
deadline | None | .deadline() |
on_miss | None (warn) | .on_miss() |
failure_policy | None (fatal) | .failure_policy() |
compute | False | .compute() |
on | None | .on(topic) |
priority | None | .priority() |
core | None | .core() |
watchdog | None | .watchdog() |
async tick | auto-detected | .async_io() |
:::tip Python Timing Guide Budget/deadline in Python detect overruns, not guarantee timing. Python ticks take milliseconds, not microseconds. Use realistic values:
# Rust node: microsecond budget
Node(tick=motor_ctrl, rate=1000, budget=300 * us) # 300μs — Rust can do this
# Python ML node: millisecond budget — detects when inference is too slow
Node(tick=run_model, rate=30, budget=50 * ms) # 50ms — triggers on_miss if exceeded
compute=True is useful when your tick calls C extensions that release the GIL (NumPy, PyTorch, OpenCV) — they run in parallel on the thread pool.
priority/core are for mixed Rust+Python systems — tell the OS to schedule Rust RT nodes before Python, and keep Python off RT cores. :::
Running Nodes
def run(*nodes: Node, duration: float = None) -> None
Convenience one-liner: creates a Scheduler, adds all nodes, and runs.
Parameters:
*nodes: Node— One or more Node instances to runduration: float— Optional. Run for this many seconds, then stop.None= run until Ctrl+C.
# Single node — runs until Ctrl+C
horus.run(node)
# Multiple nodes for 10 seconds
horus.run(node1, node2, node3, duration=10)
Examples
1. Simple Publisher
import horus
def publish_temperature(node):
node.send("temperature", 25.5)
sensor = horus.Node(
name="temp_sensor",
pubs="temperature",
tick=publish_temperature,
rate=1 # 1 Hz
)
horus.run(sensor, duration=10)
2. Subscriber
import horus
def display_temperature(node):
if node.has_msg("temperature"):
temp = node.recv("temperature")
print(f"Temperature: {temp}°C")
display = horus.Node(
name="display",
subs="temperature",
tick=display_temperature
)
horus.run(display)
3. Pub/Sub Pipeline
import horus
def publish(node):
node.send("raw", 42.0)
def process(node):
if node.has_msg("raw"):
data = node.recv("raw")
result = data * 2.0
node.send("processed", result)
def display(node):
if node.has_msg("processed"):
value = node.recv("processed")
print(f"Result: {value}")
# Create pipeline
publisher = horus.Node("publisher", pubs="raw", tick=publish, rate=1)
processor = horus.Node("processor", subs="raw", pubs="processed", tick=process)
displayer = horus.Node("display", subs="processed", tick=display)
# Run all together
horus.run(publisher, processor, displayer, duration=5)
4. Using Lambda Functions
import horus
# Producer (inline)
producer = horus.Node(
pubs="numbers",
tick=lambda n: n.send("numbers", 42),
rate=1
)
# Transformer (inline)
doubler = horus.Node(
subs="numbers",
pubs="doubled",
tick=lambda n: n.send("doubled", n.get("numbers") * 2) if n.has_msg("numbers") else None
)
horus.run(producer, doubler, duration=5)
5. Multi-Topic Robot Controller
import horus
def robot_controller(node):
# Read from multiple sensors
lidar_data = None
camera_data = None
if node.has_msg("lidar"):
lidar_data = node.recv("lidar")
if node.has_msg("camera"):
camera_data = node.recv("camera")
# Compute commands
if lidar_data and camera_data:
cmd = compute_navigation(lidar_data, camera_data)
node.send("motors", cmd)
node.send("status", "navigating")
robot = horus.Node(
name="robot_controller",
subs=["lidar", "camera"],
pubs=["motors", "status"],
tick=robot_controller,
rate=50 # 50Hz control loop
)
6. Lifecycle Management
import horus
class Context:
def __init__(self):
self.count = 0
self.file = None
ctx = Context()
def init_handler(node):
print("Starting up!")
ctx.file = open("data.txt", "w")
def tick_handler(node):
ctx.count += 1
data = f"Tick {ctx.count}"
node.send("data", data)
ctx.file.write(data + "\n")
def shutdown_handler(node):
print(f"Processed {ctx.count} messages")
ctx.file.close()
node = horus.Node(
pubs="data",
init=init_handler,
tick=tick_handler,
shutdown=shutdown_handler,
rate=10
)
horus.run(node, duration=5)
Advanced Features (Production-Ready)
HORUS Python includes advanced features that match or exceed ROS2 capabilities while maintaining simplicity.
NodeState
The NodeState enum tracks which lifecycle phase a node is in:
from horus import NodeState
# Values:
NodeState.IDLE # Created but not yet running
NodeState.RUNNING # Actively ticking
NodeState.PAUSED # Temporarily suspended
NodeState.STOPPED # Clean shutdown complete
NodeState.ERROR # Error state
NodeState values are strings — you can compare directly:
if node_state == "running":
print("Node is active")
Scheduler
def Scheduler(
tick_rate: int = 100, # Global tick rate in Hz
rt: bool = False, # Enable real-time scheduling (prefer_rt)
watchdog_ms: int = 0, # Watchdog timeout in ms (0 = disabled)
deterministic: bool = False, # Deterministic execution mode
verbose: bool = True, # Enable verbose logging
) -> Scheduler
The Scheduler orchestrates node execution with priority ordering, per-node rate control, and real-time features.
Methods:
scheduler.add(node: Node) -> None— Register a nodescheduler.run() -> None— Start the main loop (blocks until Ctrl+C)scheduler.run_for(seconds: float) -> None— Run for a duration, then stopscheduler.tick_once() -> None— Execute one tick cycle (for testing)scheduler.stop() -> None— Request graceful shutdown
Creating a Scheduler:
# All config on Node(), scheduler.add() takes only the node
scheduler = horus.Scheduler()
scheduler.add(horus.Node(tick=motor_fn, rate=1000, order=0, budget=200))
scheduler.add(horus.Node(tick=planner_fn, order=5, compute=True))
scheduler.add(horus.Node(tick=telemetry_fn, rate=1, order=10))
Node configuration (kwargs on Node()):
| Method | Description |
|---|---|
.order(n) | Execution priority (lower = runs first) |
.rate(hz) | Node tick rate in Hz — auto-derives budget/deadline, marks as RT |
.budget(us) | Tick budget in microseconds |
.on_miss(policy) | "warn", "skip", "safe_mode", or "stop" |
.on(topic) | Event-driven — wakes only when topic has new data |
.compute() | Offload to worker thread pool (planning, ML) |
.async_io() | Run on async executor (network, disk) |
.failure_policy(name, ...) | "fatal", "restart", "skip", or "ignore" — optional kwargs: max_retries, backoff_ms, max_failures, cooldown_ms |
.build() | Finalize and register — returns Scheduler |
Adding Nodes:
All configuration (order, rate, budget, etc.) goes on the Node() constructor. scheduler.add() takes only the node:
sensor = horus.Node(name="sensor", tick=sensor_fn, rate=100, order=0)
controller = horus.Node(name="ctrl", tick=ctrl_fn, rate=100, order=1)
logger = horus.Node(name="logger", tick=log_fn, rate=10, order=2)
scheduler.add(sensor)
scheduler.add(controller)
scheduler.add(logger)
Execution:
| Method | Description |
|---|---|
scheduler.run() | Run until Ctrl+C or .stop() |
scheduler.run(duration=10.0) | Run for a specific duration, then shut down |
scheduler.stop() | Signal graceful shutdown |
scheduler.current_tick() | Current tick count |
Monitoring:
| Method | Description |
|---|---|
scheduler.get_node_stats(name) | Stats dict: total_ticks, errors_count, avg_tick_duration_ms, etc. |
scheduler.set_node_rate(name, rate) | Change a node's tick rate at runtime |
scheduler.set_tick_budget(name, us) | Update per-node tick budget (microseconds) |
scheduler.get_all_nodes() | List all nodes with their configuration |
scheduler.get_node_count() | Number of registered nodes |
scheduler.has_node(name) | Check if a node is registered |
scheduler.get_node_names() | List of registered node names |
scheduler.remove_node(name) | Remove a node (returns True if found) |
scheduler.status() | Formatted status string |
scheduler.capabilities() | Dict of RT capabilities |
scheduler.has_full_rt() | True if all RT features available |
scheduler.safety_stats() | Dict of budget overruns, deadline misses, watchdog expirations |
Recording & Replay:
| Method | Description |
|---|---|
scheduler.is_recording() | Check if recording is active |
scheduler.is_replaying() | Check if replaying |
scheduler.stop_recording() | Stop recording, returns list of saved file paths |
Scheduler.list_recordings() | List available recordings (static method) |
Scheduler.delete_recording(name) | Delete a recording (static method) |
Context Manager:
The Scheduler supports the with statement for automatic cleanup:
with horus.Scheduler(tick_rate=100) as sched:
sched.add(horus.Node(tick=sensor_fn, rate=100, order=0))
sched.add(horus.Node(tick=ctrl_fn, rate=100, order=1))
sched.run(duration=10.0)
# stop() called automatically on exit, even if an exception occurs
Expanded Method Details:
scheduler.get_node_stats(name) -> dict
Returns a dictionary with detailed statistics for the named node:
stats = scheduler.get_node_stats("motor_ctrl")
print(f"Total ticks: {stats['total_ticks']}")
print(f"Avg tick: {stats.get('avg_tick_duration_ms', 0):.2f} ms")
print(f"Errors: {stats['errors_count']}")
Keys include: name, priority, total_ticks, successful_ticks, failed_ticks, avg_tick_duration_ms, max_tick_duration_ms, errors_count, uptime_seconds.
scheduler.status() -> str
Returns the current scheduler state: "idle", "running", or "stopped".
scheduler.current_tick() -> int
Returns the current tick count (0-indexed).
scheduler.set_node_rate(name, rate)
Change a node's tick rate at runtime. Useful for adaptive control:
# Slow down logging when battery is low
if battery_low:
scheduler.set_node_rate("logger", 1) # 1 Hz
else:
scheduler.set_node_rate("logger", 10) # 10 Hz
scheduler.run(duration=None)
Start the scheduler tick loop. Blocks until Ctrl+C, stop(), or duration expires.
scheduler.run() # Run forever (until Ctrl+C)
scheduler.run(duration=30.0) # Run for 30 seconds, then stop
scheduler.stop()
Signal graceful shutdown. All nodes' shutdown() callbacks run before exit.
# From another thread or a node's tick:
scheduler.stop()
scheduler.get_all_nodes() -> List[Dict]
Returns all registered nodes with their configuration.
for node in scheduler.get_all_nodes():
print(f"{node['name']}: order={node.get('order', '?')}")
scheduler.get_node_count() -> int
Number of registered nodes.
print(f"Running {scheduler.get_node_count()} nodes")
scheduler.has_node(name) -> bool
Check if a node with the given name is registered.
if scheduler.has_node("motor_ctrl"):
stats = scheduler.get_node_stats("motor_ctrl")
scheduler.get_node_names() -> List[str]
List of all registered node names.
print(f"Nodes: {scheduler.get_node_names()}")
scheduler.remove_node(name) -> bool
Remove a node by name. Returns True if found and removed.
if scheduler.remove_node("debug_logger"):
print("Debug logger removed")
scheduler.capabilities() -> Dict
Returns a dict of detected RT capabilities (SCHED_FIFO, memory locking, CPU affinity, etc.).
caps = scheduler.capabilities()
print(f"RT priority: {caps.get('max_priority', 'N/A')}")
print(f"Memory lock: {caps.get('memory_locking', False)}")
scheduler.has_full_rt() -> bool
Returns True if all requested RT features are available (no degradations).
if not scheduler.has_full_rt():
print("Warning: running with degraded RT — check capabilities()")
scheduler.safety_stats() -> Dict
Returns safety monitor statistics: budget overruns, deadline misses, watchdog expirations.
stats = scheduler.safety_stats()
if stats:
print(f"Deadline misses: {stats.get('deadline_misses', 0)}")
print(f"Budget overruns: {stats.get('budget_overruns', 0)}")
scheduler.is_recording() -> bool
Check if session recording is currently active.
scheduler.is_replaying() -> bool
Check if the scheduler is replaying a recorded session.
scheduler.stop_recording() -> List[str]
Stop recording and return the list of saved file paths.
if scheduler.is_recording():
paths = scheduler.stop_recording()
print(f"Saved recordings: {paths}")
Scheduler.list_recordings() -> List[str]
List available recording sessions (static method).
recordings = horus.Scheduler.list_recordings()
for r in recordings:
print(f" {r}")
Scheduler.delete_recording(name) -> bool
Delete a recording by name (static method).
scheduler.tick(node_names) -> None
Execute one tick cycle for the specified nodes only. Essential for deterministic testing — run exactly one tick and verify output.
scheduler = horus.Scheduler(tick_rate=100)
scheduler.add(sensor)
scheduler.add(controller)
# Test: run one tick, check output
scheduler.tick(["Sensor", "Controller"])
stats = scheduler.get_node_stats("Controller")
assert stats["total_ticks"] == 1
scheduler.tick_for(node_names, duration_seconds) -> None
Execute ticks for the specified nodes over a duration. Useful for benchmarking and timed test runs.
# Benchmark sensor processing for 5 seconds
scheduler.tick_for(["SensorProcessor"], 5.0)
stats = scheduler.get_node_stats("SensorProcessor")
print(f"Processed {stats['total_ticks']} ticks in 5s")
scheduler.is_running() -> bool
Check if the scheduler is currently executing its tick loop.
import threading
def monitor_thread(sched):
while sched.is_running():
print(f"Tick: {sched.current_tick()}")
time.sleep(1.0)
t = threading.Thread(target=monitor_thread, args=(scheduler,), daemon=True)
t.start()
scheduler.run(duration=10.0)
scheduler.get_node_info(name) -> Optional[int]
Get the execution order (priority) for a named node. Returns None if the node is not registered.
order = scheduler.get_node_info("motor_ctrl")
if order is not None:
print(f"motor_ctrl runs at order {order}")
scheduler.degradations() -> List[Dict]
Returns a list of RT feature degradations — features that were requested but couldn't be applied (e.g., SCHED_FIFO unavailable without root).
scheduler = horus.Scheduler(tick_rate=1000, rt=True)
# ... add nodes and run ...
for d in scheduler.degradations():
print(f"Degraded: {d.get('feature')} — {d.get('reason')}")
Each dict contains feature (what was requested) and reason (why it couldn't be applied).
horus.run() — The ONE way to run nodes:
from horus import Node, run, us
sensor = Node(tick=read_lidar, rate=10, order=0, pubs=["scan"])
ctrl = Node(tick=navigate, rate=30, order=1, subs=["scan"], pubs=["cmd"])
motor = Node(tick=drive, rate=1000, order=2, budget=300*us, subs=["cmd"])
# All scheduler config as kwargs
run(sensor, ctrl, motor, rt=True, watchdog_ms=500)
run(node, duration=10, deterministic=True)
All run() kwargs (maps to Rust Scheduler builder):
| Kwarg | Default | Rust equivalent |
|---|---|---|
duration | None (forever) | .run() / .run_for() |
tick_rate | 1000.0 | .tick_rate() |
rt | False | .prefer_rt() |
deterministic | False | .deterministic() |
watchdog_ms | 0 | .watchdog() |
blackbox_mb | 0 | .blackbox() |
recording | False | .with_recording() |
name | None | .name() |
cores | None | .cores() |
max_deadline_misses | None | .max_deadline_misses() |
verbose | False | .verbose() |
telemetry | None | .telemetry() |
Miss — Deadline Miss Policy
The Miss class defines what happens when a node exceeds its deadline:
from horus import Miss
# Available policies
Miss.WARN # Log warning and continue (default)
Miss.SKIP # Skip the node for this tick
Miss.SAFE_MODE # Call enter_safe_state() on the node
Miss.STOP # Stop the entire scheduler
Use via the Node constructor:
# Config on Node, then add
motor = horus.Node(name="motor", tick=motor_fn, rate=500, order=0, budget=200, on_miss="safe_mode")
scheduler.add(motor)
Scheduler Configuration
All configuration via constructor kwargs:
from horus import Scheduler, Node
# Development — simple
scheduler = Scheduler()
# Production — watchdog + RT
scheduler = Scheduler(tick_rate=1000, rt=True, watchdog_ms=500)
# With blackbox + telemetry
scheduler = Scheduler(
tick_rate=1000,
watchdog_ms=500,
blackbox_mb=64,
telemetry="http://localhost:9090",
verbose=True,
)
# Deterministic mode for simulation/testing
scheduler = Scheduler(tick_rate=100, deterministic=True)
Testing with short runs:
scheduler = Scheduler()
scheduler.add(Node(name="sensor", tick=sensor_fn, rate=100, order=0))
scheduler.add(Node(name="ctrl", tick=ctrl_fn, rate=100, order=1))
# Run for a short duration
scheduler.run(duration=0.1)
Message Timestamps
Timestamps are managed by the Rust Topic backend. Typed messages include a timestamp_ns field for nanosecond-precision timing:
import horus
import time
def control_tick(node):
if node.has_msg("sensor_data"):
msg = node.recv("sensor_data")
# Use message-level timestamps for latency checks
if hasattr(msg, 'timestamp_ns') and msg.timestamp_ns:
age_s = (time.time_ns() - msg.timestamp_ns) / 1e9
if age_s > 0.1: # More than 100ms old
node.log_warning(f"Stale data: {age_s*1000:.1f}ms old")
return
latency = age_s
print(f"Latency: {latency*1000:.1f}ms")
# Process fresh data
process(msg)
Timestamp access: Use msg.timestamp_ns on typed messages (CmdVel, Pose2D, Imu, etc.) for nanosecond timestamps set by the Rust backend.
Multiprocess Execution
Run Python nodes in separate processes for isolation and multi-language support:
# Run multiple Python files as separate processes
horus run node1.py node2.py node3.py
# Mix Python and Rust nodes
horus run sensor.rs controller.py visualizer.py
# Mix Rust and Python
horus run lidar_driver.rs planner.py motor_control.rs
All nodes in the same horus run session automatically communicate via shared memory!
Example - Distributed System:
# sensor_node.py
import horus
def sensor_tick(node):
data = read_lidar() # Your sensor code
node.send("lidar_data", data)
sensor = horus.Node(name="lidar", pubs="lidar_data", tick=sensor_tick)
horus.run(sensor)
# controller_node.py
import horus
def control_tick(node):
if node.has_msg("lidar_data"):
data = node.recv("lidar_data")
cmd = compute_control(data)
node.send("motor_cmd", cmd)
controller = horus.Node(
name="controller",
subs="lidar_data",
pubs="motor_cmd",
tick=control_tick
)
horus.run(controller)
# Run both in separate processes
horus run sensor_node.py controller_node.py
Benefits:
- Process isolation: One crash doesn't kill everything
- Multi-language: Mix Python and Rust nodes in the same application
- Parallel execution: True multicore utilization
- Zero configuration: Shared memory IPC automatically set up
Complete Example: All Features Together
import horus
import time
def sensor_tick(node):
"""High-frequency sensor (100Hz)"""
imu = {"accel_x": 1.0, "accel_y": 0.0, "accel_z": 9.8}
node.send("imu_data", imu)
node.log_info("Published IMU data")
def control_tick(node):
"""Medium-frequency control (50Hz)"""
if node.has_msg("imu_data"):
imu = node.recv("imu_data")
cmd = {"linear": 1.0, "angular": 0.0}
node.send("cmd_vel", cmd)
def logger_tick(node):
"""Low-frequency logging (10Hz)"""
if node.has_msg("cmd_vel"):
msg = node.recv("cmd_vel")
node.log_info(f"Command received: {msg}")
# Create nodes with rate and order configured on the Node
sensor = horus.Node(name="imu", pubs="imu_data", tick=sensor_tick, rate=100, order=0)
controller = horus.Node(name="ctrl", subs="imu_data", pubs="cmd_vel", tick=control_tick, rate=50, order=1)
logger = horus.Node(name="log", subs="cmd_vel", tick=logger_tick, rate=10, order=2)
# Add nodes to scheduler
scheduler = horus.Scheduler()
scheduler.add(sensor)
scheduler.add(controller)
scheduler.add(logger)
scheduler.run(duration=5.0)
# Check statistics
stats = scheduler.get_node_stats("imu")
print(f"Sensor: {stats['total_ticks']} ticks in 5 seconds")
Network Communication
HORUS Python supports network communication for distributed multi-machine systems. Topic, and Router all work transparently over the network.
Topic Network Endpoints
Add an endpoint parameter to communicate over the network:
from horus import Topic, CmdVel
# Local (shared memory) - default
local_topic = Topic(CmdVel)
# Network (UDP direct)
network_topic = Topic(CmdVel, endpoint="cmdvel@192.168.1.100:8000")
# Router (TCP broker for WAN/NAT traversal)
router_topic = Topic(CmdVel, endpoint="cmdvel@router")
Endpoint Syntax:
"topic"- Local shared memory (~500ns latency)"topic@host:port"- Direct UDP (<50μs latency)"topic@router"- Router broker (auto-discovery on localhost:7777)"topic@192.168.1.100:7777"- Router broker at specific address
Topic Methods
| Method | Description |
|---|---|
topic.send(msg, node=None) | Send a message. Pass optional node for automatic IPC logging. Returns True. |
topic.recv(node=None) | Receive one message. Returns the message or None if empty. |
topic.name | Property: the topic name string |
topic.backend_type | Property: the active backend name (e.g., "direct", "spsc_shm") |
topic.is_network_topic | Property: True if this topic uses network transport |
topic.endpoint | Property: the endpoint string, or None for local topics |
topic.stats() | Returns a dict with messages_sent, messages_received, send_failures, recv_failures, is_network, backend |
topic.is_generic() | Returns True if this is a generic (string-name) topic |
Example:
from horus import Topic, CmdVel
topic = Topic(CmdVel)
# Send and receive typed messages
topic.send(CmdVel(linear=1.0, angular=0.5))
msg = topic.recv()
if msg:
print(f"linear={msg.linear}, angular={msg.angular}")
# Check topic properties
print(f"Name: {topic.name}") # "cmd_vel"
print(f"Backend: {topic.backend_type}") # e.g. "mpmc_shm"
print(f"Stats: {topic.stats()}")
Generic Topics
When you create a Topic with a string name (instead of a typed class), you get a generic topic that accepts any JSON-serializable data:
from horus import Topic, CmdVel
# Generic topic (string name = dynamic typing)
topic = Topic("my_topic")
# Typed topic (class = static typing, better performance)
typed_topic = Topic(CmdVel)
Generic topics use the same send() and recv() methods as typed topics, but accept any JSON-serializable Python object. Data is serialized via MessagePack internally.
from horus import Topic
topic = Topic("sensor_data")
# Send dict, list, or any JSON-serializable data
topic.send({"temperature": 25.5, "humidity": 60.0})
topic.send([1.0, 2.0, 3.0, 4.0])
topic.send("status: OK")
# Receive (returns Python object)
msg = topic.recv() # {"temperature": 25.5, "humidity": 60.0}
# Check if generic
print(topic.is_generic()) # True
Typed vs Generic Performance:
| Topic Type | Serialization | Use Case |
|---|---|---|
Typed (Topic(CmdVel)) | Direct field extraction (no serde) | Production, cross-language, high-frequency |
Generic (Topic("name")) | Python → JSON → MessagePack | Dynamic schemas, prototyping, Python-only |
Automatic Transport Selection
HORUS automatically selects the fastest communication path based on where publishers and subscribers are located. You never need to configure this manually:
from horus import Topic, CmdVel
# Just create a topic — HORUS picks the fastest path automatically:
# Same-thread: ~3ns (when pub+sub are in the same node)
# Same-process: ~18-36ns (when pub+sub are in different nodes, same process)
# Cross-process: ~85-167ns (when pub+sub are in different processes)
topic = Topic("cmd_vel", CmdVel)
Automatic Transport Tiers:
| Scenario | Latency | When it applies |
|---|---|---|
| Same thread | ~3ns | Publisher and subscriber are in the same node |
| Same process (1:1) | ~18ns | One publisher, one subscriber, same process |
| Same process (many:1) | ~26ns | Multiple publishers, one subscriber, same process |
| Same process (many:many) | ~36ns | Multiple publishers and subscribers, same process |
| Cross-process (1:1) | ~85ns | One publisher, one subscriber, different processes |
| Cross-process (many:many) | ~167ns | Multiple publishers and subscribers, different processes |
Router Client (WAN/NAT Traversal)
For communication across networks, through NAT, or for large-scale deployments, use the Router:
from horus import RouterClient, Topic, CmdVel
# Create router client for explicit connection management
router = RouterClient("192.168.1.100", 7777)
# Build endpoints through the router
cmd_endpoint = router.endpoint("cmdvel") # Returns "cmdvel@192.168.1.100:7777"
pose_endpoint = router.endpoint("pose")
# Use endpoints with Topic
topic = Topic(CmdVel, endpoint=cmd_endpoint)
# Router properties
print(f"Address: {router.address}") # "192.168.1.100:7777"
print(f"Connected: {router.is_connected}") # True
print(f"Topics: {router.topics}") # ["cmdvel", "pose"]
print(f"Uptime: {router.uptime_seconds}s")
Helper Functions:
from horus import default_router_endpoint, router_endpoint
# Default router (localhost:7777)
ep1 = default_router_endpoint("cmdvel") # "cmdvel@router"
# Custom router address
ep2 = router_endpoint("cmdvel", "192.168.1.100", 7777) # "cmdvel@192.168.1.100:7777"
Router Server (for testing):
from horus import RouterServer
# Start a local router (for development/testing)
server = RouterServer(port=7777)
server.start()
# For production, use CLI instead:
# $ horus router start --port 7777
When to Use What
| Transport | Latency | Use Case |
|---|---|---|
Same-process (Topic(CmdVel)) | ~18-36ns | In-process communication (automatic) |
Cross-process, 1:1 (Topic(CmdVel)) | ~85ns | Same machine, one publisher and one subscriber |
Cross-process, many:many (Topic(CmdVel)) | ~167ns | Same machine, multiple publishers and subscribers |
Network (endpoint="topic@host:port") | <50μs | Multi-machine on LAN (direct UDP) |
Router (endpoint="topic@router") | 10-50ms | WAN, NAT traversal, cloud deployments |
Multi-Machine Example
# === ROBOT (192.168.1.50) ===
from horus import Topic, CmdVel, Imu, Odometry
# Local: Critical flight control (ultra-fast)
imu_topic = Topic(Imu) # ~85ns local shared memory
# Network: Telemetry to ground station
telemetry = Topic(Odometry, endpoint="telem@192.168.1.100:8000")
# Network: Commands from ground station
commands = Topic(CmdVel, endpoint="cmd@0.0.0.0:8001")
# === GROUND STATION (192.168.1.100) ===
from horus import Topic, CmdVel, Odometry
# Receive telemetry from robot
telemetry_sub = Topic(Odometry, endpoint="telem@0.0.0.0:8000")
# Send commands to robot
command_pub = Topic(CmdVel, endpoint="cmd@192.168.1.50:8001")
Integration with Python Ecosystem
NumPy Integration
import horus
import numpy as np
def process_array(node):
if node.has_msg("raw_data"):
data = node.recv("raw_data")
# Convert to NumPy array
arr = np.array(data)
# Process with NumPy
result = np.fft.fft(arr)
node.send("fft_result", result.tolist())
processor = horus.Node(
subs="raw_data",
pubs="fft_result",
tick=process_array
)
OpenCV Integration
import horus
import cv2
import numpy as np
def process_image(node):
if node.has_msg("camera"):
img_data = node.recv("camera")
# Convert to OpenCV format
img = np.array(img_data, dtype=np.uint8).reshape((480, 640, 3))
# Apply OpenCV processing
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 50, 150)
# Publish result
node.send("edges", edges.flatten().tolist())
vision = horus.Node(
subs="camera",
pubs="edges",
tick=process_image,
rate=30
)
scikit-learn Integration
import horus
from sklearn.linear_model import LinearRegression
import numpy as np
model = LinearRegression()
def train_model(node):
if node.has_msg("training_data"):
data = node.recv("training_data")
X = np.array(data['features'])
y = np.array(data['labels'])
# Train model
model.fit(X, y)
score = model.score(X, y)
node.send("model_score", score)
trainer = horus.Node(
subs="training_data",
pubs="model_score",
tick=train_model
)
Advanced Patterns
State Management
import horus
class RobotState:
def __init__(self):
self.position = {"x": 0.0, "y": 0.0}
self.velocity = 0.0
self.last_update = 0
state = RobotState()
def update_state(node):
if node.has_msg("velocity"):
state.velocity = node.recv("velocity")
if node.has_msg("position"):
state.position = node.recv("position")
# Publish combined state
node.send("robot_state", {
"pos": state.position,
"vel": state.velocity
})
state_manager = horus.Node(
subs=["velocity", "position"],
pubs="robot_state",
tick=update_state
)
Rate Limiting
import horus
import time
class RateLimiter:
def __init__(self, min_interval):
self.min_interval = min_interval
self.last_send = 0
limiter = RateLimiter(min_interval=0.1) # 100ms minimum
def rate_limited_publish(node):
current_time = time.time()
if current_time - limiter.last_send >= limiter.min_interval:
node.send("output", "data")
limiter.last_send = current_time
node = horus.Node(
pubs="output",
tick=rate_limited_publish,
rate=100 # Node runs at 100Hz, but publishes at max 10Hz
)
Error Handling
import horus
def safe_processing(node):
try:
if node.has_msg("input"):
data = node.recv("input")
result = risky_operation(data)
node.send("output", result)
except Exception as e:
node.send("errors", str(e))
print(f"Error: {e}")
processor = horus.Node(
subs="input",
pubs=["output", "errors"],
tick=safe_processing
)
Performance Tips
1. Use Per-Node Rate Control
# Configure rate and order on the Node, then add to scheduler
sensor = horus.Node(name="sensor", tick=sensor_fn, rate=100, order=0)
controller = horus.Node(name="ctrl", tick=ctrl_fn, rate=50, order=1)
logger = horus.Node(name="logger", tick=log_fn, rate=10, order=2)
scheduler = horus.Scheduler()
scheduler.add(sensor)
scheduler.add(controller)
scheduler.add(logger)
scheduler.run()
# Monitor performance with get_node_stats()
stats = scheduler.get_node_stats("sensor")
print(f"Sensor executed {stats['total_ticks']} ticks")
2. Check Message Freshness
import time
def control_tick(node):
if node.has_msg("sensor_data"):
data = node.recv("sensor_data")
# Use message-level timestamps for staleness checks
if hasattr(data, 'timestamp_ns') and data.timestamp_ns:
age_s = (time.time_ns() - data.timestamp_ns) / 1e9
if age_s > 0.1:
node.log_warning("Skipping stale sensor data")
return
process(data)
3. Use Dicts for Messages
# Send messages as Python dicts (automatically serialized to JSON)
cmd = {"linear": 1.5, "angular": 0.8}
node.send("cmd_vel", cmd)
# For staleness checks, use typed messages with timestamp_ns
# or track send time at the application level
4. Batch Processing
# Use node.recv_all() to process all available messages at once
def batch_processor(node):
messages = node.recv_all("input")
if messages:
results = [process(msg) for msg in messages]
for result in results:
node.send("output", result)
5. Keep tick() Fast
# GOOD: Fast tick
def good_tick(node):
if node.has_msg("input"):
data = node.recv("input")
result = quick_operation(data)
node.send("output", result)
# BAD: Slow tick
def bad_tick(node):
time.sleep(1) # Don't block!
data = requests.get("http://api.example.com") # Don't do I/O!
6. Offload Heavy Processing
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
def heavy_processing_node(node):
if node.has_msg("input"):
data = node.recv("input")
# Offload to thread pool
future = executor.submit(expensive_operation, data)
# Don't block - check result later or use callback
7. Use Multiprocess for CPU-Intensive Tasks
# Isolate heavy processing in separate processes
horus run sensor.py heavy_vision.py light_controller.py
# Each node gets its own CPU core
Development
Building from Source
# Debug build (fast compile, slow runtime)
cd horus_py
maturin develop
# Release build (slow compile, fast runtime)
maturin develop --release
# Build wheel for distribution
maturin build --release
Running Tests
# Install test dependencies
pip install pytest
# Run all tests
pytest tests/
# Run specific feature tests
horus run tests/test_rate_control.py # Phase 1: Per-node rates
horus run tests/test_timestamps.py # Phase 2: Timestamps
horus run tests/test_typed_messages.py # Phase 3: Typed messages
# With coverage
pytest --cov=horus tests/
# Test multiprocess execution (Phase 4)
horus run tests/multiprocess_publisher.py tests/multiprocess_subscriber.py
Mock Mode
HORUS Python includes a mock mode for testing without Rust bindings:
# If Rust bindings aren't available, automatically falls back to mock
# You'll see: "Warning: Rust bindings not available. Running in mock mode."
# Use for unit testing Python logic without HORUS running
Debugging Tips
# Check node statistics
scheduler = horus.Scheduler()
scheduler.add(my_node)
# Check node statistics
stats = scheduler.get_node_stats("my_node")
print(f"Ticks: {stats['total_ticks']}, Errors: {stats['errors_count']}")
# Monitor message timestamps via message-level fields
msg = node.recv("topic")
if msg and hasattr(msg, 'timestamp_ns') and msg.timestamp_ns:
age = (time.time_ns() - msg.timestamp_ns) / 1e9
print(f"Message age: {age*1000:.1f}ms")
Interoperability
With Rust Nodes
Important: For cross-language communication, use typed topics by passing a message type to Topic().
Cross-Language with Typed Topics
# Python node with typed topic
from horus import Topic, CmdVel
cmd_topic = Topic(CmdVel) # Typed topic
cmd_topic.send(CmdVel(linear=1.0, angular=0.5))
// Rust node receives
use horus::prelude::*;
let topic: Topic<CmdVel> = Topic::new("cmd_vel")?;
if let Some(cmd) = topic.recv() {
println!("Got: linear={}, angular={}", cmd.linear, cmd.angular);
}
Generic Topic (String Topics)
# Generic Topic - for custom topics
from horus import Topic
topic = Topic("my_topic") # Pass string for generic topic
topic.send({"linear": 1.0, "angular": 0.5}) # Uses JSON serialization
Typed topics: Use Topic(CmdVel), Topic(Pose2D) for cross-language communication.
See Python Message Library for details.
Time API
Framework-aware time functions. Use these instead of time.time() — they integrate with deterministic mode and SimClock.
Quick reference:
| Function | Returns | Description |
|---|---|---|
horus.now() | float | Current time in seconds |
horus.dt() | float | Timestep for this tick in seconds |
horus.elapsed() | float | Time since scheduler start |
horus.tick() | int | Current tick number |
horus.budget_remaining() | float | Time left in tick budget |
horus.rng_float() | float | Random float in [0, 1) |
horus.timestamp_ns() | int | Nanosecond timestamp |
horus.now() -> float
Current framework time in seconds.
- Normal mode: Wall clock (
time.time()equivalent) - Deterministic mode: Virtual SimClock that advances by fixed
dteach tick
def tick(node):
t = horus.now()
node.send("timestamp", t)
horus.dt() -> float
Timestep for this tick in seconds. Use this for physics integration instead of measuring elapsed time manually.
- Normal mode: Actual elapsed time since last tick
- Deterministic mode: Fixed
1.0 / rate— identical across runs
def tick(node):
# PID controller using dt() for correct integration
error = target - current
integral += error * horus.dt()
derivative = (error - prev_error) / horus.dt()
output = kp * error + ki * integral + kd * derivative
horus.elapsed() -> float
Time elapsed since the scheduler started, in seconds.
def tick(node):
if horus.elapsed() > 30.0:
node.log_info("Running for 30+ seconds, stabilized")
horus.tick() -> int
Current tick number (0-indexed, increments each scheduler cycle).
def tick(node):
if horus.tick() % 100 == 0:
node.log_info(f"Tick {horus.tick()}: system healthy")
horus.budget_remaining() -> float
Time remaining in this tick's budget, in seconds. Returns float('inf') if no budget is configured.
Use this for adaptive quality — do more work when time permits, skip expensive operations when tight.
def tick(node):
# Always do critical work
process_sensor_data()
# Only do expensive work if budget allows
if horus.budget_remaining() > 0.001: # >1ms remaining
run_expensive_optimization()
horus.rng_float() -> float
Random float in [0.0, 1.0).
- Normal mode: System entropy (non-deterministic)
- Deterministic mode: Tick-seeded RNG — produces identical sequences across runs
def tick(node):
# Simulated sensor noise (reproducible in deterministic mode)
noise = (horus.rng_float() - 0.5) * 0.1
reading = true_value + noise
horus.timestamp_ns() -> int
Current timestamp in nanoseconds. Use for TransformFrame queries and message timestamping.
def tick(node):
ts = horus.timestamp_ns()
transform = tf.lookup("camera", "base_link", ts)
Deterministic Mode
When using horus.run(..., deterministic=True), the time functions switch from wall clock to SimClock:
| Function | Normal Mode | Deterministic Mode |
|---|---|---|
now() | Wall clock | SimClock (virtual) |
dt() | Actual elapsed | Fixed 1/rate |
elapsed() | Real elapsed | Virtual elapsed |
rng_float() | System entropy | Tick-seeded (reproducible) |
tick() | Same | Same |
budget_remaining() | Same | Same |
timestamp_ns() | Real nanoseconds | Virtual nanoseconds |
This ensures identical behavior across runs — critical for simulation, testing, and replay.
Runtime Parameters
horus.Params provides dict-like access to runtime configuration stored in .horus/config/params.yaml. Change PID gains, speed limits, and thresholds without recompiling.
Params(path=None)
Create a parameter store.
Params()— loads from.horus/config/params.yaml(default)Params("path/to/file.yaml")— loads from explicit path
Methods
| Method | Returns | Description |
|---|---|---|
get(key, default=None) | Any | Get value, return default if missing |
params[key] | Any | Get value, raise KeyError if missing |
params[key] = value | — | Set value |
has(key) | bool | Check if key exists |
key in params | bool | Same as has(key) |
keys() | List[str] | All parameter names |
len(params) | int | Number of parameters |
save() | — | Persist to disk |
remove(key) | bool | Remove a key, returns True if existed |
reset() | — | Reset all parameters to defaults |
Example: Live PID Tuning
import horus
params = horus.Params()
def controller_tick(node):
# Read gains from params — change at runtime via CLI or monitor
kp = params.get("pid_kp", 1.0)
ki = params.get("pid_ki", 0.1)
kd = params.get("pid_kd", 0.01)
max_speed = params.get("max_speed", 1.5)
error = target - current
output = min(kp * error, max_speed)
node.send("cmd_vel", output)
controller = horus.Node(name="PIDController", tick=controller_tick,
rate=100, pubs=["cmd_vel"])
horus.run(controller)
Set parameters at runtime:
horus param set pid_kp 2.5
horus param set max_speed 0.8
horus param list
Rate Limiter
horus.Rate provides drift-compensated rate limiting for background threads and standalone loops. For nodes, use the rate= constructor kwarg instead.
Rate(hz)
Create a rate limiter targeting hz iterations per second.
Methods
| Method | Returns | Description |
|---|---|---|
sleep() | — | Sleep until next cycle. Compensates for work time to maintain target rate |
actual_hz() | float | Measured frequency (smoothed average) |
target_hz() | float | Target frequency |
period() | float | Target period in seconds (1/hz) |
is_late() | bool | True if current cycle exceeded the target period |
reset() | — | Reset timing (call after a pause to avoid burst catch-up) |
Example: Camera Capture Thread
import threading
from horus import Rate, Topic, Image
def camera_loop():
rate = Rate(30) # 30 FPS target
topic = Topic(Image)
while running:
frame = capture_camera()
topic.send(frame)
if rate.is_late():
print(f"Camera behind: {rate.actual_hz():.1f} Hz (target {rate.target_hz():.0f})")
rate.sleep()
thread = threading.Thread(target=camera_loop, daemon=True)
thread.start()
Hardware Drivers
horus.drivers loads hardware connections from horus.toml's [drivers] section.
Module Functions
| Function | Returns | Description |
|---|---|---|
drivers.load() | HardwareSet | Load drivers from horus.toml |
drivers.load_from(path) | HardwareSet | Load from explicit YAML path |
drivers.register_driver(name, cls) | — | Register a Python driver class |
HardwareSet
Returned by drivers.load(). Provides typed accessors for each hardware type.
| Method | Returns | Description |
|---|---|---|
hw.list() | List[str] | All driver names |
hw.has(name) | bool | Check if driver exists |
name in hw | bool | Same as has(name) |
len(hw) | int | Number of drivers |
hw.dynamixel(name) | DriverParams | Dynamixel servo bus config |
hw.serial(name) | DriverParams | Serial port config |
hw.i2c(name) | DriverParams | I2C bus config |
hw.can(name) | DriverParams | CAN bus config |
hw.gpio(name) | DriverParams | GPIO pin config |
All typed accessors: dynamixel, rplidar, realsense, i2c, serial, can, gpio, pwm, usb, webcam, input, bluetooth, net, ethercat, spi, adc, raw.
DriverParams
Dict-like access to a driver's configuration values.
| Method | Returns | Description |
|---|---|---|
params[key] | Any | Get value (KeyError if missing) |
params.get(key) | Any | Get value (KeyError if missing) |
params.get_or(key, default) | Any | Get with default if missing |
params.has(key) | bool | Check if key exists |
params.keys() | List[str] | All parameter names |
Example
# horus.toml
[drivers]
arm = { type = "dynamixel", port = "/dev/ttyUSB0", baudrate = 1000000 }
lidar = { type = "rplidar", port = "/dev/ttyUSB1" }
import horus
hw = horus.drivers.load()
if hw.has("arm"):
arm = hw.dynamixel("arm")
port = arm.get_or("port", "/dev/ttyUSB0")
baud = arm.get_or("baudrate", 115200)
print(f"Arm on {port} at {baud}")
print(f"Available drivers: {hw.list()}")
Unit Constants
from horus import us, ms
us # 1e-6 — microseconds to seconds
ms # 1e-3 — milliseconds to seconds
# Use with budget/deadline for readability
node = horus.Node(tick=fn, rate=1000, budget=300 * us, deadline=900 * us)
Error Types
from horus import HorusNotFoundError, HorusTransformError, HorusTimeoutError
try:
tf.tf("missing_frame", "base")
except HorusTransformError as e:
print(f"Transform failed: {e}")
try:
tf.wait_for_transform("src", "dst", timeout_sec=1.0)
except HorusTimeoutError:
print("Timed out waiting for transform")
| Exception | Rust source | Raised when |
|---|---|---|
HorusNotFoundError | NotFound(...) | Missing topic, frame, node |
HorusTransformError | Transform(...) | TF extrapolation, stale data |
HorusTimeoutError | Timeout(...) | Blocking operation timed out |
Other Rust errors map to stdlib: Config → ValueError, Io → IOError, Memory → MemoryError, etc.
See Also
- Transform Frame — Coordinate transforms (
TransformFrame,Transform) - Perception — Detection, landmarks, tracking (
DetectionList,PointXYZ,COCOPose) - Memory Types — Image, PointCloud, DepthImage (zero-copy)
- Async Nodes — Async tick functions
- ML Utilities — PyTorch/ONNX inference helpers
Common Patterns
Producer-Consumer
# Producer
producer = horus.Node(
pubs="queue",
tick=lambda n: n.send("queue", generate_work())
)
# Consumer
consumer = horus.Node(
subs="queue",
tick=lambda n: process_work(n.get("queue")) if n.has_msg("queue") else None
)
horus.run(producer, consumer)
Request-Response
def request_node(node):
node.send("requests", {"id": 1, "query": "data"})
def response_node(node):
if node.has_msg("requests"):
req = node.recv("requests")
response = handle_request(req)
node.send("responses", response)
req = horus.Node(pubs="requests", tick=request_node)
res = horus.Node(subs="requests", pubs="responses", tick=response_node)
Periodic Tasks
import time
class PeriodicTask:
def __init__(self, interval):
self.interval = interval
self.last_run = 0
task = PeriodicTask(interval=5.0) # Every 5 seconds
def periodic_tick(node):
current = time.time()
if current - task.last_run >= task.interval:
node.send("periodic", "task_executed")
task.last_run = current
node = horus.Node(pubs="periodic", tick=periodic_tick, rate=10)
Troubleshooting
Import Errors
# If you see: ModuleNotFoundError: No module named 'horus'
# Rebuild and install:
cd horus_py
maturin develop --release
Slow Performance
# Use release build (not debug)
maturin develop --release
# Check tick rate isn't too high
node = horus.Node(tick=fn, rate=30) # 30Hz is reasonable
Memory Issues
# Avoid accumulating data in closures
# BAD:
all_data = []
def bad_tick(node):
all_data.append(node.recv("input")) # Memory leak!
# GOOD:
def good_tick(node):
data = node.recv("input")
process_and_discard(data) # Process immediately
Monitor Integration and Logging
Current Limitations
Python nodes currently do NOT appear in the HORUS monitor logs.
The Python bindings do not integrate with the Rust logging system:
# Python nodes use standard print() for logging
print("Debug message") # Visible in console, not in monitor
What this means:
- Python nodes communicate via shared memory
- All message passing functionality works
- Python log messages don't appear in monitor logs
- Use
print()for Python-side debugging
Monitoring Python Nodes
Since Python nodes don't integrate with the monitor logging system, use these alternatives:
- Node-level logging methods:
def tick(node):
node.log_info("Processing sensor data")
node.log_warning("Sensor reading is stale")
node.log_error("Failed to process data")
node.log_debug("Debug information")
# These print to console, not monitor
- Manual topic monitoring:
def tick(node):
if node.has_msg("input"):
data = node.recv("input")
print(f"[{node.name}] Received: {data}")
node.send("output", result)
print(f"[{node.name}] Published: {result}")
- Node statistics:
scheduler = horus.Scheduler()
scheduler.add(node)
scheduler.run(duration=10)
# Get stats after running
stats = scheduler.get_node_stats("my_node")
print(f"Ticks: {stats['total_ticks']}")
print(f"Errors: {stats['errors_count']}")
Future Improvements
Monitor integration for Python nodes is planned for a future release. This will include:
- Full
NodeInfocontext in Python callbacks LogSummaryfor Python message types- Python node logs visible in the monitor TUI and web dashboard
Custom Exceptions
HORUS defines three custom exception types plus maps internal errors to standard Python exceptions:
from horus import HorusNotFoundError, HorusTransformError, HorusTimeoutError
try:
result = some_horus_operation()
except HorusNotFoundError:
print("Resource not found")
except HorusTransformError:
print("Transform computation failed")
except HorusTimeoutError:
print("Operation timed out")
Custom exceptions (inherit from Exception):
| Exception | When Raised | Rust Source |
|---|---|---|
HorusNotFoundError | Topic, frame, node, or parent frame not found | HorusError::NotFound |
HorusTransformError | Transform extrapolation or stale data | HorusError::Transform |
HorusTimeoutError | Blocking operation exceeded time limit | HorusError::Timeout |
Standard Python exceptions raised by HORUS operations:
| Python Exception | When Raised | Rust Source |
|---|---|---|
IOError | File or IPC I/O failures | HorusError::Io |
MemoryError | Shared memory or pool allocation failures | HorusError::Memory |
ValueError | Invalid parameters, bad config, parse errors | HorusError::InvalidInput, InvalidDescriptor, Parse, Config |
TypeError | Serialization/deserialization failures | HorusError::Serialization |
RuntimeError | Internal or unmapped errors | All other variants |
All exceptions preserve the original Rust error message, so you get full context:
try:
tf = tf_tree.tf("nonexistent", "world")
except HorusNotFoundError as e:
print(e) # "Frame not found: nonexistent"
try:
img = Image(height=-1, width=640, encoding="rgb8")
except ValueError as e:
print(e) # "Invalid input: height must be positive"
Catch hierarchy — order matters when catching:
try:
result = horus_operation()
except HorusNotFoundError:
pass # Specific: missing resource
except HorusTransformError:
pass # Specific: TF failure
except HorusTimeoutError:
pass # Specific: deadline exceeded
except (ValueError, TypeError):
pass # Bad input or serialization
except (IOError, MemoryError):
pass # System-level failures
except RuntimeError:
pass # Catch-all for internal errors
See Also
- Examples - More code examples
- Core Concepts - Understanding HORUS architecture
- Monitor - Real-time monitoring and visualization
- Python Message Library - Typed message classes
- Multi-Language Support - Cross-language communication
- Performance - Optimization guide
- Rust Scheduler API — Rust Scheduler API reference
- Rust Standard Messages — All Rust POD message types
Remember: With HORUS Python, you focus on what your robot does, not how the framework works!