Record & Replay (Python)
Capture a Python robot's execution and replay it later for debugging or regression testing. Recording is enabled via the Scheduler constructor; replay and mixed replay are shown both programmatically and through the horus record CLI.
When To Use
- Debugging a bug that only reproduces with specific sensor data
- Capturing field data for offline analysis or algorithm development
- Regression testing after controller changes
Prerequisites
- HORUS installed (Installation Guide)
- Basic understanding of Python nodes (Quick Start (Python))
horus.toml
[package]
name = "record-replay-py"
version = "0.1.0"
description = "Record and replay demonstration (Python)"
language = "python"
Complete Code
#!/usr/bin/env python3
"""Record a robot session, then replay it programmatically (CLI equivalents are listed at the end)."""
import math
import horus
from horus import Node, Scheduler
# ── Sensor Node ────────────────────────────────────────────────
def sensor_init(node):
    node.state = {"tick": 0}

def sensor_tick(node):
    t = node.state["tick"] * 0.01
    # Sine wave with periodic spikes
    spike = 5.0 if node.state["tick"] % 73 == 0 else 0.0
    value = math.sin(t * 2.0) * 10.0 + spike
    node.send("sensor.reading", {"value": value, "tick": node.state["tick"]})
    node.state["tick"] += 1

sensor = Node(
    name="sensor",
    pubs=["sensor.reading"],
    tick=sensor_tick,
    init=sensor_init,
    rate=100,
)
# ── Controller Node ────────────────────────────────────────────
def ctrl_init(node):
    node.state = {"integral": 0.0, "setpoint": 0.0}

def ctrl_tick(node):
    data = node.recv("sensor.reading")
    if data is None:
        return
    error = node.state["setpoint"] - data["value"]
    dt = horus.dt()
    # IMPORTANT: Clamp integral to prevent windup
    node.state["integral"] = max(-10.0, min(10.0,
        node.state["integral"] + error * dt
    ))
    output = 0.5 * error + 0.1 * node.state["integral"]
    node.send("ctrl.cmd", {"output": output, "integral": node.state["integral"]})

controller = Node(
    name="controller",
    subs=["sensor.reading"],
    pubs=["ctrl.cmd"],
    tick=ctrl_tick,
    init=ctrl_init,
    rate=100,
)
# ── Step 1: Record ─────────────────────────────────────────────
print("=== Recording 5 seconds ===")
sched = Scheduler(tick_rate=100, recording=True)
sched.add(sensor)
sched.add(controller)
sched.run(duration=5.0)
# Save and list
files = sched.stop_recording()
print(f"Saved to: {files}")
for rec in sched.list_recordings():
    print(f" Available session: {rec}")
# ── Step 2: Full replay ──────────────────────────────────────────
print("\n=== Full replay ===")
scheduler_file = [f for f in files if "scheduler@" in f][0]
replay_sched = Scheduler.replay_from(scheduler_file)
replay_sched.run()
# ── Step 3: Time travel replay ─────────────────────────────────
print("\n=== Time travel (ticks 200-400, half speed) ===")
replay2 = Scheduler.replay_from(scheduler_file)
replay2.start_at_tick(200)
replay2.stop_at_tick(400)
replay2.set_replay_speed(0.5)
replay2.run()
# ── Step 4: Mixed replay (recorded sensor + live controller) ───
print("\n=== Mixed replay ===")
sensor_file = [f for f in files if "sensor@" in f][0]
mixed = Scheduler(tick_rate=100)
mixed.add_replay(sensor_file, priority=0) # Recorded sensor
mixed.add(controller) # Live controller
mixed.run(duration=5.0)
# ── Step 5: What-if override ───────────────────────────────────
import struct
print("\n=== What-if override ===")
ov_sched = Scheduler.replay_from(scheduler_file)
ov_sched.set_replay_override("sensor", "sensor.reading",
                             struct.pack('<fi', 99, 0))
ov_sched.run()
# ── CLI alternatives (also available) ──────────────────────────
# horus record replay <session>
# horus record diff session_v1 session_v2
# horus record export <session> --output data.csv --format csv
# horus record clean --max-age-days 7
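The integral clamp in ctrl_tick can be checked on its own, without HORUS installed. The following is a minimal sketch that assumes the same gains (0.5 and 0.1) and clamp limits (-10.0 to 10.0) as the controller above; pi_step and run_constant_error are hypothetical helper names, not part of the HORUS API.

```python
def pi_step(integral, setpoint, value, dt, kp=0.5, ki=0.1, limit=10.0):
    """One PI update with a clamped integral (anti-windup).

    Hypothetical standalone helper mirroring ctrl_tick; not a HORUS function.
    Returns (output, new_integral).
    """
    error = setpoint - value
    # Clamp the accumulated integral to [-limit, limit]
    integral = max(-limit, min(limit, integral + error * dt))
    output = kp * error + ki * integral
    return output, integral


def run_constant_error(value, ticks, dt=0.01):
    """Feed a constant sensor value (setpoint 0.0) and return the final (output, integral)."""
    integral = 0.0
    output = 0.0
    for _ in range(ticks):
        output, integral = pi_step(integral, 0.0, value, dt)
    return output, integral


# A sustained error saturates the integral at the clamp limit instead of growing without bound.
final_output, final_integral = run_constant_error(value=-50.0, ticks=1000)
print(final_output, final_integral)
```

Keeping the control law in a pure function like this makes it easy to compare a changed controller against recorded sensor values before running a full mixed replay.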
Understanding the Code
- Scheduler(recording=True) enables per-tick capture of all node inputs and outputs as raw shared memory bytes
- stop_recording() flushes data to ~/.local/share/horus/recordings/ and returns file paths
- Scheduler.replay_from(path) loads an entire scheduler recording for deterministic replay
- add_replay(path) loads a single node's recording for mixed replay (recorded + live nodes)
- start_at_tick() / stop_at_tick() enable time travel to specific tick ranges
- set_replay_speed() controls playback speed (0.01x to 100x)
- set_replay_override() replaces a node's output with custom bytes for what-if testing
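Because set_replay_override() takes raw bytes, it helps to build and inspect the payload separately before passing it in. The sketch below uses only the standard struct module and the '<fi' format string from the example above (little-endian float32 followed by int32). Whether that layout matches how HORUS actually stores "sensor.reading" is an assumption, and pack_reading and unpack_reading are hypothetical helper names.

```python
import struct

# Format copied from the what-if example: little-endian float32, then int32.
# ASSUMPTION: this matches the recorded layout of "sensor.reading".
PAYLOAD_FORMAT = "<fi"


def pack_reading(value, tick):
    """Pack a (value, tick) pair into override bytes."""
    return struct.pack(PAYLOAD_FORMAT, value, tick)


def unpack_reading(payload):
    """Decode override bytes back into a dict for inspection."""
    value, tick = struct.unpack(PAYLOAD_FORMAT, payload)
    return {"value": value, "tick": tick}


payload = pack_reading(99.0, 0)
print(len(payload))             # 8 bytes: 4 for the float, 4 for the int
print(unpack_reading(payload))  # {'value': 99.0, 'tick': 0}
```

Round-tripping the payload this way catches format-string mistakes (wrong field order or width) before they show up as confusing values during replay.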
CLI Quick Reference
# Record (alternative to code)
horus run --record my_session src/main.py
# Inspect
horus record list --long
horus record info my_session
# Replay
horus record replay my_session
horus record replay my_session --speed 0.5 --start-tick 100
# Mixed replay (regression testing)
horus record inject my_session --nodes sensor
# Compare
horus record diff before_fix after_fix
# Export for pandas
horus record export my_session --output data.csv --format csv
# Cleanup
horus record clean --max-age-days 30
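The replay commands above can also be driven from a Python test script. This is a minimal sketch that only assembles the argument list from the flags shown in this section (--speed and --start-tick); replay_command and run_if_available are hypothetical helper names, and the command is executed only when a horus executable is found on PATH.

```python
import shutil
import subprocess


def replay_command(session, speed=None, start_tick=None):
    """Build the argv list for `horus record replay` using the flags shown above."""
    cmd = ["horus", "record", "replay", session]
    if speed is not None:
        cmd += ["--speed", str(speed)]
    if start_tick is not None:
        cmd += ["--start-tick", str(start_tick)]
    return cmd


def run_if_available(cmd):
    """Run the command only if its executable is on PATH.

    Returns the process exit code, or None when the executable is missing.
    """
    if shutil.which(cmd[0]) is None:
        return None
    return subprocess.run(cmd, check=False).returncode


print(replay_command("my_session", speed=0.5, start_tick=100))
```

Passing the arguments as a list avoids shell quoting problems when session names contain spaces.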
Common Errors
| Symptom | Cause | Fix |
|---|---|---|
| list_recordings() returns empty | recording=True not set in Scheduler | Add recording=True to constructor |
| stop_recording() returns empty paths | Scheduler did not run (no ticks) | Ensure sched.run(duration=N) runs before stopping |
| horus record inject has no effect | Topic name mismatch | Python topic names must match exactly (case-sensitive, dot-separated) |
| Recording files are large | High-frequency topics with large payloads | Use horus record clean to manage disk space |
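Topic name mismatches are easy to miss because a subscriber to a misspelled topic simply receives nothing. The following sketch is a plain-Python check, independent of HORUS, that flags subscribed names which differ from a published name only by case or surrounding whitespace; find_topic_mismatches is a hypothetical helper name.

```python
def find_topic_mismatches(published, subscribed):
    """Return (subscribed, published) pairs that match only after normalization.

    A pair is reported when the subscribed name is not an exact match for any
    published name but equals one after stripping whitespace and lowercasing.
    """
    exact = set(published)
    normalized = {name.strip().lower(): name for name in published}
    mismatches = []
    for topic in subscribed:
        if topic in exact:
            continue  # exact match, nothing to report
        near = normalized.get(topic.strip().lower())
        if near is not None:
            mismatches.append((topic, near))
    return mismatches


pubs = ["sensor.reading", "ctrl.cmd"]
subs = ["Sensor.Reading", "ctrl.cmd"]
print(find_topic_mismatches(pubs, subs))  # [('Sensor.Reading', 'sensor.reading')]
```

Running a check like this over the pubs and subs lists passed to each Node is a quick first step when a mixed replay appears to have no effect.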
See Also
- Record & Replay Reference — Full documentation and design decisions
- Record & Replay Recipe (Rust) — Rust version with programmatic replay and mixed replay
- Debug with Record & Replay Tutorial — Step-by-step debugging walkthrough
- BlackBox Flight Recorder — Lightweight always-on crash forensics