Hub API Reference

The Hub<T> provides ultra-low latency pub/sub communication through shared memory.

Creating a Hub

Hub::new(topic_name)

Create a new hub for publishing and subscribing to a topic.

pub fn new(topic_name: &str) -> HorusResult<Self>

Parameters:

  • topic_name: Name of the topic (string)

Returns: Result<Hub<T>, HorusError>

Example:

use horus::prelude::*;

let velocity_hub: Hub<f32> = Hub::new("cmd_vel")?;

Type Constraints:

  • T must implement Clone + Debug + Send + Sync

Hub::new_with_capacity(topic_name, capacity)

Create a hub with custom message capacity.

pub fn new_with_capacity(
    topic_name: &str,
    capacity: usize
) -> HorusResult<Self>

Parameters:

  • topic_name: Name of the topic
  • capacity: Maximum number of messages to buffer

Returns: Result<Hub<T>, HorusError>

Example:

// Create hub with buffer for 10 messages
let hub: Hub<f32> = Hub::new_with_capacity("data", 10)?;

Publishing Messages

send(msg, ctx)

Send a message to all subscribers on this topic.

pub fn send(&self, msg: T, ctx: Option<&mut NodeInfo>) -> Result<(), T>

Parameters:

  • msg: Message to send (moved, then cloned internally)
  • ctx: Optional NodeInfo for automatic logging

Returns:

  • Ok(()): Message sent successfully
  • Err(msg): Failed to send (returns original message)

Performance: 296ns for small messages (16B), scales linearly with size

Example:

// Without logging
hub.send(42.0, None).ok();

// With automatic logging (recommended)
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    hub.send(42.0, ctx).ok();
    // Automatically logs: [12:34:56.789] [IPC: 296ns] MyNode --PUB--> 'topic' = 42.0
}

Error Handling:

match hub.send(data, ctx) {
    Ok(()) => {
        // Message sent successfully
    }
    Err(original_data) => {
        // Failed - shared memory full or unavailable
        // original_data contains the message that couldn't be sent
    }
}

Subscribing to Messages

recv(ctx)

Receive a message from the topic (non-blocking).

pub fn recv(&self, ctx: Option<&mut NodeInfo>) -> Option<T>

Parameters:

  • ctx: Optional NodeInfo for automatic logging

Returns:

  • Some(msg): Message received
  • None: No message available (not an error)

Performance: 296ns for small messages (16B)

Example:

// Without logging
if let Some(velocity) = hub.recv(None) {
    println!("Received: {}", velocity);
}

// With automatic logging (recommended)
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    if let Some(velocity) = hub.recv(ctx) {
        // Automatically logs: [12:34:56.789] [IPC: 142ns] MyNode <--SUB-- 'topic' = 42.0
        self.process(velocity);
    }
}

Best Practices:

  • Always check recv() every tick for responsive nodes
  • None is normal - means no new messages
  • Messages are cloned when received

Monitoring Methods

get_connection_state()

Get current connection state (lock-free).

pub fn get_connection_state(&self) -> ConnectionState

Returns: ConnectionState enum

Connection States:

  • Disconnected - Not connected
  • Connecting - Establishing connection
  • Connected - Ready for communication
  • Reconnecting - Attempting to reconnect
  • Failed - Connection failed

Example:

if hub.get_connection_state() == ConnectionState::Connected {
    hub.send(data, ctx).ok();
}

get_metrics()

Get performance metrics snapshot (lock-free).

pub fn get_metrics(&self) -> HubMetrics

Returns: HubMetrics struct with:

  • messages_sent: u64
  • messages_received: u64
  • send_failures: u64
  • recv_failures: u64

Example:

let metrics = hub.get_metrics();
println!("Sent: {}, Received: {}",
         metrics.messages_sent,
         metrics.messages_received);

get_topic_name()

Get the topic name for this hub.

pub fn get_topic_name(&self) -> &str

Returns: Topic name as string slice

Example:

println!("Topic: {}", hub.get_topic_name());

Valid Message Types

Primitives

Hub::<f32>::new("float")?;
Hub::<f64>::new("double")?;
Hub::<i32>::new("int")?;
Hub::<u32>::new("uint")?;
Hub::<bool>::new("bool")?;
Hub::<String>::new("string")?;

Arrays

Hub::<[f32; 3]>::new("position")?;
Hub::<[u8; 1024]>::new("buffer")?;

Structs

Must implement Clone + Debug:

#[derive(Clone, Debug)]
struct RobotPose {
    x: f32,
    y: f32,
    theta: f32,
}

let pose_hub: Hub<RobotPose> = Hub::new("pose")?;

