Shared Memory IPC

HORUS achieves 296ns-2.8µs latency through a zero-copy shared memory IPC architecture. This page explains how HORUS uses /dev/shm for ultra-low latency inter-process communication.

What is Shared Memory IPC?

Shared memory is a region of RAM that multiple processes can access simultaneously. Unlike network-based communication (TCP/UDP) or message queues, shared memory:

Eliminates serialization: No conversion to/from bytes

Eliminates copying: Data written once, read directly

Enables zero-copy semantics: Loan pattern for minimal allocations

Provides deterministic latency: No network stack overhead

Scales linearly: Latency proportional to message size

Architecture Overview

Storage Location

HORUS stores shared memory segments in:

/dev/shm/horus/topics/

Why /dev/shm?

RAM-backed: Stored in RAM, not disk

Fast access: Direct memory operations

Kernel-managed: Operating system handles memory mapping

Standard location: Available on all modern Linux systems

Simple cleanup: Contents are cleared on reboot and can be removed manually if needed

File Naming Convention

Shared memory topics follow this naming:

/dev/shm/horus/topics/horus_{topic_name}

Examples:

/dev/shm/horus/topics/horus_cmd_vel
/dev/shm/horus/topics/horus_laser_scan
/dev/shm/horus/topics/horus_sensor_lidar  # topic "sensor/lidar": '/' becomes '_'

Topic name sanitization:

  • / characters become _
  • : characters become _
  • Creates safe filesystem names
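In code, the sanitization rule above amounts to a simple character replacement plus the horus_ prefix. A minimal sketch (the helper name topic_path is illustrative, not part of the HORUS API):

fn topic_path(topic: &str) -> String {
    // Replace '/' and ':' so the topic name is a safe file name
    let safe: String = topic
        .chars()
        .map(|c| if c == '/' || c == ':' { '_' } else { c })
        .collect();
    format!("/dev/shm/horus/topics/horus_{}", safe)
}

assert_eq!(topic_path("sensor/lidar"), "/dev/shm/horus/topics/horus_sensor_lidar");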

ShmRegion: Memory-Mapped Files

What is ShmRegion?

ShmRegion is the low-level abstraction for creating and managing memory-mapped files in /dev/shm.

Structure

pub struct ShmRegion {
    mmap: MmapMut,      // Memory-mapped region
    size: usize,        // Size in bytes
    path: PathBuf,      // Path to /dev/shm file
    _file: File,        // Underlying file handle
    name: String,       // Topic name
    owner: bool,        // Created this region?
}

Creating a Region

// Create or open shared memory region
let region = ShmRegion::new("my_topic", 4096)?;

Behavior:

  • Creates /dev/shm/horus/topics/horus_my_topic
  • Allocates 4096 bytes
  • Zero-initializes memory if new
  • Reuses existing memory if already created
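A short sketch of the create-or-reuse behavior, using the ShmRegion API shown above (error handling elided; assumes both handles use the same size):

// First process creates /dev/shm/horus/topics/horus_my_topic and zero-fills it
let writer = ShmRegion::new("my_topic", 4096)?;
assert!(writer.is_owner());

// A later process attaches to the existing region without re-initializing it
let reader = ShmRegion::new("my_topic", 4096)?;
assert!(!reader.is_owner());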

Opening Existing Region

// Open existing shared memory (no creation)
let region = ShmRegion::open("my_topic")?;

Behavior:

  • Returns error if topic doesn't exist
  • Maps existing memory
  • Detects size automatically

Ownership

The first process to create a region is the owner:

if region.is_owner() {
    println!("I created this shared memory");
} else {
    println!("I'm using existing shared memory");
}

Owner responsibilities:

  • Initializes memory to zero
  • Optionally cleans up on exit (currently disabled for debugging)

ShmTopic: Lock-Free Ring Buffer

What is ShmTopic?

ShmTopic<T> implements a lock-free ring buffer in shared memory with:

Multi-producer support: Multiple publishers can write concurrently

Multi-consumer support: Multiple subscribers each track their own position

Cache-line alignment: 64-byte alignment prevents false sharing

Zero-copy loan pattern: Direct memory access without allocations

Atomic operations: Thread-safe without locks
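Putting these pieces together, a minimal single-process sketch of the publish/receive cycle (error handling elided; f32 stands in for any Copy message type):

// Create (or attach to) a topic with 64 slots
let topic = ShmTopic::<f32>::new("temperature", 64)?;

// Publish: claim a slot, write in place, publish on drop
let mut sample = topic.loan()?;
sample.write(36.6);
drop(sample);

// Receive: non-destructive read directly from shared memory
if let Some(reading) = topic.receive() {
    println!("temperature = {}", reading.read());
}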

Structure

#[repr(align(64))]  // Cache-line aligned
pub struct ShmTopic<T> {
    _region: Arc<ShmRegion>,
    header: NonNull<RingBufferHeader>,
    data_ptr: NonNull<u8>,
    capacity: usize,
    consumer_tail: AtomicUsize,  // Per-consumer position
    _phantom: PhantomData<T>,
    _padding: [u8; 24],  // Prevent false sharing
}

Ring Buffer Header

#[repr(C, align(64))]  // Cache-line aligned
struct RingBufferHeader {
    capacity: AtomicUsize,
    head: AtomicUsize,              // Producer write position
    tail: AtomicUsize,              // Legacy (unused)
    element_size: AtomicUsize,
    consumer_count: AtomicUsize,    // Number of subscribers
    sequence_number: AtomicUsize,   // Global message counter
    _padding: [u8; 24],             // Cache-line padding
}

Why cache-line alignment?

Modern CPUs fetch memory in 64-byte cache lines. Aligning to 64 bytes:

  • Prevents false sharing between fields
  • Optimizes atomic operations
  • Reduces cache invalidations
  • Improves multi-core performance
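A small, generic illustration (not HORUS code): wrapping a counter in a 64-byte-aligned type guarantees that adjacent counters never share a cache line, so cores updating neighbouring counters do not invalidate each other.

use std::mem::{align_of, size_of};
use std::sync::atomic::AtomicUsize;

#[repr(align(64))]                 // one full cache line per counter
struct PaddedCounter(AtomicUsize);

fn main() {
    // Alignment and size are both rounded up to the cache-line size,
    // so PaddedCounters placed in an array occupy distinct cache lines.
    assert_eq!(align_of::<PaddedCounter>(), 64);
    assert_eq!(size_of::<PaddedCounter>(), 64);
}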

Memory Layout

┌─────────────────────────────────────┐
│ RingBufferHeader (64 bytes)         │  ← Aligned to 64 bytes
├─────────────────────────────────────┤
│ Padding (for data alignment)        │
├─────────────────────────────────────┤
│ Data Slot 0                         │
├─────────────────────────────────────┤
│ Data Slot 1                         │
├─────────────────────────────────────┤
│ ...                                 │
├─────────────────────────────────────┤
│ Data Slot N-1                       │
└─────────────────────────────────────┘
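Slot addresses follow directly from this layout: the data region begins at the first suitably aligned boundary after the header, and slot i sits at i times the element size from there. A sketch of the offset arithmetic (illustrative names, not the HORUS internals):

use std::mem::{align_of, size_of};

/// Byte offset of the data region from the start of the mapping.
fn data_start<T>() -> usize {
    let header_size = 64;                         // RingBufferHeader is one cache line
    let align = align_of::<T>().max(1);
    (header_size + align - 1) / align * align     // round up to T's alignment
}

/// Byte offset of slot `i` within the mapping.
fn slot_offset<T>(i: usize) -> usize {
    data_start::<T>() + i * size_of::<T>()
}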

Zero-Copy Loan Pattern

Traditional Approach (with copying)

// Message is copied multiple times
let msg = MyData { x: 1.0, y: 2.0 };
let bytes = serialize(msg);      // Copy 1
send_over_network(bytes);        // Copy 2
let received = deserialize(bytes); // Copy 3

HORUS Loan Pattern (zero-copy)

// Message written once to shared memory
let mut sample = topic.loan()?;  // Claim a slot
sample.write(MyData { x: 1.0, y: 2.0 });  // Write directly to shared memory
drop(sample);  // Automatically publishes when dropped

Zero copies: Data written once, read directly from shared memory

PublisherSample

pub struct PublisherSample<'a, T> {
    data_ptr: *mut T,
    slot_index: usize,
    topic: &'a ShmTopic<T>,
    _phantom: PhantomData<&'a mut T>,
}

Methods:

// Write data to shared memory
sample.write(value);

// Get mutable pointer
let ptr = sample.as_mut_ptr();

// Get mutable reference (unsafe)
unsafe {
    let data_ref = sample.as_mut();
    data_ref.x = 1.0;
}

Automatic publishing: When PublisherSample is dropped, it increments the global sequence number, making the message visible to subscribers.

ConsumerSample

pub struct ConsumerSample<'a, T> {
    data_ptr: *const T,
    slot_index: usize,
    topic: &'a ShmTopic<T>,
    _phantom: PhantomData<&'a T>,
}

Methods:

// Get reference to data
let data_ref = sample.get_ref();

// Get const pointer
let ptr = sample.as_ptr();

// Copy data (for Copy types)
let data_copy = sample.read();

Non-destructive reads: Multiple subscribers can read the same message.

Lock-Free Operations

Publishing (push/loan)

pub fn loan(&self) -> Result<PublisherSample<T>, &'static str> {
    let header = unsafe { self.header.as_ref() };

    loop {
        let head = header.head.load(Ordering::Relaxed);
        let next = (head + 1) % self.capacity;

        // Check buffer capacity
        let current_sequence = header.sequence_number.load(Ordering::Relaxed);
        let max_unread = (self.capacity * 3) / 4;  // 75% fill limit

        if current_sequence >= max_unread {
            return Err("Buffer full");
        }

        // Atomic compare-and-swap to claim slot
        match header.head.compare_exchange_weak(
            head,
            next,
            Ordering::Acquire,
            Ordering::Relaxed,
        ) {
            Ok(_) => {
                // Successfully claimed slot
                return Ok(PublisherSample { ... });
            }
            Err(_) => {
                // Retry if another thread claimed it first
                continue;
            }
        }
    }
}

Key Points:

  • Lock-free: Uses atomic compare-and-swap instead of a mutex
  • Guaranteed progress: A failed CAS means another publisher succeeded, so the retry loop cannot livelock
  • Multi-producer safe: Multiple publishers don't conflict
  • 75% fill limit: Prevents overwriting unread messages

Subscribing (pop/receive)

pub fn receive(&self) -> Option<ConsumerSample<T>> {
    let header = unsafe { self.header.as_ref() };

    let my_tail = self.consumer_tail.load(Ordering::Relaxed);
    let current_head = header.head.load(Ordering::Acquire);

    if my_tail == current_head {
        return None;  // No new messages
    }

    // Move this consumer's position forward
    let next_tail = (my_tail + 1) % self.capacity;
    self.consumer_tail.store(next_tail, Ordering::Relaxed);

    // Return sample pointing to message in shared memory
    Some(ConsumerSample { ... })
}

Key Points:

  • Per-consumer tracking: Each subscriber has independent position
  • Non-destructive: Message stays for other subscribers
  • Lock-free: Atomic operations only
  • Multi-consumer safe: Subscribers don't interfere
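In practice a subscriber drains the buffer in a polling loop. A sketch, assuming a Copy message type and a user-supplied process function (both placeholders):

loop {
    // Drain every pending message for this consumer
    while let Some(sample) = topic.receive() {
        let msg = sample.read();   // copy out of shared memory (Copy types)
        process(msg);              // placeholder for application logic
    }
    // Back off briefly before polling again
    std::thread::sleep(std::time::Duration::from_micros(50));
}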

Multi-Consumer Architecture

How Multiple Subscribers Work

Each subscriber maintains its own consumer_tail position:

Publisher writes:    HEAD  [0] [1] [2] [3] [4]

Subscriber A:        TAIL_A  [0]  (just joined)
Subscriber B:        TAIL_B  [2]  (caught up partially)
Subscriber C:        TAIL_C  [4]  (fully caught up)

Each subscriber:

  • Tracks its own position independently
  • Can join at any time (starts from current HEAD)
  • Reads at its own pace
  • Doesn't affect other subscribers
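Because each ShmTopic handle carries its own consumer_tail, two handles attached to the same topic behave as independent subscribers. A sketch (error handling elided; assumes the capacity matches the existing topic):

let sub_a = ShmTopic::<f32>::new("data", 100)?;   // attaches to the existing topic
let sub_b = ShmTopic::<f32>::new("data", 100)?;

// Each handle sees every message published after it joined;
// draining one handle does not advance the other.
let seen_by_a = sub_a.receive();
let seen_by_b = sub_b.receive();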

Buffer Fill Management

To prevent overwriting unread messages:

let max_unread = (self.capacity * 3) / 4;  // 75% fill limit

Why 75% limit?

  • Allows slower subscribers to catch up
  • Prevents buffer wraparound issues
  • Trades capacity for safety
  • Ensures deterministic behavior

What happens when full?

  • push() returns Err(msg) with original message
  • loan() returns Err("Buffer full")
  • Publishers can retry or drop message
  • Subscribers continue reading
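One way for a publisher to react to a full buffer is a bounded retry with backoff. A sketch built on the loan() API described above (the retry policy itself is illustrative):

use std::time::Duration;

fn publish_with_retry<T: Copy>(topic: &ShmTopic<T>, value: T) -> Result<(), &'static str> {
    for attempt in 0..5 {
        match topic.loan() {
            Ok(mut sample) => {
                sample.write(value);   // write in place; drop publishes
                return Ok(());
            }
            Err(_) => {
                // Buffer full: give slow subscribers time to catch up
                std::thread::sleep(Duration::from_micros(10u64 << attempt));
            }
        }
    }
    Err("Buffer still full after retries")
}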

Safety Features

Comprehensive Bounds Checking

Every memory access is validated:

// Validate index is in bounds
if head >= self.capacity {
    panic!("Critical safety violation: head index >= capacity");
}

// Validate byte offset is in bounds
let byte_offset = head * mem::size_of::<T>();
let data_region_size = self.capacity * mem::size_of::<T>();
if byte_offset + mem::size_of::<T>() > data_region_size {
    panic!("Critical safety violation: write would exceed bounds");
}

Capacity Limits

Safety constants prevent dangerous configurations:

const MAX_CAPACITY: usize = 1_000_000;        // Max elements
const MIN_CAPACITY: usize = 1;                // Min elements
const MAX_ELEMENT_SIZE: usize = 1_000_000;    // Max size per element
const MAX_TOTAL_SIZE: usize = 100_000_000;    // Max total (100MB)

Validation:

if capacity > MAX_CAPACITY {
    return Err("Capacity too large");
}
if element_size > MAX_ELEMENT_SIZE {
    return Err("Element size too large");
}
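A quick way to pre-validate a configuration against these limits before creating a topic (a sketch; HORUS performs the equivalent checks internally):

use std::mem::size_of;

fn check_topic_config<T>(capacity: usize) -> Result<(), &'static str> {
    let element_size = size_of::<T>();
    if capacity < MIN_CAPACITY || capacity > MAX_CAPACITY {
        return Err("Capacity out of range");
    }
    if element_size > MAX_ELEMENT_SIZE {
        return Err("Element size too large");
    }
    if capacity.saturating_mul(element_size) > MAX_TOTAL_SIZE {
        return Err("Total size exceeds the 100MB limit");
    }
    Ok(())
}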

Type Safety

Element size is validated when opening existing topics:

let stored_element_size = header.element_size.load(Ordering::Relaxed);
let expected_element_size = mem::size_of::<T>();
if stored_element_size != expected_element_size {
    return Err("Element size mismatch");
}

Prevents:

  • Opening topic with wrong type
  • Mismatched publisher/subscriber types
  • Memory corruption from type confusion

Performance Optimizations

Cache-Line Alignment

#[repr(align(64))]

Benefits:

  • Prevents false sharing between cores
  • Optimizes atomic operations
  • Reduces cache invalidations
  • Combined with explicit padding, keeps hot fields on separate cache lines

Atomic Operations

Using appropriate memory ordering:

// Relaxed for non-critical reads
let head = header.head.load(Ordering::Relaxed);

// Acquire for critical synchronization
let current_head = header.head.load(Ordering::Acquire);

// Release when publishing (sequence number bump when the sample is dropped)
header.sequence_number.fetch_add(1, Ordering::Release);

Why different orderings?

  • Relaxed: Fastest, no synchronization guarantees
  • Acquire: Synchronizes with Release operations
  • Release: Makes all previous writes visible
  • Balanced for performance and correctness
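A minimal, generic illustration of how the Acquire/Release pair transfers data between a publisher and a subscriber (plain Rust, not HORUS internals):

use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

struct Slot {
    payload: AtomicU64,
    seq: AtomicUsize,
}

impl Slot {
    fn publish(&self, value: u64) {
        self.payload.store(value, Ordering::Relaxed);  // write the data first
        self.seq.fetch_add(1, Ordering::Release);      // Release makes the write visible
    }

    fn try_read(&self, last_seen: usize) -> Option<u64> {
        let seq = self.seq.load(Ordering::Acquire);    // Acquire pairs with the Release above
        (seq != last_seen).then(|| self.payload.load(Ordering::Relaxed))
    }
}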

Zero-Copy Semantics

No allocations in the hot path:

// No allocations - just pointer arithmetic
let sample = topic.loan()?;  // Returns stack-allocated sample
sample.write(data);          // Writes directly to shared memory
// Drop publishes (no allocation)

Hub Integration

Hub<T> uses ShmTopic internally:

pub struct Hub<T> {
    shm_topic: Arc<ShmTopic<T>>,  // Shared memory topic
    // ... other fields
}

pub fn send(&self, msg: T, ctx: Option<&mut NodeInfo>) -> Result<(), T> {
    let msg_clone = msg.clone();

    let ipc_ns = match self.shm_topic.loan() {
        Ok(mut sample) => {
            let ipc_start = Instant::now();
            sample.write(msg_clone);  // Zero-copy write
            drop(sample);             // Automatic publish
            ipc_start.elapsed().as_nanos() as u64
        },
        Err(_) => 0
    };

    // ... logging and error handling
}

Managing Shared Memory

Viewing Active Topics

# List all HORUS shared memory topics
ls -lh /dev/shm/horus/topics/

# Example output:
# -rw-r--r-- 1 user user 4.0K Oct 5 12:34 horus_cmd_vel
# -rw-r--r-- 1 user user 8.0K Oct 5 12:34 horus_laser_scan

Checking Available Space

df -h /dev/shm

# Example output:
# Filesystem      Size  Used Avail Use% Mounted on
# tmpfs           7.8G  128M  7.7G   2% /dev/shm

Cleaning Up Stale Topics

# Remove all HORUS shared memory
rm -rf /dev/shm/horus/

# Remove specific topic
rm /dev/shm/horus/topics/horus_cmd_vel

When to clean up?

  • After testing
  • When topics are no longer needed
  • If shared memory is full
  • After crashes (stale topics)

Automatic cleanup: HORUS nodes clean up gracefully on Ctrl+C, but crashes may leave stale topics.

Monitoring Memory Usage

# Watch memory usage in real-time
watch -n 1 'du -sh /dev/shm/horus/'

# Show per-topic sizes
du -h /dev/shm/horus/topics/*

Platform Considerations

Linux

HORUS is optimized for Linux:

Native /dev/shm support: Tmpfs filesystem in RAM

Excellent performance: Direct kernel support

No configuration needed: Works out of the box

Typical limits: Defaults to half of RAM; resizable via mount options

Increasing /dev/shm Size

If you need more shared memory:

# Check current size
df -h /dev/shm

# Increase to 4GB (requires sudo)
sudo mount -o remount,size=4G /dev/shm

# Make permanent (add to /etc/fstab):
# tmpfs /dev/shm tmpfs defaults,size=4G 0 0

macOS

HORUS works on macOS with limitations:

Different implementation: macOS uses different shared memory primitives

Size limits: Smaller default limits than Linux

Performance: May vary compared to Linux

Recommendation: Use Linux for production

Windows (WSL)

Use Windows Subsystem for Linux:

# Install WSL2
wsl --install

# Inside WSL, HORUS works like native Linux

Best Practices

Choose Appropriate Capacity

// Small messages, high frequency
ShmTopic::<CmdVel>::new("cmd_vel", 100)?;  // 100 slots

// Large messages, lower frequency
ShmTopic::<PointCloud>::new("points", 10)?;  // 10 slots

// Balance between latency and memory usage

Monitor Buffer Utilization

let metrics = hub.metrics();
if metrics.total_messages_sent > capacity * 100 {
    println!("Consider increasing buffer capacity");
}

Handle Buffer Full Errors

match hub.send(data, ctx.as_deref_mut()) {
    Ok(()) => {},
    Err(original_data) => {
        // Buffer full - decide what to do
        // Option 1: Log and drop (reborrow ctx, since send() only took a temporary borrow)
        if let Some(ctx) = ctx.as_deref_mut() {
            ctx.log_warning("Buffer full, dropping message");
        }

        // Option 2: Retry with backoff using original_data
        // Option 3: Use a larger buffer capacity
    }
}

Clean Up Regularly

// In development, clean shared memory between runs
#[cfg(debug_assertions)]
fn cleanup_shm() {
    let _ = std::fs::remove_dir_all("/dev/shm/horus/");
}

Troubleshooting

"No space left on device"

Cause: /dev/shm is full

Solution:

# Check usage
df -h /dev/shm

# Clean up
rm -rf /dev/shm/horus/

# Increase size
sudo mount -o remount,size=2G /dev/shm

"Permission denied"

Cause: Insufficient permissions

Solution:

# Check permissions
ls -la /dev/shm/horus/

# Fix permissions (if needed)
chmod 755 /dev/shm/horus/

Stale Shared Memory

Cause: Node crashed without cleanup

Solution:

# List stale topics
ls -l /dev/shm/horus/topics/

# Remove all
rm -rf /dev/shm/horus/

"Element size mismatch"

Cause: Publisher and subscriber using different types

Solution: Ensure both use the same type:

// Publisher
let pub_hub: Hub<f32> = Hub::new("data");

// Subscriber
let sub_hub: Hub<f32> = Hub::new("data");  // Same type!

Next Steps