Testing and Deterministic Mode (Python)

A PID controller runs at 50 Hz. In production, it reads an IMU, computes a correction, and publishes a velocity command --- 20 ms per cycle, wall-clock time, with real sensor noise and OS jitter. In a test, you need the opposite: no wall clock, no jitter, no hardware, and identical results every run. You need to step the scheduler one tick at a time, inject fake sensor data, and assert that the controller produces the exact expected output.

HORUS provides three tools for this:

  • tick_once() --- step the scheduler by exactly one tick cycle, then return control to your test
  • Deterministic mode --- replace the wall clock with a SimClock, fix dt, seed the RNG, and guarantee identical results across runs
  • Record and replay --- capture a full session and replay it for regression testing

Single-Tick Testing with tick_once()

tick_once() is the foundation of all HORUS testing. It executes exactly one tick cycle --- calling tick() on every due node in order --- then returns. No threads, no timing, no sleeping. You control the clock.

# simplified
import horus

results = []

def counter_tick(node):
    count = (node.recv("count") or 0) + 1
    node.send("count", count)
    results.append(count)

sched = horus.Scheduler(tick_rate=100)
sched.add(horus.Node(
    name="counter",
    tick=counter_tick,
    rate=100,
    order=0,
    pubs=["count"],
    subs=["count"],
))

sched.tick_once()  # init (lazy) + tick 1
sched.tick_once()  # tick 2
sched.tick_once()  # tick 3

assert results == [1, 2, 3]

On the first call to tick_once(), the scheduler lazily initializes: it finalizes node configurations, calls init() on every node in order, then runs one tick. Subsequent calls run one tick each.

Selective Ticking

Pass a list of node names to tick only specific nodes:

# simplified
sched.tick_once(["sensor"])             # only tick "sensor"
sched.tick_once(["sensor", "controller"])  # tick sensor and controller, skip motor

This is the primary tool for unit-testing a single node in isolation. Other nodes remain frozen --- they do not tick, but their published data stays on the topics.

tick_for() --- Timed Runs

tick_for() runs the tick loop for a specific wall-clock duration, then returns:

# simplified
sched.tick_for(1.0)                  # run for 1 second
sched.tick_for(0.5, ["sensor"])      # run only sensor for 0.5 seconds

Use this for integration tests that need to observe behavior over many ticks without calling tick_once() in a loop.


Deterministic Mode

Enable deterministic mode on the Scheduler for reproducible results:

# simplified
sched = horus.Scheduler(tick_rate=100, deterministic=True)

In deterministic mode, three things change:

FeatureNormal modeDeterministic mode
horus.dt()Actual wall-clock elapsed timeFixed 1/rate every tick (e.g., 0.01 at 100 Hz)
horus.now()Wall-clock timeSimClock --- advances by dt each tick
horus.rng_float()System entropyTick-seeded sequence --- same across runs

This guarantees identical results on any machine, any run. The physics does not depend on how fast your CPU is or what else the OS is doing.

# simplified
sched = horus.Scheduler(tick_rate=100, deterministic=True)
sched.add(horus.Node(name="sim", tick=sim_tick, rate=100, order=0))

for _ in range(1000):
    sched.tick_once()
    assert horus.dt() == 0.01  # always 1/100

Why Deterministic Mode Matters

Without deterministic mode, a PID controller's output depends on the actual dt between ticks. On a fast machine, dt might be 9.98 ms. On a loaded CI server, it might be 10.5 ms. The integral term accumulates differently, and the test produces different outputs each run. With deterministic=True, dt is always exactly 1/rate, and the test is reproducible.

SimClock Behavior

The SimClock starts at 0.0 and advances by 1/rate each tick:

# simplified
sched = horus.Scheduler(tick_rate=50, deterministic=True)
sched.add(horus.Node(name="n", tick=lambda node: None, rate=50, order=0))

sched.tick_once()  # horus.now() == 0.02, horus.dt() == 0.02
sched.tick_once()  # horus.now() == 0.04, horus.dt() == 0.02
sched.tick_once()  # horus.now() == 0.06, horus.dt() == 0.02

