Multi-Sensor Fusion (Python)
Fuses IMU orientation with wheel odometry using a complementary filter. Subscribes to Imu and Odometry, caches the latest from each, blends heading when both are available, and publishes a corrected Odometry with fused heading. Uses NumPy for efficient angle wrapping.
Problem
You need a single pose estimate from multiple sensors that run at different rates and have different noise characteristics.
When To Use
- Combining IMU heading with wheel odometry for ground robots
- Any system where no single sensor gives a complete state estimate
- When you need a confidence metric for downstream planners
Prerequisites
- HORUS installed (Installation Guide)
- Python 3.8+ with
numpyinstalled
horus.toml
[package]
name = "sensor-fusion-py"
version = "0.1.0"
description = "IMU + odometry complementary filter"
language = "python"
[dependencies]
numpy = { version = ">=1.24", source = "pypi" }
Complete Code
#!/usr/bin/env python3
"""Complementary filter: fuse IMU heading with wheel odometry position."""
import math
import numpy as np
import horus
from horus import Node, Imu, Odometry, us, ms
# ── Configuration ────────────────────────────────────────────
ALPHA = 0.7 # Favor IMU for heading (less drift than wheels on turns)
HIGH_YAW_RATE = 0.5 # rad/s — threshold for reducing confidence during fast turns
# ── State ────────────────────────────────────────────────────
last_imu = [None]
last_odom = [None]
tick_count = [0]
# ── Helpers ──────────────────────────────────────────────────
def wrap_angle(angle):
"""Wrap angle to [-pi, pi] using NumPy."""
return float(np.arctan2(np.sin(angle), np.cos(angle)))
def imu_yaw(imu_msg):
"""Extract yaw rate from IMU gyro_z (simplified — no magnetometer)."""
return imu_msg.gyro_z
# ── Node callbacks ───────────────────────────────────────────
def fusion_tick(node):
# IMPORTANT: always recv() ALL topics every tick to drain buffers
imu = node.recv("imu.raw")
odom = node.recv("odom.wheels")
if imu is not None:
last_imu[0] = imu
if odom is not None:
last_odom[0] = odom
# Fuse only when both sources are available
if last_imu[0] is None or last_odom[0] is None:
return
tick_count[0] += 1
imu_data = last_imu[0]
odom_data = last_odom[0]
# Integrate IMU yaw rate to get heading estimate
dt = 1.0 / 50.0 # 50 Hz fusion rate
imu_heading = odom_data.theta + imu_data.gyro_z * dt
# Complementary filter: blend odom heading with IMU-corrected heading
fused_theta = wrap_angle(
(1.0 - ALPHA) * odom_data.theta + ALPHA * imu_heading
)
# Confidence drops during fast turns (IMU gyro saturates)
yaw_rate = abs(imu_data.gyro_z)
confidence = 0.6 if yaw_rate > HIGH_YAW_RATE else 0.9
# Publish fused pose as corrected Odometry
fused = Odometry(
x=odom_data.x,
y=odom_data.y,
theta=fused_theta,
linear_velocity=odom_data.linear_velocity,
angular_velocity=odom_data.angular_velocity,
)
node.send("pose.fused", fused)
# Publish confidence as a separate lightweight message
node.send("pose.confidence", {"confidence": confidence, "tick": tick_count[0]})
if tick_count[0] % 100 == 0:
print(
f"Fusion tick {tick_count[0]}: "
f"odom=({odom_data.x:.2f}, {odom_data.y:.2f}, {odom_data.theta:.2f}) "
f"fused_theta={fused_theta:.2f} conf={confidence:.1f}"
)
def fusion_shutdown(node):
print(f"Fusion: completed {tick_count[0]} ticks")
# ── Main ─────────────────────────────────────────────────────
fusion_node = Node(
name="Fusion",
tick=fusion_tick,
shutdown=fusion_shutdown,
rate=50, # 50 Hz output — from 100 Hz IMU + 20 Hz odom
order=10, # After sensor nodes
subs=["imu.raw", "odom.wheels"],
pubs=["pose.fused", "pose.confidence"],
)
if __name__ == "__main__":
horus.run(fusion_node)
Expected Output
[HORUS] Scheduler running — tick_rate: 1000 Hz
[HORUS] Node "Fusion" started (50 Hz)
Fusion tick 100: odom=(0.50, 0.00, 0.10) fused_theta=0.10 conf=0.9
Fusion tick 200: odom=(1.00, 0.05, 0.20) fused_theta=0.20 conf=0.9
^C
Fusion: completed 250 ticks
[HORUS] Shutting down...
[HORUS] Node "Fusion" shutdown complete
Key Points
- Multi-topic aggregation pattern:
recv()all topics, cache latest, fuse when both are available - Complementary filter is the simplest sensor fusion — for production, consider an Extended Kalman Filter (EKF)
ALPHA = 0.7favors IMU for heading — wheel odometry drifts on carpet/tile; tune per surfacewrap_angle()uses NumPy to keep heading in the[-pi, pi]range, avoiding discontinuities- No
shutdown()for actuators — fusion nodes don't move anything, just publish estimates - 50 Hz output from 100 Hz IMU + 20 Hz odometry is fine — fusion runs on cached values
- Confidence field lets downstream nodes decide how much to trust the estimate
Variations
- Extended Kalman Filter (EKF): Replace the complementary filter with
filterpyor a custom EKF - Three-sensor fusion: Add GPS or LiDAR-based localization as a third input
- Adaptive alpha: Adjust
ALPHAbased on vehicle dynamics — favor IMU during fast turns, odometry on straights - NumPy matrix filter: Use a full state vector
[x, y, theta, v, omega]with matrix operations
Common Errors
| Symptom | Cause | Fix |
|---|---|---|
| Fused heading drifts over time | Complementary filter cannot correct absolute drift | Add an absolute reference (GPS, magnetometer) or switch to EKF |
| No output published | One or both sensors not publishing | Check horus monitor for active topics on imu.raw and odom.wheels |
| Heading jumps at startup | First recv() returns stale cached data | Initialize caches to None and only fuse when both are fresh |
| Confidence always low | HIGH_YAW_RATE threshold too tight | Tune the threshold to match your robot's normal turn rate |
| Heading wraps incorrectly | Not using angle wrapping | Use wrap_angle() around any heading blend |
See Also
- Multi-Sensor Fusion (Rust) — Rust version of this recipe
- IMU Reader (Python) — IMU publishing pattern