Testing and Deterministic Mode (Python)

A PID controller runs at 50 Hz. In production, it reads an IMU, computes a correction, and publishes a velocity command --- 20 ms per cycle, wall-clock time, with real sensor noise and OS jitter. In a test, you need the opposite: no wall clock, no jitter, no hardware, and identical results every run. You need to step the scheduler one tick at a time, inject fake sensor data, and assert that the controller produces the exact expected output.

HORUS provides three tools for this:

  • tick_once() --- step the scheduler by exactly one tick cycle, then return control to your test
  • Deterministic mode --- replace the wall clock with a SimClock, fix dt, seed the RNG, and guarantee identical results across runs
  • Record and replay --- capture a full session and replay it for regression testing

Single-Tick Testing with tick_once()

tick_once() is the foundation of all HORUS testing. It executes exactly one tick cycle --- calling tick() on every due node in order --- then returns. No threads, no timing, no sleeping. You control the clock.

import horus

results = []

def counter_tick(node):
    count = (node.recv("count") or 0) + 1
    node.send("count", count)
    results.append(count)

sched = horus.Scheduler(tick_rate=100)
sched.add(horus.Node(
    name="counter",
    tick=counter_tick,
    rate=100,
    order=0,
    pubs=["count"],
    subs=["count"],
))

sched.tick_once()  # init (lazy) + tick 1
sched.tick_once()  # tick 2
sched.tick_once()  # tick 3

assert results == [1, 2, 3]

On the first call to tick_once(), the scheduler lazily initializes: it finalizes node configurations, calls init() on every node in order, then runs one tick. Subsequent calls run one tick each.
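The lazy-init sequence can be modeled in a few lines of plain Python. This is a conceptual sketch of the semantics described above, not the HORUS implementation; MiniScheduler and its callbacks are hypothetical:

```python
# Conceptual model of tick_once() lazy initialization (not the real
# HORUS scheduler): the first call runs init() on every node in order,
# then ticks; subsequent calls only tick.
class MiniScheduler:
    def __init__(self):
        self.nodes = []          # (order, name, init_fn, tick_fn)
        self.initialized = False

    def add(self, name, order, init, tick):
        self.nodes.append((order, name, init, tick))

    def tick_once(self):
        if not self.initialized:
            for _, name, init, _ in sorted(self.nodes):
                init(name)       # init in order, exactly once
            self.initialized = True
        for _, name, _, tick in sorted(self.nodes):
            tick(name)           # one tick per call

events = []
sched = MiniScheduler()
sched.add("b", order=1, init=lambda n: events.append(f"init:{n}"),
          tick=lambda n: events.append(f"tick:{n}"))
sched.add("a", order=0, init=lambda n: events.append(f"init:{n}"),
          tick=lambda n: events.append(f"tick:{n}"))

sched.tick_once()  # init a, init b, tick a, tick b
sched.tick_once()  # tick a, tick b only
assert events == ["init:a", "init:b",
                  "tick:a", "tick:b",
                  "tick:a", "tick:b"]
```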

Selective Ticking

Pass a list of node names to tick only specific nodes:

sched.tick_once(["sensor"])             # only tick "sensor"
sched.tick_once(["sensor", "controller"])  # tick sensor and controller, skip motor

This is the primary tool for unit-testing a single node in isolation. Other nodes remain frozen --- they do not tick, but their published data stays on the topics.
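The "frozen but still readable" behavior can be sketched with a plain dict standing in for the topic board (a conceptual model of the semantics above, not HORUS internals):

```python
# Conceptual model of selective ticking: topics retain their last
# published value, so a node that is not ticked still contributes
# stale-but-valid data to everyone else.
topics = {}

def sensor_tick():
    topics["sensor.data"] = topics.get("sensor.data", 0) + 1

readings = []
def controller_tick():
    readings.append(topics.get("sensor.data"))

def tick_once(nodes):
    for node in nodes:
        node()

tick_once([sensor_tick, controller_tick])  # both tick: controller sees 1
tick_once([controller_tick])               # sensor frozen: stale 1 persists
tick_once([controller_tick])
assert readings == [1, 1, 1]
```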

tick_for() --- Timed Runs

tick_for() runs the tick loop for a specific wall-clock duration, then returns:

sched.tick_for(1.0)                  # run for 1 second
sched.tick_for(0.5, ["sensor"])      # run only sensor for 0.5 seconds

Use this for integration tests that need to observe behavior over many ticks without calling tick_once() in a loop.


Deterministic Mode

Enable deterministic mode on the Scheduler for reproducible results:

sched = horus.Scheduler(tick_rate=100, deterministic=True)

In deterministic mode, three things change:

Feature            | Normal mode                     | Deterministic mode
horus.dt()         | Actual wall-clock elapsed time  | Fixed 1/rate every tick (e.g., 0.01 at 100 Hz)
horus.now()        | Wall-clock time                 | SimClock --- advances by dt each tick
horus.rng_float()  | System entropy                  | Tick-seeded sequence --- same across runs

This guarantees identical results on any machine, any run. The physics does not depend on how fast your CPU is or what else the OS is doing.

sched = horus.Scheduler(tick_rate=100, deterministic=True)
sched.add(horus.Node(name="sim", tick=sim_tick, rate=100, order=0))

for _ in range(1000):
    sched.tick_once()
    assert horus.dt() == 0.01  # always 1/100

Why Deterministic Mode Matters

Without deterministic mode, a PID controller's output depends on the actual dt between ticks. On a fast machine, dt might be 9.98 ms. On a loaded CI server, it might be 10.5 ms. The integral term accumulates differently, and the test produces different outputs each run. With deterministic=True, dt is always exactly 1/rate, and the test is reproducible.
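The dt-sensitivity of the integral term is easy to demonstrate without HORUS at all. A plain-Python sketch (hypothetical gains, jitter values chosen for illustration):

```python
# Why fixed dt matters for a PID integral term: the accumulated
# integral depends on dt, so jittered dt produces run-to-run
# differences while fixed dt does not.
def run_pid(dts, target=1.0, measured=0.0, kp=2.0, ki=0.1):
    integral, outputs = 0.0, []
    for dt in dts:
        error = target - measured
        integral += error * dt          # timing enters the state here
        outputs.append(kp * error + ki * integral)
    return outputs

fixed = [0.01] * 100                    # deterministic mode: dt == 1/rate
jittered = [0.01 + (i % 3) * 1e-4      # stand-in for OS scheduling jitter
            for i in range(100)]

assert run_pid(fixed) == run_pid(fixed)      # reproducible
assert run_pid(fixed) != run_pid(jittered)   # timing leaks into the output
```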

SimClock Behavior

The SimClock starts at 0.0 and advances by 1/rate each tick:

sched = horus.Scheduler(tick_rate=50, deterministic=True)
sched.add(horus.Node(name="n", tick=lambda node: None, rate=50, order=0))

sched.tick_once()  # horus.now() == 0.02, horus.dt() == 0.02
sched.tick_once()  # horus.now() == 0.04, horus.dt() == 0.02
sched.tick_once()  # horus.now() == 0.06, horus.dt() == 0.02

All time-dependent logic --- horus.now(), horus.dt(), horus.elapsed() --- uses the SimClock. Wall-clock time is irrelevant. This means a 1000-tick test that simulates 10 seconds of robot time finishes in milliseconds.
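A minimal model of the behavior described above (a sketch, not the HORUS implementation): the clock starts at 0.0 and advances by 1/rate per tick, independent of wall-clock time.

```python
# Minimal SimClock model: 1000 ticks at 100 Hz simulate 10 s of robot
# time, however long the loop actually takes on the wall clock.
class SimClock:
    def __init__(self, rate):
        self.dt = 1.0 / rate
        self.t = 0.0

    def tick(self):
        self.t += self.dt

clock = SimClock(rate=100)
for _ in range(1000):
    clock.tick()

assert abs(clock.t - 10.0) < 1e-9
```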

Seeded RNG

horus.rng_float() returns a repeatable sequence in deterministic mode:

sched = horus.Scheduler(tick_rate=100, deterministic=True)
values = []

def tick(node):
    values.append(horus.rng_float())

sched.add(horus.Node(name="rng", tick=tick, rate=100, order=0))

for _ in range(5):
    sched.tick_once()

# values is identical across runs, across machines

Use horus.rng_float() instead of Python's random.random() for any randomness inside tick(). The system random module is not seeded by HORUS and will produce different results across runs.
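One way to see what "tick-seeded" buys you, in plain Python (a sketch of the idea, not HORUS's actual seeding scheme; the seed derivation below is hypothetical):

```python
# Derive each tick's random draw from the tick index, so every run --
# on any machine -- sees the same sequence.
import random

def tick_seeded_floats(n_ticks, base_seed=0):
    return [random.Random(base_seed * 1_000_003 + i).random()
            for i in range(n_ticks)]

run_a = tick_seeded_floats(5)
run_b = tick_seeded_floats(5)
assert run_a == run_b          # identical across runs (and machines)
```

Seeding per tick, rather than once per process, also keeps the sequence stable when you use selective ticking: a node that skips ticks does not shift every later draw.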


Testing Patterns

Unit Testing a Single Node's tick()

Test the tick function directly, without a scheduler, for pure logic validation:

def test_pid_proportional_gain():
    """Verify the P-term of the PID controller."""
    integral = [0.0]
    target = 1.0

    def pid_tick(node):
        imu = node.recv("imu.data")
        if imu is None:
            return
        error = target - imu.accel_x
        integral[0] += error * horus.dt()
        command = 2.0 * error + 0.1 * integral[0]
        node.send("cmd_vel", horus.CmdVel(linear=command, angular=0.0))

    results = []

    def capture_tick(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            results.append(cmd.linear)

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="pid", tick=pid_tick, rate=100, order=0,
                         subs=[horus.Imu], pubs=[horus.CmdVel]))
    sched.add(horus.Node(name="capture", tick=capture_tick, rate=100, order=10,
                         subs=[horus.CmdVel]))

    # Inject fake IMU data by publishing before ticking
    imu_topic = horus.Topic("imu.data")
    imu_topic.send(horus.Imu(accel_x=0.0, accel_y=0.0, accel_z=9.81,
                              gyro_x=0.0, gyro_y=0.0, gyro_z=0.0))

    sched.tick_once()

    assert len(results) == 1
    assert abs(results[0] - 2.0) < 0.01  # P-gain * error = 2.0 * 1.0

Integration Testing with Real Topics

Test data flow across multiple nodes:

def test_sensor_to_controller_pipeline():
    """Verify that sensor data flows through the controller to a motor command."""
    motor_cmds = []

    def fake_imu(node):
        node.send("imu.data", horus.Imu(
            accel_x=0.3, accel_y=0.0, accel_z=9.81,
            gyro_x=0.0, gyro_y=0.0, gyro_z=0.0,
        ))

    def controller(node):
        imu = node.recv("imu.data")
        if imu:
            cmd = horus.CmdVel(linear=0.5 - imu.accel_x, angular=0.0)
            node.send("cmd_vel", cmd)

    def mock_motor(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            motor_cmds.append(cmd.linear)

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="imu", tick=fake_imu, rate=100, order=0,
                         pubs=[horus.Imu]))
    sched.add(horus.Node(name="ctrl", tick=controller, rate=100, order=10,
                         subs=[horus.Imu], pubs=[horus.CmdVel]))
    sched.add(horus.Node(name="motor", tick=mock_motor, rate=100, order=20,
                         subs=[horus.CmdVel]))

    sched.tick_for(0.1)  # 10 ticks at 100 Hz

    assert len(motor_cmds) > 0
    assert abs(motor_cmds[0] - 0.2) < 0.01  # 0.5 - 0.3 = 0.2

Testing Safety Behavior

Verify that deadline-miss policies and the watchdog fire correctly:

def test_safety_monitor_stops_on_dangerous_velocity():
    """Safety monitor should request_stop() when velocity exceeds threshold."""
    stopped = [False]

    def unsafe_controller(node):
        node.send("cmd_vel", horus.CmdVel(linear=5.0, angular=0.0))

    def safety_tick(node):
        cmd = node.recv("cmd_vel")
        if cmd and abs(cmd.linear) > 2.0:
            node.log_warning(f"Unsafe: {cmd.linear}")
            node.request_stop()
            stopped[0] = True

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="ctrl", tick=unsafe_controller, rate=100,
                         order=0, pubs=[horus.CmdVel]))
    sched.add(horus.Node(name="safety", tick=safety_tick, rate=100,
                         order=1, subs=[horus.CmdVel]))

    sched.tick_once()

    assert stopped[0] is True


def test_watchdog_detects_frozen_node():
    """Verify the watchdog catches a node that hangs."""
    import time

    def frozen_tick(node):
        time.sleep(2.0)  # simulate frozen node

    sched = horus.Scheduler(tick_rate=10, watchdog_ms=100)
    sched.add(horus.Node(name="frozen", tick=frozen_tick, rate=10, order=0))

    sched.tick_once()

    stats = sched.safety_stats()
    assert stats is not None
    assert stats["watchdog_expirations"] > 0

Testing Multi-Rate Interactions

Verify that nodes running at different rates interact correctly:

def test_multi_rate_data_flow():
    """50 Hz controller reads from 100 Hz sensor --- gets latest value."""
    readings = []

    def fast_sensor(node):
        t = horus.now()
        node.send("sensor.data", {"value": t * 100})

    def slow_controller(node):
        data = node.recv("sensor.data")
        if data:
            readings.append(data["value"])

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="sensor", tick=fast_sensor, rate=100,
                         order=0, pubs=["sensor.data"]))
    sched.add(horus.Node(name="ctrl", tick=slow_controller, rate=50,
                         order=10, subs=["sensor.data"]))

    # Run 10 ticks. The sensor ticks 10 times, the controller ticks 5 times.
    for _ in range(10):
        sched.tick_once()

    # Controller should have received about 5 readings
    assert len(readings) >= 4
    assert len(readings) <= 6

Mock Patterns for Hardware Dependencies

Nodes that depend on hardware --- serial ports, I2C buses, GPIO pins --- cannot run in CI. Replace the hardware call with a mock in tests.

Pattern 1: Dependency Injection via Closure

def make_motor_node(write_fn=None):
    """Create a motor node with injectable hardware write function."""
    if write_fn is None:
        write_fn = real_motor_write  # production: actual hardware

    def tick(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            write_fn(cmd.linear, cmd.angular)
            node.send("motor.state", {"velocity": cmd.linear})

    return horus.Node(
        name="motor",
        tick=tick,
        rate=50,
        order=10,
        subs=[horus.CmdVel],
        pubs=["motor.state"],
    )


def test_motor_writes_velocity():
    """Test motor node without real hardware."""
    writes = []

    def mock_write(linear, angular):
        writes.append((linear, angular))

    motor = make_motor_node(write_fn=mock_write)

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(motor)

    # Inject a command
    topic = horus.Topic("cmd_vel")
    topic.send(horus.CmdVel(linear=0.5, angular=0.1))

    sched.tick_once()

    assert len(writes) == 1
    assert abs(writes[0][0] - 0.5) < 0.01

Pattern 2: Fake Sensor Node

Replace a real hardware driver with a fake that publishes known data:

def make_fake_lidar(ranges=None):
    """Fake LiDAR that publishes static scan data."""
    if ranges is None:
        ranges = [1.0, 2.0, 3.0, 1.5]

    def tick(node):
        node.send("scan", horus.LaserScan(
            ranges=ranges,
            angle_min=-1.57,
            angle_max=1.57,
            angle_increment=3.14 / len(ranges),
        ))

    return horus.Node(
        name="lidar",
        tick=tick,
        rate=40,
        order=0,
        pubs=[horus.LaserScan],
    )


def test_planner_avoids_close_obstacle():
    """Planner should slow down when an obstacle is close."""
    commands = []

    def capture(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            commands.append(cmd.linear)

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(make_fake_lidar(ranges=[0.3, 0.5, 1.0, 2.0]))
    sched.add(horus.Node(name="planner", tick=planner_tick, rate=10,
                         order=10, compute=True,
                         subs=[horus.LaserScan], pubs=[horus.CmdVel]))
    sched.add(horus.Node(name="capture", tick=capture, rate=100,
                         order=20, subs=[horus.CmdVel]))

    sched.tick_for(0.5)

    assert len(commands) > 0
    assert all(c < 1.0 for c in commands)  # slowed down near obstacle

Pattern 3: Test Fixture with Setup and Teardown

import pytest

@pytest.fixture
def robot_sched():
    """Create a deterministic scheduler with standard test nodes."""
    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(make_fake_lidar())
    sched.add(make_controller())
    sched.add(make_safety())
    yield sched
    # Scheduler cleans up automatically when garbage collected


def test_controller_publishes_at_rate(robot_sched):
    """Controller should publish commands every tick."""
    commands = []

    def capture(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            commands.append(cmd)

    robot_sched.add(horus.Node(name="capture", tick=capture, rate=100,
                               order=100, subs=[horus.CmdVel]))

    for _ in range(50):
        robot_sched.tick_once()

    # Controller at 50 Hz in a 100 Hz scheduler: ~25 commands in 50 ticks
    assert len(commands) >= 20

Record and Replay

Record a session to capture all topic data for later replay and regression testing.

Recording a Session

sched = horus.Scheduler(tick_rate=100, recording=True)
sched.add(horus.Node(name="sensor", tick=sensor_tick, rate=100, order=0,
                     pubs=["scan"]))
sched.add(horus.Node(name="ctrl", tick=ctrl_tick, rate=50, order=10,
                     subs=["scan"], pubs=["cmd_vel"]))

# Run for 60 seconds, recording all topic data
sched.run(duration=60.0)

# Check recording status
print(sched.is_recording())  # True while running

Stopping and Listing Recordings

# Stop recording and get file paths
files = sched.stop_recording()
for f in files:
    print(f"Recorded: {f}")

# List all available recordings
recordings = sched.list_recordings()
for r in recordings:
    print(f"Session: {r}")

Using Recordings for Regression Tests

Record a known-good session, then replay the inputs in a test and assert that outputs match:

def test_controller_regression():
    """Replay recorded sensor data and verify controller outputs match baseline."""
    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="ctrl", tick=ctrl_tick, rate=100, order=0,
                         subs=["scan"], pubs=["cmd_vel"]))

    # Load recorded sensor data
    recordings = sched.list_recordings()
    assert len(recordings) > 0

    # Replay: inject recorded data tick by tick
    # (actual replay API depends on your recording format)
    for _ in range(1000):
        sched.tick_once()

    # Compare outputs to baseline
    # ...

Deleting Old Recordings

sched.delete_recording("session_2026_03_20_143000")

pytest Integration

Project Layout

my-robot/
  horus.toml
  src/
    main.py
    nodes/
      imu.py
      controller.py
      planner.py
      safety.py
  tests/
    conftest.py
    test_controller.py
    test_safety.py
    test_integration.py

conftest.py --- Shared Fixtures

import pytest
import horus

@pytest.fixture
def det_sched():
    """Deterministic scheduler for reproducible tests."""
    return horus.Scheduler(tick_rate=100, deterministic=True)


@pytest.fixture
def fake_imu_node():
    """Fake IMU that publishes constant readings."""
    def tick(node):
        node.send("imu.data", horus.Imu(
            accel_x=0.0, accel_y=0.0, accel_z=9.81,
            gyro_x=0.0, gyro_y=0.0, gyro_z=0.0,
        ))
    return horus.Node(name="fake_imu", tick=tick, rate=100, order=0,
                      pubs=[horus.Imu])


@pytest.fixture
def capture_node():
    """Capture node that collects CmdVel messages."""
    captured = []

    def tick(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            captured.append(cmd)

    node = horus.Node(name="capture", tick=tick, rate=100, order=99,
                      subs=[horus.CmdVel])
    node._test_captured = captured  # attach for test access
    return node

Test File Example

# tests/test_controller.py
import horus
from nodes.controller import make_controller

def test_controller_zero_error_zero_output(det_sched, fake_imu_node, capture_node):
    """When IMU reads match the target, controller output should be near zero."""
    det_sched.add(fake_imu_node)
    det_sched.add(make_controller(target_speed=0.0))
    det_sched.add(capture_node)

    for _ in range(10):
        det_sched.tick_once()

    cmds = capture_node._test_captured
    assert len(cmds) > 0
    assert all(abs(c.linear) < 0.01 for c in cmds)


def test_controller_nonzero_target(det_sched, fake_imu_node, capture_node):
    """When target speed is 1.0 and IMU reads 0.0, controller should output positive."""
    det_sched.add(fake_imu_node)
    det_sched.add(make_controller(target_speed=1.0))
    det_sched.add(capture_node)

    for _ in range(10):
        det_sched.tick_once()

    cmds = capture_node._test_captured
    assert len(cmds) > 0
    assert all(c.linear > 0.0 for c in cmds)

Running Tests

# Run all tests
pytest tests/

# Run with verbose output
pytest tests/ -v

# Run a specific test
pytest tests/test_controller.py::test_controller_zero_error_zero_output

# Run with coverage
pytest tests/ --cov=src --cov-report=term-missing

CI/CD: Running Tests in Docker

HORUS tests need shared memory (/dev/shm) for topic IPC. Docker provides this by default, but you may need to increase the size for large test suites.

Dockerfile

FROM python:3.11-slim

# Install HORUS
RUN curl -fsSL https://raw.githubusercontent.com/softmata/horus/release/install.sh | bash

# Copy project
WORKDIR /app
COPY . .

# Run tests with deterministic mode
CMD ["pytest", "tests/", "-v", "--tb=short"]

Docker Compose for CI

services:
  test:
    build: .
    shm_size: "256m"  # increase shared memory for topic IPC
    environment:
      - HORUS_LOG=warn  # reduce log noise in CI

GitHub Actions Example

name: Test
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install HORUS
        run: curl -fsSL https://raw.githubusercontent.com/softmata/horus/release/install.sh | bash

      - name: Run tests
        run: pytest tests/ -v --tb=short

      - name: Clean shared memory
        if: always()
        run: horus clean --shm

Key CI Rules

Always use deterministic=True in tests. Without it, tests depend on wall-clock timing and fail intermittently on loaded CI servers.

Always clean shared memory after tests. HORUS topics use shared memory segments that persist after the process exits. Run horus clean --shm in your CI teardown step to avoid leaking shared memory between runs.

Set HORUS_LOG=warn in CI. Debug-level logging produces megabytes of output per test run. Reduce to warn to keep CI logs readable.

Do not use rt=True in CI tests. CI runners do not have CAP_SYS_NICE or SCHED_FIFO. The scheduler degrades gracefully, but RT features add overhead without benefit in CI.
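The cleanup rule can also live inside the test suite instead of a separate CI step. A sketch of a helper suitable for a conftest.py teardown; the /dev/shm "horus_" filename prefix is an assumption, so adjust the pattern to your installation or simply shell out to horus clean --shm:

```python
# Sketch of a shared-memory cleanup helper for CI teardown.
# ASSUMPTION: HORUS segments appear under /dev/shm with a "horus_"
# prefix -- adjust the pattern, or shell out to `horus clean --shm`.
import glob
import os
import tempfile

def clean_shm(shm_dir="/dev/shm", prefix="horus_"):
    """Remove leftover shared-memory segments; return what was deleted."""
    removed = []
    for path in glob.glob(os.path.join(shm_dir, prefix + "*")):
        os.remove(path)
        removed.append(path)
    return removed

# Demonstrate against a temp dir standing in for /dev/shm:
with tempfile.TemporaryDirectory() as d:
    open(os.path.join(d, "horus_topic_scan"), "w").close()
    open(os.path.join(d, "unrelated"), "w").close()
    removed = clean_shm(shm_dir=d)
    assert len(removed) == 1                      # only the horus_ segment
    assert os.path.exists(os.path.join(d, "unrelated"))
```

In conftest.py, call clean_shm() from the teardown half of a session-scoped autouse fixture (yield, then clean), so a crashed test run cannot leak segments into the next one.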


Deterministic Reproducibility Guarantees

When deterministic=True is set:

Property          | Guarantee
horus.dt()        | Always 1/rate, independent of wall clock
horus.now()       | Monotonically increases by dt each tick
horus.rng_float() | Same sequence across runs, across machines
Execution order   | Determined by order=, not OS thread scheduling
Tick count        | Exact: 100 calls to tick_once() means exactly 100 ticks
Topic delivery    | Same-tick delivery for same-order publishers and subscribers

What is not guaranteed:

Property                       | Why
Absolute wall-clock duration   | tick_once() returns as fast as the CPU allows
Python random.random()         | Not seeded by HORUS --- use horus.rng_float()
time.time() inside tick()      | Returns wall clock, not SimClock
External I/O (files, network)  | Non-deterministic by nature
GC timing (even with gc.disable()) | GC pauses depend on allocation patterns

Do not mix time.time() with horus.now() in deterministic mode. time.time() returns wall-clock time, which varies between runs. Use horus.now() and horus.dt() for all time-dependent logic inside tick().


Quick Reference

I want to...               | Pattern
Test one tick of a node    | sched.tick_once()
Test only specific nodes   | sched.tick_once(["node_a", "node_b"])
Run for N ticks            | for _ in range(N): sched.tick_once()
Run for a duration         | sched.tick_for(1.0)
Get reproducible results   | Scheduler(deterministic=True)
Inject fake sensor data    | horus.Topic("name").send(data) before tick_once()
Capture node outputs       | Add a capture node that appends to a list
Mock hardware writes       | Dependency injection via closure
Record a session           | Scheduler(recording=True)
Stop and save recording    | sched.stop_recording()
List recordings            | sched.list_recordings()
Run in CI                  | deterministic=True, no rt=True, clean shm after

See Also