Multi-Sensor Fusion (Python)

Fuses IMU orientation with wheel odometry using a complementary filter. Subscribes to Imu and Odometry, caches the latest from each, blends heading when both are available, and publishes a corrected Odometry with fused heading. Uses NumPy for efficient angle wrapping.

Problem

You need a single pose estimate from multiple sensors that run at different rates and have different noise characteristics.

When To Use

  • Combining IMU heading with wheel odometry for ground robots
  • Any system where no single sensor gives a complete state estimate
  • When you need a confidence metric for downstream planners

Prerequisites

horus.toml

[package]
name = "sensor-fusion-py"
version = "0.1.0"
description = "IMU + odometry complementary filter"
language = "python"

[dependencies]
numpy = { version = ">=1.24", source = "pypi" }

Complete Code

#!/usr/bin/env python3
"""Complementary filter: fuse IMU heading with wheel odometry position."""

import math
import numpy as np
import horus
from horus import Node, Imu, Odometry, us, ms

# ── Configuration ────────────────────────────────────────────

ALPHA = 0.7           # Favor IMU for heading (less drift than wheels on turns)
HIGH_YAW_RATE = 0.5   # rad/s — threshold for reducing confidence during fast turns

# ── State ────────────────────────────────────────────────────

last_imu = [None]
last_odom = [None]
tick_count = [0]

# ── Helpers ──────────────────────────────────────────────────

def wrap_angle(angle):
    """Wrap angle to [-pi, pi] using NumPy."""
    return float(np.arctan2(np.sin(angle), np.cos(angle)))

def imu_yaw(imu_msg):
    """Extract yaw rate from IMU gyro_z (simplified — no magnetometer)."""
    return imu_msg.gyro_z

# ── Node callbacks ───────────────────────────────────────────

def fusion_tick(node):
    # IMPORTANT: always recv() ALL topics every tick to drain buffers
    imu = node.recv("imu.raw")
    odom = node.recv("odom.wheels")

    if imu is not None:
        last_imu[0] = imu
    if odom is not None:
        last_odom[0] = odom

    # Fuse only when both sources are available
    if last_imu[0] is None or last_odom[0] is None:
        return

    tick_count[0] += 1
    imu_data = last_imu[0]
    odom_data = last_odom[0]

    # Integrate IMU yaw rate to get heading estimate
    dt = 1.0 / 50.0  # 50 Hz fusion rate
    imu_heading = odom_data.theta + imu_data.gyro_z * dt

    # Complementary filter: blend odom heading with IMU-corrected heading
    fused_theta = wrap_angle(
        (1.0 - ALPHA) * odom_data.theta + ALPHA * imu_heading
    )

    # Confidence drops during fast turns (IMU gyro saturates)
    yaw_rate = abs(imu_data.gyro_z)
    confidence = 0.6 if yaw_rate > HIGH_YAW_RATE else 0.9

    # Publish fused pose as corrected Odometry
    fused = Odometry(
        x=odom_data.x,
        y=odom_data.y,
        theta=fused_theta,
        linear_velocity=odom_data.linear_velocity,
        angular_velocity=odom_data.angular_velocity,
    )
    node.send("pose.fused", fused)

    # Publish confidence as a separate lightweight message
    node.send("pose.confidence", {"confidence": confidence, "tick": tick_count[0]})

    if tick_count[0] % 100 == 0:
        print(
            f"Fusion tick {tick_count[0]}: "
            f"odom=({odom_data.x:.2f}, {odom_data.y:.2f}, {odom_data.theta:.2f}) "
            f"fused_theta={fused_theta:.2f} conf={confidence:.1f}"
        )

def fusion_shutdown(node):
    print(f"Fusion: completed {tick_count[0]} ticks")

# ── Main ─────────────────────────────────────────────────────

fusion_node = Node(
    name="Fusion",
    tick=fusion_tick,
    shutdown=fusion_shutdown,
    rate=50,                       # 50 Hz output — from 100 Hz IMU + 20 Hz odom
    order=10,                      # After sensor nodes
    subs=["imu.raw", "odom.wheels"],
    pubs=["pose.fused", "pose.confidence"],
)

if __name__ == "__main__":
    horus.run(fusion_node)

Expected Output

[HORUS] Scheduler running — tick_rate: 1000 Hz
[HORUS] Node "Fusion" started (50 Hz)
Fusion tick 100: odom=(0.50, 0.00, 0.10) fused_theta=0.10 conf=0.9
Fusion tick 200: odom=(1.00, 0.05, 0.20) fused_theta=0.20 conf=0.9
^C
Fusion: completed 250 ticks
[HORUS] Shutting down...
[HORUS] Node "Fusion" shutdown complete

Key Points

  • Multi-topic aggregation pattern: recv() all topics, cache latest, fuse when both are available
  • Complementary filter is the simplest sensor fusion — for production, consider an Extended Kalman Filter (EKF)
  • ALPHA = 0.7 favors IMU for heading — wheel odometry drifts on carpet/tile; tune per surface
  • wrap_angle() uses NumPy to keep heading in the [-pi, pi] range, avoiding discontinuities
  • No shutdown() for actuators — fusion nodes don't move anything, just publish estimates
  • 50 Hz output from 100 Hz IMU + 20 Hz odometry is fine — fusion runs on cached values
  • Confidence field lets downstream nodes decide how much to trust the estimate

Variations

  • Extended Kalman Filter (EKF): Replace the complementary filter with filterpy or a custom EKF
  • Three-sensor fusion: Add GPS or LiDAR-based localization as a third input
  • Adaptive alpha: Adjust ALPHA based on vehicle dynamics — favor IMU during fast turns, odometry on straights
  • NumPy matrix filter: Use a full state vector [x, y, theta, v, omega] with matrix operations

Common Errors

SymptomCauseFix
Fused heading drifts over timeComplementary filter cannot correct absolute driftAdd an absolute reference (GPS, magnetometer) or switch to EKF
No output publishedOne or both sensors not publishingCheck horus monitor for active topics on imu.raw and odom.wheels
Heading jumps at startupFirst recv() returns stale cached dataInitialize caches to None and only fuse when both are fresh
Confidence always lowHIGH_YAW_RATE threshold too tightTune the threshold to match your robot's normal turn rate
Heading wraps incorrectlyNot using angle wrappingUse wrap_angle() around any heading blend

See Also