Scheduler API

The Scheduler orchestrates node execution with tick-rate control, real-time scheduling, watchdogs, and recording.

import horus

sched = horus.Scheduler(tick_rate=100, rt=True)
sched.add(my_node)
sched.run()

Constructor

horus.Scheduler(
    *,                              # keyword-only
    tick_rate=1000.0,               # Global tick rate in Hz
    rt=False,                       # Enable RT scheduling (memory locking, SCHED_FIFO)
    deterministic=False,            # SimClock, fixed dt, seeded RNG
    blackbox_mb=0,                  # Flight recorder size (0 = disabled)
    watchdog_ms=0,                  # Global watchdog timeout (0 = disabled)
    recording=False,                # Enable session recording
    name=None,                      # Scheduler name for logging
    cores=None,                     # CPU affinity list (e.g., [0, 1])
    max_deadline_misses=None,       # Escalation threshold
    verbose=False,                  # Debug logging
    telemetry=None,                 # Telemetry endpoint URL
)

Lifecycle Methods

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| add | add(node: Node) -> Scheduler | self (chaining) | Register a node |
| run | run(duration: float = None) | None | Start tick loop; None = run forever |
| stop | stop() | None | Signal graceful shutdown |
| is_running | is_running() -> bool | bool | Check if scheduler is running |
| status | status() -> str | "idle", "running", or "stopped" | Current state |
| current_tick | current_tick() -> int | Tick count | Current tick number |
| scheduler_name | scheduler_name() -> str | Name | Scheduler name for logging |

add()

sched.add(node) -> Scheduler  # Returns self for chaining

Registers a node with the scheduler. Returns self for chaining: sched.add(a).add(b).add(c).

Edge cases:

  • Duplicate name raises an error — node names must be unique
  • Must be called before run() — adding nodes while the scheduler is running is not supported
  • Does NOT call init() — init happens lazily on first run() or tick_once()
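
The fluent add() pattern works because add() returns self. A minimal stand-in sketch of that contract — FluentScheduler below is a hypothetical stub for illustration, not the real horus.Scheduler — shows both the chaining and the duplicate-name check:

```python
# Stand-in illustrating the documented add() behavior: returns self for
# chaining and rejects duplicate node names. FluentScheduler is a
# hypothetical stub, not the real horus.Scheduler.

class FluentScheduler:
    def __init__(self):
        self._nodes = {}

    def add(self, name):
        if name in self._nodes:
            raise ValueError(f"duplicate node name: {name!r}")
        self._nodes[name] = name  # the real scheduler stores the Node object
        return self  # returning self enables sched.add(a).add(b).add(c)

sched = FluentScheduler()
sched.add("sensor").add("controller").add("logger")
print(sorted(sched._nodes))  # → ['controller', 'logger', 'sensor']
```

The same fluent style is why the real API reads as a single registration expression instead of three statements.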

run()

sched.run(duration: float = None)

Start the tick loop. Blocks until completion.

  • duration=None — run forever (until Ctrl+C, SIGTERM, or request_stop())
  • duration=10.0 — run for 10 seconds, then return
  • GIL is released during the Rust scheduler loop — other Python threads run freely
  • GIL is re-acquired only when calling Python tick/init/shutdown callbacks (~500ns per acquire)
  • Ctrl+C triggers graceful shutdown: all nodes get shutdown() called

stop()

sched.stop()

Signals graceful shutdown. stop() is safe to call from another thread; from inside a node's tick, use request_stop() instead.
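
Since run() blocks, the usual pattern is to arm stop() from a second thread. A sketch of that pattern — StubScheduler is a hypothetical stand-in whose run() spins until stop() is called, mimicking how horus.Scheduler.run() blocks until shutdown is signalled:

```python
# Sketch of the "stop() from another thread" pattern. StubScheduler is a
# hypothetical stand-in, not the real horus.Scheduler.
import threading
import time

class StubScheduler:
    def __init__(self):
        self._stop = threading.Event()
        self.ticks = 0

    def run(self):                 # blocks, like the real run()
        while not self._stop.is_set():
            self.ticks += 1
            time.sleep(0.001)      # stand-in for one tick cycle

    def stop(self):                # safe to call from another thread
        self._stop.set()

sched = StubScheduler()
# Signal shutdown from a timer thread after 50 ms
threading.Timer(0.05, sched.stop).start()
sched.run()                        # returns once stop() fires
print("ticked:", sched.ticks > 0)
```

With the real scheduler the same shape works because the GIL is released during run(), so the signalling thread is never starved.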


Single-Tick Execution (Testing & Simulation)

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| tick_once | tick_once(node_names: list = None) | None | Execute one tick cycle (lazy init on first call) |
| tick_for | tick_for(duration: float, node_names: list = None) | None | Run tick loop for a duration, then return |

tick_once()

sched.tick_once(node_names: list = None)

Execute exactly one tick cycle and return. Lazy init: on first call, init() is called on all nodes.

  • node_names=None — tick all nodes in order
  • node_names=["sensor", "controller"] — tick only named nodes (skip others)
  • Each call advances the tick counter by 1
  • In deterministic mode, horus.dt() returns fixed 1/rate per tick
  • Async nodes are handled transparently (async event loop runs internally)

Edge cases:

  • Calling before add() does nothing (no nodes to tick)
  • Filtered node_names that don't exist are silently ignored
  • If a node's init() fails, tick_once() raises based on failure_policy

tick_for()

sched.tick_for(duration: float, node_names: list = None)

Run the tick loop for duration seconds, then return. Useful for bounded test runs:

sched.tick_for(1.0)  # Run for 1 second at tick_rate, then return

Example: Testing with tick_once()

Step through ticks manually for unit testing and simulation:

import horus

results = []

def sensor_tick(node):
    node.send("temp", {"value": 25.0 + horus.tick() * 0.1})

def logger_tick(node):
    msg = node.recv("temp")
    if msg:
        results.append(msg["value"])

sensor = horus.Node(name="sensor", pubs=["temp"], tick=sensor_tick, rate=100, order=0)
logger = horus.Node(name="logger", subs=["temp"], tick=logger_tick, rate=100, order=1)

sched = horus.Scheduler(tick_rate=100, deterministic=True)
sched.add(sensor)
sched.add(logger)

# Step through 5 ticks
for _ in range(5):
    sched.tick_once()

assert len(results) == 5
assert results[0] == 25.0
print(f"Passed: {results}")

Runtime Mutation

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| set_node_rate | set_node_rate(name: str, rate: float) | None | Change a node's tick rate at runtime |
| set_tick_budget | set_tick_budget(name: str, budget_us: int) | None | Change a node's tick budget at runtime (microseconds) |
| add_critical_node | add_critical_node(name: str, timeout_ms: int) | None | Mark node as safety-critical with watchdog timeout |
| remove_node | remove_node(name: str) -> bool | bool | Exclude a node from stats and queries (node still ticks until next restart) |

Example: Runtime Safety Configuration

Mark safety-critical nodes and adjust budgets at runtime:

import horus

def motor_tick(node):
    cmd = node.recv("cmd_vel")
    if cmd:
        node.send("motor_cmd", {"rpm": cmd.linear * 100})

motor = horus.Node(
    name="motor",
    subs=[horus.CmdVel],
    pubs=["motor_cmd"],
    tick=motor_tick,
    rate=1000,
    budget=300 * horus.us,
    on_miss="safe_mode",
)

sched = horus.Scheduler(tick_rate=1000, watchdog_ms=500)
sched.add(motor)

# Mark motor as critical — triggers enter_safe_state() on all nodes if motor
# exceeds 500ms without ticking
sched.add_critical_node("motor", timeout_ms=500)

# Adjust budget at runtime (e.g., after profiling shows headroom)
sched.set_tick_budget("motor", 200)  # tighten to 200μs

# Check RT capabilities
if sched.has_full_rt():
    print("Full RT: memory locked, SCHED_FIFO active")
else:
    for d in sched.degradations():
        print(f"RT degradation: {d['feature']} — {d['reason']}")

sched.run()

# After run, inspect safety stats
stats = sched.safety_stats()
if stats:
    print(f"Deadline misses: {stats.get('deadline_misses', 0)}")
    print(f"Watchdog expirations: {stats.get('watchdog_expirations', 0)}")

Introspection

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| get_node_stats | get_node_stats(name: str) -> dict | Metrics dict | Get node performance stats |
| get_all_nodes | get_all_nodes() -> list | Node info list | Get all registered nodes |
| get_node_names | get_node_names() -> list[str] | Name list | Get all node names |
| get_node_count | get_node_count() -> int | Count | Number of registered nodes |
| has_node | has_node(name: str) -> bool | bool | Check if a node exists |
| get_node_info | get_node_info(name: str) -> Optional[int] | Order or None | Get execution order of a node |

Example: Introspection at Runtime

sched = horus.Scheduler(tick_rate=100, name="my_robot")
sched.add(sensor)
sched.add(controller)

# After starting (e.g., in a monitoring thread)
print(f"Scheduler: {sched.scheduler_name()}")
print(f"Status: {sched.status()}")
print(f"Nodes: {sched.get_node_names()}")
print(f"Count: {sched.get_node_count()}")
print(f"Has motor? {sched.has_node('motor')}")

# Per-node stats
for name in sched.get_node_names():
    stats = sched.get_node_stats(name)
    print(f"  {name}: {stats['total_ticks']} ticks, avg {stats.get('avg_tick_duration_ms', 0):.2f}ms")

RT & Safety

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| capabilities | capabilities() -> dict | Capability dict | RT support, CPU features |
| has_full_rt | has_full_rt() -> bool | bool | Full RT capabilities available? |
| degradations | degradations() -> list[dict] | Degradation list | RT features requested but unavailable (dicts with feature, reason, severity keys) |
| safety_stats | safety_stats() -> dict | Stats dict | Watchdog stats, deadline misses, health states |
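
Since degradations() returns dicts with feature, reason, and severity keys, a startup health check reduces to formatting that shape. A sketch of such a check — report_rt_health is illustrative, and sample_degradations is made-up data, not real scheduler output:

```python
# Sketch of a startup RT health report built on the documented shapes:
# degradations() yields dicts with "feature", "reason", "severity" keys,
# and safety_stats() returns a metrics dict. report_rt_health and the
# sample data below are illustrative, not part of the horus API.

def report_rt_health(has_full_rt, degradations, stats):
    if has_full_rt:
        return ["Full RT: all requested features active"]
    lines = [
        f"[{d['severity']}] {d['feature']}: {d['reason']}"
        for d in degradations
    ]
    lines.append(f"deadline misses so far: {stats.get('deadline_misses', 0)}")
    return lines

sample_degradations = [
    {"feature": "mlockall", "reason": "CAP_IPC_LOCK missing", "severity": "warning"},
]
for line in report_rt_health(False, sample_degradations, {"deadline_misses": 2}):
    print(line)
```

In a real deployment the arguments would come from sched.has_full_rt(), sched.degradations(), and sched.safety_stats().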

Recording & Replay

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| is_recording | is_recording() -> bool | bool | Session recording active? |
| is_replaying | is_replaying() -> bool | bool | Session replay active? |
| stop_recording | stop_recording() -> list[str] | File paths | Stop recording, return session files |
| list_recordings | list_recordings() -> list[str] | Session list | List available recordings |
| delete_recording | delete_recording(name: str) | None | Delete a recorded session |

Example: Recording and Replay

import horus

def sensor_tick(node):
    node.send("imu", horus.Imu(accel_x=0.0, accel_y=0.0, accel_z=9.81))

sensor = horus.Node(name="imu", pubs=[horus.Imu], tick=sensor_tick, rate=100)

# Record a 5-second session
sched = horus.Scheduler(tick_rate=100, recording=True)
sched.add(sensor)
sched.run(duration=5.0)

# Stop recording and get file paths
files = sched.stop_recording()
print(f"Recorded to: {files}")

# List all recordings
for rec in sched.list_recordings():
    print(f"  Session: {rec}")

# Delete a recording
sched.delete_recording(files[0])

Context Manager

with horus.Scheduler(tick_rate=100) as sched:
    sched.add(sensor_node)
    sched.add(controller_node)
    sched.run(duration=10.0)  # Run for 10 seconds
# sched.stop() called automatically on exit

Deterministic Mode

Deterministic mode uses SimClock (fixed dt), seeded RNG, and sequential execution — every run produces identical output:

import horus

outputs = []

def physics_tick(node):
    # horus.dt() returns fixed 1/rate in deterministic mode
    # horus.rng_float() returns tick-seeded values (reproducible)
    noise = horus.rng_float() * 0.01
    position = horus.dt() * 10.0 + noise
    outputs.append(position)

node = horus.Node(name="physics", tick=physics_tick, rate=100)

# Run 1
sched = horus.Scheduler(tick_rate=100, deterministic=True)
sched.add(node)
sched.run(duration=1.0)
run1 = outputs.copy()

# Run 2 — identical output
outputs.clear()
sched2 = horus.Scheduler(tick_rate=100, deterministic=True)
sched2.add(horus.Node(name="physics", tick=physics_tick, rate=100))
sched2.run(duration=1.0)
run2 = outputs.copy()

assert run1 == run2, "Deterministic mode guarantees identical output"

Multi-Node System

import horus

def sensor_tick(node):
    reading = horus.Imu(accel_x=0.0, accel_y=0.0, accel_z=9.81)
    node.send("imu", reading)

def controller_tick(node):
    imu = node.recv("imu")
    if imu:
        cmd = horus.CmdVel(linear=0.5 if imu.accel_z > 9.0 else 0.0, angular=0.0)
        node.send("cmd_vel", cmd)

sensor = horus.Node(name="imu_sensor", pubs=[horus.Imu], tick=sensor_tick, rate=100, order=0)
controller = horus.Node(name="nav", subs=[horus.Imu], pubs=[horus.CmdVel], tick=controller_tick, rate=50, order=1)

sched = horus.Scheduler(tick_rate=100, watchdog_ms=500)
sched.add(sensor)
sched.add(controller)
sched.run()

run() Convenience Function

One-liner — creates a Scheduler, adds all nodes, and runs:

horus.run(
    *nodes,                         # Node instances to run
    duration=None,                  # Seconds to run (None = forever)
    tick_rate=1000.0,               # Global tick rate
    rt=False,                       # RT scheduling
    deterministic=False,            # Deterministic mode
    watchdog_ms=0,                  # Watchdog timeout
    blackbox_mb=0,                  # Flight recorder
    recording=False,                # Session recording
    name=None,                      # Scheduler name
    cores=None,                     # CPU affinity
    max_deadline_misses=None,       # Miss threshold
    verbose=False,                  # Debug logging
    telemetry=None,                 # Telemetry endpoint
)

Equivalent to:

sched = horus.Scheduler(tick_rate=tick_rate, rt=rt, ...)
for node in nodes:
    sched.add(node)
sched.run(duration=duration)

Execution Classes

The scheduler automatically classifies each node into an execution class based on its configuration:

| Node Configuration | Execution Class | Thread | Timing |
| --- | --- | --- | --- |
| rate=100 plus budget=X or deadline=X | Rt (auto-detected) | Dedicated RT thread (SCHED_FIFO if available) | Strict budget/deadline enforcement |
| compute=True | Compute | Worker thread pool | No timing guarantee |
| on="topic_name" | Event | Event-triggered | Tick only when topic has data |
| async def tick | AsyncIo | Async I/O thread pool | Async event loop scheduling |
| None of the above | BestEffort | Main tick thread | Best-effort timing |

RT auto-detection: Setting rate with budget or deadline automatically classifies the node as RT — no explicit flag needed on the node. The scheduler's rt=True enables the RT runtime (memory locking, SCHED_FIFO); individual nodes opt in via timing constraints.

Mutual exclusion: compute=True, on="topic", and async def tick are mutually exclusive. Combining them raises an error.
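
The classification rules above can be written out as a plain function. This classify() is an illustrative sketch of the documented decision table — the real scheduler performs this internally in Rust, and the function is not part of the horus API:

```python
# Sketch of the documented execution-class rules. classify() is
# illustrative only; the real classification happens inside the scheduler.
import inspect

def classify(rate=None, budget=None, deadline=None,
             compute=False, on=None, tick=None):
    is_async = tick is not None and inspect.iscoroutinefunction(tick)
    # compute=True, on="topic", and async tick are mutually exclusive
    if sum([compute, on is not None, is_async]) > 1:
        raise ValueError("compute, on=..., and async tick are mutually exclusive")
    if rate is not None and (budget is not None or deadline is not None):
        return "Rt"          # auto-detected from timing constraints
    if compute:
        return "Compute"
    if on is not None:
        return "Event"
    if is_async:
        return "AsyncIo"
    return "BestEffort"

async def async_tick(node): ...

print(classify(rate=100, budget=300))   # Rt
print(classify(compute=True))           # Compute
print(classify(on="imu"))               # Event
print(classify(tick=async_tick))        # AsyncIo
print(classify(rate=100))               # BestEffort
```

Note how a bare rate without budget or deadline still lands in BestEffort — timing constraints, not rate alone, are what opt a node into RT.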


Design Decisions

Why keyword-only constructor? All parameters are keyword-only (* in the signature) to prevent positional argument mistakes. Scheduler(100, True) is ambiguous — is 100 the tick rate or watchdog timeout? Scheduler(tick_rate=100, rt=True) is unambiguous.

Why run() blocks? The scheduler's tick loop must own the execution thread for deterministic timing. Returning a future or running in a background thread would add jitter. For concurrent Python work (HTTP server, monitoring), use async def nodes or Python threads — the GIL is released during run().

Why rt=True on the scheduler, not per-node? RT requires system-level setup (memory locking, SCHED_FIFO). This is a process-level decision, not per-node. Individual nodes opt into RT timing via budget/deadline — the scheduler handles the runtime.

Why tick_once() for testing? Unit tests need deterministic, step-by-step execution. tick_once() executes one complete tick cycle (all nodes in order) and returns. This makes tests reproducible without timers or sleeps.

Why release the GIL during run()? The tick loop is Rust code — no Python objects are accessed between callbacks. Releasing the GIL lets other Python threads (Flask server, monitoring, logging) run concurrently. The GIL is re-acquired only for tick()/init()/shutdown() callbacks (~500ns per acquire).


Trade-offs

| Choice | Benefit | Cost |
| --- | --- | --- |
| run() blocks | Deterministic timing, no jitter | Must use threads for concurrent Python work |
| GIL released during run | Other Python threads work | ~500ns GIL re-acquire per tick per node |
| Keyword-only constructor | No positional argument mistakes | More typing than positional args |
| tick_once() for testing | Step-by-step deterministic tests | Must manually loop for multi-tick scenarios |
| Auto RT classification | No manual RT flags | Must understand budget/deadline → RT mapping |
| Context manager support | Clean resource cleanup | Extra indentation |

See Also