# Topic API

Topic provides standalone pub/sub communication outside the node lifecycle — for scripts, tests, tools, and monitoring. Inside nodes, use `node.send()` / `node.recv()` instead.

```python
import horus

topic = horus.Topic(horus.CmdVel)
topic.send(horus.CmdVel(linear=1.0, angular=0.0))
```
## Constructor

```python
horus.Topic(
    msg_type,          # Message class or string name
    capacity=1024,     # Ring buffer capacity
    endpoint=None,     # Network endpoint (e.g., "topic@host:port")
)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `msg_type` | class or str | required | `horus.CmdVel` for typed, `"my_topic"` for generic |
| `capacity` | int | 1024 | Ring buffer slot count |
| `endpoint` | str or None | None | Network endpoint for cross-process topics |
## Typed vs String

```python
# Typed — zero-copy POD transport (~1.7μs)
topic = horus.Topic(horus.CmdVel)
topic = horus.Topic(horus.Imu, capacity=64)

# String — GenericMessage with MessagePack serialization (~6-50μs)
topic = horus.Topic("my_data")
```
## Methods

| Method | Signature | Returns | Description |
|---|---|---|---|
| `send` | `send(message) -> bool` | `True` on success | Publish a message |
| `recv` | `recv() -> Any or None` | Message or `None` | Get latest message |
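`recv()` is non-blocking and returns `None` when no new message is available, so scripts often wrap it in a poll loop with a timeout. A minimal sketch of that pattern (the `wait_for` helper is illustrative, not part of the horus API):

```python
import time

def wait_for(recv, timeout=1.0, poll_interval=0.001):
    """Poll a non-blocking recv() until it yields a message or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        msg = recv()
        if msg is not None:
            return msg
        time.sleep(poll_interval)
    return None

# Usage with a stub recv that produces a value on the third call
calls = iter([None, None, {"linear": 1.0}])
msg = wait_for(lambda: next(calls), timeout=0.5)
print(msg)  # {'linear': 1.0}
```

With a real topic you would pass `topic.recv` as the callable.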
## Properties

| Property | Type | Description |
|---|---|---|
| `name` | str | Topic name |
| `is_network_topic` | bool | Whether this is a network-backed topic |
| `endpoint` | str or None | Network endpoint string |
| `backend_type` | str | Backend type name (e.g., `"SpscIntra"`, `"PodShm"`) |
## Typed Topics (Zero-Copy)

Typed topics use the same fixed-size `#[repr(C)]` message types as Rust. Data transfers through shared memory with no serialization — just a memcpy of the struct.
```python
import horus

# Publisher
pub = horus.Topic(horus.CmdVel)
pub.send(horus.CmdVel(linear=1.0, angular=0.5))

# Subscriber (same or different process)
sub = horus.Topic(horus.CmdVel)
msg = sub.recv()
if msg:
    print(f"linear={msg.linear}, angular={msg.angular}")
```
Performance: ~1.7μs per send+recv (Python overhead from PyO3 + GIL). The underlying Rust transport is ~14-89ns.
Cross-language: Typed topics are binary-compatible between Rust and Python. A Python node publishing horus.CmdVel is received by a Rust Topic<CmdVel> subscriber with zero conversion.
## GenericMessage (String Topics)

When you use a string name, Python dicts are serialized via MessagePack into a GenericMessage.
```python
# These work
node.send("data", {"x": 1.0, "y": 2.0})       # dict
node.send("data", [1, 2, 3])                  # list
node.send("data", "hello")                    # string
node.send("data", {"nested": {"a": [1, 2]}})  # nested

# These fail
node.send("data", my_custom_object)           # TypeError
node.send("data", lambda: 42)                 # TypeError
```
Size limit: messages up to 4KB travel inline through the ring buffer; anything larger is spilled to a TensorPool region automatically. Very large dicts (>100KB) should use typed messages or Image/PointCloud instead.
Serializable types: Any type MessagePack handles — dict, list, str, int, float, bool, None, bytes. Nested structures work. Custom classes do NOT work unless you convert to dict first.
Error behavior: If serialization fails, send() raises TypeError.
Performance: ~6-12μs for small dicts, ~50-110μs for large dicts (50+ keys).
Cross-language: GenericMessage does NOT cross to Rust nodes. Rust nodes use typed Topic<CmdVel> — they cannot receive Python dicts. For cross-language communication, use typed message classes.
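Custom classes can still travel over string topics if you convert them to a plain dict first. A sketch of that pattern using `dataclasses.asdict` (the `Pose2D` class here is illustrative, not a horus type):

```python
from dataclasses import dataclass, asdict

@dataclass
class Pose2D:
    x: float
    y: float
    theta: float

pose = Pose2D(x=1.0, y=2.0, theta=0.5)
payload = asdict(pose)        # {'x': 1.0, 'y': 2.0, 'theta': 0.5} — all MessagePack-serializable
# node.send("pose", payload)  # the dict works; sending Pose2D itself would raise TypeError
```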
## Performance Comparison

| Transport | Latency | Use Case |
|---|---|---|
| Typed topic (`horus.CmdVel`) | ~1.7μs | Control loops, sensor data, any cross-language |
| GenericMessage (dict) | ~6-50μs | Prototyping, config data, Python-only nodes |
| Image zero-copy (DLPack) | ~1.1μs | Camera frames to ML inference |
Rule of thumb: Use typed topics for anything running faster than 10 Hz or crossing the Rust/Python boundary.
## Cross-Language Compatibility

| Type | Python → Rust | Rust → Python | Transport |
|---|---|---|---|
| Typed messages (CmdVel, Imu, etc.) | Yes | Yes | Zero-copy Pod |
| Image, PointCloud, DepthImage | Yes | Yes | Pool-backed descriptor |
| Python dicts (GenericMessage) | No | No | Python-only (MessagePack) |
## Examples

### One-Shot Publisher Script

```python
import horus

topic = horus.Topic(horus.CmdVel)
topic.send(horus.CmdVel(linear=1.0, angular=0.0))
print("Sent velocity command")
```
### Topic Monitor Script

```python
import horus
import time

topic = horus.Topic(horus.Imu)

while True:
    msg = topic.recv()
    if msg:
        print(f"accel_z={msg.accel_z:.2f}")
    time.sleep(0.01)
```
## Inside Nodes vs Standalone

Inside a Node, use `node.send()` / `node.recv()` — these use the topics declared in `pubs` / `subs`:

```python
# Inside a node — uses auto-created topics from pubs/subs
def tick(node):
    node.send("cmd_vel", horus.CmdVel(linear=1.0, angular=0.0))

# Outside a node — standalone Topic for scripts/tools
topic = horus.Topic(horus.CmdVel)
topic.send(horus.CmdVel(linear=1.0, angular=0.0))
```
## Backpressure

Topics use a fixed-size ring buffer. When the buffer is full, the oldest message is dropped — `send()` never blocks.

```text
Buffer capacity: 4

send(A) → [A, _, _, _]
send(B) → [A, B, _, _]
send(C) → [A, B, C, _]
send(D) → [A, B, C, D]   ← buffer full
send(E) → [B, C, D, E]   ← A dropped (oldest)
```
Implications:

- Slow consumers miss messages — the producer is never throttled
- `recv()` always returns the most recent message (or `None`)
- `recv_all()` drains whatever is in the buffer (may be fewer than sent)
- Default capacity is 1024 slots — sufficient for most robotics rates
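The drop-oldest behavior can be modeled with a bounded deque (a model of the semantics only, not the actual shared-memory ring buffer):

```python
from collections import deque

ring = deque(maxlen=4)  # evicts the oldest entry when full, like a capacity-4 topic

for msg in ["A", "B", "C", "D", "E"]:
    ring.append(msg)    # never blocks; "A" is evicted when "E" arrives

print(list(ring))  # ['B', 'C', 'D', 'E']
```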
When to increase capacity:
- High-rate producer (>1kHz) with slow consumer — increase to avoid drops
- Recording all messages — increase to match burst rate
When to decrease capacity:
- Memory-constrained system — reduce to lower SHM usage
- Latest-value semantics only (motor commands) — capacity=2 is fine
```python
# High-rate lidar (40Hz, large scans) — small capacity, latest-only
topic = horus.Topic(horus.LaserScan, capacity=4)

# Event logging — large capacity to avoid drops
topic = horus.Topic("events", capacity=8192)
```
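A rough sizing heuristic (an assumption for illustration, not a documented rule): capacity should cover the number of messages the producer can emit during the consumer's worst-case stall, plus headroom:

```python
import math

def min_capacity(producer_hz, worst_stall_s, headroom=2.0):
    """Smallest slot count that absorbs a consumer stall without drops.

    producer_hz, worst_stall_s, and headroom are illustrative parameters;
    horus itself only takes the final capacity integer.
    """
    return math.ceil(producer_hz * worst_stall_s * headroom)

print(min_capacity(1000, 0.05))  # 1kHz producer, 50ms stall → 100 slots
```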
## Network Topics

Topics can bridge across processes and machines using the `endpoint` parameter:

```python
# Cross-process on same machine (automatic via SHM — no endpoint needed)
topic = horus.Topic(horus.CmdVel)

# Cross-machine via network
topic = horus.Topic(horus.CmdVel, endpoint="cmd_vel@192.168.1.10:9000")
```

Network topics use UDP for low-latency transport. The endpoint format is `topic_name@host:port`.

Note: Most robotics use cases run on a single machine — cross-process topics use shared memory automatically with no configuration. Network topics are for multi-robot systems or distributed architectures.
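The `topic_name@host:port` format can be split with a small helper (this parser is a hypothetical convenience for scripts, not part of the horus API):

```python
def parse_endpoint(endpoint):
    """Split a "topic_name@host:port" string into its three parts."""
    name, _, addr = endpoint.partition("@")
    host, _, port = addr.rpartition(":")
    return name, host, int(port)

print(parse_endpoint("cmd_vel@192.168.1.10:9000"))  # ('cmd_vel', '192.168.1.10', 9000)
```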
## Pool-Backed Types

Image, PointCloud, and DepthImage are NOT transferred through the ring buffer. Instead, the data lives in a shared memory pool, and only a descriptor (~8 bytes) passes through the topic:
```text
Producer                                  Consumer
   │                                         │
   ├─ Allocate 640×480 image                 │
   │  in SHM pool                            │
   ├─ Write pixels to pool                   │
   ├─ send(descriptor) ─────────────────────►│ recv() returns descriptor
   │  (8 bytes through ring buffer)          ├─ to_numpy() → view into same SHM
   │                                         │  (zero-copy, ~1.1μs)
```
This means:

- 640×480 RGB image = 921,600 bytes of pixel data, but only 8 bytes transit the ring buffer
- `to_numpy()` and `np.from_dlpack()` return views into the pool — no copy
- The pool handles allocation, reference counting, and recycling automatically
```python
# Sending an image — pool allocation is automatic
img = horus.Image(480, 640, "rgb8")
topic.send(img)  # Sends 8-byte descriptor, not 900KB of pixels

# Receiving — zero-copy view into shared pool
received = topic.recv()
frame = received.to_numpy()  # (480, 640, 3) uint8 — backed by SHM pool
```
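The descriptor-through-the-ring idea can be modeled in plain Python: keep the bulk data in one shared buffer, pass only an (offset, length) pair, and let the consumer build a zero-copy view with `memoryview`. A conceptual sketch, not the actual pool implementation:

```python
pool = bytearray(1024)                 # stand-in for the SHM pool

# Producer: write the payload into the pool, send only a tiny descriptor
payload = b"pixel data"
offset, length = 64, len(payload)
pool[offset:offset + length] = payload
descriptor = (offset, length)          # this is all that transits the ring buffer

# Consumer: zero-copy view into the same pool
off, ln = descriptor
view = memoryview(pool)[off:off + ln]  # no bytes are copied
print(bytes(view))  # b'pixel data'
```

Because `view` aliases the pool, writes to the pool are visible through it without any copy.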
See NumPy & Zero-Copy for detailed patterns and performance data.
## Design Decisions

**Why pull-based (`recv()`), not push-based (callbacks)?** Pull-based keeps timing deterministic — your tick controls when data is consumed. Callbacks fire at arbitrary times, making budget compliance impossible. The scheduler needs to know exactly how long your tick takes.

**Why a ring buffer, not a queue?** A queue grows without bound if the consumer is slow — dangerous for embedded systems with fixed memory. A ring buffer has fixed size and drops the oldest messages on overflow. This matches robotics semantics: the latest sensor reading is more useful than a stale one.

**Why does overflow drop the oldest, not the newest?** In robotics, the latest data is always more relevant. A 50ms-old LiDAR scan is more useful for obstacle avoidance than a 200ms-old one. Dropping the oldest preserves freshness.

**Why pool-backed Image instead of the ring buffer?** A 640×480 RGB image is ~900KB — too large for the ring buffer. Pool allocation puts the data in a separate SHM region and passes only a descriptor (~8 bytes) through the ring buffer. This gives zero-copy semantics at any image size.
## Trade-offs
| Choice | Benefit | Cost |
|---|---|---|
| Ring buffer (fixed size) | Bounded memory, never blocks | Messages dropped on overflow |
| Drop oldest on overflow | Latest data always available | Historical messages lost |
| Pull-based recv() | Deterministic timing | Must poll every tick |
| Pool-backed Image/PointCloud | Zero-copy at any size | Pool memory pre-allocated |
| Auto-topic creation | Works without declaration | Undeclared topics invisible to monitoring |
## See Also
- Node API — Node send/recv (auto-created topics from pubs/subs)
- Standard Messages — 75+ typed message types
- NumPy & Zero-Copy — DLPack and pool-backed image patterns
- Shared Memory — How zero-copy transport works
- Topics Deep-Dive — Topic patterns and best practices
- Rust Topic API — Rust equivalent (704 lines)