All time-dependent logic --- horus.now(), horus.dt(), horus.elapsed() --- uses the SimClock. Wall-clock time is irrelevant. This means a 1000-tick test that simulates 10 seconds of robot time finishes in milliseconds.

Seeded RNG

horus.rng_float() returns a repeatable sequence in deterministic mode:

# simplified
sched = horus.Scheduler(tick_rate=100, deterministic=True)
values = []

def tick(node):
    values.append(horus.rng_float())

sched.add(horus.Node(name="rng", tick=tick, rate=100, order=0))

for _ in range(5):
    sched.tick_once()

# values is identical across runs, across machines

Use horus.rng_float() instead of Python's random.random() for any randomness inside tick(). The system random module is not seeded by HORUS and will produce different results across runs.


Testing Patterns

Unit Testing a Single Node's tick()

Test the tick function directly, without a scheduler, for pure logic validation:

# simplified
def test_pid_proportional_gain():
    """Verify the P-term of the PID controller."""
    integral = [0.0]
    target = 1.0

    def pid_tick(node):
        imu = node.recv("imu.data")
        if imu is None:
            return
        error = target - imu.accel_x
        integral[0] += error * horus.dt()
        command = 2.0 * error + 0.1 * integral[0]
        node.send("cmd_vel", horus.CmdVel(linear=command, angular=0.0))

    results = []

    def capture_tick(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            results.append(cmd.linear)

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="pid", tick=pid_tick, rate=100, order=0,
                         subs=[horus.Imu], pubs=[horus.CmdVel]))
    sched.add(horus.Node(name="capture", tick=capture_tick, rate=100, order=10,
                         subs=[horus.CmdVel]))

    # Inject fake IMU data by publishing before ticking
    imu_topic = horus.Topic("imu.data")
    imu_topic.send(horus.Imu(accel_x=0.0, accel_y=0.0, accel_z=9.81,
                              gyro_x=0.0, gyro_y=0.0, gyro_z=0.0))

    sched.tick_once()

    assert len(results) == 1
    assert abs(results[0] - 2.0) < 0.01  # P-gain * error = 2.0 * 1.0

Integration Testing with Real Topics

Test data flow across multiple nodes:

# simplified
def test_sensor_to_controller_pipeline():
    """Verify that sensor data flows through the controller to a motor command."""
    motor_cmds = []

    def fake_imu(node):
        node.send("imu.data", horus.Imu(
            accel_x=0.3, accel_y=0.0, accel_z=9.81,
            gyro_x=0.0, gyro_y=0.0, gyro_z=0.0,
        ))

    def controller(node):
        imu = node.recv("imu.data")
        if imu:
            cmd = horus.CmdVel(linear=0.5 - imu.accel_x, angular=0.0)
            node.send("cmd_vel", cmd)

    def mock_motor(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            motor_cmds.append(cmd.linear)

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="imu", tick=fake_imu, rate=100, order=0,
                         pubs=[horus.Imu]))
    sched.add(horus.Node(name="ctrl", tick=controller, rate=100, order=10,
                         subs=[horus.Imu], pubs=[horus.CmdVel]))
    sched.add(horus.Node(name="motor", tick=mock_motor, rate=100, order=20,
                         subs=[horus.CmdVel]))

    sched.tick_for(0.1)  # 10 ticks at 100 Hz

    assert len(motor_cmds) > 0
    assert abs(motor_cmds[0] - 0.2) < 0.01  # 0.5 - 0.3 = 0.2

Testing Safety Behavior

Verify that deadline miss policies and watchdog fire correctly:

# simplified
def test_safety_monitor_stops_on_dangerous_velocity():
    """Safety monitor should request_stop() when velocity exceeds threshold."""
    stopped = [False]

    def unsafe_controller(node):
        node.send("cmd_vel", horus.CmdVel(linear=5.0, angular=0.0))

    def safety_tick(node):
        cmd = node.recv("cmd_vel")
        if cmd and abs(cmd.linear) > 2.0:
            node.log_warning(f"Unsafe: {cmd.linear}")
            node.request_stop()
            stopped[0] = True

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="ctrl", tick=unsafe_controller, rate=100,
                         order=0, pubs=[horus.CmdVel]))
    sched.add(horus.Node(name="safety", tick=safety_tick, rate=100,
                         order=1, subs=[horus.CmdVel]))

    sched.tick_once()

    assert stopped[0] is True


