Topic API

Topic provides standalone pub/sub communication outside the node lifecycle — for scripts, tests, tools, and monitoring. Inside nodes, use node.send() / node.recv() instead.

import horus

topic = horus.Topic(horus.CmdVel)
topic.send(horus.CmdVel(linear=1.0, angular=0.0))

Constructor

horus.Topic(
    msg_type,                    # Message class or string name
    capacity=1024,               # Ring buffer capacity
    endpoint=None,               # Network endpoint (e.g., "topic@host:port")
)
ParameterTypeDefaultDescription
msg_typeclass or strrequiredhorus.CmdVel for typed, "my_topic" for generic
capacityint1024Ring buffer slot count
endpointstr or NoneNoneNetwork endpoint for cross-process topics

Typed vs String

# Typed — zero-copy POD transport (~1.7μs)
topic = horus.Topic(horus.CmdVel)
topic = horus.Topic(horus.Imu, capacity=64)

# String — GenericMessage with MessagePack serialization (~6-50μs)
topic = horus.Topic("my_data")

Methods

MethodSignatureReturnsDescription
sendsend(message) -> boolTrue on successPublish a message
recvrecv() -> Any or NoneMessage or NoneGet latest message

Properties

PropertyTypeDescription
namestrTopic name
is_network_topicboolWhether this is a network-backed topic
endpointstr or NoneNetwork endpoint string
backend_typestrBackend type name (e.g., "SpscIntra", "PodShm")

Typed Topics (Zero-Copy)

Typed topics use the same fixed-size #[repr(C)] message types as Rust. Data transfers through shared memory with no serialization — just a memcpy of the struct.

import horus

# Publisher
pub = horus.Topic(horus.CmdVel)
pub.send(horus.CmdVel(linear=1.0, angular=0.5))

# Subscriber (same or different process)
sub = horus.Topic(horus.CmdVel)
msg = sub.recv()
if msg:
    print(f"linear={msg.linear}, angular={msg.angular}")

Performance: ~1.7μs per send+recv (Python overhead from PyO3 + GIL). The underlying Rust transport is ~14-89ns.

Cross-language: Typed topics are binary-compatible between Rust and Python. A Python node publishing horus.CmdVel is received by a Rust Topic<CmdVel> subscriber with zero conversion.


GenericMessage (String Topics)

When you use a string name, Python dicts are serialized via MessagePack into a GenericMessage.

# These work
node.send("data", {"x": 1.0, "y": 2.0})              # dict
node.send("data", [1, 2, 3])                           # list
node.send("data", "hello")                              # string
node.send("data", {"nested": {"a": [1, 2]}})           # nested

# These fail
node.send("data", my_custom_object)                     # TypeError
node.send("data", lambda: 42)                           # TypeError

Size limit: 4KB max per message. Messages exceeding 4KB are spilled to a TensorPool region automatically, but very large dicts (>100KB) should use typed messages or Image/PointCloud instead.

Serializable types: Any type MessagePack handles — dict, list, str, int, float, bool, None, bytes. Nested structures work. Custom classes do NOT work unless you convert to dict first.

Error behavior: If serialization fails, send() raises TypeError.

Performance: ~6-12μs for small dicts, ~50-110μs for large dicts (50+ keys).

Cross-language: GenericMessage does NOT cross to Rust nodes. Rust nodes use typed Topic<CmdVel> — they cannot receive Python dicts. For cross-language communication, use typed message classes.


Performance Comparison

TransportLatencyUse Case
Typed topic (horus.CmdVel)~1.7μsControl loops, sensor data, any cross-language
GenericMessage (dict)~6-50μsPrototyping, config data, Python-only nodes
Image zero-copy (DLPack)~1.1μsCamera frames to ML inference

Rule of thumb: Use typed topics for anything running faster than 10 Hz or crossing the Rust/Python boundary.


Cross-Language Compatibility

TypePython → RustRust → PythonTransport
Typed messages (CmdVel, Imu, etc.)YesYesZero-copy Pod
Image, PointCloud, DepthImageYesYesPool-backed descriptor
Python dicts (GenericMessage)NoNoPython-only (MessagePack)

Examples

One-Shot Publisher Script

import horus

topic = horus.Topic(horus.CmdVel)
topic.send(horus.CmdVel(linear=1.0, angular=0.0))
print("Sent velocity command")

Topic Monitor Script

import horus
import time

topic = horus.Topic(horus.Imu)

while True:
    msg = topic.recv()
    if msg:
        print(f"accel_z={msg.accel_z:.2f}")
    time.sleep(0.01)

Inside Nodes vs Standalone

Inside a Node, use node.send() / node.recv() — these use the topics declared in pubs / subs:

