Shared Memory IPC
HORUS achieves 296ns-2.8µs latency through a zero-copy shared memory IPC architecture. This page explains how HORUS uses /dev/shm for ultra-low latency inter-process communication.
What is Shared Memory IPC?
Shared memory is a region of RAM that multiple processes can access simultaneously. Unlike network-based communication (TCP/UDP) or message queues, shared memory:
Eliminates serialization: No conversion to/from bytes
Eliminates copying: Data written once, read directly
Enables zero-copy semantics: Loan pattern for minimal allocations
Provides deterministic latency: No network stack overhead
Scales linearly: Latency proportional to message size
Architecture Overview
Storage Location
HORUS stores shared memory segments in:
/dev/shm/horus/topics/
Why /dev/shm?
RAM-backed: Stored in RAM, not disk
Fast access: Direct memory operations
Kernel-managed: Operating system handles memory mapping
Standard location: Available on all modern Linux systems
Easy cleanup: Stale segments can be removed manually if needed
File Naming Convention
Shared memory topics follow this naming:
/dev/shm/horus/topics/horus_{topic_name}
Examples:
/dev/shm/horus/topics/horus_cmd_vel
/dev/shm/horus/topics/horus_laser_scan
/dev/shm/horus/topics/horus_sensor_lidar # '/' becomes '_'
Topic name sanitization:
- / characters become _
- : characters become _
- Creates safe filesystem names
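A minimal sketch of this sanitization rule (the helper name is hypothetical, not part of the HORUS API):
// Hypothetical helper illustrating the sanitization rule above.
fn shm_path_for(topic: &str) -> String {
    let safe: String = topic
        .chars()
        .map(|c| if c == '/' || c == ':' { '_' } else { c })
        .collect();
    format!("/dev/shm/horus/topics/horus_{}", safe)
}
// shm_path_for("sensor/lidar") == "/dev/shm/horus/topics/horus_sensor_lidar"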
ShmRegion: Memory-Mapped Files
What is ShmRegion?
ShmRegion is the low-level abstraction for creating and managing memory-mapped files in /dev/shm.
Structure
pub struct ShmRegion {
mmap: MmapMut, // Memory-mapped region
size: usize, // Size in bytes
path: PathBuf, // Path to /dev/shm file
_file: File, // Underlying file handle
name: String, // Topic name
owner: bool, // Created this region?
}
Creating a Region
// Create or open shared memory region
let region = ShmRegion::new("my_topic", 4096)?;
Behavior:
- Creates /dev/shm/horus/topics/horus_my_topic
- Allocates 4096 bytes
- Zero-initializes memory if new
- Reuses existing memory if already created
Opening Existing Region
// Open existing shared memory (no creation)
let region = ShmRegion::open("my_topic")?;
Behavior:
- Returns error if topic doesn't exist
- Maps existing memory
- Detects size automatically
Ownership
The first process to create a region is the owner:
if region.is_owner() {
println!("I created this shared memory");
} else {
println!("I'm using existing shared memory");
}
Owner responsibilities:
- Initializes memory to zero
- Optionally cleans up on exit (currently disabled for debugging)
ShmTopic: Lock-Free Ring Buffer
What is ShmTopic?
ShmTopic<T> implements a lock-free ring buffer in shared memory with:
Multi-producer support: Multiple publishers can write concurrently
Multi-consumer support: Multiple subscribers each track their own position
Cache-line alignment: 64-byte alignment prevents false sharing
Zero-copy loan pattern: Direct memory access without allocations
Atomic operations: Thread-safe without locks
Structure
#[repr(align(64))] // Cache-line aligned
pub struct ShmTopic<T> {
_region: Arc<ShmRegion>,
header: NonNull<RingBufferHeader>,
data_ptr: NonNull<u8>,
capacity: usize,
consumer_tail: AtomicUsize, // Per-consumer position
_phantom: PhantomData<T>,
_padding: [u8; 24], // Prevent false sharing
}
Ring Buffer Header
#[repr(C, align(64))] // Cache-line aligned
struct RingBufferHeader {
capacity: AtomicUsize,
head: AtomicUsize, // Producer write position
tail: AtomicUsize, // Legacy (unused)
element_size: AtomicUsize,
consumer_count: AtomicUsize, // Number of subscribers
sequence_number: AtomicUsize, // Global message counter
_padding: [u8; 24], // Cache-line padding
}
Why cache-line alignment?
Modern CPUs fetch memory in 64-byte cache lines. Aligning to 64 bytes:
- Prevents false sharing between fields
- Optimizes atomic operations
- Reduces cache invalidations
- Improves multi-core performance
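A small standalone illustration (not HORUS code) of what align(64) buys: each value is padded to its own cache line, so writers on different cores do not invalidate each other's lines.
use std::mem::{align_of, size_of};
use std::sync::atomic::AtomicUsize;

// One counter per cache line: concurrent writers touch distinct lines.
#[repr(C, align(64))]
struct PaddedCounter {
    value: AtomicUsize,
}

fn main() {
    assert_eq!(align_of::<PaddedCounter>(), 64);
    assert_eq!(size_of::<PaddedCounter>(), 64); // padded up to a full cache line
}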
Memory Layout
─────────────────────────────────────
RingBufferHeader (64 bytes) ← Aligned to 64 bytes
─────────────────────────────────────
Padding (for data alignment)
─────────────────────────────────────
Data Slot 0
─────────────────────────────────────
Data Slot 1
─────────────────────────────────────
...
─────────────────────────────────────
Data Slot N-1
─────────────────────────────────────
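A sketch of how a slot address can be derived from this layout, assuming the data region starts at the first 64-byte boundary after the header (the exact padding rule in HORUS may differ):
use std::mem::size_of;

// Byte offset of slot `index`, assuming data begins at the next
// 64-byte boundary after the header.
fn slot_offset<T>(header_size: usize, index: usize) -> usize {
    let data_start = (header_size + 63) & !63; // round up to 64 bytes
    data_start + index * size_of::<T>()
}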
Zero-Copy Loan Pattern
Traditional Approach (with copying)
// Message is copied multiple times
let msg = MyData { x: 1.0, y: 2.0 };
let bytes = serialize(msg); // Copy 1
send_over_network(bytes); // Copy 2
let received = deserialize(bytes); // Copy 3
HORUS Loan Pattern (zero-copy)
// Message written once to shared memory
let mut sample = topic.loan()?; // Claim a slot
sample.write(MyData { x: 1.0, y: 2.0 }); // Write directly to shared memory
drop(sample); // Automatically publishes when dropped
Zero copies: Data written once, read directly from shared memory
PublisherSample
pub struct PublisherSample<'a, T> {
data_ptr: *mut T,
slot_index: usize,
topic: &'a ShmTopic<T>,
_phantom: PhantomData<&'a mut T>,
}
Methods:
// Write data to shared memory
sample.write(value);
// Get mutable pointer
let ptr = sample.as_mut_ptr();
// Get mutable reference (unsafe)
unsafe {
let data_ref = sample.as_mut();
data_ref.x = 1.0;
}
Automatic publishing: When PublisherSample is dropped, it increments the global sequence number, making the message visible to subscribers.
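Conceptually, publish-on-drop looks like the sketch below (simplified, assuming the field layout shown above and std::sync::atomic::Ordering in scope; this is not the exact HORUS implementation):
// Simplified sketch of publish-on-drop.
impl<'a, T> Drop for PublisherSample<'a, T> {
    fn drop(&mut self) {
        let header = unsafe { self.topic.header.as_ref() };
        // Release ordering makes the payload written via write() visible to
        // subscribers that observe the incremented sequence number.
        header.sequence_number.fetch_add(1, Ordering::Release);
    }
}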
ConsumerSample
pub struct ConsumerSample<'a, T> {
data_ptr: *const T,
slot_index: usize,
topic: &'a ShmTopic<T>,
_phantom: PhantomData<&'a T>,
}
Methods:
// Get reference to data
let data_ref = sample.get_ref();
// Get const pointer
let ptr = sample.as_ptr();
// Copy data (for Copy types)
let data_copy = sample.read();
Non-destructive reads: Multiple subscribers can read the same message.
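Putting loan and receive together, assuming a simple Copy message type (the struct and topic name here are illustrative):
// Illustrative message type; any Copy type works the same way.
#[derive(Clone, Copy)]
struct Sample2D { x: f32, y: f32 }

// Publisher side: claim a slot, write, drop to publish.
let topic: ShmTopic<Sample2D> = ShmTopic::new("demo_topic", 100)?;
let mut sample = topic.loan()?;
sample.write(Sample2D { x: 1.0, y: 2.0 });
drop(sample); // message becomes visible to subscribers

// Subscriber side (in another process this would be a ShmTopic
// opened with the same topic name).
if let Some(msg) = topic.receive() {
    let value = msg.read(); // copy out, since Sample2D is Copy
    println!("x = {}", value.x);
}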
Lock-Free Operations
Publishing (push/loan)
pub fn loan(&self) -> Result<PublisherSample<T>, &'static str> {
let header = unsafe { self.header.as_ref() };
loop {
let head = header.head.load(Ordering::Relaxed);
let next = (head + 1) % self.capacity;
// Check buffer capacity
let current_sequence = header.sequence_number.load(Ordering::Relaxed);
let max_unread = (self.capacity * 3) / 4; // 75% fill limit
if current_sequence >= max_unread {
return Err("Buffer full");
}
// Atomic compare-and-swap to claim slot
match header.head.compare_exchange_weak(
head,
next,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {
// Successfully claimed slot
return Ok(PublisherSample { ... });
}
Err(_) => {
// Retry if another thread claimed it first
continue;
}
}
}
}
Key Points:
- Lock-free: Uses atomic compare-and-swap
- Lock-free progress: Retries the compare-and-swap until it succeeds
- Multi-producer safe: Multiple publishers don't conflict
- 75% fill limit: Prevents overwriting unread messages
Subscribing (pop/receive)
pub fn receive(&self) -> Option<ConsumerSample<T>> {
let header = unsafe { self.header.as_ref() };
let my_tail = self.consumer_tail.load(Ordering::Relaxed);
let current_head = header.head.load(Ordering::Acquire);
if my_tail == current_head {
return None; // No new messages
}
// Move this consumer's position forward
let next_tail = (my_tail + 1) % self.capacity;
self.consumer_tail.store(next_tail, Ordering::Relaxed);
// Return sample pointing to message in shared memory
Some(ConsumerSample { ... })
}
Key Points:
- Per-consumer tracking: Each subscriber has independent position
- Non-destructive: Message stays for other subscribers
- Lock-free: Atomic operations only
- Multi-consumer safe: Subscribers don't interfere
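A simple polling loop built on receive (a sketch; the handler and sleep interval are placeholders):
// Poll for new messages, yielding briefly when the topic is idle.
loop {
    match topic.receive() {
        Some(sample) => {
            let value = sample.read(); // Copy types can be read out directly
            handle(value);             // hypothetical application handler
        }
        None => std::thread::sleep(std::time::Duration::from_micros(50)),
    }
}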
Multi-Consumer Architecture
How Multiple Subscribers Work
Each subscriber maintains its own consumer_tail position:
Publisher writes: HEAD [0] [1] [2] [3] [4]
Subscriber A: TAIL_A [0] (just joined)
Subscriber B: TAIL_B [2] (caught up partially)
Subscriber C: TAIL_C [4] (fully caught up)
Each subscriber:
- Tracks its own position independently
- Can join at any time (starts from current HEAD)
- Reads at its own pace
- Doesn't affect other subscribers
Buffer Fill Management
To prevent overwriting unread messages:
let max_unread = (self.capacity * 3) / 4; // 75% fill limit
Why 75% limit?
- Allows slower subscribers to catch up
- Prevents buffer wraparound issues
- Trades capacity for safety
- Ensures deterministic behavior
What happens when full?
- push() returns Err(msg) with the original message
- loan() returns Err("Buffer full")
- Publishers can retry or drop the message (see the retry sketch after this list)
- Subscribers continue reading
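One way a publisher might react to a full buffer is a bounded retry before dropping the message (a sketch; the retry count and backoff are arbitrary, and msg is assumed to be a Copy message value):
// Bounded retry when loan() reports the ring buffer is full.
let mut published = false;
for _ in 0..3 {
    match topic.loan() {
        Ok(mut slot) => {
            slot.write(msg);
            drop(slot); // publish
            published = true;
            break;
        }
        // Back off briefly and try again.
        Err(_) => std::thread::sleep(std::time::Duration::from_micros(100)),
    }
}
if !published {
    // Give up: drop the message, log a warning, or surface the error.
}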
Safety Features
Comprehensive Bounds Checking
Every memory access is validated:
// Validate index is in bounds
if head >= self.capacity {
panic!("Critical safety violation: head index >= capacity");
}
// Validate byte offset is in bounds
let byte_offset = head * mem::size_of::<T>();
let data_region_size = self.capacity * mem::size_of::<T>();
if byte_offset + mem::size_of::<T>() > data_region_size {
panic!("Critical safety violation: write would exceed bounds");
}
Capacity Limits
Safety constants prevent dangerous configurations:
const MAX_CAPACITY: usize = 1_000_000; // Max elements
const MIN_CAPACITY: usize = 1; // Min elements
const MAX_ELEMENT_SIZE: usize = 1_000_000; // Max size per element
const MAX_TOTAL_SIZE: usize = 100_000_000; // Max total (100MB)
Validation:
if capacity > MAX_CAPACITY {
return Err("Capacity too large");
}
if element_size > MAX_ELEMENT_SIZE {
return Err("Element size too large");
}
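A sketch of how these limits might be checked together, including the total-size bound (the helper is illustrative, not the actual HORUS function):
// Illustrative helper combining the safety constants above.
fn validate_config(capacity: usize, element_size: usize) -> Result<(), &'static str> {
    if capacity < MIN_CAPACITY || capacity > MAX_CAPACITY {
        return Err("Capacity out of range");
    }
    if element_size > MAX_ELEMENT_SIZE {
        return Err("Element size too large");
    }
    // Guard the product against overflow before applying the total limit.
    match capacity.checked_mul(element_size) {
        Some(total) if total <= MAX_TOTAL_SIZE => Ok(()),
        _ => Err("Total size too large"),
    }
}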
Type Safety
Element size is validated when opening existing topics:
let stored_element_size = header.element_size.load(Ordering::Relaxed);
let expected_element_size = mem::size_of::<T>();
if stored_element_size != expected_element_size {
return Err("Element size mismatch");
}
Prevents:
- Opening topic with wrong type
- Mismatched publisher/subscriber types
- Memory corruption from type confusion
Performance Optimizations
Cache-Line Alignment
#[repr(align(64))]
Benefits:
- Prevents false sharing between cores
- Optimizes atomic operations
- Reduces cache invalidations
- Each field gets its own cache line
Atomic Operations
Using appropriate memory ordering:
// Relaxed for non-critical reads
let head = header.head.load(Ordering::Relaxed);
// Acquire for critical synchronization
let current_head = header.head.load(Ordering::Acquire);
// Release when publishing
header.head.compare_exchange_weak(..., Ordering::Release, ...);
Why different orderings?
- Relaxed: Fastest, no synchronization guarantees
- Acquire: Synchronizes with Release operations
- Release: Makes all previous writes visible
- Balanced for performance and correctness
Zero-Copy Semantics
No allocations in the hot path:
// No allocations - just pointer arithmetic
let sample = topic.loan()?; // Returns stack-allocated sample
sample.write(data); // Writes directly to shared memory
// Drop publishes (no allocation)
Hub Integration
Hub<T> uses ShmTopic internally:
pub struct Hub<T> {
shm_topic: Arc<ShmTopic<T>>, // Shared memory topic
// ... other fields
}
pub fn send(&self, msg: T, ctx: Option<&mut NodeInfo>) -> Result<(), T> {
let msg_clone = msg.clone();
let ipc_ns = match self.shm_topic.loan() {
Ok(mut sample) => {
let ipc_start = Instant::now();
sample.write(msg_clone); // Zero-copy write
drop(sample); // Automatic publish
ipc_start.elapsed().as_nanos() as u64
},
Err(_) => 0
};
// ... logging and error handling
}
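A minimal usage sketch of publishing through Hub (assuming Hub::new as used in the troubleshooting example below, and no node context):
// Publish through Hub; T must be Clone, since send clones internally.
let hub: Hub<f32> = Hub::new("data");
match hub.send(42.0, None) {
    Ok(()) => {}
    Err(original) => {
        // Buffer full: the original message is handed back to the caller.
        eprintln!("dropped message: {original}");
    }
}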
Managing Shared Memory
Viewing Active Topics
# List all HORUS shared memory topics
ls -lh /dev/shm/horus/topics/
# Example output:
# -rw-r--r-- 1 user user 4.0K Oct 5 12:34 horus_cmd_vel
# -rw-r--r-- 1 user user 8.0K Oct 5 12:34 horus_laser_scan
Checking Available Space
df -h /dev/shm
# Example output:
# Filesystem Size Used Avail Use% Mounted on
# tmpfs 7.8G 128M 7.7G 2% /dev/shm
Cleaning Up Stale Topics
# Remove all HORUS shared memory
rm -rf /dev/shm/horus/
# Remove specific topic
rm /dev/shm/horus/topics/horus_cmd_vel
When to clean up?
- After testing
- When topics are no longer needed
- If shared memory is full
- After crashes (stale topics)
Automatic cleanup: HORUS nodes clean up gracefully on Ctrl+C, but crashes may leave stale topics.
Monitoring Memory Usage
# Watch memory usage in real-time
watch -n 1 'du -sh /dev/shm/horus/'
# Show per-topic sizes
du -h /dev/shm/horus/topics/*
Platform Considerations
Linux
HORUS is optimized for Linux:
Native /dev/shm support: Tmpfs filesystem in RAM
Excellent performance: Direct kernel support
No configuration needed: Works out of the box
Typical limit: 50% of RAM by default (configurable)
Increasing /dev/shm Size
If you need more shared memory:
# Check current size
df -h /dev/shm
# Increase to 4GB (requires sudo)
sudo mount -o remount,size=4G /dev/shm
# Make permanent (add to /etc/fstab):
# tmpfs /dev/shm tmpfs defaults,size=4G 0 0
macOS
HORUS works on macOS with limitations:
Different implementation: macOS uses different shared memory primitives
Size limits: Smaller default limits than Linux
Performance: May vary compared to Linux
Recommendation: Use Linux for production
Windows (WSL)
Use Windows Subsystem for Linux:
# Install WSL2
wsl --install
# Inside WSL, HORUS works like native Linux
Best Practices
Choose Appropriate Capacity
// Small messages, high frequency
ShmTopic::<CmdVel>::new("cmd_vel", 100)?; // 100 slots
// Large messages, lower frequency
ShmTopic::<PointCloud>::new("points", 10)?; // 10 slots
// Balance between latency and memory usage
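A rough way to estimate how much of /dev/shm a topic will use (a sketch; the real region also includes alignment padding beyond the 64-byte header):
use std::mem::size_of;

// Approximate footprint: one cache-line header plus capacity slots.
fn approx_topic_bytes<T>(capacity: usize) -> usize {
    64 + capacity * size_of::<T>()
}
// e.g. 100 slots of a 16-byte message ≈ 1.7 KB; 10 slots of a 1 MB point cloud ≈ 10 MB.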
Monitor Buffer Utilization
let metrics = hub.metrics();
if metrics.total_messages_sent > capacity * 100 {
println!("Consider increasing buffer capacity");
}
Handle Buffer Full Errors
match hub.send(data, ctx.as_deref_mut()) { // reborrow ctx so it can be reused below
Ok(()) => {},
Err(original_data) => {
// Buffer full - decide what to do
// Option 1: Log and drop
if let Some(ctx) = ctx.as_deref_mut() {
ctx.log_warning("Buffer full, dropping message");
}
// Option 2: Retry with backoff
// Option 3: Use larger buffer
}
}
Clean Up Regularly
// In development, clean shared memory between runs
#[cfg(debug_assertions)]
fn cleanup_shm() {
let _ = std::fs::remove_dir_all("/dev/shm/horus/");
}
Troubleshooting
"No space left on device"
Cause: /dev/shm is full
Solution:
# Check usage
df -h /dev/shm
# Clean up
rm -rf /dev/shm/horus/
# Increase size
sudo mount -o remount,size=2G /dev/shm
"Permission denied"
Cause: Insufficient permissions
Solution:
# Check permissions
ls -la /dev/shm/horus/
# Fix permissions (if needed)
chmod 755 /dev/shm/horus/
Stale Shared Memory
Cause: Node crashed without cleanup
Solution:
# List stale topics
ls -l /dev/shm/horus/topics/
# Remove all
rm -rf /dev/shm/horus/
"Element size mismatch"
Cause: Publisher and subscriber using different types
Solution: Ensure both use the same type:
// Publisher
let pub_hub: Hub<f32> = Hub::new("data");
// Subscriber
let sub_hub: Hub<f32> = Hub::new("data"); // Same type!
Next Steps
- Learn about Message Types for standard robotics data
- Read the Performance Guide for optimization tips
- Explore Examples showing shared memory usage
- Check the API Reference for detailed documentation