def test_watchdog_detects_frozen_node():
    """Verify the watchdog catches a node that hangs."""
    import time

    def frozen_tick(node):
        time.sleep(2.0)  # simulate frozen node

    sched = horus.Scheduler(tick_rate=10, watchdog_ms=100)
    sched.add(horus.Node(name="frozen", tick=frozen_tick, rate=10, order=0))

    sched.tick_once()

    stats = sched.safety_stats()
    assert stats is not None
    assert stats["watchdog_expirations"] > 0

Testing Multi-Rate Interactions

Verify that nodes running at different rates interact correctly:

# simplified
def test_multi_rate_data_flow():
    """50 Hz controller reads from 100 Hz sensor --- gets latest value."""
    readings = []

    def fast_sensor(node):
        t = horus.now()
        node.send("sensor.data", {"value": t * 100})

    def slow_controller(node):
        data = node.recv("sensor.data")
        if data:
            readings.append(data["value"])

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="sensor", tick=fast_sensor, rate=100,
                         order=0, pubs=["sensor.data"]))
    sched.add(horus.Node(name="ctrl", tick=slow_controller, rate=50,
                         order=10, subs=["sensor.data"]))

    # Run 10 ticks. The sensor ticks 10 times, the controller ticks 5 times.
    for _ in range(10):
        sched.tick_once()

    # Controller should have received about 5 readings
    assert len(readings) >= 4
    assert len(readings) <= 6

Mock Patterns for Hardware Dependencies

Nodes that depend on hardware --- serial ports, I2C buses, GPIO pins --- cannot run in CI. Replace the hardware call with a mock in tests.

Pattern 1: Dependency Injection via Closure

