Tutorial 1: IMU Sensor Node (Python)

Looking for the Rust version? See Tutorial 1: Build an IMU Sensor Node.

Build a node that simulates IMU sensor readings (accelerometer + gyroscope) and publishes them over a topic. A second node subscribes and displays the data.

What You'll Learn

  • Creating a Python node with horus.Node()
  • Publishing data with node.send()
  • Subscribing with node.recv()
  • Running multiple nodes with horus.run()
  • Setting execution order and tick rate

Step 1: Create the Project

horus new imu-tutorial -p
cd imu-tutorial

Step 2: Write the Code

Replace main.py:

import horus
import math

# ── Sensor Node ──────────────────────────────────────────────

def make_sensor():
    """Simulates an IMU producing accelerometer and gyroscope readings."""
    t = [0.0]

    def tick(node):
        t[0] += horus.dt()

        # Simulate accelerometer (gravity + vibration)
        accel_x = 0.1 * math.sin(t[0] * 2.0)
        accel_y = 0.05 * math.cos(t[0] * 3.0)
        accel_z = 9.81 + 0.02 * math.sin(t[0] * 10.0)

        # Simulate gyroscope (slow rotation)
        gyro_roll = 0.01 * math.sin(t[0])
        gyro_pitch = 0.02 * math.cos(t[0] * 0.5)
        gyro_yaw = 0.05

        reading = {
            "accel_x": accel_x, "accel_y": accel_y, "accel_z": accel_z,
            "gyro_roll": gyro_roll, "gyro_pitch": gyro_pitch, "gyro_yaw": gyro_yaw,
            "timestamp": horus.now(),
        }
        node.send("imu.data", reading)

    return horus.Node(name="ImuSensor", tick=tick, rate=100, order=0,
                      pubs=["imu.data"])


# ── Display Node ─────────────────────────────────────────────

def make_display():
    """Prints every 100th IMU sample to avoid flooding the terminal."""
    count = [0]

    def tick(node):
        msg = node.recv("imu.data")
        if msg is not None:
            count[0] += 1
            if count[0] % 100 == 0:
                print(f"[#{count[0]}] accel=({msg['accel_x']:.3f}, {msg['accel_y']:.3f}, {msg['accel_z']:.2f})"
                      f"  gyro=({msg['gyro_roll']:.4f}, {msg['gyro_pitch']:.4f}, {msg['gyro_yaw']:.4f})")

    return horus.Node(name="ImuDisplay", tick=tick, rate=100, order=1,
                      subs=["imu.data"])


# ── Main ─────────────────────────────────────────────────────

print("Starting IMU tutorial...\n")
horus.run(make_sensor(), make_display())

Step 3: Run It

horus run

You'll see one line of output every 100 samples, roughly once per second at 100 Hz. Exact values vary with timing:

Starting IMU tutorial...

[#100] accel=(0.091, -0.042, 9.81)  gyro=(0.0084, 0.0193, 0.0500)
[#200] accel=(-0.054, 0.048, 9.81)  gyro=(0.0059, 0.0100, 0.0500)
[#300] accel=(-0.099, -0.013, 9.81)  gyro=(-0.0029, -0.0098, 0.0500)

Press Ctrl+C to stop.

Understanding the Code

State Management

Python nodes use closures for state. Wrap mutable state in a list (t = [0.0]), because a closure cannot rebind a variable from the enclosing function unless it is declared nonlocal:

t = [0.0]  # mutable via t[0]

def tick(node):
    t[0] += horus.dt()  # works
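
The list wrapper is one of two common ways to keep state in a closure. The sketch below compares it with nonlocal in plain Python, with no horus calls; make_counter_list and make_counter_nonlocal are hypothetical names used only for illustration.

```python
def make_counter_list():
    """Keeps state in a one-element list, as the tutorial nodes do."""
    count = [0]

    def tick():
        count[0] += 1  # mutates the list in place; no rebinding needed
        return count[0]

    return tick


def make_counter_nonlocal():
    """Keeps state in a plain variable and rebinds it via nonlocal."""
    count = 0

    def tick():
        nonlocal count  # without this, `count += 1` raises UnboundLocalError
        count += 1
        return count

    return tick


a = make_counter_list()
b = make_counter_nonlocal()
results = [(a(), b()) for _ in range(3)]
print(results)  # [(1, 1), (2, 2), (3, 3)]
```

Both forms behave the same. The list form avoids a nonlocal declaration for each state variable, which is likely why the tutorial code uses it.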

Timing with horus.dt()

Advance simulated time with horus.dt() rather than time.time(). It returns the actual timestep and works correctly in deterministic mode:

t[0] += horus.dt()  # correct — uses framework time
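
To see why accumulating timesteps gives reproducible output, here is a minimal sketch in plain Python. It assumes a fixed dt as a stand-in for the value horus.dt() would return each tick; simulate_accel_x is a hypothetical helper, not part of horus.

```python
import math


def simulate_accel_x(n_ticks, dt):
    """Accumulates elapsed time from per-tick timesteps, as the sensor node does.

    `dt` is a fixed stand-in (an assumption) for what horus.dt() returns.
    """
    t = 0.0
    samples = []
    for _ in range(n_ticks):
        t += dt
        samples.append(0.1 * math.sin(t * 2.0))
    return t, samples


# Two runs with the same timesteps produce identical samples.
# Readings derived from time.time() would differ from run to run.
t1, run1 = simulate_accel_x(100, 0.01)
t2, run2 = simulate_accel_x(100, 0.01)
print(run1 == run2)   # True
print(round(t1, 6))   # 1.0
```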

Execution Order

order=0 runs before order=1. The sensor publishes data before the display reads it:

sensor  = horus.Node(..., order=0)  # runs first
display = horus.Node(..., order=1)  # runs second
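
The effect of ordering can be shown with a toy model in plain Python. This is not how the horus scheduler is implemented; run_one_tick, mailbox, and the tuple layout are assumptions made for this sketch.

```python
def run_one_tick(nodes):
    """Runs each (name, order, tick_fn) entry once, lowest order first."""
    for _name, _order, tick_fn in sorted(nodes, key=lambda n: n[1]):
        tick_fn()


mailbox = []  # stands in for the "imu.data" topic
log = []


def sensor_tick():
    mailbox.append({"accel_z": 9.81})
    log.append("sensor published")


def display_tick():
    msg = mailbox.pop(0) if mailbox else None
    log.append(f"display got {msg}")


# Listed out of order on purpose; sorting by `order` fixes the sequence.
nodes = [("ImuDisplay", 1, display_tick), ("ImuSensor", 0, sensor_tick)]
run_one_tick(nodes)
print(log)  # ['sensor published', "display got {'accel_z': 9.81}"]
```

If the two order values were swapped in this model, the display would find an empty mailbox on the first tick and see each sample one tick late.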

Using Typed Messages

The dict-based version above works, but typed topics are significantly faster and enforce a fixed message schema. Here is the same sensor + display tutorial rewritten with horus.Imu:

import horus
import math

# ── Sensor Node (typed) ────────────────────────────────────

def make_sensor():
    """Publishes IMU readings using the typed horus.Imu message."""
    t = [0.0]

    def tick(node):
        t[0] += horus.dt()

        reading = horus.Imu(
            accel_x=0.1 * math.sin(t[0] * 2.0),
            accel_y=0.05 * math.cos(t[0] * 3.0),
            accel_z=9.81 + 0.02 * math.sin(t[0] * 10.0),
            gyro_x=0.01 * math.sin(t[0]),
            gyro_y=0.02 * math.cos(t[0] * 0.5),
            gyro_z=0.05,
        )
        node.send("imu", reading)

    return horus.Node(name="imu_sensor", tick=tick, rate=100, order=0,
                      pubs=[horus.Imu])


# ── Display Node (typed) ───────────────────────────────────

def make_display():
    """Receives typed Imu messages — fields are attributes, not dict keys."""
    count = [0]

    def tick(node):
        imu = node.recv("imu")
        if imu:
            count[0] += 1
            if count[0] % 100 == 0:
                print(f"[#{count[0]}] Accel Z: {imu.accel_z:.2f} m/s²  "
                      f"Gyro: ({imu.gyro_x:.4f}, {imu.gyro_y:.4f}, {imu.gyro_z:.4f})")

    return horus.Node(name="display", tick=tick, rate=10, order=1,
                      subs=[horus.Imu])


# ── Main ────────────────────────────────────────────────────

print("Starting IMU tutorial (typed)...\n")
horus.run(make_sensor(), make_display())

Key Differences from the Dict Version

            Dict topics                                 Typed topics
Publish     node.send("imu.data", {"accel_z": 9.81})    node.send("imu", horus.Imu(accel_z=9.81))
Subscribe   msg["accel_z"]                              imu.accel_z
Declare     pubs=["imu.data"]                           pubs=[horus.Imu]
Transport   GenericMessage serialization                Zero-copy Pod transport

Performance: Typed topics (horus.Imu) use zero-copy Pod transport at ~1.7 µs. Dict topics use GenericMessage serialization at ~6-12 µs. Use typed topics for anything in a control loop.
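
Beyond speed, a fixed set of fields catches mistakes that a dict lets through. The sketch below shows the general idea with a standard-library dataclass. ImuSample is a hypothetical stand-in, and whether horus.Imu rejects unknown fields in the same way is an assumption to check against the horus documentation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ImuSample:
    """Hypothetical stand-in for a typed message; not the real horus.Imu."""
    accel_z: float = 0.0
    gyro_z: float = 0.0


# A dict accepts a misspelled key silently; the problem surfaces only on read.
dict_msg = {"acel_z": 9.81}
print(dict_msg.get("accel_z"))  # None

# A fixed-field type rejects the same typo when the message is built.
try:
    ImuSample(acel_z=9.81)
    rejected = False
except TypeError:
    rejected = True
print(rejected)  # True
```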

Experiments

  1. Change the rate — try rate=1000 for 1kHz sampling
  2. Add noise — use horus.rng_float() for deterministic random noise
  3. Add a filter node — subscribe to raw IMU, apply smoothing, publish filtered data
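
For the noise and filter experiments, the smoothing logic can be prototyped in plain Python before wiring it into a node. This is a minimal sketch of an exponential moving average, one possible choice of filter. make_ema_filter is a hypothetical helper, and random.Random(42) is a seeded stand-in for a deterministic noise source, not horus.rng_float().

```python
import random


def make_ema_filter(alpha):
    """Returns a function that smooths a stream of floats.

    alpha in (0, 1]: higher values track the input faster,
    lower values smooth more.
    """
    state = [None]  # same list-wrapper pattern as the tutorial nodes

    def step(x):
        if state[0] is None:
            state[0] = x  # seed the filter with the first sample
        else:
            state[0] = alpha * x + (1.0 - alpha) * state[0]
        return state[0]

    return step


rng = random.Random(42)  # seeded, so every run sees the same noise
smooth = make_ema_filter(alpha=0.1)

# Noisy accel_z readings around gravity, then the smoothed stream.
raw = [9.81 + rng.gauss(0.0, 0.05) for _ in range(200)]
filtered = [smooth(x) for x in raw]

print(f"raw spread:      {max(raw) - min(raw):.3f}")
print(f"filtered spread: {max(filtered[50:]) - min(filtered[50:]):.3f}")
```

In a filter node, step would be called inside tick on each received field, and the result published on a separate topic whose name you choose.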
