Topics: How Nodes Talk

Imagine a factory floor. A quality-inspection camera spots a defective part on the conveyor belt. It needs to tell the robot arm to reject the part, the logging system to record the defect, and the operator dashboard to flash a warning. In a traditional program, the camera code would call the robot arm code directly — but then adding the logger means editing the camera. Adding the dashboard means editing it again. And if the robot arm code crashes, it takes the camera down with it.

This is the coupling problem: when components talk directly, every new connection requires changing existing code, and one failure cascades through the entire system.

HORUS solves this with topics: named channels that carry typed data. The camera publishes to "quality.defect". The robot arm, the logger, and the dashboard all subscribe to "quality.defect". None of them know the others exist. Adding a new subscriber requires zero changes to existing code. If the dashboard crashes, the camera and robot arm keep running.

This pattern is called publish/subscribe (or pub/sub). Think of a radio station: the broadcaster (publisher) sends a signal, and any number of receivers (subscribers) tune in. The broadcaster doesn't know who's listening. Listeners don't know about each other. Anyone can start or stop listening at any time.

For capacity tuning, performance optimization, communication patterns, and multi-process details, see Topics — Full Reference.

How It Works

The Big Picture

A topic is a named channel — one publisher, any number of subscribers

A topic is a named, typed communication channel. Under the hood it's backed by shared memory with a ring buffer — but you never interact with memory directly. You just call send() and recv().

Here's what those terms mean if you've never encountered them:

  • Named: Every topic has a string name like "sensor.temperature". Two nodes that create a topic with the same name are automatically connected — no configuration, no broker, no server.
  • Typed: A topic carries one type of data. A Topic<f32> carries floating-point numbers. A Topic<CmdVel> carries velocity commands. The compiler enforces this — you can't accidentally subscribe with the wrong type.
  • Shared memory: Normally, sending data between two programs requires the operating system's kernel to copy bytes from one program's memory to the other. For a 6 MB camera image, that copy takes milliseconds. Shared memory is a region of RAM that both programs can read and write directly. The publisher writes the image once; the subscriber reads from the same address. No copies, no kernel involvement — just nanoseconds.
  • Ring buffer: The shared memory is organized as a fixed-size circular buffer (imagine slots arranged in a circle). When all slots are full and a new message arrives, the oldest unread message is overwritten. This guarantees the system never runs out of memory and the publisher never blocks — but slow subscribers may miss messages.
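The overwrite behavior is easy to sketch in plain Rust. The RingBuffer below is an illustrative, std-only, single-threaded stand-in, not HORUS's actual lock-free shared-memory implementation:

```rust
// Illustrative sketch only: a minimal fixed-capacity ring buffer with
// overwrite-on-full semantics, using only the standard library.
struct RingBuffer<T> {
    slots: Vec<Option<T>>,
    head: usize, // next slot to write
    tail: usize, // oldest unread slot
    len: usize,  // number of unread messages
}

impl<T> RingBuffer<T> {
    fn new(capacity: usize) -> Self {
        Self {
            slots: (0..capacity).map(|_| None).collect(),
            head: 0,
            tail: 0,
            len: 0,
        }
    }

    /// Never blocks, never fails: when the buffer is full, the oldest
    /// unread message is overwritten.
    fn send(&mut self, msg: T) {
        let cap = self.slots.len();
        if self.len == cap {
            self.tail = (self.tail + 1) % cap; // drop the oldest
            self.len -= 1;
        }
        self.slots[self.head] = Some(msg);
        self.head = (self.head + 1) % cap;
        self.len += 1;
    }

    /// Returns the oldest unread message, or None if the buffer is empty.
    fn recv(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }
        let msg = self.slots[self.tail].take();
        self.tail = (self.tail + 1) % self.slots.len();
        self.len -= 1;
        msg
    }
}

fn main() {
    let mut rb = RingBuffer::new(4); // the default topic capacity is 4 slots
    for i in 1..=6 {
        rb.send(i); // messages 1 and 2 get overwritten
    }
    assert_eq!(rb.recv(), Some(3)); // oldest surviving message
    assert_eq!(rb.recv(), Some(4));
}
```

Sending six messages into four slots silently drops the two oldest: exactly the "slow subscribers may miss messages" trade-off described above.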

Your First Topic

// simplified
use horus::prelude::*;

// --- Publisher node ---
struct Thermometer {
    publisher: Topic<f32>,  // This node SENDS f32 values
}

impl Thermometer {
    fn new() -> Result<Self> {
        // "sensor.temperature" is the topic name — any node using
        // this exact name and type connects automatically
        Ok(Self { publisher: Topic::new("sensor.temperature")? })
    }
}

impl Node for Thermometer {
    fn name(&self) -> &str { "Thermometer" }

    fn tick(&mut self) {
        let temp = 22.5;  // In a real node, read from hardware
        self.publisher.send(temp);  // Publish — always succeeds, never blocks
    }
}

// --- Subscriber node ---
struct Display {
    subscriber: Topic<f32>,  // This node RECEIVES f32 values
}

impl Display {
    fn new() -> Result<Self> {
        // Same name as the publisher — HORUS connects them automatically
        Ok(Self { subscriber: Topic::new("sensor.temperature")? })
    }
}

impl Node for Display {
    fn name(&self) -> &str { "Display" }

    fn tick(&mut self) {
        // recv() returns Option<f32>:
        //   Some(value) → a message was waiting, process it
        //   None        → no message yet, nothing to do
        if let Some(temp) = self.subscriber.recv() {
            println!("Temperature: {:.1}°C", temp);
        }
    }
}

Both nodes create a topic with the name "sensor.temperature". That's all it takes — HORUS detects they should be connected and wires them together. No configuration files, no broker process, no network setup.

Topic Naming

Topic names are simple strings. Use dots for hierarchy:

  • "sensor.temperature" — sensor readings
  • "camera.rgb" — camera frames
  • "motor.cmd_vel" — velocity commands
  • "robot.sensors.imu.raw" — deeply nested namespaces

Always use dots (.) — never slashes (/).

Slashes work on Linux but fail on macOS because shm_open (the system call that creates shared memory) interprets slashes as directory separators. Using dots guarantees your code works on every platform.

Framework    Separator   Example
ROS / ROS2   /           /sensor/lidar
HORUS        .           sensor.lidar
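If you're porting from ROS, the rename is mechanical: drop the leading slash and replace the rest with dots. The helper below is hypothetical, not part of the HORUS API:

```rust
// Hypothetical helper (not part of HORUS) for porting ROS-style
// topic names to the dot convention.
fn to_horus_name(ros_name: &str) -> String {
    ros_name.trim_start_matches('/').replace('/', ".")
}

fn main() {
    assert_eq!(to_horus_name("/sensor/lidar"), "sensor.lidar");
    assert_eq!(to_horus_name("robot/sensors/imu/raw"), "robot.sensors.imu.raw");
}
```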

Sending Data

HORUS provides three ways to send data, each for a different situation:

send() — Fire and Forget

// simplified
self.publisher.send(CmdVel::new(1.0, 0.0));

send() always succeeds. It never blocks. It never returns an error. If the ring buffer is full (the subscriber hasn't read fast enough), the oldest unread message is silently overwritten.

Why this is the default for robotics: A motor controller runs at 1000 Hz — it expects a new velocity command every millisecond. If it falls 3 messages behind, you don't want it processing stale commands from 3 ms ago. You want it to skip ahead to the latest command. Overwriting old data is the correct behavior.

Use send() for almost everything: sensor data, velocity commands, state updates, telemetry. It's the fastest option (~3 ns same-thread) and the simplest to reason about.

try_send() — Send With Feedback

// simplified
match self.publisher.try_send(expensive_result) {
    Ok(()) => { /* message delivered to buffer */ }
    Err(returned_msg) => {
        // Buffer full — the message is returned to you (not consumed)
        hlog!(warn, "Buffer full, discarding result");
    }
}

try_send() attempts to publish. If the buffer is full, instead of overwriting the oldest message, it gives the message back to you. This lets you count drops, log warnings, or retry later.

When to use: When the message is expensive to create (e.g., a computed path plan) and you want to know if it was accepted.

send_blocking() — Wait for Space

// simplified
use horus::prelude::*;

match self.publisher.send_blocking(critical_command, 10_u64.ms()) {
    Ok(()) => { /* delivered */ }
    Err(SendBlockingError::Timeout) => {
        hlog!(error, "Could not deliver command within 10ms!");
    }
}

send_blocking() waits until buffer space opens up, or until your timeout expires. It uses a graduated wait strategy to minimize latency: first it spins (sub-microsecond), then yields the thread (microseconds), then sleeps in 100 µs increments.

Never use send_blocking() in real-time control loops. Blocking in tick() causes deadline misses, which can trigger the safety monitor. Use send() instead — in real-time code, dropping a stale message is always better than blocking.

When to use: Logging pipelines, recording, non-real-time data transfer — anywhere brief blocking is acceptable and message loss is not.

Choosing a Send Method

Method            Blocks?         On buffer full              Best for
send()            Never           Overwrites oldest           Control loops, sensor data, telemetry
try_send()        Never           Returns message to caller   Expensive messages, drop detection
send_blocking()   Up to timeout   Waits for space             Logging, recording, non-RT pipelines

Receiving Data

recv() — Take the Next Message

// simplified
if let Some(msg) = self.subscriber.recv() {
    self.process(msg);
}

recv() returns the oldest unread message and removes it from the buffer. If nothing is available, it returns None — immediately, without blocking or waiting.

Key detail: Each call to recv() consumes one message. If a publisher sends 5 messages between your tick() calls, you need 5 recv() calls to get all of them. In practice, drain the buffer every tick:

// simplified
fn tick(&mut self) {
    // IMPORTANT: drain all pending messages every tick
    // If you only call recv() once, messages pile up and you process stale data
    while let Some(msg) = self.commands.recv() {
        self.latest_command = Some(msg);
    }

    // Now act on only the latest
    if let Some(cmd) = &self.latest_command {
        self.execute(cmd);
    }
}
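The drain pattern isn't HORUS-specific. The sketch below shows the same idea with the standard library's mpsc channel, whose try_recv() plays the role of a topic's non-blocking recv(); drain_latest is an illustrative name, not a HORUS function:

```rust
use std::sync::mpsc;

/// Drain every pending message, keeping only the newest.
/// try_recv() returns immediately whether or not a message is waiting,
/// just like a topic's recv().
fn drain_latest<T>(rx: &mpsc::Receiver<T>) -> Option<T> {
    let mut latest = None;
    while let Ok(msg) = rx.try_recv() {
        latest = Some(msg);
    }
    latest
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // Three commands arrive between two ticks...
    for v in [1.0_f32, 2.0, 3.0] {
        tx.send(v).unwrap();
    }
    // ...but only the newest one matters.
    assert_eq!(drain_latest(&rx), Some(3.0));
    assert_eq!(drain_latest(&rx), None); // channel now drained
}
```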

read_latest() — Peek at the Newest

// simplified
if let Some(pose) = self.position_sub.read_latest() {
    self.current_position = pose;
}

read_latest() skips all older messages and gives you only the newest one. Unlike recv(), it does not remove the message — calling it again returns the same value until a new message arrives.

read_latest() requires your message type to implement Copy (simple types like f32, Pose2D, Odometry — types without String, Vec, or heap data). This is a safety requirement: since the message stays in the buffer, another thread could be reading it simultaneously, so the data must be safe to bitwise-copy.

When to use: State-like data where you only care about the current value — robot position, transform frames, configuration parameters.
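A quick sketch of what does and doesn't satisfy the Copy requirement. The Pose2D definition here is illustrative, not necessarily the one HORUS ships:

```rust
// A message type that qualifies for read_latest(): plain data,
// so Copy can be derived. (Illustrative definition, assumed here.)
#[derive(Clone, Copy, Debug, PartialEq)]
struct Pose2D {
    x: f32,
    y: f32,
    theta: f32,
}

// A type like this would NOT qualify: String owns heap memory,
// so the compiler refuses to derive Copy for it.
// #[derive(Clone, Copy)]            // error: String is not Copy
// struct Status { label: String }

fn main() {
    let pose = Pose2D { x: 1.0, y: 2.0, theta: 0.5 };
    let snapshot = pose; // bitwise copy; `pose` remains usable
    assert_eq!(pose, snapshot);
}
```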

Choosing a Receive Method

Method          Consumes?                   Order                 Best for
recv()          Yes — removes from buffer   FIFO (oldest first)   Commands, events, data streams
read_latest()   No — stays in buffer        Latest only           State, poses, configuration

What Happens at the Edges

No Subscribers?

send() still succeeds. The data goes into the ring buffer and waits. If a subscriber connects later, it reads whatever is still in the buffer. If the buffer fills up before anyone subscribes, older messages are overwritten — but nothing crashes or blocks.

No Data Yet?

recv() returns None. It never blocks and never panics. Your node continues to the next tick:

// simplified
fn tick(&mut self) {
    match self.subscriber.recv() {
        Some(data) => self.process(data),
        None => {} // Nothing published yet — normal at startup
    }
}

This means your nodes are startup-order independent. It doesn't matter whether the publisher or subscriber starts first.

Subscriber Too Slow?

If a publisher sends faster than a subscriber reads, the ring buffer fills up. The next send() overwrites the oldest unread message. You can detect this:

// simplified
fn tick(&mut self) {
    let dropped = self.subscriber.dropped_count();
    if dropped > 0 {
        hlog!(warn, "{} messages dropped on '{}' — subscriber too slow",
            dropped, self.subscriber.name());
    }
}

Solutions:

  • Increase buffer capacity: Topic::with_capacity("name", 16, None) instead of the default 4 slots
  • Read faster: Use read_latest() instead of recv() if you only need the newest value
  • Accept the drops: For telemetry or logging, some dropped messages are often fine

Monitoring Topics at Runtime

Use horus monitor to see all active topics, their publishers, subscribers, and message rates in real time:

horus monitor --tui   # Interactive terminal dashboard

How Fast Is This?

Numbers without context are meaningless. Here's what topic latency means for a real robot:

A motor controller running at 1000 Hz has a 1 millisecond (1,000,000 nanoseconds) budget per cycle. The time spent delivering a velocity command via a topic:

Scenario                                   Latency      % of 1 ms budget
Same thread                                ~3 ns        0.0003%
Same process, 1 publisher + 1 subscriber   ~18 ns       0.002%
Same process, many-to-many                 ~36 ns       0.004%
Cross-process (shared memory)              ~50–167 ns   0.005–0.017%

For comparison, a traditional message-passing system using sockets or pipes adds 50,000–100,000 ns per message — eating 5–10% of the motor controller's budget. Topics use shared memory to eliminate serialization and kernel transitions, keeping overhead negligible even at kilohertz control rates.
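The percentages above are plain arithmetic: latency divided by the tick budget. A quick sketch (budget_pct is an illustrative helper, not a HORUS function):

```rust
/// Fraction of a control-loop tick budget consumed by a given latency,
/// expressed as a percentage.
fn budget_pct(latency_ns: f64, budget_ns: f64) -> f64 {
    latency_ns / budget_ns * 100.0
}

fn main() {
    let budget = 1_000_000.0; // 1 ms tick at 1000 Hz, in nanoseconds
    // Same-thread topic delivery: ~3 ns out of 1,000,000 ns.
    println!("topic:  {:.4}%", budget_pct(3.0, budget));
    // Socket-based IPC at the low end: ~50,000 ns per message.
    println!("socket: {:.1}%", budget_pct(50_000.0, budget));
}
```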

What's a nanosecond? Light travels about 30 centimeters (1 foot) in one nanosecond. HORUS can deliver a message between two nodes in the time it takes light to travel one meter. This matters because robots have strict timing budgets — every nanosecond spent on communication is a nanosecond not spent on control computation. A motor controller that misses its deadline can cause a robot arm to overshoot its target.

A Complete Example

A temperature monitor with publisher, subscriber, and scheduler — copy-paste and run:

// simplified
use horus::prelude::*;

struct Sensor {
    pub_temp: Topic<f32>,
    value: f32,
}

impl Sensor {
    fn new() -> Result<Self> {
        Ok(Self { pub_temp: Topic::new("sensor.temperature")?, value: 20.0 })
    }
}

impl Node for Sensor {
    fn name(&self) -> &str { "Sensor" }
    fn tick(&mut self) {
        self.value += 0.1;
        self.pub_temp.send(self.value);
    }
}

struct Monitor {
    sub_temp: Topic<f32>,
}

impl Monitor {
    fn new() -> Result<Self> {
        Ok(Self { sub_temp: Topic::new("sensor.temperature")? })
    }
}

impl Node for Monitor {
    fn name(&self) -> &str { "Monitor" }
    fn tick(&mut self) {
        if let Some(temp) = self.sub_temp.recv() {
            if temp > 25.0 {
                println!("WARNING: {:.1}°C exceeds threshold!", temp);
            } else {
                println!("Temperature: {:.1}°C", temp);
            }
        }
    }
}

fn main() -> Result<()> {
    let mut sched = Scheduler::new()
        .tick_rate(1_u64.hz());  // 1 Hz — slow enough to watch

    sched.add(Sensor::new()?)
        .order(0)  // Sensor runs first (publishes data)
        .build()?;

    sched.add(Monitor::new()?)
        .order(1)  // Monitor runs second (reads the data)
        .build()?;

    sched.run()  // Runs until Ctrl+C
}

Design Decisions

Why named channels instead of direct function calls? If the camera node calls robot_arm.reject_part() directly, the camera must know about the robot arm's API. Adding a logger means editing the camera code. Adding a dashboard means editing it again. With topics, the camera publishes to "quality.defect" and knows nothing about who's listening. Adding the logger, dashboard, or a dozen more subscribers requires zero changes to the camera. This is the same pattern used by ROS, MQTT, and Kafka — proven at scale across millions of deployments.

Why shared memory instead of sockets or pipes? Sockets and pipes require the operating system's kernel to copy data between processes. Each message needs a write() system call on one side and a read() on the other, with the kernel copying bytes in between. This adds 50,000–100,000 nanoseconds per message. Shared memory puts data in a region of RAM that both processes can access directly — the publisher writes once and the subscriber reads from the same address. No copies, no kernel involvement. For a 6 MB camera image, this is the difference between waiting milliseconds (sockets) and waiting nanoseconds (shared memory).

Why a ring buffer that overwrites old data? A robot's control loop cares about what's happening now, not what happened 10 milliseconds ago. If the motor controller falls behind, you want it to skip ahead to the latest velocity command, not process 10 stale commands in sequence. A ring buffer with overwrite semantics guarantees this: the latest data is always available, memory usage is bounded (no unbounded queues that eat all your RAM), and the publisher never blocks waiting for a slow subscriber. If you need guaranteed delivery (no overwrites), use try_send() to detect drops or send_blocking() to wait.

Why automatic backend selection? During development, all your nodes typically run in one process — the fastest communication path. In production, you might split the camera and controller into separate processes for fault isolation. If you had to manually choose the communication backend (in-process ring buffer vs cross-process shared memory), you'd need to reconfigure every time you changed the deployment topology. HORUS detects whether publisher and subscriber are in the same thread, same process, or different processes, and automatically selects the fastest available path. Your code stays identical across all deployment configurations.

Trade-offs

  • Gain: Zero coupling — publishers don't know subscribers exist.
    Cost: Indirect communication — data flow is harder to trace (use horus monitor to visualize it).
  • Gain: Zero-copy for large data — images and point clouds transfer in nanoseconds.
    Cost: Fixed-size ring buffers — capacity must be chosen at creation (default: 4 slots).
  • Gain: Automatic backend selection — the fastest path is chosen for you.
    Cost: Cannot force a specific backend (though the auto-selection is always optimal).
  • Gain: Multi-subscriber fan-out — one publisher serves any number of subscribers.
    Cost: Late subscribers miss messages published before they connected.
  • Gain: Never blocks — send() always returns immediately.
    Cost: Slow subscribers lose old messages (detectable via dropped_count()).

See Also