Multi-Rate Pipeline (C++)
Run an IMU at 200 Hz, a controller at 100 Hz, a camera pipeline at 30 Hz, and a logger at 1 Hz — all in one scheduler with deterministic ordering and per-node budgets. Each node is a horus::Node subclass with its own tick divider.
Problem
Your robot has sensors and actuators that operate at fundamentally different rates. The IMU publishes at 200 Hz, the depth camera at 30 Hz, the control loop needs 100 Hz, and diagnostics only need 1 Hz. Running separate schedulers wastes resources and makes ordering non-deterministic. You need a single scheduler where every node runs at its natural rate with guaranteed execution order.
How It Works
A HORUS scheduler ticks at one master rate — typically the rate of the fastest node in the pipeline. Slower nodes use tick dividers: an internal counter that increments every tick and only executes the node's real work when the counter hits the divisor. For example, if the scheduler runs at 200 Hz and a node needs 100 Hz, that node's divider is 2 — it does work every 2nd tick and returns immediately on the others.
The key formula is:
divisor = master_rate / node_rate
So at 200 Hz master: IMU divisor = 1 (every tick), controller divisor = 2 (every other tick), camera divisor = 7 (approximately every 7th tick for ~28.6 Hz), logger divisor = 200 (once per second).
Ordering is deterministic within each tick. Nodes with lower order() values run first. This guarantees that sensor producers run before consumers: IMU (order 0) publishes before the controller (order 10) reads, and the controller publishes before the logger (order 100) reads. Even nodes that skip their tick via the divider still occupy their slot — they just return immediately.
Budget enforcement applies per tick, not per active cycle. If a node is in its skip phase, it consumes near-zero time. If it is in its active phase and overruns the budget, the miss policy kicks in. This means you can give tight budgets to critical nodes and generous budgets to non-critical ones without them interfering.
When To Use
- Any robot with sensors at different rates (IMU at 200 Hz, camera at 30 Hz, LiDAR at 10 Hz)
- When you need deterministic execution order across all rates
- When running multiple schedulers would waste CPU or complicate inter-node communication
- Embedded systems where a single event loop is simpler than multi-threaded pipelines
Prerequisites
- HORUS installed (Installation Guide)
- Basic understanding of nodes and topics (C++ Quick Start)
- Understanding of scheduling and ordering
Complete Code
#include <horus/horus.hpp>
#include <algorithm>
#include <cmath>
#include <cstdio>
using namespace horus::literals;
// ============================================================
// IMU Driver — 200 Hz (every tick)
// Reads the IMU hardware and publishes raw measurements.
// ============================================================
class ImuDriver : public horus::Node {
public:
ImuDriver() : Node("imu_driver") {
imu_pub_ = advertise<horus::msg::Imu>("imu.data");
}
void tick() override {
// Real driver would read SPI/I2C here
horus::msg::Imu data{};
data.linear_acceleration[0] = 0.02f; // slight forward tilt (m/s^2)
data.linear_acceleration[1] = 0.0f;
data.linear_acceleration[2] = 9.81f; // gravity
data.angular_velocity[0] = 0.001f; // near-zero roll rate (rad/s)
data.angular_velocity[1] = 0.0f;
data.angular_velocity[2] = 0.05f; // gentle yaw rate during a turn
imu_pub_->send(data);
}
private:
horus::Publisher<horus::msg::Imu>* imu_pub_;
};
// ============================================================
// Camera Pipeline — 30 Hz (every ~7th tick at 200 Hz master)
// Processes depth frames and publishes detection results.
// ============================================================
class CameraPipeline : public horus::Node {
public:
explicit CameraPipeline(int divisor)
: Node("camera_pipeline"), divisor_(divisor) {
det_pub_ = advertise<horus::msg::CmdVel>("camera.detections");
}
void tick() override {
// Tick divider: skip ticks that aren't ours
if (++tick_counter_ % divisor_ != 0) return;
// Simulate depth frame processing — in production this would
// run an inference model or stereo matching pipeline
float detection_score = 0.87f; // confidence of nearest obstacle
float detection_range = 2.4f; // meters to detected object
horus::msg::CmdVel det{};
det.linear = detection_range; // pack range into linear field
det.angular = detection_score; // pack confidence into angular field
det_pub_->send(det);
if (++active_count_ % 30 == 0) {
horus::log::info("camera", "Frame processed — 30 Hz active cycle");
}
}
private:
horus::Publisher<horus::msg::CmdVel>* det_pub_;
int divisor_;
int tick_counter_ = 0;
int active_count_ = 0;
};
// ============================================================
// Controller — 100 Hz (every 2nd tick at 200 Hz master)
// Fuses IMU + camera detections into velocity commands.
// ============================================================
class Controller : public horus::Node {
public:
explicit Controller(int divisor)
: Node("controller"), divisor_(divisor) {
imu_sub_ = subscribe<horus::msg::Imu>("imu.data");
det_sub_ = subscribe<horus::msg::CmdVel>("camera.detections");
cmd_pub_ = advertise<horus::msg::CmdVel>("cmd_vel");
}
void tick() override {
// IMPORTANT: always drain subscribers every tick, even on skip ticks.
// If you only recv() on active ticks, stale messages pile up in the buffer.
auto imu = imu_sub_->recv();
auto det = det_sub_->recv();
// Tick divider: skip the computation but still drained the buffers above
if (++tick_counter_ % divisor_ != 0) return;
// Base forward speed — 0.3 m/s cruising
float linear = 0.3f;
float angular = 0.0f;
// If camera detected an obstacle, slow down proportionally
if (det) {
float range = det->get()->linear;
float confidence = det->get()->angular;
if (confidence > 0.7f && range < 3.0f) {
// Scale speed: full at 3m, zero at 0.5m
float factor = std::clamp((range - 0.5f) / 2.5f, 0.0f, 1.0f);
linear = 0.3f * factor;
}
}
// Use IMU yaw rate for heading correction
if (imu) {
float gz = imu->get()->angular_velocity[2];
// Compensate for drift: if turning unintentionally, correct
if (std::abs(gz) > 0.02f) {
angular = -0.5f * gz; // P-controller on yaw drift
}
}
horus::msg::CmdVel cmd{};
cmd.linear = linear;
cmd.angular = angular;
cmd_pub_->send(cmd);
}
void enter_safe_state() override {
// Zero all motion on safety event
horus::msg::CmdVel stop{};
cmd_pub_->send(stop);
horus::blackbox::record("controller",
"Entered safe state — motors zeroed");
}
private:
horus::Subscriber<horus::msg::Imu>* imu_sub_;
horus::Subscriber<horus::msg::CmdVel>* det_sub_;
horus::Publisher<horus::msg::CmdVel>* cmd_pub_;
int divisor_;
int tick_counter_ = 0;
};
// ============================================================
// Diagnostics Logger — 1 Hz (every 200th tick at 200 Hz master)
// Records system health and topic throughput.
// ============================================================
class DiagLogger : public horus::Node {
public:
explicit DiagLogger(int divisor)
: Node("diag_logger"), divisor_(divisor) {
cmd_sub_ = subscribe<horus::msg::CmdVel>("cmd_vel");
imu_sub_ = subscribe<horus::msg::Imu>("imu.data");
}
void tick() override {
// Drain every tick to count messages
if (auto cmd = cmd_sub_->recv()) {
last_linear_ = cmd->get()->linear;
last_angular_ = cmd->get()->angular;
cmd_count_++;
}
if (auto imu = imu_sub_->recv()) {
imu_count_++;
}
if (++tick_counter_ % divisor_ != 0) return;
// Log once per second
char buf[256];
std::snprintf(buf, sizeof(buf),
"cmd_vel=(%.2f m/s, %.2f rad/s) "
"imu_msgs/s=%d cmd_msgs/s=%d",
last_linear_, last_angular_,
imu_count_, cmd_count_);
horus::log::info("diag", buf);
// Reset counters for next 1-second window
imu_count_ = 0;
cmd_count_ = 0;
}
private:
horus::Subscriber<horus::msg::CmdVel>* cmd_sub_;
horus::Subscriber<horus::msg::Imu>* imu_sub_;
int divisor_;
int tick_counter_ = 0;
int imu_count_ = 0;
int cmd_count_ = 0;
float last_linear_ = 0.0f;
float last_angular_ = 0.0f;
};
int main() {
horus::Scheduler sched;
// Master rate = fastest node (IMU at 200 Hz)
sched.tick_rate(200_hz).name("multi_rate_pipeline");
// ── Compute divisors from master rate ───────────────────
// IMU: 200 / 200 = 1 (every tick)
// Camera: 200 / 30 ~ 7 (every 7th tick = 28.6 Hz)
// Controller: 200 / 100 = 2 (every 2nd tick)
// Logger: 200 / 1 = 200 (every 200th tick)
ImuDriver imu;
sched.add(imu)
.order(0) // runs first — sensor producer
.budget(500_us) // SPI/I2C read should be fast
.on_miss(horus::Miss::Warn) // IMU overrun is notable but not fatal
.build();
CameraPipeline camera(7);
sched.add(camera)
.order(1) // after IMU, before controller
.budget(15_ms) // vision processing takes longer
.on_miss(horus::Miss::Skip) // skip if overrun — stale frame is useless
.build();
Controller ctrl(2);
sched.add(ctrl)
.order(10) // after all sensors
.budget(3_ms) // control math is lightweight
.on_miss(horus::Miss::Skip) // skip if overrun — stale commands dangerous
.build();
DiagLogger logger(200);
sched.add(logger)
.order(100) // runs last — reads everything
.budget(5_ms) // logging can take a moment
.on_miss(horus::Miss::Warn) // missed log is annoying but harmless
.build();
sched.spin();
}
Common Errors
| Symptom | Cause | Fix |
|---|---|---|
| Subscriber buffer overflows | Only calling recv() on active ticks, so 200 Hz messages pile up between 1 Hz reads | Always drain subscribers every tick, even on skip ticks |
| Camera runs at wrong rate | Divisor calculation is off (200/30 = 6.67, rounded to 7 gives 28.6 Hz not 30) | Accept the approximation, or set master rate to LCM of all rates (e.g., 600 Hz) |
| Controller sees stale IMU data | Controller order is lower than IMU — it runs before IMU publishes | Ensure producers have lower order values than consumers |
| Logger shows 0 messages/sec | Logger divisor is wrong or logger does not drain on skip ticks | Drain subscribers every tick; only gate the logging output |
| High CPU usage with many skip ticks | Tick overhead per node even on skip, multiplied by 200 Hz | Reduce master rate if most nodes are slow, or use Miss::Skip to shed load |
| Budget miss on camera node | 15ms budget too tight for inference on the target hardware | Profile actual inference time and set budget to p99 + 2ms headroom |
Design Decisions
| Choice | Rationale |
|---|---|
| Master rate = fastest node (200 Hz) | Ensures the fastest node gets a tick every cycle without needing sub-tick scheduling |
| Tick dividers inside each Node | Each node owns its rate logic — the scheduler does not need to know about multiple rates |
| Drain subscribers on every tick | Prevents buffer overflow when a slow consumer reads a fast producer's topic |
| Separate Node subclasses (not lambdas) | Each node has its own state, enter_safe_state(), and testable interface |
Controller has enter_safe_state() | The controller is the actuator node — it must zero outputs on any safety event |
Miss::Skip for controller and camera | Stale sensor data or stale commands are worse than skipping a cycle |
Miss::Warn for IMU and logger | IMU overrun is unusual and worth investigating; logger overrun is harmless |
Variations
LCM-based master rate — if you need exact rates for all nodes, set the master rate to the least common multiple:
// Exact 200 Hz, 100 Hz, 30 Hz, 1 Hz requires LCM = 600 Hz
sched.tick_rate(600_hz);
// Divisors: IMU=3, Controller=6, Camera=20, Logger=600
// Downside: 600 Hz master means more overhead per tick
Node-level rate configuration — pass rate as a parameter instead of computing divisors at compile time:
class RateNode : public horus::Node {
public:
RateNode(const char* name, int master_hz, int target_hz)
: Node(name), divisor_(master_hz / target_hz) {}
void tick() override {
// Common divider logic
if (++counter_ % divisor_ != 0) return;
do_work(); // subclass implements actual computation
}
protected:
virtual void do_work() = 0;
private:
int divisor_;
int counter_ = 0;
};
Priority-based budgeting — give critical nodes more budget by stealing from non-critical ones:
// Total per-tick budget: 5ms at 200 Hz
// IMU: 500us (critical — sensor timing matters)
// Controller: 3ms (critical — actuator safety)
// Camera: 15ms (only runs every 7th tick, can burst)
// Logger: 1ms (best effort — can be skipped entirely)
sched.add(logger)
.order(100)
.budget(1_ms)
.on_miss(horus::Miss::Skip) // expendable — skip if time is tight
.build();
Key Takeaways
- Set the master tick rate to the fastest node's rate — slower nodes use integer divisors
- Always drain subscriber buffers every tick, even on skip ticks, to prevent stale data accumulation
- Execution order is deterministic: lower
order()values run first, guaranteeing producer-before-consumer - Each node should be a
horus::Nodesubclass with its own state andenter_safe_state()for actuator nodes - Accept approximate rates from integer division (200/7 = 28.6 Hz instead of 30 Hz) or use LCM master rates for exactness