Tutorial 1: IMU Sensor Node (Python)
Build a node that simulates IMU sensor readings (accelerometer + gyroscope) and publishes them over a topic. A second node subscribes and displays the data.
What You'll Learn
- Creating a Python node with horus.Node()
- Publishing data with node.send()
- Subscribing with node.recv()
- Running multiple nodes with horus.run()
- Setting execution order and tick rate
Step 1: Create the Project
horus new imu-tutorial -p
cd imu-tutorial
Step 2: Write the Code
Replace main.py:
import horus
import math
# ── Sensor Node ──────────────────────────────────────────────
def make_sensor():
    """Simulates an IMU producing accelerometer and gyroscope readings."""
    t = [0.0]
    def tick(node):
        t[0] += horus.dt()
        # Simulate accelerometer (gravity + vibration)
        accel_x = 0.1 * math.sin(t[0] * 2.0)
        accel_y = 0.05 * math.cos(t[0] * 3.0)
        accel_z = 9.81 + 0.02 * math.sin(t[0] * 10.0)
        # Simulate gyroscope (slow rotation)
        gyro_roll = 0.01 * math.sin(t[0])
        gyro_pitch = 0.02 * math.cos(t[0] * 0.5)
        gyro_yaw = 0.05
        reading = {
            "accel_x": accel_x, "accel_y": accel_y, "accel_z": accel_z,
            "gyro_roll": gyro_roll, "gyro_pitch": gyro_pitch, "gyro_yaw": gyro_yaw,
            "timestamp": horus.now(),
        }
        node.send("imu.data", reading)
    return horus.Node(name="ImuSensor", tick=tick, rate=100, order=0,
                      pubs=["imu.data"])
# ── Display Node ─────────────────────────────────────────────
def make_display():
    """Prints every 100th IMU sample to avoid flooding the terminal."""
    count = [0]
    def tick(node):
        msg = node.recv("imu.data")
        if msg is not None:
            count[0] += 1
            if count[0] % 100 == 0:
                print(f"[#{count[0]}] accel=({msg['accel_x']:.3f}, {msg['accel_y']:.3f}, {msg['accel_z']:.2f})"
                      f" gyro=({msg['gyro_roll']:.4f}, {msg['gyro_pitch']:.4f}, {msg['gyro_yaw']:.4f})")
    return horus.Node(name="ImuDisplay", tick=tick, rate=100, order=1,
                      subs=["imu.data"])
# ── Main ─────────────────────────────────────────────────────
print("Starting IMU tutorial...\n")
horus.run(make_sensor(), make_display())
Step 3: Run It
horus run
You'll see output similar to this, one line per 100 samples:
Starting IMU tutorial...
[#100] accel=(0.091, -0.042, 9.81) gyro=(0.0084, 0.0193, 0.0500)
[#200] accel=(-0.054, 0.048, 9.81) gyro=(0.0059, 0.0100, 0.0500)
[#300] accel=(-0.099, -0.013, 9.81) gyro=(-0.0029, -0.0098, 0.0500)
Press Ctrl+C to stop.
Understanding the Code
State Management
Python nodes use closures for state. Wrap mutable state in a list (t = [0.0]) because a nested function cannot rebind an outer variable unless it declares it nonlocal, whereas mutating a list element needs no declaration:
t = [0.0] # mutable via t[0]
def tick(node):
    t[0] += horus.dt() # works
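The list wrapper is one of two common ways to keep state in a closure. As a plain-Python sketch (no horus import; `make_counter_list` and `make_counter_nonlocal` are hypothetical names used only for illustration), the same counter can also be written with `nonlocal`:

```python
def make_counter_list():
    """State kept in a one-element list, as in the tutorial."""
    count = [0]

    def tick():
        count[0] += 1      # mutates the list; the name `count` is never rebound
        return count[0]

    return tick


def make_counter_nonlocal():
    """Same behaviour, using nonlocal to rebind the outer name."""
    count = 0

    def tick():
        nonlocal count     # without this line, `count += 1` raises UnboundLocalError
        count += 1
        return count

    return tick


a = make_counter_list()
b = make_counter_nonlocal()
results = [(a(), b()) for _ in range(3)]
print(results)  # [(1, 1), (2, 2), (3, 3)]
```

Either style should work inside a tick function; the list form simply avoids one `nonlocal` line per state variable.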
Timing with horus.dt()
Use horus.dt() instead of time.time() — it returns the actual timestep and works correctly in deterministic mode:
t[0] += horus.dt() # correct — uses framework time
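To illustrate why accumulating the timestep gives repeatable results, here is a framework-free sketch. It assumes a fixed timestep of 1/rate seconds per tick (an assumption about how deterministic mode behaves, not something stated above), and `simulate` is a hypothetical helper standing in for the scheduler:

```python
import math

def simulate(ticks, rate_hz=100):
    """Accumulate a fixed dt for `ticks` steps and return (t, accel_x)."""
    dt = 1.0 / rate_hz                  # assumed fixed timestep; stands in for horus.dt()
    t = 0.0
    for _ in range(ticks):
        t += dt
    accel_x = 0.1 * math.sin(t * 2.0)   # same formula as the sensor node
    return t, accel_x

t1, ax1 = simulate(100)
t2, ax2 = simulate(100)
print(f"t={t1:.2f} accel_x={ax1:.3f}")  # t=1.00 accel_x=0.091
print(ax1 == ax2)                       # True: identical on every run
```

A version driven by time.time() would produce slightly different values on each run, because wall-clock intervals between ticks are never exactly equal.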
Execution Order
order=0 runs before order=1. The sensor publishes data before the display reads it:
sensor = horus.Node(..., order=0) # runs first
display = horus.Node(..., order=1) # runs second
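The effect of order can be pictured with a toy scheduler. This is only a sketch of the idea, not how horus is implemented: `ToyNode`, `run_one_cycle`, and the `mailbox` dict are hypothetical stand-ins for nodes, the scheduler, and a topic.

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class ToyNode:
    name: str
    order: int
    tick: Callable[[], None]

def run_one_cycle(nodes):
    """Tick every node once, lowest order value first."""
    for node in sorted(nodes, key=lambda n: n.order):
        node.tick()

log = []
mailbox = {}   # stand-in for a topic: holds the latest message per name

def sensor_tick():
    mailbox["imu.data"] = {"accel_z": 9.81}
    log.append("sensor")

def display_tick():
    msg = mailbox.get("imu.data")
    log.append(f"display saw {msg}")

# Registered in the "wrong" sequence on purpose; order decides who runs first.
nodes = [ToyNode("display", 1, display_tick), ToyNode("sensor", 0, sensor_tick)]
run_one_cycle(nodes)
print(log)  # ['sensor', "display saw {'accel_z': 9.81}"]
```

If the two order values were swapped, the display would run against an empty mailbox in the first cycle and see None.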
Using Typed Messages
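The dict-based version above works, but typed topics are significantly faster and enforce a fixed message schema (Python has no compile step, so mistakes surface when the message is built or sent rather than at compile time). Here is the same sensor + display tutorial rewritten with horus.Imu: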
import horus
import math
# ── Sensor Node (typed) ────────────────────────────────────
def make_sensor():
    """Publishes IMU readings using the typed horus.Imu message."""
    t = [0.0]
    def tick(node):
        t[0] += horus.dt()
        reading = horus.Imu(
            accel_x=0.1 * math.sin(t[0] * 2.0),
            accel_y=0.05 * math.cos(t[0] * 3.0),
            accel_z=9.81 + 0.02 * math.sin(t[0] * 10.0),
            gyro_x=0.01 * math.sin(t[0]),
            gyro_y=0.02 * math.cos(t[0] * 0.5),
            gyro_z=0.05,
        )
        node.send("imu", reading)
    return horus.Node(name="imu_sensor", tick=tick, rate=100, order=0,
                      pubs=[horus.Imu])
# ── Display Node (typed) ───────────────────────────────────
def make_display():
    """Receives typed Imu messages; fields are attributes, not dict keys."""
    count = [0]
    def tick(node):
        imu = node.recv("imu")
        if imu:
            count[0] += 1
            if count[0] % 100 == 0:
                print(f"[#{count[0]}] Accel Z: {imu.accel_z:.2f} m/s² "
                      f"Gyro: ({imu.gyro_x:.4f}, {imu.gyro_y:.4f}, {imu.gyro_z:.4f})")
    return horus.Node(name="display", tick=tick, rate=10, order=1,
                      subs=[horus.Imu])
# ── Main ────────────────────────────────────────────────────
print("Starting IMU tutorial (typed)...\n")
horus.run(make_sensor(), make_display())
Key Differences from the Dict Version
| | Dict topics | Typed topics |
|---|---|---|
| Publish | node.send("imu.data", {"accel_z": 9.81}) | node.send("imu", horus.Imu(accel_z=9.81)) |
| Subscribe | msg["accel_z"] | imu.accel_z |
| Declare | pubs=["imu.data"] | pubs=[horus.Imu] |
| Transport | GenericMessage serialization | Zero-copy Pod transport |
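The practical benefit of a schema can be sketched without horus. In the example below, `ImuLike` is a hypothetical dataclass standing in for a typed message; whether horus.Imu rejects unknown fields in exactly this way is an assumption, so treat this as an illustration of the general idea only.

```python
from dataclasses import dataclass

@dataclass
class ImuLike:
    """Hypothetical stand-in for a typed message; not the real horus.Imu."""
    accel_x: float = 0.0
    accel_y: float = 0.0
    accel_z: float = 0.0

# Dict: a misspelled key is accepted, and the mistake only shows up on read.
dict_msg = {"acel_z": 9.81}
dict_value = dict_msg.get("accel_z")   # None; the typo went unnoticed

# Typed: the same typo is rejected when the message is constructed.
try:
    ImuLike(acel_z=9.81)
    typed_error = None
except TypeError as exc:
    typed_error = str(exc)

print(dict_value)               # None
print(typed_error is not None)  # True
```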
Performance: Typed topics (horus.Imu) use zero-copy Pod transport at ~1.7 µs. Dict topics use GenericMessage serialization at ~6-12 µs. Use typed topics for anything in a control loop.
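To put those latencies in context, the following sketch computes what share of one tick period a single message costs at a given rate. It uses only the figures quoted above; `budget_fraction` is a hypothetical helper, and the result covers one message per tick and ignores everything else the node does.

```python
def budget_fraction(latency_us, rate_hz):
    """Fraction of one tick period consumed by a single message."""
    period_us = 1_000_000 / rate_hz
    return latency_us / period_us

for label, latency_us in [("typed", 1.7), ("dict (low)", 6.0), ("dict (high)", 12.0)]:
    share = budget_fraction(latency_us, rate_hz=1000)
    print(f"{label}: {share:.2%} of a 1 kHz tick")
# typed: 0.17% of a 1 kHz tick
# dict (low): 0.60% of a 1 kHz tick
# dict (high): 1.20% of a 1 kHz tick
```

The per-message cost scales with the number of topics a node touches each tick, so the gap widens in nodes that send and receive many messages.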
Experiments
- Change the rate: try rate=1000 for 1 kHz sampling
- Add noise: use horus.rng_float() for deterministic random noise
- Add a filter node: subscribe to raw IMU, apply smoothing, publish filtered data
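As a starting point for the noise and filter experiments, here is a framework-free sketch of an exponential moving average, one simple choice of smoothing. `make_ema_filter` is a hypothetical helper, and the seeded `random.Random` is only a stand-in for horus.rng_float(), whose exact signature is not shown here.

```python
import random

def make_ema_filter(alpha=0.2):
    """Return an update function that smooths a stream of floats.

    alpha near 1 tracks the input closely; alpha near 0 smooths heavily.
    """
    state = [None]   # same list-in-closure pattern as the tutorial nodes

    def update(x):
        if state[0] is None:
            state[0] = x                                   # first sample seeds the filter
        else:
            state[0] = alpha * x + (1.0 - alpha) * state[0]
        return state[0]

    return update

rng = random.Random(42)                 # seeded, so the noise is repeatable
smooth = make_ema_filter(alpha=0.2)

raw = [9.81 + rng.uniform(-0.5, 0.5) for _ in range(200)]
filtered = [smooth(x) for x in raw]

def spread(values):
    return max(values) - min(values)

print(f"raw spread:      {spread(raw):.3f}")
print(f"filtered spread: {spread(filtered):.3f}")
```

In a filter node, the `update` call would sit inside tick: receive the raw reading, pass each field through its own filter, and send the result on a separate topic.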
Next Steps
- Tutorial 2: Motor Controller (Python) — subscribe to commands and control a motor
- Tutorial 1 (Rust) — same tutorial in Rust
- Python API Reference — full API docs
See Also
- IMU Sensor (Rust) — Rust version
- Python Bindings — Python API reference