Publisher and Subscriber API
HORUS uses shared memory (SHM) for inter-process communication. Publisher<T> writes messages, Subscriber<T> reads them -- both zero-copy. The loan pattern lets you write directly into shared memory without any intermediate buffers or serialization.
Rust: See
Topic<T>for the Rust unified pub/sub API. Python: Seenode.send()/node.recv()for the Python wrapper.
// simplified
#include <horus/topic.hpp>
#include <horus/messages.hpp>
Quick Reference -- Publisher<T>
| Method | Returns | Description |
|---|---|---|
.loan() | LoanedSample<T> | Get a writable buffer from SHM (zero-copy) |
.publish(LoanedSample<T>&&) | void | Make a loaned sample visible to subscribers |
.send(const T&) | void | Send a message by copy (simpler API) |
.name() | const string& | Get the topic name |
Quick Reference -- Subscriber<T>
| Method | Returns | Description |
|---|---|---|
.recv() | optional<BorrowedSample<T>> | Receive the next message (nullopt if none) |
.has_msg() | bool | Check if a message is available without consuming |
.name() | const string& | Get the topic name |
Quick Reference -- LoanedSample<T>
| Method | Returns | Description |
|---|---|---|
operator->() | T* / const T* | Direct SHM pointer access (zero-copy write) |
operator*() | T& / const T& | Dereference to the data |
.get() | T* / const T* | Raw pointer to the data |
Quick Reference -- BorrowedSample<T>
| Method | Returns | Description |
|---|---|---|
operator->() | const T* | Read-only pointer access |
operator*() | const T& | Read-only dereference |
.get() | const T* | Raw read-only pointer |
Creating Topics
Topics are created on the Scheduler or inside a Node / LambdaNode. Topic names use dots as separators (not slashes), required for macOS shm_open compatibility.
On the Scheduler (for Lambda Nodes)
auto cmd_pub = sched.advertise<horus::msg::CmdVel>("motor.cmd");
auto scan_sub = sched.subscribe<horus::msg::LaserScan>("lidar.scan");
Inside a Node Subclass (Constructor)
class Controller : public horus::Node {
public:
Controller() : Node("controller") {
scan_sub_ = subscribe<horus::msg::LaserScan>("lidar.scan");
cmd_pub_ = advertise<horus::msg::CmdVel>("motor.cmd");
}
// ...
private:
horus::Subscriber<horus::msg::LaserScan>* scan_sub_;
horus::Publisher<horus::msg::CmdVel>* cmd_pub_;
};
Inside a LambdaNode (Builder)
auto node = horus::LambdaNode("controller")
.sub<horus::msg::LaserScan>("lidar.scan")
.pub<horus::msg::CmdVel>("motor.cmd");
Zero-Copy Loan Pattern
The loan pattern is the primary way to publish messages in HORUS. It eliminates all copies between producer and consumer.
How It Works
1. pub.loan() --> SHM allocates a slot, returns LoanedSample<T>
2. sample->field = value --> writes directly into SHM (0 ns copy overhead)
3. pub.publish(std::move(sample))--> flips visibility flag, subscribers can read
Full Example
auto pub = sched.advertise<horus::msg::Odometry>("odom");
sched.add("localizer")
.rate(100_hz)
.tick([&] {
// Step 1: Loan a writable slot from shared memory
auto sample = pub.loan();
// Step 2: Write directly into SHM (zero copy)
sample->pose.x = current_x;
sample->pose.y = current_y;
sample->pose.theta = current_theta;
sample->timestamp_ns = now_ns();
// Step 3: Publish (consumes the sample via move)
pub.publish(std::move(sample));
})
.build();
After publish(), the LoanedSample is consumed (moved). Attempting to use it after publish is a compile error.
LoanedSample Access Patterns
auto sample = pub.loan();
// Arrow operator -- most common
sample->linear = 0.5f;
sample->angular = 0.1f;
// Dereference operator
horus::msg::CmdVel& ref = *sample;
ref.linear = 0.5f;
// Raw pointer
horus::msg::CmdVel* ptr = sample.get();
ptr->linear = 0.5f;
Send-by-Copy Pattern
For simplicity or small messages, send() copies the message into shared memory in one call:
auto pub = sched.advertise<horus::msg::CmdVel>("motor.cmd");
// Single-call publish -- copies msg into SHM
pub.send(horus::msg::CmdVel{0, 0.5f, 0.1f});
This is equivalent to loan() + write + publish() but performs one copy. For small messages like CmdVel (16 bytes), the overhead is negligible. For large messages like LaserScan (1468 bytes) or Odometry, prefer the loan pattern.
| Pattern | Copy Count | Best For |
|---|---|---|
loan() + publish() | 0 | Large messages, high-frequency topics |
send() | 1 | Small messages, simple code |
Receiving Messages
Basic Receive
recv() returns std::optional<BorrowedSample<T>>. If no message is available, it returns std::nullopt:
auto sub = sched.subscribe<horus::msg::LaserScan>("lidar.scan");
sched.add("processor")
.rate(50_hz)
.tick([&] {
auto scan = sub.recv();
if (!scan) return; // no message this tick
// Access fields through arrow operator (read-only)
float front_range = scan->ranges[0];
float min_range = scan->min_range;
// Dereference
const horus::msg::LaserScan& data = *scan;
// Raw pointer
const horus::msg::LaserScan* ptr = scan->get();
})
.build();
Check Before Consuming
Use has_msg() to check availability without consuming the message:
sched.add("conditional_processor")
.rate(50_hz)
.tick([&] {
if (!scan_sub.has_msg()) {
// No scan available -- do something else
return;
}
auto scan = scan_sub.recv();
// guaranteed to have a value
})
.build();
BorrowedSample Lifetime
The BorrowedSample is valid until it goes out of scope. RAII releases the SHM slot automatically:
{
auto scan = sub.recv();
if (scan) {
process(*scan); // valid here
}
} // BorrowedSample released here -- SHM slot freed
Move Semantics
All topic types are move-only. Copy constructors and copy assignment operators are deleted. This enforces single ownership, matching Rust's ownership model:
auto pub = sched.advertise<horus::msg::CmdVel>("cmd");
// auto pub2 = pub; // COMPILE ERROR: copy deleted
auto pub2 = std::move(pub); // OK: ownership transferred
auto sample = pub2.loan();
// auto sample2 = sample; // COMPILE ERROR: copy deleted
pub2.publish(std::move(sample)); // OK: consumed by publish
// sample->linear = 1.0; // COMPILE ERROR: sample was moved
Supported Message Types (51)
All message types live in the horus::msg namespace. They are #[repr(C)] compatible with Rust for zero-copy SHM IPC. Each type gets a Publisher<T> and Subscriber<T> template specialization generated internally via HORUS_TOPIC_IMPL in impl/topic_impl.hpp.
Geometry (12): Point3, Vector3, Quaternion, Twist, Pose2D, TransformStamped, Pose3D, PoseStamped, PoseWithCovariance, TwistWithCovariance, Accel, AccelStamped
Sensor (11): LaserScan, Imu, Odometry, RangeSensor, BatteryState, NavSatFix, MagneticField, Temperature, FluidPressure, Illuminance, JointState
Control (7): CmdVel, MotorCommand, DifferentialDriveCommand, ServoCommand, PidConfig, TrajectoryPoint, JointCommand
Navigation (5): NavGoal, GoalResult, Waypoint, VelocityObstacle, PathPlan
Diagnostics (8): Heartbeat, DiagnosticStatus, EmergencyStop, ResourceUsage, NodeHeartbeat, SafetyStatus, DiagnosticValue, DiagnosticReport
Detection (12): BoundingBox2D, BoundingBox3D, Detection, Detection3D, Landmark, Landmark3D, LandmarkArray, TrackedObject, TrackingHeader, PlaneDetection, SegmentationMask, PointField
Vision (3): CameraInfo, RegionOfInterest, StereoInfo
Force (5): WrenchStamped, ImpedanceParameters, ForceCommand, ContactInfo, HapticFeedback
Time (4): Clock, TimeReference, SimSync, RateRequest
Input (3): KeyboardInput, JoystickInput, AudioFrame
Common Patterns
Multi-Topic Node
A single node can publish and subscribe to multiple topics:
auto imu_sub = sched.subscribe<horus::msg::Imu>("imu.data");
auto odom_sub = sched.subscribe<horus::msg::Odometry>("odom");
auto cmd_pub = sched.advertise<horus::msg::CmdVel>("motor.cmd");
auto diag_pub = sched.advertise<horus::msg::Heartbeat>("health.ctrl");
sched.add("fusion_controller")
.rate(100_hz)
.tick([&] {
auto imu = imu_sub.recv();
auto odom = odom_sub.recv();
if (imu && odom) {
auto cmd = cmd_pub.loan();
cmd->linear = compute_speed(*odom);
cmd->angular = compute_turn(*imu);
cmd_pub.publish(std::move(cmd));
}
// Always publish heartbeat
uint64_t seq = 0;
diag_pub.send(horus::msg::Heartbeat{
{}, seq++, true, 0.0f, 0.0f, 0
});
})
.build();
Pipeline Pattern
Chain nodes through topics to form a processing pipeline:
auto raw_sub = sched.subscribe<horus::msg::LaserScan>("lidar.raw");
auto filt_pub = sched.advertise<horus::msg::LaserScan>("lidar.filtered");
auto filt_sub = sched.subscribe<horus::msg::LaserScan>("lidar.filtered");
auto cmd_pub = sched.advertise<horus::msg::CmdVel>("motor.cmd");
// Stage 1: Filter raw scan
sched.add("filter")
.rate(50_hz).order(1)
.tick([&] {
auto raw = raw_sub.recv();
if (!raw) return;
auto filtered = filt_pub.loan();
*filtered = *raw; // copy, then modify in place
for (int i = 0; i < 360; ++i) {
if (filtered->ranges[i] < 0.05f)
filtered->ranges[i] = 0.0f;
}
filt_pub.publish(std::move(filtered));
})
.build();
// Stage 2: React to filtered scan
sched.add("controller")
.rate(50_hz).order(2)
.tick([&] {
auto scan = filt_sub.recv();
if (!scan) return;
cmd_pub.send(horus::msg::CmdVel{0, 0.3f, 0.0f});
})
.build();
Topic Name Conventions
Use dot-separated hierarchical names:
sensor.lidar.scan # LiDAR data
sensor.imu.data # IMU readings
motor.cmd # Motor commands
nav.goal # Navigation goal
safety.alert # Safety alerts
diagnostics.heartbeat # Heartbeat
See Also
- Scheduler API -- Topic creation via
advertiseandsubscribe - Node API -- Node styles and lifecycle
- Services and Actions API -- Request/response and long-running tasks
- C++ API Overview -- All classes at a glance
- Rust Topic API -- Equivalent Rust API