Hub and Pub/Sub

The Hub is HORUS's ultra-low latency publish-subscribe (pub/sub) communication system. It enables nodes to exchange messages through shared-memory IPC, with latencies ranging from 296ns for small messages to 2.8µs for large ones.

What is a Hub?

A Hub<T> is a typed communication channel that connects publishers and subscribers through shared memory. Multiple nodes can publish to the same topic, and multiple nodes can subscribe to the same topic.

Key Features

Zero-Copy Communication: Messages are written directly to shared memory without serialization

Lock-Free Operations: Atomic operations for thread-safe communication without locks

Type Safety: Compile-time guarantees for message types

Cache-Aligned: Optimized memory layout prevents false sharing

Sub-Microsecond Latency: 296ns for small messages (16B), 1.31µs for larger messages (1.5KB)

Linear Scaling: Latency scales linearly with message size

Basic Usage

Creating a Hub

use horus::prelude::*;

// Create a Hub for f32 values on topic "velocity"
let hub: Hub<f32> = Hub::new("velocity")?;

The generic type T must implement:

  • Clone: For copying data
  • Debug: For logging
  • Send: For thread safety

Publishing Messages

use horus::prelude::*;

struct Publisher {
    velocity_pub: Hub<f32>,
}

impl Node for Publisher {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        let velocity = 1.5;

        // Send message
        match self.velocity_pub.send(velocity, ctx) {
            Ok(()) => {
                // Message sent successfully
            }
            Err(msg) => {
                // Failed to send, msg contains the original value
            }
        }
    }
}

Subscribing to Messages

use horus::prelude::*;

struct Subscriber {
    velocity_sub: Hub<f32>,
}

impl Node for Subscriber {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Receive message
        if let Some(velocity) = self.velocity_sub.recv(ctx) {
            // Process the received message
            println!("Received velocity: {}", velocity);
        }
    }
}

Hub Architecture

Memory Layout

The Hub uses a cache-aligned structure to prevent false sharing:

#[repr(align(64))]
pub struct Hub<T> {
    shm_topic: Arc<ShmTopic<T>>,
    topic_name: String,
    state: AtomicU8,
    metrics: Arc<AtomicHubMetrics>,
    _padding: [u8; 15],
}

64-byte alignment: Matches CPU cache line size for optimal performance

Atomic state: Lock-free connection state tracking

Shared metrics: Performance counters with atomic operations

Zero unsafe code: Safe abstractions over shared memory
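
The effect of `#[repr(align(64))]` can be checked with `std::mem`. The sketch below uses a standalone stand-in struct (not the real `Hub` definition) to show the guarantee the attribute provides:

```rust
use std::mem;

// Stand-in for Hub's layout: a 64-byte-aligned wrapper
#[repr(align(64))]
struct CacheAligned {
    state: u8,
}

fn main() {
    // The compiler rounds both alignment and size up to the cache line,
    // so two CacheAligned values never share a 64-byte cache line
    assert_eq!(mem::align_of::<CacheAligned>(), 64);
    assert_eq!(mem::size_of::<CacheAligned>(), 64);
}
```

Because size is rounded up to a multiple of the alignment, each instance occupies a full cache line, which is what prevents false sharing between cores.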

Shared Memory Topic

Internally, Hub uses ShmTopic<T> for actual shared memory operations:

pub struct ShmTopic<T> {
    topic_name: String,
    domain_id: String,
    _phantom: PhantomData<T>,
}

Topics are stored in /dev/shm with naming convention:

/dev/shm/horus_{domain_id}_{topic_name}

Send and Receive

The send() Method

pub fn send(&self, msg: T, ctx: Option<&mut NodeInfo>) -> Result<(), T>

Parameters:

  • msg: Message to send (moved into the function)
  • ctx: Optional NodeInfo for logging

Returns:

  • Ok(()): Message sent successfully
  • Err(msg): Failed to send, returns the original message

Implementation:

pub fn send(&self, msg: T, ctx: Option<&mut NodeInfo>) -> Result<(), T> {
    let msg_clone = msg.clone();

    let ipc_ns = match self.shm_topic.loan() {
        Ok(mut sample) => {
            let ipc_start = Instant::now();
            sample.write(msg_clone);
            drop(sample);
            ipc_start.elapsed().as_nanos() as u64
        },
        Err(_) => 0
    };

    if let Some(ctx) = ctx {
        ctx.log_pub(&self.topic_name, &msg, ipc_ns);
    }

    if ipc_ns == 0 {
        Err(msg)
    } else {
        Ok(())
    }
}

Key Points:

  • Uses loan pattern for zero-copy writes
  • Measures IPC latency in nanoseconds
  • Automatic logging with timing information
  • Non-blocking operation

The recv() Method

pub fn recv(&self, ctx: Option<&mut NodeInfo>) -> Option<T>

Parameters:

  • ctx: Optional NodeInfo for logging

Returns:

  • Some(msg): Message received
  • None: No message available

Implementation:

pub fn recv(&self, ctx: Option<&mut NodeInfo>) -> Option<T> {
    let ipc_start = Instant::now();

    match self.shm_topic.read() {
        Ok(sample) => {
            let msg = sample.clone();
            let ipc_ns = ipc_start.elapsed().as_nanos() as u64;

            if let Some(ctx) = ctx {
                ctx.log_sub(&self.topic_name, &msg, ipc_ns);
            }

            Some(msg)
        }
        Err(_) => None
    }
}

Key Points:

  • Non-blocking read
  • Measures IPC latency
  • Automatic logging with timing
  • Returns clone of shared memory data

Connection States

Hub tracks connection state through an atomic enum:

pub enum ConnectionState {
    Uninitialized = 0,
    Connecting = 1,
    Connected = 2,
    Disconnecting = 3,
    Disconnected = 4,
    Error = 5,
}

Checking Connection State

pub fn state(&self) -> ConnectionState {
    // Returns current connection state
}
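
A node might use `state()` to gate publishing until the connection is live. The sketch below works against a local copy of the enum for illustration (the real type lives in horus; the `PartialEq` derive is an assumption):

```rust
// Local copy of ConnectionState for illustration only
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum ConnectionState {
    Uninitialized = 0,
    Connecting = 1,
    Connected = 2,
    Disconnecting = 3,
    Disconnected = 4,
    Error = 5,
}

// Only publish once the Hub reports a live connection
fn ready_to_send(state: ConnectionState) -> bool {
    matches!(state, ConnectionState::Connected)
}

fn main() {
    assert!(ready_to_send(ConnectionState::Connected));
    assert!(!ready_to_send(ConnectionState::Connecting));
    assert!(!ready_to_send(ConnectionState::Error));
}
```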

State Transitions

Uninitialized -> Connecting -> Connected -> Disconnecting -> Disconnected
                                   |
                                   v
                                 Error

Performance Metrics

Hub tracks detailed performance metrics atomically:

pub struct AtomicHubMetrics {
    pub total_messages_sent: AtomicU64,
    pub total_messages_received: AtomicU64,
    pub total_bytes_sent: AtomicU64,
    pub total_bytes_received: AtomicU64,
    pub avg_send_latency_ns: AtomicU64,
    pub avg_recv_latency_ns: AtomicU64,
    pub last_send_time_ns: AtomicU64,
    pub last_recv_time_ns: AtomicU64,
}

Accessing Metrics

use std::sync::atomic::Ordering;

let hub: Hub<f32> = Hub::new("velocity")?;

// Send some messages
hub.send(1.0, None).ok();
hub.send(2.0, None).ok();

// Get metrics (the counters are atomics, so load them explicitly)
let metrics = hub.metrics();
println!("Total sent: {}", metrics.total_messages_sent.load(Ordering::Relaxed));
println!("Avg latency: {}ns", metrics.avg_send_latency_ns.load(Ordering::Relaxed));

Communication Patterns

One-to-One

Single publisher, single subscriber:

// Publisher
struct PubNode {
    data_pub: Hub<f32>,
}

impl Node for PubNode {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        self.data_pub.send(42.0, ctx).ok();
    }
}

// Subscriber
struct SubNode {
    data_sub: Hub<f32>,
}

impl Node for SubNode {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(data) = self.data_sub.recv(ctx) {
            println!("Got: {}", data);
        }
    }
}

One-to-Many (Broadcast)

Single publisher, multiple subscribers:

// One publisher
struct Broadcaster {
    broadcast_pub: Hub<String>,
}

// Multiple subscribers
struct Listener1 {
    broadcast_sub: Hub<String>,
}

struct Listener2 {
    broadcast_sub: Hub<String>,
}

struct Listener3 {
    broadcast_sub: Hub<String>,
}

// All subscribers receive the same message

Many-to-One (Aggregation)

Multiple publishers, single subscriber:

// Multiple publishers
struct Sensor1 {
    reading_pub: Hub<f32>,
}

struct Sensor2 {
    reading_pub: Hub<f32>,
}

// Single aggregator
struct Aggregator {
    reading_sub: Hub<f32>,
}

impl Node for Aggregator {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Receives from any publisher on this topic
        if let Some(reading) = self.reading_sub.recv(ctx) {
            self.process(reading);
        }
    }
}

Many-to-Many

Multiple publishers and subscribers:

// All nodes can publish and subscribe to the same topic
struct Agent1 {
    state_pub: Hub<RobotState>,
    state_sub: Hub<RobotState>,
}

struct Agent2 {
    state_pub: Hub<RobotState>,
    state_sub: Hub<RobotState>,
}

// Agents share state with each other

Topic Naming

Best Practices

Use descriptive names:

let hub = Hub::new("cmd_vel");           // Good
let hub = Hub::new("data");              // Too vague

Follow conventions:

let hub = Hub::new("sensor/lidar");      // Hierarchical
let hub = Hub::new("robot1/cmd_vel");    // Namespaced
let hub = Hub::new("diagnostics/cpu");   // Categorized

Be consistent:

// Pick one style and stick to it
"cmd_vel"       // Snake case
"cmdVel"        // Camel case
"CmdVel"        // Pascal case

Reserved Topic Names

Avoid using these patterns:

  • Topics starting with _ (internal use)
  • Topics containing /dev/ (conflicts with paths)
  • Topics with special characters: !@#$%^&*()
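
A validator capturing these rules might look like the following. This helper is hypothetical and not part of the HORUS API:

```rust
// Hypothetical helper (not part of HORUS): enforces the naming rules above -
// no leading underscore, no "/dev/" substring, no special characters
fn is_valid_topic(name: &str) -> bool {
    !name.is_empty()
        && !name.starts_with('_')
        && !name.contains("/dev/")
        && name
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '/')
}

fn main() {
    assert!(is_valid_topic("robot1/cmd_vel"));
    assert!(!is_valid_topic("_internal"));
    assert!(!is_valid_topic("bad!topic"));
}
```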

Error Handling

Send Errors

// Re-borrow ctx with as_deref_mut() so it is still usable in the error arm
match hub.send(data, ctx.as_deref_mut()) {
    Ok(()) => {
        // Success
    }
    Err(original_data) => {
        // Failed to send - shared memory full or not available
        // original_data contains the message that couldn't be sent
        if let Some(ctx) = ctx {
            ctx.log_warning("Failed to publish message");
        }
    }
}

Receive Errors

recv() returns None when:

  • No message is available (not an error)
  • Topic doesn't exist yet
  • Shared memory not initialized

match hub.recv(ctx) {
    Some(data) => {
        // Process data
    }
    None => {
        // No data available - this is normal
        // Don't treat this as an error
    }
}

Type Constraints

Valid Message Types

Messages must implement Clone + Debug + Send:

// Simple types work out of the box
Hub::<f32>::new("float_topic");
Hub::<bool>::new("bool_topic");
Hub::<u32>::new("int_topic");

// Structs need derives
#[derive(Clone, Debug)]
struct MyMessage {
    x: f32,
    y: f32,
}

Hub::<MyMessage>::new("my_topic");

// Arrays work
Hub::<[f32; 100]>::new("array_topic");

// Strings work
Hub::<String>::new("string_topic");

Invalid Message Types

// Missing Clone
struct NoClone {
    data: f32,
}
// Hub::<NoClone>::new("topic"); // Won't compile

// Missing Send
struct NoSend {
    ptr: *const u8,  // Raw pointers are !Send
}
// Hub::<NoSend>::new("topic"); // Won't compile
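
The `Clone + Debug + Send` requirement can be probed at compile time with a bound-only helper function. This is a sketch, not part of HORUS:

```rust
use std::fmt::Debug;

// Compile-time probe: calling this only type-checks when T
// satisfies the same bounds Hub places on its message type
fn assert_hub_message<T: Clone + Debug + Send>() {}

#[derive(Clone, Debug)]
struct Pose2D {
    x: f32,
    y: f32,
}

fn main() {
    assert_hub_message::<f32>();    // primitives qualify
    assert_hub_message::<Pose2D>(); // derived struct qualifies
    // assert_hub_message::<*const u8>(); // would not compile: !Send
    let _ = Pose2D { x: 0.0, y: 0.0 };
}
```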

Advanced Usage

Conditional Publishing

Only publish when certain conditions are met:

impl Node for ConditionalPublisher {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        let data = self.read_sensor();

        // Only publish if above threshold
        if data > self.threshold {
            self.alert_pub.send(data, ctx).ok();
        }
    }
}

Message Buffering

Cache the last received message:

struct BufferedSubscriber {
    data_sub: Hub<f32>,
    last_value: Option<f32>,
}

impl Node for BufferedSubscriber {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Update cached value if new message available
        if let Some(value) = self.data_sub.recv(ctx) {
            self.last_value = Some(value);
        }

        // Always have access to last value
        if let Some(value) = self.last_value {
            self.process(value);
        }
    }
}

Rate Limiting

Publish at a specific rate:

struct RateLimitedPublisher {
    data_pub: Hub<f32>,
    tick_count: u32,
    publish_every_n_ticks: u32,
}

impl Node for RateLimitedPublisher {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        self.tick_count += 1;

        // Publish every publish_every_n_ticks ticks (10 ticks at 60 Hz ≈ 6 Hz)
        if self.tick_count % self.publish_every_n_ticks == 0 {
            self.data_pub.send(42.0, ctx).ok();
        }
    }
}

Message Filtering

Filter messages before processing:

impl Node for FilteringSubscriber {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(data) = self.data_sub.recv(ctx) {
            // Only process valid data
            if data.is_valid() && data.quality > 0.8 {
                self.process(data);
            }
        }
    }
}

Shared Memory Details

Location

Hub stores data in /dev/shm (Linux shared memory):

# View HORUS shared memory segments
ls -lh /dev/shm/horus_*

Size Limitations

Shared memory has finite space. Check available space:

df -h /dev/shm

Typical default: 50% of RAM on Linux, though some systems configure fixed sizes from 32MB to 2GB.

Cleaning Up

Remove stale shared memory segments:

# Remove all HORUS shared memory
rm -f /dev/shm/horus_*

HORUS automatically cleans up when nodes shut down gracefully (Ctrl+C).

Performance Characteristics

Latency by Message Size

Based on benchmark data:

Message Type   Size     Latency
CmdVel         16B      296ns
IMU            304B     718ns
LaserScan      1.5KB    1.31µs
PointCloud     120KB    2.8µs

Key Insight: Latency scales linearly with message size.

Throughput

HORUS can handle:

  • Millions of messages per second for small messages
  • Gigabytes per second for large messages
  • Deterministic latency regardless of system load
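
The throughput claims follow from the latency figures above as a back-of-envelope upper bound, assuming one message per end-to-end latency interval on a single stream:

```rust
fn main() {
    // Small messages: 296ns per 16B CmdVel implies several million msgs/sec
    let small_latency_ns = 296.0_f64;
    let msgs_per_sec = 1e9 / small_latency_ns;
    assert!(msgs_per_sec > 3_000_000.0);

    // Large messages: 120KB per 2.8µs implies tens of GB/sec
    let large_latency_s = 2.8e-6_f64;
    let bytes_per_sec = 120_000.0 / large_latency_s;
    assert!(bytes_per_sec > 1e9);

    println!("~{:.1}M msgs/s, ~{:.0} GB/s", msgs_per_sec / 1e6, bytes_per_sec / 1e9);
}
```

Real-world throughput depends on tick rates, contention, and subscriber count, so treat these as ideal single-stream ceilings.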

Best Practices

Use Appropriate Types

Choose the right message type for your data:

// Good: Fixed-size array for known dimensions
Hub::<[f32; 3]>::new("position");

// Bad: Vec requires heap allocation
Hub::<Vec<f32>>::new("position");

Minimize Cloning

Hub clones messages internally. Keep messages small:

// Good: Small struct
#[derive(Clone, Debug)]
struct Pose {
    x: f32,
    y: f32,
    theta: f32,
}

// Consider Arc for large data
Hub::<Arc<LargeData>>::new("big_data");

Check recv() Every Tick

Always check for new messages:

fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    // Check EVERY tick
    if let Some(msg) = self.sub.recv(ctx) {
        self.process(msg);
    }
}

Handle Send Failures

Don't ignore send errors:

// Re-borrow ctx with as_deref_mut() so it can still log on failure
if let Err(_data) = self.data_pub.send(data, ctx.as_deref_mut()) {
    if let Some(ctx) = ctx {
        ctx.log_warning("Publish failed");
    }
}

Next Steps