Testing and Deterministic Mode (Python)
A PID controller runs at 50 Hz. In production, it reads an IMU, computes a correction, and publishes a velocity command --- 20 ms per cycle, wall-clock time, with real sensor noise and OS jitter. In a test, you need the opposite: no wall clock, no jitter, no hardware, and identical results every run. You need to step the scheduler one tick at a time, inject fake sensor data, and assert that the controller produces the exact expected output.
HORUS provides three tools for this:
- tick_once() --- step the scheduler by exactly one tick cycle, then return control to your test
- Deterministic mode --- replace the wall clock with a SimClock, fix dt, seed the RNG, and guarantee identical results across runs
- Record and replay --- capture a full session and replay it for regression testing
Single-Tick Testing with tick_once()
tick_once() is the foundation of all HORUS testing. It executes exactly one tick cycle --- calling tick() on every due node in order --- then returns. No threads, no timing, no sleeping. You control the clock.
```python
import horus

results = []

def counter_tick(node):
    count = (node.recv("count") or 0) + 1
    node.send("count", count)
    results.append(count)

sched = horus.Scheduler(tick_rate=100)
sched.add(horus.Node(
    name="counter",
    tick=counter_tick,
    rate=100,
    order=0,
    pubs=["count"],
    subs=["count"],
))

sched.tick_once()  # init (lazy) + tick 1
sched.tick_once()  # tick 2
sched.tick_once()  # tick 3

assert results == [1, 2, 3]
```
On the first call to tick_once(), the scheduler lazily initializes: it finalizes node configurations, calls init() on every node in order, then runs one tick. Subsequent calls run one tick each.
Selective Ticking
Pass a list of node names to tick only specific nodes:
```python
sched.tick_once(["sensor"])                # only tick "sensor"
sched.tick_once(["sensor", "controller"])  # tick sensor and controller, skip motor
```
This is the primary tool for unit-testing a single node in isolation. Other nodes remain frozen --- they do not tick, but their published data stays on the topics.
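The freezing behavior can be pictured with a toy scheduler in plain Python. This is an illustrative sketch, not HORUS internals: nodes left out of the tick list do nothing, but the last values they published stay readable on the topics.

```python
# Toy sketch (not the HORUS implementation): selective ticking leaves
# other nodes frozen while their last published values persist.
class ToyScheduler:
    def __init__(self):
        self.nodes = {}   # name -> tick callable
        self.topics = {}  # topic name -> latest published value

    def add(self, name, tick_fn):
        self.nodes[name] = tick_fn

    def tick_once(self, names=None):
        for name, tick_fn in self.nodes.items():
            if names is None or name in names:
                tick_fn(self.topics)

def sensor(topics):
    topics["scan"] = topics.get("scan", 0) + 1

def controller(topics):
    topics["cmd"] = topics.get("scan", 0) * 2

sched = ToyScheduler()
sched.add("sensor", sensor)
sched.add("controller", controller)

sched.tick_once()                 # both tick: scan=1, cmd=2
sched.tick_once(["controller"])   # sensor frozen; its data persists

assert sched.topics["scan"] == 1  # sensor did not tick again
assert sched.topics["cmd"] == 2   # controller recomputed from frozen data
```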
tick_for() --- Timed Runs
tick_for() runs the tick loop for a specific wall-clock duration, then returns:
```python
sched.tick_for(1.0)              # run for 1 second
sched.tick_for(0.5, ["sensor"])  # run only sensor for 0.5 seconds
```
Use this for integration tests that need to observe behavior over many ticks without calling tick_once() in a loop.
Deterministic Mode
Enable deterministic mode on the Scheduler for reproducible results:
```python
sched = horus.Scheduler(tick_rate=100, deterministic=True)
```
In deterministic mode, three things change:
| Feature | Normal mode | Deterministic mode |
|---|---|---|
| horus.dt() | Actual wall-clock elapsed time | Fixed 1/rate every tick (e.g., 0.01 at 100 Hz) |
| horus.now() | Wall-clock time | SimClock --- advances by dt each tick |
| horus.rng_float() | System entropy | Tick-seeded sequence --- same across runs |
This guarantees identical results on any machine, any run. The physics does not depend on how fast your CPU is or what else the OS is doing.
```python
sched = horus.Scheduler(tick_rate=100, deterministic=True)
sched.add(horus.Node(name="sim", tick=sim_tick, rate=100, order=0))

for _ in range(1000):
    sched.tick_once()

assert horus.dt() == 0.01  # always 1/100
```
Why Deterministic Mode Matters
Without deterministic mode, a PID controller's output depends on the actual dt between ticks. On a fast machine, dt might be 9.98 ms. On a loaded CI server, it might be 10.5 ms. The integral term accumulates differently, and the test produces different outputs each run. With deterministic=True, dt is always exactly 1/rate, and the test is reproducible.
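The effect on the integral term can be made concrete in plain Python (the 0.0105 s jitter value below is illustrative, standing in for a loaded CI server ticking late):

```python
# Sketch: the same integral accumulated over 100 ticks with fixed vs
# jittery dt gives visibly different sums -- and different PID outputs.
def accumulate(dts, error=1.0):
    integral = 0.0
    for dt in dts:
        integral += error * dt
    return integral

fixed = accumulate([0.01] * 100)    # deterministic: dt is always 1/rate
jittery = accumulate([0.0105] * 100)  # every tick arrives 0.5 ms late

assert abs(fixed - 1.00) < 1e-9
assert abs(jittery - 1.05) < 1e-9   # 5% drift in the integral term
assert jittery > fixed
```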
SimClock Behavior
The SimClock starts at 0.0 and advances by 1/rate each tick:
```python
sched = horus.Scheduler(tick_rate=50, deterministic=True)
sched.add(horus.Node(name="n", tick=lambda node: None, rate=50, order=0))

sched.tick_once()  # horus.now() == 0.02, horus.dt() == 0.02
sched.tick_once()  # horus.now() == 0.04, horus.dt() == 0.02
sched.tick_once()  # horus.now() == 0.06, horus.dt() == 0.02
```
All time-dependent logic --- horus.now(), horus.dt(), horus.elapsed() --- uses the SimClock. Wall-clock time is irrelevant. This means a 1000-tick test that simulates 10 seconds of robot time finishes in milliseconds.
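The SimClock idea reduces to a counter that advances by a fixed dt per tick. A minimal sketch (not HORUS's actual implementation) shows why 10 seconds of simulated time costs almost no wall time:

```python
# Minimal SimClock sketch: time is a counter, decoupled from the wall clock.
class SimClock:
    def __init__(self, rate):
        self.dt = 1.0 / rate  # fixed step per tick
        self.t = 0.0

    def tick(self):
        self.t += self.dt

clock = SimClock(rate=50)
for _ in range(500):   # 500 ticks at 50 Hz = 10 simulated seconds
    clock.tick()

assert clock.dt == 0.02
assert abs(clock.t - 10.0) < 1e-9  # 10 s of robot time, ~no wall time
```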
Seeded RNG
horus.rng_float() returns a repeatable sequence in deterministic mode:
```python
sched = horus.Scheduler(tick_rate=100, deterministic=True)
values = []

def tick(node):
    values.append(horus.rng_float())

sched.add(horus.Node(name="rng", tick=tick, rate=100, order=0))
for _ in range(5):
    sched.tick_once()

# values is identical across runs, across machines
```
Use horus.rng_float() instead of Python's random.random() for any randomness inside tick(). The standard random module is not seeded by HORUS and will produce different results across runs.
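One way to picture tick-seeded randomness in plain Python (HORUS's actual seeding scheme is internal; the seed-mixing formula here is purely illustrative):

```python
import random

# Conceptual sketch: deriving a generator from (seed, tick index) makes
# the sequence identical on every run and every machine, while still
# varying from tick to tick.
def tick_seeded_float(tick_index, seed=0):
    # illustrative mixing of seed and tick index into one integer seed
    return random.Random(seed * 1_000_003 + tick_index).random()

run_a = [tick_seeded_float(i) for i in range(5)]
run_b = [tick_seeded_float(i) for i in range(5)]

assert run_a == run_b            # reproducible across "runs"
assert len(set(run_a)) == 5      # but still varies tick to tick
```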
Testing Patterns
Unit Testing a Single Node's tick()
Drive a single node's tick logic through a deterministic scheduler, injecting its inputs by publishing to topics directly and capturing what it publishes:
```python
def test_pid_proportional_gain():
    """Verify the P-term of the PID controller."""
    integral = [0.0]
    target = 1.0

    def pid_tick(node):
        imu = node.recv("imu.data")
        if imu is None:
            return
        error = target - imu.accel_x
        integral[0] += error * horus.dt()
        command = 2.0 * error + 0.1 * integral[0]
        node.send("cmd_vel", horus.CmdVel(linear=command, angular=0.0))

    results = []

    def capture_tick(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            results.append(cmd.linear)

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="pid", tick=pid_tick, rate=100, order=0,
                         subs=[horus.Imu], pubs=[horus.CmdVel]))
    sched.add(horus.Node(name="capture", tick=capture_tick, rate=100, order=10,
                         subs=[horus.CmdVel]))

    # Inject fake IMU data by publishing before ticking
    imu_topic = horus.Topic("imu.data")
    imu_topic.send(horus.Imu(accel_x=0.0, accel_y=0.0, accel_z=9.81,
                             gyro_x=0.0, gyro_y=0.0, gyro_z=0.0))

    sched.tick_once()
    assert len(results) == 1
    assert abs(results[0] - 2.0) < 0.01  # P-gain * error = 2.0 * 1.0
```
Integration Testing with Real Topics
Test data flow across multiple nodes:
```python
def test_sensor_to_controller_pipeline():
    """Verify that sensor data flows through the controller to a motor command."""
    motor_cmds = []

    def fake_imu(node):
        node.send("imu.data", horus.Imu(
            accel_x=0.3, accel_y=0.0, accel_z=9.81,
            gyro_x=0.0, gyro_y=0.0, gyro_z=0.0,
        ))

    def controller(node):
        imu = node.recv("imu.data")
        if imu:
            cmd = horus.CmdVel(linear=0.5 - imu.accel_x, angular=0.0)
            node.send("cmd_vel", cmd)

    def mock_motor(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            motor_cmds.append(cmd.linear)

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="imu", tick=fake_imu, rate=100, order=0,
                         pubs=[horus.Imu]))
    sched.add(horus.Node(name="ctrl", tick=controller, rate=100, order=10,
                         subs=[horus.Imu], pubs=[horus.CmdVel]))
    sched.add(horus.Node(name="motor", tick=mock_motor, rate=100, order=20,
                         subs=[horus.CmdVel]))

    sched.tick_for(0.1)  # 10 ticks at 100 Hz
    assert len(motor_cmds) > 0
    assert abs(motor_cmds[0] - 0.2) < 0.01  # 0.5 - 0.3 = 0.2
```
Testing Safety Behavior
Verify that deadline-miss policies and the watchdog fire correctly:
```python
def test_safety_monitor_stops_on_dangerous_velocity():
    """Safety monitor should request_stop() when velocity exceeds threshold."""
    stopped = [False]

    def unsafe_controller(node):
        node.send("cmd_vel", horus.CmdVel(linear=5.0, angular=0.0))

    def safety_tick(node):
        cmd = node.recv("cmd_vel")
        if cmd and abs(cmd.linear) > 2.0:
            node.log_warning(f"Unsafe: {cmd.linear}")
            node.request_stop()
            stopped[0] = True

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="ctrl", tick=unsafe_controller, rate=100,
                         order=0, pubs=[horus.CmdVel]))
    sched.add(horus.Node(name="safety", tick=safety_tick, rate=100,
                         order=1, subs=[horus.CmdVel]))

    sched.tick_once()
    assert stopped[0] is True


def test_watchdog_detects_frozen_node():
    """Verify the watchdog catches a node that hangs."""
    import time

    def frozen_tick(node):
        time.sleep(2.0)  # simulate frozen node

    sched = horus.Scheduler(tick_rate=10, watchdog_ms=100)
    sched.add(horus.Node(name="frozen", tick=frozen_tick, rate=10, order=0))

    sched.tick_once()
    stats = sched.safety_stats()
    assert stats is not None
    assert stats["watchdog_expirations"] > 0
```
Testing Multi-Rate Interactions
Verify that nodes running at different rates interact correctly:
```python
def test_multi_rate_data_flow():
    """50 Hz controller reads from 100 Hz sensor --- gets latest value."""
    readings = []

    def fast_sensor(node):
        t = horus.now()
        node.send("sensor.data", {"value": t * 100})

    def slow_controller(node):
        data = node.recv("sensor.data")
        if data:
            readings.append(data["value"])

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="sensor", tick=fast_sensor, rate=100,
                         order=0, pubs=["sensor.data"]))
    sched.add(horus.Node(name="ctrl", tick=slow_controller, rate=50,
                         order=10, subs=["sensor.data"]))

    # Run 10 ticks. The sensor ticks 10 times, the controller ticks 5 times.
    for _ in range(10):
        sched.tick_once()

    # Controller should have received about 5 readings
    assert len(readings) >= 4
    assert len(readings) <= 6
```
Mock Patterns for Hardware Dependencies
Nodes that depend on hardware --- serial ports, I2C buses, GPIO pins --- cannot run in CI. Replace the hardware call with a mock in tests.
Pattern 1: Dependency Injection via Closure
```python
def make_motor_node(write_fn=None):
    """Create a motor node with injectable hardware write function."""
    if write_fn is None:
        write_fn = real_motor_write  # production: actual hardware

    def tick(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            write_fn(cmd.linear, cmd.angular)
            node.send("motor.state", {"velocity": cmd.linear})

    return horus.Node(
        name="motor",
        tick=tick,
        rate=50,
        order=10,
        subs=[horus.CmdVel],
        pubs=["motor.state"],
    )


def test_motor_writes_velocity():
    """Test motor node without real hardware."""
    writes = []

    def mock_write(linear, angular):
        writes.append((linear, angular))

    motor = make_motor_node(write_fn=mock_write)
    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(motor)

    # Inject a command
    topic = horus.Topic("cmd_vel")
    topic.send(horus.CmdVel(linear=0.5, angular=0.1))

    sched.tick_once()
    assert len(writes) == 1
    assert abs(writes[0][0] - 0.5) < 0.01
```
Pattern 2: Fake Sensor Node
Replace a real hardware driver with a fake that publishes known data:
```python
def make_fake_lidar(ranges=None):
    """Fake LiDAR that publishes static scan data."""
    if ranges is None:
        ranges = [1.0, 2.0, 3.0, 1.5]

    def tick(node):
        node.send("scan", horus.LaserScan(
            ranges=ranges,
            angle_min=-1.57,
            angle_max=1.57,
            angle_increment=3.14 / len(ranges),
        ))

    return horus.Node(
        name="lidar",
        tick=tick,
        rate=40,
        order=0,
        pubs=[horus.LaserScan],
    )


def test_planner_avoids_close_obstacle():
    """Planner should slow down when an obstacle is close."""
    commands = []

    def capture(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            commands.append(cmd.linear)

    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(make_fake_lidar(ranges=[0.3, 0.5, 1.0, 2.0]))
    sched.add(horus.Node(name="planner", tick=planner_tick, rate=10,
                         order=10, compute=True,
                         subs=[horus.LaserScan], pubs=[horus.CmdVel]))
    sched.add(horus.Node(name="capture", tick=capture, rate=100,
                         order=20, subs=[horus.CmdVel]))

    sched.tick_for(0.5)
    assert len(commands) > 0
    assert all(c < 1.0 for c in commands)  # slowed down near obstacle
```
Pattern 3: Test Fixture with Setup and Teardown
```python
import pytest

@pytest.fixture
def robot_sched():
    """Create a deterministic scheduler with standard test nodes."""
    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(make_fake_lidar())
    sched.add(make_controller())
    sched.add(make_safety())
    yield sched
    # Scheduler cleans up automatically when garbage collected


def test_controller_publishes_at_rate(robot_sched):
    """Controller should publish commands every tick."""
    commands = []

    def capture(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            commands.append(cmd)

    robot_sched.add(horus.Node(name="capture", tick=capture, rate=100,
                               order=100, subs=[horus.CmdVel]))
    for _ in range(50):
        robot_sched.tick_once()

    # Controller at 50 Hz in a 100 Hz scheduler: ~25 commands in 50 ticks
    assert len(commands) >= 20
```
Record and Replay
Record a session to capture all topic data for later replay and regression testing.
Recording a Session
```python
sched = horus.Scheduler(tick_rate=100, recording=True)
sched.add(horus.Node(name="sensor", tick=sensor_tick, rate=100, order=0,
                     pubs=["scan"]))
sched.add(horus.Node(name="ctrl", tick=ctrl_tick, rate=50, order=10,
                     subs=["scan"], pubs=["cmd_vel"]))

# Run for 60 seconds, recording all topic data
sched.run(duration=60.0)

# Check recording status
print(sched.is_recording())  # True while running
```
Stopping and Listing Recordings
```python
# Stop recording and get file paths
files = sched.stop_recording()
for f in files:
    print(f"Recorded: {f}")

# List all available recordings
recordings = sched.list_recordings()
for r in recordings:
    print(f"Session: {r}")
```
Using Recordings for Regression Tests
Record a known-good session, then replay the inputs in a test and assert that outputs match:
```python
def test_controller_regression():
    """Replay recorded sensor data and verify controller outputs match baseline."""
    sched = horus.Scheduler(tick_rate=100, deterministic=True)
    sched.add(horus.Node(name="ctrl", tick=ctrl_tick, rate=100, order=0,
                         subs=["scan"], pubs=["cmd_vel"]))

    # Load recorded sensor data
    recordings = sched.list_recordings()
    assert len(recordings) > 0

    # Replay: inject recorded data tick by tick
    # (actual replay API depends on your recording format)
    for _ in range(1000):
        sched.tick_once()

    # Compare outputs to baseline
    # ...
```
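The "compare outputs to baseline" step can be sketched with a JSON file in plain Python (the helper, file name, and tolerance are illustrative, not a HORUS API): the first run records the baseline, and later runs fail if outputs drift.

```python
import json
import os
import tempfile

def check_against_baseline(outputs, path, tol=1e-9):
    """Write baseline on first run; on later runs, compare element-wise."""
    if not os.path.exists(path):
        with open(path, "w") as f:
            json.dump(outputs, f)  # first run: record the baseline
        return True
    with open(path) as f:
        baseline = json.load(f)
    return len(outputs) == len(baseline) and all(
        abs(a - b) <= tol for a, b in zip(outputs, baseline)
    )

path = os.path.join(tempfile.gettempdir(), "ctrl_baseline.json")
if os.path.exists(path):
    os.remove(path)  # start fresh for this demo

outputs = [0.1, 0.2, 0.3]
assert check_against_baseline(outputs, path)              # first run: writes baseline
assert check_against_baseline(outputs, path)              # identical replay passes
assert not check_against_baseline([0.1, 0.2, 0.9], path)  # drift is caught
```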
Deleting Old Recordings
```python
sched.delete_recording("session_2026_03_20_143000")
```
pytest Integration
Project Layout
```
my-robot/
  horus.toml
  src/
    main.py
    nodes/
      imu.py
      controller.py
      planner.py
      safety.py
  tests/
    conftest.py
    test_controller.py
    test_safety.py
    test_integration.py
```
conftest.py --- Shared Fixtures
```python
import pytest
import horus

@pytest.fixture
def det_sched():
    """Deterministic scheduler for reproducible tests."""
    return horus.Scheduler(tick_rate=100, deterministic=True)


@pytest.fixture
def fake_imu_node():
    """Fake IMU that publishes constant readings."""
    def tick(node):
        node.send("imu.data", horus.Imu(
            accel_x=0.0, accel_y=0.0, accel_z=9.81,
            gyro_x=0.0, gyro_y=0.0, gyro_z=0.0,
        ))
    return horus.Node(name="fake_imu", tick=tick, rate=100, order=0,
                      pubs=[horus.Imu])


@pytest.fixture
def capture_node():
    """Capture node that collects CmdVel messages."""
    captured = []

    def tick(node):
        cmd = node.recv("cmd_vel")
        if cmd:
            captured.append(cmd)

    node = horus.Node(name="capture", tick=tick, rate=100, order=99,
                      subs=[horus.CmdVel])
    node._test_captured = captured  # attach for test access
    return node
```
Test File Example
```python
# tests/test_controller.py
import horus
from nodes.controller import make_controller


def test_controller_zero_error_zero_output(det_sched, fake_imu_node, capture_node):
    """When IMU readings match the target, controller output should be near zero."""
    det_sched.add(fake_imu_node)
    det_sched.add(make_controller(target_speed=0.0))
    det_sched.add(capture_node)

    for _ in range(10):
        det_sched.tick_once()

    cmds = capture_node._test_captured
    assert len(cmds) > 0
    assert all(abs(c.linear) < 0.01 for c in cmds)


def test_controller_nonzero_target(det_sched, fake_imu_node, capture_node):
    """When target speed is 1.0 and the IMU reads 0.0, controller output should be positive."""
    det_sched.add(fake_imu_node)
    det_sched.add(make_controller(target_speed=1.0))
    det_sched.add(capture_node)

    for _ in range(10):
        det_sched.tick_once()

    cmds = capture_node._test_captured
    assert len(cmds) > 0
    assert all(c.linear > 0.0 for c in cmds)
```
Running Tests
```bash
# Run all tests
pytest tests/

# Run with verbose output
pytest tests/ -v

# Run a specific test
pytest tests/test_controller.py::test_controller_zero_error_zero_output

# Run with coverage
pytest tests/ --cov=src --cov-report=term-missing
```
CI/CD: Running Tests in Docker
HORUS tests need shared memory (/dev/shm) for topic IPC. Docker provides this by default, but you may need to increase the size for large test suites.
Dockerfile
```dockerfile
FROM python:3.11-slim

# Install HORUS
RUN curl -fsSL https://raw.githubusercontent.com/softmata/horus/release/install.sh | bash

# Copy project
WORKDIR /app
COPY . .

# Run tests with deterministic mode
CMD ["pytest", "tests/", "-v", "--tb=short"]
```
Docker Compose for CI
```yaml
services:
  test:
    build: .
    shm_size: "256m"  # increase shared memory for topic IPC
    environment:
      - HORUS_LOG=warn  # reduce log noise in CI
```
GitHub Actions Example
```yaml
name: Test
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install HORUS
        run: curl -fsSL https://raw.githubusercontent.com/softmata/horus/release/install.sh | bash
      - name: Run tests
        run: pytest tests/ -v --tb=short
      - name: Clean shared memory
        if: always()
        run: horus clean --shm
```
Key CI Rules
- Always use deterministic=True in tests. Without it, tests depend on wall-clock timing and fail intermittently on loaded CI servers.
- Always clean shared memory after tests. HORUS topics use shared memory segments that persist after the process exits. Run horus clean --shm in your CI teardown step to avoid leaking shared memory between runs.
- Set HORUS_LOG=warn in CI. Debug-level logging produces megabytes of output per test run. Reduce to warn to keep CI logs readable.
- Do not use rt=True in CI tests. CI runners do not have CAP_SYS_NICE or SCHED_FIFO. The scheduler degrades gracefully, but RT features add overhead without benefit in CI.
Deterministic Reproducibility Guarantees
When deterministic=True is set:
| Property | Guarantee |
|---|---|
| horus.dt() | Always 1/rate, independent of wall clock |
| horus.now() | Monotonically increases by dt each tick |
| horus.rng_float() | Same sequence across runs, across machines |
| Execution order | Determined by order=, not OS thread scheduling |
| Tick count | Exact: 100 calls to tick_once() means exactly 100 ticks |
| Topic delivery | Same-tick delivery for same-order publishers and subscribers |
What is not guaranteed:
| Property | Why |
|---|---|
| Absolute wall-clock duration | tick_once() returns as fast as the CPU allows |
| Python random.random() | Not seeded by HORUS --- use horus.rng_float() |
| time.time() inside tick() | Returns wall clock, not SimClock |
| External I/O (files, network) | Non-deterministic by nature |
| GC timing with gc.disable() | GC pauses depend on allocation patterns |
Do not mix time.time() with horus.now() in deterministic mode. time.time() returns wall-clock time, which varies between runs. Use horus.now() and horus.dt() for all time-dependent logic inside tick().
Quick Reference
| I want to... | Pattern |
|---|---|
| Test one tick of a node | sched.tick_once() |
| Test only specific nodes | sched.tick_once(["node_a", "node_b"]) |
| Run for N ticks | for _ in range(N): sched.tick_once() |
| Run for a duration | sched.tick_for(1.0) |
| Get reproducible results | Scheduler(deterministic=True) |
| Inject fake sensor data | horus.Topic("name").send(data) before tick_once() |
| Capture node outputs | Add a capture node that appends to a list |
| Mock hardware writes | Dependency injection via closure |
| Record a session | Scheduler(recording=True) |
| Stop and save recording | sched.stop_recording() |
| List recordings | sched.list_recordings() |
| Run in CI | deterministic=True, no rt=True, clean shm after |
See Also
- Scheduler Deep-Dive (Python) --- tick_once(), tick_for(), deterministic mode internals
- Safety and Policies (Python) --- miss policies, failure policies, watchdog
- Node Lifecycle (Python) --- init, tick, shutdown, error handling
- Real-Time Systems (Python) --- budget, deadline, GIL impact
- Tutorial: Real-Time Control (Python) --- build the system these tests verify
- Python API Reference --- complete Node, Scheduler, and Topic reference