Multi-Rate Pipeline (C++)

Run an IMU at 200 Hz, a controller at 100 Hz, a camera pipeline at 30 Hz, and a logger at 1 Hz — all in one scheduler with deterministic ordering and per-node budgets. Each node is a horus::Node subclass with its own tick divider.

Problem

Your robot has sensors and actuators that operate at fundamentally different rates. The IMU publishes at 200 Hz, the depth camera at 30 Hz, the control loop needs 100 Hz, and diagnostics only need 1 Hz. Running separate schedulers wastes resources and makes ordering non-deterministic. You need a single scheduler where every node runs at its natural rate with guaranteed execution order.

How It Works

A HORUS scheduler ticks at one master rate — typically the rate of the fastest node in the pipeline. Slower nodes use tick dividers: an internal counter that increments every tick and only executes the node's real work when the counter hits the divisor. For example, if the scheduler runs at 200 Hz and a node needs 100 Hz, that node's divider is 2 — it does work every 2nd tick and returns immediately on the others.

The key formula is:

divisor = master_rate / node_rate

So at 200 Hz master: IMU divisor = 1 (every tick), controller divisor = 2 (every other tick), camera divisor = 7 (approximately every 7th tick for ~28.6 Hz), logger divisor = 200 (once per second).

Ordering is deterministic within each tick. Nodes with lower order() values run first. This guarantees that sensor producers run before consumers: IMU (order 0) publishes before the controller (order 10) reads, and the controller publishes before the logger (order 100) reads. Even nodes that skip their tick via the divider still occupy their slot — they just return immediately.

Budget enforcement applies per tick, not per active cycle. If a node is in its skip phase, it consumes near-zero time. If it is in its active phase and overruns the budget, the miss policy kicks in. This means you can give tight budgets to critical nodes and generous budgets to non-critical ones without them interfering.

When To Use

  • Any robot with sensors at different rates (IMU at 200 Hz, camera at 30 Hz, LiDAR at 10 Hz)
  • When you need deterministic execution order across all rates
  • When running multiple schedulers would waste CPU or complicate inter-node communication
  • Embedded systems where a single event loop is simpler than multi-threaded pipelines

Prerequisites

Complete Code

#include <horus/horus.hpp>
#include <algorithm>
#include <cmath>
#include <cstdio>
using namespace horus::literals;

// ============================================================
// IMU Driver — 200 Hz (every tick)
// Reads the IMU hardware and publishes raw measurements.
// ============================================================
class ImuDriver : public horus::Node {
public:
    ImuDriver() : Node("imu_driver") {
        imu_pub_ = advertise<horus::msg::Imu>("imu.data");
    }

    void tick() override {
        // Real driver would read SPI/I2C here
        horus::msg::Imu data{};
        data.linear_acceleration[0] = 0.02f;   // slight forward tilt (m/s^2)
        data.linear_acceleration[1] = 0.0f;
        data.linear_acceleration[2] = 9.81f;   // gravity
        data.angular_velocity[0] = 0.001f;     // near-zero roll rate (rad/s)
        data.angular_velocity[1] = 0.0f;
        data.angular_velocity[2] = 0.05f;      // gentle yaw rate during a turn
        imu_pub_->send(data);
    }

private:
    horus::Publisher<horus::msg::Imu>* imu_pub_;
};

// ============================================================
// Camera Pipeline — 30 Hz (every ~7th tick at 200 Hz master)
// Processes depth frames and publishes detection results.
// ============================================================
class CameraPipeline : public horus::Node {
public:
    explicit CameraPipeline(int divisor)
        : Node("camera_pipeline"), divisor_(divisor) {
        det_pub_ = advertise<horus::msg::CmdVel>("camera.detections");
    }

    void tick() override {
        // Tick divider: skip ticks that aren't ours
        if (++tick_counter_ % divisor_ != 0) return;

        // Simulate depth frame processing — in production this would
        // run an inference model or stereo matching pipeline
        float detection_score = 0.87f;  // confidence of nearest obstacle
        float detection_range = 2.4f;   // meters to detected object

        horus::msg::CmdVel det{};
        det.linear = detection_range;    // pack range into linear field
        det.angular = detection_score;   // pack confidence into angular field
        det_pub_->send(det);

        if (++active_count_ % 30 == 0) {
            horus::log::info("camera", "Frame processed — 30 Hz active cycle");
        }
    }

private:
    horus::Publisher<horus::msg::CmdVel>* det_pub_;
    int divisor_;
    int tick_counter_ = 0;
    int active_count_ = 0;
};

