Link API Reference

Link<T> provides ultra-low-latency, point-to-point (Single Producer, Single Consumer) communication through shared memory.

When to use Link vs Hub:

  • Use Link for: Tight control loops, 1-to-1 communication, critical latency paths
  • Use Hub for: Broadcasting, multiple subscribers, flexible topologies

Performance advantage: Link is ~1.56x faster than Hub (389ns vs 606ns round-trip)

Link::producer(topic_name)

Create a Link as a producer (sender).

pub fn producer(topic: &str) -> HorusResult<Self>

Parameters:

  • topic: Name of the link (string)

Returns: Result<Link<T>, HorusError>

Example:

use horus::prelude::*;

let sensor_output: Link<f32> = Link::producer("sensor_data")?;

Type Constraints:

  • T must implement Clone + Debug + Send + Sync

Link::consumer(topic_name)

Create a Link as a consumer (receiver).

pub fn consumer(topic: &str) -> HorusResult<Self>

Parameters:

  • topic: Name of the link (must match producer's topic)

Returns: Result<Link<T>, HorusError>

Example:

use horus::prelude::*;

let sensor_input: Link<f32> = Link::consumer("sensor_data")?;

Link::producer_with_capacity(topic, capacity)

Create a producer with custom buffer capacity.

pub fn producer_with_capacity(
    topic: &str,
    capacity: usize
) -> HorusResult<Self>

Parameters:

  • topic: Name of the link
  • capacity: Buffer size (automatically rounded to next power of 2)

Returns: Result<Link<T>, HorusError>

Default capacity: 1024 messages

Example:

// High-frequency control loop needs larger buffer
let output: Link<f32> = Link::producer_with_capacity("fast_data", 4096)?;
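
The rounding rule for capacity can be previewed with the standard library's usize::next_power_of_two. This is only a sketch of the arithmetic: effective_capacity is a hypothetical helper name, and whether HORUS performs the rounding with this exact call is an assumption.

```rust
/// Hypothetical helper (not a HORUS API): the capacity a Link would end up
/// with if the requested size is rounded up to the next power of two.
fn effective_capacity(requested: usize) -> usize {
    // next_power_of_two returns the input unchanged when it is already
    // a power of two, and returns 1 for an input of 0.
    requested.next_power_of_two()
}
```

For instance, a request for 1000 slots would yield 1024 under this rule, while a request for 4096 stays at 4096.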

Link::consumer_with_capacity(topic, capacity)

Create a consumer with custom buffer capacity.

pub fn consumer_with_capacity(
    topic: &str,
    capacity: usize
) -> HorusResult<Self>

Parameters:

  • topic: Name of the link
  • capacity: Buffer size (must match producer's capacity)

Returns: Result<Link<T>, HorusError>

Example:

let input: Link<f32> = Link::consumer_with_capacity("fast_data", 4096)?;

Important: Both producer and consumer must use the same capacity!

Sending Messages (Producer Only)

send(msg, ctx)

Send a message to the consumer (non-blocking).

pub fn send(&self, msg: T, ctx: Option<&mut NodeInfo>) -> Result<(), T>

Parameters:

  • msg: Message to send (moved, then cloned internally)
  • ctx: Optional NodeInfo for automatic logging

Returns:

  • Ok(()): Message sent successfully
  • Err(msg): Buffer full (returns original message)

Performance: ~89ns for send operation alone, ~389ns full round-trip

Example:

// Without logging
link.send(42.0, None).ok();

// With automatic logging (recommended)
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    // Assumes the Link is stored in a field named `link`
    if let Err(data) = self.link.send(42.0, ctx) {
        eprintln!("Buffer full! Dropped: {}", data);
    }
}

Error Handling:

match link.send(data, ctx) {
    Ok(()) => {
        // Message sent successfully
    }
    Err(original_data) => {
        // Buffer full - consumer isn't reading fast enough
        // original_data contains the message that couldn't be sent
        eprintln!("Warning: Dropping message");
    }
}
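
The Err(msg) contract described above (a full buffer hands the original message back to the caller) can be tried without HORUS by using a bounded channel from the Rust standard library as a stand-in. This is a sketch under that assumption: std::sync::mpsc::sync_channel is not how Link is implemented, and try_send_or_return and rejected_messages are hypothetical helper names.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Non-blocking send that mirrors the Result<(), T> shape of send():
/// on failure the original message is handed back to the caller.
/// (Stand-in built on a std bounded channel, not on Link.)
fn try_send_or_return<T>(tx: &SyncSender<T>, msg: T) -> Result<(), T> {
    match tx.try_send(msg) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(m)) | Err(TrySendError::Disconnected(m)) => Err(m),
    }
}

/// Sends `count` values into a buffer of `capacity` slots that nobody
/// drains, and collects the values that were handed back.
fn rejected_messages(capacity: usize, count: u32) -> Vec<u32> {
    // Keep the receiver alive so failures are "full", not "disconnected".
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut rejected = Vec::new();
    for i in 0..count {
        if let Err(original) = try_send_or_return(&tx, i) {
            rejected.push(original);
        }
    }
    rejected
}
```

Because the rejected value is returned rather than lost, the caller can decide whether to drop it, retry on the next tick, or keep only the newest value.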

Receiving Messages (Consumer Only)

recv(ctx)

Receive a message from the producer (non-blocking).

pub fn recv(&self, ctx: Option<&mut NodeInfo>) -> Option<T>

Parameters:

  • ctx: Optional NodeInfo for automatic logging

Returns:

  • Some(msg): Message received
  • None: No message available (not an error)

Performance: ~250ns for the receive operation itself, or ~300ns including atomic ordering (estimated)

Example:

// Without logging
if let Some(value) = link.recv(None) {
    println!("Received: {}", value);
}

// With automatic logging (recommended)
fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
    // Assumes the Link is stored in a field named `link`
    if let Some(value) = self.link.recv(ctx) {
        self.process(value);
    }
}

Best Practices:

  • Call recv() every tick to keep nodes responsive
  • None is normal - means no new messages
  • Messages are cloned when received

Utility Methods

is_producer()

Check if this Link end is a producer.

pub fn is_producer(&self) -> bool

Returns: true if producer, false if consumer

Example:

if link.is_producer() {
    link.send(data, None).ok();
}

is_consumer()

Check if this Link end is a consumer.

pub fn is_consumer(&self) -> bool

Returns: true if consumer, false if producer

Example:

if link.is_consumer() {
    if let Some(data) = link.recv(None) {
        println!("Got: {:?}", data);
    }
}

role()

Get the role of this Link end.

pub fn role(&self) -> LinkRole

Returns: LinkRole::Producer or LinkRole::Consumer

Example:

match link.role() {
    LinkRole::Producer => println!("I'm a producer"),
    LinkRole::Consumer => println!("I'm a consumer"),
}

get_topic_name()

Get the topic name for this link.

pub fn get_topic_name(&self) -> &str

Returns: Topic name as string slice

Example:

println!("Topic: {}", link.get_topic_name());

capacity()

Get the buffer capacity.

pub fn capacity(&self) -> usize

Returns: Buffer capacity (power of 2)

Example:

println!("Buffer can hold {} messages", link.capacity());

has_messages()

Check if messages are available (consumer only).

pub fn has_messages(&self) -> bool

Returns: true if messages are available to read

Example:

if link.has_messages() {
    let msg = link.recv(None).unwrap();
}

Advanced API

loan() - Zero-Copy Publishing

Loan a slot in shared memory for zero-copy writing (producer only).

pub fn loan(&self) -> Result<LinkSample<T>, &'static str>

Returns: LinkSample<T> - a write handle to shared memory

Use when: Avoiding clone overhead for large messages

Example:

if let Ok(sample) = link.loan() {
    // Write directly to shared memory (no clone!)
    sample.write(large_message);
    // Message automatically published when sample is dropped
}

Performance: Eliminates clone overhead (~10-50% faster for large messages)

Valid Message Types

Primitives

Link::<f32>::producer("float")?;
Link::<f64>::producer("double")?;
Link::<i32>::producer("int")?;
Link::<u32>::producer("uint")?;
Link::<bool>::producer("bool")?;

Arrays

Link::<[f32; 3]>::producer("position")?;
Link::<[u8; 1024]>::producer("buffer")?;

Structs

Must implement Clone + Debug (Send + Sync are auto-implemented when every field is Send + Sync):

#[derive(Clone, Debug)]
struct MotorCommand {
    voltage: f32,
    enable: bool,
}

let cmd_link: Link<MotorCommand> = Link::producer("motor_cmd")?;

Standard Messages

HORUS provides pre-defined message types:

use horus::prelude::*;
use horus_library::messages::{CmdVel, LaserScan, Imu};

let cmd_link: Link<CmdVel> = Link::producer("cmd_vel")?;
let scan_link: Link<LaserScan> = Link::producer("scan")?;
let imu_link: Link<Imu> = Link::producer("imu")?;

Communication Patterns

Point-to-Point Control Loop

Typical motor controller with encoder feedback:

// Motor driver node (produces encoder data, consumes commands)
struct MotorDriver {
    cmd_input: Link<MotorCommand>,      // Consumer
    encoder_output: Link<EncoderData>,  // Producer
}

impl Node for MotorDriver {
    fn tick(&mut self, mut ctx: Option<&mut NodeInfo>) {
        // Receive command (reborrow ctx so it can be used again below;
        // passing ctx directly would move it)
        if let Some(cmd) = self.cmd_input.recv(ctx.as_deref_mut()) {
            self.apply_voltage(cmd.voltage);
        }

        // Send encoder feedback
        let reading = self.read_encoder();
        self.encoder_output.send(reading, ctx).ok();
    }
}

// Controller node (consumes encoder data, produces commands)
struct Controller {
    encoder_input: Link<EncoderData>,   // Consumer
    cmd_output: Link<MotorCommand>,     // Producer
}

impl Node for Controller {
    fn tick(&mut self, mut ctx: Option<&mut NodeInfo>) {
        // Receive feedback (reborrow ctx so it can be used again below)
        if let Some(reading) = self.encoder_input.recv(ctx.as_deref_mut()) {
            // PD control
            let cmd = self.compute_control(reading);

            // Send command
            self.cmd_output.send(cmd, ctx).ok();
        }
    }
}

Sensor Processing Pipeline

Chain processing nodes with minimal latency:

// Raw sensor -> Preprocessor -> Filter -> Output
struct SensorNode {
    output: Link<RawData>,
}

struct PreprocessorNode {
    input: Link<RawData>,
    output: Link<ProcessedData>,
}

struct FilterNode {
    input: Link<ProcessedData>,
    output: Link<FilteredData>,
}

Bidirectional Communication

Two nodes communicating in both directions:

struct NodeA {
    to_b: Link<RequestMsg>,
    from_b: Link<ResponseMsg>,
}

struct NodeB {
    from_a: Link<RequestMsg>,
    to_a: Link<ResponseMsg>,
}

Performance Characteristics

Latency

Message Size        Link Latency   Hub Latency   Link Advantage
16B (CmdVel)        389ns          606ns         1.56x faster
304B (IMU)          ~600ns         ~800ns        1.33x faster
1.5KB (LaserScan)   ~1.2µs         ~1.8µs        1.5x faster

Latency breakdown:

  • Send operation: ~89ns
  • Atomic ordering: ~50ns
  • Receive operation: ~250ns
  • Total round-trip: ~389ns
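
The breakdown above is simple addition, and the headline advantage over Hub is the ratio of the two round-trip figures. A minimal sketch of that arithmetic follows; the constant and function names are hypothetical, and the numbers are copied from this page rather than measured.

```rust
/// Per-stage latencies in nanoseconds, copied from the breakdown above.
const SEND_NS: u32 = 89;
const ATOMIC_ORDERING_NS: u32 = 50;
const RECEIVE_NS: u32 = 250;
/// Hub round-trip for the same 16B message, from the latency table.
const HUB_ROUND_TRIP_NS: u32 = 606;

/// Sum of the three stages of a Link round-trip.
fn link_round_trip_ns() -> u32 {
    SEND_NS + ATOMIC_ORDERING_NS + RECEIVE_NS
}

/// How many times longer the Hub round-trip is than the Link round-trip.
fn link_advantage_over_hub() -> f64 {
    f64::from(HUB_ROUND_TRIP_NS) / f64::from(link_round_trip_ns())
}
```

The sum comes to 389ns, and 606 / 389 is roughly 1.56, which matches the advantage quoted for 16B messages.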

Comparison with ROS 2

System                 Latency     Relative to Link
ROS 2 Intra-process    50-100µs    100-250x slower
ROS 2 Inter-process    200-500µs   500-1300x slower
HORUS Link             0.39µs      Baseline
HORUS Hub              0.61µs      1.56x slower

Link's performance advantage is critical for:

  • Control loops >1kHz (1ms cycle time or faster)
  • Sensor fusion with high-frequency IMU (100Hz-1kHz)
  • Motor control with encoder feedback
  • Vision processing pipelines with frame handoff
  • Real-time planning with feedback loops

Best Practices

Topic Naming

Use descriptive names that indicate the data flow:

// Good
Link::producer("motor_cmd")?;
Link::producer("encoder_feedback")?;
Link::producer("vision_to_planning")?;

// Bad
Link::producer("data")?;  // Too vague
Link::producer("link1")?; // Not descriptive

Error Handling

Always handle send errors (buffer full):

if let Err(data) = link.send(data, ctx) {
    eprintln!("Warning: Link buffer full, dropping message");
    // Consider: increase buffer size, slow down producer, or speed up consumer
}

Buffer Sizing

Choose capacity based on your loop rate:

// High-frequency (1kHz) with occasional consumer slowdown
Link::producer_with_capacity("data", 2048)?;  // 2 seconds of buffer

// Normal frequency (100Hz)
Link::producer_with_capacity("data", 1024)?;  // Default - 10 seconds buffer

// Low frequency (10Hz)
Link::producer_with_capacity("data", 128)?;   // ~13 seconds buffer

Rule of thumb: capacity = loop_rate_hz * max_consumer_delay_seconds
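
The rule of thumb can be turned into a small helper that also applies the power-of-two rounding described for the capacity parameter. This is a sketch, not part of the HORUS API: suggested_capacity is a hypothetical name, and rounding the product up before picking the power of two is an assumption.

```rust
/// Hypothetical helper applying
/// capacity = loop_rate_hz * max_consumer_delay_seconds,
/// then rounding up to the next power of two.
fn suggested_capacity(loop_rate_hz: f64, max_consumer_delay_s: f64) -> usize {
    // Round the raw message count up so a fractional result never
    // under-provisions the buffer.
    let raw = (loop_rate_hz * max_consumer_delay_s).ceil() as usize;
    // Always keep at least one slot, then round to a power of two.
    raw.max(1).next_power_of_two()
}
```

A 1kHz producer that must tolerate a 2 second consumer stall needs 2000 slots, which this helper rounds to 2048, the same capacity used in the high-frequency example above.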

Type Safety

Producer and consumer must use the same type:

// Producer
let producer: Link<f32> = Link::producer("velocity")?;

// Consumer - must match type!
let consumer: Link<f32> = Link::consumer("velocity")?;

Message Ordering

Link guarantees FIFO ordering (First In First Out):

// Producer sends: A, B, C
producer.send(msg_a, None).ok();
producer.send(msg_b, None).ok();
producer.send(msg_c, None).ok();

// Consumer receives: A, B, C (same order)
let a = consumer.recv(None); // msg_a
let b = consumer.recv(None); // msg_b
let c = consumer.recv(None); // msg_c
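
To see why a bounded ring buffer gives FIFO ordering and why a power-of-two capacity is convenient, here is a single-threaded model. It is illustrative only: Ring is a hypothetical type, and a real SPSC link would use atomics and shared memory, which this sketch deliberately leaves out.

```rust
/// Single-threaded model of a bounded FIFO ring buffer.
struct Ring<T> {
    slots: Vec<Option<T>>,
    head: usize, // count of messages read so far
    tail: usize, // count of messages written so far
}

impl<T> Ring<T> {
    fn with_capacity(requested: usize) -> Self {
        let cap = requested.max(1).next_power_of_two();
        Ring { slots: (0..cap).map(|_| None).collect(), head: 0, tail: 0 }
    }

    /// Non-blocking write; hands the message back if the buffer is full.
    fn send(&mut self, msg: T) -> Result<(), T> {
        if self.tail.wrapping_sub(self.head) == self.slots.len() {
            return Err(msg);
        }
        // With a power-of-two length, `& mask` replaces a modulo.
        let mask = self.slots.len() - 1;
        self.slots[self.tail & mask] = Some(msg);
        self.tail = self.tail.wrapping_add(1);
        Ok(())
    }

    /// Non-blocking read; None means no message is waiting.
    fn recv(&mut self) -> Option<T> {
        if self.head == self.tail {
            return None;
        }
        let mask = self.slots.len() - 1;
        let msg = self.slots[self.head & mask].take();
        self.head = self.head.wrapping_add(1);
        msg
    }
}
```

Messages come out in the order they went in because the reader always consumes the oldest unread slot, and slots are only reused after they have been read.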

Common Patterns

Buffered Consumer

Always maintain last known value:

struct ControllerNode {
    encoder_input: Link<f32>,
    last_reading: Option<f32>,
}

impl Node for ControllerNode {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        // Update cache if new message available
        if let Some(reading) = self.encoder_input.recv(ctx) {
            self.last_reading = Some(reading);
        }

        // Always have access to last value
        if let Some(reading) = self.last_reading {
            self.compute_control(reading);
        }
    }
}
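
The pattern above reads at most one message per tick. If the producer can run faster than the consumer, one possible variation is to drain everything that is pending and keep only the newest value. The sketch below uses a VecDeque as a stand-in for the consumer end of a Link; update_latest is a hypothetical helper, and with a real Link the loop would call recv() instead of pop_front().

```rust
use std::collections::VecDeque;

/// Drains every pending message and returns the newest one, falling back
/// to the previously cached value when nothing is pending.
fn update_latest<T>(pending: &mut VecDeque<T>, cached: Option<T>) -> Option<T> {
    let mut latest = cached;
    // Each pop overwrites the cache, so the last message popped wins.
    while let Some(msg) = pending.pop_front() {
        latest = Some(msg);
    }
    latest
}
```

Draining trades completeness for freshness: intermediate readings are discarded, which suits controllers that only care about the current state.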

Conditional Sending

Only send when value changes significantly:

struct SensorNode {
    output: Link<f32>,
    last_sent: f32,
}

impl Node for SensorNode {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        let value = self.read_sensor();

        // Only send if changed by more than 0.01 (absolute difference)
        if (value - self.last_sent).abs() > 0.01 {
            self.output.send(value, ctx).ok();
            self.last_sent = value;
        }
    }
}
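
The check in the code above compares an absolute difference against 0.01. If a relative threshold (for example, 1% of the last sent value) is wanted instead, a helper along these lines could replace the condition. changed_by_fraction is a hypothetical name, and the handling of a zero baseline is an assumption.

```rust
/// Returns true when `new` differs from `last` by more than `fraction`
/// of `last`'s magnitude (use 0.01 for 1%).
fn changed_by_fraction(last: f32, new: f32, fraction: f32) -> bool {
    // A relative change is undefined for a zero baseline, so treat any
    // nonzero value as a change in that case.
    if last == 0.0 {
        return new != 0.0;
    }
    ((new - last) / last).abs() > fraction
}
```

With a baseline of 100.0, a reading of 100.5 is a 0.5% change and would not be sent, even though its absolute difference of 0.5 exceeds 0.01.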

Timeout Detection

Detect when producer stops sending:

use std::time::{Duration, Instant};

struct WatchdogConsumer {
    input: Link<f32>,
    last_recv: Instant,
}

impl Node for WatchdogConsumer {
    fn tick(&mut self, ctx: Option<&mut NodeInfo>) {
        if let Some(value) = self.input.recv(ctx) {
            self.last_recv = Instant::now();
            self.process(value);
        }

        // Check for timeout
        if self.last_recv.elapsed() > Duration::from_millis(100) {
            eprintln!("Warning: No data for 100ms!");
        }
    }
}
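
The timeout logic can be separated from the node so it is testable without sleeping. In this sketch the current time is passed in explicitly. Watchdog, feed and is_expired are hypothetical names, not part of HORUS.

```rust
use std::time::{Duration, Instant};

/// Tracks when data last arrived and reports whether the timeout elapsed.
struct Watchdog {
    last_recv: Instant,
    timeout: Duration,
}

impl Watchdog {
    fn new(now: Instant, timeout: Duration) -> Self {
        Watchdog { last_recv: now, timeout }
    }

    /// Call whenever recv() yields a message.
    fn feed(&mut self, now: Instant) {
        self.last_recv = now;
    }

    /// True if no message has arrived for longer than the timeout.
    fn is_expired(&self, now: Instant) -> bool {
        now.duration_since(self.last_recv) > self.timeout
    }
}
```

Inside tick(), a node would call feed(Instant::now()) when a message arrives and check is_expired(Instant::now()) afterwards.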

Shared Memory Details

Location

Links store data in /dev/shm (Linux shared memory):

# View HORUS Link shared memory
ls -lh /dev/shm/horus/topics/horus_links_*

Cleanup

HORUS automatically cleans up on graceful shutdown (Ctrl+C).

Manual cleanup:

rm -f /dev/shm/horus/topics/horus_links_*

Size Calculation

Memory usage per Link:

size = header (128 bytes) + (capacity * sizeof(T))

Example:

// Link<f32> with capacity 1024
// size = 128 + (1024 * 4) = 4224 bytes (~4 KB)

// Link<LaserScan> (1.5KB) with capacity 1024
// size = 128 + (1024 * 1536) = 1,572,992 bytes (~1.5 MB)
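
The formula can be evaluated for any message type with std::mem::size_of. This is a sketch of the stated formula only: link_size_bytes is a hypothetical helper, and the real layout may add padding or alignment that the formula does not mention.

```rust
use std::mem::size_of;

/// Header size in bytes, taken from the formula above.
const HEADER_BYTES: usize = 128;

/// Estimated shared-memory footprint of one Link:
/// size = header + capacity * sizeof(T).
fn link_size_bytes<T>(capacity: usize) -> usize {
    HEADER_BYTES + capacity * size_of::<T>()
}
```

For example, link_size_bytes::<f32>(1024) evaluates to 4224, matching the first worked example above.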

Troubleshooting

Buffer Full Errors

Symptom: send() returns Err(msg)

Causes:

  1. Consumer not reading fast enough
  2. Producer sending too fast
  3. Buffer too small

Solutions:

// 1. Increase buffer size
Link::producer_with_capacity("data", 4096)?;

// 2. Add error handling
if let Err(data) = link.send(data, ctx) {
    eprintln!("Dropped: {:?}", data);
}

// 3. Slow down producer
std::thread::sleep(std::time::Duration::from_millis(1));

No Messages Received

Symptom: recv() always returns None

Checklist:

  1. Producer and consumer use same topic name?
  2. Producer sending before consumer receiving?
  3. Both using same type T?
  4. Producer calling send()?

Type Mismatch

Symptom: Compile error when creating consumer

Fix: Ensure exact type match:

// Producer
let p: Link<f32> = Link::producer("data")?;

// Consumer - MUST be f32, not f64!
let c: Link<f32> = Link::consumer("data")?;

When to Use Link vs Hub

Use Link When              Use Hub When
1-to-1 communication       1-to-many broadcasting
Latency <1µs required      Latency <10µs acceptable
Fixed producer/consumer    Dynamic subscribers
Control loops >100Hz       General pub/sub
Critical path              Non-critical path

Example scenarios:

Use Link:

  • Motor encoder -> Controller (1kHz control loop)
  • Camera -> Vision processor (30Hz frame processing)
  • IMU -> State estimator (100Hz sensor fusion)

Use Hub:

  • Sensor -> Multiple loggers/visualizers
  • Status updates -> Dashboard + Recorder
  • Commands -> Multiple actuators

Complete Example

See examples/link_motor_control.rs for a full working example of Link in a PD control loop:

//! Real-world motor control with Link
//!
//! Demonstrates:
//! - Bidirectional communication (commands + feedback)
//! - 1kHz control loop
//! - PD controller implementation
//! - Buffer management

use horus::prelude::*;

// Run with:
// cargo run --example link_motor_control

See Also