Topics Deep-Dive (Python)

A warehouse robot has a Python ML node running YOLO at 30 FPS, a Rust motor controller ticking at 1 kHz, and a Python dashboard logging everything. The ML node detects an obstacle and needs to tell the motor controller to stop -- in under 2 microseconds, across process boundaries, without the GIL blocking the Rust side.

HORUS topics make this work. Every node.send() writes into a shared-memory ring buffer. Every node.recv() reads from it. No sockets, no serialization for typed messages, no kernel involvement. The Python node and the Rust node share the same memory region -- the ML node writes a CmdVel, and the motor controller reads it directly.

This page covers the Python topic API in depth: how to declare topics, what determines your latency, how to receive messages correctly, and when to use the standalone Topic class outside the scheduler.


How Topics Work

A topic is a named ring buffer in shared memory. When you call node.send("cmd_vel", data), HORUS writes data into the ring buffer for the topic named "cmd_vel". When another node calls node.recv("cmd_vel"), it reads the oldest unread message from that same buffer.

  Python Node A                   Shared Memory                  Rust/Python Node B
 ┌─────────────┐          ┌──────────────────────────┐          ┌─────────────────┐
 │ node.send() │ ──write──│  Ring Buffer (1024 slots)│──read──  │  node.recv()    │
 │             │          │  topic: "cmd_vel"        │          │                 │
 └─────────────┘          └──────────────────────────┘          └─────────────────┘

Key properties:

  • Lock-free: Writers and readers never block each other. send() always returns immediately.
  • Bounded: The ring buffer has a fixed capacity (default: 1024 messages). When full, the oldest unread message is overwritten.
  • Cross-language: A Python publisher and a Rust subscriber share the exact same SHM region. No bridge process, no translation layer.
  • Auto-discovered: Two nodes that use the same topic name are connected automatically. No configuration, no broker.
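
The bounded-overwrite behavior is the same as a fixed-size deque. A stdlib analogy (not HORUS code) of what happens when a writer outpaces the reader:

```python
from collections import deque

# A 4-slot ring as an analogy for the SHM buffer (real default: 1024 slots)
ring = deque(maxlen=4)

for seq in range(7):   # writer sends 7 messages with no reads in between
    ring.append(seq)   # when full, the oldest unread slot is overwritten

print(list(ring))      # the reader now sees only the newest 4: [3, 4, 5, 6]
```

The writer never blocks and never fails; the cost of a slow reader is silently lost history, which is why the "Dropped Messages" section below matters.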

Three Ways to Declare Topics

Topic declarations go in the pubs and subs parameters of horus.Node(). The declaration style determines the performance path -- string topics use generic serialization, typed topics use zero-copy Pod transfer.

String Topics (Generic)

import horus

node = horus.Node(
    name="logger",
    subs=["sensor.data", "motor.status"],
    pubs=["log.output"],
    tick=my_tick,
    rate=10,
)

String topics use GenericMessage under the hood -- your data is serialized with MessagePack before writing to SHM. This handles any Python-serializable value: dicts, lists, strings, numbers, nested structures.

def my_tick(node):
    # Dicts, lists, nested structures -- all work
    node.send("log.output", {
        "level": "info",
        "message": "Motor started",
        "details": {"voltage": 12.4, "current": 1.2},
    })

Latency: ~5--50 us per send+recv round-trip (depends on message size). The cost is serialization and deserialization -- MessagePack must convert your Python dict to bytes and back.

When to use: Prototyping, logging, configuration updates, any data that changes shape frequently, or when you need to send arbitrary Python objects.

Typed Topics (Pod Zero-Copy)

from horus import Node, CmdVel, Imu, LaserScan

node = Node(
    name="controller",
    subs=[Imu, LaserScan],
    pubs=[CmdVel],
    tick=control_tick,
    rate=100,
)

When you pass a message class instead of a string, HORUS uses the Pod (plain-old-data) path. The message type has a fixed binary layout known at compile time. The Python wrapper writes fields directly into SHM -- no serialization step.

Topic names are derived automatically from the type:

  Type       Auto-generated name
  CmdVel     "cmd_vel"
  Imu        "imu"
  LaserScan  "scan"
  Pose2D     "pose2d"
  Odometry   "odometry"

from horus import CmdVel

def control_tick(node):
    cmd = CmdVel(linear=1.0, angular=0.0)
    node.send("cmd_vel", cmd)

Latency: ~1.5 us per send+recv round-trip. This is 3--30x faster than the generic path because there is no serialization -- the Pod struct is memory-mapped directly.

When to use: Control loops, sensor data, any hot path where latency matters. Use typed topics for anything running above 10 Hz.

Named Typed Topics

node = horus.Node(
    name="dual_arm",
    pubs={"left.cmd": CmdVel, "right.cmd": CmdVel},
    subs={"left.odom": Odometry, "right.odom": Odometry},
    tick=dual_arm_tick,
    rate=100,
)

A dict maps custom topic names to types. This gives you the Pod zero-copy performance path with explicit control over the topic name -- essential when you have multiple topics of the same type (two arms, two cameras, four wheels).

def dual_arm_tick(node):
    left_odom = node.recv("left.odom")
    right_odom = node.recv("right.odom")

    if left_odom and right_odom:
        node.send("left.cmd", CmdVel(linear=1.0, angular=0.0))
        node.send("right.cmd", CmdVel(linear=1.0, angular=0.0))

Latency: Same as typed topics (~1.5 us). The dict syntax only changes the name, not the transport mechanism.

Declaration Summary

  Style        Syntax                 Transport                     Latency    Best for
  String       pubs=["data"]          GenericMessage (MessagePack)  ~5--50 us  Dicts, prototyping, flexible data
  Typed        pubs=[CmdVel]          Pod zero-copy                 ~1.5 us    Control loops, sensor data
  Named typed  pubs={"cmd": CmdVel}   Pod zero-copy                 ~1.5 us    Multiple topics of the same type

You can mix styles in the same node. Use typed topics for your hot-path control data and string topics for debug/logging output:

node = horus.Node(
    name="controller",
    subs=[Imu],                          # Typed: fast sensor input
    pubs={"cmd": CmdVel, "debug": None}, # Typed CmdVel + string debug
    tick=control_tick,
    rate=100,
)

Performance: Generic vs Typed

The difference between generic and typed topics is not academic. At high frequencies, serialization overhead dominates:

  Scenario                     Generic (string)  Typed (Pod)  Speedup
  CmdVel (16 bytes) at 100 Hz  ~10 us/msg        ~1.5 us/msg  6.7x
  Imu (~300 bytes) at 200 Hz   ~25 us/msg        ~2.5 us/msg  10x
  dict with 5 keys at 30 Hz    ~15 us/msg        N/A          --

At 1 kHz, a 25 us generic send+recv consumes 2.5% of every tick cycle. A 1.5 us typed send+recv consumes 0.15%. For a motor controller with a 1 ms budget, that difference is the margin between meeting deadlines and missing them.
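
The budget arithmetic above, spelled out (numbers from the table; the 1 ms tick budget follows directly from the 1 kHz rate):

```python
tick_budget_us = 1_000   # 1 kHz loop -> 1 ms = 1000 us per tick
generic_us = 25          # generic send+recv (Imu-sized message)
typed_us = 1.5           # typed Pod send+recv

print(f"generic: {generic_us / tick_budget_us:.2%} of the tick budget")  # 2.50%
print(f"typed:   {typed_us / tick_budget_us:.2%} of the tick budget")    # 0.15%
```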

Rule of thumb: If the topic runs above 10 Hz and carries structured data with a known schema, use a typed topic. If the data is ad-hoc (debug dicts, configuration blobs, log messages), use a string topic.

Why the Difference?

Generic topics must serialize your Python dict into bytes (MessagePack encoding), copy those bytes into SHM, then deserialize on the receiver side. Typed topics have a fixed binary layout -- the Python wrapper writes each field at its known offset in the SHM slot, and the receiver reads from those same offsets. No encoding, no decoding, no intermediate byte buffer.
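
The two paths can be mimicked with the stdlib: json encode/decode stands in for MessagePack, and struct.pack_into writing fields at fixed offsets into a preallocated buffer stands in for the Pod path. This is an analogy, not the HORUS implementation:

```python
import json
import struct

# Generic path: encode -> copy bytes -> decode on the reader side
msg = {"linear": 1.0, "angular": 0.0}
encoded = json.dumps(msg).encode()        # serialize (the avoidable cost)
decoded = json.loads(encoded.decode())    # deserialize

# Pod path: write each field at a known offset in a shared slot
slot = bytearray(16)                                  # CmdVel: two f64 fields
struct.pack_into("<dd", slot, 0, 1.0, 0.0)            # writer side
linear, angular = struct.unpack_from("<dd", slot, 0)  # reader side

assert decoded == msg and (linear, angular) == (1.0, 0.0)
```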


Receiving Messages

recv() -- Take One Message

def tick(node):
    msg = node.recv("sensor.data")
    if msg is not None:
        process(msg)

recv() returns the oldest unread message and removes it from the buffer. If nothing is available, it returns None immediately -- it never blocks. Each call consumes exactly one message.

Important: If a publisher sends 5 messages between your ticks, a single recv() returns only the first one. The other 4 remain in the buffer. If you only ever call recv() once per tick but messages arrive faster than you process them, the buffer fills and older messages are dropped.

has_msg() -- Peek Without Consuming

def tick(node):
    if node.has_msg("emergency"):
        stop_cmd = node.recv("emergency")
        handle_emergency(stop_cmd)
    else:
        do_normal_work(node)

has_msg() checks whether at least one message is waiting on the topic, without consuming it. Internally, it performs a recv() and buffers the result -- the next recv() call returns that same message. This means has_msg() followed by recv() does not skip a message.

Use has_msg() when you need to branch on whether data is available before committing to process it.

recv_all() -- Drain the Buffer

def tick(node):
    commands = node.recv_all("commands")
    for cmd in commands:
        execute(cmd)
    node.log_debug(f"Processed {len(commands)} commands")

recv_all() returns a list of all available messages, draining the buffer completely. Returns an empty list if nothing is available.

This is the correct pattern when you must process every message (logging, event recording, command queues) rather than just the latest.

Pattern: Keep-Latest

The most common pattern in control loops -- drain the buffer and act on only the newest message:

def control_tick(node):
    # Drain all buffered IMU readings, keep only the latest
    latest_imu = None
    for msg in node.recv_all("imu"):
        latest_imu = msg

    if latest_imu is not None:
        cmd = compute_control(latest_imu)
        node.send("cmd_vel", cmd)

This ensures you always act on the freshest data, even if several messages accumulated between ticks. Stale readings are discarded.

Pattern: Conditional Multi-Topic

def fusion_tick(node):
    lidar = node.recv("lidar") if node.has_msg("lidar") else None
    camera = node.recv("camera") if node.has_msg("camera") else None

    if lidar and camera:
        fused = fuse_sensors(lidar, camera)
        node.send("fused", fused)
    elif lidar:
        node.send("fused", lidar_only_estimate(lidar))

Check multiple topics and degrade gracefully when some sensors are unavailable.

Pattern: Batch Processing

def logger_tick(node):
    events = node.recv_all("events")
    if events:
        # Batch write is more efficient than one-at-a-time
        with open("log.jsonl", "a") as f:
            for event in events:
                f.write(json.dumps(event) + "\n")

Collect all messages and process them in a batch. More efficient for I/O-heavy consumers that benefit from batching.


Dropped Messages

Messages are dropped when the ring buffer is full and a new send() overwrites the oldest unread slot. This happens when:

  • A publisher sends faster than the subscriber reads (subscriber too slow)
  • A subscriber is temporarily blocked (GIL contention, I/O wait, garbage collection pause)
  • The buffer capacity is too small for the burst rate

How to Detect Drops

HORUS does not expose a Python-level dropped_count() method on nodes. Instead, use these strategies:

Sequence numbers: Add a counter to your messages and check for gaps on the receiver side:

send_seq = 0

def publisher_tick(node):
    global send_seq
    send_seq += 1
    node.send("data", {"seq": send_seq, "value": read_sensor()})

last_seq = 0

def subscriber_tick(node):
    global last_seq
    for msg in node.recv_all("data"):
        if msg["seq"] != last_seq + 1:
            node.log_warning(f"Dropped {msg['seq'] - last_seq - 1} messages")
        last_seq = msg["seq"]
        process(msg)

Rate monitoring: Use horus monitor --tui to watch topic publish and subscribe rates in real time. If the publish rate consistently exceeds the subscribe rate, drops are happening.

How to Prevent Drops

  Strategy                     How                                      When
  Drain every tick             Use recv_all() instead of single recv()  Always, unless you only need the latest
  Increase capacity            default_capacity=4096 on Node            Bursty publishers
  Keep-latest pattern          Drain + use only the newest              Control loops
  Reduce publisher rate        Lower the publisher's rate               Publisher is too fast for the subscriber
  Use Rust for the subscriber  Rust nodes process messages faster       GIL is the bottleneck

The default buffer capacity is 1024 messages. For most applications, this is large enough that drops only occur under sustained overload, not brief bursts.
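
Raising the capacity for a bursty subscriber, using the default_capacity parameter from the table above (a configuration sketch; the tick body is illustrative):

```python
import horus

def archive_tick(node):
    for event in node.recv_all("events"):
        archive(event)   # placeholder for real handling

node = horus.Node(
    name="event_archiver",
    subs=["events"],
    tick=archive_tick,
    rate=10,
    default_capacity=4096,   # 4x the default 1024 -- absorbs bursts
)
```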


Topic Naming Rules

HORUS topic names use dots for hierarchy. Never use slashes.

# CORRECT
pubs=["sensor.temperature"]
pubs=["robot.arm.left.position"]
pubs=["camera.front.rgb"]

# WRONG -- fails on macOS, may cause subtle bugs on Linux
pubs=["sensor/temperature"]
pubs=["robot/arm/left/position"]

Slashes fail because shm_open (the POSIX function that creates named shared-memory objects) gives implementation-defined behavior for names containing additional slashes. macOS rejects such names outright, and on Linux a topic named "sensor/temperature" maps to the nested path /dev/shm/sensor/temperature, which fails if the intermediate directory does not exist. Dots carry no special meaning to shm_open, so they work identically on Linux and macOS.

  Framework   Separator  Example
  ROS / ROS2  /          /sensor/lidar
  HORUS       .          sensor.lidar

Naming Conventions

  Pattern                Example               Use for
  subsystem.data         sensor.temperature    Most topics
  subsystem.device.data  camera.front.rgb      Multi-device systems
  robot.subsystem.data   robot1.motor.cmd_vel  Multi-robot fleets

Avoid: Names starting with _ (reserved for internal use), names containing special characters, names containing /.
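
A helper that enforces these rules before creating a topic. Illustrative only; valid_topic_name is not part of the HORUS API, and the lowercase-only pattern is an assumption based on the examples above:

```python
import re

# Dot-separated segments, each starting with a lowercase letter
_TOPIC_RE = re.compile(r"[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)*")

def valid_topic_name(name: str) -> bool:
    """Dots for hierarchy, no slashes, no leading underscore."""
    if "/" in name or name.startswith("_"):
        return False
    return _TOPIC_RE.fullmatch(name) is not None

assert valid_topic_name("sensor.temperature")
assert valid_topic_name("robot1.motor.cmd_vel")
assert not valid_topic_name("sensor/temperature")
assert not valid_topic_name("_internal.state")
```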


Cross-Process Topics

Topics work across process boundaries with zero configuration. A Python node in one process and a Rust node in another process can share the same topic -- HORUS handles everything through shared memory.

How Auto-Discovery Works

When a node creates a topic, HORUS:

  1. Creates (or opens) a shared-memory file named after the topic (e.g., horus_cmd_vel for topic "cmd_vel")
  2. Writes a .meta discovery file containing the topic's type, capacity, and creator PID
  3. Maps the SHM region into the process's address space

When a second node (even in a different process, even in a different language) creates a topic with the same name, HORUS:

  1. Finds the existing SHM file
  2. Maps it into the new process's address space
  3. Both processes now read and write to the same ring buffer

No broker. No discovery service. No network. Just filesystem-level coordination through SHM metadata.
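
On Linux, the backing files are visible in the filesystem, which makes a quick debugging check possible. This assumes the /dev/shm backing and the horus_ naming prefix described above; the location differs on macOS:

```python
from pathlib import Path

def list_horus_shm(shm_dir: str = "/dev/shm") -> list[str]:
    """Return names of SHM files that look like HORUS topic buffers."""
    root = Path(shm_dir)
    if not root.exists():
        return []
    return sorted(p.name for p in root.glob("horus_*"))

print(list_horus_shm())   # e.g. ['horus_cmd_vel', 'horus_imu'] while nodes run
```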

Python + Rust Interop

A Python node and a Rust node share topics seamlessly when they use the same typed messages:

Python process:

from horus import Node, CmdVel, run

def control_tick(node):
    node.send("cmd_vel", CmdVel(linear=1.0, angular=0.5))

controller = Node(name="py_controller", pubs=[CmdVel], tick=control_tick, rate=50)
run(controller)

Rust process (running separately):

use horus::prelude::*;

struct Motor { cmd_sub: Topic<CmdVel> }
impl Node for Motor {
    fn name(&self) -> &str { "Motor" }
    fn tick(&mut self) {
        if let Some(cmd) = self.cmd_sub.recv() {
            drive(cmd.linear, cmd.angular);
        }
    }
}

Both use the CmdVel Pod type. The Python side writes fields at the same memory offsets the Rust side reads from. No serialization, no translation. The Rust node sees the exact bytes the Python node wrote.

Cross-process topics use the SHM backend (~50--167 ns for Rust-to-Rust). The Python overhead (~1.5 us for typed, ~5--50 us for generic) is from the Python/Rust FFI boundary and MessagePack serialization, not from the SHM transport itself.

Namespace Isolation

By default, each terminal session gets its own SHM namespace (based on session ID and user ID). Two horus run commands in different terminals do not share topics.

To explicitly share topics across terminals or processes:

# Both processes must use the same namespace
HORUS_NAMESPACE=shared horus run my_publisher.py
HORUS_NAMESPACE=shared horus run my_subscriber.py

Standalone Topic Class

The node.send() and node.recv() methods work inside the scheduler tick loop. For code that runs outside the scheduler -- scripts, tests, debugging tools, one-shot publishers -- use the standalone Topic class directly.

from horus import Topic, CmdVel

# Create a typed topic (Pod zero-copy)
cmd_topic = Topic(CmdVel)

# Send a message
cmd_topic.send(CmdVel(linear=1.0, angular=0.0))

# Receive a message
msg = cmd_topic.recv()
if msg is not None:
    print(f"linear={msg.linear}, angular={msg.angular}")

Constructor

Topic(msg_type, capacity=None, endpoint=None)
  Parameter  Type         Description
  msg_type   type or str  Message class (e.g., CmdVel) for typed topics, or a string name for generic topics
  capacity   int or None  Ring buffer capacity. None uses the default (1024)
  endpoint   str or None  Custom topic name. None auto-derives from the type

Methods

  Method  Signature                         Description
  send    send(message, node=None) -> bool  Write a message to the ring buffer. Returns True on success
  recv    recv(node=None) -> Optional[Any]  Read the next message. Returns None if empty

Properties

  Property      Type         Description
  name          str          Topic name
  msg_type      type         Message type class
  endpoint      str or None  Custom endpoint if set
  backend_type  str          Current backend (e.g., "shm", "intra")

When to Use Standalone Topics

Testing: Send test data to a node and verify its output without running the full scheduler.

from horus import Topic, CmdVel

# Inject test data (assumes the node under test is already running
# in another process and subscribed to this topic)
cmd_topic = Topic(CmdVel)
cmd_topic.send(CmdVel(linear=1.0, angular=0.0))

# Verify the node processed it and published on "output"
output_topic = Topic("output")
result = output_topic.recv()
assert result is not None

One-shot commands: Send a single command from a script and exit.

from horus import Topic, CmdVel

# Send emergency stop
Topic(CmdVel).send(CmdVel(linear=0.0, angular=0.0))

Monitoring tools: Read topic data from a separate process for visualization or logging.

from horus import Topic, Imu
import time

imu_topic = Topic(Imu)
while True:
    msg = imu_topic.recv()
    if msg:
        print(f"accel=({msg.ax:.2f}, {msg.ay:.2f}, {msg.az:.2f})")
    time.sleep(0.01)

The standalone Topic class does not participate in the scheduler's tick loop. It has no rate control, no budget enforcement, and no lifecycle management. For sustained publishing or subscribing, use horus.Node() with the scheduler.


Auto-Created Topics

Topics used in send() or recv() that were not declared in pubs/subs are created automatically on first use:

def tick(node):
    # "debug.info" was not in pubs -- auto-created as a GenericMessage topic
    node.send("debug.info", {"tick": 42, "status": "ok"})

Auto-created topics always use the GenericMessage path (string-style, ~5--50 us). If you want the typed fast path, you must declare the topic in pubs/subs with a message type.

Auto-creation is convenient for debugging and prototyping but should not be relied on in production -- undeclared topics make the data flow harder to trace and always take the slower generic path.


Design Decisions

Why three declaration styles instead of one? String topics are the simplest possible API -- you just name your data channel and send anything through it. Typed topics exist because serialization overhead matters at high frequencies. Named typed topics exist because real robots have multiple instances of the same sensor type (two cameras, four wheels) and need distinct names with the same zero-copy performance. Each style exists to serve a different need; none is universally better.

Why is the default capacity 1024 messages? A control loop at 1 kHz produces 1000 messages per second. At the default capacity, a subscriber can fall a full second behind before messages are dropped. This is generous enough for most applications while keeping memory usage bounded. Increase it for bursty workloads; decrease it if memory is constrained.

Why does recv() return None instead of blocking? Blocking in a tick function stalls the entire scheduler -- every other node waits until the blocked node returns. Returning None lets the node continue to its next tick, and the scheduler maintains its timing guarantees. This is the same design as Rust's Option-based recv() for the same reason.

Why auto-create topics? Strict declaration-only topics are safer (every data flow is explicit), but they add friction during prototyping. Auto-creation lets you add a send("debug.foo", ...) call without modifying the node constructor. The trade-off is that auto-created topics are invisible in the node's pubs/subs lists and always use the slower generic path -- a deliberate incentive to declare your topics properly for production.

Why the standalone Topic class? Nodes require a scheduler. But testing, scripting, and one-shot commands should not need a full scheduler setup. The standalone Topic class provides direct SHM access for these use cases, at the cost of no lifecycle management.


Trade-offs

  • Three declaration styles -- pick the right abstraction level. Cost: must understand the performance difference between generic and typed.
  • Auto-created topics -- fast prototyping, no boilerplate. Cost: undeclared topics take the slow generic path and hide data flow.
  • 1024-slot default buffer -- generous for most workloads. Cost: uses more memory than Rust's default 4-slot buffer.
  • recv() returns None -- never blocks the scheduler. Cost: must check for None on every call (no exceptions on empty).
  • Pod zero-copy for typed topics -- ~1.5 us cross-process. Cost: only works for fixed-layout message types, not arbitrary dicts.
  • Cross-process transparency -- same API for in-process and cross-process. Cost: all cross-process topics incur SHM overhead; cannot force in-process-only.
  • Standalone Topic class -- works outside the scheduler. Cost: no rate control, no lifecycle, no budget enforcement.

See Also