Hub API Reference
The Hub<T> provides ultra-low latency pub/sub communication through shared memory.
Creating a Hub
Hub::new(topic_name)
Create a new hub for publishing and subscribing to a topic.
pub fn new(topic_name: &str) -> HorusResult<Self>
Parameters:
topic_name: Name of the topic (string)
Returns: Result<Hub<T>, HorusError>
Example:
use horus::prelude::*;
let velocity_hub: Hub<f32> = Hub::new("cmd_vel")?;
Type Constraints:
Tmust implementClone + Debug + Send + Sync
Hub::new_with_capacity(topic_name, capacity)
Create a hub with custom message capacity.
pub fn new_with_capacity(
topic_name: &str,
capacity: usize
) -> HorusResult<Self>
Parameters:
topic_name: Name of the topiccapacity: Maximum number of messages to buffer
Returns: Result<Hub<T>, HorusError>
Example:
// Create hub with buffer for 10 messages
let hub: Hub<f32> = Hub::new_with_capacity("data", 10)?;
Publishing Messages
send(msg, ctx)
Send a message to all subscribers on this topic.
pub fn send(&self, msg: T, ctx: Option<&mut NodeInfo>) -> Result<(), T>
Parameters:
msg: Message to send (moved, then cloned internally)ctx: Optional NodeInfo for automatic logging
Returns:
Ok(()): Message sent successfullyErr(msg): Failed to send (returns original message)
Performance: 296ns for small messages (16B), scales linearly with size
Example:
// Without logging
hub.send(42.0, None).ok();
// With automatic logging (recommended)
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
hub.send(42.0, ctx).ok();
// Automatically logs: [12:34:56.789] [IPC: 296ns] MyNode --PUB--> 'topic' = 42.0
}
Error Handling:
match hub.send(data, ctx) {
Ok(()) => {
// Message sent successfully
}
Err(original_data) => {
// Failed - shared memory full or unavailable
// original_data contains the message that couldn't be sent
}
}
Subscribing to Messages
recv(ctx)
Receive a message from the topic (non-blocking).
pub fn recv(&self, ctx: Option<&mut NodeInfo>) -> Option<T>
Parameters:
ctx: Optional NodeInfo for automatic logging
Returns:
Some(msg): Message receivedNone: No message available (not an error)
Performance: 296ns for small messages (16B)
Example:
// Without logging
if let Some(velocity) = hub.recv(None) {
println!("Received: {}", velocity);
}
// With automatic logging (recommended)
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(velocity) = hub.recv(ctx) {
// Automatically logs: [12:34:56.789] [IPC: 142ns] MyNode <--SUB-- 'topic' = 42.0
self.process(velocity);
}
}
Best Practices:
- Always check
recv()every tick for responsive nodes Noneis normal - means no new messages- Messages are cloned when received
Monitoring Methods
get_connection_state()
Get current connection state (lock-free).
pub fn get_connection_state(&self) -> ConnectionState
Returns: ConnectionState enum
Connection States:
Disconnected- Not connectedConnecting- Establishing connectionConnected- Ready for communicationReconnecting- Attempting to reconnectFailed- Connection failed
Example:
if hub.get_connection_state() == ConnectionState::Connected {
hub.send(data, ctx).ok();
}
get_metrics()
Get performance metrics snapshot (lock-free).
pub fn get_metrics(&self) -> HubMetrics
Returns: HubMetrics struct with:
messages_sent: u64messages_received: u64send_failures: u64recv_failures: u64
Example:
let metrics = hub.get_metrics();
println!("Sent: {}, Received: {}",
metrics.messages_sent,
metrics.messages_received);
get_topic_name()
Get the topic name for this hub.
pub fn get_topic_name(&self) -> &str
Returns: Topic name as string slice
Example:
println!("Topic: {}", hub.get_topic_name());
Valid Message Types
Primitives
Hub::<f32>::new("float")?;
Hub::<f64>::new("double")?;
Hub::<i32>::new("int")?;
Hub::<u32>::new("uint")?;
Hub::<bool>::new("bool")?;
Hub::<String>::new("string")?;
Arrays
Hub::<[f32; 3]>::new("position")?;
Hub::<[u8; 1024]>::new("buffer")?;
Structs
Must implement Clone + Debug:
#[derive(Clone, Debug)]
struct RobotPose {
x: f32,
y: f32,
theta: f32,
}
let pose_hub: Hub<RobotPose> = Hub::new("pose")?;
Standard Messages
HORUS provides pre-defined message types:
use horus::prelude::*;
use horus_library::messages::{CmdVel, LaserScan, Imu};
let cmd_hub: Hub<CmdVel> = Hub::new("cmd_vel")?;
let scan_hub: Hub<LaserScan> = Hub::new("scan")?;
let imu_hub: Hub<Imu> = Hub::new("imu")?;
Communication Patterns
One-to-One
Single publisher, single subscriber:
// Publisher node
struct PublisherNode {
data_pub: Hub<f32>,
}
impl Node for PublisherNode {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
self.data_pub.send(42.0, ctx).ok();
}
}
// Subscriber node
struct SubscriberNode {
data_sub: Hub<f32>,
}
impl Node for SubscriberNode {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(data) = self.data_sub.recv(ctx) {
println!("Got: {}", data);
}
}
}
One-to-Many (Broadcast)
Single publisher, multiple subscribers:
// One publisher
struct Broadcaster {
broadcast_pub: Hub<String>,
}
// Multiple subscribers automatically receive
struct Listener1 {
broadcast_sub: Hub<String>,
}
struct Listener2 {
broadcast_sub: Hub<String>,
}
Many-to-One (Aggregation)
Multiple publishers, single subscriber:
// Multiple sensors publishing
struct Sensor1 {
reading_pub: Hub<f32>,
}
struct Sensor2 {
reading_pub: Hub<f32>,
}
// One aggregator subscribing
struct Aggregator {
reading_sub: Hub<f32>,
}
Processing Pipeline
Chain multiple nodes with pub/sub:
impl Node for ProcessorNode {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Receive input
if let Some(input) = self.input_sub.recv(ctx) {
// Process
let output = self.process(input);
// Publish result
self.output_pub.send(output, ctx).ok();
}
}
}
Performance Characteristics
Latency
| Message Size | Latency |
|---|---|
| 16B (CmdVel) | 296ns |
| 304B (IMU) | 718ns |
| 1.5KB (LaserScan) | 1.31µs |
| 120KB (PointCloud) | 2.8µs |
Latency scales linearly with message size.
Zero-Copy Design
Hub uses shared memory with atomic operations:
- No serialization/deserialization
- No network overhead
- No kernel context switches
- Cache-line aligned to prevent false sharing
Best Practices
Topic Naming
Use descriptive, consistent names:
// Good
Hub::new("cmd_vel")?;
Hub::new("sensor/lidar")?;
Hub::new("robot1/pose")?;
// Bad
Hub::new("data")?; // Too vague
Hub::new("x")?; // Not descriptive
Error Handling
Always handle send errors:
if let Err(data) = hub.send(data, ctx) {
ctx.log_warning("Failed to publish message");
// Handle error (retry, log, etc.)
}
Type Safety
Use the same type for publishers and subscribers:
// Publisher
let pub_hub: Hub<f32> = Hub::new("velocity")?;
// Subscriber - must match type!
let sub_hub: Hub<f32> = Hub::new("velocity")?;
Message Size
Prefer fixed-size types for predictable performance:
// Good - fixed size
Hub::<[f32; 100]>::new("data")?;
// Less ideal - dynamic allocation
Hub::<Vec<f32>>::new("data")?;
Common Patterns
Conditional Publishing
if value > threshold {
hub.send(alert, ctx).ok();
}
Message Buffering
struct BufferedNode {
data_sub: Hub<f32>,
last_value: Option<f32>,
}
impl Node for BufferedNode {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Update cache if new message available
if let Some(value) = self.data_sub.recv(ctx) {
self.last_value = Some(value);
}
// Always have access to last value
if let Some(value) = self.last_value {
self.process(value);
}
}
}
Rate Limiting
struct RateLimitedPublisher {
data_pub: Hub<f32>,
tick_count: u32,
}
impl Node for RateLimitedPublisher {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
self.tick_count += 1;
// Publish every 10 ticks (~6 Hz at 60 FPS)
if self.tick_count % 10 == 0 {
self.data_pub.send(42.0, ctx).ok();
}
}
}
Shared Memory Details
Location
Hubs store data in /dev/shm (Linux shared memory):
# View HORUS shared memory
ls -lh /dev/shm/horus_*
Cleanup
HORUS automatically cleans up on graceful shutdown (Ctrl+C).
Manual cleanup:
rm -f /dev/shm/horus_*
Size Limits
Check available shared memory:
df -h /dev/shm
Typical: 50% of RAM or 32MB-2GB depending on system.
See Also
- Node API Reference - Node implementation
- Scheduler API Reference - Node orchestration
- Core Concepts: Hub - Detailed guide
- Message Types - Standard messages