// ============================================================
// Controller — 100 Hz (every 2nd tick at 200 Hz master)
// Fuses IMU + camera detections into velocity commands.
// ============================================================
class Controller : public horus::Node {
public:
    explicit Controller(int divisor)
        : Node("controller"), divisor_(divisor) {
        imu_sub_ = subscribe<horus::msg::Imu>("imu.data");
        det_sub_ = subscribe<horus::msg::CmdVel>("camera.detections");
        cmd_pub_ = advertise<horus::msg::CmdVel>("cmd_vel");
    }

    void tick() override {
        // IMPORTANT: always drain subscribers every tick, even on skip ticks.
        // If you only recv() on active ticks, stale messages pile up in the buffer.
        auto imu = imu_sub_->recv();
        auto det = det_sub_->recv();

        // Tick divider: skip the computation but still drained the buffers above
        if (++tick_counter_ % divisor_ != 0) return;

        // Base forward speed — 0.3 m/s cruising
        float linear = 0.3f;
        float angular = 0.0f;

        // If camera detected an obstacle, slow down proportionally
        if (det) {
            float range = det->get()->linear;
            float confidence = det->get()->angular;
            if (confidence > 0.7f && range < 3.0f) {
                // Scale speed: full at 3m, zero at 0.5m
                float factor = std::clamp((range - 0.5f) / 2.5f, 0.0f, 1.0f);
                linear = 0.3f * factor;
            }
        }

        // Use IMU yaw rate for heading correction
        if (imu) {
            float gz = imu->get()->angular_velocity[2];
            // Compensate for drift: if turning unintentionally, correct
            if (std::abs(gz) > 0.02f) {
                angular = -0.5f * gz;  // P-controller on yaw drift
            }
        }

        horus::msg::CmdVel cmd{};
        cmd.linear = linear;
        cmd.angular = angular;
        cmd_pub_->send(cmd);
    }

    void enter_safe_state() override {
        // Zero all motion on safety event
        horus::msg::CmdVel stop{};
        cmd_pub_->send(stop);
        horus::blackbox::record("controller",
            "Entered safe state — motors zeroed");
    }

private:
    horus::Subscriber<horus::msg::Imu>*     imu_sub_;
    horus::Subscriber<horus::msg::CmdVel>*  det_sub_;
    horus::Publisher<horus::msg::CmdVel>*    cmd_pub_;
    int divisor_;
    int tick_counter_ = 0;
};

// ============================================================
// Diagnostics Logger — 1 Hz (every 200th tick at 200 Hz master)
// Records system health and topic throughput.
// ============================================================
class DiagLogger : public horus::Node {
public:
    explicit DiagLogger(int divisor)
        : Node("diag_logger"), divisor_(divisor) {
        cmd_sub_ = subscribe<horus::msg::CmdVel>("cmd_vel");
        imu_sub_ = subscribe<horus::msg::Imu>("imu.data");
    }

    void tick() override {
        // Drain every tick to count messages
        if (auto cmd = cmd_sub_->recv()) {
            last_linear_ = cmd->get()->linear;
            last_angular_ = cmd->get()->angular;
            cmd_count_++;
        }
        if (auto imu = imu_sub_->recv()) {
            imu_count_++;
        }

        if (++tick_counter_ % divisor_ != 0) return;

        // Log once per second
        char buf[256];
        std::snprintf(buf, sizeof(buf),
            "cmd_vel=(%.2f m/s, %.2f rad/s)  "
            "imu_msgs/s=%d  cmd_msgs/s=%d",
            last_linear_, last_angular_,
            imu_count_, cmd_count_);
        horus::log::info("diag", buf);

        // Reset counters for next 1-second window
        imu_count_ = 0;
        cmd_count_ = 0;
    }

private:
    horus::Subscriber<horus::msg::CmdVel>* cmd_sub_;
    horus::Subscriber<horus::msg::Imu>*    imu_sub_;
    int divisor_;
    int tick_counter_ = 0;
    int imu_count_ = 0;
    int cmd_count_ = 0;
    float last_linear_ = 0.0f;
    float last_angular_ = 0.0f;
};

