Topic API
Topic<T> is the primary communication primitive in HORUS. It provides typed publish/subscribe channels backed by shared memory for zero-copy IPC within a single machine.
Creating a Topic
```rust
use horus::prelude::*;

// Default capacity and slot size
let topic = Topic::<Twist>::new("cmd_vel");

// Custom capacity (number of slots) and slot size (bytes per slot)
let topic = Topic::<LaserScan>::with_capacity("scan", 8, Some(4096))?;
```
| Constructor | Returns | Description |
|---|---|---|
| `Topic::<T>::new(name)` | `Topic<T>` | Create with default capacity and slot size |
| `Topic::<T>::with_capacity(name, capacity, slot_size)` | `HorusResult<Topic<T>>` | Create with explicit capacity and slot size |
Most users only need `Topic::new(name)`. Use `with_capacity` only if you need to tune buffer sizes for very large or very small messages.
The name string identifies the shared-memory channel. Two Topic instances with the same name and type connect automatically — one publishes, the other subscribes. Backend selection (in-process ring buffer vs cross-process SHM) is automatic based on whether publisher and subscriber live in the same process.
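A rough mental model for name-based connection — a sketch, not the HORUS internals — is a process-wide registry keyed by topic name, where every handle created with the same key ends up sharing one underlying channel. The `Registry` and `Channel` types below are hypothetical:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical sketch: two Topic handles with the same name share one channel.
type Channel = Arc<Mutex<Vec<String>>>;

struct Registry {
    channels: HashMap<String, Channel>,
}

impl Registry {
    // Look up (or create) the channel for a given name.
    fn topic(&mut self, name: &str) -> Channel {
        self.channels
            .entry(name.to_string())
            .or_insert_with(|| Arc::new(Mutex::new(Vec::new())))
            .clone()
    }
}

fn main() {
    let mut reg = Registry { channels: HashMap::new() };
    let publisher = reg.topic("cmd_vel");
    let subscriber = reg.topic("cmd_vel"); // same name → same channel
    publisher.lock().unwrap().push("go".into());
    assert_eq!(subscriber.lock().unwrap().len(), 1);
}
```

The real backend additionally picks between an in-process ring buffer and shared memory, but the connect-by-name behavior is the same.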
Sending Messages
| Method | Returns | Description |
|---|---|---|
| `send(msg)` | `()` | Publish a message (non-blocking; overwrites oldest if full) |
| `try_send(msg)` | `Result<(), T>` | Publish if space is available; returns the message back on failure |
| `send_blocking(msg, timeout)` | `Result<(), SendBlockingError>` | Block until space is available or the timeout elapses |
SendBlockingError
| Variant | Description |
|---|---|
| `Timeout` | Timed out waiting for space in the ring buffer |
Example
```rust
use horus::prelude::*;

let topic = Topic::<CmdVel>::new("cmd_vel");

// Fire-and-forget (overwrites oldest if full)
topic.send(CmdVel { linear: 1.0, angular: 0.0 });

// Try without blocking
match topic.try_send(CmdVel { linear: 0.5, angular: 0.1 }) {
    Ok(()) => { /* sent */ }
    Err(msg) => { /* buffer full, msg returned */ }
}

// Block up to 10 ms
match topic.send_blocking(CmdVel { linear: 1.0, angular: 0.0 }, 10_u64.ms()) {
    Ok(()) => { /* sent */ }
    Err(SendBlockingError::Timeout) => { /* timed out */ }
}
```
Receiving Messages
| Method | Returns | Description |
|---|---|---|
| `recv()` | `Option<T>` | Take the next unread message (FIFO order) |
| `read_latest()` | `Option<T>` | Read the most recent message, skipping older ones (requires `T: Copy`) |
recv() returns messages in order. Each message is delivered to each subscriber exactly once. read_latest() is useful for state-like data (sensor readings, poses) where you only care about the newest value.
Example
```rust
let topic = Topic::<Imu>::new("imu");

// IMPORTANT: call recv() every tick to drain the buffer and avoid stale data accumulation
// Process all pending messages
while let Some(msg) = topic.recv() {
    println!("Accel: ({}, {}, {})", msg.linear_accel.x, msg.linear_accel.y, msg.linear_accel.z);
}

// NOTE: read_latest() requires T: Copy — skips older messages and returns only the newest
if let Some(latest) = topic.read_latest() {
    println!("Latest orientation: {:?}", latest.orientation);
}
```
State & Metrics
| Method | Returns | Description |
|---|---|---|
| `name()` | `&str` | The topic name |
| `has_message()` | `bool` | Whether at least one unread message is available |
| `pending_count()` | `u64` | Number of unread messages in the buffer |
| `dropped_count()` | `u64` | Total messages dropped to buffer-full overwrites |
| `metrics()` | `TopicMetrics` | Aggregate send/receive statistics |
TopicMetrics
| Method | Returns | Description |
|---|---|---|
| `messages_sent()` | `u64` | Total messages published on this topic |
| `messages_received()` | `u64` | Total messages consumed from this topic |
| `send_failures()` | `u64` | Failed send attempts (e.g., `try_send` on a full buffer) |
| `recv_failures()` | `u64` | Failed receive attempts (e.g., `recv` on an empty buffer) |
Example
```rust
let topic = Topic::<Twist>::new("cmd_vel");

// Check state
if topic.has_message() {
    println!("{} messages pending", topic.pending_count());
}

// Check for dropped messages
if topic.dropped_count() > 0 {
    hlog!(warn, "Dropped {} messages", topic.dropped_count());
}

// Check metrics
let m = topic.metrics();
println!("Sent: {}, Received: {}", m.messages_sent(), m.messages_received());
```
Pool-Backed Types (Zero-Copy)
For large data types (Image, PointCloud, DepthImage, Tensor), HORUS uses pool-backed allocation to avoid copying payloads through the ring buffer. The Topic API provides specialized methods for these types.
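The pool-backed idea can be sketched with plain standard-library types — this is an illustrative model, not the HORUS allocator, and the `BufferPool` type here is hypothetical. The key property is that "sending" moves a cheap handle through the ring buffer while the payload bytes stay in their pool slot:

```rust
use std::collections::VecDeque;
use std::sync::Arc;

// Hypothetical sketch of pool-backed transport: the ring buffer carries
// small handles (Arc pointers) while the large payload stays in place.
struct BufferPool {
    free: VecDeque<Arc<Vec<u8>>>,
}

impl BufferPool {
    fn new(slots: usize, slot_size: usize) -> Self {
        let free = (0..slots).map(|_| Arc::new(vec![0u8; slot_size])).collect();
        Self { free }
    }
    // Hand out a slot; None once the pool is exhausted.
    fn acquire(&mut self) -> Option<Arc<Vec<u8>>> {
        self.free.pop_front()
    }
    // Return a slot for reuse once readers are done with it.
    fn release(&mut self, buf: Arc<Vec<u8>>) {
        self.free.push_back(buf);
    }
}

fn main() {
    let mut pool = BufferPool::new(4, 1024);
    let slot = pool.acquire().unwrap();
    // "Sending" clones the handle — the 1 KiB payload is not copied.
    let received = Arc::clone(&slot);
    assert!(Arc::ptr_eq(&slot, &received));
    drop(received);
    pool.release(slot);
}
```

This also shows why draining matters: a frame that is never received keeps its pool slot out of the free list.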
Image Topics
```rust
let topic = Topic::<Image>::new("camera/rgb");

// Send (moves the Image into the pool slot)
topic.send(image);

// Try send (returns the Image on failure)
match topic.try_send(image) {
    Ok(()) => {}
    Err(img) => { /* buffer full */ }
}

// IMPORTANT: call recv() every tick to drain the buffer — images are large and stale frames waste pool slots
if let Some(img) = topic.recv() {
    println!("{}x{} image received", img.width(), img.height());
}
```
PointCloud Topics
```rust
let topic = Topic::<PointCloud>::new("lidar/points");

topic.send(cloud);

// IMPORTANT: call recv() every tick to avoid stale point clouds filling the pool
if let Some(cloud) = topic.recv() {
    println!("{} points received", cloud.len());
}
```
DepthImage Topics
```rust
let topic = Topic::<DepthImage>::new("camera/depth");

topic.send(depth);

// IMPORTANT: call recv() every tick to drain depth frames
if let Some(depth) = topic.recv() {
    println!("{}x{} depth image", depth.width(), depth.height());
}
```
Tensor Topics
For advanced ML pipelines, you can use Topic<Tensor> directly. The same zero-copy transport that backs Image, PointCloud, and DepthImage is available for raw tensor data. See the Tensor Messages page for details.
```rust
let topic = Topic::<Tensor>::new("model/output");

// Send and receive work the same as any other topic type
topic.send(tensor);

// IMPORTANT: call recv() every tick to avoid stale tensor data in the pool
if let Some(t) = topic.recv() {
    println!("Tensor shape: {:?}", t.shape());
}
```
For most use cases, prefer the domain types above (Image, PointCloud, DepthImage) — they provide convenience methods like pixel(), point_at(), and get_depth() while using the same zero-copy path internally.
Detailed Method Reference
new
```rust
pub fn new(name: &str) -> Topic<T>
```
Create a topic with default capacity and slot size.
Parameters:
- `name: &str` — Topic identifier. Two `Topic` instances with the same name and matching type `T` connect automatically. Names are case-sensitive. Use `/`-delimited hierarchical names: `"sensors/lidar/scan"`.
Returns: Topic<T> connected to the named shared memory channel.
Defaults:
- Capacity: 4 slots (ring buffer)
- Slot size: `size_of::<T>()` rounded up to page alignment
Backend auto-selection: If publisher and subscriber are in the same process, uses an in-process ring buffer (no SHM overhead). Cross-process uses SHM.
Example:
```rust
let pub_topic = Topic::<CmdVel>::new("cmd_vel");
let sub_topic = Topic::<CmdVel>::new("cmd_vel"); // connects to the same channel
```
send
```rust
pub fn send(&self, msg: T)
```
Publish a message. Non-blocking. Overwrites oldest if buffer is full.
Parameters:
- `msg: T` — The message to publish. Moved into the ring buffer.
Buffer full behavior: If all slots are occupied, the oldest unread message is silently overwritten. No error is returned. Use dropped_count() to detect this.
When to use: Default for most real-time robotics — you always want the latest data, and blocking is unacceptable in a control loop.
Example:
```rust
topic.send(CmdVel::new(1.0, 0.0));

// Check if messages were dropped
if topic.dropped_count() > 0 {
    hlog!(warn, "{} messages dropped — subscriber too slow", topic.dropped_count());
}
```
recv
```rust
pub fn recv(&self) -> Option<T>
```
Take the next unread message in FIFO order.
Returns:
- `Some(T)` — The oldest unread message (removed from the buffer)
- `None` — No unread messages available
Important: Call recv() every tick in your node to drain the buffer. Failing to drain causes stale data accumulation and eventually message drops.
Delivery guarantee: Each message is delivered to each subscriber exactly once. After recv() returns it, the message is consumed.
Example:
// Process ALL pending messages each tick
while let Some(scan) = self.scan_topic.recv() {
self.process(scan);
}
read_latest
```rust
pub fn read_latest(&self) -> Option<T>
where
    T: Copy,
```
Read the most recent message, skipping all older ones.
Returns:
- `Some(T)` — Copy of the newest message
- `None` — No messages available
Type constraint: Requires T: Copy because it reads without consuming — multiple calls return the same value until a new message arrives.
When to use: State-like data where you only care about the current value: poses, sensor readings, configuration. NOT for command streams where every message matters.
Example:
// Good: pose is state — only the latest matters
if let Some(pose) = self.odom_topic.read_latest() {
self.current_position = pose;
}
// Bad: cmd_vel is a command — you'd miss intermediate commands
// Use recv() instead for command streams
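Why the `T: Copy` bound exists can be seen in a minimal latest-value cell — a sketch, not the HORUS implementation. Because the read does not consume the value, it must copy it out, and repeated calls return the same value until a new message arrives:

```rust
// Sketch: a non-consuming "latest value" slot, which is why T: Copy is needed.
struct LatestCell<T: Copy> {
    newest: Option<T>,
}

impl<T: Copy> LatestCell<T> {
    fn publish(&mut self, v: T) {
        self.newest = Some(v); // newer values replace older ones
    }
    fn read_latest(&self) -> Option<T> {
        self.newest // copied out, not moved — the cell keeps its value
    }
}

fn main() {
    let mut cell = LatestCell { newest: None };
    cell.publish(1.0_f32);
    cell.publish(2.0);
    assert_eq!(cell.read_latest(), Some(2.0)); // older value skipped
    assert_eq!(cell.read_latest(), Some(2.0)); // same value until a new publish
}
```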
try_send
```rust
pub fn try_send(&self, msg: T) -> Result<(), T>
```
Attempt to send without overwriting. If the buffer is full, the message is returned to the caller.
Parameters:
- `msg: T` — The message to send. Returned on failure.
Returns:
- `Ok(())` — Sent successfully
- `Err(msg)` — Buffer full; message returned
When to use: When you need to detect drops — implement backpressure, count losses, or log warnings. For fire-and-forget, use send() instead.
Example:
```rust
match topic.try_send(cmd) {
    Ok(()) => {}
    Err(_returned) => {
        self.drop_count += 1;
        hlog!(warn, "Buffer full — {} drops total", self.drop_count);
    }
}
```
try_recv
```rust
pub fn try_recv(&self) -> Option<T>
```
Non-blocking receive. Functionally identical to recv() — both are non-blocking and return Option<T>. Provided for API symmetry with try_send().
Returns:
- `Some(T)` — Next unread message
- `None` — No messages available
Example:
```rust
if let Some(msg) = topic.try_recv() {
    self.process(msg);
}
```
send_blocking
```rust
pub fn send_blocking(&self, msg: T, timeout: Duration) -> Result<(), SendBlockingError>
```
Send with guaranteed delivery. Blocks until buffer space is available or timeout expires. Uses a 3-phase strategy: immediate try, spin loop (sub-μs), then sleep in 100μs increments.
Parameters:
- `msg: T` — The message to send
- `timeout: Duration` — Maximum time to wait
Returns:
- `Ok(())` — Delivered
- `Err(SendBlockingError::Timeout)` — Buffer stayed full for the entire timeout
When to use: Safety-critical commands (emergency stop, motor setpoints) where dropping a message is unacceptable. NOT for high-frequency data — use send() instead.
Example:
```rust
use horus::prelude::*;

// Critical: the e-stop must be delivered
match topic.send_blocking(stop_cmd, 1.ms()) {
    Ok(()) => hlog!(info, "E-stop delivered"),
    Err(_) => hlog!(error, "E-stop delivery FAILED — subscriber unresponsive"),
}
```
with_capacity
```rust
pub fn with_capacity(name: &str, capacity: u32, slot_size: Option<usize>) -> HorusResult<Self>
```
Create a topic with custom ring buffer capacity.
Parameters:
- `name: &str` — Topic identifier
- `capacity: u32` — Number of slots (rounded up to the next power of two)
- `slot_size: Option<usize>` — Bytes per slot. `None` = auto-sized from `size_of::<T>()`
Returns: HorusResult<Topic<T>>
When to use: Bursty producers that need more buffering, or memory-constrained systems that need smaller buffers.
Example:
```rust
// 256-slot buffer for a high-frequency sensor
let topic = Topic::<Imu>::with_capacity("imu", 256, None)?;

// Small buffer to save memory on a constrained device
let topic = Topic::<Pose2D>::with_capacity("pose", 4, None)?;
```
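The power-of-two rounding matches what Rust's standard library exposes directly, so the effective capacity is easy to predict (the `effective_capacity` helper is illustrative, not part of the HORUS API):

```rust
// Illustrative helper: capacity is rounded up to the next power of two.
fn effective_capacity(requested: u32) -> u32 {
    requested.next_power_of_two()
}

fn main() {
    assert_eq!(effective_capacity(6), 8);     // 6 slots requested → 8 allocated
    assert_eq!(effective_capacity(8), 8);     // powers of two are unchanged
    assert_eq!(effective_capacity(100), 128); // budget memory accordingly
}
```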
name
```rust
pub fn name(&self) -> &str
```
Returns the topic name string.
Example:
```rust
let topic = Topic::<f32>::new("sensor.temperature");
assert_eq!(topic.name(), "sensor.temperature");
```
has_message
```rust
pub fn has_message(&self) -> bool
```
Check if at least one unread message is available without consuming it.
Example:
```rust
if topic.has_message() {
    let msg = topic.recv().unwrap(); // guaranteed Some
}
```
pending_count
```rust
pub fn pending_count(&self) -> u64
```
Number of unread messages in the buffer.
Example:
```rust
if topic.pending_count() > 10 {
    hlog!(warn, "Topic '{}' has {} pending — subscriber may be too slow",
        topic.name(), topic.pending_count());
}
```
dropped_count
```rust
pub fn dropped_count(&self) -> u64
```
Total messages dropped due to buffer overflow (oldest overwritten by send()). Resets to 0 on topic creation.
Example:
```rust
if topic.dropped_count() > 0 {
    hlog!(warn, "{} messages dropped on '{}' — consider increasing capacity or subscriber rate",
        topic.dropped_count(), topic.name());
}
```
metrics
```rust
pub fn metrics(&self) -> TopicMetrics
```
Aggregated send/receive statistics.
Returns: TopicMetrics with methods:
- `messages_sent() -> u64`
- `messages_received() -> u64`
- `send_failures() -> u64`
- `recv_failures() -> u64`
Example:
```rust
let m = topic.metrics();
println!("Sent: {}, Received: {}, Failures: send={} recv={}",
    m.messages_sent(), m.messages_received(),
    m.send_failures(), m.recv_failures());
```
Using Topics in Nodes
Topics are typically used inside Node implementations registered with a Scheduler:
```rust
use horus::prelude::*;

struct LidarProcessor {
    scan_in: Topic<LaserScan>,
    cmd_out: Topic<CmdVel>,
}

impl LidarProcessor {
    fn new() -> Self {
        Self {
            scan_in: Topic::new("scan"),
            cmd_out: Topic::new("cmd_vel"),
        }
    }
}

impl Node for LidarProcessor {
    fn name(&self) -> &str { "LidarProcessor" }

    fn tick(&mut self) {
        // IMPORTANT: call recv() every tick to drain the scan buffer
        if let Some(scan) = self.scan_in.recv() {
            // Find the closest obstacle
            let min_range = scan.ranges.iter()
                .copied()
                .filter(|r| *r > scan.range_min && *r < scan.range_max)
                .fold(f32::MAX, f32::min);
            // SAFETY: send zero velocity when an obstacle is too close
            let speed = if min_range < 0.5 { 0.0 } else { 1.0 };
            self.cmd_out.send(CmdVel { linear: speed, angular: 0.0 });
        }
    }

    // SAFETY: stop sending velocity commands on shutdown
    fn shutdown(&mut self) -> Result<()> {
        self.cmd_out.send(CmdVel { linear: 0.0, angular: 0.0 });
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(LidarProcessor::new())
        // NOTE: .rate() auto-enables RT scheduling with 80% budget and 95% deadline
        .rate(50.hz())
        .build()?;
    scheduler.run()
}
```
Introspection & Diagnostics
| Method | Returns | Description |
|---|---|---|
| `name()` | `&str` | Topic name |
| `has_message()` | `bool` | At least one message available |
| `pending_count()` | `u64` | Messages waiting to be consumed |
| `dropped_count()` | `u64` | Messages dropped (buffer overflow) |
| `metrics()` | `TopicMetrics` | Aggregated send/recv/failure counts |
```rust
// Monitor topic health
let m = topic.metrics();
if topic.dropped_count() > 0 {
    hlog!(warn, "Topic '{}': {} messages dropped, {} pending",
        topic.name(), topic.dropped_count(), topic.pending_count());
}
```
Real-World Scenarios
Emergency Stop Chain
Use send_blocking() for critical commands that must never be dropped:
```rust
struct SafetyMonitor {
    estop_pub: Topic<EmergencyStop>,
}

impl Node for SafetyMonitor {
    fn name(&self) -> &str { "SafetyMonitor" }

    fn tick(&mut self) {
        if self.detect_collision() {
            let stop = EmergencyStop {
                engaged: 1,
                reason: *b"Collision detected\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
                ..Default::default()
            };
            // Block up to 1 ms — guaranteed delivery for safety-critical commands
            let _ = self.estop_pub.send_blocking(stop, 1.ms());
        }
    }
}
```
Multi-Sensor Fusion
Multiple subscribers reading from different sensors, fusing into a single output:
```rust
struct FusionNode {
    imu_sub: Topic<Imu>,
    odom_sub: Topic<Odometry>,
    pose_pub: Topic<Pose3D>,
    last_imu: Option<Imu>,
    last_odom: Option<Odometry>,
}

impl Node for FusionNode {
    fn name(&self) -> &str { "Fusion" }

    fn tick(&mut self) {
        // Always drain both topics every tick, keeping the newest sample of each
        while let Some(imu) = self.imu_sub.recv() { self.last_imu = Some(imu); }
        while let Some(odom) = self.odom_sub.recv() { self.last_odom = Some(odom); }
        if let (Some(imu), Some(odom)) = (&self.last_imu, &self.last_odom) {
            let fused = self.fuse(imu, odom);
            self.pose_pub.send(fused);
        }
    }
}
```
See Also
- Topic Concepts — Architecture, backends, and design patterns
- Communication Overview — When to use topics vs services vs actions
- Image API — Pool-backed camera images
- PointCloud API — Pool-backed 3D point clouds
- DepthImage API — Pool-backed depth images
- Tensor Messages — Tensor, Device, TensorDtype, and domain types
- Scheduler API — Running nodes that use topics
- Python Topic API — Python topic bindings