Topic API

Topic<T> is the primary communication primitive in HORUS. It provides typed publish/subscribe channels backed by shared memory for zero-copy IPC within a single machine.

Creating a Topic

use horus::prelude::*;

// Default capacity and slot size
let topic = Topic::<Twist>::new("cmd_vel");

// Custom capacity (number of slots) and slot size (bytes per slot)
let topic = Topic::<LaserScan>::with_capacity("scan", 8, 4096);
ConstructorReturnsDescription
Topic::<T>::new(name)Topic<T>Create with default capacity and slot size
Topic::<T>::with_capacity(name, capacity, slot_size)Topic<T>Create with explicit capacity and slot size

Most users only need Topic::new(name). Use with_capacity only if you need to tune buffer sizes for very large or very small messages.

The name string identifies the shared-memory channel. Two Topic instances with the same name and type connect automatically — one publishes, the other subscribes. Backend selection (in-process ring buffer vs cross-process SHM) is automatic based on whether publisher and subscriber live in the same process.


Sending Messages

MethodReturnsDescription
send(msg)()Publish a message (non-blocking, drops oldest if full)
try_send(msg)Result<(), T>Publish if space available; returns message back on failure
send_blocking(msg, timeout)Result<(), SendBlockingError>Block until space is available or timeout elapses

SendBlockingError

VariantDescription
TimeoutTimed out waiting for space in the ring buffer

Example

let topic = Topic::<CmdVel>::new("cmd_vel");

// Fire-and-forget (overwrites oldest if full)
topic.send(CmdVel { linear: 1.0, angular: 0.0 });

// Try without blocking
match topic.try_send(CmdVel { linear: 0.5, angular: 0.1 }) {
    Ok(()) => { /* sent */ }
    Err(msg) => { /* buffer full, msg returned */ }
}

// Block up to 10ms
use horus::prelude::*;
match topic.send_blocking(CmdVel { linear: 1.0, angular: 0.0 }, 10_u64.ms()) {
    Ok(()) => { /* sent */ }
    Err(SendBlockingError::Timeout) => { /* timed out */ }
}

Receiving Messages

MethodReturnsDescription
recv()Option<T>Take the next unread message (FIFO order)
read_latest()Option<T>Read the most recent message, skipping older ones (requires T: Copy)

recv() returns messages in order. Each message is delivered to each subscriber exactly once. read_latest() is useful for state-like data (sensor readings, poses) where you only care about the newest value.

Example

let topic = Topic::<Imu>::new("imu");

// IMPORTANT: call recv() every tick to drain the buffer and avoid stale data accumulation
// Process all pending messages
while let Some(msg) = topic.recv() {
    println!("Accel: ({}, {}, {})", msg.linear_accel.x, msg.linear_accel.y, msg.linear_accel.z);
}

// NOTE: read_latest() requires T: Copy — skips older messages and returns only the newest
if let Some(latest) = topic.read_latest() {
    println!("Latest orientation: {:?}", latest.orientation);
}

State & Metrics

MethodReturnsDescription
name()&strThe topic name
has_message()boolWhether there is at least one unread message
pending_count()usizeNumber of unread messages in the buffer
dropped_count()u64Total failed send attempts (includes buffer-full overwrites)
metrics()TopicMetricsAggregate send/receive statistics

TopicMetrics

MethodReturnsDescription
messages_sent()u64Total messages published on this topic
messages_received()u64Total messages consumed from this topic
send_failures()u64Failed send attempts (e.g., try_send on a full buffer)
recv_failures()u64Failed receive attempts (e.g., recv on empty buffer)

Example

let topic = Topic::<Twist>::new("cmd_vel");

// Check state
if topic.has_message() {
    println!("{} messages pending", topic.pending_count());
}

// Check for dropped messages
if topic.dropped_count() > 0 {
    hlog!(warn, "Dropped {} messages", topic.dropped_count());
}

// Check metrics
let m = topic.metrics();
println!("Sent: {}, Received: {}", m.messages_sent(), m.messages_received());

Pool-Backed Types (Zero-Copy)