# Inside node — uses auto-created topics from pubs/subs
def tick(node):
    node.send("cmd_vel", horus.CmdVel(linear=1.0, angular=0.0))

# Outside node — standalone Topic for scripts/tools
topic = horus.Topic(horus.CmdVel)
topic.send(horus.CmdVel(linear=1.0, angular=0.0))

Backpressure

Topics use a fixed-size ring buffer. When the buffer is full, the oldest message is dropped — send() never blocks.

Buffer capacity: 4
send(A) → [A, _, _, _]
send(B) → [A, B, _, _]
send(C) → [A, B, C, _]
send(D) → [A, B, C, D]  ← buffer full
send(E) → [B, C, D, E]  ← A dropped (oldest)

Implications:

  • Slow consumers miss messages — the producer is never throttled
  • recv() always returns the most recent message (or None)
  • recv_all() drains whatever is in the buffer (may be fewer than sent)
  • Default capacity is 1024 slots — sufficient for most robotics rates

When to increase capacity:

  • High-rate producer (>1kHz) with slow consumer — increase to avoid drops
  • Recording all messages — increase to match burst rate

When to decrease capacity:

  • Memory-constrained system — reduce to lower SHM usage
  • Latest-value semantics only (motor commands) — capacity=2 is fine
# High-rate lidar (40Hz, large scans) — small capacity, latest-only
topic = horus.Topic(horus.LaserScan, capacity=4)

# Event logging — large capacity to avoid drops
topic = horus.Topic("events", capacity=8192)

Network Topics

Topics can bridge across processes and machines using the endpoint parameter:

# Cross-process on same machine (automatic via SHM — no endpoint needed)
topic = horus.Topic(horus.CmdVel)

# Cross-machine via network
topic = horus.Topic(horus.CmdVel, endpoint="cmd_vel@192.168.1.10:9000")

Network topics use UDP for low-latency transport. The endpoint format is topic_name@host:port.

Note: Most robotics use cases run on a single machine — cross-process topics use shared memory automatically with no configuration. Network topics are for multi-robot systems or distributed architectures.


Pool-Backed Types

Image, PointCloud, and DepthImage are NOT transferred through the ring buffer. Instead, the data lives in a shared memory pool, and only a descriptor (~8 bytes) passes through the topic:

Producer                          Consumer
  │                                  │
  ├─ Allocate 640×480 image         │
  │  in SHM pool                    │
  ├─ Write pixels to pool           │
  ├─ send(descriptor) ─────────────►│ recv() returns descriptor
  │  (8 bytes through ring buffer)  ├─ to_numpy() → view into same SHM
  │                                  │  (zero-copy, ~1.1μs)

This means:

  • 640×480 RGB image = 921,600 bytes of pixel data, but only 8 bytes transit the ring buffer
  • to_numpy() and np.from_dlpack() return views into the pool — no copy
  • The pool handles allocation, reference counting, and recycling automatically
# Sending an image — pool allocation is automatic
img = horus.Image(480, 640, "rgb8")
topic.send(img)  # Sends 8-byte descriptor, not 900KB of pixels

# Receiving — zero-copy view into shared pool
received = topic.recv()
frame = received.to_numpy()  # (480, 640, 3) uint8 — backed by SHM pool

See NumPy & Zero-Copy for detailed patterns and performance data.


Design Decisions

Why pull-based (recv()) not push-based (callbacks)? Pull-based keeps timing deterministic — your tick controls when data is consumed. Callbacks fire at arbitrary times, making budget compliance impossible. The scheduler needs to know exactly how long your tick takes.

Why ring buffer, not a queue? A queue grows without bound if the consumer is slow — dangerous for embedded systems with fixed memory. A ring buffer has fixed size and drops oldest messages on overflow. This matches robotics semantics: the latest sensor reading is more useful than a stale one.

Why overflow drops oldest, not newest? In robotics, the latest data is always more relevant. A 50ms-old LiDAR scan is more useful for obstacle avoidance than a 200ms-old one. Dropping the oldest preserves freshness.

Why pool-backed Image instead of ring buffer? A 640×480 RGB image is 900KB — too large for the ring buffer. Pool allocation puts the data in a separate SHM region and passes only a descriptor (~8 bytes) through the ring buffer. This gives zero-copy semantics at any image size.


Trade-offs

ChoiceBenefitCost
Ring buffer (fixed size)Bounded memory, never blocksMessages dropped on overflow
Drop oldest on overflowLatest data always availableHistorical messages lost
Pull-based recv()Deterministic timingMust poll every tick
Pool-backed Image/PointCloudZero-copy at any sizePool memory pre-allocated
Auto-topic creationWorks without declarationUndeclared topics invisible to monitoring

See Also