Python Examples
All examples use the standard horus.Node callback API — one pattern, no inheritance.
import horus
def my_tick(node):
node.send("output", data)
node = horus.Node(name="my_node", pubs=["output"], tick=my_tick, rate=30)
horus.run(node)
Basic Node
A minimal node that reads sensor data and publishes motor commands:
import horus
def controller(node):
"""Simple obstacle avoidance"""
if node.has_msg("sensor.distance"):
distance = node.recv("sensor.distance")
if distance < 0.5:
node.send("motor.cmd", {"linear": 0.0, "angular": 0.5})
else:
node.send("motor.cmd", {"linear": 1.0, "angular": 0.0})
node = horus.Node(
name="obstacle_avoider",
subs=["sensor.distance"],
pubs=["motor.cmd"],
tick=controller,
rate=10
)
horus.run(node)
Typed Topics
Using typed messages for proper logging and cross-language compatibility:
import horus
def controller(node):
if node.has_msg("localization.pose"):
pose = node.recv("localization.pose")
# Logs show: Pose2D { x: 2.31, y: 1.31, theta: 0.5 }
cmd = horus.CmdVel(linear=1.0, angular=0.0)
node.send("control.cmd", cmd)
node = horus.Node(
name="controller",
subs={"localization.pose": {"type": horus.Pose2D}},
pubs={"control.cmd": {"type": horus.CmdVel}},
tick=controller,
rate=30
)
horus.run(node)
Available typed messages: CmdVel, Pose2D, Imu, Odometry, LaserScan, JointState, MotorCommand, and 50+ more.
Multi-Node System
Multiple nodes with execution order:
import horus
def sensor_tick(node):
node.send("sensor.distance", 1.5)
def controller_tick(node):
if node.has_msg("sensor.distance"):
dist = node.recv("sensor.distance")
if dist < 0.5:
node.send("cmd_vel", horus.CmdVel(linear=0.0, angular=0.5))
else:
node.send("cmd_vel", horus.CmdVel(linear=1.0, angular=0.0))
def logger_tick(node):
if node.has_msg("cmd_vel"):
cmd = node.recv("cmd_vel")
print(f"Command: linear={cmd.linear:.1f} angular={cmd.angular:.1f}")
sensor = horus.Node(name="sensor", pubs=["sensor.distance"], tick=sensor_tick, rate=30)
controller = horus.Node(
name="controller",
subs=["sensor.distance"],
pubs={"cmd_vel": {"type": horus.CmdVel}},
tick=controller_tick,
rate=30
)
logger = horus.Node(name="logger", subs=["cmd_vel"], tick=logger_tick, rate=10)
# Quick run — no scheduler needed
horus.run(sensor, controller, logger, duration=10.0)
Scheduler with Execution Control
When you need execution order, failure policies, or RT features:
import horus
sensor = horus.Node(name="sensor", pubs=["scan"], tick=sensor_tick, rate=100, order=0)
controller = horus.Node(name="ctrl", subs=["scan"], pubs=["cmd"], tick=ctrl_tick, rate=100, order=1)
logger = horus.Node(name="logger", subs=["cmd"], tick=log_tick, rate=10, order=10)
scheduler = horus.Scheduler(tick_rate=100, watchdog_ms=500)
# Critical — runs first
scheduler.add(sensor)
# RT control — runs after sensor
scheduler.add(controller)
# Non-critical — runs last
scheduler.add(logger)
scheduler.run()
Context Manager
sensor = horus.Node(name="sensor", tick=sensor_tick, rate=100, order=0)
controller = horus.Node(name="ctrl", tick=ctrl_tick, rate=100, order=1)
with horus.Scheduler(tick_rate=100) as sched:
sched.add(sensor)
sched.add(controller)
sched.run(duration=30.0)
# auto-stop on exit
Advanced Node Configuration
For execution classes (compute, async I/O, event-driven), configure via Node kwargs:
scheduler = horus.Scheduler(tick_rate=500, rt=True)
# Event-triggered: ticks when "lidar_scan" topic receives data
scheduler.add(horus.Node(tick=detect_fn, on="lidar_scan", order=0))
# Compute pool: CPU-bound work on separate thread pool
scheduler.add(horus.Node(tick=plan_fn, compute=True, rate=10))
# Async I/O: non-blocking network/file operations
scheduler.add(horus.Node(tick=upload_fn, rate=1, failure_policy="ignore"))
scheduler.run()
Sensor Processing Pipeline
Processing laser scans for obstacle avoidance:
import horus
def obstacle_avoidance(node):
if node.has_msg("scan"):
scan = node.recv("scan")
if scan and hasattr(scan, 'ranges') and scan.ranges:
min_dist = min(r for r in scan.ranges if r > 0.01)
if min_dist < 0.5:
node.send("cmd_vel", horus.CmdVel(linear=0.0, angular=0.5))
else:
node.send("cmd_vel", horus.CmdVel(linear=1.0, angular=0.0))
node = horus.Node(
name="obstacle_avoider",
subs={"scan": {"type": horus.LaserScan}},
pubs={"cmd_vel": {"type": horus.CmdVel}},
tick=obstacle_avoidance,
rate=10
)
horus.run(node)
Camera Image Pipeline
Send and receive images with zero-copy shared memory:
import horus
import numpy as np
# Create image backed by shared memory
img = horus.Image(480, 640, "rgb8")
# Fill from NumPy (zero-copy when possible)
pixels = np.zeros((480, 640, 3), dtype=np.uint8)
pixels[:, :, 2] = 255 # Blue
img = horus.Image.from_numpy(pixels)
# Send over topic
topic = horus.Topic("camera.rgb")
topic.send(img)
# Receive (in another node or process)
received = topic.recv()
if received is not None:
arr = received.to_numpy() # zero-copy NumPy
tensor = received.to_torch() # zero-copy PyTorch via DLPack
print(f"Received {arr.shape[1]}x{arr.shape[0]} image")
Cross-Language Communication
Python and Rust nodes communicate via shared memory topics — same machine, automatic:
Python publisher:
import horus
import time
topic = horus.Topic(horus.CmdVel)
while True:
topic.send(horus.CmdVel(linear=1.0, angular=0.2))
time.sleep(0.1)
Rust subscriber (separate process):
use horus::prelude::*;
let topic: Topic<CmdVel> = Topic::new("cmd_vel")?;
loop {
if let Some(cmd) = topic.recv() {
println!("linear={}, angular={}", cmd.linear, cmd.angular);
}
}
Time API
Framework clock for deterministic-compatible code:
import horus
def physics_tick(node):
dt = horus.dt() # Fixed in deterministic mode
elapsed = horus.elapsed() # Time since start
tick = horus.tick() # Tick number
rng = horus.rng_float() # Deterministic random [0, 1)
budget = horus.budget_remaining() # Seconds left in budget
# Physics integration
node.velocity += node.acceleration * dt
node.position += node.velocity * dt
node = horus.Node(name="physics", tick=physics_tick, rate=100)
horus.run(node)
| Function | Returns | Description |
|---|---|---|
horus.now() | float | Current time (seconds) |
horus.dt() | float | Timestep (seconds) |
horus.elapsed() | float | Time since start (seconds) |
horus.tick() | int | Current tick number |
horus.budget_remaining() | float | Budget left (seconds, inf if none) |
horus.rng_float() | float | Random in [0, 1) |
horus.timestamp_ns() | int | Nanosecond timestamp |
Async Node
Nodes with async def tick functions run on the async I/O thread pool — perfect for network requests and file I/O:
import horus
async def fetch_weather(node):
"""Fetch weather data from an API every tick"""
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get("http://api.weather.local/current") as resp:
data = await resp.json()
node.send("weather", data)
weather = horus.Node(
name="weather_fetcher",
pubs=["weather"],
tick=fetch_weather, # async def auto-detected
rate=1 # 1 Hz — fetch every second
)
horus.run(weather)
HORUS auto-detects async def and runs it on the async executor. No manual event loop setup needed.
ML Inference
ONNX model inference with performance monitoring:
import horus
import numpy as np
def detect_tick(node):
if node.has_msg("camera.rgb"):
img = node.recv("camera.rgb")
arr = img.to_numpy()
# Preprocess
input_data = preprocess(arr)
# Run inference
results = model.run(None, {"input": input_data})[0]
# Publish detections
for det in parse_detections(results):
node.send("detections", det)
node = horus.Node(
name="detector",
subs={"camera.rgb": {"type": horus.Image}},
pubs=["detections"],
tick=detect_tick,
rate=30,
compute=True,
order=5
)
scheduler = horus.Scheduler(tick_rate=30)
scheduler.add(node) # Runs on compute pool
scheduler.run()
See Also
- Python Bindings - Core Python API
- Message Library - Available message types
- Time API (Rust reference) - Full time API documentation
- Deterministic Mode - Deterministic execution guide