Advanced Patterns (Python)
Beyond the basics of Node, Scheduler, and run(), HORUS provides execution classes, runtime mutation, deterministic simulation, and recording — all accessible from Python kwargs. This page covers every advanced pattern with working code.
Compute Nodes
By default, nodes run on the main tick thread sequentially. compute=True moves a node to a parallel thread pool, freeing the main thread for time-critical nodes.
When to Use
Use compute=True for CPU-heavy work that would block other nodes: ML inference, SLAM computation, path planning, image processing. The scheduler runs compute nodes concurrently with the main tick thread.
```python
import horus
import numpy as np

model = load_model("yolov8n.onnx")

def detect_tick(node):
    img = node.recv("camera.rgb")
    if img is None:
        return
    frame = img.to_numpy()
    detections = model.predict(frame)
    for det in detections:
        node.send("detections", {
            "class": det.class_name,
            "confidence": float(det.confidence),
            "bbox": [det.x1, det.y1, det.x2, det.y2],
        })

detector = horus.Node(
    name="yolo_detector",
    subs=[horus.Image],
    pubs=["detections"],
    tick=detect_tick,
    rate=30,
    compute=True,          # Runs on thread pool, not main tick thread
    budget=30 * horus.ms,  # 30ms budget per frame
    on_miss="skip",        # Skip frame if inference overruns
)

horus.run(detector, tick_rate=100)
```
How It Works
Without compute=True, a 25ms ML inference at 30Hz would block the main tick thread for 25ms every 33ms — starving any 100Hz control node on the same scheduler. With compute=True, inference runs on a separate thread pool and does not delay the main tick.
| Configuration | Main Thread | Thread Pool |
|---|---|---|
| Default (no flags) | Runs this node | Not used |
| compute=True | Free for other nodes | Runs this node |
| async def tick | Free for other nodes | Runs on async I/O pool |
Constraint: compute=True is mutually exclusive with async def tick and on="topic". A node can only have one execution class.
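As a mental model, compute execution resembles submitting work to a thread pool while the main loop keeps ticking. The following is a plain-Python analogy, not HORUS internals; main_loop_tick and heavy_fn are illustrative names:

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=4)

def main_loop_tick(pending, heavy_fn, frame):
    # Split futures into finished and still-running exactly once.
    finished = [f for f in pending if f.done()]
    pending[:] = [f for f in pending if f not in finished]
    # Submit this tick's work without waiting for it.
    pending.append(pool.submit(heavy_fn, frame))
    # Harvest results from work submitted on earlier ticks.
    return [f.result() for f in finished]
```

Each call harvests whatever finished earlier and submits new work without blocking, which is why a 100Hz control node sharing the scheduler never waits on the heavy work.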
Async I/O Nodes
For network requests, database queries, file I/O, and WebSocket connections, use async def tick. The scheduler auto-detects async functions and runs them on a dedicated I/O thread pool.
```python
import horus
import aiohttp

async def cloud_tick(node):
    if node.has_msg("telemetry"):
        data = node.recv("telemetry")
        try:
            async with aiohttp.ClientSession() as session:
                await session.post(
                    "https://api.myrobot.io/telemetry",
                    json=data,
                    timeout=aiohttp.ClientTimeout(total=2.0),
                )
        except aiohttp.ClientError as e:
            node.log_warning(f"Upload failed: {e}")

uploader = horus.Node(
    name="cloud_uploader",
    subs=["telemetry"],
    tick=cloud_tick,  # async def auto-detected
    rate=1,           # 1Hz — upload every second
)

horus.run(uploader)
```
No configuration needed beyond writing async def. The scheduler inspects the function and routes it to the I/O executor automatically.
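For intuition, the detection step can be as simple as a coroutine-function check. This sketch assumes the scheduler does something equivalent internally; executor_for is a hypothetical helper, not a HORUS API:

```python
import inspect

async def io_tick(node):
    ...  # network-bound work

def cpu_tick(node):
    ...  # ordinary synchronous work

def executor_for(tick_fn):
    # Coroutine functions route to the I/O pool, plain functions to the main thread.
    return "io_pool" if inspect.iscoroutinefunction(tick_fn) else "main_thread"
```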
Async Init and Shutdown
init and shutdown callbacks can also be async — useful for establishing database connections or closing network sessions:
```python
import asyncpg

async def setup(node):
    node.db = await asyncpg.connect("postgresql://localhost/robotics")

async def store(node):
    if node.has_msg("sensor.data"):
        data = node.recv("sensor.data")
        await node.db.execute(
            "INSERT INTO readings (value, ts) VALUES ($1, $2)",
            data["value"], data["timestamp"],
        )

async def cleanup(node):
    await node.db.close()

db_node = horus.Node(
    name="db_logger",
    subs=["sensor.data"],
    tick=store,
    init=setup,
    shutdown=cleanup,
    rate=10,
)
```
Timeout Discipline
Async ticks that hang will block scheduler shutdown. Always wrap network calls:
```python
import asyncio

async def safe_tick(node):
    try:
        result = await asyncio.wait_for(fetch_data(), timeout=2.0)
        node.send("data", result)
    except asyncio.TimeoutError:
        node.log_warning("Fetch timed out, skipping tick")
```
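If several async ticks make network calls, a small decorator keeps the timeout discipline in one place. This is an illustrative helper built on asyncio.wait_for, not part of HORUS; with_timeout is a hypothetical name:

```python
import asyncio
import functools

def with_timeout(seconds):
    """Bound any async callable with a timeout (illustrative helper)."""
    def wrap(fn):
        @functools.wraps(fn)
        async def inner(*args, **kwargs):
            try:
                return await asyncio.wait_for(fn(*args, **kwargs), timeout=seconds)
            except asyncio.TimeoutError:
                return None  # Treat a timeout as "no result this tick"
        return inner
    return wrap
```

Applied as @with_timeout(2.0) above an async tick, a timed-out call simply yields nothing that tick instead of hanging shutdown.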
Event-Driven Nodes
Event-driven nodes tick only when a specific topic receives data, rather than at a fixed rate. Use on="topic.name" for sparse events like emergency stops, button presses, or configuration changes.
```python
import horus

def emergency_handler(node):
    event = node.recv("emergency.stop")
    if event:
        node.log_warning("Emergency stop triggered!")
        node.send("cmd_vel", horus.CmdVel(linear=0.0, angular=0.0))
        node.send("status", {"state": "emergency_stopped"})

estop = horus.Node(
    name="estop_handler",
    subs=["emergency.stop"],
    pubs=[horus.CmdVel, "status"],
    tick=emergency_handler,
    on="emergency.stop",  # Only tick when this topic has data
)

horus.run(estop)
```
Behavior
- The node does not tick on the scheduler's fixed rate
- When a message arrives on the trigger topic, the node ticks once
- Multiple messages arriving between scheduler cycles result in one tick (not one per message)
- node.recv() inside the tick returns the latest message
Constraint: on="topic" is mutually exclusive with compute=True and async def tick.
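The coalescing behavior above can be pictured as a latest-wins mailbox: newer messages overwrite older ones, and at most one tick fires per batch. This sketch models the assumed semantics, not the actual HORUS implementation:

```python
class LatestMailbox:
    """Latest-wins mailbox sketch for event-driven trigger topics."""

    def __init__(self):
        self._latest = None
        self._pending = False

    def deliver(self, msg):
        self._latest = msg    # A newer message overwrites the older one
        self._pending = True  # Multiple deliveries still trigger one tick

    def take(self):
        # Called once per scheduler cycle: returns the latest message, or
        # None if nothing arrived since the last tick.
        if not self._pending:
            return None
        self._pending = False
        return self._latest
```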
Rate-Limited Events
Combine on="topic" with rate= to cap how often an event-driven node can fire:
```python
# Ticks on new config, but no more than once per second
config_node = horus.Node(
    name="config_watcher",
    subs=["system.config"],
    tick=apply_config,
    on="system.config",
    rate=1,  # Max 1Hz even if config changes faster
)
```
Rate-Limited Compute
Combine compute=True with rate= to run CPU-heavy work at a lower frequency than the scheduler tick rate:
```python
# Scheduler ticks at 100Hz, but SLAM only runs at 10Hz on the thread pool
slam_node = horus.Node(
    name="slam",
    subs=[horus.LaserScan, horus.Odometry],
    pubs=["map", "localization.pose"],
    tick=slam_tick,
    compute=True,
    rate=10,  # 10Hz SLAM updates
    budget=80 * horus.ms,
    on_miss="warn",
)

# Controller runs at full 100Hz on the main tick thread
controller = horus.Node(
    name="controller",
    subs=["localization.pose"],
    pubs=[horus.CmdVel],
    tick=controller_tick,
    rate=100,
    order=1,
)

horus.run(slam_node, controller, tick_rate=100)
```
The scheduler skips the compute node on ticks where it is not scheduled, so the main thread never waits for it.
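The skipping logic amounts to rate division: a node fires only on scheduler ticks that are multiples of tick_rate / rate. A sketch of the assumed semantics (should_tick is a hypothetical helper, not a HORUS API):

```python
def should_tick(tick_count, tick_rate, node_rate):
    # A node at node_rate fires once every (tick_rate // node_rate)
    # scheduler ticks; a node at the full tick_rate fires every tick.
    divisor = max(1, tick_rate // node_rate)
    return tick_count % divisor == 0
```

At tick_rate=100 and rate=10, the SLAM node fires on ticks 0, 10, 20, and so on: ten times per simulated second.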
Runtime Mutation
The scheduler supports modifying nodes while running — adjusting rates, budgets, and even adding or removing nodes.
Changing Node Rate
```python
sched = horus.Scheduler(tick_rate=100)
sched.add(sensor)
sched.add(controller)

# Later (e.g., from a monitoring thread or on_error callback):
sched.set_node_rate("sensor", 200)     # Speed up sensor to 200Hz
sched.set_node_rate("controller", 50)  # Slow down controller to 50Hz
```
Changing Tick Budget
```python
# Tighten budget after profiling shows headroom
sched.set_tick_budget("motor_controller", 150)  # 150 μs

# Loosen budget during degraded mode
sched.set_tick_budget("motor_controller", 500)  # 500 μs
```
Removing Nodes
```python
# Remove a malfunctioning node at runtime
if error_count > threshold:
    removed = sched.remove_node("flaky_sensor")
    if removed:
        print("Removed flaky_sensor from scheduler")
```
Adaptive Rate Pattern
Adjust node rate based on system load:
```python
def monitor_tick(node):
    stats = node.info.get_metrics()
    avg_ms = stats.get("avg_tick_duration_ms", 0)
    if avg_ms > 8.0:  # Getting close to 10ms budget
        node.log_warning(f"High load: {avg_ms:.1f}ms avg tick")
        # Could signal another node to reduce rate via topic

monitor = horus.Node(
    name="load_monitor",
    tick=monitor_tick,
    rate=1,
    order=999,  # Run last
)
```
Deterministic Mode
Deterministic mode makes every run produce identical output — essential for testing, simulation, and debugging. It enables SimClock (fixed dt), seeded RNG, and sequential execution.
Basic Usage
```python
sched = horus.Scheduler(tick_rate=100, deterministic=True)
```
In deterministic mode:
- horus.dt() returns exactly 1/rate (fixed timestep, not wall clock)
- horus.now() advances by exactly dt each tick (SimClock)
- horus.rng_float() returns tick-seeded values (same sequence every run)
- Nodes execute sequentially in order — no thread pool randomness
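Tick-seeded RNG can be pictured as deriving a fresh seed from the tick number, so every run replays the identical sequence. A sketch of the assumed scheme (rng_float_at and the mixing constant are illustrative, not HORUS internals):

```python
import random

def rng_float_at(tick, base_seed=0):
    # Derive a per-tick seed; because the seed depends only on
    # (base_seed, tick), every run yields the same value at the same tick.
    return random.Random(base_seed * 1_000_003 + tick).random()
```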
tick_once() for Testing
Step through ticks manually. This is the primary testing tool for deterministic logic:
```python
import horus

outputs = []

def sensor_tick(node):
    node.send("temp", {"value": 20.0 + horus.tick() * 0.5})

def logger_tick(node):
    msg = node.recv("temp")
    if msg:
        outputs.append(msg["value"])

sensor = horus.Node(name="sensor", pubs=["temp"], tick=sensor_tick, rate=100, order=0)
logger = horus.Node(name="logger", subs=["temp"], tick=logger_tick, rate=100, order=1)

sched = horus.Scheduler(tick_rate=100, deterministic=True)
sched.add(sensor)
sched.add(logger)

# Step 5 ticks
for _ in range(5):
    sched.tick_once()

assert len(outputs) == 5
assert outputs[0] == 20.0
assert outputs[4] == 22.0
```
Filtered tick_once()
Tick only specific nodes — useful for isolating behavior:
```python
# Tick only the sensor node (logger does not run)
sched.tick_once(node_names=["sensor"])

# Tick only the logger (consumes whatever sensor published)
sched.tick_once(node_names=["logger"])
```
tick_for()
Run the tick loop for a bounded duration, then return:
```python
# Run for exactly 1 second (100 ticks at 100Hz in deterministic mode)
sched.tick_for(duration=1.0)
assert sched.current_tick() == 100
```
SimClock for Reproducibility
In deterministic mode, time is simulated. Physics integration is bit-exact across runs:
```python
positions = []

def physics_tick(node):
    dt = horus.dt()                    # Exactly 0.01 at 100Hz
    noise = horus.rng_float() * 0.001  # Same noise every run
    position = dt * 10.0 + noise
    positions.append(position)

node = horus.Node(name="physics", tick=physics_tick, rate=100)

# Run 1
sched1 = horus.Scheduler(tick_rate=100, deterministic=True)
sched1.add(node)
sched1.run(duration=1.0)
run1 = positions.copy()

# Run 2 — identical output
positions.clear()
sched2 = horus.Scheduler(tick_rate=100, deterministic=True)
sched2.add(horus.Node(name="physics", tick=physics_tick, rate=100))
sched2.run(duration=1.0)
run2 = positions.copy()

assert run1 == run2  # Bit-exact
```
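A SimClock fits in a few lines: time is a function of the tick counter, never of the wall clock. This sketch reflects the behavior described above, not the actual implementation:

```python
class SimClock:
    """Minimal SimClock sketch: time advances by a fixed dt per tick,
    independent of how long each tick actually takes to compute."""

    def __init__(self, tick_rate):
        self.dt = 1.0 / tick_rate
        self.tick = 0

    def now(self):
        return self.tick * self.dt

    def advance(self):
        self.tick += 1
```

A 1-second simulation at 100Hz is 100 calls to advance(), whether the machine needs 0.5s or 5s of real time to compute them.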
Record and Replay
Record all topic data during a run for later analysis, debugging, or test regression.
Recording a Session
```python
sched = horus.Scheduler(tick_rate=100, recording=True)
sched.add(sensor)
sched.add(controller)

# Run for 10 seconds
sched.run(duration=10.0)

# Stop recording and get file paths
files = sched.stop_recording()
print(f"Session saved to: {files}")
```
Managing Recordings
```python
# List all saved recordings
for session in sched.list_recordings():
    print(f"  {session}")

# Delete a recording
sched.delete_recording(session_name)

# Check recording state
if sched.is_recording():
    print("Recording in progress")
if sched.is_replaying():
    print("Replay in progress")
```
Recording with Deterministic Mode
Combine recording and deterministic mode for fully reproducible test sessions:
```python
sched = horus.Scheduler(
    tick_rate=100,
    deterministic=True,
    recording=True,
)
sched.add(sensor)
sched.add(controller)

sched.run(duration=5.0)
files = sched.stop_recording()
# This recording can be replayed and will produce identical output
```
Custom Messages with GenericMessage
For ad-hoc data that does not warrant a typed message, dict topics (GenericMessage) provide maximum flexibility:
```python
def diagnostics_tick(node):
    node.send("diagnostics", {
        "cpu_temp": get_cpu_temp(),
        "battery_pct": get_battery(),
        "error_log": get_recent_errors(),
        "uptime_sec": horus.elapsed(),
        "tick_count": horus.tick(),
    })
```
Size and Type Constraints
- Max size: 4KB per message (larger messages spill to TensorPool automatically)
- Supported types: dict, list, str, int, float, bool, None, bytes
- Unsupported: custom classes, lambdas, sockets, file handles
- Cross-language: GenericMessage does NOT cross to Rust nodes
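A pre-flight size check can catch oversized dict messages before sending. This sketch uses JSON purely as a stand-in encoding; HORUS's actual serialization and its exact overhead may differ:

```python
import json

MAX_INLINE_BYTES = 4 * 1024  # The 4KB inline limit described above

def fits_inline(payload):
    """Rough pre-flight check for the dict-message size limit
    (illustrative; JSON is an assumed stand-in for the real encoding)."""
    return len(json.dumps(payload).encode("utf-8")) <= MAX_INLINE_BYTES
```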
For structured data that crosses to Rust, use typed messages or custom messages:
```python
from horus.msggen import define_message

RobotDiagnostics = define_message("RobotDiagnostics", "diagnostics", [
    ("cpu_temp", "f32"),
    ("battery_pct", "f32"),
    ("error_code", "i32"),
    ("uptime_sec", "f64"),
])

def diagnostics_tick(node):
    diag = RobotDiagnostics(
        cpu_temp=65.0,
        battery_pct=85.0,
        error_code=0,
        uptime_sec=horus.elapsed(),
    )
    node.send("diagnostics", diag.to_bytes())
```
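Conceptually, a fixed-layout message like RobotDiagnostics is just scalars packed into a byte buffer. A rough equivalent with the struct module, assuming a packed little-endian layout (the actual HORUS wire format may differ):

```python
import struct

# f32, f32, i32, f64 — mirrors the field list above; "<" assumes
# little-endian with no padding.
LAYOUT = struct.Struct("<ffid")

def pack_diagnostics(cpu_temp, battery_pct, error_code, uptime_sec):
    return LAYOUT.pack(cpu_temp, battery_pct, error_code, uptime_sec)

def unpack_diagnostics(buf):
    return LAYOUT.unpack(buf)
```

The f32/f32/i32/f64 layout occupies a fixed 20 bytes, versus the variable-size dict encoding of GenericMessage.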
See Custom Messages for runtime vs compiled message details.
Testing Patterns
tick_once with Mock Data
The primary testing pattern: set up nodes, inject data via topics, step through ticks, assert outputs.
```python
import horus

def test_obstacle_avoidance():
    results = []

    def sensor_tick(node):
        # Simulate obstacle at 0.3m
        node.send("distance", {"value": 0.3})

    def controller_tick(node):
        msg = node.recv("distance")
        if msg:
            if msg["value"] < 0.5:
                cmd = horus.CmdVel(linear=0.0, angular=0.5)
            else:
                cmd = horus.CmdVel(linear=1.0, angular=0.0)
            node.send("cmd_vel", cmd)
            results.append(cmd)

    sensor = horus.Node(name="sensor", pubs=["distance"], tick=sensor_tick, rate=100, order=0)
    ctrl = horus.Node(
        name="controller",
        subs=["distance"],
        pubs=[horus.CmdVel],
        tick=controller_tick,
        rate=100,
        order=1,
    )
    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(sensor)
    sched.add(ctrl)

    sched.tick_once()

    assert len(results) == 1
    assert results[0].linear == 0.0   # Should stop
    assert results[0].angular == 0.5  # Should turn
```
Deterministic RNG for Reproducible Tests
```python
def test_noisy_sensor():
    readings = []

    def noisy_sensor(node):
        base = 25.0
        noise = horus.rng_float() * 0.5  # Deterministic noise
        readings.append(base + noise)

    node = horus.Node(name="sensor", tick=noisy_sensor, rate=100)
    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(node)
    for _ in range(10):
        sched.tick_once()

    # Same 10 readings every run
    expected = readings.copy()
    readings.clear()

    sched2 = horus.Scheduler(tick_rate=100, deterministic=True)
    sched2.add(horus.Node(name="sensor", tick=noisy_sensor, rate=100))
    for _ in range(10):
        sched2.tick_once()

    assert readings == expected
```
Testing Node Interactions
Test that multiple nodes communicate correctly:
```python
def test_pipeline():
    final_outputs = []

    def producer_tick(node):
        node.send("raw", {"value": 42})

    def transformer_tick(node):
        msg = node.recv("raw")
        if msg:
            node.send("processed", {"doubled": msg["value"] * 2})

    def consumer_tick(node):
        msg = node.recv("processed")
        if msg:
            final_outputs.append(msg["doubled"])

    producer = horus.Node(name="producer", pubs=["raw"], tick=producer_tick, rate=100, order=0)
    transformer = horus.Node(
        name="transformer", subs=["raw"], pubs=["processed"],
        tick=transformer_tick, rate=100, order=1,
    )
    consumer = horus.Node(
        name="consumer", subs=["processed"],
        tick=consumer_tick, rate=100, order=2,
    )
    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(producer)
    sched.add(transformer)
    sched.add(consumer)

    sched.tick_once()

    assert final_outputs == [84]
```
Testing Safety Behavior
```python
def test_deadline_miss_policy():
    import time

    miss_count = [0]

    def slow_tick(node):
        time.sleep(0.01)  # 10ms — exceeds 1ms budget
        node.send("output", {"tick": horus.tick()})

    def on_error(node, exc):
        miss_count[0] += 1

    node = horus.Node(
        name="slow",
        pubs=["output"],
        tick=slow_tick,
        on_error=on_error,
        rate=100,
        budget=1 * horus.ms,
        on_miss="warn",
    )
    sched = horus.Scheduler(tick_rate=100)
    sched.add(node)
    sched.run(duration=0.5)

    stats = sched.safety_stats()
    print(f"Deadline misses: {stats.get('deadline_misses', 0)}")
```
Introspection in Tests
Query node and scheduler state for assertions:
```python
sched = horus.Scheduler(tick_rate=100, deterministic=True)
sched.add(sensor)
sched.add(controller)

for _ in range(100):
    sched.tick_once()

# Assert node health
assert sched.has_node("sensor")
assert sched.has_node("controller")
assert sched.get_node_count() == 2

# Assert performance
sensor_stats = sched.get_node_stats("sensor")
assert sensor_stats["total_ticks"] == 100
assert sensor_stats["error_count"] == 0
assert sensor_stats["avg_tick_duration_ms"] < 1.0
```
Design Decisions
Why compute=True as a kwarg instead of a separate ComputeNode class? One Node class with kwargs is simpler than a class hierarchy. compute=True is a scheduling hint, not a different kind of node. The same tick function, the same send/recv API, the same error handling. Changing a node from default to compute is a one-character edit, not a class refactor.
Why auto-detect async def instead of requiring async=True? Python's async def already declares the function's execution model at the language level. Requiring a separate async=True kwarg would be redundant and error-prone (what happens if async=True but tick is a regular def?). Auto-detection means the declaration is the configuration.
Why is on="topic" mutually exclusive with compute=True? These are different execution models. Event-driven nodes wake on data arrival (interrupt-like). Compute nodes run on a fixed schedule on a thread pool (periodic). Combining them creates ambiguous semantics: should the node run when data arrives AND on a fixed schedule? The explicit choice avoids confusion.
Why deterministic mode uses SimClock instead of slowed wall clock? SimClock advances by exactly 1/rate per tick regardless of actual execution time. This means a 1-second simulation produces the same physics whether the machine takes 0.5s or 5s to compute it. Wall clock, even slowed, would produce different results depending on system load.
Why tick_once() instead of mocking the scheduler? tick_once() runs the real scheduler for exactly one tick — same init sequence, same ordering, same safety checks. Mocking the scheduler would test your mock, not your node. tick_once() gives you deterministic, real execution with zero test infrastructure.
Trade-offs
| Choice | Benefit | Cost |
|---|---|---|
| compute=True for CPU work | Main thread stays responsive | Thread pool scheduling adds ~0.1ms jitter |
| async def for I/O | Non-blocking, natural Python async | ~1ms event loop overhead; pending awaits block shutdown |
| on="topic" for events | Zero CPU when idle | Cannot guarantee tick rate; mutually exclusive with compute |
| rate= on compute nodes | Prevents thread pool saturation | Node might miss data between ticks |
| Runtime mutation (set_node_rate) | Adaptive systems, degraded mode | Potential for race conditions if multiple threads mutate |
| Deterministic mode | Reproducible tests, debugging | Single-threaded, slower than real-time |
| Recording | Post-mortem debugging, regression tests | Disk I/O overhead, storage growth |
| Dict topics for prototyping | Any Python data, zero setup | ~5-50 μs vs ~1.5 μs typed; Python-only |
| tick_once() for testing | Real scheduler, real ordering | No parallel execution in deterministic mode |
See Also
- Python API -- Node constructor, Scheduler, Clock API
- Async Nodes -- Deep dive on async patterns
- Custom Messages -- Runtime and compiled message definitions
- Performance Guide -- Optimization techniques and latency numbers
- Deterministic Mode -- Full deterministic execution guide
- Python Examples -- Working code examples