Tutorial 5: Hardware & Real-Time (Python)
Looking for the Rust versions? See Tutorial 5: Hardware Drivers and Tutorial: Real-Time Control.
Learn to load hardware drivers from horus.toml and configure real-time scheduling with budgets, deadlines, and miss policies.
What You'll Learn
- Loading hardware nodes with
horus.hardware.load() - Accessing hardware configuration parameters
- Setting budgets and deadlines for timing guarantees
- Handling deadline misses with
on_miss - Using
compute=Truefor CPU-heavy nodes - Production scheduler configuration
Part 1: Hardware Drivers
Configure Drivers in horus.toml
# horus.toml
[package]
name = "my-robot"
version = "0.1.0"
[drivers.arm]
terra = "dynamixel"
port = "/dev/ttyUSB0"
baudrate = 1000000
servo_ids = [1, 2, 3]
[drivers.lidar]
terra = "rplidar"
port = "/dev/ttyUSB1"
scan_mode = "standard"
[drivers.imu]
terra = "i2c"
bus = 1
address = 104
Terra integration is in progress. Driver config loading and parameter access work fully. Hardware connections must be created in your own code for now — the same config will work with zero changes when Terra integration lands.
Load and Read Driver Config
import horus
# Load hardware from horus.toml [hardware] section
entries = horus.hardware.load()
for name, obj in entries:
if isinstance(obj, horus.NodeParams):
port = obj.get_or("port", "/dev/ttyUSB0")
print(f"{name}: port={port}")
Using Config in Nodes
def arm_tick(node):
cmd = node.recv("arm.command")
if cmd is not None:
# Use hardware connection to send commands
node.send("arm.state", {"position": [0.0, 0.0, 0.0]})
arm_node = horus.Node(name="ArmController", tick=arm_tick, rate=100, order=0,
subs=["arm.command"], pubs=["arm.state"])
horus.run(arm_node)
Handling Missing Hardware
If horus.toml has no [hardware] section, hardware.load() raises an exception. Wrap it for graceful fallback:
try:
entries = horus.hardware.load()
except Exception as e:
print(f"No hardware config: {e}")
print("Running in simulation mode")
entries = []
This lets the same codebase run on both real hardware and in simulation without changes.
Part 2: Real-Time Scheduling
Budget and Deadline
Set timing constraints to detect overruns:
import horus
us = horus.us # 1e-6 (microseconds → seconds)
ms = horus.ms # 1e-3 (milliseconds → seconds)
# horus.us = 1e-6 (microseconds), horus.ms = 1e-3 (milliseconds).
# These are plain float constants for readable duration math:
# budget=300 * horus.us means 300 microseconds.
# Motor controller: 1kHz with 800μs budget, 950μs deadline
motor = horus.Node(
name="MotorCtrl",
tick=motor_tick,
rate=1000,
order=0,
budget=800 * us, # must finish tick within 800μs
deadline=950 * us, # hard deadline at 950μs
on_miss="safe_mode", # enter safe state if deadline missed
core=0, # pin to CPU core 0
# Pins this node's thread to CPU core 0. Requires CAP_SYS_NICE or root
# on Linux. If the core doesn't exist or permissions are insufficient,
# HORUS logs a warning and continues with default scheduling.
)
# LiDAR driver: 100Hz, skip missed ticks
lidar = horus.Node(
name="LidarDriver",
tick=lidar_tick,
rate=100,
order=1,
on_miss="skip", # skip tick on deadline miss
)
# Path planner: CPU-heavy, runs on thread pool
planner = horus.Node(
name="PathPlanner",
tick=planner_tick,
rate=10,
order=5,
compute=True, # runs on worker thread pool, not main loop
on_miss="warn", # just log a warning
)
# Production scheduler with watchdog and RT
horus.run(
motor, lidar, planner,
tick_rate=1000,
rt=True, # request SCHED_FIFO (graceful fallback)
watchdog_ms=500, # detect frozen nodes
blackbox_mb=64, # flight recorder for debugging
)
Deadline Miss Policies
| Policy | Behavior | Use For |
|---|---|---|
"warn" | Log warning, continue normally | Non-critical nodes (logging, telemetry) |
"skip" | Skip this tick, resume next cycle | Sensor drivers that can miss a frame |
"safe_mode" | Call enter_safe_state() equivalent, continue ticking | Motor controllers, actuators |
"stop" | Stop the entire scheduler | Safety-critical nodes |
Adaptive Quality with Budget
Use horus.budget_remaining() to do more work when time permits:
def planner_tick(node):
# Always compute basic path
path = compute_basic_path()
# Only optimize if budget allows
if horus.budget_remaining() > 2 * ms:
path = optimize_path(path)
# Only smooth if still have time
if horus.budget_remaining() > 1 * ms:
path = smooth_path(path)
node.send("path", path)
Complete Production Example
import horus
us, ms = horus.us, horus.ms
def safety_tick(node):
"""Safety monitor — runs first every tick."""
if node.has_msg("emergency_stop"):
estop = node.recv("emergency_stop")
if estop.get("engaged"):
node.log_error("EMERGENCY STOP ENGAGED")
node.request_stop()
def motor_tick(node):
cmd = node.recv("cmd_vel")
if cmd is not None:
# Apply velocity command to motor
node.send("motor.state", {"velocity": cmd.get("linear", 0.0)})
def motor_shutdown(node):
node.send("cmd_vel", {"linear": 0.0, "angular": 0.0})
node.log_info("Motors stopped safely")
safety = horus.Node(name="Safety", tick=safety_tick, rate=100, order=0,
subs=["emergency_stop"])
motor = horus.Node(name="Motor", tick=motor_tick, shutdown=motor_shutdown,
rate=100, order=1, budget=500 * us, on_miss="safe_mode",
subs=["cmd_vel"], pubs=["motor.state"])
horus.run(safety, motor, tick_rate=100, rt=True, watchdog_ms=500)
Next Steps
- Real-Time Concepts — what real-time means for robotics
- Safety Monitor — graduated degradation and watchdog
- Choosing a Language — when to use Python vs Rust for RT
- Python API Reference — full scheduling kwargs
See Also
- Hardware Drivers (Rust) — Rust version
- Python Bindings — Python API reference