Debugging & Introspection (Python)
When a robot misbehaves, you need answers fast. Is the sensor publishing? Is the controller receiving? Is the tick too slow? HORUS gives you three layers of debugging tools: CLI commands that work from any terminal, a live dashboard that shows everything at once, and a programmatic introspection API you can use inside your nodes. This guide covers all three, with step-by-step workflows for the most common problems.
Your First Debugging Tool
Before writing any debug code, open a second terminal. HORUS's CLI tools let you inspect a running system from the outside --- no code changes, no restarts.
Listing Active Topics
horus topic list
TOPICS (3 active)
cmd_vel CmdVel 1024 capacity 2 subscribers
imu Imu 1024 capacity 1 subscriber
scan LaserScan 1024 capacity 1 subscriber
This shows every topic in the current shared memory namespace: its name, message type, buffer capacity, and subscriber count. If a topic you expect is missing, the node that creates it is either not running or not started with horus run.
Always use horus run to start your Python nodes. Running python src/main.py directly bypasses the SHM namespace setup, so topics will not appear in horus topic list and cannot connect to other processes.
Watching Messages in Real Time
horus topic echo cmd_vel
[0.001s] CmdVel { linear: 0.5, angular: 0.1 }
[0.011s] CmdVel { linear: 0.5, angular: 0.0 }
[0.021s] CmdVel { linear: 0.3, angular: -0.2 }
echo prints every message published to a topic, with a timestamp relative to when you started listening. Use this to verify:
- That a publisher is actually sending data
- That the data looks correct (no NaN values, no stale readings)
- That messages arrive at the expected frequency
Press Ctrl+C to stop.
Verifying Topics Are Publishing
A silent topic is one of the most common problems. horus topic hz measures the actual publish rate:
horus topic hz imu
average rate: 99.8 Hz
min: 9.92ms max: 10.15ms std dev: 0.08ms
This tells you three things:
- The topic is active --- if nothing is publishing, you will see no messages received after a few seconds.
- The rate matches your configuration --- if you configured rate=100 but see 50 Hz, your tick is taking too long (more on this below).
- Jitter is acceptable --- a standard deviation over 1 ms on a 100 Hz topic suggests the tick is sometimes overrunning its budget.
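The statistics hz reports can be reproduced from raw receive timestamps. A minimal sketch of the computation in plain Python (no HORUS dependency; hz_stats is an illustrative helper, not part of the CLI):

```python
import statistics

def hz_stats(timestamps):
    """Derive rate statistics from message receive timestamps (seconds)."""
    deltas = [b - a for a, b in zip(timestamps, timestamps[1:])]
    return {
        "avg_hz": 1.0 / statistics.mean(deltas),
        "min_ms": min(deltas) * 1000,
        "max_ms": max(deltas) * 1000,
        "std_ms": statistics.stdev(deltas) * 1000,
    }

# A steady 100 Hz stream: one message every 10 ms
stats = hz_stats([i * 0.010 for i in range(101)])
print(round(stats["avg_hz"], 1))  # 100.0
```

A rising std_ms with a steady avg_hz is the signature of occasional tick overruns rather than a systematically slow node.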
Quick rate check across all topics:
# In one terminal, list topics
horus topic list
# In another, check rates one by one
horus topic hz cmd_vel
horus topic hz scan
If a topic shows a rate significantly lower than the node's configured rate, the node's tick() is taking too long. Jump to Debugging Slow tick() below.
Using horus monitor
For a live overview of the entire system, use the TUI dashboard:
horus monitor
The monitor shows all nodes and topics in a single view, updated in real time:
- Node list with per-node tick rate, average tick duration, deadline misses, and health state
- Topic list with publish rate, subscriber count, and buffer usage
- System-wide stats: total ticks, total errors, scheduler status
This is the fastest way to find the bottleneck in a multi-node system. Look for:
| Symptom in monitor | Likely cause |
|---|---|
| Node tick duration climbing over time | Memory leak or unbounded data structure |
| Deadline misses incrementing | Tick is too slow for configured rate |
| Topic rate lower than expected | Publisher node is overloaded or blocked |
| Subscriber count is 0 on a topic | No node is subscribed, or subscriber crashed |
The monitor works with both Python and mixed Python/Rust systems. It reads shared memory directly, so it sees all nodes regardless of language.
Debugging Slow tick()
The most common performance problem in Python nodes is a tick() that takes longer than its budget. When this happens, the scheduler applies the node's on_miss policy and the actual tick rate drops below the configured rate.
Detecting the Problem
From the CLI:
horus topic hz output_topic
If you configured rate=100 (10 ms period) but hz reports 30 Hz, your tick is averaging ~33 ms.
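The conversion is just period = 1 / rate; a throwaway helper (implied_tick_ms is illustrative, not a HORUS API) makes the arithmetic explicit:

```python
def implied_tick_ms(measured_hz):
    """Average tick duration implied by a measured publish rate."""
    return 1000.0 / measured_hz

print(round(implied_tick_ms(30), 1))  # 33.3 -- tick overruns a 10 ms period
print(implied_tick_ms(100))           # 10.0 -- tick fits the configured rate
```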
From inside the node, check horus.budget_remaining():
import horus
import time
def tick(node):
    start = time.perf_counter()

    # ... your processing ...
    do_work()

    elapsed_ms = (time.perf_counter() - start) * 1000
    remaining = horus.budget_remaining()
    if remaining < 0.001:  # Less than 1ms remaining
        node.log_warning(f"Tick took {elapsed_ms:.1f}ms, budget nearly exhausted")
Profiling with cProfile
For a detailed breakdown of where time is spent, use Python's built-in profiler. Since HORUS manages the tick loop, profile the tick function itself:
import horus
import cProfile
import pstats
import io
profiler = cProfile.Profile()
profile_active = False

def tick(node):
    global profile_active

    # Profile ticks 100-200 (skip warmup)
    tick_num = horus.tick()
    if tick_num == 100:
        profiler.enable()
        profile_active = True
    elif tick_num == 200 and profile_active:
        profiler.disable()
        profile_active = False

        # Print the top 20 time consumers
        s = io.StringIO()
        ps = pstats.Stats(profiler, stream=s).sort_stats("cumulative")
        ps.print_stats(20)
        node.log_info(f"Profile results:\n{s.getvalue()}")

    # Normal tick logic
    if node.has_msg("sensor"):
        data = node.recv("sensor")
        result = process(data)
        node.send("output", result)
node = horus.Node(name="processor", subs=["sensor"], pubs=["output"], tick=tick, rate=50)
horus.run(node)
Checking Budget Remaining
horus.budget_remaining() returns the time left in the current tick's budget, in seconds. Use it to skip expensive optional work when you are running out of time:
import horus
def tick(node):
    # Always do critical work
    if node.has_msg("sensor"):
        data = node.recv("sensor")
        cmd = compute_command(data)
        node.send("cmd_vel", cmd)

        # Only do visualization if budget allows
        if horus.budget_remaining() > 0.005:  # More than 5ms left
            update_visualization(data)
        else:
            node.log_debug("Skipping visualization, budget low")
Common Causes of Slow Ticks
| Cause | Symptom | Fix |
|---|---|---|
| NumPy/OpenCV doing work on every tick | Consistent high tick time | Downsample input, reduce resolution |
| ML model inference | Consistent 20-50 ms ticks | Lower rate, use compute=True so it runs on thread pool |
| Allocating large arrays every tick | Tick time grows over time | Pre-allocate in init(), reuse buffers |
| Calling recv_all() on a high-volume topic | Spiky tick times | Use recv() (one at a time) or increase tick rate |
| File I/O or network calls in tick | Occasional very long ticks | Move to an async node or a separate node |
Debugging Dropped Messages
A message is "dropped" when the publisher writes to a full ring buffer, overwriting the oldest unread message. The subscriber never sees it.
What Causes Drops
Drops happen when the publisher sends faster than the subscriber reads. The ring buffer has a fixed capacity (default: 1024 messages). When it fills up, new messages overwrite the oldest ones.
Common scenarios:
- Slow subscriber: A 10 Hz subscriber on a 1000 Hz topic will miss messages between ticks. This is usually intentional --- the subscriber only needs the latest reading.
- Bursty publisher: A node that publishes 100 messages in a single tick can overflow the buffer before the subscriber ticks.
- Subscriber stall: If a subscriber's tick takes 500 ms, messages published during that time accumulate and may overflow.
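The drop semantics can be simulated with a bounded deque (a sketch only; the real ring buffer lives in lock-free shared memory, but the oldest-first overwrite behaves the same way):

```python
from collections import deque

CAPACITY = 4  # real HORUS topics default to 1024

ring = deque(maxlen=CAPACITY)

# Bursty publisher: 6 messages arrive before the subscriber ticks
for seq in range(1, 7):
    ring.append({"seq": seq})

# The subscriber drains the buffer: seq 1 and 2 were overwritten (dropped)
print([msg["seq"] for msg in ring])  # [3, 4, 5, 6]
```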
Detecting Drops
There is no automatic drop counter (the ring buffer is lock-free by design). Instead, use these techniques:
Sequence numbers: Add a counter to your messages and check for gaps on the subscriber side:
import horus
# Publisher
send_count = 0

def publisher_tick(node):
    global send_count
    send_count += 1
    node.send("data", {"seq": send_count, "value": read_sensor()})

# Subscriber
last_seq = 0

def subscriber_tick(node):
    global last_seq
    if node.has_msg("data"):
        msg = node.recv("data")
        if msg["seq"] != last_seq + 1:
            dropped = msg["seq"] - last_seq - 1
            node.log_warning(f"Dropped {dropped} messages (seq {last_seq} -> {msg['seq']})")
        last_seq = msg["seq"]
Rate comparison: Compare horus topic hz on the publisher and subscriber output topics. If the publisher runs at 100 Hz but the subscriber output is at 50 Hz, messages are being lost.
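Assuming the subscriber forwards one output per input, the two rates quantify the loss directly. A tiny illustrative helper (not a HORUS API):

```python
def drop_fraction(pub_hz, sub_out_hz):
    """Fraction of messages lost, inferred from publisher vs subscriber output rates."""
    return 1.0 - (sub_out_hz / pub_hz)

print(drop_fraction(100.0, 50.0))  # 0.5 -- half the messages never reach the output
```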
Preventing Drops
Increase buffer capacity for topics that carry bursty or high-frequency data:
node = horus.Node(
    name="fast_sub",
    subs={"sensor": {"type": "sensor", "capacity": 4096}},
    tick=process,
    rate=50,
)
Use recv_all() to drain the buffer when you need to process every message:
def tick(node):
    messages = node.recv_all("commands")
    for msg in messages:
        execute(msg)
Match rates: If both publisher and subscriber need to run at the same rate, set them explicitly:
sensor = horus.Node(name="sensor", pubs=["data"], tick=sense, rate=100, order=0)
processor = horus.Node(name="proc", subs=["data"], pubs=["out"], tick=process, rate=100, order=1)
Setting order ensures the sensor publishes before the processor reads in the same tick cycle.
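The effect of order can be sketched with a toy single-threaded scheduler cycle (illustrative only; the topics dict and run_cycle are not HORUS APIs):

```python
# Shared "topic" state written by the publisher, read by the subscriber
topics = {"data": None}
received = []

def sense():
    topics["data"] = {"value": 42}  # publisher writes

def process():
    if topics["data"] is not None:  # subscriber reads in the same cycle
        received.append(topics["data"])

# Nodes tick sorted by order: sensor (order=0) before processor (order=1)
nodes = sorted([(1, process), (0, sense)], key=lambda n: n[0])

def run_cycle():
    for _, tick_fn in nodes:
        tick_fn()

run_cycle()
print(received)  # [{'value': 42}]
```

Flip the order values and the subscriber ticks first, seeing an empty topic every cycle and adding one cycle of latency.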
Node Health Monitoring
The node.info API
During tick(), init(), and shutdown(), the node object exposes an info property with scheduler-managed metrics:
import horus
def tick(node):
    # Basic metrics
    ticks = node.info.tick_count()
    errors = node.info.error_count()
    avg_ms = node.info.avg_tick_duration_ms()
    uptime = node.info.get_uptime()
    state = node.info.state

    # Full metrics snapshot (dict)
    metrics = node.info.get_metrics()

    # Periodic health logging
    if ticks % 500 == 0:
        node.log_info(
            f"Health: {ticks} ticks, {errors} errors, "
            f"avg {avg_ms:.2f}ms, uptime {uptime:.1f}s, state={state}"
        )
| Method/Property | Returns | Description |
|---|---|---|
| node.info.tick_count() | int | Total number of ticks executed |
| node.info.error_count() | int | Total tick errors (exceptions) |
| node.info.successful_ticks() | int | Ticks that completed without error |
| node.info.avg_tick_duration_ms() | float | Average tick execution time in milliseconds |
| node.info.get_uptime() | float | Seconds since the node's init() was called |
| node.info.get_metrics() | dict | Full metrics snapshot with all available data |
| node.info.state | str | Current node state (see NodeState below) |
| node.info.set_custom_data(key, value) | --- | Attach custom key-value metadata |
| node.info.get_custom_data(key) | value | Retrieve custom metadata |
NodeState values: UNINITIALIZED, INITIALIZING, RUNNING, STOPPING, STOPPED, ERROR, CRASHED
Custom data lets you attach arbitrary metadata visible to horus monitor and get_node_stats():
def tick(node):
    # Track application-specific metrics
    detections = run_detector(frame)
    node.info.set_custom_data("detections_this_tick", len(detections))
    node.info.set_custom_data("model_version", "yolov8n")
Scheduler-Level Monitoring with get_node_stats()
When using a Scheduler directly (instead of horus.run()), you can query stats for any node:
import horus
scheduler = horus.Scheduler(tick_rate=100, watchdog_ms=500)
scheduler.add(sensor)
scheduler.add(controller)
scheduler.run(duration=10.0)
# After the run completes
for name in scheduler.get_node_names():
    stats = scheduler.get_node_stats(name)
    print(f"{name}:")
    print(f"  Total ticks: {stats['total_ticks']}")
    print(f"  Failed ticks: {stats['failed_ticks']}")
    print(f"  Avg tick: {stats.get('avg_tick_duration_ms', 0):.2f} ms")
    print(f"  Max tick: {stats.get('max_tick_duration_ms', 0):.2f} ms")
    print(f"  Errors: {stats['errors_count']}")
    print(f"  Uptime: {stats.get('uptime_seconds', 0):.1f}s")
Safety Stats
For systems with budgets, deadlines, or watchdogs, safety_stats() reports system-level safety events:
stats = scheduler.safety_stats()
if stats:
    print(f"Budget overruns: {stats.get('budget_overruns', 0)}")
    print(f"Deadline misses: {stats.get('deadline_misses', 0)}")
    print(f"Watchdog expirations: {stats.get('watchdog_expirations', 0)}")
If any of these numbers are non-zero, the system is under stress. Budget overruns mean ticks are taking longer than expected. Deadline misses mean the on_miss policy is firing. Watchdog expirations mean a node was unresponsive for an extended period.
RT Degradations
If you requested RT features, check whether they were actually applied:
scheduler = horus.Scheduler(tick_rate=1000, rt=True)
# ... add nodes and run ...
if not scheduler.has_full_rt():
    for d in scheduler.degradations():
        print(f"Degraded: {d.get('feature')} -- {d.get('reason')}")

caps = scheduler.capabilities()
print(f"RT priority: {caps.get('max_priority', 'N/A')}")
print(f"Memory lock: {caps.get('memory_locking', False)}")
Logging from Nodes
HORUS provides structured logging methods on the node object. These integrate with the scheduler's log system and appear in horus monitor.
def tick(node):
    node.log_debug("Starting tick processing")

    if node.has_msg("sensor"):
        data = node.recv("sensor")
        node.log_info(f"Received sensor reading: {data}")

        if data.get("value", 0) > THRESHOLD:
            node.log_warning(f"Sensor value {data['value']} exceeds threshold {THRESHOLD}")
    else:
        node.log_debug("No sensor data this tick")

def init(node):
    node.log_info(f"Node initialized, publishing to: {node.publishers()}")
    node.log_info(f"Subscribed to: {node.subscribers()}")

def shutdown(node):
    node.log_info("Shutting down, cleaning up resources")
| Method | Use for |
|---|---|
| node.log_debug(msg) | Verbose tracing during development |
| node.log_info(msg) | Normal operational messages |
| node.log_warning(msg) | Anomalies that do not stop operation (stale data, high latency) |
| node.log_error(msg) | Failures that affect node behavior |
Logging only works during callbacks. Calling node.log_info() outside of init(), tick(), or shutdown() emits a RuntimeWarning and the message is dropped, because the scheduler context must be active for log routing to work.
Logging performance tip: Avoid expensive string formatting in debug logs when they are not needed. Python evaluates f-string arguments before the function call:
# Bad: formats the string even if debug logging is suppressed
node.log_debug(f"Full state dump: {expensive_serialize(state)}")
# Better: guard expensive formatting
if DEBUG_ENABLED:
    node.log_debug(f"Full state dump: {expensive_serialize(state)}")
Common Debugging Workflows
"My subscriber is not getting messages"
Step 1: Verify the publisher is running.
horus topic list
If the topic is missing, the publisher node is not running or was not started with horus run.
Step 2: Verify messages are being published.
horus topic echo sensor_data
If no output appears, the publisher's tick() is not calling node.send(). Check that:
- The publisher's pubs list includes the topic name
- The send() call is actually reached (no early return, no exception swallowed)
- The publisher node is not in an error state (check horus monitor)
Step 3: Verify the subscriber is subscribed to the correct topic name.
Topic names must match exactly, including case. A common mistake:
# Publisher
pub = horus.Node(pubs=["sensor.data"], tick=pub_tick, rate=100)
# Subscriber -- WRONG: underscore instead of dot
sub = horus.Node(subs=["sensor_data"], tick=sub_tick, rate=100)
Step 4: Check execution order.
If both nodes run in the same scheduler, the publisher must have a lower order than the subscriber:
sensor = horus.Node(name="sensor", pubs=["data"], tick=sense, rate=100, order=0)
processor = horus.Node(name="proc", subs=["data"], tick=process, rate=100, order=1)
If order is wrong, the subscriber ticks before the publisher in every cycle and always sees an empty buffer.
Step 5: Check that recv() is called correctly.
def tick(node):
    # WRONG: recv() returns None if no message, not an exception
    data = node.recv("data")
    process(data)  # TypeError: process() got None

    # CORRECT: always check for None
    data = node.recv("data")
    if data is not None:
        process(data)

    # ALSO CORRECT: use has_msg() first
    if node.has_msg("data"):
        data = node.recv("data")
        process(data)
"My node is too slow"
Step 1: Measure the actual rate.
horus topic hz output_topic
Compare to the configured rate. If the measured rate is lower, the tick is overrunning.
Step 2: Check tick duration in the monitor.
horus monitor
Look at the average and max tick duration for the slow node.
Step 3: Profile the tick function.
Add timing inside the tick to isolate the expensive section:
import time
def tick(node):
    t0 = time.perf_counter()
    data = node.recv("input")
    t1 = time.perf_counter()
    result = heavy_computation(data)
    t2 = time.perf_counter()
    node.send("output", result)
    t3 = time.perf_counter()

    node.log_debug(
        f"recv={1000*(t1-t0):.1f}ms "
        f"compute={1000*(t2-t1):.1f}ms "
        f"send={1000*(t3-t2):.1f}ms"
    )
Step 4: Apply the right fix.
| Bottleneck | Fix |
|---|---|
| Heavy computation (NumPy, OpenCV) | Use compute=True to run on thread pool, or lower the rate |
| ML inference | Lower rate to match inference time, or run on a dedicated node |
| Data serialization | Use typed messages instead of dicts for zero-copy performance |
| File or network I/O | Move to an async node (async def tick) |
| Repeated allocation | Pre-allocate arrays in init(), reuse across ticks |
Step 5: Set a budget to catch future regressions.
node = horus.Node(
    name="controller",
    tick=control_tick,
    rate=100,
    budget=0.008,    # 8ms budget (80% of 10ms period)
    on_miss="warn",  # Log a warning when exceeded
)
"My robot stops unexpectedly"
Step 1: Check for exceptions in node ticks.
If a node's tick() raises an unhandled exception, the default failure_policy is "fatal", which stops the entire scheduler. Check the terminal output for tracebacks.
Step 2: Check safety stats after the run.
scheduler = horus.Scheduler(tick_rate=100, watchdog_ms=500)
scheduler.add(sensor)
scheduler.add(controller)
try:
    scheduler.run()
except KeyboardInterrupt:
    pass

# Check what happened
stats = scheduler.safety_stats()
if stats:
    print(f"Budget overruns: {stats.get('budget_overruns', 0)}")
    print(f"Deadline misses: {stats.get('deadline_misses', 0)}")
    print(f"Watchdog expirations: {stats.get('watchdog_expirations', 0)}")

for name in scheduler.get_node_names():
    ns = scheduler.get_node_stats(name)
    if ns['failed_ticks'] > 0:
        print(f"Node '{name}' had {ns['failed_ticks']} failed ticks")
Step 3: Check for on_miss="stop" triggering.
If any node has on_miss="stop", a single deadline miss will halt the system. Change to "warn" during development:
# Development: warn on deadline miss
motor = horus.Node(tick=motor_fn, rate=500, budget=0.0016, on_miss="warn")
# Production: stop on deadline miss (safety-critical)
motor = horus.Node(tick=motor_fn, rate=500, budget=0.0016, on_miss="stop")
Step 4: Check for request_stop() calls.
Search your code for node.request_stop(). A node may be intentionally stopping the scheduler based on a condition:
def tick(node):
    if battery_voltage < CRITICAL_THRESHOLD:
        node.log_error(f"Battery critical: {battery_voltage}V")
        node.request_stop()  # This stops the whole system
Step 5: Use a non-fatal failure policy during development.
node = horus.Node(
    name="experimental",
    tick=experimental_tick,
    rate=30,
    failure_policy="skip",  # Skip failed ticks instead of crashing
)
| Policy | Behavior | When to use |
|---|---|---|
| "fatal" | Stop the entire scheduler (default) | Production safety-critical nodes |
| "restart" | Re-run init() and resume | Nodes that can recover from errors |
| "skip" | Skip the failed tick, continue | Development, non-critical nodes |
| "ignore" | Silently ignore the error | Logging, telemetry |
Python-Specific Debugging
GIL and Threading
Python's Global Interpreter Lock (GIL) means only one thread executes Python bytecode at a time. This has specific implications for HORUS:
- compute=True only helps if your code releases the GIL. NumPy, OpenCV, and PyTorch release the GIL during C/CUDA operations, so compute=True lets them run in parallel. Pure Python computation does not benefit.
- Multiple Python nodes in the same scheduler share the GIL. If one node's tick runs pure Python for 10 ms, other Python nodes cannot tick during that time, even on separate threads.
Detecting GIL contention:
import horus
import time
def tick(node):
    wall_start = time.perf_counter()
    cpu_start = time.process_time()

    do_work()

    wall_elapsed = time.perf_counter() - wall_start
    cpu_elapsed = time.process_time() - cpu_start

    # If wall >> cpu, the thread was waiting (GIL or I/O)
    if wall_elapsed > cpu_elapsed * 1.5:
        node.log_warning(
            f"GIL contention: wall={wall_elapsed*1000:.1f}ms "
            f"cpu={cpu_elapsed*1000:.1f}ms"
        )
Mitigations:
- Move GIL-heavy nodes to separate processes using HORUS multi-process mode
- Use libraries that release the GIL (NumPy, OpenCV, SciPy, PyTorch)
- Lower the rate of pure-Python nodes so they do not contend with critical nodes
Memory Leaks
Python's garbage collector handles most memory, but leaks still happen --- typically from:
- Growing lists or dicts that are never cleared
- Circular references with __del__ methods
- C extension objects that are not properly released
Detecting leaks by tracking memory with tracemalloc:
import horus
import tracemalloc
def init(node):
    tracemalloc.start()
    node.info.set_custom_data("baseline_mem", 0)

def tick(node):
    if horus.tick() % 1000 == 0:
        current, peak = tracemalloc.get_traced_memory()
        node.log_info(f"Memory: current={current/1024:.0f}KB, peak={peak/1024:.0f}KB")

        # If memory keeps growing, take a snapshot
        if current > 100 * 1024 * 1024:  # Over 100MB
            snapshot = tracemalloc.take_snapshot()
            top = snapshot.statistics("lineno")[:5]
            for stat in top:
                node.log_warning(f"  {stat}")
Common leak patterns in HORUS nodes:
# LEAK: appending to a list every tick without bound
history = []

def tick(node):
    data = node.recv("sensor")
    if data:
        history.append(data)  # Grows forever

# FIX: use a fixed-size buffer
from collections import deque

history = deque(maxlen=1000)

def tick(node):
    data = node.recv("sensor")
    if data:
        history.append(data)  # Oldest entries automatically removed
Exception Tracing
When a tick() raises an exception, HORUS catches it and applies the failure_policy. To get full tracebacks for debugging, add an on_error handler:
import horus
import traceback
def on_error(node, error):
    tb = traceback.format_exception(type(error), error, error.__traceback__)
    node.log_error(f"Exception in tick:\n{''.join(tb)}")

node = horus.Node(
    name="processor",
    tick=process_tick,
    on_error=on_error,
    rate=50,
    failure_policy="skip",  # Continue after errors
)
For intermittent errors that are hard to reproduce, log the node state alongside the traceback:
def on_error(node, error):
    context = {
        "tick": horus.tick(),
        "elapsed": horus.elapsed(),
        "has_sensor": node.has_msg("sensor"),
        "tick_count": node.info.tick_count() if node.info else "N/A",
        "error_count": node.info.error_count() if node.info else "N/A",
    }
    node.log_error(f"Error at {context}: {error}")
Debugging with pdb
For interactive debugging, you can use pdb inside a tick, but be aware that the scheduler pauses while you are in the debugger --- other nodes will not tick, and watchdogs may fire.
import horus
def tick(node):
    data = node.recv("sensor")
    if data and data.get("value", 0) < 0:
        # Drop into debugger on unexpected negative value
        import pdb; pdb.set_trace()
    process(data)
Use pdb only during development with a single node. In multi-node systems, pausing one node's tick in the debugger stalls the scheduler. Set watchdog_ms to a high value or disable it when using interactive debugging.
Quick Reference
CLI Commands
| Command | What it shows |
|---|---|
| horus topic list | All active topics with type, capacity, and subscriber count |
| horus topic echo <name> | Live stream of messages on a topic |
| horus topic hz <name> | Measured publish rate with min/max/stddev timing |
| horus monitor | Full-system TUI dashboard with nodes, topics, and metrics |
Python Introspection API
| Call | Returns | Description |
|---|---|---|
| node.info.tick_count() | int | Total ticks executed |
| node.info.error_count() | int | Total tick errors |
| node.info.avg_tick_duration_ms() | float | Average tick time in ms |
| node.info.get_uptime() | float | Seconds since init |
| node.info.get_metrics() | dict | Full metrics snapshot |
| node.info.state | str | Current node state |
| node.info.set_custom_data(k, v) | --- | Attach custom metadata |
| node.info.get_custom_data(k) | value | Retrieve custom metadata |
| horus.budget_remaining() | float | Seconds left in tick budget |
| sched.get_node_stats(name) | dict | Per-node stats from scheduler |
| sched.get_all_nodes() | list | All nodes with config |
| sched.safety_stats() | dict | Budget overruns, deadline misses, watchdog expirations |
| sched.degradations() | list | RT features that could not be applied |
| sched.capabilities() | dict | Detected RT capabilities |
| sched.status() | str | Scheduler state: idle, running, stopped |
See Also
- Python API Reference --- Full Node, Scheduler, and message API
- Python Bindings --- Detailed API with all Scheduler methods
- Common Mistakes (Python) --- Pitfalls and how to avoid them
- Performance Guide --- Optimization techniques for HORUS nodes
- Second Application (Python) --- Intro to CLI introspection with horus topic