Tutorial 4: Custom Messages (Python)

Looking for the Rust version? See Tutorial 4: Custom Message Types.

Learn four ways to send data between Python nodes: typed messages (fast), dicts (flexible), dataclasses (structured), and compiled messages (production).

What You'll Learn

  • Using built-in typed messages (CmdVel, Imu, Odometry)
  • Sending Python dicts as generic messages
  • Using dataclasses for structured Python-only data
  • Compiling custom messages with maturin for production performance
  • Performance trade-offs between approaches
  • When to use which approach

Prerequisites: Tutorial 1 (Python) completed. A working horus installation.

Time: 25 minutes


Approach 1: Built-in Typed Messages (Fastest)

Use HORUS's built-in message types for maximum performance (~1.7us latency, zero-copy with Rust nodes). These types are defined in Rust and exposed to Python via PyO3 bindings, so they work seamlessly across both languages.

When to use typed messages

  • Your data matches a standard robotics message (IMU, velocity, pose, etc.)
  • You need cross-language compatibility (Python publisher, Rust subscriber, or vice versa)
  • You want the fastest possible latency without a custom build step
  • You are building control loops, sensor pipelines, or motor drivers

Complete example

Create typed_demo.py:

import horus

# --- Node tick functions ---

def imu_sensor_tick(node):
    """Simulate an IMU sensor publishing orientation and acceleration."""
    imu = horus.Imu()
    # Imu fields: orientation[4], angular_velocity[3], linear_acceleration[3]
    # All fields default to 0.0 — set what you need
    node.send("imu", imu)

def motor_driver_tick(node):
    """Receive velocity commands and 'drive' motors."""
    cmd = node.recv("cmd_vel")
    if cmd is not None:
        print(f"[Motor] linear={cmd.linear:.2f} angular={cmd.angular:.2f}")

def controller_tick(node):
    """Read IMU data and publish velocity commands."""
    imu = node.recv("imu")
    if imu is not None:
        # Simple proportional controller: slow down if tilting
        tilt = abs(imu.linear_acceleration[0])
        speed = max(0.1, 1.0 - tilt * 0.5)
        cmd = horus.CmdVel(linear=speed, angular=0.0)
        node.send("cmd_vel", cmd)
        print(f"[Ctrl] tilt={tilt:.2f} -> speed={speed:.2f}")

# --- Node definitions ---

sensor = horus.Node(
    name="ImuSensor",
    tick=imu_sensor_tick,
    rate=100,
    order=0,
    pubs=[horus.Imu],  # typed topic — HORUS knows the type at registration
)

controller = horus.Node(
    name="Controller",
    tick=controller_tick,
    rate=100,
    order=1,
    subs=[horus.Imu],
    pubs=[horus.CmdVel],
)

motor = horus.Node(
    name="MotorDriver",
    tick=motor_driver_tick,
    rate=100,
    order=2,
    subs=[horus.CmdVel],
)

# --- Run ---

horus.run(sensor, controller, motor, duration=5.0)

Create horus.toml in the same directory:

[project]
name = "typed-demo"
version = "0.1.0"
language = "python"
entry = "typed_demo.py"

Run it:

horus run

Expected output:

[Ctrl] tilt=0.00 -> speed=1.00
[Motor] linear=1.00 angular=0.00
[Ctrl] tilt=0.00 -> speed=1.00
[Motor] linear=1.00 angular=0.00
...

Key details

  • pubs=[horus.Imu] registers a typed topic. HORUS uses the type's built-in topic name (e.g., horus.Imu maps to "imu").
  • node.send("imu", imu) sends to the topic by name. The name must match what subscribers use.
  • node.recv("imu") returns None if no message has arrived since the last read.
  • All 70+ built-in types are zero-copy across the Rust/Python boundary: the Python object wraps the same shared memory that Rust nodes read.

Available types: CmdVel, Imu, Odometry, LaserScan, Pose2D, Pose3D, Twist, BatteryState, JointState, MotorCommand, ServoCommand, EmergencyStop, and 60+ more. See Message Types for the full list.
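
The speed rule inside controller_tick can be tried on its own, without a running scheduler. The sketch below copies that formula into a standalone helper to show how the clamp behaves as tilt grows. The name speed_from_tilt is made up for this illustration and is not part of HORUS.

```python
def speed_from_tilt(tilt: float) -> float:
    """Same rule as controller_tick: slow down as tilt grows, never below 0.1."""
    return max(0.1, 1.0 - abs(tilt) * 0.5)


# Try a few tilt values, formatted like the [Ctrl] log line in the example.
for tilt in (0.0, 0.5, 1.0, 3.0):
    print(f"tilt={tilt:.2f} -> speed={speed_from_tilt(tilt):.2f}")
```

With the all-zero Imu that the demo publishes, tilt is always 0.0, which is why the expected output shows a constant speed of 1.00.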


Approach 2: Python Dicts (Most Flexible)

Send any Python dict as a message. HORUS serializes it with MessagePack under the hood (~6-50us latency depending on dict size). No type registration, no build step, no schema definition.

When to use dicts

  • You are prototyping and the message schema is still evolving
  • Your data does not fit any built-in type (configuration blobs, experiment metadata, etc.)
  • You only need Python-to-Python communication
  • The topic runs at low frequency (1-10Hz) where microsecond overhead does not matter

Complete example

Create dict_demo.py:

import horus

# --- Node tick functions ---

def environment_sensor_tick(node):
    """Simulate an environment sensor with multiple readings."""
    reading = {
        "temperature": 23.5 + (horus.rng_float() * 2.0 - 1.0),
        "humidity": 0.65 + (horus.rng_float() * 0.1 - 0.05),
        "pressure": 1013.25,
        "location": "lab_room_3",
        "tags": ["indoor", "calibrated"],
        "nested": {
            "sensor_id": 42,
            "firmware": "v2.1.0",
        },
    }
    node.send("environment", reading)

def logger_tick(node):
    """Log environment data to the console."""
    data = node.recv("environment")
    if data is not None:
        temp = data["temperature"]
        humidity = data["humidity"] * 100
        sensor_id = data["nested"]["sensor_id"]
        print(f"[Logger] sensor={sensor_id} temp={temp:.1f}C humidity={humidity:.0f}%")

def alert_tick(node):
    """Check for out-of-range readings and publish alerts."""
    data = node.recv("environment")
    if data is not None:
        if data["temperature"] > 24.0:
            alert = {
                "type": "temperature_high",
                "value": data["temperature"],
                "threshold": 24.0,
                "location": data["location"],
            }
            node.send("alerts", alert)
            print(f"[Alert] Temperature {data['temperature']:.1f}C exceeds 24.0C")

# --- Node definitions ---

sensor = horus.Node(
    name="EnvSensor",
    tick=environment_sensor_tick,
    rate=1,       # 1 Hz — environment data changes slowly
    order=0,
    pubs=["environment"],  # string topic name = dict-based message
)

logger = horus.Node(
    name="Logger",
    tick=logger_tick,
    rate=1,
    order=1,
    subs=["environment"],
)

alerter = horus.Node(
    name="Alerter",
    tick=alert_tick,
    rate=1,
    order=2,
    subs=["environment"],
    pubs=["alerts"],
)

# --- Run ---

horus.run(sensor, logger, alerter, duration=10.0)

Create horus.toml:

[project]
name = "dict-demo"
version = "0.1.0"
language = "python"
entry = "dict_demo.py"

Run it:

horus run

Expected output:

[Logger] sensor=42 temp=23.8C humidity=63%
[Logger] sensor=42 temp=22.9C humidity=67%
[Logger] sensor=42 temp=24.3C humidity=62%
[Alert] Temperature 24.3C exceeds 24.0C
...

What dicts support

Dicts can contain any MessagePack-compatible value:

Python type    | Supported | Notes
str            | Yes       | UTF-8 strings
int            | Yes       | Arbitrary precision
float          | Yes       | 64-bit float
bool           | Yes       | True / False
list           | Yes       | Heterogeneous allowed
dict           | Yes       | Nested dicts work
None           | Yes       | Serialized as nil
bytes          | Yes       | Raw binary data
Custom objects | No        | Use asdict() or __dict__ first
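
A payload can be checked against the table above before it is sent. The following is a minimal sketch of such a check in plain Python; find_unsupported is a hypothetical helper written for this tutorial, not a HORUS function, and it treats any type that the table does not list (tuples, sets, custom objects) as unsupported.

```python
def find_unsupported(value, path="msg"):
    """Return the path of the first value not listed in the table, or None."""
    # Scalars from the table: str, int, float, bool, None, bytes.
    if value is None or isinstance(value, (str, int, float, bool, bytes)):
        return None
    if isinstance(value, list):
        for index, item in enumerate(value):
            bad = find_unsupported(item, f"{path}[{index}]")
            if bad is not None:
                return bad
        return None
    if isinstance(value, dict):
        for key, item in value.items():
            bad = find_unsupported(item, f"{path}[{key!r}]")
            if bad is not None:
                return bad
        return None
    # Anything else (tuple, set, custom object, ...) is reported.
    return path


good = {"temperature": 23.5, "tags": ["indoor"], "nested": {"sensor_id": 42}}
bad = {"nested": {"readings": [1.0, object()]}}

print(find_unsupported(good))
print(find_unsupported(bad))
```

Running a check like this in a test or a debug build points at the offending field by path, which is easier to act on than a serialization error raised at send time.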

Limitations

  • No cross-language support. Rust nodes cannot subscribe to dict-based topics (they do not know the schema). If you need Rust interop, use typed messages (Approach 1) or define the message in Rust and expose it to Python via PyO3. Compiled messages (Approach 4) are Python-only as well.
  • No type checking. A typo in a key name (data["temperture"]) is a runtime KeyError, not a compile-time error.
  • Size overhead. MessagePack includes key names in every message. A dict with long key names is larger than an equivalent struct.
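
The size overhead can be estimated by hand from the MessagePack format: a small map costs 1 header byte, each short string key costs 1 byte plus its UTF-8 length, and each 64-bit float costs 9 bytes. The sketch below applies those rules to a flat dict and compares the result with the same three numbers packed as a fixed-layout struct. It assumes floats are stored as 64-bit values, and msgpack_size_flat is a hypothetical helper, not a HORUS API.

```python
import struct


def msgpack_size_flat(reading: dict) -> int:
    """Encoded size of a flat dict of short string keys -> float64 values.

    Assumes at most 15 keys (fixmap) and keys of at most 31 bytes (fixstr).
    """
    size = 1  # fixmap header
    for key in reading:
        size += 1 + len(key.encode("utf-8"))  # fixstr header + key bytes
        size += 9                             # float64: type byte + 8 data bytes
    return size


reading = {"temperature": 23.5, "humidity": 0.65, "pressure": 1013.25}
packed_struct = struct.pack("<3d", *reading.values())

print(msgpack_size_flat(reading), "bytes as a MessagePack map")
print(len(packed_struct), "bytes as three packed doubles")
```

More than half of the map encoding here is key names, which is the cost a fixed-layout type avoids.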

Approach 3: Dataclasses (Structured Python)

For structured Python-only data, define @dataclass classes and serialize them to dicts with asdict(). This gives you IDE autocompletion, type hints, and constructor validation while still using the dict transport under the hood.

When to use dataclasses

  • You want Python type checking and IDE support
  • Multiple team members work on the same message schema
  • You need default values, validation, or computed fields
  • The data stays within Python (no Rust subscribers)

Complete example

Create dataclass_demo.py:

import horus
from dataclasses import dataclass, asdict, field
from typing import List

# --- Message definitions ---

@dataclass
class BatteryReading:
    voltage: float
    current: float
    temperature: float
    charge_percent: float
    cell_count: int = 3

    def is_critical(self) -> bool:
        """Business logic directly on the message."""
        return self.voltage < 10.5 or self.temperature > 60.0

@dataclass
class BatteryAlert:
    level: str          # "info", "warning", "critical"
    message: str
    voltage: float
    charge_percent: float

@dataclass
class SystemStatus:
    battery: BatteryReading
    alerts: List[str] = field(default_factory=list)
    uptime_secs: float = 0.0

# --- Node tick functions ---

tick_counter = {"value": 0}

def battery_sensor_tick(node):
    """Simulate a draining battery."""
    tick_counter["value"] += 1
    t = tick_counter["value"]

    reading = BatteryReading(
        voltage=12.6 - (t * 0.15),
        current=2.5,
        temperature=35.0 + (t * 0.3),
        charge_percent=max(0.0, 100.0 - t * 5.0),
    )
    # asdict() converts the dataclass to a dict for transport
    node.send("battery.raw", asdict(reading))
    print(f"[Battery] {reading.charge_percent:.0f}% ({reading.voltage:.1f}V)")

def monitor_tick(node):
    """Watch battery readings and publish alerts."""
    data = node.recv("battery.raw")
    if data is not None:
        # Reconstruct the dataclass from the dict
        reading = BatteryReading(**data)

        if reading.is_critical():
            alert = BatteryAlert(
                level="critical",
                message=f"Battery critical: {reading.voltage:.1f}V",
                voltage=reading.voltage,
                charge_percent=reading.charge_percent,
            )
            node.send("battery.alert", asdict(alert))
            print(f"[Monitor] CRITICAL: {alert.message}")
        elif reading.charge_percent < 30.0:
            alert = BatteryAlert(
                level="warning",
                message=f"Battery low: {reading.charge_percent:.0f}%",
                voltage=reading.voltage,
                charge_percent=reading.charge_percent,
            )
            node.send("battery.alert", asdict(alert))
            print(f"[Monitor] WARNING: {alert.message}")

def dashboard_tick(node):
    """Aggregate battery data and alerts into a system status."""
    battery_data = node.recv("battery.raw")
    alert_data = node.recv("battery.alert")

    if battery_data is not None:
        battery = BatteryReading(**battery_data)
        alerts = []
        if alert_data is not None:
            alerts.append(alert_data["message"])

        status = SystemStatus(
            battery=battery,
            alerts=alerts,
            uptime_secs=tick_counter["value"],
        )
        # Nested dataclasses serialize correctly with asdict()
        node.send("system.status", asdict(status))

# --- Node definitions ---

battery = horus.Node(
    name="Battery",
    tick=battery_sensor_tick,
    rate=1,
    order=0,
    pubs=["battery.raw"],
)

monitor = horus.Node(
    name="Monitor",
    tick=monitor_tick,
    rate=1,
    order=1,
    subs=["battery.raw"],
    pubs=["battery.alert"],
)

dashboard = horus.Node(
    name="Dashboard",
    tick=dashboard_tick,
    rate=1,
    order=2,
    subs=["battery.raw", "battery.alert"],
    pubs=["system.status"],
)

# --- Run ---

horus.run(battery, monitor, dashboard, duration=20.0)

Create horus.toml:

[project]
name = "dataclass-demo"
version = "0.1.0"
language = "python"
entry = "dataclass_demo.py"

Run it:

horus run

Expected output:

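[Battery] 95% (12.4V)
[Battery] 90% (12.3V)
[Battery] 85% (12.2V)
...
[Battery] 25% (10.3V)
[Monitor] CRITICAL: Battery critical: 10.3V
[Battery] 20% (10.2V)
[Monitor] CRITICAL: Battery critical: 10.2V
...

With these drain rates the voltage is already below the 10.5 V critical threshold by the time the charge drops under 30%, so the CRITICAL branch fires and the WARNING branch is never reached in this run.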

Dataclass tips

  • asdict() handles nesting. If SystemStatus contains a BatteryReading, asdict() recursively converts both.
  • Reconstruction with **data. BatteryReading(**data) works because asdict() preserves field names. Nested dataclasses come back as plain dicts though, so you need to reconstruct them manually: reading = BatteryReading(**data["battery"]).
  • Validation in __post_init__. Add a __post_init__ method to validate on construction:
@dataclass
class BatteryReading:
    voltage: float
    current: float
    temperature: float
    charge_percent: float

    def __post_init__(self):
        if not 0.0 <= self.charge_percent <= 100.0:
            raise ValueError(f"charge_percent must be 0-100, got {self.charge_percent}")
  • Methods survive transport. After BatteryReading(**data), you can call reading.is_critical() -- the methods are on the class, not on the dict.
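
The tips above can be seen together in one round trip that needs no running nodes. This is a small sketch using only the standard library. CellReading and PackReading are hypothetical messages invented for the illustration, and the variable wire stands in for whatever the subscriber gets back from node.recv().

```python
from dataclasses import dataclass, asdict


@dataclass
class CellReading:          # hypothetical message, for illustration only
    voltage: float

    def __post_init__(self):
        if self.voltage < 0.0:
            raise ValueError(f"voltage must be >= 0, got {self.voltage}")

    def is_low(self) -> bool:
        return self.voltage < 3.3


@dataclass
class PackReading:          # hypothetical message that nests CellReading
    cell: CellReading
    label: str = "main"


# Publisher side: asdict() turns the nested dataclass into plain dicts.
wire = asdict(PackReading(cell=CellReading(voltage=3.1)))
print(wire)

# Subscriber side: rebuild the inner object first, then the outer one.
restored = PackReading(cell=CellReading(**wire["cell"]), label=wire["label"])
print(restored.cell.is_low())

# __post_init__ runs again on reconstruction, so bad payloads are rejected.
try:
    CellReading(**{"voltage": -1.0})
    rejected = False
except ValueError:
    rejected = True
print(rejected)
```

Because validation runs on both sides, a subscriber that reconstructs the dataclass also guards against malformed dicts sent by other publishers on the same topic.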

Approach 4: Compiled Messages with maturin (Production)

For Python-only custom messages at high frequency, use horus.msggen to define a message schema, then compile it with maturin for near-native performance (~3-5us). This approach requires a build step but gives you a typed, binary-serialized message without writing any Rust yourself.

When to use compiled messages

  • You have a custom message type that does not match any built-in type
  • The topic runs at high frequency (50Hz+) and dict overhead is too high
  • You want binary serialization without writing Rust
  • You are deploying to production and want predictable performance

Complete example

Create compiled_demo.py:

from horus.msggen import define_message
import horus

# --- Define a custom message type ---
# This creates a Python class backed by fixed-layout binary serialization.
# At runtime (no maturin), latency is ~20-40us.
# After `maturin develop`, latency drops to ~3-5us.

RobotStatus = define_message("RobotStatus", "robot.status", [
    ("battery_level", "f32"),
    ("motor_rpm", "f32"),
    ("error_code", "i32"),
    ("is_active", "bool"),
    ("tick_count", "u64"),
])

WheelSpeed = define_message("WheelSpeed", "wheel.speed", [
    ("left_rpm", "f32"),
    ("right_rpm", "f32"),
    ("timestamp_ns", "u64"),
])

# --- Node tick functions ---

counter = {"value": 0}

def status_publisher_tick(node):
    """Publish robot status at 50 Hz."""
    counter["value"] += 1
    status = RobotStatus(
        battery_level=85.0,
        motor_rpm=3200.0 + (counter["value"] % 100),
        error_code=0,
        is_active=True,
        tick_count=counter["value"],
    )
    node.send("robot.status", status)

def wheel_publisher_tick(node):
    """Publish wheel speeds at 50 Hz."""
    speed = WheelSpeed(
        left_rpm=150.0,
        right_rpm=148.0,
        timestamp_ns=counter["value"] * 20_000_000,  # 20ms per tick
    )
    node.send("wheel.speed", speed)

def dashboard_tick(node):
    """Read both topics and display."""
    status = node.recv("robot.status")
    wheels = node.recv("wheel.speed")
    if status is not None and wheels is not None:
        print(
            f"[Dash] battery={status.battery_level:.0f}% "
            f"rpm={status.motor_rpm:.0f} "
            f"wheels=({wheels.left_rpm:.0f}, {wheels.right_rpm:.0f})"
        )

# --- Node definitions ---

status_pub = horus.Node(
    name="StatusPub",
    tick=status_publisher_tick,
    rate=50,
    order=0,
    pubs=["robot.status"],
)

wheel_pub = horus.Node(
    name="WheelPub",
    tick=wheel_publisher_tick,
    rate=50,
    order=1,
    pubs=["wheel.speed"],
)

dash = horus.Node(
    name="Dashboard",
    tick=dashboard_tick,
    rate=50,
    order=2,
    subs=["robot.status", "wheel.speed"],
)

# --- Run ---

horus.run(status_pub, wheel_pub, dash, duration=5.0)

Create horus.toml:

[project]
name = "compiled-demo"
version = "0.1.0"
language = "python"
entry = "compiled_demo.py"

This example runs without any build step (runtime mode, ~20-40us). To get compiled performance (~3-5us), run:

# One-time setup: install maturin
pip install maturin

# Compile the message definitions
maturin develop

After compilation, the same code uses the compiled serializer instead of Python's struct module. The messages are binary in both modes; only the speed changes.

Expected output (both runtime and compiled):

[Dash] battery=85% rpm=3201 wheels=(150, 148)
[Dash] battery=85% rpm=3202 wheels=(150, 148)
[Dash] battery=85% rpm=3203 wheels=(150, 148)
...
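
To make "fixed-layout binary serialization" concrete, here is a rough sketch of how the RobotStatus schema could be packed with Python's struct module. It illustrates the idea only and is not the horus.msggen implementation: the mapping from type names to struct codes, the little-endian unpadded layout, and the encode/decode helpers are all assumptions made for this example.

```python
import struct

# Assumed mapping from schema type names to struct format characters.
STRUCT_CODES = {"f32": "f", "i32": "i", "bool": "?", "u64": "Q"}

SCHEMA = [
    ("battery_level", "f32"),
    ("motor_rpm", "f32"),
    ("error_code", "i32"),
    ("is_active", "bool"),
    ("tick_count", "u64"),
]

# "<" means little-endian with no padding, so every message has the same size.
LAYOUT = struct.Struct("<" + "".join(STRUCT_CODES[t] for _, t in SCHEMA))


def encode(values: dict) -> bytes:
    """Pack field values in schema order; field names never go on the wire."""
    return LAYOUT.pack(*(values[name] for name, _ in SCHEMA))


def decode(payload: bytes) -> dict:
    """Unpack a payload back into a name -> value dict."""
    names = [name for name, _ in SCHEMA]
    return dict(zip(names, LAYOUT.unpack(payload)))


status = {
    "battery_level": 85.0,
    "motor_rpm": 3201.0,
    "error_code": 0,
    "is_active": True,
    "tick_count": 1,
}
payload = encode(status)
print(LAYOUT.size, "bytes per message")
print(decode(payload) == status)
```

Unlike the dict transport, no key names are sent, and the size is known before any message exists. That predictability is what makes a compiled fast path possible.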

Performance Comparison

Approach              | Latency  | Cross-language?     | Build step?           | Best for
Typed (horus.CmdVel)  | ~1.7us   | Yes (Rust + Python) | No                    | Control loops, typed sensor data
Dict (GenericMessage) | ~6-50us  | No (Python only)    | No                    | Prototyping, flexible schemas
Dataclass (runtime)   | ~20-40us | No (Python only)    | No                    | Structured Python-only data
Compiled (maturin)    | ~3-5us   | No (Python only)    | Yes (maturin develop) | Production, high-frequency custom types

Latency is measured end-to-end: serialize on the publisher, write to shared memory, read on the subscriber, deserialize. Dict latency varies with payload size (6us for a small dict, 50us for a dict with large nested structures).
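
To see how payload size affects the serialize and deserialize part of that path on your own machine, a small timing harness is enough. The sketch below uses the standard-library json module as a stand-in, because MessagePack is not in the standard library. It does not touch shared memory or HORUS, so its absolute numbers will not match the table; only the trend between small and large payloads is meaningful. round_trip_us is a hypothetical helper name.

```python
import json
import time


def round_trip_us(payload: dict, iterations: int = 10_000) -> float:
    """Average serialize + deserialize time for one payload, in microseconds."""
    start = time.perf_counter()
    for _ in range(iterations):
        json.loads(json.dumps(payload))
    elapsed = time.perf_counter() - start
    return elapsed / iterations * 1e6


small = {"temperature": 23.5, "humidity": 0.65}
large = {
    "samples": [{"index": i, "value": i * 0.5} for i in range(200)],
    "tags": ["indoor", "calibrated"],
}

print(f"small dict: {round_trip_us(small):.1f} us")
print(f"large dict: {round_trip_us(large, iterations=1_000):.1f} us")
```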

Choosing the right approach

Follow this decision process:

Does your data match a built-in type? Use typed messages (Approach 1). They are the fastest option with no build step, and they work across Rust and Python. If you are sending CmdVel, Imu, Odometry, or any of the 70+ standard types, there is no reason to use anything else.

Are you prototyping? Use dicts (Approach 2). You can change the schema by editing a Python dict literal. No class definitions, no registration, no build step. When the schema stabilizes, migrate to typed or compiled messages.

Do you need structure but stay Python-only? Use dataclasses (Approach 3). You get IDE autocompletion, type hints, and constructor validation. The transport cost is the same as dicts (they serialize as dicts), but your code is cleaner and more maintainable.

Do you need a custom type at high frequency? Use compiled messages (Approach 4). Define the schema once with define_message(), compile with maturin, and get ~3-5us latency with binary serialization. This is the right choice for production Python nodes running at 50Hz or above with custom data.

Do you need cross-language custom types? Define the message in Rust with message! and expose it to Python via PyO3. See Tutorial 4 (Rust) for the Rust side and Python API: Custom Messages for binding patterns.
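
The decision process can also be written down as a small function. This is one possible encoding, not an official rule. The function name and parameter names are invented for this sketch, the 50 Hz cut-off is taken from the text above, and the cross-language check is placed second on the assumption that Rust interop rules out every Python-only option.

```python
def choose_approach(matches_builtin: bool, needs_rust: bool,
                    prototyping: bool, rate_hz: float) -> str:
    """Map the questions above to one of the approaches in this tutorial."""
    if matches_builtin:
        return "typed (Approach 1)"
    if needs_rust:
        return "Rust message! exposed via PyO3"
    if prototyping:
        return "dict (Approach 2)"
    if rate_hz >= 50:
        return "compiled (Approach 4)"
    return "dataclass (Approach 3)"


# A custom 100 Hz topic that stays in Python and has a stable schema.
print(choose_approach(matches_builtin=False, needs_rust=False,
                      prototyping=False, rate_hz=100))
```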


Common Mistakes

Mixing typed and string topic names. If you register pubs=[horus.Imu] but send with node.send("imu", some_dict), the subscriber gets a dict, not an Imu object. Always match the registration type with the send type.

Forgetting asdict(). Sending a raw dataclass instance without asdict() will fail at serialization. Always wrap: node.send("topic", asdict(my_dataclass)).

Nested dataclass reconstruction. BatteryReading(**data) works for flat dataclasses, but if your dataclass contains another dataclass, the nested field comes back as a plain dict. Reconstruct it manually:

@dataclass
class Outer:
    inner: Inner
    value: float

# After receiving:
data = node.recv("topic")
if data is not None:
    inner = Inner(**data["inner"])
    outer = Outer(inner=inner, value=data["value"])
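
When several messages nest other dataclasses, the manual step can be folded into a small recursive helper. The sketch below is one way to do it with the standard library. from_dict is a hypothetical helper, not part of HORUS, and it only handles fields annotated directly with a dataclass type. Fields such as List[Inner] or Optional[Inner], and annotations stored as strings, are passed through unchanged.

```python
from dataclasses import asdict, dataclass, fields, is_dataclass


def from_dict(cls, data: dict):
    """Rebuild cls from a plain dict, recursing into dataclass-typed fields."""
    kwargs = {}
    for f in fields(cls):
        if f.name not in data:
            continue  # let the field default (or a TypeError) handle it
        value = data[f.name]
        if is_dataclass(f.type) and isinstance(value, dict):
            value = from_dict(f.type, value)
        kwargs[f.name] = value
    return cls(**kwargs)


@dataclass
class Inner:
    x: float
    y: float


@dataclass
class Outer:
    inner: Inner
    value: float


original = Outer(inner=Inner(x=1.0, y=2.0), value=3.0)
restored = from_dict(Outer, asdict(original))
print(restored == original)
```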

Dict key typos. data["temperture"] (typo) raises KeyError at runtime. Dataclasses catch this at construction time: BatteryReading(temperture=23.5) raises TypeError. This is why Approach 3 is better for team projects.
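
The difference between the two failure modes is easy to reproduce in a few lines of plain Python. Reading is a hypothetical one-field message used only for this demonstration.

```python
from dataclasses import dataclass


@dataclass
class Reading:              # hypothetical message, for illustration only
    temperature: float


data = {"temperature": 23.5}

# A typo in a dict key only fails when the subscriber reads that key.
try:
    data["temperture"]
    dict_error = None
except KeyError as exc:
    dict_error = type(exc).__name__
print("dict lookup:", dict_error)

# The same typo as a constructor argument fails as soon as the object is built.
try:
    Reading(temperture=23.5)
    ctor_error = None
except TypeError as exc:
    ctor_error = type(exc).__name__
print("constructor:", ctor_error)
```

The constructor check also applies on the receiving side: Reading(**data) raises TypeError if a publisher sent a misspelled or unexpected key.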