int main() {
    horus::Scheduler sched;
    // Master rate = fastest node (IMU at 200 Hz)
    sched.tick_rate(200_hz).name("multi_rate_pipeline");

    // ── Compute divisors from master rate ───────────────────
    // IMU:        200 / 200 = 1  (every tick)
    // Camera:     200 / 30  ~ 7  (every 7th tick = 28.6 Hz)
    // Controller: 200 / 100 = 2  (every 2nd tick)
    // Logger:     200 / 1   = 200 (every 200th tick)

    ImuDriver imu;
    sched.add(imu)
        .order(0)                        // runs first — sensor producer
        .budget(500_us)                  // SPI/I2C read should be fast
        .on_miss(horus::Miss::Warn)      // IMU overrun is notable but not fatal
        .build();

    CameraPipeline camera(7);
    sched.add(camera)
        .order(1)                        // after IMU, before controller
        .budget(15_ms)                   // vision processing takes longer
        .on_miss(horus::Miss::Skip)      // skip if overrun — stale frame is useless
        .build();

    Controller ctrl(2);
    sched.add(ctrl)
        .order(10)                       // after all sensors
        .budget(3_ms)                    // control math is lightweight
        .on_miss(horus::Miss::Skip)      // skip if overrun — stale commands dangerous
        .build();

    DiagLogger logger(200);
    sched.add(logger)
        .order(100)                      // runs last — reads everything
        .budget(5_ms)                    // logging can take a moment
        .on_miss(horus::Miss::Warn)      // missed log is annoying but harmless
        .build();

    sched.spin();
}

Common Errors

SymptomCauseFix
Subscriber buffer overflowsOnly calling recv() on active ticks, so 200 Hz messages pile up between 1 Hz readsAlways drain subscribers every tick, even on skip ticks
Camera runs at wrong rateDivisor calculation is off (200/30 = 6.67, rounded to 7 gives 28.6 Hz not 30)Accept the approximation, or set master rate to LCM of all rates (e.g., 600 Hz)
Controller sees stale IMU dataController order is lower than IMU — it runs before IMU publishesEnsure producers have lower order values than consumers
Logger shows 0 messages/secLogger divisor is wrong or logger does not drain on skip ticksDrain subscribers every tick; only gate the logging output
High CPU usage with many skip ticksTick overhead per node even on skip, multiplied by 200 HzReduce master rate if most nodes are slow, or use Miss::Skip to shed load
Budget miss on camera node15ms budget too tight for inference on the target hardwareProfile actual inference time and set budget to p99 + 2ms headroom

Design Decisions

ChoiceRationale
Master rate = fastest node (200 Hz)Ensures the fastest node gets a tick every cycle without needing sub-tick scheduling
Tick dividers inside each NodeEach node owns its rate logic — the scheduler does not need to know about multiple rates
Drain subscribers on every tickPrevents buffer overflow when a slow consumer reads a fast producer's topic
Separate Node subclasses (not lambdas)Each node has its own state, enter_safe_state(), and testable interface
Controller has enter_safe_state()The controller is the actuator node — it must zero outputs on any safety event
Miss::Skip for controller and cameraStale sensor data or stale commands are worse than skipping a cycle
Miss::Warn for IMU and loggerIMU overrun is unusual and worth investigating; logger overrun is harmless

Variations

LCM-based master rate — if you need exact rates for all nodes, set the master rate to the least common multiple:

// Exact 200 Hz, 100 Hz, 30 Hz, 1 Hz requires LCM = 600 Hz
sched.tick_rate(600_hz);
// Divisors: IMU=3, Controller=6, Camera=20, Logger=600
// Downside: 600 Hz master means more overhead per tick

Node-level rate configuration — pass rate as a parameter instead of computing divisors at compile time:

class RateNode : public horus::Node {
public:
    RateNode(const char* name, int master_hz, int target_hz)
        : Node(name), divisor_(master_hz / target_hz) {}

    void tick() override {
        // Common divider logic
        if (++counter_ % divisor_ != 0) return;
        do_work();  // subclass implements actual computation
    }

protected:
    virtual void do_work() = 0;

private:
    int divisor_;
    int counter_ = 0;
};

Priority-based budgeting — give critical nodes more budget by stealing from non-critical ones:

// Total per-tick budget: 5ms at 200 Hz
// IMU:        500us (critical — sensor timing matters)
// Controller: 3ms   (critical — actuator safety)
// Camera:     15ms  (only runs every 7th tick, can burst)
// Logger:     1ms   (best effort — can be skipped entirely)
sched.add(logger)
    .order(100)
    .budget(1_ms)
    .on_miss(horus::Miss::Skip)  // expendable — skip if time is tight
    .build();

Key Takeaways

  • Set the master tick rate to the fastest node's rate — slower nodes use integer divisors
  • Always drain subscriber buffers every tick, even on skip ticks, to prevent stale data accumulation
  • Execution order is deterministic: lower order() values run first, guaranteeing producer-before-consumer
  • Each node should be a horus::Node subclass with its own state and enter_safe_state() for actuator nodes
  • Accept approximate rates from integer division (200/7 = 28.6 Hz instead of 30 Hz) or use LCM master rates for exactness