Tutorial 3: Full Robot System (Python)

Looking for the Rust version? See Tutorial 3: Full Robot Integration.

Combine an IMU sensor, velocity commander, motor controller, and state estimator into a complete robot system with multi-rate scheduling.

What You'll Learn

  • Composing 4+ nodes into a working system
  • Multi-rate scheduling (different nodes at different frequencies)
  • Data fusion (subscribing to multiple topics)
  • Monitoring with horus monitor and horus topic echo

Architecture

ImuSensor (100Hz) ──→ imu.data ──→ StateEstimator (100Hz) ──→ robot.pose
Commander (10Hz)  ──→ motor.cmd ──→ MotorController (50Hz)  ──→ motor.state ──→ StateEstimator

The Code

import horus
import math

us = horus.us  # microsecond constant for budget/deadline

# ── IMU Sensor (100 Hz) ─────────────────────────────────────

def make_imu():
    t = [0.0]

    def tick(node):
        t[0] += horus.dt()
        node.send("imu.data", {
            "accel_x": 0.1 * math.sin(t[0] * 2.0),
            "accel_y": 0.05 * math.cos(t[0] * 3.0),
            "accel_z": 9.81,
            "gyro_yaw": 0.05,  # constant rotation
            "timestamp": horus.now(),
        })

    return horus.Node(name="ImuSensor", tick=tick, rate=100, order=0,
                      pubs=["imu.data"])


# ── Commander (10 Hz) ────────────────────────────────────────

def make_commander():
    t = [0.0]

    def tick(node):
        t[0] += horus.dt()
        node.send("motor.cmd", {
            "velocity": 1.0 + 0.5 * math.sin(t[0] * 0.3),
        })

    return horus.Node(name="Commander", tick=tick, rate=10, order=1,
                      pubs=["motor.cmd"])


# ── Motor Controller (50 Hz) ────────────────────────────────

def make_motor():
    state = {"velocity": 0.0, "position": 0.0}

    def tick(node):
        cmd = node.recv("motor.cmd")
        if cmd is not None:
            state["velocity"] = cmd["velocity"]
        state["position"] += state["velocity"] * horus.dt()
        node.send("motor.state", {
            "position": state["position"],
            "velocity": state["velocity"],
        })

    def shutdown(node):
        state["velocity"] = 0.0
        print("Motor stopped safely")

    return horus.Node(name="MotorController", tick=tick, shutdown=shutdown,
                      rate=50, order=2,
                      subs=["motor.cmd"], pubs=["motor.state"])


# ── State Estimator (100 Hz) ────────────────────────────────

def make_estimator():
    """Fuses IMU gyro + motor velocity into a robot pose estimate."""
    pose = {"x": 0.0, "y": 0.0, "heading": 0.0}

    def tick(node):
        dt = horus.dt()

        # Fuse IMU data (heading from gyro)
        imu = node.recv("imu.data")
        if imu is not None:
            pose["heading"] += imu["gyro_yaw"] * dt

        # Fuse motor state (position from velocity)
        motor = node.recv("motor.state")
        if motor is not None:
            pose["x"] += motor["velocity"] * math.cos(pose["heading"]) * dt
            pose["y"] += motor["velocity"] * math.sin(pose["heading"]) * dt

        node.send("robot.pose", {
            "x": pose["x"],
            "y": pose["y"],
            "heading": pose["heading"],
            "timestamp": horus.now(),
        })

    return horus.Node(name="StateEstimator", tick=tick, rate=100, order=3,
                      subs=["imu.data", "motor.state"], pubs=["robot.pose"])


# ── Main ────────────────────────────────────────────────────

print("Starting full robot system...")
print("  ImuSensor:       100 Hz (order 0)")
print("  Commander:        10 Hz (order 1)")
print("  MotorController:  50 Hz (order 2)")
print("  StateEstimator:  100 Hz (order 3)")
print()

horus.run(
    make_imu(),
    make_commander(),
    make_motor(),
    make_estimator(),
    tick_rate=100,
    watchdog_ms=500,
)

Run and Monitor

# Terminal 1: Run the robot
horus run

# Terminal 2: Monitor topics
horus topic echo robot.pose

# Terminal 3: Full monitoring dashboard
horus monitor

Key Concepts

Multi-Rate Scheduling

Each node runs at its own rate. The scheduler handles the timing:

make_imu()        # rate=100 — ticks 100 times per second
make_commander()  # rate=10  — ticks 10 times per second
make_motor()      # rate=50  — ticks 50 times per second
make_estimator()  # rate=100 — ticks 100 times per second

Data Fusion

The state estimator subscribes to two topics and fuses them:

imu = node.recv("imu.data")      # gyro → heading
motor = node.recv("motor.state") # velocity → position

Always call recv() on every subscribed topic every tick — even if you don't need the data yet. This prevents stale data accumulation.

Watchdog

watchdog_ms=500 detects frozen nodes. If any node's tick takes longer than 500ms, the safety monitor triggers.

Using the Scheduler Directly

For production configs with RT features:

scheduler = horus.Scheduler(tick_rate=100, watchdog_ms=500, rt=True)

scheduler.add(make_imu())
scheduler.add(make_commander())
scheduler.add(make_motor())
scheduler.add(make_estimator())

scheduler.run()

Next Steps


See Also