# simplified
def make_motor_node(write_fn=None):
    """Create a motor node with injectable hardware write function."""
    if write_fn is None:
        write_fn = real_motor_write  # production: actual hardware

    def tick(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            write_fn(cmd.linear, cmd.angular)
            node.send("motor.state", {"velocity": cmd.linear})

    return horus.Node(
        name="motor",
        tick=tick,
        rate=50,
        order=10,
        subs=[horus.CmdVel],
        pubs=["motor.state"],
    )


def test_motor_writes_velocity():
    """Test motor node without real hardware."""
    writes = []

    def mock_write(linear, angular):
        writes.append((linear, angular))

    motor = make_motor_node(write_fn=mock_write)

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(motor)

    # Inject a command
    topic = horus.Topic("cmd_vel")
    topic.send(horus.CmdVel(linear=0.5, angular=0.1))

    sched.tick_once()

    assert len(writes) == 1
    assert abs(writes[0][0] - 0.5) < 0.01

Pattern 2: Fake Sensor Node

Replace a real hardware driver with a fake that publishes known data:

# simplified
def make_fake_lidar(ranges=None):
    """Fake LiDAR that publishes static scan data."""
    if ranges is None:
        ranges = [1.0, 2.0, 3.0, 1.5]

    def tick(node):
        node.send("scan", horus.LaserScan(
            ranges=ranges,
            angle_min=-1.57,
            angle_max=1.57,
            angle_increment=3.14 / len(ranges),
        ))

    return horus.Node(
        name="lidar",
        tick=tick,
        rate=40,
        order=0,
        pubs=[horus.LaserScan],
    )


def test_planner_avoids_close_obstacle():
    """Planner should slow down when an obstacle is close."""
    commands = []

    def capture(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            commands.append(cmd.linear)

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(make_fake_lidar(ranges=[0.3, 0.5, 1.0, 2.0]))
    sched.add(horus.Node(name="planner", tick=planner_tick, rate=10,
                         order=10, compute=True,
                         subs=[horus.LaserScan], pubs=[horus.CmdVel]))
    sched.add(horus.Node(name="capture", tick=capture, rate=100,
                         order=20, subs=[horus.CmdVel]))

    sched.tick_for(0.5)

    assert len(commands) > 0
    assert all(c < 1.0 for c in commands)  # slowed down near obstacle

Pattern 3: Test Fixture with Setup and Teardown

# simplified
import pytest

@pytest.fixture
def robot_sched():
    """Create a deterministic scheduler with standard test nodes."""
    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(make_fake_lidar())
    sched.add(make_controller())
    sched.add(make_safety())
    yield sched
    # Scheduler cleans up automatically when garbage collected


def test_controller_publishes_at_rate(robot_sched):
    """Controller should publish commands every tick."""
    commands = []

    def capture(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            commands.append(cmd)

    robot_sched.add(horus.Node(name="capture", tick=capture, rate=100,
                               order=100, subs=[horus.CmdVel]))

    for _ in range(50):
        robot_sched.tick_once()

    # Controller at 50 Hz in a 100 Hz scheduler: ~25 commands in 50 ticks
    assert len(commands) >= 20

Record and Replay

Record a session to capture all topic data for later replay and regression testing.

Recording a Session

# simplified
sched = horus.Scheduler(tick_rate=100, recording=True)
sched.add(horus.Node(name="sensor", tick=sensor_tick, rate=100, order=0,
                     pubs=["scan"]))
sched.add(horus.Node(name="ctrl", tick=ctrl_tick, rate=50, order=10,
                     subs=["scan"], pubs=["cmd_vel"]))

# Run for 60 seconds, recording all topic data
sched.run(duration=60.0)

# Check recording status
print(sched.is_recording())  # True while running

Stopping and Listing Recordings

# simplified
# Stop recording and get file paths
files = sched.stop_recording()
for f in files:
    print(f"Recorded: {f}")

# List all available recordings
recordings = sched.list_recordings()
for r in recordings:
    print(f"Session: {r}")

Using Recordings for Regression Tests

Record a known-good session, then replay the inputs in a test and assert that outputs match:

# simplified
def test_controller_regression():
    """Replay recorded sensor data and verify controller outputs match baseline."""
    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="ctrl", tick=ctrl_tick, rate=100, order=0,
                         subs=["scan"], pubs=["cmd_vel"]))

    # Load recorded sensor data
    recordings = sched.list_recordings()
    assert len(recordings) > 0

    # Replay: inject recorded data tick by tick
    # (actual replay API depends on your recording format)
    for _ in range(1000):
        sched.tick_once()

    # Compare outputs to baseline
    # ...

Deleting Old Recordings

# simplified
sched.delete_recording("session_2026_03_20_143000")

pytest Integration

Project Layout

my-robot/
  horus.toml
  src/
    main.py
    nodes/
      imu.py
      controller.py
      planner.py
      safety.py
  tests/
    conftest.py
    test_controller.py
    test_safety.py
    test_integration.py

conftest.py --- Shared Fixtures

# simplified
import pytest
import horus

@pytest.fixture
def det_sched():
    """Deterministic scheduler for reproducible tests."""
    return horus.Scheduler(tick_rate=100, deterministic=True)


@pytest.fixture
def fake_imu_node():
    """Fake IMU that publishes constant readings."""
    def tick(node):
        node.send("imu.data", horus.Imu(
            accel_x=0.0, accel_y=0.0, accel_z=9.81,
            gyro_x=0.0, gyro_y=0.0, gyro_z=0.0,
        ))
    return horus.Node(name="fake_imu", tick=tick, rate=100, order=0,
                      pubs=[horus.Imu])


@pytest.fixture
def capture_node():
    """Capture node that collects CmdVel messages."""
    captured = []

    def tick(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            captured.append(cmd)

    node = horus.Node(name="capture", tick=tick, rate=100, order=99,
                      subs=[horus.CmdVel])
    node._test_captured = captured  # attach for test access
    return node

Test File Example

# simplified
# tests/test_controller.py
import horus
from nodes.controller import make_controller

def test_controller_zero_error_zero_output(det_sched, fake_imu_node, capture_node):
    """When IMU reads match the target, controller output should be near zero."""
    det_sched.add(fake_imu_node)
    det_sched.add(make_controller(target_speed=0.0))
    det_sched.add(capture_node)

    for _ in range(10):
        det_sched.tick_once()

    cmds = capture_node._test_captured
    assert len(cmds) > 0
    assert all(abs(c.linear) < 0.01 for c in cmds)


def test_controller_nonzero_target(det_sched, fake_imu_node, capture_node):
    """When target speed is 1.0 and IMU reads 0.0, controller should output positive."""
    det_sched.add(fake_imu_node)
    det_sched.add(make_controller(target_speed=1.0))
    det_sched.add(capture_node)

    for _ in range(10):
        det_sched.tick_once()

    cmds = capture_node._test_captured
    assert len(cmds) > 0
    assert all(c.linear > 0.0 for c in cmds)

Running Tests

# Run all tests
pytest tests/

# Run with verbose output
pytest tests/ -v

# Run a specific test
pytest tests/test_controller.py::test_controller_zero_error_zero_output

# Run with coverage
pytest tests/ --cov=src --cov-report=term-missing

CI/CD: Running Tests in Docker

HORUS tests need shared memory (/dev/shm) for topic IPC. Docker provides this by default, but you may need to increase the size for large test suites.

Dockerfile

FROM python:3.11-slim

# Install HORUS
RUN curl -fsSL https://gitlab.com/softmata/horus/-/raw/release/install.sh | bash

# Copy project
WORKDIR /app
COPY . .

# Run tests with deterministic mode
CMD ["pytest", "tests/", "-v", "--tb=short"]

Docker Compose for CI

services:
  test:
    build: .
    shm_size: "256m"  # increase shared memory for topic IPC
    environment:
      - HORUS_LOG=warn  # reduce log noise in CI

GitHub Actions Example

name: Test
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install HORUS
        run: curl -fsSL https://gitlab.com/softmata/horus/-/raw/release/install.sh | bash

      - name: Run tests
        run: pytest tests/ -v --tb=short

      - name: Clean shared memory
        if: always()
        run: horus clean --shm

Key CI Rules

Always use deterministic=True in tests. Without it, tests depend on wall-clock timing and fail intermittently on loaded CI servers.

Always clean shared memory after tests. HORUS topics use shared memory segments that persist after the process exits. Run horus clean --shm in your CI teardown step to avoid leaking shared memory between runs.

Set HORUS_LOG=warn in CI. Debug-level logging produces megabytes of output per test run. Reduce to warn to keep CI logs readable.

Do not use rt=True in CI tests. CI runners do not have CAP_SYS_NICE or SCHED_FIFO. The scheduler degrades gracefully, but RT features add overhead without benefit in CI.


Deterministic Reproducibility Guarantees

When deterministic=True is set:

PropertyGuarantee
horus.dt()Always 1/rate, independent of wall clock
horus.now()Monotonically increases by dt each tick
horus.rng_float()Same sequence across runs, across machines
Execution orderDetermined by order=, not OS thread scheduling
Tick countExact: 100 calls to tick_once() means exactly 100 ticks
Topic deliverySame-tick delivery for same-order publishers and subscribers

What is not guaranteed:

PropertyWhy
Absolute wall-clock durationtick_once() returns as fast as the CPU allows
Python random.random()Not seeded by HORUS --- use horus.rng_float()
time.time() inside tick()Returns wall clock, not SimClock
External I/O (files, network)Non-deterministic by nature
GC timing with gc.disable()GC pauses depend on allocation patterns

Do not mix time.time() with horus.now() in deterministic mode. time.time() returns wall-clock time, which varies between runs. Use horus.now() and horus.dt() for all time-dependent logic inside tick().


Quick Reference

I want to...Pattern
Test one tick of a nodesched.tick_once()
Test only specific nodessched.tick_once(["node_a", "node_b"])
Run for N ticksfor _ in range(N): sched.tick_once()
Run for a durationsched.tick_for(1.0)
Get reproducible resultsScheduler(deterministic=True)
Inject fake sensor datahorus.Topic("name").send(data) before tick_once()
Capture node outputsAdd a capture node that appends to a list
Mock hardware writesDependency injection via closure
Record a sessionScheduler(recording=True)
Stop and save recordingsched.stop_recording()
List recordingssched.list_recordings()
Run in CIdeterministic=True, no rt=True, clean shm after

See Also