Scheduler Deep-Dive (Python)

A warehouse AGV runs eight nodes: two LiDARs, a camera pipeline, a safety monitor, a path planner, a motor controller, a battery watcher, and a cloud uploader. The motor controller must tick every millisecond. The safety monitor must run before the motors every cycle. The path planner needs 40 ms of CPU time and must never block the motor controller. When the emergency stop fires, motors must halt before sensors disconnect.

Coordinating all of this by hand --- threads, locks, timers, signal handlers --- is the single biggest source of bugs in robotics software.

The HORUS scheduler handles it. You configure nodes, call run(), and the scheduler manages execution order, tick rates, deadline enforcement, safety monitoring, and graceful shutdown.

The Execution Model

How a Tick Works

Every tick cycle, the scheduler does the following:

  1. For each registered node (in .order() sequence):
    • Check the per-node rate limiter --- is this node due to tick?
    • If yes, start a timer and call the node's tick() function
    • Stop the timer. If the node exceeded its budget, apply its on_miss policy
    • Record metrics (tick duration, overruns, health state)
  2. After all nodes are processed, sleep to maintain the global tick rate
  Tick N                                  Tick N+1
  |                                       |
  v                                       v
  [ safety ] [ sensor ] [ motor ] [sleep] [ safety ] [ sensor ] ...
  order=0     order=10   order=20          order=0     order=10

Key details:

  • BestEffort nodes (the default) execute sequentially in order on the main thread. RT, Compute, Event, and AsyncIo nodes run on their own threads and synchronize at tick boundaries.
  • Rate limiting is per-node: a 10 Hz node inside a 1 kHz scheduler only has tick() called every 100th cycle.
  • Budget enforcement happens after tick() returns --- the scheduler does not preempt mid-tick. It records the overrun and applies the miss policy.
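
The per-node rate limiting described above reduces to simple cycle decimation. A minimal sketch in plain Python (an illustration of the idea, not the HORUS internals), assuming the node rate divides the scheduler rate evenly:

```python
def due_cycles(sched_rate_hz, node_rate_hz, total_cycles):
    """Return the scheduler cycle indices on which a node's tick() runs.

    A 10 Hz node inside a 1 kHz scheduler is due every 100th cycle.
    """
    interval = round(sched_rate_hz / node_rate_hz)  # cycles between ticks
    return [c for c in range(total_cycles) if c % interval == 0]

# 10 Hz node in a 1 kHz scheduler: due on cycles 0, 100, 200, ...
assert due_cycles(1000, 10, 300) == [0, 100, 200]

# A node running at the full scheduler rate is due every cycle
assert due_cycles(1000, 1000, 3) == [0, 1, 2]
```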

Initialization

When you call scheduler.run() (or horus.run()), initialization happens lazily:

  1. All pending node configurations are finalized (execution class inference, budget/deadline auto-derivation)
  2. init() is called on every node, in .order() sequence
  3. If a node's init() raises an exception, that node enters Error state and is excluded from ticking. Other nodes continue.
  4. The main tick loop begins

Lazy initialization means you can add and configure nodes in any order. The scheduler resolves everything at startup.
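
The error-isolation step of initialization can be sketched as a plain loop (illustrative only, not the actual implementation): a node whose init() raises is marked Error and dropped from the tick set, while the rest proceed.

```python
def initialize(nodes):
    """Run init() on each node in add order; isolate nodes whose init fails.

    `nodes` is a list of (name, init_fn) pairs.
    Returns (active, errored) name lists.
    """
    active, errored = [], []
    for name, init_fn in nodes:
        try:
            init_fn()
            active.append(name)    # will be ticked
        except Exception:
            errored.append(name)   # Error state: excluded from ticking
    return active, errored

def bad_init():
    raise RuntimeError("hardware not found")

active, errored = initialize([("sensor", lambda: None),
                              ("camera", bad_init),
                              ("motor", lambda: None)])
# sensor and motor continue; camera is isolated
```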

Shutdown

When the scheduler receives a stop signal (Ctrl+C, scheduler.stop(), or a node calling node.request_stop()):

  1. The main loop exits
  2. shutdown() is called on every node in reverse order --- last-added first
  3. RT threads are given 3 seconds to exit; stalled threads are detached
  4. Shared memory is cleaned up

Reverse-order shutdown ensures dependent nodes stop before their dependencies. The motor controller (order 20) shuts down before the sensor (order 10) that feeds it.
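
As a sketch, the shutdown pass is simply the add order reversed:

```python
def shutdown_order(added):
    """Nodes shut down last-added first (reverse of add order)."""
    return list(reversed(added))

# The motor (added last) stops before the sensor that feeds it
assert shutdown_order(["sensor", "controller", "motor"]) == \
    ["motor", "controller", "sensor"]
```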

horus.run() --- The One-Liner

For most programs, horus.run() is all you need:

import horus
from horus import us

def read_sensor(node):
    node.send("scan", {"ranges": [1.0, 2.0, 3.0]})

def navigate(node):
    if node.has_msg("scan"):
        scan = node.recv("scan")
        node.send("cmd", {"linear": 0.5, "angular": scan["ranges"][0]})

def drive(node):
    if node.has_msg("cmd"):
        cmd = node.recv("cmd")
        # Send to motor hardware
        node.log_info(f"Driving: linear={cmd['linear']}")

sensor = horus.Node(name="sensor", tick=read_sensor, rate=10, order=0, pubs=["scan"])
ctrl = horus.Node(name="controller", tick=navigate, rate=30, order=10, subs=["scan"], pubs=["cmd"])
motor = horus.Node(name="motor", tick=drive, rate=1000, order=20, budget=300*us, subs=["cmd"])

horus.run(sensor, ctrl, motor, rt=True, watchdog_ms=500)

horus.run() creates a Scheduler behind the scenes, adds all nodes, and calls run(). Every scheduler-level parameter is available as a keyword argument:

horus.run(
    *nodes,
    duration=None,          # seconds (None = forever)
    tick_rate=1000.0,       # Hz
    rt=False,               # SCHED_FIFO + mlockall
    deterministic=False,    # SimClock
    blackbox_mb=0,          # Flight recorder size
    watchdog_ms=0,          # Frozen node detection
    recording=False,        # Session recording
    name=None,              # Scheduler name
    cores=None,             # CPU affinity [0, 1, ...]
    max_deadline_misses=None,  # Escalation threshold
    verbose=False,          # Debug logging
    telemetry=None,         # Endpoint URL
)

When to use horus.run() vs Scheduler: Use horus.run() when you create all nodes upfront and run until Ctrl+C (or a fixed duration). Use Scheduler when you need runtime mutation (adding/removing nodes, changing rates), tick_once() for testing, or the context manager pattern.

The Scheduler Class

Creating a Scheduler

All configuration happens through keyword arguments:

from horus import Scheduler, Node, us, ms

sched = Scheduler(
    tick_rate=1000.0,       # 1 kHz global tick rate
    rt=True,                # Enable real-time scheduling
    watchdog_ms=500,        # 500 ms frozen-node detection
    blackbox_mb=16,         # 16 MB flight recorder
    max_deadline_misses=50, # Emergency stop after 50 misses
)

Scheduler Parameters Reference

  tick_rate (float, default 1000.0)
      Global tick rate in Hz. Match it to your fastest node.
  rt (bool, default False)
      Enable SCHED_FIFO scheduling and mlockall memory locking.
  deterministic (bool, default False)
      Enable SimClock (fixed dt, seeded RNG, reproducible results).
  blackbox_mb (int, default 0)
      Flight recorder buffer size in MB. 0 disables.
  watchdog_ms (int, default 0)
      Watchdog timeout in milliseconds. 0 disables.
  recording (bool, default False)
      Enable session recording for replay.
  name (str or None, default None)
      Scheduler name for logging and diagnostics.
  cores (list[int] or None, default None)
      Pin the scheduler to specific CPU cores.
  max_deadline_misses (int or None, default None)
      Emergency stop threshold. Default (in Rust): 100.
  verbose (bool, default False)
      Enable verbose debug logging.
  telemetry (str or None, default None)
      Telemetry export endpoint URL.

Adding Nodes

sched = Scheduler(tick_rate=1000, rt=True)

sched.add(Node(name="safety", tick=safety_fn, rate=1000, order=0))
sched.add(Node(name="motor", tick=motor_fn, rate=1000, order=5, budget=300*us))
sched.add(Node(name="planner", tick=plan_fn, rate=50, order=50, compute=True))

sched.run()

add() returns self, so you can chain:

sched.add(sensor_node).add(ctrl_node).add(motor_node)

Running

# Run forever (until Ctrl+C or .stop())
sched.run()

# Run for a fixed duration
sched.run(duration=30.0)  # 30 seconds

Context Manager

The context manager calls stop() automatically on exit, ensuring clean shutdown even if an exception occurs:

with Scheduler(tick_rate=100, watchdog_ms=500) as sched:
    sched.add(Node(name="sensor", tick=read_sensor, rate=100, order=0))
    sched.add(Node(name="logger", tick=log_data, rate=10, order=100))
    sched.run(duration=60.0)
# stop() called automatically here

This is the recommended pattern for production code.

Node Scheduling Parameters

Every scheduling parameter is set on the Node constructor. The scheduler reads them when you call add().

Parameter Reference

  rate (float, default 30)
      Node tick rate in Hz.
  order (int, default 100)
      Execution order (lower = earlier). 0-9 critical, 10-49 high, 50-99 normal, 100-199 low, 200+ background.
  budget (float or None, default None)
      Max expected tick duration in seconds. None = auto (80% of period).
  deadline (float or None, default None)
      Hard deadline in seconds. None = auto (95% of period).
  on_miss (str or None, default None)
      Deadline miss policy: "warn", "skip", "safe_mode", "stop".
  failure_policy (str or None, default None)
      Error policy: "fatal", "restart", "skip", "ignore".
  compute (bool, default False)
      Run on the thread pool (CPU-heavy work).
  on (str or None, default None)
      Event-driven --- tick only when this topic receives a message.
  priority (int or None, default None)
      OS thread priority (SCHED_FIFO 1-99, requires rt=True).
  core (int or None, default None)
      Pin to a specific CPU core index.
  watchdog (float or None, default None)
      Per-node watchdog timeout in seconds (overrides the global setting).

Budget and deadline are in seconds. Python floats represent seconds, not microseconds. Use the us and ms constants for readability: budget=300 * us means 300 microseconds. Writing budget=300 means 300 seconds --- almost certainly not what you want.

Unit Constants

from horus import us, ms

us = 1e-6   # microseconds -> seconds
ms = 1e-3   # milliseconds -> seconds

# Examples
budget=300 * us      # 300 microseconds
deadline=900 * us    # 900 microseconds
deadline=5 * ms      # 5 milliseconds
watchdog=500 * ms    # 500 milliseconds (or just use watchdog_ms=500 on the Scheduler)

Execution Classes

The scheduler assigns each node an execution class based on its configuration. You do not set this directly --- it is inferred:

  • rate, budget, or deadline set -> Rt. Dedicated thread with budget enforcement. Best for motor control, safety, sensor fusion.
  • compute=True -> Compute. Thread pool. Best for path planning, SLAM, image processing.
  • on="topic.name" -> Event. Sleeps until a message arrives. Best for emergency stop, command handlers.
  • async def tick -> AsyncIo. Tokio runtime. Best for HTTP, cloud upload, database work.
  • None of the above -> BestEffort. Sequential on the main thread. Best for logging, telemetry, display.

RT is auto-detected. Setting rate, budget, or deadline on a node automatically assigns the Rt execution class. There is no separate rt=True on individual nodes --- the scheduler infers it from timing constraints.
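
The inference rules above can be sketched as a small classifier. This is an illustration of the documented behavior, not the scheduler's actual code; in particular, the precedence of async, compute, and on over timing-derived Rt is an assumption on my part.

```python
import inspect

def infer_class(tick, *, compute=False, on=None,
                rate=None, budget=None, deadline=None):
    """Infer a node's execution class from its configuration (sketch)."""
    if inspect.iscoroutinefunction(tick):
        return "AsyncIo"      # async def tick
    if compute:
        return "Compute"      # thread pool
    if on is not None:
        return "Event"        # wake only on message arrival
    if rate is not None or budget is not None or deadline is not None:
        return "Rt"           # timing constraints imply real-time class
    return "BestEffort"       # sequential on the main thread

async def async_tick(node): ...
def plain_tick(node): ...

assert infer_class(async_tick) == "AsyncIo"
assert infer_class(plain_tick, compute=True, rate=10) == "Compute"
assert infer_class(plain_tick, on="emergency.stop") == "Event"
assert infer_class(plain_tick, rate=1000) == "Rt"
assert infer_class(plain_tick) == "BestEffort"
```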

Budget and Deadline Auto-Derivation

When you set rate on a node, the scheduler auto-derives timing constraints:

  • Budget = 80% of the period (e.g., 1000 Hz means a 1 ms period, so budget = 800 us)
  • Deadline = 95% of the period (e.g., 1000 Hz means deadline = 950 us)

You can override either with explicit values:

# Auto-derived: rate=1000 -> budget=800us, deadline=950us
auto_node = Node(name="auto", tick=fn, rate=1000, order=0)

# Explicit override
explicit_node = Node(name="explicit", tick=fn, rate=1000, order=0,
                     budget=300 * us, deadline=900 * us)
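
The derivation itself is simple arithmetic. A helper to check what a given rate implies, using the documented 80%/95% fractions:

```python
def derive_timing(rate_hz):
    """Auto-derive (budget, deadline) in seconds from a tick rate in Hz."""
    period = 1.0 / rate_hz
    return 0.8 * period, 0.95 * period   # budget = 80%, deadline = 95%

budget, deadline = derive_timing(1000)   # 1 kHz => 1 ms period
assert abs(budget - 800e-6) < 1e-12      # 800 us
assert abs(deadline - 950e-6) < 1e-12    # 950 us
```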

The on_miss Policy

When a node exceeds its deadline, the miss policy fires:

  • "warn" --- log a warning and continue normally. For non-critical nodes (the default).
  • "skip" --- skip the node's next tick to recover. For high-frequency nodes that can afford one skipped cycle.
  • "safe_mode" --- call enter_safe_state() on the node. For motor controllers and actuators.
  • "stop" --- stop the entire scheduler. For safety monitors; a last resort.

motor = Node(
    name="motor",
    tick=motor_ctrl,
    rate=1000,
    order=5,
    budget=300 * us,
    deadline=900 * us,
    on_miss="safe_mode",  # Safe state on deadline miss
)

safety = Node(
    name="safety_monitor",
    tick=check_safety,
    rate=1000,
    order=0,
    budget=100 * us,
    deadline=200 * us,
    on_miss="stop",  # Stop everything if safety monitor misses
)

The failure_policy

When a node's tick() raises an exception, the failure policy determines what happens:

  • "fatal" --- stop the entire scheduler.
  • "restart" --- re-initialize the node and resume.
  • "skip" --- skip this tick, try again next cycle.
  • "ignore" --- log the error and continue.

cloud = Node(
    name="cloud_upload",
    tick=upload_data,
    rate=1,
    order=200,
    failure_policy="skip",  # Network glitches shouldn't stop the robot
)
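
The policies map naturally onto a try/except around the tick call. A sketch of the dispatch (illustrative only; the real handling lives inside the scheduler):

```python
def guarded_tick(tick_fn, node, policy, reinit_fn=None):
    """Call tick_fn and apply a failure policy if it raises.

    Returns "ok", "stop_scheduler", "restarted", "skipped", or "ignored".
    """
    try:
        tick_fn(node)
        return "ok"
    except Exception:
        if policy == "fatal":
            return "stop_scheduler"   # stop the entire scheduler
        if policy == "restart":
            if reinit_fn:
                reinit_fn(node)       # re-initialize, resume next cycle
            return "restarted"
        if policy == "skip":
            return "skipped"          # try again next cycle
        return "ignored"              # log and continue

def boom(node):
    raise ConnectionError("network glitch")

assert guarded_tick(boom, None, "skip") == "skipped"
assert guarded_tick(lambda n: None, None, "fatal") == "ok"
```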

Async Nodes

If your tick function is async def, the node is automatically assigned to the AsyncIo execution class. No additional configuration needed.

import aiohttp

async def upload_telemetry(node):
    if node.has_msg("telemetry"):
        data = node.recv("telemetry")
        async with aiohttp.ClientSession() as session:
            await session.post("https://fleet.example.com/telemetry", json=data)

uploader = Node(
    name="cloud",
    tick=upload_telemetry,  # async -> automatically AsyncIo class
    rate=1,
    order=200,
    pubs=[],
    subs=["telemetry"],
    failure_policy="skip",
)

Event-Driven Nodes

Set on="topic.name" to create a node that sleeps until that topic receives a message:

def handle_estop(node):
    msg = node.recv("emergency.stop")
    node.log_warning(f"E-STOP triggered: {msg}")
    node.request_stop()

estop = Node(
    name="estop",
    tick=handle_estop,
    on="emergency.stop",  # Only wakes when message arrives
    order=0,
    subs=["emergency.stop"],
)

Compute Nodes

Set compute=True for CPU-heavy work that should run on a thread pool instead of the main loop:

def plan_path(node):
    if node.has_msg("map"):
        grid = node.recv("map")
        # Heavy computation — runs on thread pool, won't block motor controller
        path = a_star(grid, start=(0, 0), goal=(10, 10))
        node.send("path", path)

planner = Node(
    name="planner",
    tick=plan_path,
    rate=10,
    order=50,
    compute=True,  # Thread pool, not main loop
    subs=["map"],
    pubs=["path"],
)

CPU Pinning and Priority

For maximum timing determinism, pin nodes to specific CPU cores and set OS thread priority:

motor = Node(
    name="motor",
    tick=motor_ctrl,
    rate=1000,
    order=0,
    budget=300 * us,
    core=2,         # Pin to CPU core 2
    priority=80,    # SCHED_FIFO priority (requires rt=True on scheduler)
)

CPU pinning and priority require rt=True on the scheduler. Without it, these hints are applied on a best-effort basis. On Linux, SCHED_FIFO priorities require either root or the CAP_SYS_NICE capability.

The Framework Clock

Inside tick(), init(), and shutdown() callbacks, the framework clock provides consistent time across all nodes:

import horus

def control_loop(node):
    t = horus.now()              # Current time (seconds)
    delta = horus.dt()           # Time since last tick (seconds)
    total = horus.elapsed()      # Time since scheduler start (seconds)
    n = horus.tick()             # Current tick number (int)
    remaining = horus.budget_remaining()  # Time left in budget (seconds)

    # Use dt for frame-rate-independent physics (velocity, position,
    # and acceleration are assumed to persist between ticks, e.g. in node state)
    velocity += acceleration * delta
    position += velocity * delta

  horus.now() -> float
      Current time in seconds. Wall clock in normal mode, SimClock in deterministic mode.
  horus.dt() -> float
      Time elapsed since the last tick, in seconds. Fixed 1/rate in deterministic mode.
  horus.elapsed() -> float
      Seconds since scheduler start.
  horus.tick() -> int
      Monotonically increasing tick counter.
  horus.budget_remaining() -> float
      Seconds remaining in this tick's budget. float('inf') if no budget is set.
  horus.rng_float() -> float
      Random float in [0.0, 1.0). System entropy normally, tick-seeded in deterministic mode.

Normal vs Deterministic Clock

In normal mode, horus.now() returns wall-clock time and horus.dt() returns the actual elapsed duration. In deterministic mode (deterministic=True), the scheduler uses a SimClock: horus.dt() returns a fixed 1/rate value every tick, and horus.rng_float() produces the same sequence across runs. This makes tests and simulations reproducible.
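
A minimal SimClock sketch makes the difference concrete (pure Python mirroring the documented behavior, not the actual implementation):

```python
class SimClock:
    """Deterministic clock: now() advances by a fixed dt per tick."""

    def __init__(self, rate_hz):
        self.fixed_dt = 1.0 / rate_hz
        self.time = 0.0
        self.ticks = 0

    def advance_tick(self):
        self.time += self.fixed_dt
        self.ticks += 1

    def now(self):
        return self.time

    def dt(self):
        return self.fixed_dt   # always 1/rate, never wall-clock elapsed

clock = SimClock(rate_hz=100)
for _ in range(3):
    clock.advance_tick()

assert clock.dt() == 0.01                # fixed, regardless of real time
assert abs(clock.now() - 0.03) < 1e-12   # advanced by dt per tick
```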

tick_once() --- Testing and Simulation

tick_once() executes exactly one tick cycle, then returns. This is the primary tool for testing scheduler behavior without threads or timing.

from horus import Scheduler, Node

def accumulate(node):
    count = node.recv("count") or 0
    node.send("count", count + 1)

sched = Scheduler(tick_rate=100)
sched.add(Node(name="counter", tick=accumulate, rate=100, order=0,
               pubs=["count"], subs=["count"]))

sched.tick_once()  # Init (lazy) + one tick
# count topic now has value 1

sched.tick_once()  # Second tick
# count topic now has value 2

Selective Ticking

Pass a list of node names to tick only specific nodes:

sched.tick_once(["sensor"])         # Only tick the sensor
sched.tick_once(["sensor", "ctrl"]) # Tick sensor and controller, skip motor

This is useful for unit-testing a single node while keeping others frozen.

tick_for() --- Timed Runs

tick_for() runs the tick loop for a specific duration, then returns:

sched.tick_for(1.0)                  # Run for 1 second, then return
sched.tick_for(0.5, ["sensor"])      # Run only sensor for 0.5 seconds

This is useful for integration tests that need to observe behavior over time, and for simulation stepping where you advance by a fixed wall-clock interval.

Runtime Mutation

You can modify the scheduler while it is running.

set_node_rate()

Change a node's tick rate dynamically:

# Sensor normally runs at 100 Hz
sched.add(Node(name="sensor", tick=read_lidar, rate=100, order=0))
sched.run()  # blocks; call the methods below from another thread or a node callback

# Slow down to conserve power
sched.set_node_rate("sensor", 20)   # 100 Hz -> 20 Hz

# Speed up for precision docking
sched.set_node_rate("sensor", 500)  # 20 Hz -> 500 Hz

set_tick_budget()

Change a node's tick budget dynamically. The argument is in microseconds (unlike the Node constructor which takes seconds):

sched.set_tick_budget("motor", 500)  # Allow 500 us per tick

remove_node()

Remove a node from the running scheduler:

removed = sched.remove_node("cloud_logger")  # Returns True if found

The removed node's shutdown() is called before it is detached from the tick loop.

add_critical_node()

Mark a node as safety-critical with a dedicated watchdog timeout. If a critical node exceeds this timeout, the scheduler calls enter_safe_state() on all nodes:

sched.add_critical_node("motor_controller", timeout_ms=500)

This is stricter than the global watchdog: a regular node that freezes gets isolated; a critical node that freezes triggers a system-wide safe state.

Safety and Monitoring

Watchdog

Enable the watchdog with watchdog_ms on the scheduler:

sched = Scheduler(tick_rate=1000, watchdog_ms=500)

The watchdog uses graduated response:

  • 1x watchdog --- Warning: log a warning.
  • 2x watchdog --- Unhealthy: skip the tick, log an error.
  • 3x watchdog (critical node) --- Isolated: remove from the tick loop, call enter_safe_state().

A single late tick might be a transient GC pause. The graduated response gives transient problems time to resolve while still catching truly frozen nodes.
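
The graduated thresholds can be sketched as a classifier over multiples of the watchdog timeout (illustrative; the state names match the table above):

```python
def watchdog_state(elapsed_s, timeout_s, critical=False):
    """Map time since last tick to a health state per the graduated table."""
    if elapsed_s < timeout_s:
        return "Healthy"
    if elapsed_s < 2 * timeout_s:
        return "Warning"       # 1x: log warning
    if elapsed_s < 3 * timeout_s or not critical:
        return "Unhealthy"     # 2x: skip tick, log error
    return "Isolated"          # 3x on a critical node: safe state

t = 0.5  # 500 ms watchdog
assert watchdog_state(0.3, t) == "Healthy"
assert watchdog_state(0.6, t) == "Warning"
assert watchdog_state(1.2, t) == "Unhealthy"
assert watchdog_state(1.6, t, critical=True) == "Isolated"
```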

Per-Node Watchdog

Override the global watchdog for specific nodes:

motor = Node(
    name="motor",
    tick=motor_ctrl,
    rate=1000,
    order=0,
    watchdog=200 * ms,  # Stricter: 200 ms instead of global 500 ms
)

Deadline Miss Escalation

Set max_deadline_misses to stop the scheduler after a cumulative threshold:

sched = Scheduler(tick_rate=1000, max_deadline_misses=50)
# After 50 total deadline misses across all nodes, the scheduler stops
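
A sketch of the cumulative counter (the threshold is global across all nodes, per the comment above):

```python
class MissEscalation:
    """Stop the scheduler once cumulative deadline misses hit a threshold."""

    def __init__(self, max_misses):
        self.max_misses = max_misses
        self.misses = 0

    def record_miss(self):
        self.misses += 1
        return self.misses >= self.max_misses   # True => emergency stop

esc = MissEscalation(max_misses=3)
assert esc.record_miss() is False
assert esc.record_miss() is False
assert esc.record_miss() is True    # third miss triggers the stop
```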

safety_stats()

Query safety monitoring statistics at runtime:

stats = sched.safety_stats()
if stats:
    print(f"Budget overruns: {stats['budget_overruns']}")
    print(f"Watchdog expirations: {stats['watchdog_expirations']}")
    print(f"Node health states: {stats['health_states']}")

get_node_stats()

Query per-node statistics:

stats = sched.get_node_stats("motor")
print(f"Node: {stats['name']}")
print(f"Priority: {stats['priority']}")
print(f"Total ticks: {stats['total_ticks']}")
print(f"Errors: {stats['errors_count']}")

Real-Time Features

Enabling RT

Pass rt=True to the scheduler to request real-time scheduling:

sched = Scheduler(tick_rate=1000, rt=True)

This enables:

  • SCHED_FIFO: Linux real-time scheduling class for all RT-class nodes
  • mlockall: Lock all memory pages to prevent page faults during ticks
  • CPU isolation: Use isolated cores when available

Checking RT Capabilities

sched = Scheduler(tick_rate=1000, rt=True)

# Did we get full RT?
if sched.has_full_rt():
    print("Full real-time capabilities active")
else:
    print("Running with degraded RT")
    for d in sched.degradations():
        print(f"  Degradation: {d}")

# Detailed capabilities
caps = sched.capabilities()
print(f"RT scheduling: {caps['rt_scheduling']}")
print(f"Memory locking: {caps['memory_locking']}")
print(f"CPU isolation: {caps['cpu_isolation']}")

Degradations

When rt=True is set but the system cannot provide all RT features, the scheduler degrades gracefully and logs what it could not enable:

sched = Scheduler(tick_rate=1000, rt=True)
for msg in sched.degradations():
    print(f"  {msg}")
# Example output:
#   SCHED_FIFO unavailable (no CAP_SYS_NICE) - using SCHED_OTHER
#   mlockall failed (EPERM) - memory pages may be swapped

The scheduler still runs. Nodes still tick. Timing guarantees are weakened but not absent.

For development, rt=True is safe --- the scheduler does its best and logs what it cannot do. For production where degraded mode is unacceptable, check has_full_rt() at startup and refuse to continue if it returns False.

Deterministic Mode

Enable deterministic mode for reproducible simulation and testing:

sched = Scheduler(tick_rate=100, deterministic=True)

In deterministic mode:

  • horus.dt() returns a fixed 1/rate every tick (not wall-clock elapsed)
  • horus.now() advances by dt each tick (SimClock)
  • horus.rng_float() returns a tick-seeded sequence --- same across runs
  • Execution order is determined by the dependency graph (inferred from topic connections), not OS thread scheduling

This guarantees identical results across runs on any machine. Use it for:

  • Unit tests: Assert exact outputs after N ticks
  • Simulation: Physics engines need fixed timesteps
  • Regression tests: Catch behavioral changes in CI

sched = Scheduler(tick_rate=100, deterministic=True)
sched.add(Node(name="sim_sensor", tick=sim_tick, rate=100, order=0))

for _ in range(1000):
    sched.tick_once()
    assert horus.dt() == 0.01  # Fixed: 1/100 Hz = 10 ms

Recording and Replay

Recording a Session

sched = Scheduler(tick_rate=100, recording=True, blackbox_mb=16)
sched.add(Node(name="sensor", tick=read_lidar, rate=100, order=0))
sched.run(duration=60.0)  # Record 60 seconds of data

# Check recording status
print(sched.is_recording())  # True while running

Stopping and Listing Recordings

# Stop recording and get file paths
files = sched.stop_recording()
for f in files:
    print(f"Recorded: {f}")

# List all available recordings
recordings = sched.list_recordings()
for r in recordings:
    print(f"Session: {r}")

Deleting Recordings

sched.delete_recording("session_2026_03_20_143000")

Flight Recorder (Blackbox)

The blackbox is a rolling buffer that records the last N megabytes of tick data. Unlike recording, which captures everything to disk, the blackbox keeps a fixed-size ring buffer in memory and only writes to disk on crash or explicit dump:

sched = Scheduler(tick_rate=1000, blackbox_mb=64)
# On crash: last 64 MB of tick data is saved for post-mortem analysis
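
The ring-buffer idea is easy to sketch with collections.deque. This illustrates the concept only, not the HORUS on-disk format, and it bounds the buffer by entry count rather than megabytes:

```python
from collections import deque

class Blackbox:
    """Rolling buffer: keeps only the most recent entries in memory."""

    def __init__(self, max_entries):
        self.buf = deque(maxlen=max_entries)   # old entries fall off the front

    def record(self, tick, data):
        self.buf.append((tick, data))

    def dump(self):
        """On crash or explicit request, return everything still buffered."""
        return list(self.buf)

bb = Blackbox(max_entries=3)
for n in range(5):
    bb.record(n, {"v": n * 0.1})

# Only the last 3 ticks survive: 2, 3, 4
assert [t for t, _ in bb.dump()] == [2, 3, 4]
```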

Introspection

Query the scheduler at runtime:

print(sched.status())         # "idle", "running", or "stopped"
print(sched.current_tick())   # Current tick number
print(sched.is_running())     # True if in the tick loop
print(sched.scheduler_name()) # Scheduler name

# Node introspection
print(sched.get_node_count())   # Number of registered nodes
print(sched.get_node_names())   # ["sensor", "motor", "planner"]
print(sched.has_node("motor"))  # True

# All node info
for info in sched.get_all_nodes():
    print(f"{info['name']}: order={info['order']}, state={info['state']}")

# Specific node
order = sched.get_node_info("motor")  # Returns execution order (int)

Node Lifecycle Callbacks

Beyond tick, nodes have init and shutdown callbacks:

def setup_hardware(node):
    node.log_info("Connecting to motor controller...")
    # Hardware init here
    node.send("status", {"state": "ready"})

def control(node):
    if node.has_msg("cmd"):
        cmd = node.recv("cmd")
        # Motor control logic
        node.log_debug(f"Command: {cmd}")

def cleanup(node):
    node.log_info("Stopping motor controller...")
    node.send("cmd", {"linear": 0.0, "angular": 0.0})  # Stop motors

motor = Node(
    name="motor",
    init=setup_hardware,       # Called once at startup
    tick=control,              # Called every tick
    shutdown=cleanup,          # Called once at shutdown
    rate=1000,
    order=5,
    budget=300 * us,
)

Logging methods (log_info, log_warning, log_error, log_debug) only work inside init, tick, and shutdown callbacks. Calling them outside these contexts silently drops the message.

Production Patterns

Warehouse AGV

A full warehouse AGV with safety monitoring, path planning, motor control, and fleet reporting:

import horus
from horus import Node, Scheduler, us, ms

# --- Node callbacks ---

def safety_check(node):
    """Order 0: runs before everything else, every cycle."""
    if node.has_msg("scan"):
        scan = node.recv("scan")
        min_range = min(scan["ranges"])
        if min_range < 0.3:  # 30 cm emergency threshold
            node.send("emergency.stop", {"reason": "obstacle", "distance": min_range})
            node.log_warning(f"Emergency stop: obstacle at {min_range:.2f}m")

def read_lidar(node):
    """Order 10: read LiDAR at 40 Hz."""
    # Hardware read
    node.send("scan", {"ranges": [1.2, 0.8, 2.5, 1.1], "angle_min": -1.57})

def plan_path(node):
    """Order 50: heavy computation on thread pool."""
    if node.has_msg("scan"):
        scan = node.recv("scan")
        # A* or RRT on occupancy grid (40+ ms of computation)
        path = compute_path(scan)
        node.send("path", path)

def track_path(node):
    """Order 60: pure pursuit controller at 100 Hz."""
    if node.has_msg("path"):
        path = node.recv("path")
        cmd = pure_pursuit(path, lookahead=0.5)
        node.send("cmd_vel", cmd)

def motor_drive(node):
    """Order 70: motor controller at 1 kHz with tight budget."""
    if node.has_msg("cmd_vel"):
        cmd = node.recv("cmd_vel")
        # Write to motor hardware
        apply_wheel_velocities(cmd["left"], cmd["right"])

def battery_check(node):
    """Order 100: slow background monitoring."""
    voltage = read_battery_voltage()
    if voltage < 22.0:
        node.log_warning(f"Low battery: {voltage:.1f}V")
    node.send("battery", {"voltage": voltage})

def fleet_report(node):
    """Order 200: async HTTP upload to fleet management."""
    if node.has_msg("battery"):
        data = node.recv("battery")
        # aiohttp call to fleet server
        node.send("fleet.telemetry", data)

# --- Build and run ---

safety   = Node(name="safety",  tick=safety_check, rate=1000, order=0,
                budget=100*us, deadline=200*us, on_miss="stop",
                subs=["scan"], pubs=["emergency.stop"])

lidar    = Node(name="lidar",   tick=read_lidar, rate=40, order=10,
                pubs=["scan"])

planner  = Node(name="planner", tick=plan_path, rate=10, order=50,
                compute=True, subs=["scan"], pubs=["path"])

tracker  = Node(name="tracker", tick=track_path, rate=100, order=60,
                subs=["path"], pubs=["cmd_vel"])

motor    = Node(name="motor",   tick=motor_drive, rate=1000, order=70,
                budget=300*us, deadline=900*us, on_miss="safe_mode",
                core=2, subs=["cmd_vel"])

battery  = Node(name="battery", tick=battery_check, rate=1, order=100)

fleet    = Node(name="fleet",   tick=fleet_report, rate=1, order=200,
                failure_policy="skip", subs=["battery"], pubs=["fleet.telemetry"])

with Scheduler(tick_rate=1000, rt=True, watchdog_ms=500,
               blackbox_mb=16, max_deadline_misses=50) as sched:
    sched.add(safety).add(lidar).add(planner).add(tracker)
    sched.add(motor).add(battery).add(fleet)
    sched.add_critical_node("motor", timeout_ms=200)
    sched.add_critical_node("safety", timeout_ms=100)
    sched.run()

What this sets up:

  • Safety monitor at order 0 runs before everything --- if it detects an obstacle, it fires before the motor ticks
  • LiDAR at 40 Hz feeds the planner and safety monitor
  • Path planner on the Compute thread pool --- its 40 ms computation does not block the 1 kHz motor controller
  • Motor controller pinned to core 2 with a 300 us budget --- enters safe state on miss
  • Fleet reporter tolerates network failures with failure_policy="skip"
  • Motor and safety are critical nodes --- if either freezes, the entire system enters safe state

Drone Flight Controller

A quadrotor with IMU fusion, PID control, and telemetry:

import horus
from horus import Node, us, ms

def imu_read(node):
    """Read IMU at 400 Hz."""
    raw = read_imu_hardware()
    fused = complementary_filter(raw)
    node.send("imu", fused)

def attitude_control(node):
    """Inner loop: attitude PID at 400 Hz."""
    if node.has_msg("imu") and node.has_msg("setpoint"):
        imu = node.recv("imu")
        sp = node.recv("setpoint")
        delta = horus.dt()
        motors = pid_control(imu, sp, delta)
        node.send("motor_cmd", motors)

def position_control(node):
    """Outer loop: position PID at 50 Hz."""
    if node.has_msg("gps"):
        gps = node.recv("gps")
        waypoint = node.recv("waypoint") if node.has_msg("waypoint") else current_waypoint()
        setpoint = position_pid(gps, waypoint, horus.dt())
        node.send("setpoint", setpoint)

def motor_output(node):
    """ESC output at 400 Hz with hard deadline."""
    if node.has_msg("motor_cmd"):
        cmd = node.recv("motor_cmd")
        write_esc(cmd)

def telemetry_log(node):
    """Log to SD card at 10 Hz."""
    if node.has_msg("imu"):
        node.send("log", {
            "t": horus.now(),
            "tick": horus.tick(),
            "imu": node.recv("imu"),
        })

imu    = Node(name="imu",      tick=imu_read,         rate=400, order=0, core=0)
att    = Node(name="attitude",  tick=attitude_control,  rate=400, order=10,
              budget=200*us, deadline=500*us, on_miss="safe_mode", core=1,
              subs=["imu", "setpoint"], pubs=["motor_cmd"])
pos    = Node(name="position",  tick=position_control,  rate=50,  order=20,
              subs=["gps", "waypoint"], pubs=["setpoint"])
esc    = Node(name="esc",       tick=motor_output,      rate=400, order=30,
              budget=100*us, deadline=200*us, on_miss="stop", core=1,
              subs=["motor_cmd"])
logger = Node(name="logger",    tick=telemetry_log,     rate=10,  order=100,
              subs=["imu"], pubs=["log"])

horus.run(imu, att, pos, esc, logger,
          tick_rate=400, rt=True, watchdog_ms=100,
          blackbox_mb=8, max_deadline_misses=10, cores=[0, 1])

Key patterns:

  • Inner loop (attitude) and outer loop (position) run at different rates through the same scheduler
  • horus.dt() provides frame-rate-independent PID integration
  • ESC output uses on_miss="stop" --- if the ESC misses a deadline, stop the scheduler (drone must not fly with stale motor commands)
  • cores=[0, 1] pins the entire scheduler to two dedicated cores
  • blackbox_mb=8 records the last 8 MB of flight data for crash investigation

Testing with the Scheduler

Unit Testing a Single Node

def test_counter_node():
    """Test that counter increments each tick."""
    results = []

    def counter_tick(node):
        count = (node.recv("count") or 0) + 1
        node.send("count", count)
        results.append(count)

    sched = Scheduler(tick_rate=100, deterministic=True)
    sched.add(Node(name="counter", tick=counter_tick, rate=100, order=0,
                   pubs=["count"], subs=["count"]))

    sched.tick_once()
    sched.tick_once()
    sched.tick_once()

    assert results == [1, 2, 3]

Integration Testing Multiple Nodes

def test_sensor_to_motor_pipeline():
    """Test that sensor data flows through the controller to the motor."""
    motor_cmds = []

    def fake_sensor(node):
        node.send("scan", {"ranges": [0.5, 1.0, 1.5]})

    def controller(node):
        if node.has_msg("scan"):
            scan = node.recv("scan")
            node.send("cmd", {"speed": min(scan["ranges"])})

    def mock_motor(node):
        if node.has_msg("cmd"):
            motor_cmds.append(node.recv("cmd"))

    sched = Scheduler(tick_rate=100, deterministic=True)
    sched.add(Node(name="sensor", tick=fake_sensor, rate=100, order=0, pubs=["scan"]))
    sched.add(Node(name="ctrl",   tick=controller,  rate=100, order=10,
                   subs=["scan"], pubs=["cmd"]))
    sched.add(Node(name="motor",  tick=mock_motor,  rate=100, order=20, subs=["cmd"]))

    # Run enough ticks for data to flow through the pipeline
    sched.tick_for(0.1)

    assert len(motor_cmds) > 0
    assert motor_cmds[0]["speed"] == 0.5

Testing Safety Behavior

def test_watchdog_detects_frozen_node():
    """Verify the watchdog catches a node that hangs."""
    import time

    def frozen_tick(node):
        time.sleep(2.0)  # Simulate frozen node

    sched = Scheduler(tick_rate=10, watchdog_ms=100)
    sched.add(Node(name="frozen", tick=frozen_tick, rate=10, order=0))

    sched.tick_once()

    stats = sched.safety_stats()
    assert stats is not None
    assert stats["watchdog_expirations"] > 0

Design Decisions

Why horus.run() instead of manual thread management?

Python's GIL makes multi-threaded Python unreliable for real-time work. horus.run() hands control to the Rust scheduler, which manages threads natively. Python tick() functions are called back from Rust --- the GIL is acquired only for the duration of each Python callback, then released. This gives Python nodes near-native scheduling precision while keeping the API simple.

Why all config on Node() instead of scheduler.add().order().rate().build()?

In Python, keyword arguments are idiomatic. Chained builder methods are a Rust pattern that does not translate well. Node(rate=1000, order=0, budget=300*us) is immediately readable. It also means the node carries its own configuration --- you can pass it between functions, store it in a list, or serialize it without losing scheduling intent.

Why us and ms constants instead of special unit types?

300 * us is plain Python math --- it produces a float in seconds. No special types to import, no conversion functions to remember, no confusion about what unit a function expects. The cost is that budget=300 silently means 300 seconds, which is why the constants exist and the docstrings emphasize them.

Why lazy initialization?

init() runs when scheduler.run() is called, not when scheduler.add() is called. This means you can configure all nodes, set global scheduler settings, and defer hardware initialization until the system is truly ready to start. It also means the scheduler's clock, RT configuration, and recording state are all finalized before any node initializes.

Why reverse-order shutdown?

Nodes are typically added in dependency order: sensors then controllers then loggers. Reverse-order shutdown means controllers stop motors before sensors disconnect, and loggers record the shutdown sequence before they themselves stop.

Trade-offs

  • Gain: one-liner horus.run() with no boilerplate. Cost: less control than a manual Scheduler for runtime mutation.
  • Gain: the Python GIL is released between ticks --- the Rust scheduler handles threading. Cost: Python tick() functions must not hold the GIL for work done outside the callback.
  • Gain: budget/deadline auto-derived from rate --- less to configure. Cost: explicit budget/deadline values are needed to override the 80%/95% defaults.
  • Gain: graduated watchdog --- transient spikes do not kill nodes. Cost: the 3x timeout before isolation means truly frozen nodes take longer to detect.
  • Gain: deterministic=True gives reproducible tests and simulations. Cost: not suitable for production (no wall-clock timing).
  • Gain: all config on Node() --- Pythonic and self-contained. Cost: a node cannot be reconfigured after creation (create a new one instead).
  • Gain: tick_once() for testing --- fully deterministic single-stepping. Cost: no rate control in single-step mode (by design).
  • Gain: recording captures full sessions for replay. Cost: disk I/O overhead; not suitable for ultra-low-latency production.
  • Gain: RT degradation --- runs on developer laptops and in production alike. Cost: must check has_full_rt() to confirm actual RT in production.

See Also