Real-Time Systems (Python)
A motor controller sends velocity commands 100 times per second. Each command says "move at this speed for the next 10 milliseconds." If one command arrives 50 ms late, the motor runs on a stale command for 60 ms instead of 10, six times longer than intended. The arm overshoots, oscillates, or collides with the work surface. The problem is not speed — the controller was already plenty fast. The problem is predictability. Every command must arrive on time.
Real-time means predictable, not fast. A real-time system guarantees that each computation finishes within a bounded time window. Three values define that window:
Budget: how long the computation should take. This is the expected execution time of your tick() function. A 5 ms budget means your code should finish its work in 5 ms.
Deadline: the maximum time the computation can take before the system intervenes. A 9 ms deadline means the system tolerates up to 9 ms, but fires the miss policy if the tick exceeds that limit. The deadline is always greater than or equal to the budget.
Jitter: the variation in timing between consecutive ticks. If your node runs at 100 Hz, perfect timing means exactly 10 ms between each tick. In practice, one tick starts at 10.1 ms and the next at 9.9 ms. That 0.2 ms variation is jitter. Low jitter means smooth control. High jitter means the robot stutters or drifts.
Perfect timing (zero jitter):
| 10ms | 10ms | 10ms | 10ms | 10ms |
tick tick tick tick tick tick
Real-world timing (some jitter):
|10.1ms|9.9ms |10.0ms|9.8ms |10.2ms|
tick tick tick tick tick tick
Pathological (high jitter, GC pause):
| 10ms | 10ms | 45ms | 10ms | 10ms |
tick tick tick tick tick tick
^ GC pause
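You can observe jitter directly with a plain Python loop. The sketch below measures the peak-to-peak timing error of a fixed-rate loop using only the standard library; absolute numbers vary with machine and OS load.

```python
import time

def measure_jitter(rate_hz, n_ticks):
    """Run a fixed-rate loop and return peak-to-peak timing error in seconds."""
    period = 1.0 / rate_hz
    next_tick = time.perf_counter()
    errors = []
    for _ in range(n_ticks):
        next_tick += period
        # Sleep until the next scheduled tick, then record how late we woke up
        delay = next_tick - time.perf_counter()
        if delay > 0:
            time.sleep(delay)
        errors.append(time.perf_counter() - next_tick)
    return max(errors) - min(errors)

jitter = measure_jitter(rate_hz=100, n_ticks=50)
print(f"peak-to-peak jitter: {jitter * 1000:.3f} ms")
```

On an idle desktop Linux box this typically reports well under a millisecond; under load, or with a GC pause in the loop, the spread grows.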
When Python Is Fine for Real-Time
Python runs on CPython, which has a Global Interpreter Lock (GIL). The GIL means only one Python thread executes Python bytecode at a time. Every tick() call acquires the GIL, runs your Python code, and releases it. This acquire/release cycle adds overhead and constrains what frequencies are practical.
Here is an honest assessment:
| Frequency | Period | Typical tick budget | Python viable? | Why |
|---|---|---|---|---|
| 1-10 Hz | 100-1000 ms | 80-800 ms | Yes | Huge budget, GIL overhead is negligible |
| 10-50 Hz | 20-100 ms | 16-80 ms | Yes | Plenty of time for Python + ML inference |
| 50-100 Hz | 10-20 ms | 8-16 ms | Yes, with care | Budget is tight but achievable for simple logic |
| 100-500 Hz | 2-10 ms | 1.6-8 ms | Marginal | GIL acquire (~3 us) is small, but GC pauses (~1-5 ms) can blow the budget |
| 500+ Hz | <2 ms | <1.6 ms | No | GIL overhead + GC pauses make consistent timing impossible |
The practical ceiling for Python RT is about 100 Hz. At 100 Hz, your tick budget is 8 ms (80% of the 10 ms period). A typical Python tick() doing sensor reads and simple math takes 0.1-2 ms, leaving plenty of margin. At 500 Hz, the budget drops to 1.6 ms, where a single garbage collection pause (1-5 ms) blows through the deadline.
HORUS is a soft real-time framework. It runs in Linux userspace, not on an RTOS. The OS kernel can always preempt your process. Python adds another layer of unpredictability with the GIL and garbage collector. For soft RT at 10-100 Hz — sensor fusion, navigation, ML inference pipelines — Python works well. For hard RT at 500+ Hz — motor current loops, PWM generation — that work belongs on dedicated firmware.
GIL Impact on Timing
The GIL adds two sources of timing unpredictability:
GIL acquisition overhead: ~3 microseconds per tick. The scheduler (written in Rust) must acquire the GIL before calling your Python tick() and release it afterward. At 100 Hz (10 ms period), 3 us is 0.03% of the period — negligible. At 1000 Hz (1 ms period), 3 us is 0.3% — still small, but it adds up with other overhead.
Garbage collection pauses: CPython's garbage collector runs periodically to reclaim cyclic references. A minor GC takes 0.1-1 ms. A major GC (generation 2) can take 1-10 ms. These pauses are unpredictable and cannot be preempted — they happen inside tick() and count against your budget.
import gc
import horus
# For critical nodes: disable GC and collect manually between ticks
gc.disable()
def motor_tick(node):
cmd = node.recv("cmd_vel")
if cmd:
# Fast path — no allocations, no GC risk
node.send("motor_cmd", {"rpm": cmd.linear * 100})
# Check remaining budget before optional GC
if horus.budget_remaining() > 0.002: # 2ms headroom
gc.collect(generation=0) # Minor collection only
motor = horus.Node(
name="motor_controller",
subs=[horus.CmdVel],
pubs=["motor_cmd"],
tick=motor_tick,
rate=100,
budget=5 * horus.ms,
deadline=8 * horus.ms,
on_miss="safe_mode",
)
horus.run(motor, rt=True)
Disabling GC prevents collection of cyclic references. If your tick allocates objects with cycles (e.g., dictionaries referencing each other), memory will leak. Only disable GC for nodes with simple, non-cyclic allocations — or collect manually during budget headroom as shown above.
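The leak is easy to demonstrate with the interpreter alone. In this sketch, `make_cycle` is a hypothetical helper whose objects reference-counting alone can never free; with automatic collection disabled, they accumulate until you collect manually.

```python
import gc

gc.disable()
gc.collect()          # start from a clean slate

def make_cycle():
    a, b = {}, {}
    a["peer"], b["peer"] = b, a   # a <-> b reference cycle

for _ in range(1000):
    make_cycle()      # each call strands 2 dicts only the cycle collector can free

# With automatic collection off, the cycles pile up until collected manually
reclaimed = gc.collect()
print(f"manual collect reclaimed {reclaimed} objects")
gc.enable()
```

This is why the manual `gc.collect(generation=0)`-during-headroom pattern matters: with `gc.disable()` and no manual collection, a node allocating cycles leaks for as long as it runs.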
Auto-Derived Timing from rate=
The simplest way to get real-time behavior. Set a tick rate and HORUS calculates safe defaults:
import horus
def controller_tick(node):
scan = node.recv("scan")
if scan:
cmd = horus.CmdVel(linear=0.3, angular=0.0)
node.send("cmd_vel", cmd)
controller = horus.Node(
name="controller",
subs=[horus.LaserScan],
pubs=[horus.CmdVel],
tick=controller_tick,
rate=100, # 10ms period -> 8ms budget (80%), 9.5ms deadline (95%)
)
horus.run(controller, rt=True)
When you set rate=, the scheduler automatically:
- Calculates the period: 1/100 = 10 ms
- Sets the budget to 80% of the period: 8 ms
- Sets the deadline to 95% of the period: 9.5 ms
- Assigns the Rt execution class — the node gets a dedicated thread
You do not need to call any special method to enable real-time. Setting rate= (when compute and on are not set), or budget=, or deadline= is enough. HORUS auto-detects that the node needs real-time scheduling.
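The derivation is simple enough to reproduce by hand. `derive_timing` below is a hypothetical helper that mirrors the documented 80%/95% defaults; it is not part of the HORUS API.

```python
def derive_timing(rate_hz, budget_fraction=0.80, deadline_fraction=0.95):
    """Mirror the documented defaults: budget = 80% and deadline = 95% of the period."""
    period = 1.0 / rate_hz
    return {
        "period": period,
        "budget": period * budget_fraction,
        "deadline": period * deadline_fraction,
    }

t = derive_timing(100)
print(t)   # rate=100 -> 10 ms period, 8 ms budget, 9.5 ms deadline
```

Running it for other rates in the frequency table above (e.g. `derive_timing(50)` giving a 16 ms budget) is a quick sanity check when choosing a rate.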
Explicit Budget and Deadline
For fine-grained control, set budget and deadline directly instead of relying on auto-derivation:
import horus
us = horus.us # 1e-6
ms = horus.ms # 1e-3
def fusion_tick(node):
imu = node.recv("imu")
gps = node.recv("gps")
if imu and gps:
# Fuse sensor data
estimate = {"x": gps.latitude, "y": gps.longitude, "heading": imu.yaw}
node.send("pose", estimate)
fusion = horus.Node(
name="fusion",
subs=[horus.Imu, horus.NavSatFix],
pubs=["pose"],
tick=fusion_tick,
rate=50,
budget=3 * ms, # Must finish compute in 3ms
deadline=8 * ms, # Hard wall at 8ms
on_miss="warn",
)
horus.run(fusion, rt=True)
Budget and deadline are specified in seconds. Use the horus.us (1e-6) and horus.ms (1e-3) constants for readability.
If you set budget= without deadline=, the deadline equals the budget — your budget IS your hard deadline:
# budget=500us, deadline=500us (auto-derived from budget)
critical = horus.Node(
name="safety_check",
tick=safety_tick,
rate=100,
budget=500 * us, # Tight: any overrun fires the miss policy
on_miss="stop",
)
Start with auto-derived timing (just rate=). Switch to explicit budget= and deadline= after you have profiled your node with horus.budget_remaining() and know its actual timing characteristics. Premature optimization of timing parameters wastes effort.
Checking Budget at Runtime
Use horus.budget_remaining() inside tick() to check how much time is left. This lets you skip optional work when running behind:
import horus
def perception_tick(node):
img = node.recv("camera.rgb")
if img is None:
return
# Always run: fast object detection
detections = fast_detect(img)
node.send("detections", detections)
# Optional: expensive classification (only if budget allows)
if horus.budget_remaining() > 0.005: # 5ms headroom
classified = classify_objects(img, detections)
node.send("classified", classified)
perception = horus.Node(
name="perception",
subs=[horus.Image],
pubs=["detections", "classified"],
tick=perception_tick,
rate=30,
budget=20 * horus.ms,
deadline=30 * horus.ms,
on_miss="skip",
)
horus.run(perception, rt=True)
budget_remaining() returns float("inf") if no budget is set. When a budget is active, it returns the remaining seconds. Use this to implement graceful degradation within a single tick — do the critical work first, then fill remaining time with optional processing.
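If you want to prototype the same pattern outside HORUS, a minimal stand-in needs only `time.monotonic()`. `TickBudget` below is a hypothetical sketch that mimics the documented behavior (including `inf` when no budget is active); it is not the framework implementation.

```python
import time

class TickBudget:
    """Minimal stand-in for horus.budget_remaining(): time left in the current tick."""
    def __init__(self, budget_s):
        self.budget_s = budget_s
        self.start = None
    def begin(self):
        self.start = time.monotonic()
    def remaining(self):
        if self.start is None:
            return float("inf")   # no budget active, mirroring the documented behavior
        return self.budget_s - (time.monotonic() - self.start)

budget = TickBudget(budget_s=0.020)   # 20 ms budget, as in the perception node above
budget.begin()
critical_result = sum(range(1000))    # mandatory work: always runs
if budget.remaining() > 0.005:        # 5 ms headroom left -> do the optional work
    optional_result = sorted(range(1000), reverse=True)
```

The shape is the same as the HORUS example: critical work unconditionally, optional work guarded by a headroom check.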
Deadline Miss Policies
When a node's tick() exceeds its deadline, the miss policy determines what happens next:
import horus
# Log and continue (default) — good for development
planner = horus.Node(
name="planner",
tick=planner_tick,
rate=10,
on_miss="warn",
)
# Skip the next tick to recover timing
sensor_fusion = horus.Node(
name="fusion",
tick=fusion_tick,
rate=100,
budget=5 * horus.ms,
on_miss="skip",
)
# Call enter_safe_state() — stops motors, holds position
actuator = horus.Node(
name="actuator",
tick=actuator_tick,
rate=100,
budget=3 * horus.ms,
on_miss="safe_mode",
)
# Shut down the entire scheduler immediately
safety_monitor = horus.Node(
name="safety",
tick=safety_tick,
rate=100,
budget=500 * horus.us,
on_miss="stop",
)
| Policy | String value | What happens | Best for |
|---|---|---|---|
| Warn | "warn" | Logs a warning, continues normally | Default. Non-critical nodes. Development. |
| Skip | "skip" | Skips this node's next tick to let it catch up | High-frequency nodes where one dropped cycle is acceptable |
| SafeMode | "safe_mode" | Calls enter_safe_state() on the node | Motor controllers, actuators — stops movement on overrun |
| Stop | "stop" | Stops the entire scheduler immediately | Safety monitors — the last line of defense |
"stop" is aggressive. A single transient spike — one garbage collection pause, one page fault — will shut down the entire system. Use it only for nodes where any overrun is genuinely unacceptable. For most motor controllers, "safe_mode" is the right choice.
Enabling OS-Level Real-Time Scheduling
The rt=True flag on the Scheduler (or horus.run()) enables OS-level real-time features:
import horus
# Recommended for most deployments — try RT, continue if unavailable
sched = horus.Scheduler(tick_rate=100, rt=True)
sched.add(controller)
sched.run()
# Or via the one-liner
horus.run(controller, tick_rate=100, rt=True)
When rt=True, the scheduler attempts to:
| Feature | What it does | Requires |
|---|---|---|
| SCHED_FIFO | Gives your process priority over all normal processes | Root or CAP_SYS_NICE |
| mlockall | Locks all memory pages — prevents swap-induced page faults | Root or CAP_IPC_LOCK |
| CPU isolation | Uses isolated cores if available (isolcpus= kernel param) | Kernel boot config |
If any feature is unavailable (e.g., running without root on a development laptop), HORUS logs a warning and continues. This is the prefer_rt behavior — apply what you can, degrade gracefully on the rest.
# After starting, check what was achieved
sched = horus.Scheduler(tick_rate=100, rt=True)
sched.add(controller)
caps = sched.capabilities()
print(f"Full RT: {sched.has_full_rt()}")
for degradation in sched.degradations():
print(f" Degradation: {degradation}")
rt=True is the right choice for almost all deployments. The same code works on a developer laptop (where RT features are unavailable) and on a production robot (where they are). Timing improves progressively as the platform improves.
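You can reproduce the try-then-degrade behavior with the standard library alone. This sketch uses `os.sched_setscheduler`, which fails without root or CAP_SYS_NICE exactly as the table above says; the `try_sched_fifo` helper is illustrative, not a HORUS function.

```python
import os

def try_sched_fifo(priority=50):
    """Attempt to switch this process to SCHED_FIFO, degrading gracefully."""
    try:
        os.sched_setscheduler(0, os.SCHED_FIFO, os.sched_param(priority))
        return True
    except OSError:
        # Without root or CAP_SYS_NICE the kernel refuses real-time scheduling
        return False

if try_sched_fifo():
    print("running under SCHED_FIFO")
else:
    print("SCHED_FIFO unavailable; continuing with normal scheduling")
```

This is the same prefer-RT shape as `rt=True`: apply what the platform allows, log the degradation, and keep running.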
CPU Pinning
Pin a node to a specific CPU core to reduce jitter from cache thrashing:
controller = horus.Node(
name="controller",
tick=controller_tick,
rate=100,
budget=5 * horus.ms,
core=2, # Pin to CPU core 2
)
When a thread migrates between CPU cores (which the OS does for load balancing), it loses its L1 and L2 cache contents. Rebuilding the cache takes microseconds — which shows up as jitter. Pinning eliminates this.
CPU pinning is most effective when combined with Linux CPU isolation:
# In /etc/default/grub (then update-grub and reboot)
GRUB_CMDLINE_LINUX="isolcpus=2,3"
This tells the kernel not to schedule anything else on cores 2 and 3, reserving them entirely for your pinned nodes.
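Outside HORUS, the same pinning is available through `os.sched_setaffinity`. The core choice below is illustrative (the guide's `core=2` assumes that core exists); this sketch just picks the lowest core the process is already allowed to use.

```python
import os

# Pick one core from the set this process is currently allowed to run on
allowed = os.sched_getaffinity(0)
target = min(allowed)
# Restrict the process to that single core, eliminating cache-losing migrations
os.sched_setaffinity(0, {target})
print(f"pinned to core {target}: affinity is now {os.sched_getaffinity(0)}")
```

HORUS's `core=` parameter does the equivalent per node thread, so you normally do not need to call this yourself.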
Priority
Set the OS scheduling priority for a real-time node:
controller = horus.Node(
name="controller",
tick=controller_tick,
rate=100,
budget=5 * horus.ms,
priority=90, # SCHED_FIFO priority 1-99 (higher = more urgent)
core=2,
)
Priority only takes effect when rt=True is set on the Scheduler and SCHED_FIFO is available. Higher values (closer to 99) mean the node preempts lower-priority real-time threads. The Linux kernel reserves priority 99 for its own real-time threads, so practical values are 1-98.
| Priority range | Typical use |
|---|---|
| 90-98 | Safety monitors, emergency stop handlers |
| 50-89 | Motor controllers, actuator loops |
| 10-49 | Sensor fusion, perception pipelines |
| 1-9 | Low-priority RT nodes (data recording with timing) |
Watchdog
The watchdog detects nodes that stop responding. HORUS provides two levels:
Global Watchdog
Set on the Scheduler, applies to all nodes:
sched = horus.Scheduler(
tick_rate=100,
rt=True,
watchdog_ms=500, # Fire if any node is silent for 500ms
max_deadline_misses=3, # Escalate after 3 consecutive misses
)
The graduated response prevents a single transient spike from killing a node:
| Timeout | Health state | Response |
|---|---|---|
| 1x watchdog (500 ms) | Warning | Log warning |
| 2x watchdog (1000 ms) | Unhealthy | Skip tick, log error |
| 3x watchdog (1500 ms, critical node) | Isolated | Remove from tick loop, call enter_safe_state() |
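The graduated thresholds are easy to express directly. `watchdog_state` below is a hypothetical sketch of the table above, not HORUS internals; it maps how long a node has been silent onto the documented health states.

```python
def watchdog_state(silent_ms, watchdog_ms=500, critical=True):
    """Map silence duration onto the documented graduated health states."""
    if silent_ms < watchdog_ms:
        return "healthy"
    if silent_ms < 2 * watchdog_ms:
        return "warning"      # 1x timeout: log a warning
    if silent_ms < 3 * watchdog_ms or not critical:
        return "unhealthy"    # 2x timeout: skip tick, log error
    return "isolated"         # 3x timeout on a critical node: remove, safe state

print(watchdog_state(600))    # silent 600 ms against a 500 ms watchdog
```

The point of the graduation: a single 600 ms stall logs a warning, while only sustained silence on a critical node triggers isolation.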
Per-Node Watchdog
Override the global watchdog for individual nodes:
# Safety monitor gets a tighter watchdog
safety = horus.Node(
name="safety",
tick=safety_tick,
rate=100,
watchdog=0.2, # 200ms — tighter than the global 500ms
on_miss="stop",
)
# ML inference gets a longer watchdog (model loading can be slow)
detector = horus.Node(
name="detector",
tick=detect_tick,
rate=30,
watchdog=2.0, # 2 seconds — first inference loads the model
on_miss="skip",
)
Complete Example: Multi-Node RT System
A full system with sensor, controller, and safety monitor at different rates and miss policies:
import horus
import gc
us = horus.us
ms = horus.ms
# --- Sensor node: read IMU at 100Hz ---
def sensor_tick(node):
reading = horus.Imu(
accel_x=0.0, accel_y=0.0, accel_z=9.81,
gyro_x=0.01, gyro_y=0.0, gyro_z=0.0,
)
node.send("imu", reading)
sensor = horus.Node(
name="imu_driver",
pubs=[horus.Imu],
tick=sensor_tick,
rate=100,
budget=2 * ms,
on_miss="skip",
priority=60,
core=2,
)
# --- Controller: PID loop at 50Hz ---
gc.disable() # No GC pauses in the controller
target_speed = 0.5
integral = 0.0
def controller_tick(node):
global integral
imu = node.recv("imu")
if imu is None:
return
# Simple P+I controller
error = target_speed - imu.accel_x
integral += error * horus.dt()
command = 2.0 * error + 0.1 * integral
cmd = horus.CmdVel(linear=command, angular=0.0)
node.send("cmd_vel", cmd)
# Collect GC only if we have headroom
if horus.budget_remaining() > 0.003:
gc.collect(generation=0)
controller = horus.Node(
name="pid_controller",
subs=[horus.Imu],
pubs=[horus.CmdVel],
tick=controller_tick,
rate=50,
budget=5 * ms,
deadline=15 * ms,
on_miss="safe_mode",
priority=70,
core=3,
)
# --- Safety monitor: check system health at 100Hz ---
def safety_tick(node):
cmd = node.recv("cmd_vel")
if cmd and abs(cmd.linear) > 2.0:
node.log_warning(f"Unsafe velocity: {cmd.linear}")
node.request_stop()
safety = horus.Node(
name="safety_monitor",
subs=[horus.CmdVel],
tick=safety_tick,
rate=100,
budget=500 * us,
on_miss="stop",
priority=95,
watchdog=0.2,
core=2,
)
# --- Run everything ---
sched = horus.Scheduler(
tick_rate=100,
rt=True,
watchdog_ms=500,
max_deadline_misses=3,
)
sched.add(sensor)
sched.add(controller)
sched.add(safety)
sched.run()
# After shutdown, inspect what happened
stats = sched.safety_stats()
if stats:
print(f"Deadline misses: {stats.get('deadline_misses', 0)}")
print(f"Watchdog timeouts: {stats.get('watchdog_timeouts', 0)}")
Quick Reference
| Your node does... | Configuration | Why |
|---|---|---|
| Motor control at 50 Hz | rate=50, budget=5*ms, on_miss="safe_mode" | Explicit budget, safe degradation on overrun |
| Sensor fusion at 100 Hz | rate=100, budget=3*ms, on_miss="skip" | Skip one reading rather than cascade delays |
| Safety monitor | rate=100, budget=500*us, on_miss="stop", priority=95 | Highest priority, immediate shutdown on overrun |
| ML inference at 30 Hz | rate=30, compute=True, budget=30*ms, on_miss="skip" | Thread pool for CPU-bound work, skip slow frames |
| Path planner (1 Hz, slow) | rate=1, compute=True | CPU-heavy, no deadline needed |
| Background logging | default (no RT config) | BestEffort is fine |
| Emergency stop handler | on="emergency.stop" | Runs only when event fires, zero polling overhead |
Node Parameters for RT
| Parameter | Type | Default | Description |
|---|---|---|---|
| rate | float | 30 | Tick rate in Hz. Auto-derives budget/deadline if no compute/on set |
| budget | float | None | Max expected tick time in seconds. Use horus.us/horus.ms |
| deadline | float | None | Hard wall in seconds. Miss policy fires beyond this |
| on_miss | str | None | "warn", "skip", "safe_mode", "stop" |
| priority | int | None | OS SCHED_FIFO priority, 1-99. Requires rt=True |
| core | int | None | Pin to CPU core |
| watchdog | float | None | Per-node watchdog timeout in seconds |
Scheduler Parameters for RT
| Parameter | Type | Default | Description |
|---|---|---|---|
| tick_rate | float | 1000.0 | Global tick rate in Hz |
| rt | bool | False | Enable SCHED_FIFO + mlockall |
| watchdog_ms | int | 0 | Global watchdog timeout (0 = disabled) |
| max_deadline_misses | int | None | Consecutive misses before escalation |
| cores | list | None | CPU affinity for the scheduler itself |
Design Decisions
Why allow real-time features in Python at all? Many robotics teams have Python-heavy codebases — ML pipelines, prototyping, data analysis. Telling them "rewrite in a compiled language for any timing guarantees" means they get no timing guarantees during the prototype phase, which is when timing bugs are cheapest to find. Python RT at 10-100 Hz catches timing problems early. Teams can migrate hot loops to compiled code later with confidence, because the same budget/deadline parameters work in both languages.
Why auto-derive budget and deadline from rate=?
Developers think in terms of what their node needs: "this controller must run at 100 Hz." They do not think in scheduling policies. Auto-derivation (80% budget, 95% deadline) maps developer intent to the correct execution class. You can override with explicit budget= and deadline= after profiling, but the defaults are safe starting points that catch real problems.
Why "warn" as the default miss policy?
During development, most deadline misses are transient — a background process spiked, the system was under load, the debugger was attached. Stopping the system on every transient miss makes development painful. "warn" logs the problem so you can see the pattern, without disrupting the run. Switch to "safe_mode" or "stop" when deploying to hardware.
Why budget_remaining() instead of just enforcing the deadline?
Hard enforcement (kill the tick at the deadline) is dangerous — it can leave data structures in an inconsistent state. budget_remaining() enables cooperative degradation: the node checks its remaining time and decides what to skip. This is safer and gives the developer control over graceful degradation within a single tick.
Why gc.disable() instead of automatic GC management?
Automatic GC management (e.g., running GC only between ticks) would require deep integration with CPython internals and would break when users import C extensions that trigger GC internally. Manual gc.disable() + gc.collect() is explicit, portable, and gives the developer full control. The budget_remaining() pattern shown in this guide is the recommended approach.
Trade-offs
| Gain | Cost |
|---|---|
| Python RT at 10-100 Hz — catch timing bugs during prototyping | GIL limits max frequency; GC pauses cause jitter above 100 Hz |
| Auto-derived timing from rate= — no explicit configuration needed | Less visible what budget/deadline the node actually got |
| budget_remaining() cooperative degradation — node controls what to skip | Requires developer discipline; nothing prevents ignoring the budget |
| gc.disable() for critical nodes — eliminates GC jitter | Cyclic reference leaks if not manually collecting; must understand GC behavior |
| rt=True graceful degradation — works on laptops and production robots | May run without RT features and developer does not notice (check degradations()) |
| Per-node priority= and core= — fine-grained OS scheduling control | Requires root or CAP_SYS_NICE; incorrect pinning wastes cores |
| Per-node watchdog= — tighter timeout for critical nodes, looser for slow nodes | Multiple timeout values make debugging more complex |
See Also
- Python API Reference — Full Node, Scheduler, and Clock API reference
- Real-Time Systems (Concepts) — Platform-independent RT concepts, hard vs soft vs firm
- Execution Classes — The 5 execution classes and how rate= maps to Rt
- RT Setup — Linux kernel configuration for real-time (isolcpus, PREEMPT_RT, ulimits)
- Python Examples — Complete working examples including RT patterns