For large data types (Image, PointCloud, DepthImage, Tensor), HORUS uses pool-backed allocation to avoid copying payloads through the ring buffer. The Topic API provides specialized methods for these types.

Image Topics

let topic = Topic::<Image>::new("camera/rgb");

// Send (moves the Image into the pool slot)
topic.send(image);

// Try send (returns the Image on failure)
match topic.try_send(image) {
    Ok(()) => {}
    Err(img) => { /* buffer full */ }
}

// IMPORTANT: call recv() every tick to drain the buffer — images are large and stale frames waste pool slots
if let Some(img) = topic.recv() {
    println!("{}x{} image received", img.width(), img.height());
}

PointCloud Topics

let topic = Topic::<PointCloud>::new("lidar/points");

topic.send(cloud);

// IMPORTANT: call recv() every tick to avoid stale point clouds filling the pool
if let Some(cloud) = topic.recv() {
    println!("{} points received", cloud.len());
}

DepthImage Topics

let topic = Topic::<DepthImage>::new("camera/depth");

topic.send(depth);

// IMPORTANT: call recv() every tick to drain depth frames
if let Some(depth) = topic.recv() {
    println!("{}x{} depth image", depth.width(), depth.height());
}

Tensor Topics

For advanced ML pipelines, you can use Topic<Tensor> directly. The same zero-copy transport that backs Image, PointCloud, and DepthImage is available for raw tensor data. See the Tensor Messages page for details.

let topic = Topic::<Tensor>::new("model/output");

// Send and receive work the same as any other topic type
topic.send(tensor);

// IMPORTANT: call recv() every tick to avoid stale tensor data in the pool
if let Some(t) = topic.recv() {
    println!("Tensor shape: {:?}", t.shape());
}

For most use cases, prefer the domain types above (Image, PointCloud, DepthImage) — they provide convenience methods like pixel(), point_at(), and get_depth() while using the same zero-copy path internally.


Detailed Method Reference


new

pub fn new(name: &str) -> Topic<T>

Create a topic with default capacity and slot size.

Parameters:

  • name: &str — Topic identifier. Two Topic instances with the same name and matching type T connect automatically. Names are case-sensitive. Use /-delimited hierarchical names: "sensors/lidar/scan".

Returns: Topic<T> connected to the named shared memory channel.

Defaults:

  • Capacity: 4 slots (ring buffer)
  • Slot size: size_of::<T>() rounded up to page alignment

Backend auto-selection: If publisher and subscriber are in the same process, uses an in-process ring buffer (no SHM overhead). Cross-process uses SHM.

Example:

let pub_topic = Topic::<CmdVel>::new("cmd_vel");
let sub_topic = Topic::<CmdVel>::new("cmd_vel"); // connects to same channel

send

pub fn send(&self, msg: T)

Publish a message. Non-blocking. Overwrites oldest if buffer is full.

Parameters:

  • msg: T — The message to publish. Moved into the ring buffer.

Buffer full behavior: If all slots are occupied, the oldest unread message is silently overwritten. No error is returned. Use dropped_count() to detect this.

When to use: Default for most real-time robotics — you always want the latest data, and blocking is unacceptable in a control loop.

Example:

topic.send(CmdVel::new(1.0, 0.0));

// Check if messages were dropped
if topic.dropped_count() > 0 {
    hlog!(warn, "{} messages dropped — subscriber too slow", topic.dropped_count());
}

recv

pub fn recv(&self) -> Option<T>

Take the next unread message in FIFO order.

Returns:

  • Some(T) — The oldest unread message (removed from buffer)
  • None — No unread messages available

Important: Call recv() every tick in your node to drain the buffer. Failing to drain causes stale data accumulation and eventually message drops.

Delivery guarantee: Each message is delivered to each subscriber exactly once. After recv() returns it, the message is consumed.

Example:

// Process ALL pending messages each tick
while let Some(scan) = self.scan_topic.recv() {
    self.process(scan);
}

read_latest

pub fn read_latest(&self) -> Option<T>
where T: Copy

Read the most recent message, skipping all older ones.

Returns:

  • Some(T) — Copy of the newest message
  • None — No messages available

Type constraint: Requires T: Copy because it reads without consuming — multiple calls return the same value until a new message arrives.