Standard Messages

HORUS provides pre-defined message types:

use horus::prelude::*;
use horus_library::messages::{CmdVel, LaserScan, Imu};

let cmd_hub: Hub<CmdVel> = Hub::new("cmd_vel")?;
let scan_hub: Hub<LaserScan> = Hub::new("scan")?;
let imu_hub: Hub<Imu> = Hub::new("imu")?;

Communication Patterns

One-to-One

Single publisher, single subscriber:

// Publisher node
struct PublisherNode {
    data_pub: Hub<f32>,
}

impl Node for PublisherNode {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        self.data_pub.send(42.0, ctx).ok();
    }
}

// Subscriber node
struct SubscriberNode {
    data_sub: Hub<f32>,
}

impl Node for SubscriberNode {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(data) = self.data_sub.recv(ctx) {
            println!("Got: {}", data);
        }
    }
}

One-to-Many (Broadcast)

Single publisher, multiple subscribers:

// One publisher
struct Broadcaster {
    broadcast_pub: Hub<String>,
}

// Multiple subscribers automatically receive
struct Listener1 {
    broadcast_sub: Hub<String>,
}

struct Listener2 {
    broadcast_sub: Hub<String>,
}

Many-to-One (Aggregation)

Multiple publishers, single subscriber:

// Multiple sensors publishing
struct Sensor1 {
    reading_pub: Hub<f32>,
}

struct Sensor2 {
    reading_pub: Hub<f32>,
}

// One aggregator subscribing
struct Aggregator {
    reading_sub: Hub<f32>,
}

Processing Pipeline

Chain multiple nodes with pub/sub:

impl Node for ProcessorNode {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Receive input
        if let Some(input) = self.input_sub.recv(ctx) {
            // Process
            let output = self.process(input);

            // Publish result
            self.output_pub.send(output, ctx).ok();
        }
    }
}

Performance Characteristics

Latency

Message SizeLatency
16B (CmdVel)296ns
304B (IMU)718ns
1.5KB (LaserScan)1.31µs
120KB (PointCloud)2.8µs

Latency scales linearly with message size.

Zero-Copy Design

Hub uses shared memory with atomic operations:

  • No serialization/deserialization
  • No network overhead
  • No kernel context switches
  • Cache-line aligned to prevent false sharing

Best Practices

Topic Naming

Use descriptive, consistent names:

// Good
Hub::new("cmd_vel")?;
Hub::new("sensor/lidar")?;
Hub::new("robot1/pose")?;

// Bad
Hub::new("data")?;  // Too vague
Hub::new("x")?;     // Not descriptive

Error Handling

Always handle send errors:

if let Err(data) = hub.send(data, ctx) {
    ctx.log_warning("Failed to publish message");
    // Handle error (retry, log, etc.)
}

Type Safety

Use the same type for publishers and subscribers:

// Publisher
let pub_hub: Hub<f32> = Hub::new("velocity")?;

// Subscriber - must match type!
let sub_hub: Hub<f32> = Hub::new("velocity")?;

Message Size

Prefer fixed-size types for predictable performance:

// Good - fixed size
Hub::<[f32; 100]>::new("data")?;

// Less ideal - dynamic allocation
Hub::<Vec<f32>>::new("data")?;

Common Patterns

Conditional Publishing

if value > threshold {
    hub.send(alert, ctx).ok();
}

Message Buffering

struct BufferedNode {
    data_sub: Hub<f32>,
    last_value: Option<f32>,
}

impl Node for BufferedNode {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Update cache if new message available
        if let Some(value) = self.data_sub.recv(ctx) {
            self.last_value = Some(value);
        }

        // Always have access to last value
        if let Some(value) = self.last_value {
            self.process(value);
        }
    }
}

Rate Limiting

struct RateLimitedPublisher {
    data_pub: Hub<f32>,
    tick_count: u32,
}

impl Node for RateLimitedPublisher {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        self.tick_count += 1;

        // Publish every 10 ticks (~6 Hz at 60 FPS)
        if self.tick_count % 10 == 0 {
            self.data_pub.send(42.0, ctx).ok();
        }
    }
}

Shared Memory Details

Location

Hubs store data in /dev/shm (Linux shared memory):

# View HORUS shared memory
ls -lh /dev/shm/horus_*

Cleanup

HORUS automatically cleans up on graceful shutdown (Ctrl+C).

Manual cleanup:

rm -f /dev/shm/horus_*

Size Limits

Check available shared memory:

df -h /dev/shm

Typical: 50% of RAM or 32MB-2GB depending on system.

See Also