Error Handling (Python)

Every production node fails eventually. A sensor disconnects, shared memory fills up, a transform goes stale. What matters is how your node responds. This page covers HORUS exception types, failure policies, error callbacks, and defensive patterns that keep your robot running when things go wrong.


Exception Types

HORUS raises three domain-specific exceptions that map to distinct failure modes. Import them from horus:

from horus import HorusNotFoundError, HorusTransformError, HorusTimeoutError

HorusNotFoundError

Raised when a topic, transform frame, or node does not exist.

try:
    data = node.recv("nonexistent.topic")
except HorusNotFoundError as e:
    node.log_error(f"Topic missing: {e}")
    # Error message includes a hint: "Run: horus topic list"

Common triggers:

  • Subscribing to a topic that no publisher has created yet
  • Looking up a transform frame that was never broadcast
  • Querying a node that has not been registered with the scheduler

HorusTransformError

Raised when a coordinate transform cannot be computed. Two sub-cases:

  • Extrapolation -- the requested timestamp is outside the buffered range
  • Stale data -- the transform exists but has not been updated recently

from horus import TransformFrame, HorusTransformError

tf = TransformFrame()

try:
    transform = tf.lookup("base_link", "camera_link")
except HorusTransformError as e:
    node.log_warning(f"Transform unavailable: {e}")
    # Hint may suggest using tf_at() for clamped lookup

HorusTimeoutError

Raised when a blocking operation exceeds its deadline. The error message includes the resource name, elapsed time, and the deadline that was exceeded.

from horus import HorusTimeoutError

try:
    data = node.recv("lidar.scan", timeout=0.5)
except HorusTimeoutError:
    node.log_warning("LiDAR scan not received within 500ms")

Standard Python Exceptions

HORUS also raises standard Python exceptions for input and system errors:

| Exception | When |
|---|---|
| ValueError | Invalid input: bad topic name, invalid config, parse failure |
| TypeError | Serialization failure: wrong message type for a typed topic |
| IOError | File or shared memory I/O failure |
| MemoryError | Shared memory allocation failed (SHM segment full) |
| RuntimeError | Internal error or unclassified failure |
| KeyError | Missing key in driver parameters |

Failure Policies

When tick() raises an unhandled exception, the scheduler applies the node's failure_policy to decide what happens next. Set it on the node:

import horus

node = horus.Node(
    name="sensor",
    tick=read_sensor,
    rate=100,
    failure_policy="restart",
)

Policy Reference

| Policy | Behavior | When to use |
|---|---|---|
| "fatal" | Stops the entire scheduler immediately | Safety-critical nodes where any error means stop |
| "restart" | Retries the node (up to max retries with backoff) | Nodes that recover from transient failures |
| "skip" | Skips the failed tick, continues on next cycle | Sensor nodes where one missed reading is acceptable |
| "ignore" | Swallows the exception silently | Logging or telemetry nodes that must never stop the system |

Default behavior: If no failure_policy is set, unhandled exceptions propagate to the scheduler, which logs the error and continues running other nodes. Set an explicit policy for every production node.

How Each Policy Works

Fatal stops everything. The scheduler calls shutdown() on all nodes and exits. Use this for motor controllers or safety monitors where an error means the robot is in an unknown state.

motor = horus.Node(
    name="motor_ctrl",
    tick=motor_tick,
    rate=1000,
    failure_policy="fatal",
)

Restart retries the node with exponential backoff. The scheduler calls shutdown() then init() again before resuming ticks. If retries are exhausted, the node is marked unhealthy and removed from the tick loop.

camera = horus.Node(
    name="camera",
    tick=capture_frame,
    rate=30,
    failure_policy="restart",
)
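
The retry limit and backoff timing are not spelled out on this page, so the following is only a sketch of what an exponential backoff schedule looks like. The function name and all four parameters (retry count, base delay, growth factor, cap) are illustrative assumptions, not HORUS defaults.

```python
def backoff_delays(max_retries=5, base=0.1, factor=2.0, cap=2.0):
    """Return the wait in seconds before each retry attempt.

    Every parameter here is a hypothetical value chosen for illustration;
    none of them is taken from the HORUS scheduler.
    """
    delays = []
    for attempt in range(max_retries):
        # The delay grows geometrically and is then clamped to the cap.
        delays.append(min(cap, base * factor ** attempt))
    return delays


delays = backoff_delays()
total_wait = sum(delays)  # time spent waiting if every retry fails
```

With these assumed values, a node that fails every attempt waits roughly 3.1 seconds in total before it is marked unhealthy, on top of the time spent in shutdown() and init() on each retry.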

Skip drops the current tick and moves on. The scheduler increments an error counter and continues to the next tick cycle. The node's state is preserved -- init() is not called again.

logger = horus.Node(
    name="db_logger",
    tick=log_to_database,
    rate=10,
    failure_policy="skip",
)

Ignore swallows the exception without logging. The scheduler does not even increment the error counter. Use sparingly -- silent failures are hard to debug.
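
To compare the four policies side by side, here is a toy model of the decision described above. It is not the HORUS scheduler: ToyNode, run_one_tick, max_retries, and the returned action strings are all hypothetical, and the restart branch only counts attempts where a real scheduler would call shutdown(), back off, and call init().

```python
class ToyNode:
    """Stand-in for a node; tracks counters so each policy's effect is visible."""

    def __init__(self, tick, failure_policy, max_retries=3):
        self.tick = tick
        self.failure_policy = failure_policy
        self.max_retries = max_retries  # hypothetical limit
        self.error_count = 0
        self.restarts = 0
        self.healthy = True


def run_one_tick(node):
    """Run one tick and return the action a scheduler would take next.

    Returns "ok", "continue", "remove_node", or "stop_scheduler".
    """
    try:
        node.tick(node)
        return "ok"
    except Exception:
        policy = node.failure_policy
        if policy == "fatal":
            return "stop_scheduler"
        if policy == "restart":
            if node.restarts >= node.max_retries:
                node.healthy = False  # retries exhausted
                return "remove_node"
            node.restarts += 1  # real scheduler: shutdown(), backoff, init()
            return "continue"
        if policy == "skip":
            node.error_count += 1  # counted; node state is preserved
            return "continue"
        if policy == "ignore":
            return "continue"  # neither counted nor logged
        raise ValueError(f"unknown failure_policy: {policy!r}")
```

The only difference between the "skip" and "ignore" branches is the counter, which is what makes "ignore" failures invisible afterwards.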


The on_error Callback

For custom error handling, pass an on_error function to Node(). It runs before the failure policy kicks in:

import horus

error_count = 0

def handle_error(node, exception):
    global error_count
    error_count += 1
    node.log_error(f"Error #{error_count}: {exception}")

    if error_count > 10:
        node.log_error("Too many errors -- escalating to failure policy")
        raise exception  # Re-raise to trigger failure_policy

node = horus.Node(
    name="sensor",
    tick=read_sensor,
    rate=100,
    on_error=handle_error,
    failure_policy="skip",
)
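
The module-level counter above is shared by every node that uses handle_error. One way to give each node its own count is a handler factory. This is a sketch: make_error_budget and max_errors are hypothetical names, and it assumes only the on_error(node, exception) signature and node.log_error() shown above.

```python
def make_error_budget(max_errors=10):
    """Build an on_error handler that tolerates up to max_errors errors."""
    count = 0  # private to this handler, not shared between nodes

    def handler(node, exception):
        nonlocal count
        count += 1
        node.log_error(f"Error #{count}: {exception}")
        if count > max_errors:
            # Budget exhausted: re-raise so the failure policy applies.
            raise exception

    return handler
```

Each node would then receive its own handler, for example on_error=make_error_budget(10).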

on_error Flow

  1. tick() raises an exception
  2. on_error(node, exception) is called
  3. If on_error returns normally, the exception is suppressed -- the scheduler continues as if tick succeeded
  4. If on_error re-raises the exception, the failure policy takes over
  5. If on_error raises a different exception, the original exception is still the one handed to the failure policy

This means on_error acts as a filter. Return normally to swallow the error. Re-raise to escalate.

def selective_handler(node, exception):
    if isinstance(exception, HorusTimeoutError):
        node.log_warning("Timeout -- will retry next tick")
        return  # Swallow timeout errors

    # All other errors escalate to failure_policy
    raise exception
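
The flow described above can be modelled in a few lines, which is handy for unit-testing a handler without starting the scheduler. This is a sketch, not scheduler code: dispatch_error is a hypothetical helper, and the built-in TimeoutError stands in for HorusTimeoutError so the example runs without HORUS installed.

```python
def dispatch_error(on_error, node, original):
    """Model the on_error flow.

    Returns None if the handler suppressed the error, otherwise the
    exception that would be handed to the failure policy.
    """
    try:
        on_error(node, original)
    except Exception:
        # A re-raise and a brand-new exception are treated alike:
        # the original exception is the one that escalates.
        return original
    return None


def swallow_timeouts(node, exception):
    if isinstance(exception, TimeoutError):  # stand-in for HorusTimeoutError
        return  # suppressed
    raise exception  # escalate everything else


def buggy_handler(node, exception):
    raise KeyError("bug inside the handler")


timeout = TimeoutError("no scan")
bad_input = ValueError("bad command")

suppressed = dispatch_error(swallow_timeouts, None, timeout)
escalated = dispatch_error(swallow_timeouts, None, bad_input)
after_handler_bug = dispatch_error(buggy_handler, None, bad_input)
```

Here suppressed is None, while escalated and after_handler_bug are both the original ValueError: a bug in the handler does not hide the error that triggered it.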

Structured Logging

HORUS provides four logging methods on the node object. These integrate with the scheduler's structured logging pipeline and appear in horus logs output:

def tick(node):
    node.log_debug("Processing frame 42")
    node.log_info("Detection found: person at (120, 340)")
    node.log_warning("LiDAR signal weak -- SNR below threshold")
    node.log_error("Motor controller not responding")

Logging only works during init(), tick(), and shutdown(). Calling node.log_info() outside the scheduler lifecycle (before run() or after shutdown) will emit a RuntimeWarning and the message will be dropped. Use standard print() or Python's logging module for setup-time diagnostics.

Log Levels in Practice

| Method | Use for | Shows in horus logs |
|---|---|---|
| node.log_debug() | Internal state, per-frame values | Only with --verbose |
| node.log_info() | State changes, detections, milestones | Default output |
| node.log_warning() | Degraded operation, approaching limits | Default + highlighted |
| node.log_error() | Failures that trigger recovery | Default + highlighted |

Common Errors and Fixes

Topic Not Found

The subscriber starts before the publisher. The topic does not exist yet.

def tick(node):
    if not node.has_msg("lidar.scan"):
        return  # No data yet -- skip this tick

    data = node.recv("lidar.scan")
    process(data)

Fix: Always check node.has_msg() before node.recv(). This is the standard pattern for handling publisher/subscriber startup order.

Shared Memory Full

The ring buffer is full because the consumer is slower than the producer.

def tick(node):
    try:
        node.send("big.pointcloud", cloud_data)
    except MemoryError:
        node.log_warning("SHM full -- dropping frame")

Fix: Increase the ring buffer capacity or reduce the publishing rate. The default_capacity parameter on Node() controls buffer size:

node = horus.Node(
    name="publisher",
    tick=publish_fn,
    rate=30,
    pubs=["big.pointcloud"],
    default_capacity=4096,  # Larger buffer (default: 1024)
)

You can also clean up stale shared memory segments with horus clean --shm.

Permission Denied (SHM)

Shared memory segments created by one user cannot be accessed by another.

# This error appears as an IOError
try:
    node.send("topic", data)
except IOError as e:
    if "Permission denied" in str(e):
        node.log_error("SHM permission error -- check user/group")

Fix: Run all nodes as the same user, or clean stale segments with horus clean --shm and restart.
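
Matching on the message text works, but it breaks if the wording changes. In Python 3, IOError is an alias of OSError, and permission failures normally carry errno EACCES or EPERM. Whether HORUS populates errno on the exceptions it raises is an assumption here, so this sketch checks errno first and falls back to the message; is_permission_error is a hypothetical helper name.

```python
import errno


def is_permission_error(exc):
    """Return True if an I/O error looks like a permission failure."""
    if not isinstance(exc, OSError):  # IOError is the same class in Python 3
        return False
    if exc.errno in (errno.EACCES, errno.EPERM):
        return True
    # errno may be unset if the error was built from a message only.
    return "permission denied" in str(exc).lower()


with_errno = is_permission_error(OSError(errno.EACCES, "Permission denied"))
message_only = is_permission_error(IOError("Permission denied: shm segment"))
unrelated = is_permission_error(OSError(errno.ENOENT, "No such file or directory"))
```

Inside the except IOError block above, the string test could then be replaced with if is_permission_error(e):.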

Transform Stale

A transform frame has not been updated recently.

from horus import TransformFrame, HorusTransformError

tf = TransformFrame()

def tick(node):
    try:
        t = tf.lookup("base_link", "camera_link")
    except HorusTransformError:
        node.log_warning("Camera transform stale -- skipping this tick")
        return
    process_with_transform(t)

Fix: Ensure the sensor driver publishing the transform is running and healthy. Check with horus frame list.
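
The handler above returns without a transform. Where reusing the most recent good transform for a short time is acceptable, a small cache can supply it. This is a sketch under assumptions: LastKnown, max_age, and the injectable clock are hypothetical and not part of HORUS, and the right maximum age depends on how fast the frames actually move.

```python
import time


class LastKnown:
    """Remember the most recent good value and serve it for a limited time."""

    def __init__(self, max_age=0.5, clock=time.monotonic):
        self.max_age = max_age  # seconds; hypothetical default
        self._clock = clock
        self._value = None
        self._stamp = None

    def update(self, value):
        """Store a fresh value and return it unchanged."""
        self._value = value
        self._stamp = self._clock()
        return value

    def get(self):
        """Return the cached value, or None if absent or older than max_age."""
        if self._stamp is None:
            return None
        if self._clock() - self._stamp > self.max_age:
            return None
        return self._value
```

In tick(), a successful lookup would go through cache.update(...), and the except HorusTransformError branch would call cache.get() and return early only when it yields None.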


Defensive tick() Pattern

A production-ready tick function handles all expected failures explicitly, uses on_error as a safety net, and lets the failure policy handle everything else.

import horus
from horus import HorusNotFoundError

def motor_tick(node):
    # Guard: skip if no command available
    if not node.has_msg("cmd_vel"):
        return

    try:
        cmd = node.recv("cmd_vel")
    except HorusNotFoundError:
        node.log_error("cmd_vel topic disappeared")
        return

    try:
        result = apply_motor_command(cmd)
    except TimeoutError:
        node.log_error("Motor hardware timeout")
        emergency_stop()
        return
    except ValueError as e:
        node.log_warning(f"Invalid command: {e}")
        return

    node.send("motor.status", {"applied": True, "velocity": result})


def motor_error_handler(node, exception):
    """Last resort before failure_policy."""
    node.log_error(f"Unhandled motor error: {exception}")
    emergency_stop()
    raise exception  # Escalate to failure_policy="fatal"


motor = horus.Node(
    name="motor_ctrl",
    tick=motor_tick,
    rate=1000,
    order=0,
    subs=["cmd_vel"],
    pubs=["motor.status"],
    on_error=motor_error_handler,
    failure_policy="fatal",
)

horus.run(motor, rt=True)

Structure:

  1. Guard clause -- has_msg() check, return early if no data
  2. Specific try/except -- catch expected exceptions by type
  3. Handle and continue -- log, take corrective action, return
  4. on_error as safety net -- catches anything tick() missed
  5. failure_policy as last resort -- scheduler-level response

Exception Handling Anti-Patterns

Bare except

Never use bare except: -- it catches KeyboardInterrupt and SystemExit, preventing clean shutdown:

# BAD: catches Ctrl+C, prevents shutdown
def tick(node):
    try:
        data = node.recv("topic")
    except:
        pass

# GOOD: catch specific exceptions
def tick(node):
    try:
        data = node.recv("topic")
    except (HorusNotFoundError, HorusTimeoutError) as e:
        node.log_warning(f"Expected error: {e}")

Catch-and-ignore in Safety Nodes

Swallowing exceptions in a safety-critical node defeats the purpose of the failure policy:

# BAD: hides failures in a critical node
def safety_tick(node):
    try:
        check_limits()
    except Exception:
        pass  # "It's fine."

# GOOD: let failure_policy handle it
def safety_tick(node):
    check_limits()  # If this fails, failure_policy="fatal" stops everything

Logging without Action

Catching an exception just to log it, then re-raising, adds noise without value:

# UNNECESSARY: the scheduler already logs unhandled exceptions
def tick(node):
    try:
        process()
    except Exception as e:
        node.log_error(f"Error: {e}")
        raise  # Scheduler logs this again

# BETTER: either handle it or don't catch it
def tick(node):
    process()  # Let failure_policy handle errors

Design Decisions

Why three custom exceptions instead of one? HorusNotFoundError, HorusTransformError, and HorusTimeoutError represent distinct failure modes with different recovery strategies. A missing topic usually means a node has not started yet. A stale transform usually means a sensor stopped publishing. A timeout usually means a publisher is slow or the system is overloaded. Catching a generic HorusError would force every handler to inspect the message string to tell these cases apart. Separate types let you write precise except clauses.

Why does on_error suppress by default? If on_error returns normally, the exception is swallowed. This lets on_error act as a filter -- handle what you know, re-raise what you do not. The alternative (always propagating after on_error) would make custom error handling useless, since the failure policy would fire regardless. Swallow-by-default gives the callback full control.

Why no exception hierarchy? The three HORUS exceptions inherit directly from Exception, not from a shared HorusError base class. This is intentional. In practice, you almost never want to catch "any HORUS error" -- you want to catch a specific failure mode and respond accordingly. A base class encourages overly broad except clauses. If you truly need to catch all three, list them explicitly: except (HorusNotFoundError, HorusTransformError, HorusTimeoutError).

Why standard exceptions for input errors? Invalid topic names raise ValueError, not a custom exception. Serialization failures raise TypeError. These are programming errors, not runtime failures. Standard exceptions mean you do not need to import HORUS-specific types to catch bugs in your own code, and linters/IDEs already understand them.

Trade-offs

failure_policy vs on_error: The failure policy is coarse-grained (one policy per node) but reliable (enforced by the scheduler, works even if Python crashes). on_error is fine-grained but fragile (runs in Python, can itself fail). For safety-critical nodes, rely on failure_policy="fatal" and keep on_error simple. For non-critical nodes, on_error gives flexibility to implement retry logic, circuit breakers, or rate-limited alerts.
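
As one example of the flexibility on_error offers, here is a sketch of a rate-limited alert handler for a non-critical node. make_rate_limited_alert, min_interval, and the injectable clock are hypothetical; the sketch assumes only the on_error(node, exception) signature and node.log_error(). Note that this handler always returns normally, so every error is suppressed and the failure policy never fires.

```python
import time


def make_rate_limited_alert(min_interval=5.0, clock=time.monotonic):
    """Build an on_error handler that logs at most once per min_interval seconds."""
    last_alert = None
    suppressed = 0

    def handler(node, exception):
        nonlocal last_alert, suppressed
        now = clock()
        if last_alert is not None and now - last_alert < min_interval:
            suppressed += 1
            return  # too soon since the last alert: stay quiet
        node.log_error(f"{exception} ({suppressed} similar errors suppressed)")
        last_alert = now
        suppressed = 0

    return handler
```

Passing the clock as a parameter keeps the handler testable without sleeping; in a node the default time.monotonic would be used.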

Skip vs Ignore: "skip" logs the error and increments the error counter. "ignore" does neither. Use "skip" unless you have measured that the logging overhead is unacceptable at your tick rate. Silent failures with "ignore" make debugging production issues significantly harder.

Defensive tick vs let-it-crash: Adding try/except for every expected failure makes tick() verbose but predictable. Removing all error handling and relying on failure_policy="restart" is simpler code but slower recovery (restart calls shutdown() then init() again). The right balance depends on your recovery cost: if init() takes 2 seconds to reconnect to hardware, defensive handling inside tick() avoids that penalty.
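
The recovery cost can be estimated with simple arithmetic. The sketch below uses the 2-second init() figure from the paragraph above and the rates from the examples on this page (100 Hz sensor, 1000 Hz motor controller); it ignores backoff delay and shutdown() time, so it is a lower bound. missed_ticks is a hypothetical helper.

```python
def missed_ticks(rate_hz, downtime_s):
    """Number of ticks a node misses while unavailable for downtime_s seconds."""
    return round(rate_hz * downtime_s)


# A restart that spends 2 seconds in init():
sensor_restart_cost = missed_ticks(rate_hz=100, downtime_s=2.0)
motor_restart_cost = missed_ticks(rate_hz=1000, downtime_s=2.0)

# Handling the same error inside tick() costs the current tick only.
defensive_cost = 1
```

Under these assumptions a restart costs the sensor node 200 ticks and the motor controller 2000, against a single tick when the error is handled inside tick().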


See Also