When to use: State-like data where you only care about the current value: poses, sensor readings, configuration. NOT for command streams where every message matters.

Example:

// Good: pose is state — only the latest matters
if let Some(pose) = self.odom_topic.read_latest() {
    self.current_position = pose;
}

// Bad: cmd_vel is a command — you'd miss intermediate commands
// Use recv() instead for command streams

try_send

pub fn try_send(&self, msg: T) -> Result<(), T>

Attempt to send without overwriting. If the buffer is full, the message is returned to the caller.

Parameters:

  • msg: T — The message to send. Returned on failure.

Returns:

  • Ok(()) — Sent successfully
  • Err(msg) — Buffer full, message returned

When to use: When you need to detect drops — implement backpressure, count losses, or log warnings. For fire-and-forget, use send() instead.

Example:

match topic.try_send(cmd) {
    Ok(()) => {}
    Err(_returned) => {
        self.drop_count += 1;
        hlog!(warn, "Buffer full — {} drops total", self.drop_count);
    }
}

try_recv

pub fn try_recv(&self) -> Option<T>

Non-blocking receive. Functionally identical to recv() — both are non-blocking and return Option<T>. Provided for API symmetry with try_send().

Returns:

  • Some(T) — Next unread message
  • None — No messages available

Example:

if let Some(msg) = topic.try_recv() {
    self.process(msg);
}

send_blocking

pub fn send_blocking(&self, msg: T, timeout: Duration) -> Result<(), SendBlockingError>

Send with guaranteed delivery. Blocks until buffer space is available or timeout expires. Uses a 3-phase strategy: immediate try, spin loop (sub-μs), then sleep in 100μs increments.

Parameters:

  • msg: T — The message to send
  • timeout: Duration — Maximum time to wait

Returns:

  • Ok(()) — Delivered
  • Err(SendBlockingError::Timeout) — Buffer stayed full for the entire timeout

When to use: Safety-critical commands (emergency stop, motor setpoints) where dropping a message is unacceptable. NOT for high-frequency data — use send() instead.

Example:

use horus::prelude::*;

// Critical: e-stop must be delivered
match topic.send_blocking(stop_cmd, 1.ms()) {
    Ok(()) => hlog!(info, "E-stop delivered"),
    Err(_) => hlog!(error, "E-stop delivery FAILED — subscriber unresponsive"),
}

with_capacity

pub fn with_capacity(name: &str, capacity: u32, slot_size: Option<usize>) -> HorusResult<Self>

Create a topic with custom ring buffer capacity.

Parameters:

  • name: &str — Topic identifier
  • capacity: u32 — Number of slots (rounded up to next power of 2)
  • slot_size: Option<usize> — Bytes per slot. None = auto-sized from size_of::<T>()

Returns: HorusResult<Topic<T>>

When to use: Bursty producers that need more buffering, or memory-constrained systems that need smaller buffers.

Example:

// 256-slot buffer for a high-frequency sensor
let topic = Topic::<Imu>::with_capacity("imu", 256, None)?;

// Small buffer to save memory on a constrained device
let topic = Topic::<Pose2D>::with_capacity("pose", 4, None)?;

name

pub fn name(&self) -> &str

Returns the topic name string.

Example:

let topic = Topic::<f32>::new("sensor.temperature")?;
assert_eq!(topic.name(), "sensor.temperature");

has_message

pub fn has_message(&self) -> bool

Check if at least one unread message is available without consuming it.

Example:

if topic.has_message() {
    let msg = topic.recv().unwrap();  // guaranteed Some
}

pending_count

pub fn pending_count(&self) -> u64

Number of unread messages in the buffer.

Example:

if topic.pending_count() > 10 {
    hlog!(warn, "Topic '{}' has {} pending — subscriber may be too slow",
        topic.name(), topic.pending_count());
}

dropped_count

pub fn dropped_count(&self) -> u64

Total messages dropped due to buffer overflow (oldest overwritten by send()). Resets to 0 on topic creation.

Example:

if topic.dropped_count() > 0 {
    hlog!(warn, "{} messages dropped on '{}' — consider increasing capacity or subscriber rate",
        topic.dropped_count(), topic.name());
}

