Tutorial: Real-Time Control (Python)

Looking for the Rust version? See Tutorial: Real-Time Control.

Prerequisites

What You'll Build

A robot controller with four nodes running at different rates and execution classes:

  1. IMU Sensor at 100 Hz --- real-time, reads accelerometer and gyroscope data
  2. PID Controller at 50 Hz --- real-time, computes velocity commands from sensor feedback
  3. Path Planner at 10 Hz --- compute class, runs on a thread pool
  4. Safety Monitor at 100 Hz --- real-time, highest priority, stops the system on dangerous commands

Time estimate: ~20 minutes

Step 1: Four Nodes, No Real-Time

Start with four basic nodes. All use the default configuration --- no budgets, no deadlines, no safety policies yet.

import horus
import math

# ---- IMU Sensor (100 Hz) ----

def make_imu():
    t = [0.0]

    def tick(node):
        t[0] += horus.dt()
        imu = horus.Imu(
            accel_x=0.1 * math.sin(t[0] * 2.0),
            accel_y=0.05 * math.cos(t[0] * 3.0),
            accel_z=9.81,
            gyro_x=0.01,
            gyro_y=0.0,
            gyro_z=0.05,
        )
        node.send("imu.data", imu)

    return horus.Node(
        name="imu_sensor",
        tick=tick,
        rate=100,
        order=0,
        pubs=[horus.Imu],
    )


# ---- PID Controller (50 Hz) ----

def make_controller():
    integral = [0.0]
    target_speed = 0.5

    def tick(node):
        imu = node.recv("imu.data")
        if imu is None:
            return

        error = target_speed - imu.accel_x
        integral[0] += error * horus.dt()
        command = 2.0 * error + 0.1 * integral[0]

        cmd = horus.CmdVel(linear=command, angular=0.0)
        node.send("cmd_vel", cmd)

    return horus.Node(
        name="pid_controller",
        tick=tick,
        rate=50,
        order=10,
        subs=[horus.Imu],
        pubs=[horus.CmdVel],
    )


# ---- Path Planner (10 Hz) ----

def make_planner():
    waypoints = [(1.0, 0.0), (2.0, 1.0), (3.0, 0.0)]
    idx = [0]

    def tick(node):
        odom = node.recv("odom")
        if odom is None:
            return

        # Pick the next waypoint
        wx, wy = waypoints[idx[0] % len(waypoints)]
        dx = wx - odom.x
        dy = wy - odom.y
        dist = math.sqrt(dx * dx + dy * dy)

        if dist < 0.2:
            idx[0] += 1

        heading = math.atan2(dy, dx)
        node.send("plan.target", {
            "heading": heading,
            "distance": dist,
            "waypoint": idx[0] % len(waypoints),
        })

    return horus.Node(
        name="path_planner",
        tick=tick,
        rate=10,
        order=50,
        subs=["odom"],
        pubs=["plan.target"],
    )


# ---- Safety Monitor (100 Hz) ----

def make_safety():
    def tick(node):
        cmd = node.recv("cmd_vel")
        if cmd is None:
            return

        if abs(cmd.linear) > 2.0:
            node.log_warning(f"Unsafe velocity: {cmd.linear:.2f} m/s")
            node.request_stop()

        if abs(cmd.angular) > 1.5:
            node.log_warning(f"Unsafe angular velocity: {cmd.angular:.2f} rad/s")
            node.request_stop()

    return horus.Node(
        name="safety_monitor",
        tick=tick,
        rate=100,
        order=1,
        subs=[horus.CmdVel],
    )


# ---- Run ----

horus.run(
    make_imu(),
    make_controller(),
    make_planner(),
    make_safety(),
    tick_rate=100,
)

Run this with horus run. All four nodes tick on the scheduler with no timing enforcement. The IMU publishes sensor data, the PID controller computes velocity commands, the planner picks waypoints, and the safety monitor watches for dangerous velocities. It works, but nothing prevents a slow tick from cascading delays across the system.

Step 2: Add Rates and Auto-Derived Timing

Setting rate= on a node does three things automatically:

  1. Sets the tick frequency
  2. Derives a default budget (80% of the period)
  3. Derives a default deadline (95% of the period)

