Scheduler Deep-Dive (Python)
A warehouse AGV runs eight nodes: two LiDARs, a camera pipeline, a safety monitor, a path planner, motor controllers, a battery watcher, and a cloud uploader. The motor controller must tick every millisecond. The safety monitor must run before the motors every cycle. The path planner needs 40 ms of CPU time and must never block the motor controller. When the emergency stop fires, motors must halt before sensors disconnect.
Coordinating all of this by hand --- threads, locks, timers, signal handlers --- is the single biggest source of bugs in robotics software.
The HORUS scheduler handles it. You configure nodes, call run(), and the scheduler manages execution order, tick rates, deadline enforcement, safety monitoring, and graceful shutdown.
The Execution Model
How a Tick Works
Every tick cycle, the scheduler does the following:
- For each registered node (in .order() sequence):
  - Check the per-node rate limiter --- is this node due to tick?
  - If yes, start a timer and call the node's tick() function
  - Stop the timer. If the node exceeded its budget, apply its on_miss policy
  - Record metrics (tick duration, overruns, health state)
- After all nodes are processed, sleep to maintain the global tick rate
Tick N                                    Tick N+1
   |                                         |
   v                                         v
[ safety ] [ sensor ] [ motor ] [sleep]  [ safety ] [ sensor ] ...
  order=0    order=10   order=20           order=0    order=10
Key details:
- BestEffort nodes (the default) execute sequentially in order on the main thread. RT, Compute, Event, and AsyncIo nodes run on their own threads and synchronize at tick boundaries.
- Rate limiting is per-node: a 10 Hz node inside a 1 kHz scheduler only has tick() called every 100th cycle.
- Budget enforcement happens after tick() returns --- the scheduler does not preempt mid-tick. It records the overrun and applies the miss policy.
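The per-tick algorithm can be sketched as a pure-Python model. This is conceptual only: NodeEntry and run_one_cycle are illustrative names, not part of the HORUS API, and the real scheduler is implemented in Rust.

```python
import time

class NodeEntry:
    """Illustrative stand-in for a registered node."""
    def __init__(self, name, tick_fn, rate, order, budget=None, on_miss="warn"):
        self.name, self.tick_fn = name, tick_fn
        self.period = 1.0 / rate          # seconds between due ticks
        self.order, self.budget, self.on_miss = order, budget, on_miss
        self.next_due = 0.0
        self.overruns = 0

def run_one_cycle(nodes, now):
    """One scheduler cycle: rate-limit, tick, then enforce budget."""
    for node in sorted(nodes, key=lambda n: n.order):
        if now < node.next_due:           # rate limiter: node not due yet
            continue
        node.next_due = now + node.period
        start = time.perf_counter()
        node.tick_fn()                    # the node's tick() callback
        elapsed = time.perf_counter() - start
        if node.budget is not None and elapsed > node.budget:
            node.overruns += 1            # here the real scheduler applies on_miss
```

Note how budget enforcement happens only after the callback returns, matching the no-preemption rule above.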
Initialization
When you call scheduler.run() (or horus.run()), initialization happens lazily:
- All pending node configurations are finalized (execution class inference, budget/deadline auto-derivation)
- init() is called on every node, in .order() sequence
- If a node's init() raises an exception, that node enters Error state and is excluded from ticking. Other nodes continue.
- The main tick loop begins
Lazy initialization means you can add and configure nodes in any order. The scheduler resolves everything at startup.
Shutdown
When the scheduler receives a stop signal (Ctrl+C, scheduler.stop(), or a node calling node.request_stop()):
- The main loop exits
- shutdown() is called on every node in reverse order --- last-added first
- RT threads are given 3 seconds to exit; stalled threads are detached
- Shared memory is cleaned up
Reverse-order shutdown ensures dependent nodes stop before their dependencies. The motor controller (order 20) shuts down before the sensor (order 10) that feeds it.
horus.run() --- The One-Liner
For most programs, horus.run() is all you need:
import horus
from horus import us
def read_sensor(node):
node.send("scan", {"ranges": [1.0, 2.0, 3.0]})
def navigate(node):
if node.has_msg("scan"):
scan = node.recv("scan")
node.send("cmd", {"linear": 0.5, "angular": scan["ranges"][0]})
def drive(node):
if node.has_msg("cmd"):
cmd = node.recv("cmd")
# Send to motor hardware
node.log_info(f"Driving: linear={cmd['linear']}")
sensor = horus.Node(name="sensor", tick=read_sensor, rate=10, order=0, pubs=["scan"])
ctrl = horus.Node(name="controller", tick=navigate, rate=30, order=10, subs=["scan"], pubs=["cmd"])
motor = horus.Node(name="motor", tick=drive, rate=1000, order=20, budget=300*us, subs=["cmd"])
horus.run(sensor, ctrl, motor, rt=True, watchdog_ms=500)
horus.run() creates a Scheduler behind the scenes, adds all nodes, and calls run(). Every scheduler-level parameter is available as a keyword argument:
horus.run(
*nodes,
duration=None, # seconds (None = forever)
tick_rate=1000.0, # Hz
rt=False, # SCHED_FIFO + mlockall
deterministic=False, # SimClock
blackbox_mb=0, # Flight recorder size
watchdog_ms=0, # Frozen node detection
recording=False, # Session recording
name=None, # Scheduler name
cores=None, # CPU affinity [0, 1, ...]
max_deadline_misses=None, # Escalation threshold
verbose=False, # Debug logging
telemetry=None, # Endpoint URL
)
When to use horus.run() vs Scheduler: Use horus.run() when you create all nodes upfront and run until Ctrl+C (or a fixed duration). Use Scheduler when you need runtime mutation (adding/removing nodes, changing rates), tick_once() for testing, or the context manager pattern.
The Scheduler Class
Creating a Scheduler
All configuration happens through keyword arguments:
from horus import Scheduler, Node, us, ms
sched = Scheduler(
tick_rate=1000.0, # 1 kHz global tick rate
rt=True, # Enable real-time scheduling
watchdog_ms=500, # 500 ms frozen-node detection
blackbox_mb=16, # 16 MB flight recorder
max_deadline_misses=50, # Emergency stop after 50 misses
)
Scheduler Parameters Reference
| Parameter | Type | Default | Description |
|---|---|---|---|
| tick_rate | float | 1000.0 | Global tick rate in Hz. Match to your fastest node |
| rt | bool | False | Enable SCHED_FIFO scheduling and mlockall memory locking |
| deterministic | bool | False | Enable SimClock (fixed dt, seeded RNG, reproducible results) |
| blackbox_mb | int | 0 | Flight recorder buffer size in MB. 0 disables |
| watchdog_ms | int | 0 | Watchdog timeout in milliseconds. 0 disables |
| recording | bool | False | Enable session recording for replay |
| name | str or None | None | Scheduler name for logging and diagnostics |
| cores | list[int] or None | None | Pin scheduler to specific CPU cores |
| max_deadline_misses | int or None | None | Emergency stop threshold. Default (in Rust): 100 |
| verbose | bool | False | Enable verbose debug logging |
| telemetry | str or None | None | Telemetry export endpoint URL |
Adding Nodes
sched = Scheduler(tick_rate=1000, rt=True)
sched.add(Node(name="safety", tick=safety_fn, rate=1000, order=0))
sched.add(Node(name="motor", tick=motor_fn, rate=1000, order=5, budget=300*us))
sched.add(Node(name="planner", tick=plan_fn, rate=50, order=50, compute=True))
sched.run()
add() returns self, so you can chain:
sched.add(sensor_node).add(ctrl_node).add(motor_node)
Running
# Run forever (until Ctrl+C or .stop())
sched.run()
# Run for a fixed duration
sched.run(duration=30.0) # 30 seconds
Context Manager
The context manager calls stop() automatically on exit, ensuring clean shutdown even if an exception occurs:
with Scheduler(tick_rate=100, watchdog_ms=500) as sched:
sched.add(Node(name="sensor", tick=read_sensor, rate=100, order=0))
sched.add(Node(name="logger", tick=log_data, rate=10, order=100))
sched.run(duration=60.0)
# stop() called automatically here
This is the recommended pattern for production code.
Node Scheduling Parameters
Every scheduling parameter is set on the Node constructor. The scheduler reads them when you call add().
Parameter Reference
| Parameter | Type | Default | Description |
|---|---|---|---|
| rate | float | 30 | Node tick rate in Hz |
| order | int | 100 | Execution order (lower = earlier). 0-9 critical, 10-49 high, 50-99 normal, 100-199 low, 200+ background |
| budget | float or None | None | Max expected tick duration in seconds. None = auto (80% of period) |
| deadline | float or None | None | Hard deadline in seconds. None = auto (95% of period) |
| on_miss | str or None | None | Deadline miss policy: "warn", "skip", "safe_mode", "stop" |
| failure_policy | str or None | None | Error policy: "fatal", "restart", "skip", "ignore" |
| compute | bool | False | Run on thread pool (CPU-heavy work) |
| on | str or None | None | Event-driven --- tick only when this topic receives a message |
| priority | int or None | None | OS thread priority (SCHED_FIFO 1-99, requires rt=True) |
| core | int or None | None | Pin to specific CPU core index |
| watchdog | float or None | None | Per-node watchdog timeout in seconds (overrides global) |
Budget and deadline are in seconds. Python floats represent seconds, not microseconds. Use the us and ms constants for readability: budget=300 * us means 300 microseconds. Writing budget=300 means 300 seconds --- almost certainly not what you want.
Unit Constants
from horus import us, ms
us = 1e-6 # microseconds -> seconds
ms = 1e-3 # milliseconds -> seconds
# Examples
budget=300 * us # 300 microseconds
deadline=900 * us # 900 microseconds
deadline=5 * ms # 5 milliseconds
watchdog=500 * ms # 500 milliseconds (or just use watchdog_ms=500 on the Scheduler)
Execution Classes
The scheduler assigns each node an execution class based on its configuration. You do not set this directly --- it is inferred:
| Configuration | Assigned Class | Thread Model | Best For |
|---|---|---|---|
| rate, budget, or deadline set | Rt | Dedicated thread, budget enforced | Motor control, safety, sensor fusion |
| compute=True | Compute | Thread pool | Path planning, SLAM, image processing |
| on="topic.name" | Event | Sleeps until message arrives | Emergency stop, command handlers |
| async def tick | AsyncIo | Tokio runtime | HTTP, cloud upload, database |
| None of the above | BestEffort | Sequential on main thread | Logging, telemetry, display |
RT is auto-detected. Setting rate, budget, or deadline on a node automatically assigns the Rt execution class. There is no separate rt=True on individual nodes --- the scheduler infers it from timing constraints.
Budget and Deadline Auto-Derivation
When you set rate on a node, the scheduler auto-derives timing constraints:
- Budget = 80% of the period (e.g., 1000 Hz means a 1 ms period, so budget = 800 us)
- Deadline = 95% of the period (e.g., 1000 Hz means deadline = 950 us)
You can override either with explicit values:
# Auto-derived: rate=1000 -> budget=800us, deadline=950us
auto_node = Node(name="auto", tick=fn, rate=1000, order=0)
# Explicit override
explicit_node = Node(name="explicit", tick=fn, rate=1000, order=0,
budget=300 * us, deadline=900 * us)
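The 80%/95% derivation is plain arithmetic. A small helper (hypothetical, not part of the HORUS API) makes the numbers concrete:

```python
def derived_timing(rate_hz):
    """Auto-derived budget (80% of period) and deadline (95% of period), in seconds."""
    period = 1.0 / rate_hz
    return {"budget": 0.80 * period, "deadline": 0.95 * period}

# rate=1000 Hz -> 1 ms period -> budget 800 us, deadline 950 us
timing = derived_timing(1000)
```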
The on_miss Policy
When a node exceeds its deadline, the miss policy fires:
| Policy | Behavior | Use When |
|---|---|---|
"warn" | Log warning, continue normally | Non-critical nodes (default) |
"skip" | Skip this node's next tick to recover | High-frequency nodes that can afford one skipped cycle |
"safe_mode" | Call enter_safe_state() on the node | Motor controllers, actuators |
"stop" | Stop the entire scheduler | Safety monitors, last resort |
motor = Node(
name="motor",
tick=motor_ctrl,
rate=1000,
order=5,
budget=300 * us,
deadline=900 * us,
on_miss="safe_mode", # Safe state on deadline miss
)
safety = Node(
name="safety_monitor",
tick=check_safety,
rate=1000,
order=0,
budget=100 * us,
deadline=200 * us,
on_miss="stop", # Stop everything if safety monitor misses
)
The failure_policy
When a node's tick() raises an exception, the failure policy determines what happens:
| Policy | Behavior |
|---|---|
"fatal" | Stop the entire scheduler |
"restart" | Re-initialize the node and resume |
"skip" | Skip this tick, try again next cycle |
"ignore" | Log the error and continue |
cloud = Node(
name="cloud_upload",
tick=upload_data,
rate=1,
order=200,
failure_policy="skip", # Network glitches shouldn't stop the robot
)
Async Nodes
If your tick function is async def, the node is automatically assigned to the AsyncIo execution class. No additional configuration needed.
import aiohttp
async def upload_telemetry(node):
if node.has_msg("telemetry"):
data = node.recv("telemetry")
async with aiohttp.ClientSession() as session:
await session.post("https://fleet.example.com/telemetry", json=data)
uploader = Node(
name="cloud",
tick=upload_telemetry, # async -> automatically AsyncIo class
rate=1,
order=200,
pubs=[],
subs=["telemetry"],
failure_policy="skip",
)
Event-Driven Nodes
Set on="topic.name" to create a node that sleeps until that topic receives a message:
def handle_estop(node):
msg = node.recv("emergency.stop")
node.log_warning(f"E-STOP triggered: {msg}")
node.request_stop()
estop = Node(
name="estop",
tick=handle_estop,
on="emergency.stop", # Only wakes when message arrives
order=0,
subs=["emergency.stop"],
)
Compute Nodes
Set compute=True for CPU-heavy work that should run on a thread pool instead of the main loop:
def plan_path(node):
if node.has_msg("map"):
grid = node.recv("map")
# Heavy computation — runs on thread pool, won't block motor controller
path = a_star(grid, start=(0, 0), goal=(10, 10))
node.send("path", path)
planner = Node(
name="planner",
tick=plan_path,
rate=10,
order=50,
compute=True, # Thread pool, not main loop
subs=["map"],
pubs=["path"],
)
CPU Pinning and Priority
For maximum timing determinism, pin nodes to specific CPU cores and set OS thread priority:
motor = Node(
name="motor",
tick=motor_ctrl,
rate=1000,
order=0,
budget=300 * us,
core=2, # Pin to CPU core 2
priority=80, # SCHED_FIFO priority (requires rt=True on scheduler)
)
CPU pinning and priority require rt=True on the scheduler. Without it, these hints are applied on a best-effort basis. On Linux, SCHED_FIFO priorities require either root or the CAP_SYS_NICE capability.
The Framework Clock
Inside tick(), init(), and shutdown() callbacks, the framework clock provides consistent time across all nodes:
import horus
def control_loop(node):
t = horus.now() # Current time (seconds)
delta = horus.dt() # Time since last tick (seconds)
total = horus.elapsed() # Time since scheduler start (seconds)
n = horus.tick() # Current tick number (int)
remaining = horus.budget_remaining() # Time left in budget (seconds)
# Use dt for frame-rate-independent physics
velocity += acceleration * delta
position += velocity * delta
| Function | Returns | Description |
|---|---|---|
| horus.now() | float | Current time in seconds. Wall clock in normal mode, SimClock in deterministic mode |
| horus.dt() | float | Time elapsed since last tick in seconds. Fixed 1/rate in deterministic mode |
| horus.elapsed() | float | Seconds since scheduler start |
| horus.tick() | int | Monotonically increasing tick counter |
| horus.budget_remaining() | float | Seconds remaining in this tick's budget. float('inf') if no budget set |
| horus.rng_float() | float | Random float in [0.0, 1.0). System entropy normally, tick-seeded in deterministic mode |
Normal vs Deterministic Clock
In normal mode, horus.now() returns wall-clock time and horus.dt() returns the actual elapsed duration. In deterministic mode (deterministic=True), the scheduler uses a SimClock: horus.dt() returns a fixed 1/rate value every tick, and horus.rng_float() produces the same sequence across runs. This makes tests and simulations reproducible.
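Conceptually, the SimClock behaves like this pure-Python model (an illustration of the semantics described above, not the HORUS implementation):

```python
class SimClockModel:
    """Fixed-step clock: dt is constant, now advances by exactly dt per tick."""
    def __init__(self, tick_rate):
        self.fixed_dt = 1.0 / tick_rate   # what horus.dt() would return
        self.time = 0.0                   # what horus.now() would return
        self.ticks = 0                    # what horus.tick() would return

    def advance(self):
        self.time += self.fixed_dt
        self.ticks += 1

clock = SimClockModel(tick_rate=100)
for _ in range(10):
    clock.advance()
# after 10 ticks at 100 Hz, exactly 0.1 simulated seconds have passed,
# regardless of how long the ticks took in wall-clock time
```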
tick_once() --- Testing and Simulation
tick_once() executes exactly one tick cycle, then returns. This is the primary tool for testing scheduler behavior without threads or timing.
from horus import Scheduler, Node
def accumulate(node):
count = node.recv("count") or 0
node.send("count", count + 1)
sched = Scheduler(tick_rate=100)
sched.add(Node(name="counter", tick=accumulate, rate=100, order=0,
pubs=["count"], subs=["count"]))
sched.tick_once() # Init (lazy) + one tick
# count topic now has value 1
sched.tick_once() # Second tick
# count topic now has value 2
Selective Ticking
Pass a list of node names to tick only specific nodes:
sched.tick_once(["sensor"]) # Only tick the sensor
sched.tick_once(["sensor", "ctrl"]) # Tick sensor and controller, skip motor
This is useful for unit-testing a single node while keeping others frozen.
tick_for() --- Timed Runs
tick_for() runs the tick loop for a specific duration, then returns:
sched.tick_for(1.0) # Run for 1 second, then return
sched.tick_for(0.5, ["sensor"]) # Run only sensor for 0.5 seconds
This is useful for integration tests that need to observe behavior over time, and for simulation stepping where you advance by a fixed wall-clock interval.
Runtime Mutation
You can modify the scheduler while it is running.
set_node_rate()
Change a node's tick rate dynamically:
# Sensor normally runs at 100 Hz
sched.add(Node(name="sensor", tick=read_lidar, rate=100, order=0))
sched.run() # running in a thread or after tick_once
# Slow down to conserve power
sched.set_node_rate("sensor", 20) # 100 Hz -> 20 Hz
# Speed up for precision docking
sched.set_node_rate("sensor", 500) # 20 Hz -> 500 Hz
set_tick_budget()
Change a node's tick budget dynamically. The argument is in microseconds (unlike the Node constructor which takes seconds):
sched.set_tick_budget("motor", 500) # Allow 500 us per tick
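Because set_tick_budget() takes microseconds while Node(budget=...) takes seconds, a thin wrapper (a hypothetical helper, not part of the HORUS API) avoids mixing units in one codebase:

```python
def set_budget_seconds(sched, node_name, seconds):
    """Call sched.set_tick_budget() with a value given in seconds.

    set_tick_budget() expects microseconds, unlike the Node constructor,
    which takes seconds. This converts so call sites can stay in seconds.
    """
    sched.set_tick_budget(node_name, int(round(seconds * 1_000_000)))

# set_budget_seconds(sched, "motor", 500e-6)
# is the same call as sched.set_tick_budget("motor", 500)
```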
remove_node()
Remove a node from the running scheduler:
removed = sched.remove_node("cloud_logger") # Returns True if found
The removed node's shutdown() is called before it is detached from the tick loop.
add_critical_node()
Mark a node as safety-critical with a dedicated watchdog timeout. If a critical node exceeds this timeout, the scheduler calls enter_safe_state() on all nodes:
sched.add_critical_node("motor_controller", timeout_ms=500)
This is stricter than the global watchdog: a regular node that freezes gets isolated; a critical node that freezes triggers a system-wide safe state.
Safety and Monitoring
Watchdog
Enable the watchdog with watchdog_ms on the scheduler:
sched = Scheduler(tick_rate=1000, watchdog_ms=500)
The watchdog uses graduated response:
| Timeout | Health State | Action |
|---|---|---|
| 1x watchdog | Warning | Log warning |
| 2x watchdog | Unhealthy | Skip tick, log error |
| 3x watchdog (critical node) | Isolated | Remove from tick loop, call enter_safe_state() |
A single late tick might be a transient GC pause. The graduated response gives transient problems time to resolve while still catching truly frozen nodes.
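The graduated response in the table maps naturally onto a small decision function. This is a conceptual model of the table above, not HORUS code:

```python
def watchdog_response(elapsed_ms, watchdog_ms, critical=False):
    """Map time-since-last-tick to the graduated watchdog action."""
    if elapsed_ms >= 3 * watchdog_ms and critical:
        return "isolated"       # remove from tick loop, enter_safe_state()
    if elapsed_ms >= 2 * watchdog_ms:
        return "unhealthy"      # skip tick, log error
    if elapsed_ms >= watchdog_ms:
        return "warning"        # log warning only
    return "healthy"
```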
Per-Node Watchdog
Override the global watchdog for specific nodes:
motor = Node(
name="motor",
tick=motor_ctrl,
rate=1000,
order=0,
watchdog=200 * ms, # Stricter: 200 ms instead of global 500 ms
)
Deadline Miss Escalation
Set max_deadline_misses to stop the scheduler after a cumulative threshold:
sched = Scheduler(tick_rate=1000, max_deadline_misses=50)
# After 50 total deadline misses across all nodes, the scheduler stops
safety_stats()
Query safety monitoring statistics at runtime:
stats = sched.safety_stats()
if stats:
print(f"Budget overruns: {stats['budget_overruns']}")
print(f"Watchdog expirations: {stats['watchdog_expirations']}")
print(f"Node health states: {stats['health_states']}")
get_node_stats()
Query per-node statistics:
stats = sched.get_node_stats("motor")
print(f"Node: {stats['name']}")
print(f"Priority: {stats['priority']}")
print(f"Total ticks: {stats['total_ticks']}")
print(f"Errors: {stats['errors_count']}")
Real-Time Features
Enabling RT
Pass rt=True to the scheduler to request real-time scheduling:
sched = Scheduler(tick_rate=1000, rt=True)
This enables:
- SCHED_FIFO: Linux real-time scheduling class for all RT-class nodes
- mlockall: Lock all memory pages to prevent page faults during ticks
- CPU isolation: Use isolated cores when available
Checking RT Capabilities
sched = Scheduler(tick_rate=1000, rt=True)
# Did we get full RT?
if sched.has_full_rt():
print("Full real-time capabilities active")
else:
print("Running with degraded RT")
for d in sched.degradations():
print(f" Degradation: {d}")
# Detailed capabilities
caps = sched.capabilities()
print(f"RT scheduling: {caps['rt_scheduling']}")
print(f"Memory locking: {caps['memory_locking']}")
print(f"CPU isolation: {caps['cpu_isolation']}")
Degradations
When rt=True is set but the system cannot provide all RT features, the scheduler degrades gracefully and logs what it could not enable:
sched = Scheduler(tick_rate=1000, rt=True)
for msg in sched.degradations():
print(f" {msg}")
# Example output:
# SCHED_FIFO unavailable (no CAP_SYS_NICE) - using SCHED_OTHER
# mlockall failed (EPERM) - memory pages may be swapped
The scheduler still runs. Nodes still tick. Timing guarantees are weakened but not absent.
For development, rt=True is safe --- the scheduler does its best and logs what it cannot do. For production where degraded mode is unacceptable, check has_full_rt() at startup and refuse to continue if it returns False.
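In production, that startup check can be made a hard gate. A sketch written against the has_full_rt() and degradations() methods shown above:

```python
def require_full_rt(sched):
    """Refuse to start when real-time capabilities are degraded."""
    if sched.has_full_rt():
        return
    reasons = "; ".join(sched.degradations())
    raise RuntimeError(f"refusing to start: RT degraded ({reasons})")

# Usage: call require_full_rt(sched) right after constructing Scheduler(rt=True),
# before sched.run().
```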
Deterministic Mode
Enable deterministic mode for reproducible simulation and testing:
sched = Scheduler(tick_rate=100, deterministic=True)
In deterministic mode:
- horus.dt() returns a fixed 1/rate every tick (not wall-clock elapsed)
- horus.now() advances by dt each tick (SimClock)
- horus.rng_float() returns a tick-seeded sequence --- same across runs
- Execution order is determined by the dependency graph (inferred from topic connections), not OS thread scheduling
This guarantees identical results across runs on any machine. Use it for:
- Unit tests: Assert exact outputs after N ticks
- Simulation: Physics engines need fixed timesteps
- Regression tests: Catch behavioral changes in CI
sched = Scheduler(tick_rate=100, deterministic=True)
sched.add(Node(name="sim_sensor", tick=sim_tick, rate=100, order=0))
for _ in range(1000):
sched.tick_once()
assert horus.dt() == 0.01 # Fixed: 1/100 Hz = 10 ms
Recording and Replay
Recording a Session
sched = Scheduler(tick_rate=100, recording=True, blackbox_mb=16)
sched.add(Node(name="sensor", tick=read_lidar, rate=100, order=0))
sched.run(duration=60.0) # Record 60 seconds of data
# Check recording status
print(sched.is_recording()) # True while running
Stopping and Listing Recordings
# Stop recording and get file paths
files = sched.stop_recording()
for f in files:
print(f"Recorded: {f}")
# List all available recordings
recordings = sched.list_recordings()
for r in recordings:
print(f"Session: {r}")
Deleting Recordings
sched.delete_recording("session_2026_03_20_143000")
Flight Recorder (Blackbox)
The blackbox is a rolling buffer that records the last N megabytes of tick data. Unlike recording, which captures everything to disk, the blackbox keeps a fixed-size ring buffer in memory and only writes to disk on crash or explicit dump:
sched = Scheduler(tick_rate=1000, blackbox_mb=64)
# On crash: last 64 MB of tick data is saved for post-mortem analysis
Introspection
Query the scheduler at runtime:
print(sched.status()) # "idle", "running", or "stopped"
print(sched.current_tick()) # Current tick number
print(sched.is_running()) # True if in the tick loop
print(sched.scheduler_name()) # Scheduler name
# Node introspection
print(sched.get_node_count()) # Number of registered nodes
print(sched.get_node_names()) # ["sensor", "motor", "planner"]
print(sched.has_node("motor")) # True
# All node info
for info in sched.get_all_nodes():
print(f"{info['name']}: order={info['order']}, state={info['state']}")
# Specific node
order = sched.get_node_info("motor") # Returns execution order (int)
Node Lifecycle Callbacks
Beyond tick, nodes have init and shutdown callbacks:
def setup_hardware(node):
node.log_info("Connecting to motor controller...")
# Hardware init here
node.send("status", {"state": "ready"})
def control(node):
if node.has_msg("cmd"):
cmd = node.recv("cmd")
# Motor control logic
node.log_debug(f"Command: {cmd}")
def cleanup(node):
node.log_info("Stopping motor controller...")
node.send("cmd", {"linear": 0.0, "angular": 0.0}) # Stop motors
motor = Node(
name="motor",
init=setup_hardware, # Called once at startup
tick=control, # Called every tick
shutdown=cleanup, # Called once at shutdown
rate=1000,
order=5,
budget=300 * us,
)
Logging methods (log_info, log_warning, log_error, log_debug) only work inside init, tick, and shutdown callbacks. Calling them outside these contexts silently drops the message.
Production Patterns
Warehouse AGV
A full warehouse AGV with safety monitoring, path planning, motor control, and fleet reporting:
import horus
from horus import Node, Scheduler, us, ms
# --- Node callbacks ---
def safety_check(node):
"""Order 0: runs before everything else, every cycle."""
if node.has_msg("scan"):
scan = node.recv("scan")
min_range = min(scan["ranges"])
if min_range < 0.3: # 30 cm emergency threshold
node.send("emergency.stop", {"reason": "obstacle", "distance": min_range})
node.log_warning(f"Emergency stop: obstacle at {min_range:.2f}m")
def read_lidar(node):
"""Order 10: read LiDAR at 40 Hz."""
# Hardware read
node.send("scan", {"ranges": [1.2, 0.8, 2.5, 1.1], "angle_min": -1.57})
def plan_path(node):
"""Order 50: heavy computation on thread pool."""
if node.has_msg("scan"):
scan = node.recv("scan")
# A* or RRT on occupancy grid (40+ ms of computation)
path = compute_path(scan)
node.send("path", path)
def track_path(node):
"""Order 60: pure pursuit controller at 100 Hz."""
if node.has_msg("path"):
path = node.recv("path")
cmd = pure_pursuit(path, lookahead=0.5)
node.send("cmd_vel", cmd)
def motor_drive(node):
"""Order 70: motor controller at 1 kHz with tight budget."""
if node.has_msg("cmd_vel"):
cmd = node.recv("cmd_vel")
# Write to motor hardware
apply_wheel_velocities(cmd["left"], cmd["right"])
def battery_check(node):
"""Order 100: slow background monitoring."""
voltage = read_battery_voltage()
if voltage < 22.0:
node.log_warning(f"Low battery: {voltage:.1f}V")
node.send("battery", {"voltage": voltage})
def fleet_report(node):
"""Order 200: async HTTP upload to fleet management."""
if node.has_msg("battery"):
data = node.recv("battery")
# aiohttp call to fleet server
node.send("fleet.telemetry", data)
# --- Build and run ---
safety = Node(name="safety", tick=safety_check, rate=1000, order=0,
budget=100*us, deadline=200*us, on_miss="stop",
subs=["scan"], pubs=["emergency.stop"])
lidar = Node(name="lidar", tick=read_lidar, rate=40, order=10,
pubs=["scan"])
planner = Node(name="planner", tick=plan_path, rate=10, order=50,
compute=True, subs=["scan"], pubs=["path"])
tracker = Node(name="tracker", tick=track_path, rate=100, order=60,
subs=["path"], pubs=["cmd_vel"])
motor = Node(name="motor", tick=motor_drive, rate=1000, order=70,
budget=300*us, deadline=900*us, on_miss="safe_mode",
core=2, subs=["cmd_vel"])
battery = Node(name="battery", tick=battery_check, rate=1, order=100)
fleet = Node(name="fleet", tick=fleet_report, rate=1, order=200,
failure_policy="skip", subs=["battery"], pubs=["fleet.telemetry"])
with Scheduler(tick_rate=1000, rt=True, watchdog_ms=500,
blackbox_mb=16, max_deadline_misses=50) as sched:
sched.add(safety).add(lidar).add(planner).add(tracker)
sched.add(motor).add(battery).add(fleet)
sched.add_critical_node("motor", timeout_ms=200)
sched.add_critical_node("safety", timeout_ms=100)
sched.run()
What this sets up:
- Safety monitor at order 0 runs before everything --- if it detects an obstacle, it fires before the motor ticks
- LiDAR at 40 Hz feeds the planner and safety monitor
- Path planner on the Compute thread pool --- its 40 ms computation does not block the 1 kHz motor controller
- Motor controller pinned to core 2 with a 300 us budget --- enters safe state on miss
- Fleet reporter tolerates network failures with failure_policy="skip"
- Motor and safety are critical nodes --- if either freezes, the entire system enters safe state
Drone Flight Controller
A quadrotor with IMU fusion, PID control, and telemetry:
import horus
from horus import Node, us, ms
def imu_read(node):
"""Read IMU at 400 Hz."""
raw = read_imu_hardware()
fused = complementary_filter(raw)
node.send("imu", fused)
def attitude_control(node):
"""Inner loop: attitude PID at 400 Hz."""
if node.has_msg("imu") and node.has_msg("setpoint"):
imu = node.recv("imu")
sp = node.recv("setpoint")
delta = horus.dt()
motors = pid_control(imu, sp, delta)
node.send("motor_cmd", motors)
def position_control(node):
"""Outer loop: position PID at 50 Hz."""
if node.has_msg("gps"):
gps = node.recv("gps")
waypoint = node.recv("waypoint") if node.has_msg("waypoint") else current_waypoint()
setpoint = position_pid(gps, waypoint, horus.dt())
node.send("setpoint", setpoint)
def motor_output(node):
"""ESC output at 400 Hz with hard deadline."""
if node.has_msg("motor_cmd"):
cmd = node.recv("motor_cmd")
write_esc(cmd)
def telemetry_log(node):
"""Log to SD card at 10 Hz."""
if node.has_msg("imu"):
node.send("log", {
"t": horus.now(),
"tick": horus.tick(),
"imu": node.recv("imu"),
})
imu = Node(name="imu", tick=imu_read, rate=400, order=0, core=0)
att = Node(name="attitude", tick=attitude_control, rate=400, order=10,
budget=200*us, deadline=500*us, on_miss="safe_mode", core=1,
subs=["imu", "setpoint"], pubs=["motor_cmd"])
pos = Node(name="position", tick=position_control, rate=50, order=20,
subs=["gps", "waypoint"], pubs=["setpoint"])
esc = Node(name="esc", tick=motor_output, rate=400, order=30,
budget=100*us, deadline=200*us, on_miss="stop", core=1,
subs=["motor_cmd"])
logger = Node(name="logger", tick=telemetry_log, rate=10, order=100,
subs=["imu"], pubs=["log"])
horus.run(imu, att, pos, esc, logger,
tick_rate=400, rt=True, watchdog_ms=100,
blackbox_mb=8, max_deadline_misses=10, cores=[0, 1])
Key patterns:
- Inner loop (attitude) and outer loop (position) run at different rates through the same scheduler
- horus.dt() provides frame-rate-independent PID integration
- ESC output uses on_miss="stop" --- if the ESC misses a deadline, stop the scheduler (drone must not fly with stale motor commands)
- cores=[0, 1] pins the entire scheduler to two dedicated cores
- blackbox_mb=8 records the last 8 MB of flight data for crash investigation
Testing with the Scheduler
Unit Testing a Single Node
def test_counter_node():
"""Test that counter increments each tick."""
results = []
def counter_tick(node):
count = (node.recv("count") or 0) + 1
node.send("count", count)
results.append(count)
sched = Scheduler(tick_rate=100, deterministic=True)
sched.add(Node(name="counter", tick=counter_tick, rate=100, order=0,
pubs=["count"], subs=["count"]))
sched.tick_once()
sched.tick_once()
sched.tick_once()
assert results == [1, 2, 3]
Integration Testing Multiple Nodes
def test_sensor_to_motor_pipeline():
"""Test that sensor data flows through the controller to the motor."""
motor_cmds = []
def fake_sensor(node):
node.send("scan", {"ranges": [0.5, 1.0, 1.5]})
def controller(node):
if node.has_msg("scan"):
scan = node.recv("scan")
node.send("cmd", {"speed": min(scan["ranges"])})
def mock_motor(node):
if node.has_msg("cmd"):
motor_cmds.append(node.recv("cmd"))
sched = Scheduler(tick_rate=100, deterministic=True)
sched.add(Node(name="sensor", tick=fake_sensor, rate=100, order=0, pubs=["scan"]))
sched.add(Node(name="ctrl", tick=controller, rate=100, order=10,
subs=["scan"], pubs=["cmd"]))
sched.add(Node(name="motor", tick=mock_motor, rate=100, order=20, subs=["cmd"]))
# Run enough ticks for data to flow through the pipeline
sched.tick_for(0.1)
assert len(motor_cmds) > 0
assert motor_cmds[0]["speed"] == 0.5
Testing Safety Behavior
def test_watchdog_detects_frozen_node():
"""Verify the watchdog catches a node that hangs."""
import time
def frozen_tick(node):
time.sleep(2.0) # Simulate frozen node
sched = Scheduler(tick_rate=10, watchdog_ms=100)
sched.add(Node(name="frozen", tick=frozen_tick, rate=10, order=0))
sched.tick_once()
stats = sched.safety_stats()
assert stats is not None
assert stats["watchdog_expirations"] > 0
Design Decisions
Why horus.run() instead of manual thread management?
Python's GIL makes multi-threaded Python unreliable for real-time work. horus.run() hands control to the Rust scheduler, which manages threads natively. Python tick() functions are called back from Rust --- the GIL is acquired only for the duration of each Python callback, then released. This gives Python nodes near-native scheduling precision while keeping the API simple.
Why all config on Node() instead of scheduler.add().order().rate().build()?
In Python, keyword arguments are idiomatic. Chained builder methods are a Rust pattern that does not translate well. Node(rate=1000, order=0, budget=300*us) is immediately readable. It also means the node carries its own configuration --- you can pass it between functions, store it in a list, or serialize it without losing scheduling intent.
Why us and ms constants instead of special unit types?
300 * us is plain Python math --- it produces a float in seconds. No special types to import, no conversion functions to remember, no confusion about what unit a function expects. The cost is that budget=300 silently means 300 seconds, which is why the constants exist and the docstrings emphasize them.
Why lazy initialization?
init() runs when scheduler.run() is called, not when scheduler.add() is called. This means you can configure all nodes, set global scheduler settings, and defer hardware initialization until the system is truly ready to start. It also means the scheduler's clock, RT configuration, and recording state are all finalized before any node initializes.
Why reverse-order shutdown?
Nodes are typically added in dependency order: sensors then controllers then loggers. Reverse-order shutdown means controllers stop motors before sensors disconnect, and loggers record the shutdown sequence before they themselves stop.
Trade-offs
| Gain | Cost |
|---|---|
| One-liner horus.run() --- no boilerplate | Less control than manual Scheduler for runtime mutation |
| Python GIL released between ticks --- Rust scheduler handles threading | Python tick() functions must not hold the GIL for work done outside the callback |
| Auto-derived budget/deadline from rate --- less to configure | Must use explicit budget/deadline to override the 80%/95% defaults |
| Graduated watchdog --- transient spikes do not kill nodes | 3x timeout before isolation means truly frozen nodes take longer to detect |
| deterministic=True --- reproducible tests and simulations | Not suitable for production (no wall-clock timing) |
| All config on Node() --- Pythonic, self-contained | Cannot reconfigure a node after creation (create a new one instead) |
| tick_once() for testing --- fully deterministic single-step | No rate control in single-step mode (by design) |
| Recording --- full session capture for replay | Disk I/O overhead, not suitable for ultra-low-latency production |
| RT degradation --- runs on developer laptops and production alike | Must check has_full_rt() to confirm actual RT in production |
See Also
- Python API Reference --- Full API surface
- Scheduler: Running Your Nodes --- Beginner introduction
- Scheduler --- Full Reference --- Concepts and architecture
- Execution Classes --- Deep dive into the five classes
- Nodes --- Full Reference --- The components the scheduler runs
- Safety Monitor --- Watchdog, budget enforcement, graduated degradation
- Getting Started (Python) --- First Python application