Advanced Patterns (Python)

Beyond the basics of Node, Scheduler, and run(), HORUS provides execution classes, runtime mutation, deterministic simulation, and recording — all accessible from Python kwargs. This page covers every advanced pattern with working code.


Compute Nodes

By default, nodes run on the main tick thread sequentially. compute=True moves a node to a parallel thread pool, freeing the main thread for time-critical nodes.

When to Use

Use compute=True for CPU-heavy work that would block other nodes: ML inference, SLAM computation, path planning, image processing. The scheduler runs compute nodes concurrently with the main tick thread.

import horus
import numpy as np

model = load_model("yolov8n.onnx")  # placeholder: substitute your own ONNX model loader

def detect_tick(node):
    img = node.recv("camera.rgb")
    if img is None:
        return

    frame = img.to_numpy()
    detections = model.predict(frame)

    for det in detections:
        node.send("detections", {
            "class": det.class_name,
            "confidence": float(det.confidence),
            "bbox": [det.x1, det.y1, det.x2, det.y2],
        })

detector = horus.Node(
    name="yolo_detector",
    subs=[horus.Image],
    pubs=["detections"],
    tick=detect_tick,
    rate=30,
    compute=True,           # Runs on thread pool, not main tick thread
    budget=30 * horus.ms,   # 30ms budget per frame
    on_miss="skip",         # Skip frame if inference overruns
)

horus.run(detector, tick_rate=100)

How It Works

Without compute=True, a 25ms ML inference at 30Hz would block the main tick thread for 25ms every 33ms — starving any 100Hz control node on the same scheduler. With compute=True, inference runs on a separate thread pool and does not delay the main tick.
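
The split can be pictured with plain concurrent.futures — a hypothetical sketch of the scheduling idea, not HORUS internals: the heavy tick is dispatched to a pool and the main loop moves on immediately.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Sketch (not HORUS source): the main loop submits the heavy tick to a
# pool and continues, so fast control work is never delayed by inference.
pool = ThreadPoolExecutor(max_workers=2)
events = []

def heavy_tick():
    time.sleep(0.025)          # stand-in for 25ms ML inference
    events.append("inference_done")

def control_tick():
    events.append("control")   # stand-in for fast 100Hz control work

pool.submit(heavy_tick)        # compute node: offloaded, non-blocking
control_tick()                 # main thread continues immediately
pool.shutdown(wait=True)       # drain the pool at shutdown
```

Because the heavy tick sleeps before appending, the control event always lands first — the main thread never waited for it.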

| Configuration      | Main Thread          | Thread Pool            |
|--------------------|----------------------|------------------------|
| Default (no flags) | Runs this node       | Not used               |
| compute=True       | Free for other nodes | Runs this node         |
| async def tick     | Free for other nodes | Runs on async I/O pool |

Constraint: compute=True is mutually exclusive with async def tick and on="topic". A node can only have one execution class.


Async I/O Nodes

For network requests, database queries, file I/O, and WebSocket connections, use async def tick. The scheduler auto-detects async functions and runs them on a dedicated I/O thread pool.

import horus
import aiohttp

async def cloud_tick(node):
    if node.has_msg("telemetry"):
        data = node.recv("telemetry")
        try:
            async with aiohttp.ClientSession() as session:
                await session.post(
                    "https://api.myrobot.io/telemetry",
                    json=data,
                    timeout=aiohttp.ClientTimeout(total=2.0),
                )
        except aiohttp.ClientError as e:
            node.log_warning(f"Upload failed: {e}")

uploader = horus.Node(
    name="cloud_uploader",
    subs=["telemetry"],
    tick=cloud_tick,        # async def auto-detected
    rate=1,                 # 1Hz — upload every second
)

horus.run(uploader)

No configuration needed beyond writing async def. The scheduler inspects the function and routes it to the I/O executor automatically.
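
The detection mechanism can be sketched with the standard inspect module — a hypothetical illustration of the routing decision, not the actual HORUS implementation:

```python
import inspect

# Sketch: a coroutine function is routed to the I/O executor,
# a plain function stays on the main tick thread.
async def io_tick(node):
    ...

def sync_tick(node):
    ...

def pick_executor(tick_fn):
    if inspect.iscoroutinefunction(tick_fn):
        return "io_pool"
    return "main_thread"
```

`pick_executor(io_tick)` selects the I/O pool; `pick_executor(sync_tick)` selects the main thread. The declaration is the configuration.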

Async Init and Shutdown

init and shutdown callbacks can also be async — useful for establishing database connections or closing network sessions:

import asyncpg

async def setup(node):
    node.db = await asyncpg.connect("postgresql://localhost/robotics")

async def store(node):
    if node.has_msg("sensor.data"):
        data = node.recv("sensor.data")
        await node.db.execute(
            "INSERT INTO readings (value, ts) VALUES ($1, $2)",
            data["value"], data["timestamp"],
        )

async def cleanup(node):
    await node.db.close()

db_node = horus.Node(
    name="db_logger",
    subs=["sensor.data"],
    tick=store,
    init=setup,
    shutdown=cleanup,
    rate=10,
)

Timeout Discipline

Async ticks that hang will block scheduler shutdown. Always wrap network calls:

import asyncio

async def safe_tick(node):
    try:
        result = await asyncio.wait_for(fetch_data(), timeout=2.0)
        node.send("data", result)
    except asyncio.TimeoutError:
        node.log_warning("Fetch timed out, skipping tick")

Event-Driven Nodes

Event-driven nodes tick only when a specific topic receives data, rather than at a fixed rate. Use on="topic.name" for sparse events like emergency stops, button presses, or configuration changes.

import horus

def emergency_handler(node):
    event = node.recv("emergency.stop")
    if event:
        node.log_warning("Emergency stop triggered!")
        node.send("cmd_vel", horus.CmdVel(linear=0.0, angular=0.0))
        node.send("status", {"state": "emergency_stopped"})

estop = horus.Node(
    name="estop_handler",
    subs=["emergency.stop"],
    pubs=[horus.CmdVel, "status"],
    tick=emergency_handler,
    on="emergency.stop",    # Only tick when this topic has data
)

horus.run(estop)

Behavior

  • The node does not tick on the scheduler's fixed rate
  • When a message arrives on the trigger topic, the node ticks once
  • Multiple messages arriving between scheduler cycles result in one tick (not one per message)
  • node.recv() inside the tick returns the latest message
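
The coalescing behavior can be sketched as a single-slot mailbox — a hypothetical model of the semantics above, not HORUS internals: sends overwrite one slot, so several arrivals between cycles produce one tick that sees only the latest message.

```python
# Sketch: a latest-value slot. send() overwrites, drain() fires at
# most one tick per cycle and returns the newest message.
class LatestSlot:
    def __init__(self):
        self.msg = None
        self.fresh = False

    def send(self, msg):
        self.msg = msg          # overwrite, never queue
        self.fresh = True

    def drain(self):
        if not self.fresh:
            return None         # no data: no tick this cycle
        self.fresh = False
        return self.msg         # one tick, latest message only

slot = LatestSlot()
slot.send({"stop": 1})
slot.send({"stop": 2})                # arrives before the next cycle
assert slot.drain() == {"stop": 2}    # single tick, latest wins
assert slot.drain() is None           # no second tick
```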

Constraint: on="topic" is mutually exclusive with compute=True and async def tick.

Rate-Limited Events

Combine on="topic" with rate= to cap how often an event-driven node can fire:

# Ticks on new config, but no more than once per second
config_node = horus.Node(
    name="config_watcher",
    subs=["system.config"],
    tick=apply_config,
    on="system.config",
    rate=1,  # Max 1Hz even if config changes faster
)
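
The combined semantics can be sketched as a minimum-interval gate — a hypothetical model of the trigger logic, not HORUS source: the node fires on data arrival, but only if at least 1/rate seconds have passed since the last firing.

```python
# Sketch: event trigger with a rate cap. Data arriving faster than
# max_hz is suppressed until the minimum interval has elapsed.
class RateLimitedTrigger:
    def __init__(self, max_hz):
        self.min_interval = 1.0 / max_hz
        self.last_fire = float("-inf")

    def should_fire(self, now, has_data):
        if not has_data:
            return False
        if now - self.last_fire < self.min_interval:
            return False        # data arrived too soon, suppressed
        self.last_fire = now
        return True

trig = RateLimitedTrigger(max_hz=1)
assert trig.should_fire(0.0, has_data=True)       # first event fires
assert not trig.should_fire(0.5, has_data=True)   # capped at 1Hz
assert trig.should_fire(1.5, has_data=True)       # interval elapsed
```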

Rate-Limited Compute

Combine compute=True with rate= to run CPU-heavy work at a lower frequency than the scheduler tick rate:

# Scheduler ticks at 100Hz, but SLAM only runs at 10Hz on the thread pool
slam_node = horus.Node(
    name="slam",
    subs=[horus.LaserScan, horus.Odometry],
    pubs=["map", "localization.pose"],
    tick=slam_tick,
    compute=True,
    rate=10,            # 10Hz SLAM updates
    budget=80 * horus.ms,
    on_miss="warn",
)

# Controller runs at full 100Hz on the main tick thread
controller = horus.Node(
    name="controller",
    subs=["localization.pose"],
    pubs=[horus.CmdVel],
    tick=controller_tick,
    rate=100,
    order=1,
)

horus.run(slam_node, controller, tick_rate=100)

The scheduler skips the compute node on ticks where it is not scheduled, so the main thread never waits for it.
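
The skipping arithmetic amounts to simple decimation — a hypothetical sketch of the schedule, not the HORUS scheduler itself: at a 100Hz scheduler, a 10Hz node is due only every 10th tick, and on the other 9 ticks it is skipped without blocking anything.

```python
# Sketch: a node at node_rate runs on every (tick_rate // node_rate)-th
# scheduler tick and is skipped otherwise.
def is_due(tick, tick_rate, node_rate):
    divisor = tick_rate // node_rate   # 100 // 10 == 10
    return tick % divisor == 0

due_ticks = [t for t in range(30) if is_due(t, 100, 10)]
assert due_ticks == [0, 10, 20]        # 10Hz node at a 100Hz scheduler
```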


Runtime Mutation

The scheduler supports modifying nodes while running — adjusting rates, budgets, and even adding or removing nodes.

Changing Node Rate

sched = horus.Scheduler(tick_rate=100)
sched.add(sensor)
sched.add(controller)

# Later (e.g., from a monitoring thread or on_error callback):
sched.set_node_rate("sensor", 200)      # Speed up sensor to 200Hz
sched.set_node_rate("controller", 50)   # Slow down controller to 50Hz

Changing Tick Budget

# Tighten budget after profiling shows headroom
sched.set_tick_budget("motor_controller", 150)  # 150 μs

# Loosen budget during degraded mode
sched.set_tick_budget("motor_controller", 500)  # 500 μs

Removing Nodes

# Remove a malfunctioning node at runtime
if error_count > threshold:
    removed = sched.remove_node("flaky_sensor")
    if removed:
        print("Removed flaky_sensor from scheduler")

Adaptive Rate Pattern

Adjust node rate based on system load:

def monitor_tick(node):
    stats = node.info.get_metrics()
    avg_ms = stats.get("avg_tick_duration_ms", 0)

    if avg_ms > 8.0:  # Getting close to 10ms budget
        node.log_warning(f"High load: {avg_ms:.1f}ms avg tick")
        # Could signal another node to reduce rate via topic

monitor = horus.Node(
    name="load_monitor",
    tick=monitor_tick,
    rate=1,
    order=999,  # Run last
)

Deterministic Mode

Deterministic mode makes every run produce identical output — essential for testing, simulation, and debugging. It enables SimClock (fixed dt), seeded RNG, and sequential execution.

Basic Usage

sched = horus.Scheduler(tick_rate=100, deterministic=True)

In deterministic mode:

  • horus.dt() returns exactly 1/rate (fixed timestep, not wall clock)
  • horus.now() advances by exactly dt each tick (SimClock)
  • horus.rng_float() returns tick-seeded values (same sequence every run)
  • Nodes execute sequentially in order — no thread pool randomness
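
These primitives can be sketched in a few lines — a hypothetical model of SimClock and tick-seeded RNG (the names `SimClock` and `tick_seeded_float` here are illustrative, not the HORUS implementation):

```python
import random

# Sketch: a simulated clock that is a pure function of the tick count,
# and an RNG reseeded from the tick so every run sees the same sequence.
class SimClock:
    def __init__(self, rate):
        self.dt = 1.0 / rate
        self.tick = 0

    def now(self):
        return self.tick * self.dt     # advances by exactly dt per tick

    def step(self):
        self.tick += 1

def tick_seeded_float(tick, seed=0):
    # Mix seed and tick into one integer seed: same value every run.
    return random.Random(seed * 2**32 + tick).random()

clock = SimClock(rate=100)
clock.step()
assert clock.now() == 0.01                           # fixed dt, not wall clock
assert tick_seeded_float(5) == tick_seeded_float(5)  # reproducible
```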

tick_once() for Testing

Step through ticks manually. This is the primary testing tool for deterministic logic:

import horus

outputs = []

def sensor_tick(node):
    node.send("temp", {"value": 20.0 + horus.tick() * 0.5})

def logger_tick(node):
    msg = node.recv("temp")
    if msg:
        outputs.append(msg["value"])

sensor = horus.Node(name="sensor", pubs=["temp"], tick=sensor_tick, rate=100, order=0)
logger = horus.Node(name="logger", subs=["temp"], tick=logger_tick, rate=100, order=1)

sched = horus.Scheduler(tick_rate=100, deterministic=True)
sched.add(sensor)
sched.add(logger)

# Step 5 ticks
for _ in range(5):
    sched.tick_once()

assert len(outputs) == 5
assert outputs[0] == 20.0
assert outputs[4] == 22.0

Filtered tick_once()

Tick only specific nodes — useful for isolating behavior:

# Tick only the sensor node (logger does not run)
sched.tick_once(node_names=["sensor"])

# Tick only the logger (consumes whatever sensor published)
sched.tick_once(node_names=["logger"])

tick_for()

Run the tick loop for a bounded duration, then return:

# Run for exactly 1 second (100 ticks at 100Hz in deterministic mode)
sched.tick_for(duration=1.0)

assert sched.current_tick() == 100

SimClock for Reproducibility

In deterministic mode, time is simulated. Physics integration is bit-exact across runs:

positions = []

def physics_tick(node):
    dt = horus.dt()          # Exactly 0.01 at 100Hz
    noise = horus.rng_float() * 0.001  # Same noise every run
    position = dt * 10.0 + noise
    positions.append(position)

node = horus.Node(name="physics", tick=physics_tick, rate=100)

# Run 1
sched1 = horus.Scheduler(tick_rate=100, deterministic=True)
sched1.add(node)
sched1.run(duration=1.0)
run1 = positions.copy()

# Run 2 — identical output
positions.clear()
sched2 = horus.Scheduler(tick_rate=100, deterministic=True)
sched2.add(horus.Node(name="physics", tick=physics_tick, rate=100))
sched2.run(duration=1.0)
run2 = positions.copy()

assert run1 == run2  # Bit-exact

Record and Replay

Record all topic data during a run for later analysis, debugging, or test regression.

Recording a Session

sched = horus.Scheduler(tick_rate=100, recording=True)
sched.add(sensor)
sched.add(controller)

# Run for 10 seconds
sched.run(duration=10.0)

# Stop recording and get file paths
files = sched.stop_recording()
print(f"Session saved to: {files}")

Managing Recordings

# List all saved recordings
for session in sched.list_recordings():
    print(f"  {session}")

# Delete a recording
sched.delete_recording(session_name)

# Check recording state
if sched.is_recording():
    print("Recording in progress")
if sched.is_replaying():
    print("Replay in progress")

Recording with Deterministic Mode

Combine recording and deterministic mode for fully reproducible test sessions:

sched = horus.Scheduler(
    tick_rate=100,
    deterministic=True,
    recording=True,
)
sched.add(sensor)
sched.add(controller)
sched.run(duration=5.0)

files = sched.stop_recording()
# This recording can be replayed and will produce identical output

Custom Messages with GenericMessage

For ad-hoc data that does not warrant a typed message, dict topics (GenericMessage) provide maximum flexibility:

def diagnostics_tick(node):
    node.send("diagnostics", {
        "cpu_temp": get_cpu_temp(),
        "battery_pct": get_battery(),
        "error_log": get_recent_errors(),
        "uptime_sec": horus.elapsed(),
        "tick_count": horus.tick(),
    })

Size and Type Constraints

  • Max size: 4KB per message (larger messages spill to TensorPool automatically)
  • Supported types: dict, list, str, int, float, bool, None, bytes
  • Unsupported: Custom classes, lambdas, sockets, file handles
  • Cross-language: GenericMessage does NOT cross to Rust nodes
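
A pre-send check against this type set can be sketched as follows — a hypothetical helper, not part of the HORUS API:

```python
# Sketch: recursively validate a payload against the supported type set
# (dict, list, str, int, float, bool, None, bytes).
ALLOWED = (dict, list, str, int, float, bool, bytes, type(None))

def is_sendable(value):
    if isinstance(value, dict):
        return all(is_sendable(v) for v in value.values())
    if isinstance(value, list):
        return all(is_sendable(v) for v in value)
    return isinstance(value, ALLOWED)

assert is_sendable({"cpu_temp": 65.0, "errors": ["E1"], "ok": True})
assert not is_sendable({"handle": object()})   # custom objects rejected
```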

For structured data that crosses to Rust, use typed messages or custom messages:

from horus.msggen import define_message

RobotDiagnostics = define_message("RobotDiagnostics", "diagnostics", [
    ("cpu_temp", "f32"),
    ("battery_pct", "f32"),
    ("error_code", "i32"),
    ("uptime_sec", "f64"),
])

def diagnostics_tick(node):
    diag = RobotDiagnostics(
        cpu_temp=65.0,
        battery_pct=85.0,
        error_code=0,
        uptime_sec=horus.elapsed(),
    )
    node.send("diagnostics", diag.to_bytes())

See Custom Messages for runtime vs compiled message details.


Testing Patterns

tick_once with Mock Data

The primary testing pattern: set up nodes, inject data via topics, step through ticks, assert outputs.

import horus

def test_obstacle_avoidance():
    results = []

    def sensor_tick(node):
        # Simulate obstacle at 0.3m
        node.send("distance", {"value": 0.3})

    def controller_tick(node):
        msg = node.recv("distance")
        if msg:
            if msg["value"] < 0.5:
                cmd = horus.CmdVel(linear=0.0, angular=0.5)
            else:
                cmd = horus.CmdVel(linear=1.0, angular=0.0)
            node.send("cmd_vel", cmd)
            results.append(cmd)

    sensor = horus.Node(name="sensor", pubs=["distance"], tick=sensor_tick, rate=100, order=0)
    ctrl = horus.Node(
        name="controller",
        subs=["distance"],
        pubs=[horus.CmdVel],
        tick=controller_tick,
        rate=100,
        order=1,
    )

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(sensor)
    sched.add(ctrl)

    sched.tick_once()

    assert len(results) == 1
    assert results[0].linear == 0.0  # Should stop
    assert results[0].angular == 0.5  # Should turn

Deterministic RNG for Reproducible Tests

def test_noisy_sensor():
    readings = []

    def noisy_sensor(node):
        base = 25.0
        noise = horus.rng_float() * 0.5  # Deterministic noise
        readings.append(base + noise)

    node = horus.Node(name="sensor", tick=noisy_sensor, rate=100)
    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(node)

    for _ in range(10):
        sched.tick_once()

    # Same 10 readings every run
    expected = readings.copy()

    readings.clear()
    sched2 = horus.Scheduler(tick_rate=100, deterministic=True)
    sched2.add(horus.Node(name="sensor", tick=noisy_sensor, rate=100))
    for _ in range(10):
        sched2.tick_once()

    assert readings == expected

Testing Node Interactions

Test that multiple nodes communicate correctly:

def test_pipeline():
    final_outputs = []

    def producer_tick(node):
        node.send("raw", {"value": 42})

    def transformer_tick(node):
        msg = node.recv("raw")
        if msg:
            node.send("processed", {"doubled": msg["value"] * 2})

    def consumer_tick(node):
        msg = node.recv("processed")
        if msg:
            final_outputs.append(msg["doubled"])

    producer = horus.Node(name="producer", pubs=["raw"], tick=producer_tick, rate=100, order=0)
    transformer = horus.Node(
        name="transformer", subs=["raw"], pubs=["processed"],
        tick=transformer_tick, rate=100, order=1,
    )
    consumer = horus.Node(
        name="consumer", subs=["processed"],
        tick=consumer_tick, rate=100, order=2,
    )

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(producer)
    sched.add(transformer)
    sched.add(consumer)

    sched.tick_once()

    assert final_outputs == [84]

Testing Safety Behavior

def test_deadline_miss_policy():
    import time

    miss_count = [0]

    def slow_tick(node):
        time.sleep(0.01)  # 10ms — exceeds 1ms budget
        node.send("output", {"tick": horus.tick()})

    def on_error(node, exc):
        miss_count[0] += 1

    node = horus.Node(
        name="slow",
        pubs=["output"],
        tick=slow_tick,
        on_error=on_error,
        rate=100,
        budget=1 * horus.ms,
        on_miss="warn",
    )

    sched = horus.Scheduler(tick_rate=100)
    sched.add(node)
    sched.run(duration=0.5)

    stats = sched.safety_stats()
    assert stats.get("deadline_misses", 0) > 0  # 10ms tick overran the 1ms budget

Introspection in Tests

Query node and scheduler state for assertions:

sched = horus.Scheduler(tick_rate=100, deterministic=True)
sched.add(sensor)
sched.add(controller)

for _ in range(100):
    sched.tick_once()

# Assert node health
assert sched.has_node("sensor")
assert sched.has_node("controller")
assert sched.get_node_count() == 2

# Assert performance
sensor_stats = sched.get_node_stats("sensor")
assert sensor_stats["total_ticks"] == 100
assert sensor_stats["error_count"] == 0
assert sensor_stats["avg_tick_duration_ms"] < 1.0

Design Decisions

Why compute=True as a kwarg instead of a separate ComputeNode class? One Node class with kwargs is simpler than a class hierarchy. compute=True is a scheduling hint, not a different kind of node. The same tick function, the same send/recv API, the same error handling. Changing a node from default to compute is a one-character edit, not a class refactor.

Why auto-detect async def instead of requiring async=True? Python's async def already declares the function's execution model at the language level. Requiring a separate async=True kwarg would be redundant and error-prone (what happens if async=True but tick is a regular def?). Auto-detection means the declaration is the configuration.

Why is on="topic" mutually exclusive with compute=True? These are different execution models. Event-driven nodes wake on data arrival (interrupt-like). Compute nodes run on a fixed schedule on a thread pool (periodic). Combining them creates ambiguous semantics: should the node run when data arrives AND on a fixed schedule? The explicit choice avoids confusion.

Why deterministic mode uses SimClock instead of slowed wall clock? SimClock advances by exactly 1/rate per tick regardless of actual execution time. This means a 1-second simulation produces the same physics whether the machine takes 0.5s or 5s to compute it. Wall clock, even slowed, would produce different results depending on system load.
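
The point can be demonstrated directly — a hypothetical sketch, independent of HORUS: with a fixed dt, the integrated result is identical whether each tick computes instantly or under load.

```python
import time

# Sketch: integrate x' = 10 with a fixed timestep. The "slow" run
# simulates a loaded machine; the result is bit-identical anyway,
# because dt never depends on wall-clock time.
def integrate(ticks, dt, slow=False):
    x = 0.0
    for _ in range(ticks):
        if slow:
            time.sleep(0.001)   # simulate compute pressure
        x += 10.0 * dt          # velocity * fixed timestep
    return x

fast = integrate(100, dt=0.01)
slow = integrate(100, dt=0.01, slow=True)
assert fast == slow             # bit-exact regardless of execution speed
```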

Why tick_once() instead of mocking the scheduler? tick_once() runs the real scheduler for exactly one tick — same init sequence, same ordering, same safety checks. Mocking the scheduler would test your mock, not your node. tick_once() gives you deterministic, real execution with zero test infrastructure.


Trade-offs

| Choice | Benefit | Cost |
|---|---|---|
| compute=True for CPU work | Main thread stays responsive | Thread pool scheduling adds ~0.1ms jitter |
| async def for I/O | Non-blocking, natural Python async | ~1ms event loop overhead; pending awaits block shutdown |
| on="topic" for events | Zero CPU when idle | Cannot guarantee tick rate; mutually exclusive with compute |
| rate= on compute nodes | Prevents thread pool saturation | Node might miss data between ticks |
| Runtime mutation (set_node_rate) | Adaptive systems, degraded mode | Potential race conditions if multiple threads mutate |
| Deterministic mode | Reproducible tests, debugging | Single-threaded, slower than real-time |
| Recording | Post-mortem debugging, regression tests | Disk I/O overhead, storage growth |
| Dict topics for prototyping | Any Python data, zero setup | ~5-50 μs vs ~1.5 μs typed; Python-only |
| tick_once() for testing | Real scheduler, real ordering | No parallel execution in deterministic mode |

See Also