The node is also promoted to the Rt execution class --- it gets a dedicated thread with timing enforcement.

Our nodes already have rate= set, so they are already RT nodes with auto-derived budgets and deadlines:

NodeRatePeriodAuto Budget (80%)Auto Deadline (95%)
IMU Sensor100 Hz10 ms8 ms9.5 ms
PID Controller50 Hz20 ms16 ms19 ms
Path Planner10 Hz100 ms80 ms95 ms
Safety Monitor100 Hz10 ms8 ms9.5 ms

These defaults are reasonable starting points. You do not need to specify budgets and deadlines manually unless profiling shows the auto-derived values are too loose or too tight.

Step 3: Add Deadline Miss Policies

If the PID controller misses a deadline, the motors receive stale commands and the robot drifts. If the safety monitor misses a deadline, dangerous commands go unchecked. Each node needs a policy for what happens on overrun:

PolicyString valueWhat happensBest for
Warn"warn"Log a warning, continue normallyDevelopment, non-critical nodes
Skip"skip"Drop the late tick, run next on scheduleSensors where one missed reading is acceptable
SafeMode"safe_mode"Call enter_safe_state() on the nodeMotor controllers, actuators
Stop"stop"Shut down the entire schedulerSafety monitors --- last line of defense

Update the node constructors with miss policies:

import horus

us = horus.us  # 1e-6
ms = horus.ms  # 1e-3

# IMU: skip missed ticks --- one stale reading is fine
imu = horus.Node(
    name="imu_sensor",
    tick=imu_tick,
    rate=100,
    order=0,
    on_miss="skip",
    pubs=[horus.Imu],
)

# PID controller: enter safe state on deadline miss
controller = horus.Node(
    name="pid_controller",
    tick=controller_tick,
    rate=50,
    order=10,
    budget=10 * ms,
    deadline=18 * ms,
    on_miss="safe_mode",
    subs=[horus.Imu],
    pubs=[horus.CmdVel],
)

# Safety monitor: stop everything if it misses a deadline
safety = horus.Node(
    name="safety_monitor",
    tick=safety_tick,
    rate=100,
    order=1,
    budget=2 * ms,
    deadline=5 * ms,
    on_miss="stop",
    subs=[horus.CmdVel],
)

The PID controller now has explicit budget=10*ms and deadline=18*ms, overriding the auto-derived values. The safety monitor has a tight budget=2*ms --- it does minimal work per tick and must finish fast.

Step 4: Move the Planner to Compute

The path planner runs algorithms that can take 10-50 ms. If it runs on the RT thread, a slow planning cycle blocks the PID controller. Use compute=True to move it to a thread pool:

planner = horus.Node(
    name="path_planner",
    tick=planner_tick,
    rate=10,
    order=50,
    compute=True,     # runs on worker thread pool, not RT thread
    subs=["odom"],
    pubs=["plan.target"],
)

With compute=True, the planner runs on a separate thread pool. It cannot block the 50 Hz PID controller or the 100 Hz safety monitor. Note that rate=10 on a Compute node is a frequency cap --- it limits how often the planner ticks, but there is no budget or deadline enforcement. This is the right choice: path planning takes variable time, and a 50 ms planning cycle is normal, not a failure.

rate= on a Compute node does not make it RT. It only limits tick frequency. A rate=10, compute=True node ticks at most 10 times per second with no timing enforcement. Drop compute=True if you need deadline enforcement.

Step 5: Configure the Scheduler for Real-Time

Enable OS-level real-time scheduling on the scheduler:

sched = horus.Scheduler(
    tick_rate=100,
    rt=True,                    # request SCHED_FIFO + mlockall
    watchdog_ms=500,            # detect frozen nodes
    max_deadline_misses=5,      # stop after 5 consecutive misses
)
  • rt=True requests real-time OS scheduling. Falls back gracefully if permissions are unavailable --- the same code runs on a developer laptop (without RT) and a production robot (with RT).
  • watchdog_ms=500 detects nodes that stop responding. Graduated response: warning at 500 ms, unhealthy at 1000 ms, isolated at 1500 ms.
  • max_deadline_misses=5 stops the scheduler after 5 total deadline misses --- a system-wide safety net.