metrics

pub fn metrics(&self) -> TopicMetrics

Aggregated send/receive statistics.

Returns: TopicMetrics with methods:

  • messages_sent() -> u64
  • messages_received() -> u64
  • send_failures() -> u64
  • recv_failures() -> u64

Example:

let m = topic.metrics();
println!("Sent: {}, Received: {}, Failures: send={} recv={}",
    m.messages_sent(), m.messages_received(),
    m.send_failures(), m.recv_failures());

Using Topics in Nodes

Topics are typically used inside Node implementations registered with a Scheduler:

use horus::prelude::*;

struct LidarProcessor {
    scan_in: Topic<LaserScan>,
    cmd_out: Topic<CmdVel>,
}

impl LidarProcessor {
    fn new() -> Self {
        Self {
            scan_in: Topic::new("scan"),
            cmd_out: Topic::new("cmd_vel"),
        }
    }
}

impl Node for LidarProcessor {
    fn name(&self) -> &str { "LidarProcessor" }

    fn tick(&mut self) {
        // IMPORTANT: call recv() every tick to drain the scan buffer
        if let Some(scan) = self.scan_in.recv() {
            // Find closest obstacle
            let min_range = scan.ranges.iter()
                .copied()
                .filter(|r| *r > scan.range_min && *r < scan.range_max)
                .fold(f32::MAX, f32::min);

            // SAFETY: send zero velocity when obstacle is too close
            let speed = if min_range < 0.5 { 0.0 } else { 1.0 };
            self.cmd_out.send(CmdVel { linear: speed, angular: 0.0 });
        }
    }

    // SAFETY: stop sending velocity commands on shutdown
    fn shutdown(&mut self) -> Result<()> {
        self.cmd_out.send(CmdVel { linear: 0.0, angular: 0.0 });
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();
    scheduler.add(LidarProcessor::new())
        // NOTE: .rate() auto-enables RT scheduling with 80% budget and 95% deadline
        .rate(50.hz())
        .build()?;
    scheduler.run()
}

Introspection & Diagnostics

MethodReturnsDescription
name()&strTopic name
has_message()boolAt least one message available
pending_count()u64Messages waiting to be consumed
dropped_count()u64Messages dropped (buffer overflow)
metrics()TopicMetricsAggregated send/recv/failure counts
// Monitor topic health
let m = topic.metrics();
if topic.dropped_count() > 0 {
    hlog!(warn, "Topic '{}': {} messages dropped, {} pending",
        topic.name(), topic.dropped_count(), topic.pending_count());
}

Real-World Scenarios

Emergency Stop Chain

Use send_blocking() for critical commands that must never be dropped:

struct SafetyMonitor {
    estop_pub: Topic<EmergencyStop>,
}

impl Node for SafetyMonitor {
    fn name(&self) -> &str { "SafetyMonitor" }

    fn tick(&mut self) {
        if self.detect_collision() {
            let stop = EmergencyStop {
                engaged: 1,
                reason: *b"Collision detected\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
                ..Default::default()
            };
            // Block up to 1ms — guaranteed delivery for safety-critical commands
            let _ = self.estop_pub.send_blocking(stop, 1.ms());
        }
    }
}

Multi-Sensor Fusion

Multiple subscribers reading from different sensors, fusing into a single output:

struct FusionNode {
    imu_sub: Topic<Imu>,
    odom_sub: Topic<Odometry>,
    pose_pub: Topic<Pose3D>,
    last_imu: Option<Imu>,
    last_odom: Option<Odometry>,
}

impl Node for FusionNode {
    fn name(&self) -> &str { "Fusion" }

    fn tick(&mut self) {
        // Always drain both topics every tick
        if let Some(imu) = self.imu_sub.recv() { self.last_imu = Some(imu); }
        if let Some(odom) = self.odom_sub.recv() { self.last_odom = Some(odom); }

        if let (Some(imu), Some(odom)) = (&self.last_imu, &self.last_odom) {
            let fused = self.fuse(imu, odom);
            self.pose_pub.send(fused);
        }
    }
}

See Also