Error Handling (Python)
Every production node fails eventually. A sensor disconnects, shared memory fills up, a transform goes stale. What matters is how your node responds. This page covers HORUS exception types, failure policies, error callbacks, and defensive patterns that keep your robot running when things go wrong.
Exception Types
HORUS raises three domain-specific exceptions that map to distinct failure modes. Import them from horus:
```python
from horus import HorusNotFoundError, HorusTransformError, HorusTimeoutError
```
HorusNotFoundError
Raised when a topic, transform frame, or node does not exist.
```python
try:
    data = node.recv("nonexistent.topic")
except HorusNotFoundError as e:
    node.log_error(f"Topic missing: {e}")
    # Error message includes a hint: "Run: horus topic list"
```
Common triggers:
- Subscribing to a topic that no publisher has created yet
- Looking up a transform frame that was never broadcast
- Querying a node that has not been registered with the scheduler
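One common mitigation for the first trigger is to poll during startup instead of failing on the first tick. A minimal sketch, assuming only the node.has_msg() API shown later on this page (wait_for_topic is a hypothetical helper, not part of HORUS):

```python
import time

def wait_for_topic(node, topic, deadline=5.0, poll=0.05):
    """Poll until a publisher has created the topic, or give up.

    Hypothetical helper -- not part of the HORUS API.
    """
    start = time.monotonic()
    while time.monotonic() - start < deadline:
        if node.has_msg(topic):
            return True
        time.sleep(poll)
    return False
```

Call something like this from init() so tick() can assume the topic exists, or fall back to the has_msg() guard pattern shown under Common Errors and Fixes.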
HorusTransformError
Raised when a coordinate transform cannot be computed. Two sub-cases:
- Extrapolation -- the requested timestamp is outside the buffered range
- Stale data -- the transform exists but has not been updated recently
```python
from horus import TransformFrame, HorusTransformError

tf = TransformFrame()
try:
    transform = tf.lookup("base_link", "camera_link")
except HorusTransformError as e:
    node.log_warning(f"Transform unavailable: {e}")
    # Hint may suggest using tf_at() for clamped lookup
```
HorusTimeoutError
Raised when a blocking operation exceeds its deadline. The error message includes the resource name, elapsed time, and the deadline that was exceeded.
```python
from horus import HorusTimeoutError

try:
    data = node.recv("lidar.scan", timeout=0.5)
except HorusTimeoutError:
    node.log_warning("LiDAR scan not received within 500ms")
```
Standard Python Exceptions
HORUS also raises standard Python exceptions for input and system errors:
| Exception | When |
|---|---|
| ValueError | Invalid input: bad topic name, invalid config, parse failure |
| TypeError | Serialization failure: wrong message type for a typed topic |
| IOError | File or shared memory I/O failure |
| MemoryError | Shared memory allocation failed (SHM segment full) |
| RuntimeError | Internal error or unclassified failure |
| KeyError | Missing key in driver parameters |
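The table above suggests one response per failure mode. A hedged sketch of how a send wrapper might dispatch on these types (safe_send is a hypothetical helper; node.send and the log methods are as documented on this page):

```python
def safe_send(node, topic, data):
    """Map the documented exception classes to distinct responses.

    Hypothetical wrapper -- illustrates the table, not a HORUS API.
    """
    try:
        node.send(topic, data)
    except ValueError as e:    # bad topic name, invalid config
        node.log_error(f"Invalid input: {e}")
    except TypeError as e:     # wrong message type for a typed topic
        node.log_error(f"Serialization failure: {e}")
    except MemoryError:        # SHM segment full
        node.log_warning("SHM full -- dropping message")
```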
Failure Policies
When tick() raises an unhandled exception, the scheduler applies the node's failure_policy to decide what happens next. Set it on the node:
```python
import horus

node = horus.Node(
    name="sensor",
    tick=read_sensor,
    rate=100,
    failure_policy="restart",
)
```
Policy Reference
| Policy | Behavior | When to use |
|---|---|---|
| "fatal" | Stops the entire scheduler immediately | Safety-critical nodes where any error means stop |
| "restart" | Retries the node (up to max retries with backoff) | Nodes that recover from transient failures |
| "skip" | Skips the failed tick, continues on next cycle | Sensor nodes where one missed reading is acceptable |
| "ignore" | Swallows the exception silently | Logging or telemetry nodes that must never stop the system |
Default behavior: If no failure_policy is set, unhandled exceptions propagate to the scheduler, which logs the error and continues running other nodes. Set an explicit policy for every production node.
How Each Policy Works
Fatal stops everything. The scheduler calls shutdown() on all nodes and exits. Use this for motor controllers or safety monitors where an error means the robot is in an unknown state.
```python
motor = horus.Node(
    name="motor_ctrl",
    tick=motor_tick,
    rate=1000,
    failure_policy="fatal",
)
```
Restart retries the node with exponential backoff. The scheduler calls shutdown() then init() again before resuming ticks. If retries are exhausted, the node is marked unhealthy and removed from the tick loop.
```python
camera = horus.Node(
    name="camera",
    tick=capture_frame,
    rate=30,
    failure_policy="restart",
)
```
Skip drops the current tick and moves on. The scheduler increments an error counter and continues to the next tick cycle. The node's state is preserved -- init() is not called again.
```python
logger = horus.Node(
    name="db_logger",
    tick=log_to_database,
    rate=10,
    failure_policy="skip",
)
```
Ignore swallows the exception without logging. The scheduler does not even increment the error counter. Use sparingly -- silent failures are hard to debug.
The on_error Callback
For custom error handling, pass an on_error function to Node(). It runs before the failure policy kicks in:
```python
import horus

error_count = 0

def handle_error(node, exception):
    global error_count
    error_count += 1
    node.log_error(f"Error #{error_count}: {exception}")
    if error_count > 10:
        node.log_error("Too many errors -- requesting shutdown")
        raise exception  # Re-raise to trigger failure_policy

node = horus.Node(
    name="sensor",
    tick=read_sensor,
    rate=100,
    on_error=handle_error,
    failure_policy="skip",
)
```
on_error Flow
- tick() raises an exception
- on_error(node, exception) is called
- If on_error returns normally, the exception is suppressed -- the scheduler continues as if tick succeeded
- If on_error raises (or re-raises), the failure policy takes over
- If on_error itself raises a different exception, the original exception is propagated instead
This means on_error acts as a filter. Return normally to swallow the error. Re-raise to escalate.
```python
from horus import HorusTimeoutError

def selective_handler(node, exception):
    if isinstance(exception, HorusTimeoutError):
        node.log_warning("Timeout -- will retry next tick")
        return  # Swallow timeout errors
    # All other errors escalate to failure_policy
    raise exception
```
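The flow above can be sketched as a plain-Python stand-in for the scheduler. run_tick_with_on_error is illustrative only -- the real dispatch lives inside HORUS:

```python
def run_tick_with_on_error(tick, on_error, node):
    """Illustrative stand-in for the scheduler's dispatch -- not HORUS code."""
    try:
        tick(node)
    except Exception as exc:
        try:
            on_error(node, exc)       # returning normally suppresses exc
        except Exception as raised:
            if raised is exc:
                raise                 # re-raised: failure policy takes over
            raise exc from raised     # different error: original propagates
```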
Structured Logging
HORUS provides four logging methods on the node object. These integrate with the scheduler's structured logging pipeline and appear in horus logs output:
```python
def tick(node):
    node.log_debug("Processing frame 42")
    node.log_info("Detection found: person at (120, 340)")
    node.log_warning("LiDAR signal weak -- SNR below threshold")
    node.log_error("Motor controller not responding")
```
Logging only works during init(), tick(), and shutdown(). Calling node.log_info() outside the scheduler lifecycle (before run() or after shutdown) will emit a RuntimeWarning and the message will be dropped. Use standard print() or Python's logging module for setup-time diagnostics.
Log Levels in Practice
| Method | Use for | Shows in horus logs |
|---|---|---|
| node.log_debug() | Internal state, per-frame values | Only with --verbose |
| node.log_info() | State changes, detections, milestones | Default output |
| node.log_warning() | Degraded operation, approaching limits | Default + highlighted |
| node.log_error() | Failures that trigger recovery | Default + highlighted |
Common Errors and Fixes
Topic Not Found
The subscriber starts before the publisher. The topic does not exist yet.
```python
def tick(node):
    if not node.has_msg("lidar.scan"):
        return  # No data yet -- skip this tick
    data = node.recv("lidar.scan")
    process(data)
```
Fix: Always check node.has_msg() before node.recv(). This is the standard pattern for handling publisher/subscriber startup order.
Shared Memory Full
The ring buffer is full because the consumer is slower than the producer.
```python
def tick(node):
    try:
        node.send("big.pointcloud", cloud_data)
    except MemoryError:
        node.log_warning("SHM full -- dropping frame")
```
Fix: Increase the ring buffer capacity or reduce the publishing rate. The default_capacity parameter on Node() controls buffer size:
```python
node = horus.Node(
    name="publisher",
    tick=publish_fn,
    rate=30,
    pubs=["big.pointcloud"],
    default_capacity=4096,  # Larger buffer (default: 1024)
)
```
You can also clean up stale shared memory segments with horus clean --shm.
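A quick back-of-envelope check shows how long the default buffer lasts once the consumer falls behind. The rates below are made up for illustration:

```python
capacity = 1024        # default ring buffer slots
producer_hz = 30       # publishing rate
consumer_hz = 10       # hypothetical slowed-down consumer

net_fill = producer_hz - consumer_hz   # slots gained per second
seconds_to_full = capacity / net_fill  # time until send() raises MemoryError
# ~51.2 seconds of headroom at these rates
```

In other words, a modest consumer slowdown gives you under a minute before frames start failing -- which is why monitoring the "SHM full" warning matters.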
Permission Denied (SHM)
Shared memory segments created by one user cannot be accessed by another.
```python
# This error appears as an IOError
try:
    node.send("topic", data)
except IOError as e:
    if "Permission denied" in str(e):
        node.log_error("SHM permission error -- check user/group")
```
Fix: Run all nodes as the same user, or clean stale segments with horus clean --shm and restart.
Transform Stale
A transform frame has not been updated recently.
```python
from horus import TransformFrame, HorusTransformError

tf = TransformFrame()

def tick(node):
    try:
        t = tf.lookup("base_link", "camera_link")
    except HorusTransformError:
        node.log_warning("Camera transform stale -- using last known")
        return
    process_with_transform(t)
```
Fix: Ensure the sensor driver publishing the transform is running and healthy. Check with horus frame list.
Defensive tick() Pattern
A production-ready tick function handles all expected failures explicitly, uses on_error as a safety net, and lets the failure policy handle everything else.
```python
import horus
from horus import HorusNotFoundError, HorusTimeoutError

def motor_tick(node):
    # Guard: skip if no command available
    if not node.has_msg("cmd_vel"):
        return

    try:
        cmd = node.recv("cmd_vel")
    except HorusNotFoundError:
        node.log_error("cmd_vel topic disappeared")
        return

    try:
        result = apply_motor_command(cmd)
    except TimeoutError:
        node.log_error("Motor hardware timeout")
        emergency_stop()
        return
    except ValueError as e:
        node.log_warning(f"Invalid command: {e}")
        return

    node.send("motor.status", {"applied": True, "velocity": result})

def motor_error_handler(node, exception):
    """Last resort before failure_policy."""
    node.log_error(f"Unhandled motor error: {exception}")
    emergency_stop()
    raise exception  # Escalate to failure_policy="fatal"

motor = horus.Node(
    name="motor_ctrl",
    tick=motor_tick,
    rate=1000,
    order=0,
    subs=["cmd_vel"],
    pubs=["motor.status"],
    on_error=motor_error_handler,
    failure_policy="fatal",
)

horus.run(motor, rt=True)
```
Structure:
- Guard clause -- has_msg() check, return early if no data
- Specific try/except -- catch expected exceptions by type
- Handle and continue -- log, take corrective action, return
- on_error as safety net -- catches anything tick() missed
- failure_policy as last resort -- scheduler-level response
Exception Handling Anti-Patterns
Bare except
Never use bare except: -- it catches KeyboardInterrupt and SystemExit, preventing clean shutdown:
```python
# BAD: catches Ctrl+C, prevents shutdown
def tick(node):
    try:
        data = node.recv("topic")
    except:
        pass

# GOOD: catch specific exceptions
def tick(node):
    try:
        data = node.recv("topic")
    except (HorusNotFoundError, HorusTimeoutError) as e:
        node.log_warning(f"Expected error: {e}")
```
Catch-and-ignore in Safety Nodes
Swallowing exceptions in a safety-critical node defeats the purpose of the failure policy:
```python
# BAD: hides failures in a critical node
def safety_tick(node):
    try:
        check_limits()
    except Exception:
        pass  # "It's fine."

# GOOD: let failure_policy handle it
def safety_tick(node):
    check_limits()  # If this fails, failure_policy="fatal" stops everything
```
Logging without Action
Catching an exception just to log it, then re-raising, adds noise without value:
```python
# UNNECESSARY: the scheduler already logs unhandled exceptions
def tick(node):
    try:
        process()
    except Exception as e:
        node.log_error(f"Error: {e}")
        raise  # Scheduler logs this again

# BETTER: either handle it or don't catch it
def tick(node):
    process()  # Let failure_policy handle errors
```
Design Decisions
Why three custom exceptions instead of one? HorusNotFoundError, HorusTransformError, and HorusTimeoutError represent distinct failure modes with different recovery strategies. A missing topic means a node has not started. A stale transform means a sensor stopped publishing. A timeout means the system is overloaded. Catching a generic HorusError would force every handler to inspect the message string. Separate types let you write precise except clauses.
Why does on_error suppress by default? If on_error returns normally, the exception is swallowed. This lets on_error act as a filter -- handle what you know, re-raise what you do not. The alternative (always propagating after on_error) would make custom error handling useless, since the failure policy would fire regardless. Swallow-by-default gives the callback full control.
Why no exception hierarchy? The three HORUS exceptions inherit directly from Exception, not from a shared HorusError base class. This is intentional. In practice, you almost never want to catch "any HORUS error" -- you want to catch a specific failure mode and respond accordingly. A base class encourages overly broad except clauses. If you truly need to catch all three, list them explicitly: except (HorusNotFoundError, HorusTransformError, HorusTimeoutError).
Why standard exceptions for input errors? Invalid topic names raise ValueError, not a custom exception. Serialization failures raise TypeError. These are programming errors, not runtime failures. Standard exceptions mean you do not need to import HORUS-specific types to catch bugs in your own code, and linters/IDEs already understand them.
Trade-offs
failure_policy vs on_error: The failure policy is coarse-grained (one policy per node) but reliable (enforced by the scheduler, works even if Python crashes). on_error is fine-grained but fragile (runs in Python, can itself fail). For safety-critical nodes, rely on failure_policy="fatal" and keep on_error simple. For non-critical nodes, on_error gives flexibility to implement retry logic, circuit breakers, or rate-limited alerts.
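As one example of the fine-grained side, a simple circuit breaker can live entirely in on_error. CircuitBreaker is a hypothetical helper that relies only on the swallow-vs-re-raise contract described above:

```python
class CircuitBreaker:
    """Swallow errors until a threshold, then escalate to failure_policy.

    Hypothetical helper -- built only on on_error's documented contract.
    """
    def __init__(self, threshold=5):
        self.threshold = threshold
        self.failures = 0

    def __call__(self, node, exception):
        self.failures += 1
        if self.failures >= self.threshold:
            raise exception  # escalate: failure policy takes over
        # returning normally swallows the error; scheduler continues

    def reset(self):
        """Call from tick() after a successful cycle to close the breaker."""
        self.failures = 0
```

An instance is callable with the (node, exception) signature, so it can be passed directly as on_error when constructing the node.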
Skip vs Ignore: "skip" logs the error and increments the error counter. "ignore" does neither. Use "skip" unless you have measured that the logging overhead is unacceptable at your tick rate. Silent failures with "ignore" make debugging production issues significantly harder.
Defensive tick vs let-it-crash: Adding try/except for every expected failure makes tick() verbose but predictable. Removing all error handling and relying on failure_policy="restart" is simpler code but slower recovery (restart calls shutdown() then init() again). The right balance depends on your recovery cost: if init() takes 2 seconds to reconnect to hardware, defensive handling inside tick() avoids that penalty.
See Also
- Python Bindings -- Core API reference
- Async Nodes -- Error handling in async tick functions
- Production Deployment -- Failure policies in production