Step 6: Complete System

Here is the full program with all nodes configured, real-time scheduling, and safety policies:

import horus
import math
import gc

us = horus.us
ms = horus.ms

# ---- IMU Sensor (100 Hz) ----------------------------------------

def make_imu():
    t = [0.0]

    def tick(node):
        t[0] += horus.dt()
        imu = horus.Imu(
            accel_x=0.1 * math.sin(t[0] * 2.0),
            accel_y=0.05 * math.cos(t[0] * 3.0),
            accel_z=9.81,
            gyro_x=0.01,
            gyro_y=0.0,
            gyro_z=0.05,
        )
        node.send("imu.data", imu)

    return horus.Node(
        name="imu_sensor",
        tick=tick,
        rate=100,
        order=0,
        on_miss="skip",       # one stale reading is acceptable
        pubs=[horus.Imu],
    )


# ---- PID Controller (50 Hz) -------------------------------------

def make_controller():
    gc.disable()  # no GC pauses in the control loop
    integral = [0.0]
    target_speed = 0.5

    def tick(node):
        imu = node.recv("imu.data")
        if imu is None:
            return

        # PID compute --- no allocations in the fast path
        error = target_speed - imu.accel_x
        integral[0] += error * horus.dt()
        command = 2.0 * error + 0.1 * integral[0]

        cmd = horus.CmdVel(linear=command, angular=0.0)
        node.send("cmd_vel", cmd)

        # Collect GC only if budget allows
        if horus.budget_remaining() > 0.003:
            gc.collect(generation=0)

    def enter_safe_state(node):
        # Called when deadline is missed (on_miss="safe_mode")
        node.send("cmd_vel", horus.CmdVel(linear=0.0, angular=0.0))
        node.log_warning("Controller entered safe state --- zero velocity")

    return horus.Node(
        name="pid_controller",
        tick=tick,
        shutdown=enter_safe_state,
        rate=50,
        order=10,
        budget=10 * ms,
        deadline=18 * ms,
        on_miss="safe_mode",  # zero velocity on deadline miss
        subs=[horus.Imu],
        pubs=[horus.CmdVel],
    )


# ---- Path Planner (10 Hz, Compute) ------------------------------

def make_planner():
    waypoints = [(1.0, 0.0), (2.0, 1.0), (3.0, 0.0)]
    idx = [0]

    def tick(node):
        odom = node.recv("odom")
        if odom is None:
            return

        wx, wy = waypoints[idx[0] % len(waypoints)]
        dx = wx - odom.x
        dy = wy - odom.y
        dist = math.sqrt(dx * dx + dy * dy)

        if dist < 0.2:
            idx[0] += 1

        heading = math.atan2(dy, dx)
        node.send("plan.target", {
            "heading": heading,
            "distance": dist,
            "waypoint": idx[0] % len(waypoints),
        })

    return horus.Node(
        name="path_planner",
        tick=tick,
        rate=10,
        order=50,
        compute=True,         # thread pool --- does not block RT nodes
        subs=["odom"],
        pubs=["plan.target"],
    )


# ---- Safety Monitor (100 Hz) ------------------------------------

def make_safety():
    def tick(node):
        cmd = node.recv("cmd_vel")
        if cmd is None:
            return

        if abs(cmd.linear) > 2.0:
            node.log_warning(f"Unsafe velocity: {cmd.linear:.2f} m/s")
            node.request_stop()

        if abs(cmd.angular) > 1.5:
            node.log_warning(f"Unsafe angular rate: {cmd.angular:.2f} rad/s")
            node.request_stop()

    return horus.Node(
        name="safety_monitor",
        tick=tick,
        rate=100,
        order=1,              # runs right after IMU, before controller
        budget=2 * ms,
        deadline=5 * ms,
        on_miss="stop",       # stop everything if safety check is late
        priority=95,          # highest priority RT thread
        subs=[horus.CmdVel],
    )


# ---- Main -------------------------------------------------------

print("Starting multi-rate robot controller")
print("  IMU Sensor:       100 Hz (Rt, on_miss=skip)")
print("  PID Controller:    50 Hz (Rt, on_miss=safe_mode)")
print("  Path Planner:      10 Hz (Compute)")
print("  Safety Monitor:   100 Hz (Rt, on_miss=stop, priority=95)")
print()

