Link API Reference
The Link<T> provides ultra-low latency point-to-point (Single Producer Single Consumer) communication through shared memory.
When to use Link vs Hub:
- Use Link for: Tight control loops, 1-to-1 communication, critical latency paths
- Use Hub for: Broadcasting, multiple subscribers, flexible topologies
Performance advantage: Link is ~1.56x faster than Hub (389ns vs 606ns round-trip)
Creating a Link
Link::producer(topic_name)
Create a Link as a producer (sender).
pub fn producer(topic: &str) -> HorusResult<Self>
Parameters:
topic: Name of the link (string)
Returns: Result<Link<T>, HorusError>
Example:
use horus::prelude::*;
let sensor_output: Link<f32> = Link::producer("sensor_data")?;
Type Constraints:
Tmust implementClone + Debug + Send + Sync
Link::consumer(topic_name)
Create a Link as a consumer (receiver).
pub fn consumer(topic: &str) -> HorusResult<Self>
Parameters:
topic: Name of the link (must match producer's topic)
Returns: Result<Link<T>, HorusError>
Example:
use horus::prelude::*;
let sensor_input: Link<f32> = Link::consumer("sensor_data")?;
Link::producer_with_capacity(topic, capacity)
Create a producer with custom buffer capacity.
pub fn producer_with_capacity(
topic: &str,
capacity: usize
) -> HorusResult<Self>
Parameters:
topic: Name of the linkcapacity: Buffer size (automatically rounded to next power of 2)
Returns: Result<Link<T>, HorusError>
Default capacity: 1024 messages
Example:
// High-frequency control loop needs larger buffer
let output: Link<f32> = Link::producer_with_capacity("fast_data", 4096)?;
Link::consumer_with_capacity(topic, capacity)
Create a consumer with custom buffer capacity.
pub fn consumer_with_capacity(
topic: &str,
capacity: usize
) -> HorusResult<Self>
Parameters:
topic: Name of the linkcapacity: Buffer size (must match producer's capacity)
Returns: Result<Link<T>, HorusError>
Example:
let input: Link<f32> = Link::consumer_with_capacity("fast_data", 4096)?;
Important: Both producer and consumer must use the same capacity!
Sending Messages (Producer Only)
send(msg, ctx)
Send a message to the consumer (non-blocking).
pub fn send(&self, msg: T, ctx: Option<&mut NodeInfo>) -> Result<(), T>
Parameters:
msg: Message to send (moved, then cloned internally)ctx: Optional NodeInfo for automatic logging
Returns:
Ok(()): Message sent successfullyErr(msg): Buffer full (returns original message)
Performance: ~89ns for send operation alone, ~389ns full round-trip
Example:
// Without logging
link.send(42.0, None).ok();
// With automatic logging (recommended)
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Err(data) = link.send(42.0, ctx) {
eprintln!("Buffer full! Dropped: {}", data);
}
}
Error Handling:
match link.send(data, ctx) {
Ok(()) => {
// Message sent successfully
}
Err(original_data) => {
// Buffer full - consumer isn't reading fast enough
// original_data contains the message that couldn't be sent
eprintln!("Warning: Dropping message");
}
}
Receiving Messages (Consumer Only)
recv(ctx)
Receive a message from the producer (non-blocking).
pub fn recv(&self, ctx: Option<&mut NodeInfo>) -> Option<T>
Parameters:
ctx: Optional NodeInfo for automatic logging
Returns:
Some(msg): Message receivedNone: No message available (not an error)
Performance: ~300ns receive operation (estimated)
Example:
// Without logging
if let Some(value) = link.recv(None) {
println!("Received: {}", value);
}
// With automatic logging (recommended)
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(value) = link.recv(ctx) {
self.process(value);
}
}
Best Practices:
- Always check
recv()every tick for responsive nodes Noneis normal - means no new messages- Messages are cloned when received
Utility Methods
is_producer()
Check if this Link end is a producer.
pub fn is_producer(&self) -> bool
Returns: true if producer, false if consumer
Example:
if link.is_producer() {
link.send(data, None).ok();
}
is_consumer()
Check if this Link end is a consumer.
pub fn is_consumer(&self) -> bool
Returns: true if consumer, false if producer
Example:
if link.is_consumer() {
if let Some(data) = link.recv(None) {
println!("Got: {:?}", data);
}
}
role()
Get the role of this Link end.
pub fn role(&self) -> LinkRole
Returns: LinkRole::Producer or LinkRole::Consumer
Example:
match link.role() {
LinkRole::Producer => println!("I'm a producer"),
LinkRole::Consumer => println!("I'm a consumer"),
}
get_topic_name()
Get the topic name for this link.
pub fn get_topic_name(&self) -> &str
Returns: Topic name as string slice
Example:
println!("Topic: {}", link.get_topic_name());
capacity()
Get the buffer capacity.
pub fn capacity(&self) -> usize
Returns: Buffer capacity (power of 2)
Example:
println!("Buffer can hold {} messages", link.capacity());
has_messages()
Check if messages are available (consumer only).
pub fn has_messages(&self) -> bool
Returns: true if messages are available to read
Example:
if link.has_messages() {
let msg = link.recv(None).unwrap();
}
Advanced API
loan() - Zero-Copy Publishing
Loan a slot in shared memory for zero-copy writing (producer only).
pub fn loan(&self) -> Result<LinkSample<T>, &'static str>
Returns: LinkSample<T> - a write handle to shared memory
Use when: Avoiding clone overhead for large messages
Example:
if let Ok(sample) = link.loan() {
// Write directly to shared memory (no clone!)
sample.write(large_message);
// Message automatically published when sample is dropped
}
Performance: Eliminates clone overhead (~10-50% faster for large messages)
Valid Message Types
Primitives
Link::<f32>::producer("float")?;
Link::<f64>::producer("double")?;
Link::<i32>::producer("int")?;
Link::<u32>::producer("uint")?;
Link::<bool>::producer("bool")?;
Arrays
Link::<[f32; 3]>::producer("position")?;
Link::<[u8; 1024]>::producer("buffer")?;
Structs
Must implement Clone + Debug:
#[derive(Clone, Debug)]
struct MotorCommand {
voltage: f32,
enable: bool,
}
let cmd_link: Link<MotorCommand> = Link::producer("motor_cmd")?;
Standard Messages
HORUS provides pre-defined message types:
use horus::prelude::*;
use horus_library::messages::{CmdVel, LaserScan, Imu};
let cmd_link: Link<CmdVel> = Link::producer("cmd_vel")?;
let scan_link: Link<LaserScan> = Link::producer("scan")?;
let imu_link: Link<Imu> = Link::producer("imu")?;
Communication Patterns
Point-to-Point Control Loop
Typical motor controller with encoder feedback:
// Motor driver node (produces encoder data, consumes commands)
struct MotorDriver {
cmd_input: Link<MotorCommand>, // Consumer
encoder_output: Link<EncoderData>, // Producer
}
impl Node for MotorDriver {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Receive command
if let Some(cmd) = self.cmd_input.recv(ctx) {
self.apply_voltage(cmd.voltage);
}
// Send encoder feedback
let reading = self.read_encoder();
self.encoder_output.send(reading, ctx).ok();
}
}
// Controller node (consumes encoder data, produces commands)
struct Controller {
encoder_input: Link<EncoderData>, // Consumer
cmd_output: Link<MotorCommand>, // Producer
}
impl Node for Controller {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Receive feedback
if let Some(reading) = self.encoder_input.recv(ctx) {
// PD control
let cmd = self.compute_control(reading);
// Send command
self.cmd_output.send(cmd, ctx).ok();
}
}
}
Sensor Processing Pipeline
Chain processing nodes with minimal latency:
// Raw sensor -> Preprocessor -> Filter -> Output
struct SensorNode {
output: Link<RawData>,
}
struct PreprocessorNode {
input: Link<RawData>,
output: Link<ProcessedData>,
}
struct FilterNode {
input: Link<ProcessedData>,
output: Link<FilteredData>,
}
Bidirectional Communication
Two nodes communicating in both directions:
struct NodeA {
to_b: Link<RequestMsg>,
from_b: Link<ResponseMsg>,
}
struct NodeB {
from_a: Link<RequestMsg>,
to_a: Link<ResponseMsg>,
}
Performance Characteristics
Latency
| Message Size | Link Latency | Hub Latency | Link Advantage |
|---|---|---|---|
| 16B (CmdVel) | 389ns | 606ns | 1.56x faster |
| 304B (IMU) | ~600ns | ~800ns | 1.33x faster |
| 1.5KB (LaserScan) | ~1.2µs | ~1.8µs | 1.5x faster |
Latency breakdown:
- Send operation: ~89ns
- Atomic ordering: ~50ns
- Receive operation: ~250ns
- Total round-trip: ~389ns
Comparison with ROS 2
| System | Latency | Speedup |
|---|---|---|
| ROS 2 Intra-process | 50-100µs | 100-250x |
| ROS 2 Inter-process | 200-500µs | 500-1300x |
| HORUS Link | 0.39µs | Baseline |
| HORUS Hub | 0.61µs | 0.64x |
When Link Matters
Link's performance advantage is critical for:
- Control loops >1kHz (1ms cycle time or faster)
- Sensor fusion with high-frequency IMU (100Hz-1kHz)
- Motor control with encoder feedback
- Vision processing pipelines with frame handoff
- Real-time planning with feedback loops
Best Practices
Topic Naming
Use descriptive names that indicate the data flow:
// Good
Link::producer("motor_cmd")?;
Link::producer("encoder_feedback")?;
Link::producer("vision_to_planning")?;
// Bad
Link::producer("data")?; // Too vague
Link::producer("link1")?; // Not descriptive
Error Handling
Always handle send errors (buffer full):
if let Err(data) = link.send(data, ctx) {
eprintln!("Warning: Link buffer full, dropping message");
// Consider: increase buffer size, slow down producer, or speed up consumer
}
Buffer Sizing
Choose capacity based on your loop rate:
// High-frequency (1kHz) with occasional consumer slowdown
Link::producer_with_capacity("data", 2048)?; // 2 seconds of buffer
// Normal frequency (100Hz)
Link::producer_with_capacity("data", 1024)?; // Default - 10 seconds buffer
// Low frequency (10Hz)
Link::producer_with_capacity("data", 128)?; // 12 seconds buffer
Rule of thumb: capacity = loop_rate_hz * max_consumer_delay_seconds
Type Safety
Producer and consumer must use the same type:
// Producer
let producer: Link<f32> = Link::producer("velocity")?;
// Consumer - must match type!
let consumer: Link<f32> = Link::consumer("velocity")?;
Message Ordering
Link guarantees FIFO ordering (First In First Out):
// Producer sends: A, B, C
producer.send(msg_a, None).ok();
producer.send(msg_b, None).ok();
producer.send(msg_c, None).ok();
// Consumer receives: A, B, C (same order)
let a = consumer.recv(None); // msg_a
let b = consumer.recv(None); // msg_b
let c = consumer.recv(None); // msg_c
Common Patterns
Buffered Consumer
Always maintain last known value:
struct ControllerNode {
encoder_input: Link<f32>,
last_reading: Option<f32>,
}
impl Node for ControllerNode {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
// Update cache if new message available
if let Some(reading) = self.encoder_input.recv(ctx) {
self.last_reading = Some(reading);
}
// Always have access to last value
if let Some(reading) = self.last_reading {
self.compute_control(reading);
}
}
}
Conditional Sending
Only send when value changes significantly:
struct SensorNode {
output: Link<f32>,
last_sent: f32,
}
impl Node for SensorNode {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
let value = self.read_sensor();
// Only send if changed by >1%
if (value - self.last_sent).abs() > 0.01 {
self.output.send(value, ctx).ok();
self.last_sent = value;
}
}
}
Timeout Detection
Detect when producer stops sending:
use std::time::{Duration, Instant};
struct WatchdogConsumer {
input: Link<f32>,
last_recv: Instant,
}
impl Node for WatchdogConsumer {
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
if let Some(value) = self.input.recv(ctx) {
self.last_recv = Instant::now();
self.process(value);
}
// Check for timeout
if self.last_recv.elapsed() > Duration::from_millis(100) {
eprintln!("Warning: No data for 100ms!");
}
}
}
Shared Memory Details
Location
Links store data in /dev/shm (Linux shared memory):
# View HORUS Link shared memory
ls -lh /dev/shm/horus/topics/horus_links_*
Cleanup
HORUS automatically cleans up on graceful shutdown (Ctrl+C).
Manual cleanup:
rm -f /dev/shm/horus/topics/horus_links_*
Size Calculation
Memory usage per Link:
size = header (128 bytes) + (capacity * sizeof(T))
Example:
// Link<f32> with capacity 1024
// size = 128 + (1024 * 4) = 4224 bytes (~4 KB)
// Link<LaserScan> (1.5KB) with capacity 1024
// size = 128 + (1024 * 1536) = 1,573,120 bytes (~1.5 MB)
Troubleshooting
Buffer Full Errors
Symptom: send() returns Err(msg)
Causes:
- Consumer not reading fast enough
- Producer sending too fast
- Buffer too small
Solutions:
// 1. Increase buffer size
Link::producer_with_capacity("data", 4096)?;
// 2. Add error handling
if let Err(data) = link.send(data, ctx) {
eprintln!("Dropped: {:?}", data);
}
// 3. Slow down producer
thread::sleep(Duration::from_millis(1));
No Messages Received
Symptom: recv() always returns None
Checklist:
- Producer and consumer use same topic name?
- Producer sending before consumer receiving?
- Both using same type
T? - Producer calling
send()?
Type Mismatch
Symptom: Compile error when creating consumer
Fix: Ensure exact type match:
// Producer
let p: Link<f32> = Link::producer("data")?;
// Consumer - MUST be f32, not f64!
let c: Link<f32> = Link::consumer("data")?;
Link vs Hub Decision Guide
| Use Link When | Use Hub When |
|---|---|
| 1-to-1 communication | 1-to-many broadcasting |
| Latency <1µs required | Latency <10µs acceptable |
| Fixed producer/consumer | Dynamic subscribers |
| Control loops >100Hz | General pub/sub |
| Critical path | Non-critical path |
Example scenarios:
Use Link:
- Motor encoder Controller (1kHz control loop)
- Camera Vision processor (30Hz frame processing)
- IMU State estimator (100Hz sensor fusion)
Use Hub:
- Sensor Multiple loggers/visualizers
- Status updates Dashboard + Recorder
- Commands Multiple actuators
Complete Example
See examples/link_motor_control.rs for a full working example of Link in a PD control loop:
//! Real-world motor control with Link
//!
//! Demonstrates:
//! - Bidirectional communication (commands + feedback)
//! - 1kHz control loop
//! - PD controller implementation
//! - Buffer management
use horus::prelude::*;
// Run with:
// cargo run --example link_motor_control
See Also
- Hub API Reference - Many-to-many pub/sub
- Node API Reference - Node implementation
- Scheduler API Reference - Node orchestration
- Benchmarks - Performance analysis