Execution Classes (Python)
A motor controller that misses its deadline by even a millisecond can cause a robot arm to overshoot and collide with a person. A path planner that takes 50 ms of CPU time is completely normal — but if it runs on the same thread as the motor controller, it blocks 50 ticks. A logging node that takes an extra 10 ms is harmless. A cloud uploader that blocks on a network request shouldn't hold up anything.
These are fundamentally different workloads. Running them all the same way — in a single sequential loop — forces every node to compromise. The fast ones wait for the slow ones. The critical ones share a thread with the optional ones. A single slow node can cascade timing failures across the entire system.
HORUS solves this with execution classes: five different executors, each optimized for a specific workload type. The scheduler automatically selects the right class based on how you configure the Node() constructor — you describe what your node needs, and the scheduler figures out how to run it.
The Five Classes
BestEffort (Default)
Nodes tick sequentially in the main loop, ordered by order=. This is the simplest and lowest-overhead class.
import horus
def update_display(node):
    stats = node.recv("stats")
    if stats:
        print(f"Speed: {stats['speed']:.1f} m/s")

display = horus.Node(
    name="display",
    subs=["stats"],
    tick=update_display,
    order=100,
    # No rate=, compute=, on=, or async def → BestEffort
)
How it works: The main scheduler thread calls tick() on each BestEffort node in sequence, once per scheduler cycle. No threads are spawned. No synchronization is needed.
Use for: Logging, telemetry, display, diagnostic reporting — anything without timing requirements or heavy computation.
Characteristics:
- Runs at the scheduler's global tick_rate
- Deterministic ordering (same sequence every cycle)
- Lowest overhead — no thread spawn, no atomics, no synchronization
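The sequential model is easy to picture in plain Python. The sketch below is illustrative only — mini_cycle and the (order, tick) tuples are hypothetical stand-ins, not HORUS internals:

```python
# Illustrative sketch of one BestEffort cycle (hypothetical helper, not
# HORUS internals): nodes tick sequentially, sorted by their order value.
def mini_cycle(nodes):
    # Lower order ticks first; the sequence is identical every cycle.
    for _, tick in sorted(nodes, key=lambda pair: pair[0]):
        tick()

calls = []
nodes = [
    (100, lambda: calls.append("display")),  # order=100
    (5,   lambda: calls.append("planner")),  # order=5
    (0,   lambda: calls.append("motor")),    # order=0
]
mini_cycle(nodes)
print(calls)  # ['motor', 'planner', 'display']
```

Because everything runs on the one scheduler thread, no locks or atomics are needed; the cost is that one slow tick delays everything after it in the cycle.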
Rt (Real-Time)
Each RT node gets a dedicated thread with optional OS-level priority scheduling. The scheduler enforces timing budgets and deadlines, and takes action when a node runs too long.
import horus
us = horus.us # 1e-6
ms = horus.ms # 1e-3
def motor_control(node):
    cmd = node.recv("cmd_vel")
    if cmd:
        write_to_motor(cmd.linear, cmd.angular)

# Auto-derived: budget = 80% of period, deadline = 95% of period
motor = horus.Node(
    name="motor",
    subs=[horus.CmdVel],
    tick=motor_control,
    rate=1000,           # 1 kHz → budget=800μs, deadline=950μs
    on_miss="safe_mode", # Enter safe state on deadline miss
    order=0,
)

# Explicit budget and deadline
safety = horus.Node(
    name="safety_monitor",
    subs=["safety.heartbeat"],
    tick=check_safety,
    budget=100 * us,
    deadline=200 * us,
    on_miss="stop",      # Full stop on deadline miss
    order=0,
)
There is no rt=True parameter on Node(). RT is always auto-detected from timing constraints. Setting rate=, budget=, or deadline= — any of them — automatically assigns the Rt class. This maps developer intent ("this node needs to run at 1 kHz") directly to the right executor.
How it works: Each RT node runs on its own dedicated thread. If rt=True is set on the Scheduler and the OS supports it, the thread gets SCHED_FIFO real-time priority. The scheduler measures every tick() call and applies the on_miss policy when budget or deadline is exceeded.
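The measurement half of this is simple to model in plain Python. Everything below (measure_tick, the budget value) is an illustrative simplification of the behavior described above, not HORUS source:

```python
import time

# Simplified model of RT tick measurement (not HORUS internals): time one
# tick() call and report whether it stayed inside its budget.
def measure_tick(tick, budget_s):
    start = time.perf_counter()
    tick()
    elapsed = time.perf_counter() - start
    return elapsed, elapsed <= budget_s

def fast_tick():
    pass  # finishes in microseconds, well under the budget

def slow_tick():
    time.sleep(0.005)  # 5 ms of "work"

_, ok = measure_tick(fast_tick, budget_s=0.002)
print(ok)  # True: within the 2 ms budget
_, ok = measure_tick(slow_tick, budget_s=0.002)
print(ok)  # False: a 5 ms tick blows a 2 ms budget, so on_miss would fire
```

In the real scheduler this measurement happens on the node's dedicated thread, and the configured on_miss policy decides what a miss does.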
Auto-derivation from rate:
When you set rate= without explicit budget= or deadline=, the scheduler derives them:
| You set | Scheduler derives |
|---|---|
| rate=100 (10 ms period) | budget=8 ms (80%), deadline=9.5 ms (95%) |
| rate=1000 (1 ms period) | budget=800 μs (80%), deadline=950 μs (95%) |
| rate=500 (2 ms period) | budget=1.6 ms (80%), deadline=1.9 ms (95%) |
You can override either or both:
# Auto budget + explicit deadline
horus.Node(rate=1000, deadline=900 * us, ...)
# Explicit budget + auto deadline
horus.Node(rate=1000, budget=300 * us, ...)
# Both explicit
horus.Node(rate=1000, budget=300 * us, deadline=900 * us, ...)
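The derivation itself is simple arithmetic. Here is a small sketch — the derive helper is illustrative; only the 80% and 95% ratios come from the table above:

```python
# Illustrative: derive budget and deadline from rate= using the documented
# 80% / 95% ratios. Not the actual scheduler code.
def derive(rate_hz):
    period = 1.0 / rate_hz
    return 0.80 * period, 0.95 * period  # (budget, deadline) in seconds

budget, deadline = derive(1000)  # 1 kHz → 1 ms period
print(round(budget * 1e6))    # 800 (μs)
print(round(deadline * 1e6))  # 950 (μs)
```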
Additional RT configuration:
critical = horus.Node(
    name="critical_ctrl",
    tick=control_loop,
    rate=1000,
    budget=300 * us,
    deadline=900 * us,
    on_miss="skip",
    priority=90,   # OS-level thread priority (1-99, higher = more urgent)
    core=0,        # Pin to CPU core 0
    watchdog=0.5,  # 500 ms freeze detection
    order=0,
)
Use for: Motor control, safety monitoring, sensor fusion — anything where missing a deadline has physical consequences.
Compute
For CPU-heavy work that benefits from parallelism. Multiple Compute nodes run simultaneously on a shared thread pool.
import horus
def plan_path(node):
    scan = node.recv("scan")
    if scan:
        path = a_star(scan.ranges, goal)
        node.send("path", path)

planner = horus.Node(
    name="planner",
    subs=[horus.LaserScan],
    pubs=["path"],
    tick=plan_path,
    compute=True,  # CPU thread pool
    rate=10,       # Optional: tick at most 10 times/sec
    order=5,
)
How it works: Compute nodes are dispatched to a thread pool. Multiple compute nodes can run in parallel on different CPU cores. They don't block the main tick loop or RT threads.
Use for: Path planning, SLAM, point cloud processing, ML inference on CPU, image processing — any CPU-bound work that takes more than ~1 ms per tick.
rate= on a Compute node does not make it RT — it only limits how often the node ticks. A rate=10, compute=True node ticks at most 10 times per second but has no budget or deadline enforcement.
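The pool behavior can be modeled with Python's standard thread pool. This is a sketch of the dispatch idea only, with ThreadPoolExecutor standing in for HORUS's internal compute pool:

```python
from concurrent.futures import ThreadPoolExecutor

# Model of Compute dispatch (illustrative, not HORUS internals): two
# CPU-ish "ticks" are submitted to a shared pool, may run in parallel on
# different cores, and never block the thread that submitted them.
def planner_tick():
    return sum(i * i for i in range(100_000))  # stand-in for A* planning

def slam_tick():
    return sum(i * 3 for i in range(100_000))  # stand-in for SLAM

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(planner_tick), pool.submit(slam_tick)]
    results = [f.result() for f in futures]

print(len(results))  # 2: both compute "nodes" completed
```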
Event
Nodes that sleep until a specific topic receives new data. Zero CPU usage when idle.
import horus
def handle_estop(node):
    msg = node.recv("emergency.stop")
    if msg:
        node.log_warning("EMERGENCY STOP RECEIVED")
        disable_all_motors()
        node.request_stop()

estop = horus.Node(
    name="estop_handler",
    subs=["emergency.stop"],
    tick=handle_estop,
    on="emergency.stop",  # Sleep until this topic gets data
    order=0,
)
How it works: The node's thread sleeps. When any publisher calls send() on the named topic, the Event node wakes and tick() is called. If multiple messages arrive between wakes, the node ticks once — call recv() in a loop inside tick() to drain all pending messages.
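Because a burst of messages can arrive between wakes, drain inside tick(). Here is a sketch of that pattern; the StubNode class is a hypothetical stand-in whose recv() returns None when its queue is empty, mirroring the API shown above:

```python
# Drain pattern for an Event node: keep calling recv() until it returns
# None, so a single wake-up processes every queued message.
def handle_commands(node):
    handled = []
    while True:
        msg = node.recv("commands")
        if msg is None:
            break
        handled.append(msg)
    return handled

# Hypothetical stand-in for a HORUS node, used only to exercise the pattern.
class StubNode:
    def __init__(self, queued):
        self._queue = list(queued)

    def recv(self, topic):
        return self._queue.pop(0) if self._queue else None

print(handle_commands(StubNode(["go", "stop", "go"])))  # ['go', 'stop', 'go']
```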
Use for: Emergency stop handlers, command receivers, sparse event processors — anything where the node should be completely idle until something specific happens.
Characteristics:
- Zero CPU when no messages arrive
- Wake latency: ~microseconds from send() to tick()
- order= still applies: if two event nodes wake simultaneously, lower order runs first
- The topic name in on= must match a topic declared in another node's pubs
AsyncIo
For network, file I/O, or GPU operations that would block a real-time thread. Runs on a tokio runtime. In Python, this class is automatic — just use async def for your tick function.
import horus
import aiohttp
async def upload_telemetry(node):
    if node.has_msg("telemetry"):
        data = node.recv("telemetry")
        async with aiohttp.ClientSession() as session:
            await session.post("https://api.example.com/telemetry", json=data)

uploader = horus.Node(
    name="uploader",
    subs=["telemetry"],
    tick=upload_telemetry,  # async def → automatically AsyncIo
    rate=1,                 # Upload once per second
    order=50,
)
How it works: The node's tick() runs on a tokio-managed thread pool. The node can safely block on network requests, file I/O, or database queries without affecting any other node. Python's async def is detected automatically via inspect.iscoroutinefunction.
Use for: HTTP/REST API calls, database writes, file logging, cloud telemetry, WebSocket connections.
Unlike Rust (which uses an explicit .async_io() builder method), Python auto-detects async from the function definition. Writing async def tick(node): is all you need — no async_io=True parameter exists.
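The detection mentioned above is the standard-library inspect check. A minimal demonstration, with placeholder tick functions:

```python
import inspect

# The scheduler's async detection boils down to this stdlib check.
def sync_tick(node):
    pass  # regular def → BestEffort, Rt, Compute, or Event

async def async_tick(node):
    pass  # async def → AsyncIo

print(inspect.iscoroutinefunction(sync_tick))   # False
print(inspect.iscoroutinefunction(async_tick))  # True
```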
How Classes Are Selected
The scheduler selects the execution class based on which Node() parameters you set:
| Configuration | Resulting Class |
|---|---|
| (nothing special) | BestEffort |
| rate=100 | Rt (auto-derived budget/deadline) |
| budget=300*us | Rt |
| deadline=900*us | Rt |
| rate=100, budget=..., deadline=... | Rt (explicit overrides) |
| compute=True | Compute |
| compute=True, rate=10 | Compute (rate-limited, not RT) |
| on="topic.name" | Event |
| async def tick | AsyncIo |
| async def tick, rate=1 | AsyncIo (rate-limited, not RT) |
Key rule: rate= only auto-enables RT when no explicit execution class (compute=True, on=, async def) is set. When combined with an explicit class, rate= just limits tick frequency.
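The selection table condenses to one precedence check. The function below is an illustrative model of that table (not scheduler source), and it ignores the conflicting-class cases covered under Deferred Finalization:

```python
import inspect

# Illustrative model of execution-class selection, mirroring the table
# above. Explicit classes (async def, on=, compute=True) win; timing
# parameters imply Rt only when no explicit class is set.
def select_class(tick, rate=None, budget=None, deadline=None,
                 compute=False, on=None):
    if inspect.iscoroutinefunction(tick):
        return "AsyncIo"
    if on is not None:
        return "Event"
    if compute:
        return "Compute"
    if rate is not None or budget is not None or deadline is not None:
        return "Rt"
    return "BestEffort"

def tick(node): pass
async def atick(node): pass

print(select_class(tick))                         # BestEffort
print(select_class(tick, rate=100))               # Rt
print(select_class(tick, rate=10, compute=True))  # Compute
print(select_class(tick, on="emergency.stop"))    # Event
print(select_class(atick, rate=1))                # AsyncIo
```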
The rate= Dual Meaning
This is the most important interaction to understand:
# rate= ALONE → Rt (dedicated thread, timing enforced)
horus.Node(tick=motor_ctrl, rate=1000)
# Result: Rt class, budget=800μs, deadline=950μs
# rate= WITH compute=True → Compute (frequency cap, no timing enforcement)
horus.Node(tick=plan_path, rate=10, compute=True)
# Result: Compute class, ticks at most 10/sec, no budget or deadline
# rate= WITH on= → Event (frequency cap after wake)
horus.Node(tick=handler, rate=100, on="commands")
# Result: Event class, processes at most 100 msg/sec after waking
# rate= WITH async def → AsyncIo (frequency cap)
horus.Node(tick=async_upload, rate=1)
# Result: AsyncIo class, uploads at most 1/sec, no budget or deadline
The rule is simple: rate= triggers RT only when it is the sole execution signal. The moment you add compute=True, on=, or use async def, the rate= becomes a frequency cap with no timing enforcement.
Deferred Finalization
Class selection happens when horus.run() or sched.add() resolves the node, not at Node() construction time. This means parameter order in the constructor does not matter:
# These produce identical results (both Compute, not RT):
horus.Node(rate=100, compute=True, tick=fn)
horus.Node(compute=True, rate=100, tick=fn)
If you accidentally set conflicting classes, the last explicit class wins and a warning is logged:
# compute=True is overridden by on= — warning logged
horus.Node(compute=True, on="topic", tick=fn) # → Event, NOT Compute
Decision Guide
| Your node does... | Use | Node() params |
|---|---|---|
| Motor control at 500+ Hz | Rt | rate=500 |
| Safety monitoring with deadlines | Rt | rate=100, budget=..., deadline=..., on_miss="stop" |
| Sensor fusion at 200 Hz | Rt | rate=200 |
| Path planning (takes 10-50 ms) | Compute | compute=True |
| ML inference on CPU | Compute | compute=True, rate=30 |
| SLAM processing | Compute | compute=True |
| React to emergency stop | Event | on="emergency.stop" |
| Process commands as they arrive | Event | on="commands" |
| Upload telemetry to cloud | AsyncIo | async def tick |
| Write logs to database | AsyncIo | async def tick |
| WebSocket streaming | AsyncIo | async def tick |
| Display dashboard updates | BestEffort | (default) |
| Simple diagnostics | BestEffort | (default) |
Validation and Common Mistakes
The scheduler validates your configuration when adding nodes and catches mistakes at startup, not at runtime.
What's Rejected
| Configuration | Error |
|---|---|
| compute=True, budget=300*us | Budget only meaningful for RT nodes |
| on="topic", deadline=900*us | Deadline only meaningful for RT nodes |
| async def tick, budget=... | Budget only meaningful for RT nodes |
| budget=0 | Budget must be > 0 |
| on="" | Empty topic — node can never trigger |
| compute=True with async def tick | Mutually exclusive — pick one |
| on="topic" with async def tick | Mutually exclusive — pick one |
What's Warned
| Configuration | Warning |
|---|---|
| compute=True, on="topic" | Last class wins (Event), first silently overridden |
| compute=True, priority=99 | Priority ignored on non-RT nodes |
| on_miss="stop" without rate/budget/deadline | No deadline to miss — policy has no effect |
| core=0 without rate/budget/deadline | CPU pinning ignored on non-RT nodes |
Common Mistakes
Mistake 1: Thinking rate= always means RT
# WRONG assumption: "rate=10 means this is an RT node"
planner = horus.Node(
    tick=plan_path,
    rate=10,
    compute=True,  # ← compute=True overrides RT
)
# Result: Compute class. rate=10 is just a frequency cap.
# There is NO budget or deadline enforcement.
If you need timing enforcement on a compute-heavy node, drop compute=True and use rate= alone — but understand that the node gets a dedicated thread, not a pool:
# This IS an RT node — budget and deadline enforced
planner = horus.Node(
    tick=plan_path,
    rate=10,
    budget=80 * ms,
    deadline=95 * ms,
    on_miss="warn",
)
Mistake 2: Setting on_miss= without a deadline
# on_miss has no effect — there's no deadline to miss
horus.Node(
    tick=log_tick,
    compute=True,
    on_miss="stop",  # ← useless on a Compute node
)

# Fix: make it an RT node so a deadline exists
horus.Node(
    tick=ctrl_tick,
    rate=100,
    on_miss="stop",  # ← now triggers when the 9.5 ms deadline is missed
)
Mistake 3: Thinking priority= works on Compute nodes
# Priority is silently ignored — only RT nodes get SCHED_FIFO threads
horus.Node(tick=plan, compute=True, priority=99)
# Fix: make it RT if you need OS-level priority
horus.Node(tick=plan, rate=100, priority=99)
Mistake 4: Using async def when you want Compute
# WRONG: this node does CPU-heavy ML inference, but async def
# makes it AsyncIo — it runs on the I/O pool, not the compute pool
async def infer(node):
    img = node.recv("camera")
    if img:
        result = model.predict(img.to_numpy())  # CPU-bound, not I/O
        node.send("detections", result)

# Fix: use a regular def with compute=True
def infer(node):
    img = node.recv("camera")
    if img:
        result = model.predict(img.to_numpy())
        node.send("detections", result)

detector = horus.Node(tick=infer, compute=True, rate=30)
Mistake 5: Budget on a Compute node
# REJECTED: budget is only for RT nodes
horus.Node(
    tick=process,
    compute=True,
    budget=50 * ms,  # ← scheduler rejects this
)
# Fix option A: remove compute=True (becomes RT with timing enforcement)
horus.Node(tick=process, rate=20, budget=50 * ms, on_miss="skip")
# Fix option B: remove budget (stays Compute, no timing enforcement)
horus.Node(tick=process, compute=True, rate=20)
Mistake 6: Forgetting that rate=30 (the default) triggers RT
# This looks innocent but IS an RT node (rate=30 is the default)
horus.Node(tick=log_tick)
# rate=30 is set by default → auto-RT with budget≈26.7 ms, deadline≈31.7 ms
# If you truly want BestEffort, set rate=0 to disable the default rate.
# In practice, the default rate=30 makes most nodes RT — this is
# intentional for safety.
The default rate=30 on Node() means most nodes are RT by default. This is a deliberate safety choice — nodes get timing enforcement unless you explicitly opt out. If you truly want BestEffort, set rate=0; for pooled or event-driven execution, opt out with compute=True or on= instead.
Complete Example: Mixed Execution Classes
import horus
import aiohttp
us = horus.us
ms = horus.ms
# --- Rt: 1 kHz motor control with strict timing ---
def motor_tick(node):
    cmd = node.recv("cmd_vel")
    if cmd:
        write_motors(cmd.linear, cmd.angular)

motor = horus.Node(
    name="motor_ctrl",
    subs=[horus.CmdVel],
    tick=motor_tick,
    rate=1000,
    budget=300 * us,
    on_miss="skip",
    priority=90,
    core=0,
    order=0,
)
# --- Event: only runs when emergency.stop topic updates ---
def estop_tick(node):
    msg = node.recv("emergency.stop")
    if msg:
        disable_all_motors()
        node.request_stop()

estop = horus.Node(
    name="estop",
    subs=["emergency.stop"],
    tick=estop_tick,
    on="emergency.stop",
    order=0,
)
# --- Rt: 100 Hz IMU sensor reading ---
def imu_tick(node):
    reading = read_imu_hardware()
    node.send("imu", horus.Imu(
        accel_x=reading.ax, accel_y=reading.ay, accel_z=reading.az,
        gyro_x=reading.gx, gyro_y=reading.gy, gyro_z=reading.gz,
    ))

imu = horus.Node(
    name="imu_reader",
    pubs=[horus.Imu],
    tick=imu_tick,
    rate=100,
    order=1,
)
# --- Compute: path planning on thread pool ---
def plan_tick(node):
    scan = node.recv("scan")
    if scan:
        path = compute_path(scan.ranges)
        node.send("path", path)

planner = horus.Node(
    name="planner",
    subs=[horus.LaserScan],
    pubs=["path"],
    tick=plan_tick,
    compute=True,
    rate=10,
    order=5,
)
# --- AsyncIo: cloud telemetry upload ---
async def telemetry_tick(node):
    if node.has_msg("telemetry"):
        data = node.recv("telemetry")
        async with aiohttp.ClientSession() as session:
            await session.post("https://api.example.com/telemetry", json=data)

uploader = horus.Node(
    name="telemetry",
    subs=["telemetry"],
    tick=telemetry_tick,
    rate=0.2,  # Every 5 seconds
    order=50,
)
# --- BestEffort: dashboard display in main loop ---
def dashboard_tick(node):
    if node.has_msg("stats"):
        stats = node.recv("stats")
        update_display(stats)

dashboard = horus.Node(
    name="dashboard",
    subs=["stats"],
    tick=dashboard_tick,
    order=100,
    # No compute=, on=, or async def here. Note: the default rate=30
    # would still make this RT (see Mistake 6); set rate=0 if you need
    # a truly BestEffort node in the main loop.
)
# Run everything
horus.run(
    motor, estop, imu, planner, uploader, dashboard,
    tick_rate=1000,
    rt=True,
    watchdog_ms=500,
)
Unit Constants
Python doesn't have Rust's 300_u64.us() extension trait syntax. Instead, HORUS provides unit constants for readable duration expressions:
import horus
# Unit constants
horus.us # 1e-6 (microseconds → seconds)
horus.ms # 1e-3 (milliseconds → seconds)
# Usage in Node()
horus.Node(
    budget=300 * horus.us,    # 300 μs = 0.0003 seconds
    deadline=900 * horus.us,  # 900 μs = 0.0009 seconds
    watchdog=500 * horus.ms,  # 500 ms = 0.5 seconds
)

# Equivalent raw values (less readable)
horus.Node(
    budget=0.0003,
    deadline=0.0009,
    watchdog=0.5,
)
| Python | Rust equivalent | Value |
|---|---|---|
| 300 * horus.us | 300_u64.us() | 0.0003 s |
| 1 * horus.ms | 1_u64.ms() | 0.001 s |
| 500 * horus.ms | 500_u64.ms() | 0.5 s |
Testing with tick_once()
Execution classes work with single-tick testing. The scheduler still classifies nodes correctly — it just runs one cycle instead of looping:
import horus
results = []
def sensor_tick(node):
    node.send("temp", {"value": 25.0})

def logger_tick(node):
    msg = node.recv("temp")
    if msg:
        results.append(msg["value"])
sensor = horus.Node(name="sensor", pubs=["temp"], tick=sensor_tick, rate=100, order=0)
logger = horus.Node(name="logger", subs=["temp"], tick=logger_tick, rate=100, order=1)
sched = horus.Scheduler(tick_rate=100, deterministic=True)
sched.add(sensor)
sched.add(logger)
# RT nodes still get their class — tick_once just runs one cycle
for _ in range(5):
    sched.tick_once()
assert len(results) == 5
For event-driven nodes, tick_once() only ticks them if the trigger topic has data:
def handler_tick(node):
    msg = node.recv("commands")
    results.append(msg)
handler = horus.Node(name="handler", subs=["commands"], tick=handler_tick, on="commands")
sched.add(handler)
# handler does NOT tick here — no data on "commands"
sched.tick_once()
assert len(results) == 0
# Publish data, then tick — now handler runs
cmd_topic = horus.Topic("commands")
cmd_topic.send({"action": "go"})
sched.tick_once()
assert len(results) == 1
Design Decisions
Why 5 classes instead of just RT and non-RT? A thread-per-node model (RT) is wasteful for logging nodes — dedicating OS threads and SCHED_FIFO slots to telemetry is overkill. A single-threaded model (BestEffort) can't handle 50 ms path planning without stalling the control loop. A two-class split (RT vs non-RT) doesn't distinguish between CPU-bound work (Compute), event-driven reactions (Event), and I/O-bound operations (AsyncIo) — each of which has a fundamentally different optimal executor. The five-class model matches the five common robotics workload patterns.
Why auto-detection instead of an explicit execution_class= parameter?
Most developers don't think in terms of "execution classes" — they think "this node needs to run at 1 kHz" or "this node does heavy computation." Auto-detection from rate=, compute=, on=, and async def maps intent to the right executor without requiring framework knowledge. If you set rate=1000, the scheduler knows you need a dedicated real-time thread. You don't have to explicitly request one.
Why does rate= with compute=True not become RT?
Because rate-limiting and real-time are different things. A path planner at 10 Hz means "tick at most 10 times per second" — not "this node has a 100 ms deadline that must be enforced." Mixing the two concepts would force compute nodes to pay RT overhead (dedicated threads, timing measurement) for no benefit. The rule is clear: rate= only triggers RT when no explicit class is set.
Why does Python auto-detect async def instead of having an async_io=True parameter?
Python already distinguishes def from async def at the language level. Auto-detection means zero boilerplate — just write async def tick(node): and the scheduler does the right thing. Adding a redundant async_io=True parameter would create two conflicting signals and make the API harder to use. The async keyword is the explicit signal.
Why can't you combine compute=True with async def?
Compute nodes run on a CPU thread pool optimized for parallel computation. Async nodes run on a tokio I/O thread pool optimized for non-blocking await. These are fundamentally different runtimes — a CPU-bound ML inference should not share the I/O pool (it would block other async nodes), and an I/O-bound HTTP request should not occupy a compute slot (it wastes a CPU thread while waiting). The mutual exclusion forces you to pick the right executor for your workload.
Trade-offs
| Gain | Cost |
|---|---|
| Right executor per workload — each node runs optimally | Must understand which class fits your node |
| Auto-detection — rate= infers RT without explicit configuration | Less explicit — must know the rate= + compute=True interaction |
| RT isolation — a slow Compute node can't block an RT motor controller | RT nodes consume one OS thread each |
| Event nodes — zero CPU when idle | Must match on="topic" name exactly to a publisher's topic |
| AsyncIo auto-detect — async def just works | Cannot combine with compute=True or on= |
| Default rate=30 — nodes get timing enforcement by default | Must explicitly opt out for truly best-effort nodes |
| Python unit constants (300 * horus.us) | Less ergonomic than Rust's 300_u64.us() — multiplication instead of method |
See Also
- Python API Reference — Complete Node(), Scheduler, and horus.run() reference
- Async Nodes — Deep dive on async def tick patterns, timeouts, and cancellation
- Execution Classes (Concepts) — Language-agnostic theory with Rust examples
- Python Bindings — Full binding reference including topic formats and typed messages
- Python Examples — Complete working examples