Scheduler API

The Scheduler is the central orchestrator in HORUS. It creates topics, registers nodes, manages their lifecycle, and drives the tick loop. Configuration uses a builder pattern -- chain methods to set tick rate, RT mode, watchdog, and networking, then call spin() to run.

For the Rust equivalent, see Scheduler; for the Python equivalent, see horus.Scheduler.

// simplified
#include <horus/scheduler.hpp>
using namespace horus::literals;

Quick Reference -- Scheduler Methods

Method -> Returns -- Description
Scheduler() -> Scheduler -- Construct a new scheduler
.tick_rate(Frequency) -> Scheduler& -- Set the global tick rate
.name(string_view) -> Scheduler& -- Set the scheduler name
.prefer_rt() -> Scheduler& -- Prefer RT scheduling (graceful degradation)
.require_rt() -> Scheduler& -- Require RT scheduling (fail if unavailable)
.deterministic(bool) -> Scheduler& -- Enable deterministic mode (SimClock + seeded RNG)
.verbose(bool) -> Scheduler& -- Enable verbose logging
.watchdog(Duration) -> Scheduler& -- Set global watchdog timeout
.blackbox(size_t) -> Scheduler& -- Set BlackBox flight recorder size in MB
.enable_network() -> Scheduler& -- Enable LAN network replication
.advertise<T>(string_view) -> Publisher<T> -- Create a publisher for a named topic
.subscribe<T>(string_view) -> Subscriber<T> -- Create a subscriber for a named topic
.add(string_view) -> NodeBuilder -- Add a lambda node by name
.add(Node&) -> NodeBuilder -- Add a struct-based node
.add(LambdaNode&) -> NodeBuilder -- Add a LambdaNode
.spin() -> void -- Run the scheduler (blocks until stopped)
.tick_once() -> void -- Execute a single tick of all nodes
.stop() -> void -- Stop the scheduler (thread-safe)
.is_running() -> bool -- Check if the scheduler is still running
.get_name() -> std::string -- Get the scheduler name
.status() -> std::string -- Get a human-readable status string
.has_full_rt() -> bool -- Check if full RT capabilities are available
.node_list() -> std::vector<std::string> -- Get the list of registered node names

Quick Reference -- NodeBuilder Methods

Method -> Returns -- Description
.rate(Frequency) -> NodeBuilder& -- Set the tick rate for this node
.budget(Duration) -> NodeBuilder& -- Set the execution budget (auto-enables RT)
.deadline(Duration) -> NodeBuilder& -- Set a hard deadline (auto-enables RT)
.on_miss(Miss) -> NodeBuilder& -- Set the deadline miss policy
.compute() -> NodeBuilder& -- Mark as compute-class (CPU-bound)
.async_io() -> NodeBuilder& -- Mark as async I/O class
.on(string_view) -> NodeBuilder& -- Trigger on topic message (event-driven)
.order(uint32_t) -> NodeBuilder& -- Set execution order within a tick
.pin_core(size_t) -> NodeBuilder& -- Pin to a specific CPU core
.priority(int32_t) -> NodeBuilder& -- Set thread priority
.watchdog(Duration) -> NodeBuilder& -- Set a per-node watchdog timeout
.tick(function) -> NodeBuilder& -- Set the tick callback
.init(function) -> NodeBuilder& -- Set the init callback (called once)
.safe_state(function) -> NodeBuilder& -- Set the enter_safe_state callback
.build() -> void -- Finalize and register the node

Construction and Configuration

The Scheduler uses a builder pattern. All configuration methods return Scheduler& for chaining. Configuration is deferred -- nothing runs until spin() or tick_once().

#include <horus/scheduler.hpp>
using namespace horus::literals;

auto sched = horus::Scheduler()
    .tick_rate(100_hz)         // 100 Hz global tick rate
    .name("arm_controller")    // scheduler name (shown in logs)
    .prefer_rt()               // request SCHED_FIFO (fallback to SCHED_OTHER)
    .watchdog(500_ms)          // kill nodes that exceed 500ms
    .blackbox(64)              // 64 MB flight recorder
    .verbose(true);            // print scheduling decisions

RT Mode Selection

Method -- Behavior
.prefer_rt() -- Request real-time scheduling; fall back gracefully if CAP_SYS_NICE is unavailable
.require_rt() -- Require real-time scheduling; fail with an error if RT is unavailable
(neither) -- Best-effort scheduling only

Check RT availability at runtime:

if (sched.has_full_rt()) {
    printf("Running with SCHED_FIFO\n");
} else {
    printf("Falling back to SCHED_OTHER\n");
}

Deterministic Mode

Deterministic mode replaces the wall clock with a SimClock and seeds all RNG sources. Use it for reproducible tests and replay:

auto sched = horus::Scheduler()
    .tick_rate(100_hz)
    .deterministic(true);   // SimClock + seeded RNG
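
To see why this yields reproducibility, here is a minimal standalone sketch of the two ingredients: a simulated clock that advances a fixed step per tick instead of reading the wall clock, and a seeded PRNG that produces the same sequence on every run. SimClock and SeededRng below are illustrative stand-ins, not the actual HORUS types:

```cpp
#include <cstdint>

// Illustrative stand-in: a sim clock advances by one fixed step per tick,
// so "time" depends only on the tick count, never on real elapsed time.
struct SimClock {
    uint64_t now_ns = 0;
    uint64_t step_ns;
    explicit SimClock(uint64_t tick_rate_hz) : step_ns(1'000'000'000 / tick_rate_hz) {}
    void tick() { now_ns += step_ns; }
};

// Illustrative stand-in: a seeded xorshift64 PRNG -- two instances with the
// same seed emit identical sequences, which is what makes replay possible.
struct SeededRng {
    uint64_t state;
    explicit SeededRng(uint64_t seed) : state(seed) {}
    uint64_t next() {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;
        return state;
    }
};
```

With both sources of nondeterminism removed, two runs that execute the same number of ticks observe identical timestamps and random draws.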

Creating Topics

Topics are created on the scheduler before nodes are added. The scheduler owns the underlying shared memory segments.

auto cmd_pub   = sched.advertise<horus::msg::CmdVel>("motor.cmd");
auto scan_sub  = sched.subscribe<horus::msg::LaserScan>("lidar.scan");
auto imu_sub   = sched.subscribe<horus::msg::Imu>("imu.data");
auto odom_pub  = sched.advertise<horus::msg::Odometry>("odom");

Topic names use dots (not slashes) as separators. This is required for macOS shm_open compatibility.
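
As an illustration of the constraint, a hypothetical validation helper (not part of the HORUS API) might reject the separator characters that shm_open cannot accept inside an object name:

```cpp
#include <string_view>

// Hypothetical check, for illustration only: POSIX shm_open() object names
// must not contain '/' beyond the leading one, so topic names that embed
// slashes cannot be mapped directly to shared memory segments on macOS.
bool is_valid_topic_name(std::string_view name) {
    if (name.empty()) return false;
    for (char c : name) {
        if (c == '/') return false;  // use '.' as the separator instead
    }
    return true;
}
```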

See Publisher and Subscriber API for the full messaging API.


Adding Nodes

The scheduler supports three node styles. All go through NodeBuilder for scheduling configuration.

Style 1: Lambda Node (Inline)

The simplest approach. Pass a name and a tick callback:

sched.add("obstacle_detector")
    .rate(50_hz)
    .budget(5_ms)
    .on_miss(horus::Miss::Skip)
    .tick([&] {
        auto scan = scan_sub.recv();
        if (!scan) return;
        // process scan...
    })
    .build();

Style 2: Struct-Based Node

Subclass horus::Node for complex nodes with state (see Node API):

ArmController ctrl;  // subclass of horus::Node
sched.add(ctrl).rate(100_hz).budget(2_ms).build();

Style 3: LambdaNode

A declarative node that sets up its pub/sub connections through a builder, similar to Python's horus.Node():

auto nav = horus::LambdaNode("navigator")
    .sub<horus::msg::Odometry>("odom")
    .pub<horus::msg::CmdVel>("motor.cmd")
    .on_tick([](horus::LambdaNode& self) {
        auto odom = self.recv<horus::msg::Odometry>("odom");
        if (!odom) return;
        self.send("motor.cmd", horus::msg::CmdVel{0, 0.3f, 0.0f});
    });

sched.add(nav)
    .rate(20_hz)
    .build();

See Node API for the full lifecycle and introspection API.


NodeBuilder Configuration

Every sched.add(...) call returns a NodeBuilder. Chain scheduling options before calling .build().

Execution Class Auto-Detection

The scheduler automatically assigns an execution class based on what you configure:

Configuration -> Detected Class -- Thread
.rate() + .budget() or .deadline() -> Rt -- Dedicated RT thread, SCHED_FIFO
.rate() only -> BestEffort -- Shared thread pool
.compute() -> Compute -- CPU-bound thread pool
.async_io() -> AsyncIo -- I/O thread pool
.on("topic") -> Event -- Wakes on message arrival

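
The detection rules can be pictured as a priority-ordered check. The NodeConfig struct, detect_class function, and in particular the precedence between conflicting options are assumptions made for illustration here, not documented HORUS behavior:

```cpp
enum class ExecClass { Rt, BestEffort, Compute, AsyncIo, Event };

// Hypothetical snapshot of what the builder has recorded for one node.
struct NodeConfig {
    bool has_rate = false, has_budget = false, has_deadline = false;
    bool compute = false, async_io = false, event = false;
};

// Sketch of the auto-detection table above. The ordering when several
// options are combined is an assumption, not the documented behavior.
ExecClass detect_class(const NodeConfig& c) {
    if (c.event)    return ExecClass::Event;     // .on("topic")
    if (c.compute)  return ExecClass::Compute;   // .compute()
    if (c.async_io) return ExecClass::AsyncIo;   // .async_io()
    if (c.has_rate && (c.has_budget || c.has_deadline))
        return ExecClass::Rt;                    // .rate() + .budget()/.deadline()
    return ExecClass::BestEffort;                // .rate() only
}
```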
// RT node: rate + budget auto-detects as Rt class
sched.add("safety_monitor")
    .rate(1000_hz)
    .budget(100_us)
    .deadline(900_us)
    .on_miss(horus::Miss::SafeMode)
    .priority(90)
    .pin_core(3)
    .tick([&] { /* safety checks */ })
    .build();

// Compute node: long-running CPU work
sched.add("path_planner")
    .compute()
    .tick([&] { /* A* search */ })
    .build();

// Event-driven node: wakes on message
sched.add("logger")
    .on("diagnostics.status")
    .tick([&] { /* log message */ })
    .build();

Deadline Miss Policies

Policy -- Behavior
Miss::Warn -- Log a warning, continue execution
Miss::Skip -- Skip the current tick, reset for the next cycle
Miss::SafeMode -- Call enter_safe_state(), then continue
Miss::Stop -- Stop the node permanently
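
The policy semantics can be sketched as a small dispatch. MissOutcome and handle_miss are hypothetical names for illustration, and whether SafeMode also drops the current tick is an assumption here:

```cpp
enum class Miss { Warn, Skip, SafeMode, Stop };

// Hypothetical outcome of handling one missed deadline, mirroring the
// policy table; this is a sketch, not the HORUS implementation.
struct MissOutcome {
    bool run_this_tick;  // does the overrunning tick still complete normally?
    bool enter_safe;     // is enter_safe_state() invoked?
    bool node_stopped;   // is the node permanently stopped?
};

MissOutcome handle_miss(Miss policy) {
    switch (policy) {
        case Miss::Warn:     return {true,  false, false};  // log and continue
        case Miss::Skip:     return {false, false, false};  // drop this tick only
        case Miss::SafeMode: return {false, true,  false};  // safe state, then continue
        case Miss::Stop:     return {false, false, true};   // node is done
    }
    return {false, false, false};
}
```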

Init and Safe State Callbacks

Lambda nodes can set lifecycle callbacks through the builder:

sched.add("motor_driver")
    .rate(100_hz)
    .budget(2_ms)
    .init([&] {
        printf("Motor driver initialized\n");
        // one-time hardware setup
    })
    .safe_state([&] {
        // send zero velocity on watchdog timeout
        cmd_pub.send(horus::msg::CmdVel{0, 0.0f, 0.0f});
    })
    .tick([&] {
        // normal motor control
    })
    .build();
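
Conceptually, the watchdog compares each tick's elapsed time against the configured timeout and fires the safe-state callback on overrun. The standalone sketch below (watchdog_fire is a made-up helper, not the HORUS implementation) shows that relationship:

```cpp
#include <chrono>
#include <functional>

// Illustration only: if one tick's elapsed time exceeds the watchdog
// timeout, run the node's safe-state callback and report that it fired.
bool watchdog_fire(std::chrono::nanoseconds elapsed,
                   std::chrono::nanoseconds timeout,
                   const std::function<void()>& safe_state) {
    if (elapsed > timeout) {
        safe_state();  // e.g. publish a zero-velocity command, as above
        return true;
    }
    return false;
}
```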

Running the Scheduler

Blocking Spin

spin() blocks the calling thread until the scheduler is stopped (via Ctrl+C, SIGTERM, or .stop()):

sched.spin();
// execution resumes here after shutdown

Single Tick

tick_once() executes exactly one tick of all registered nodes. Useful for testing and stepped simulation:

for (int i = 0; i < 1000; ++i) {
    sched.tick_once();
}

Stopping from Another Thread

stop() is thread-safe. Call it from a signal handler, another thread, or a node's tick callback:

// Stop programmatically after 10 seconds:
std::thread timer([&] {
    std::this_thread::sleep_for(std::chrono::seconds(10));
    sched.stop();
});
sched.spin();
timer.join();

Runtime Queries

Query the scheduler state at any time (all methods are thread-safe):

// Check if still running
if (sched.is_running()) { /* ... */ }

// Get the scheduler name
std::string name = sched.get_name();

// Get human-readable status
std::string info = sched.status();

// List all registered nodes
auto nodes = sched.node_list();
for (const auto& n : nodes) {
    printf("  node: %s\n", n.c_str());
}

Common Patterns

Multi-Rate System

Different nodes run at different rates within the same scheduler:

auto sched = horus::Scheduler()
    .tick_rate(1000_hz)   // LCM of all node rates (i.e. GCD of their tick periods)
    .prefer_rt();

sched.add("safety")
    .rate(1000_hz).budget(50_us).priority(99)
    .tick([&] { /* fastest, highest priority */ }).build();

sched.add("controller")
    .rate(100_hz).budget(2_ms)
    .tick([&] { /* medium rate */ }).build();

sched.add("planner")
    .rate(10_hz).compute()
    .tick([&] { /* slow, CPU-heavy */ }).build();

sched.spin();
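
The base tick rate must be a common multiple of the node rates; each node then runs once every base_hz / node_hz base ticks. A minimal sketch of that divider arithmetic (runs_on_tick is illustrative only, not a HORUS function):

```cpp
// Illustration only: with a 1000 Hz base rate, a 100 Hz node runs on every
// 10th base tick and a 10 Hz node on every 100th. Assumes node_hz divides
// base_hz evenly, which is why the base rate is chosen as their LCM.
constexpr bool runs_on_tick(unsigned base_hz, unsigned node_hz, unsigned tick) {
    const unsigned divider = base_hz / node_hz;
    return tick % divider == 0;
}
```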

Test Harness with tick_once

Step through execution deterministically for unit tests:

auto sched = horus::Scheduler().tick_rate(100_hz).deterministic(true);
auto pub = sched.advertise<horus::msg::CmdVel>("cmd");
auto sub = sched.subscribe<horus::msg::CmdVel>("cmd");

int tick_count = 0;
sched.add("producer").rate(100_hz)
    .tick([&] { pub.send(horus::msg::CmdVel{0, 1.0f, 0.0f}); tick_count++; })
    .build();

sched.tick_once();
assert(tick_count == 1);
auto msg = sub.recv();
assert(msg.has_value());

Network-Enabled Scheduler

auto sched = horus::Scheduler().tick_rate(100_hz)
    .enable_network().name("robot_01");  // topics visible on LAN
auto pub = sched.advertise<horus::msg::Odometry>("robot_01.odom");

Ownership

Scheduler is move-only. It owns the underlying Rust Box<FfiScheduler> and releases it in the destructor. Copy is deleted:

horus::Scheduler a;
// horus::Scheduler b = a;           // COMPILE ERROR
horus::Scheduler b = std::move(a);   // OK

See Also