Hub and Pub/Sub
The Hub is HORUS's ultra-low latency publish-subscribe (pub/sub) communication system. It enables nodes to exchange messages through shared memory IPC with 296ns-2.8µs latency.
What is a Hub?
A Hub<T> is a typed communication channel that connects publishers and subscribers through shared memory. Multiple nodes can publish to the same topic, and multiple nodes can subscribe to the same topic.
Key Features
Zero-Copy Communication: Messages are written directly to shared memory without serialization
Lock-Free Operations: Atomic operations for thread-safe communication without locks
Type Safety: Compile-time guarantees for message types
Cache-Aligned: Optimized memory layout prevents false sharing
Sub-Microsecond Latency: 296ns for small messages (16B), 1.31µs for larger messages (1.5KB)
Linear Scaling: Latency scales linearly with message size
Basic Usage
Creating a Hub
use horus::prelude::*;
// Create a Hub for f32 values on topic "velocity"
let hub: Hub<f32> = Hub::new("velocity")?;
The generic type T must implement:
- Clone: For copying data
- Debug: For logging
- Send: For thread safety
Publishing Messages
use horus::prelude::*;
struct Publisher {
velocity_pub: Hub<f32>,
}
impl Node for Publisher {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
let velocity = 1.5;
// Send message
match self.velocity_pub.send(velocity, ctx) {
Ok(()) => {
// Message sent successfully
}
Err(msg) => {
// Failed to send, msg contains the original value
}
}
}
}
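The Hub field itself is typically created once, when the node is constructed, and reused on every tick. A minimal sketch of a constructor for the Publisher above (the new() shape and the expect() error handling are illustrative, not a prescribed API):
impl Publisher {
    // Illustrative constructor: create the Hub up front and reuse it each tick.
    // Panics on failure here for brevity; real code may prefer to propagate
    // the error from Hub::new instead.
    fn new() -> Self {
        Self {
            velocity_pub: Hub::new("velocity").expect("failed to create velocity hub"),
        }
    }
}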
Subscribing to Messages
use horus::prelude::*;
struct Subscriber {
velocity_sub: Hub<f32>,
}
impl Node for Subscriber {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Receive message
if let Some(velocity) = self.velocity_sub.recv(ctx) {
// Process the received message
println!("Received velocity: {}", velocity);
}
}
}
Hub Architecture
Memory Layout
The Hub uses a cache-aligned structure to prevent false sharing:
#[repr(align(64))]
pub struct Hub<T> {
shm_topic: Arc<ShmTopic<T>>,
topic_name: String,
state: AtomicU8,
metrics: Arc<AtomicHubMetrics>,
_padding: [u8; 15],
}
64-byte alignment: Matches CPU cache line size for optimal performance
Atomic state: Lock-free connection state tracking
Shared metrics: Performance counters with atomic operations
Zero unsafe code: Safe abstractions over shared memory
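These guarantees are visible at the type level. A quick sanity check one might drop into a test, based on the struct definition above:
use horus::prelude::*;

#[test]
fn hub_is_cache_aligned() {
    // repr(align(64)) guarantees the Hub starts on its own cache line,
    // and the compiler pads its size up to a multiple of the alignment.
    assert!(std::mem::align_of::<Hub<f32>>() >= 64);
    assert_eq!(std::mem::size_of::<Hub<f32>>() % 64, 0);
}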
Shared Memory Topic
Internally, Hub uses ShmTopic<T> for actual shared memory operations:
pub struct ShmTopic<T> {
topic_name: String,
domain_id: String,
_phantom: PhantomData<T>,
}
Topics are stored in /dev/shm with the naming convention:
/dev/shm/horus_{domain_id}_{topic_name}
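When debugging, it can help to compute the segment path a topic should map to. A small helper following the convention above (purely illustrative; the function and the example domain id are not part of the Hub API):
// Hypothetical helper: mirrors the horus_{domain_id}_{topic_name} convention.
fn shm_path(domain_id: &str, topic_name: &str) -> String {
    format!("/dev/shm/horus_{}_{}", domain_id, topic_name)
}

// e.g. shm_path("default", "velocity") -> "/dev/shm/horus_default_velocity"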
Send and Receive
The send() Method
pub fn send(&self, msg: T, ctx: Option<&mut NodeInfo>) -> Result<(), T>
Parameters:
- msg: Message to send (moved into the function)
- ctx: Optional NodeInfo for logging
Returns:
- Ok(()): Message sent successfully
- Err(msg): Failed to send, returns the original message
Implementation:
pub fn send(&self, msg: T, ctx: Option<&mut NodeInfo>) -> Result<(), T> {
let msg_clone = msg.clone();
let ipc_ns = match self.shm_topic.loan() {
Ok(mut sample) => {
let ipc_start = Instant::now();
sample.write(msg_clone);
drop(sample);
ipc_start.elapsed().as_nanos() as u64
},
Err(_) => 0
};
if let Some(ctx) = ctx {
ctx.log_pub(&self.topic_name, &msg, ipc_ns);
}
if ipc_ns == 0 {
Err(msg)
} else {
Ok(())
}
}
Key Points:
- Uses loan pattern for zero-copy writes
- Measures IPC latency in nanoseconds
- Automatic logging with timing information
- Non-blocking operation
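Because send() is non-blocking and hands the message back on failure, a publisher can buffer a rejected message and retry it on the next tick. A minimal sketch under those semantics (struct and field names are illustrative):
use horus::prelude::*;

struct ResilientPublisher {
    data_pub: Hub<f32>,
    // Holds a message that could not be sent on a previous tick.
    pending: Option<f32>,
}

impl Node for ResilientPublisher {
    fn tick(&mut self, mut ctx: Option<&mut NodeInfo>) {
        // Retry a previously rejected message first.
        if let Some(msg) = self.pending.take() {
            if let Err(msg) = self.data_pub.send(msg, ctx.as_deref_mut()) {
                self.pending = Some(msg);
                return; // Shared memory still unavailable; try again next tick
            }
        }
        // Publish the new value; buffer it if the send fails.
        if let Err(msg) = self.data_pub.send(1.5, ctx) {
            self.pending = Some(msg);
        }
    }
}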
The recv() Method
pub fn recv(&self, ctx: Option<&mut NodeInfo>) -> Option<T>
Parameters:
ctx: Optional NodeInfo for logging
Returns:
- Some(msg): Message received
- None: No message available
Implementation:
pub fn recv(&self, ctx: Option<&mut NodeInfo>) -> Option<T> {
let ipc_start = Instant::now();
match self.shm_topic.read() {
Ok(sample) => {
let msg = sample.clone();
let ipc_ns = ipc_start.elapsed().as_nanos() as u64;
if let Some(ctx) = ctx {
ctx.log_sub(&self.topic_name, &msg, ipc_ns);
}
Some(msg)
}
Err(_) => None
}
}
Key Points:
- Non-blocking read
- Measures IPC latency
- Automatic logging with timing
- Returns clone of shared memory data
Connection States
Hub tracks connection state through an atomic enum:
pub enum ConnectionState {
Uninitialized = 0,
Connecting = 1,
Connected = 2,
Disconnecting = 3,
Disconnected = 4,
Error = 5,
}
Checking Connection State
pub fn state(&self) -> ConnectionState {
// Returns current connection state
}
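One use of state() is to skip work until the Hub has actually connected. A sketch, assuming ConnectionState is in scope and can be compared with == (adapt if the real API differs):
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    // Don't publish until the underlying shared memory topic is connected.
    if self.data_pub.state() != ConnectionState::Connected {
        return;
    }
    self.data_pub.send(1.5, ctx).ok();
}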
State Transitions
Uninitialized -> Connecting -> Connected -> Disconnecting -> Disconnected
A failure in any state moves the Hub to Error.
Performance Metrics
Hub tracks detailed performance metrics atomically:
pub struct AtomicHubMetrics {
pub total_messages_sent: AtomicU64,
pub total_messages_received: AtomicU64,
pub total_bytes_sent: AtomicU64,
pub total_bytes_received: AtomicU64,
pub avg_send_latency_ns: AtomicU64,
pub avg_recv_latency_ns: AtomicU64,
pub last_send_time_ns: AtomicU64,
pub last_recv_time_ns: AtomicU64,
}
Accessing Metrics
let hub: Hub<f32> = Hub::new("velocity")?;
// Send some messages
hub.send(1.0, None).ok();
hub.send(2.0, None).ok();
// Get metrics
let metrics = hub.metrics();
println!("Total sent: {}", metrics.total_messages_sent);
println!("Avg latency: {}ns", metrics.avg_send_latency_ns);
Communication Patterns
One-to-One
Single publisher, single subscriber:
// Publisher
struct PubNode {
data_pub: Hub<f32>,
}
impl Node for PubNode {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
self.data_pub.send(42.0, ctx).ok();
}
}
// Subscriber
struct SubNode {
data_sub: Hub<f32>,
}
impl Node for SubNode {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(data) = self.data_sub.recv(ctx) {
println!("Got: {}", data);
}
}
}
One-to-Many (Broadcast)
Single publisher, multiple subscribers:
// One publisher
struct Broadcaster {
broadcast_pub: Hub<String>,
}
// Multiple subscribers
struct Listener1 {
broadcast_sub: Hub<String>,
}
struct Listener2 {
broadcast_sub: Hub<String>,
}
struct Listener3 {
broadcast_sub: Hub<String>,
}
// All subscribers receive the same message
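The tick implementations mirror the one-to-one case: the broadcaster publishes once, and every listener calls recv() on its own Hub created for the same topic name. A sketch for the broadcaster and one listener:
impl Node for Broadcaster {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        self.broadcast_pub.send("status: ok".to_string(), ctx).ok();
    }
}

impl Node for Listener1 {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Listener2 and Listener3 look identical; each sees the same message.
        if let Some(msg) = self.broadcast_sub.recv(ctx) {
            println!("Listener1 got: {}", msg);
        }
    }
}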
Many-to-One (Aggregation)
Multiple publishers, single subscriber:
// Multiple publishers
struct Sensor1 {
reading_pub: Hub<f32>,
}
struct Sensor2 {
reading_pub: Hub<f32>,
}
// Single aggregator
struct Aggregator {
reading_sub: Hub<f32>,
}
impl Node for Aggregator {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Receives from any publisher on this topic
if let Some(reading) = self.reading_sub.recv(ctx) {
self.process(reading);
}
}
}
Many-to-Many
Multiple publishers and subscribers:
// All nodes can publish and subscribe to the same topic
struct Agent1 {
state_pub: Hub<RobotState>,
state_sub: Hub<RobotState>,
}
struct Agent2 {
state_pub: Hub<RobotState>,
state_sub: Hub<RobotState>,
}
// Agents share state with each other
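In this pattern each agent publishes its own state and reads the states published by its peers on the same topic. A sketch with a hypothetical RobotState (the fields and the id-based filter are illustrative):
#[derive(Clone, Debug)]
struct RobotState {
    id: u32,
    x: f32,
    y: f32,
}

impl Node for Agent1 {
    fn tick(&mut self, mut ctx: Option<&mut NodeInfo>) {
        // Publish this agent's current state.
        let me = RobotState { id: 1, x: 0.0, y: 0.0 };
        self.state_pub.send(me, ctx.as_deref_mut()).ok();

        // Read whatever state was published last on the shared topic,
        // skipping this agent's own messages.
        if let Some(other) = self.state_sub.recv(ctx) {
            if other.id != 1 {
                println!("Agent1 sees peer state: {:?}", other);
            }
        }
    }
}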
Topic Naming
Best Practices
Use descriptive names:
let hub = Hub::new("cmd_vel"); // Good
let hub = Hub::new("data"); // Too vague
Follow conventions:
let hub = Hub::new("sensor/lidar"); // Hierarchical
let hub = Hub::new("robot1/cmd_vel"); // Namespaced
let hub = Hub::new("diagnostics/cpu"); // Categorized
Be consistent:
// Pick one style and stick to it
"cmd_vel" // Snake case
"cmdVel" // Camel case
"CmdVel" // Pascal case
Reserved Topic Names
Avoid using these patterns:
- Topics starting with _ (internal use)
- Topics containing /dev/ (conflicts with paths)
- Topics with special characters: !@#$%^&*()
Error Handling
Send Errors
// ctx is the Option<&mut NodeInfo> passed into tick(); bind it mutably so it
// can be reborrowed for send() and still used in the error branch.
match hub.send(data, ctx.as_deref_mut()) {
    Ok(()) => {
        // Success
    }
    Err(original_data) => {
        // Failed to send - shared memory full or not available
        // original_data contains the message that couldn't be sent
        if let Some(ctx) = ctx {
            ctx.log_warning("Failed to publish message");
        }
    }
}
Receive Errors
recv() returns None when:
- No message is available (not an error)
- Topic doesn't exist yet
- Shared memory not initialized
match hub.recv(ctx) {
Some(data) => {
// Process data
}
None => {
// No data available - this is normal
// Don't treat this as an error
}
}
Type Constraints
Valid Message Types
Messages must implement Clone + Debug + Send:
// Simple types work out of the box
Hub::<f32>::new("float_topic");
Hub::<bool>::new("bool_topic");
Hub::<u32>::new("int_topic");
// Structs need derives
#[derive(Clone, Debug)]
struct MyMessage {
x: f32,
y: f32,
}
Hub::<MyMessage>::new("my_topic");
// Arrays work
Hub::<[f32; 100]>::new("array_topic");
// Strings work
Hub::<String>::new("string_topic");
Invalid Message Types
// Missing Clone
struct NoClone {
data: f32,
}
// Hub::<NoClone>::new("topic"); // Won't compile
// Missing Send
struct NoSend {
ptr: *const u8, // Raw pointers are !Send
}
// Hub::<NoSend>::new("topic"); // Won't compile
Advanced Usage
Conditional Publishing
Only publish when certain conditions are met:
impl Node for ConditionalPublisher {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
let data = self.read_sensor();
// Only publish if above threshold
if data > self.threshold {
self.alert_pub.send(data, ctx).ok();
}
}
}
Message Buffering
Cache the last received message:
struct BufferedSubscriber {
data_sub: Hub<f32>,
last_value: Option<f32>,
}
impl Node for BufferedSubscriber {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Update cached value if new message available
if let Some(value) = self.data_sub.recv(ctx) {
self.last_value = Some(value);
}
// Always have access to last value
if let Some(value) = self.last_value {
self.process(value);
}
}
}
Rate Limiting
Publish at a specific rate:
struct RateLimitedPublisher {
data_pub: Hub<f32>,
tick_count: u32,
publish_every_n_ticks: u32,
}
impl Node for RateLimitedPublisher {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
self.tick_count += 1;
// Publish every 10 ticks (~6 Hz at 60 FPS)
if self.tick_count % self.publish_every_n_ticks == 0 {
self.data_pub.send(42.0, ctx).ok();
}
}
}
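If the tick rate is not fixed, limiting by wall-clock time may be preferable to counting ticks. A sketch using std::time::Instant (illustrative; initialize last_publish with Instant::now() when the node is constructed):
use std::time::{Duration, Instant};
use horus::prelude::*;

struct TimedPublisher {
    data_pub: Hub<f32>,
    last_publish: Instant,
    period: Duration, // e.g. Duration::from_millis(100) for 10 Hz
}

impl Node for TimedPublisher {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Publish at most once per period, regardless of how often tick() runs.
        if self.last_publish.elapsed() >= self.period {
            self.data_pub.send(42.0, ctx).ok();
            self.last_publish = Instant::now();
        }
    }
}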
Message Filtering
Filter messages before processing:
impl Node for FilteringSubscriber {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(data) = self.data_sub.recv(ctx) {
// Only process valid data
if data.is_valid() && data.quality > 0.8 {
self.process(data);
}
}
}
}
Shared Memory Details
Location
Hub stores data in /dev/shm (Linux shared memory):
# View HORUS shared memory segments
ls -lh /dev/shm/horus_*
Size Limitations
Shared memory has finite space. Check available space:
df -h /dev/shm
The typical default is 50% of RAM, though configured limits range from roughly 32MB to 2GB depending on the system.
Cleaning Up
Remove stale shared memory segments:
# Remove all HORUS shared memory
rm -f /dev/shm/horus_*
HORUS automatically cleans up when nodes shut down gracefully (Ctrl+C).
Performance Characteristics
Latency by Message Size
Based on benchmark data:
| Message Type | Size | Latency |
|---|---|---|
| CmdVel | 16B | 296ns |
| IMU | 304B | 718ns |
| LaserScan | 1.5KB | 1.31µs |
| PointCloud | 120KB | 2.8µs |
Key Insight: Latency scales linearly with message size.
Throughput
HORUS can handle:
- Millions of messages per second for small messages
- Gigabytes per second for large messages
- Deterministic latency regardless of system load
Best Practices
Use Appropriate Types
Choose the right message type for your data:
// Good: Fixed-size array for known dimensions
Hub::<[f32; 3]>::new("position");
// Bad: Vec requires heap allocation
Hub::<Vec<f32>>::new("position");
Minimize Cloning
Hub clones messages internally. Keep messages small:
// Good: Small struct
#[derive(Clone, Debug)]
struct Pose {
x: f32,
y: f32,
theta: f32,
}
// Consider Arc for large data
Hub::<Arc<LargeData>>::new("big_data");
Check recv() Every Tick
Always check for new messages:
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Check EVERY tick
if let Some(msg) = self.sub.recv(ctx) {
self.process(msg);
}
}
Handle Send Failures
Don't ignore send errors:
// Reborrow ctx so it is still available if the send fails; the unsent
// message is handed back but only the failure is logged here.
if let Err(_unsent) = self.data_pub.send(data, ctx.as_deref_mut()) {
    if let Some(ctx) = ctx {
        ctx.log_warning("Publish failed");
    }
}
Next Steps
- Learn about the Scheduler for orchestrating nodes
- Understand Shared Memory internals
- Explore Message Types for standard robotics messages
- Read the API Reference for complete Hub documentation