sched = horus.Scheduler(
    tick_rate=100,
    rt=True,
    watchdog_ms=500,
    max_deadline_misses=5,
)

sched.add(make_imu())
sched.add(make_controller())
sched.add(make_planner())
sched.add(make_safety())

sched.run()

Press Ctrl+C to stop. The timing report shows budget and deadline statistics for each node.

Step 7: Verify Topic Rates

After starting the system, open a second terminal and check that each node is publishing at the expected rate:

# Check IMU is publishing at ~100 Hz
horus topic hz imu.data
# Expected: ~100.0 Hz

# Check PID controller output at ~50 Hz
horus topic hz cmd_vel
# Expected: ~50.0 Hz

# Check planner output at ~10 Hz
horus topic hz plan.target
# Expected: ~10.0 Hz

If a topic's measured rate is significantly lower than expected, a node is missing deadlines or being throttled. Check the scheduler logs for deadline miss warnings.

You can also echo a topic to see live data:

horus topic echo cmd_vel
horus topic echo imu.data

Or use the full monitoring dashboard:

horus monitor

Python Real-Time: What Works, What Doesn't

Python runs on CPython, which has a Global Interpreter Lock (GIL) and a garbage collector. Both affect timing predictability.

Practical Frequency Limits

FrequencyPeriodPython viable?Notes
1-10 Hz100-1000 msYesHuge budget, GIL overhead is negligible
10-50 Hz20-100 msYesPlenty of time for Python + ML inference
50-100 Hz10-20 msYes, with careBudget is tight but achievable for simple logic
100-500 Hz2-10 msMarginalGC pauses (1-5 ms) can blow the budget
500+ Hz<2 msNoGIL + GC make consistent timing impossible

The practical ceiling for Python RT is about 100 Hz. At 100 Hz, your tick budget is 8 ms. A typical Python tick() doing sensor reads and simple math takes 0.1-2 ms, leaving plenty of margin. At 500 Hz, the budget drops to 1.6 ms, where a single garbage collection pause blows through the deadline.

GC Mitigation

The PID controller in Step 6 uses gc.disable() and manual gc.collect(generation=0) during budget headroom. This pattern eliminates GC pauses from the critical path:

gc.disable()  # no automatic GC

def tick(node):
    # ... fast-path control logic, no allocations ...

    if horus.budget_remaining() > 0.003:  # 3 ms headroom
        gc.collect(generation=0)          # minor collection only

Only use this for nodes where GC pauses are unacceptable. For most nodes at 10-50 Hz, the default garbage collector works fine.

When to Use Rust Instead

For this tutorial's architecture:

  • IMU at 100 Hz --- Python works, but you are near the ceiling. If timing jitter is a problem, move to Rust.
  • PID at 50 Hz --- Python is comfortable. 20 ms period gives plenty of budget.
  • Planner at 10 Hz --- Python is ideal. Complex algorithms, ML inference, data structures --- Python's strengths.
  • Safety at 100 Hz --- Python works for monitoring. For hard safety-critical nodes, consider Rust.

For motor control at 1 kHz+, use the Rust Real-Time Control tutorial. Python cannot reliably sustain sub-millisecond tick budgets.

Key Takeaways

  • rate= implies RT --- auto-derives budget (80%), deadline (95%), and promotes to Rt execution class. Override with explicit budget= and deadline= as needed.
  • Safety is explicit --- always set on_miss= for safety-critical nodes. "safe_mode" zeros outputs. "stop" halts the scheduler.
  • Separate by execution class --- keep control loops on RT threads, move heavy computation to compute=True.
  • rt=True degrades gracefully --- request RT on the Scheduler, check sched.has_full_rt() in production.
  • Budget and deadline are in seconds --- use horus.us (1e-6) and horus.ms (1e-3) for readability. budget=300 means 300 seconds, not 300 microseconds.
  • Python is fine for 10-100 Hz --- sensor fusion, navigation, ML inference, safety monitoring. Use Rust for 500+ Hz motor loops.

Next Steps

See Also