Record & Replay (Python)

Capture a Python robot's execution and replay it later for debugging or regression testing. Recording is enabled via the Scheduler constructor; full, time-travel, and mixed replay are available from both the Python API and the CLI.

When To Use

  • Debugging a bug that only reproduces with specific sensor data
  • Capturing field data for offline analysis or algorithm development
  • Regression testing after controller changes

Prerequisites

horus.toml

[package]
name = "record-replay-py"
version = "0.1.0"
description = "Record and replay demonstration (Python)"
language = "python"

Complete Code

#!/usr/bin/env python3
"""Record a robot session, then replay it via the Python API (CLI equivalents are listed at the end)."""

import math
import horus
from horus import Node, Scheduler

# ── Sensor Node ────────────────────────────────────────────────

def sensor_init(node):
    node.state = {"tick": 0}

def sensor_tick(node):
    t = node.state["tick"] * 0.01
    # Sine wave with periodic spikes
    spike = 5.0 if node.state["tick"] % 73 == 0 else 0.0
    value = math.sin(t * 2.0) * 10.0 + spike
    node.send("sensor.reading", {"value": value, "tick": node.state["tick"]})
    node.state["tick"] += 1

sensor = Node(
    name="sensor",
    pubs=["sensor.reading"],
    tick=sensor_tick,
    init=sensor_init,
    rate=100,
)

# ── Controller Node ────────────────────────────────────────────

def ctrl_init(node):
    node.state = {"integral": 0.0, "setpoint": 0.0}

def ctrl_tick(node):
    data = node.recv("sensor.reading")
    if data is None:
        return

    error = node.state["setpoint"] - data["value"]
    dt = horus.dt()

    # IMPORTANT: Clamp integral to prevent windup
    node.state["integral"] = max(-10.0, min(10.0,
        node.state["integral"] + error * dt
    ))

    output = 0.5 * error + 0.1 * node.state["integral"]
    node.send("ctrl.cmd", {"output": output, "integral": node.state["integral"]})

controller = Node(
    name="controller",
    subs=["sensor.reading"],
    pubs=["ctrl.cmd"],
    tick=ctrl_tick,
    init=ctrl_init,
    rate=100,
)

# ── Step 1: Record ─────────────────────────────────────────────

print("=== Recording 5 seconds ===")
sched = Scheduler(tick_rate=100, recording=True)
sched.add(sensor)
sched.add(controller)
sched.run(duration=5.0)

# Save and list
files = sched.stop_recording()
print(f"Saved to: {files}")

for rec in sched.list_recordings():
    print(f"  Available session: {rec}")

# ── Step 2: Full replay ──────────────────────────────────────────

print("\n=== Full replay ===")
scheduler_file = [f for f in files if "scheduler@" in f][0]
replay_sched = Scheduler.replay_from(scheduler_file)
replay_sched.run()

# ── Step 3: Time travel replay ─────────────────────────────────

print("\n=== Time travel (ticks 200-400, half speed) ===")
replay2 = Scheduler.replay_from(scheduler_file)
replay2.start_at_tick(200)
replay2.stop_at_tick(400)
replay2.set_replay_speed(0.5)
replay2.run()

# ── Step 4: Mixed replay (recorded sensor + live controller) ───

print("\n=== Mixed replay ===")
sensor_file = [f for f in files if "sensor@" in f][0]
mixed = Scheduler(tick_rate=100)
mixed.add_replay(sensor_file, priority=0)   # Recorded sensor
mixed.add(controller)                        # Live controller
mixed.run(duration=5.0)

# ── Step 5: What-if override ───────────────────────────────────

import struct
print("\n=== What-if override ===")
ov_sched = Scheduler.replay_from(scheduler_file)
# Override bytes: little-endian float32 (99.0) followed by int32 (0)
ov_sched.set_replay_override("sensor", "sensor.reading",
                              struct.pack('<fi', 99.0, 0))
ov_sched.run()

# ── CLI alternatives (also available) ──────────────────────────
#   horus record replay <session>
#   horus record diff session_v1 session_v2
#   horus record export <session> --output data.csv --format csv
#   horus record clean --max-age-days 7
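
The anti-windup clamp in ctrl_tick can be looked at in isolation. The sketch below restates the same PI update as a pure function with no horus dependency, so it can be unit-tested outside a scheduler. The function name pi_step and its keyword defaults are illustrative choices; the gains (0.5, 0.1) and the ±10.0 limit are taken from the controller above.

```python
def pi_step(integral, error, dt, kp=0.5, ki=0.1, limit=10.0):
    """One PI update with the integral term clamped to [-limit, limit]."""
    # Accumulate the error, then clamp so a persistent error cannot
    # grow the integral without bound (integrator windup).
    integral = max(-limit, min(limit, integral + error * dt))
    output = kp * error + ki * integral
    return integral, output


# Hold a constant error of 1.0 for 5000 ticks at dt = 0.01 (50 simulated seconds).
# Without the clamp the integral would reach about 50; with it, it stops at 10.0.
integral = 0.0
output = 0.0
for _ in range(5000):
    integral, output = pi_step(integral, error=1.0, dt=0.01)

print(f"integral={integral}, output={output}")
```

Keeping the update free of node state makes it easy to run the same function against values exported from a recording when checking a controller change.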

Understanding the Code

  • Scheduler(recording=True) enables per-tick capture of all node inputs and outputs as raw shared memory bytes
  • stop_recording() flushes data to ~/.local/share/horus/recordings/ and returns file paths
  • Scheduler.replay_from(path) loads an entire scheduler recording for deterministic replay
  • add_replay(path) loads a single node's recording for mixed replay (recorded + live nodes)
  • start_at_tick() / stop_at_tick() enable time travel to specific tick ranges
  • set_replay_speed() controls playback speed (0.01x to 100x)
  • set_replay_override() replaces a node's output with custom bytes for what-if testing
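
Because set_replay_override() takes raw bytes, it helps to check what a given struct format string actually produces before passing it in. The sketch below packs and unpacks the same '<fi' format used in the what-if step. The helper names pack_reading and unpack_reading are hypothetical, and whether sensor.reading is really stored as float32 + int32 is an assumption this page does not confirm.

```python
import struct

# Assumed layout (for illustration only): little-endian float32 "value"
# followed by little-endian int32 "tick". "<" disables padding, so the
# payload is exactly 4 + 4 = 8 bytes.
PAYLOAD_FORMAT = "<fi"


def pack_reading(value, tick):
    """Encode (value, tick) as raw bytes in the assumed layout."""
    return struct.pack(PAYLOAD_FORMAT, value, tick)


def unpack_reading(payload):
    """Decode bytes produced by pack_reading back into (value, tick)."""
    value, tick = struct.unpack(PAYLOAD_FORMAT, payload)
    return value, tick


payload = pack_reading(99.0, 0)
print(len(payload), unpack_reading(payload))
```

If the recorded topic uses a different layout, the override bytes will not decode as intended, so it is worth confirming the format against a real recording first.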

CLI Quick Reference

# Record (alternative to code)
horus run --record my_session src/main.py

# Inspect
horus record list --long
horus record info my_session

# Replay
horus record replay my_session
horus record replay my_session --speed 0.5 --start-tick 100

# Mixed replay (regression testing)
horus record inject my_session --nodes sensor

# Compare
horus record diff before_fix after_fix

# Export for pandas
horus record export my_session --output data.csv --format csv

# Cleanup
horus record clean --max-age-days 30
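
When replay is driven from a test script, the command line can be assembled in Python rather than by string concatenation. The following is a minimal sketch that uses only the subcommand and flags shown above (horus record replay, --speed, --start-tick). The function name build_replay_command is hypothetical, and the block only builds and prints the argument list; it does not invoke horus.

```python
import shlex
from typing import List, Optional


def build_replay_command(session: str,
                         speed: Optional[float] = None,
                         start_tick: Optional[int] = None) -> List[str]:
    """Build an argv list for `horus record replay` from optional settings."""
    if speed is not None and speed <= 0:
        raise ValueError("speed must be positive")
    if start_tick is not None and start_tick < 0:
        raise ValueError("start_tick must be non-negative")

    cmd = ["horus", "record", "replay", session]
    if speed is not None:
        cmd += ["--speed", str(speed)]
    if start_tick is not None:
        cmd += ["--start-tick", str(start_tick)]
    return cmd


cmd = build_replay_command("my_session", speed=0.5, start_tick=100)
print(shlex.join(cmd))
# To actually run it (requires horus on PATH):
#   import subprocess; subprocess.run(cmd, check=True)
```

Passing a list to subprocess.run avoids shell quoting problems if a session name contains spaces.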

Common Errors

Symptom | Cause | Fix
--- | --- | ---
list_recordings() returns empty | recording=True not set in Scheduler | Add recording=True to constructor
stop_recording() returns empty paths | Scheduler did not run (no ticks) | Ensure sched.run(duration=N) runs before stopping
horus record inject has no effect | Topic name mismatch | Python topic names must match exactly (case-sensitive, dot-separated)
Recording files are large | High-frequency topics with large payloads | Use horus record clean to manage disk space
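
The first two symptoms surface in the example script as an IndexError on expressions like [f for f in files if "sensor@" in f][0]. A small helper can turn that into a clearer message. This is a sketch: pick_recording is a hypothetical name, the "<node>@" substring match mirrors the example code, and the file names below are made up for illustration.

```python
from typing import Iterable


def pick_recording(files: Iterable[str], node_name: str) -> str:
    """Return the first path containing '<node_name>@', or raise a clear error."""
    marker = f"{node_name}@"
    matches = [f for f in files if marker in f]
    if not matches:
        raise LookupError(
            f"no recording file contains '{marker}'; "
            "check that recording=True was set and that the scheduler ran"
        )
    return matches[0]


# Hypothetical paths; real names come from sched.stop_recording().
example_files = [
    "/tmp/session/scheduler@0001.rec",
    "/tmp/session/sensor@0001.rec",
]

print(pick_recording(example_files, "sensor"))
```

Since the match is a plain substring test, a node whose name ends with another node's name (for example a hypothetical "imu_sensor") would also match "sensor@", so stricter matching may be needed in larger systems.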

See Also