Diagnostics Messages

Diagnostics messages keep robots safe and observable in production. They report node health, trigger emergency stops, monitor resources, and track safety state. Every production robot needs them, and even simple hobby robots benefit from battery monitoring and heartbeats.

from horus import (
    DiagnosticStatus, EmergencyStop, ResourceUsage, SafetyStatus,
    DiagnosticReport, DiagnosticValue, Heartbeat, NodeHeartbeat,
)

DiagnosticStatus

Node health reporting with severity-level factory methods. Instead of remembering that level 2 means ERROR, use the error() factory.

Constructor

ds = DiagnosticStatus(level=2, code=101, message="overheating", component="motor")

.ok(message) — Everything Is Fine

ds = DiagnosticStatus.ok("All systems nominal")

Level 0. Publish this periodically to confirm your node is alive and healthy. Monitoring dashboards show OK nodes in green. If a node stops publishing OK statuses, the watchdog knows something is wrong.

.warn(code, message) — Degraded But Functional

ds = DiagnosticStatus.warn(code=101, message="Temperature rising: 65°C")

Level 1. The node is still working but something needs attention. Examples: battery getting low, sensor noise increasing, CPU usage above 70%, communication latency above threshold.

When to use warn vs error: If the robot can still complete its mission, it's a warning. If the mission is compromised, it's an error.

.error(code, message) — Something Is Wrong

ds = DiagnosticStatus.error(code=201, message="Motor stalled on joint 3")

Level 2. The node cannot function correctly. Examples: a motor that stays stalled, sensor disconnected, localization lost, path blocked. An operator should investigate.

Common mistake: Using error() for recoverable conditions. If the motor stalls briefly then recovers, that's a warn(). error() should mean "this needs human intervention."
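
One way to apply this rule is to escalate only when a condition persists. The following is a minimal sketch in plain Python, not part of horus: the StallEscalator class and the 2-second grace period are assumptions chosen for illustration.

```python
class StallEscalator:
    """Hypothetical helper: report "warn" for brief stalls, "error" once they persist."""

    def __init__(self, escalate_after_s=2.0):
        self.escalate_after_s = escalate_after_s  # assumed grace period
        self._stalled_since = None

    def update(self, stalled, now_s):
        """Return "ok", "warn", or "error" for the current sample."""
        if not stalled:
            self._stalled_since = None  # condition recovered, reset the timer
            return "ok"
        if self._stalled_since is None:
            self._stalled_since = now_s  # first sample of a new stall
        if now_s - self._stalled_since >= self.escalate_after_s:
            return "error"
        return "warn"
```

The returned label would then select between DiagnosticStatus.warn() and DiagnosticStatus.error() when the status is built.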

.fatal(code, message) — System Cannot Continue

ds = DiagnosticStatus.fatal(code=301, message="Hardware fault: CAN bus disconnected")

Level 3. Unrecoverable failure. The node should enter safe state and stop. Examples: hardware fault, firmware crash, safety violation. This often triggers an EmergencyStop.
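
The four factory methods correspond to levels 0 through 3. The sketch below mirrors those numbers in a plain Python enum and shows one way to reduce several statuses to the worst one. Severity and worst are illustrative names, not part of horus.

```python
from enum import IntEnum


class Severity(IntEnum):
    """Hypothetical mirror of the level numbers described above."""
    OK = 0
    WARN = 1
    ERROR = 2
    FATAL = 3


def worst(levels):
    """Return the most severe level in an iterable, or OK if it is empty."""
    return max((Severity(level) for level in levels), default=Severity.OK)


print(worst([0, 1, 0]).name)  # WARN
print(worst([]).name)         # OK
```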

.with_component(name) — Set Component Name

ds = DiagnosticStatus.error(code=201, message="Overheating") \
    .with_component("left_drive_motor")

Returns a new DiagnosticStatus with the component name set. Always set this — monitoring dashboards group statuses by component, and without it, operators can't tell which motor is overheating.

.message_str() / .component_str() — Read Back as Strings

print(ds.message_str())     # "Overheating"
print(ds.component_str())   # "left_drive_motor"

The message and component are stored as fixed-size byte arrays internally. These methods convert them to Python strings.
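
To illustrate what a fixed-size byte field implies, here is a rough sketch of packing and unpacking a string. The 64-byte size, the NUL padding, and the function names are assumptions for illustration; the actual layout used by horus is not described here.

```python
FIELD_SIZE = 64  # assumed buffer size for illustration; the real size may differ


def pack_fixed(text, size=FIELD_SIZE):
    """Encode text as UTF-8, truncate to fit, and pad with NUL bytes."""
    raw = text.encode("utf-8")[: size - 1]  # keep one byte for the terminator
    return raw + b"\x00" * (size - len(raw))


def unpack_fixed(buf):
    """Read bytes up to the first NUL and decode them back to a string."""
    raw = buf.split(b"\x00", 1)[0]
    # errors="ignore" drops a multi-byte character cut in half by truncation
    return raw.decode("utf-8", errors="ignore")
```

The practical consequence is that long messages may be truncated, so keep messages and component names short.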

Example — Node Health Reporter:

from horus import Node, run, DiagnosticStatus, Topic

diag_topic = Topic(DiagnosticStatus)
cpu_percent = 0.0  # Updated elsewhere

def report_health(node):
    # Map CPU load to a severity: above 90% is an error, above 70% a warning.
    if cpu_percent > 90:
        status = DiagnosticStatus.error(code=100, message=f"CPU at {cpu_percent:.0f}%")
    elif cpu_percent > 70:
        status = DiagnosticStatus.warn(code=100, message=f"CPU at {cpu_percent:.0f}%")
    else:
        status = DiagnosticStatus.ok(f"CPU at {cpu_percent:.0f}%")
    # Tag the status with its component before publishing.
    diag_topic.send(status.with_component("controller"), node)

run(Node(tick=report_health, rate=1, pubs=["diagnostics"]))

EmergencyStop

The panic button. engage() triggers an immediate stop; release() clears it after an operator confirms safe conditions.

.engage(reason) — Trigger E-Stop

estop = EmergencyStop.engage("Obstacle detected at 0.1m")

Creates an engaged emergency stop with a reason string. Publish this on the e-stop topic; every node should then immediately enter a safe state: stop motors, lock brakes, disable actuators.

.release() — Clear E-Stop

release = EmergencyStop.release()

Creates a release command. Publish this to clear the e-stop and allow normal operation to resume.

Common mistake: Auto-releasing the e-stop programmatically. E-stop release should always require human confirmation — a physical button, operator console acknowledgment, or at minimum a deliberate command. Auto-release defeats the purpose of safety systems.
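
The rule can be enforced in code by latching the e-stop and refusing any release that lacks explicit confirmation. This is a minimal sketch, assuming a hypothetical EStopLatch class and an operator_confirmed flag set by a physical button or console; neither is part of horus.

```python
class EStopLatch:
    """Hypothetical latch: engaging is always allowed, releasing needs an operator."""

    def __init__(self):
        self.engaged = False
        self.reason = ""

    def engage(self, reason):
        """Latch the stop and remember why it was triggered."""
        self.engaged = True
        self.reason = reason

    def request_release(self, operator_confirmed):
        """Clear the latch only when a human has explicitly confirmed."""
        if not operator_confirmed:
            return False  # ignore automatic release attempts
        self.engaged = False
        self.reason = ""
        return True
```

Only a successful request_release() would go on to publish EmergencyStop.release().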

.with_source(source) — Identify Who Triggered It

estop = EmergencyStop.engage("Collision detected") \
    .with_source("lidar_safety_node")

Returns a new EmergencyStop with a source identifier. When multiple nodes can trigger e-stops, the source tells operators which node detected the problem.

.reason_str() — Read the Reason

print(estop.reason_str())  # "Collision detected"

Example — Safety Controller:

from horus import Node, run, EmergencyStop, LaserScan, CmdVel, Topic

scan_topic = Topic(LaserScan)
estop_topic = Topic(EmergencyStop)
cmd_topic = Topic(CmdVel)

def safety_check(node):
    scan = scan_topic.recv(node)
    if scan is None:
        return  # No new scan this tick
    closest = scan.min_range()
    # Engage the e-stop when anything is closer than 0.15 m.
    if closest is not None and closest < 0.15:
        estop = EmergencyStop.engage(f"Object at {closest:.2f}m") \
            .with_source("safety_monitor")
        estop_topic.send(estop, node)
        # Also publish a zero velocity command.
        cmd_topic.send(CmdVel.zero(), node)

run(Node(tick=safety_check, rate=50, pubs=["estop", "cmd_vel"], subs=["scan"]))

ResourceUsage

System resource monitoring with threshold checks.

Constructor

ru = ResourceUsage(cpu_percent=85.0, memory_bytes=4_000_000_000)

.is_cpu_high(threshold) — CPU Overload Check

if ru.is_cpu_high(80.0):
    print("CPU overloaded!")

Returns True if cpu_percent exceeds the given threshold. Typical thresholds:

  • 70%: Warning — consider reducing processing load
  • 85%: Error — system may miss deadlines
  • 95%: Critical — risk of dropped messages and missed ticks
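
The tiers above can be folded into a single helper. This is a sketch in plain Python, not a horus function: cpu_severity is a hypothetical name, and the strict comparison assumes "exceeds" means strictly greater than the threshold.

```python
def cpu_severity(cpu_percent, warn=70.0, error=85.0, critical=95.0):
    """Map a CPU percentage to a severity label using the tiers listed above."""
    if cpu_percent > critical:
        return "critical"
    if cpu_percent > error:
        return "error"
    if cpu_percent > warn:
        return "warning"
    return "ok"


print(cpu_severity(90.0))  # error
```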

.is_memory_high(threshold) — Memory Pressure

if ru.is_memory_high(90.0):
    print("Memory pressure! Consider releasing caches")

.is_temperature_high(threshold) — Thermal Check

if ru.is_temperature_high(75.0):
    print("Overheating! Reduce motor duty cycle")

The right threshold is hardware-specific: a Raspberry Pi throttles at 80°C, a Jetson limits at 97°C, and industrial PCs vary, so check the limits for your board.


SafetyStatus

Safety system state machine with fault tracking.

Constructor

ss = SafetyStatus()

.is_safe() — All Clear?

if not ss.is_safe():
    print("Safety fault — entering safe state")

Returns True when no faults are active, e-stop is not engaged, and watchdog is healthy. Check this every tick — if it returns False, your node should stop actuators.

.set_fault(code) — Register a Fault

ss.set_fault(101)  # Motor overcurrent fault

Registers a fault code. is_safe() will return False until all faults are cleared. Use fault codes consistently across your system — document what each code means.

.clear_faults() — Reset After Recovery

ss.clear_faults()
print(ss.is_safe())  # True (assuming no other issues)

Clears all registered faults. Call this only after the root cause has been fixed — not as a way to ignore problems.
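
The behavior described in this section can be modeled in a few lines. The class below is a hypothetical stand-in written for illustration, not the horus implementation; the attribute names estop_engaged and watchdog_ok are assumptions.

```python
class SafetyModel:
    """Hypothetical stand-in for the rule described under is_safe()."""

    def __init__(self):
        self.faults = set()
        self.estop_engaged = False
        self.watchdog_ok = True

    def set_fault(self, code):
        self.faults.add(code)

    def clear_faults(self):
        # Only faults are cleared; e-stop and watchdog state are untouched.
        self.faults.clear()

    def is_safe(self):
        """Safe only with no faults, no e-stop, and a healthy watchdog."""
        return not self.faults and not self.estop_engaged and self.watchdog_ok
```

In this model, clearing faults does not make the system safe while the e-stop is still engaged, which matches the "assuming no other issues" caveat above.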


DiagnosticReport

Structured diagnostic data with typed key-value pairs. More organized than free-text messages — monitoring tools can parse and chart the values.

Constructor

report = DiagnosticReport(component="sensor_hub")

.add_string(key, value) — Text Data

report.add_string("firmware_version", "2.1.3")
report.add_string("status", "calibrating")

.add_int(key, value) — Integer Data

report.add_int("retry_count", 3)
report.add_int("messages_dropped", 0)

.add_float(key, value) — Float Data

report.add_float("temperature_c", 42.5)
report.add_float("voltage", 24.1)

.add_bool(key, value) — Boolean Data

report.add_bool("calibrated", True)
report.add_bool("firmware_update_available", False)

All add_* methods raise ValueError if the report is full (max 16 values).
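
Because of this limit, code that adds a variable number of values should be ready for ValueError. The sketch below uses a hypothetical BoundedReport stand-in that enforces the same 16-value cap so it can run without horus; add_all is also an illustrative name.

```python
MAX_VALUES = 16  # limit stated above


class BoundedReport:
    """Hypothetical stand-in that enforces the same 16-value cap."""

    def __init__(self):
        self.values = []

    def add_float(self, key, value):
        if len(self.values) >= MAX_VALUES:
            raise ValueError("report is full")
        self.values.append((key, float(value)))


def add_all(report, readings):
    """Add readings until the report fills up; return the keys that did not fit."""
    skipped = []
    for key, value in readings.items():
        try:
            report.add_float(key, value)
        except ValueError:
            skipped.append(key)
    return skipped
```

Keys that do not fit could go into a second report rather than being dropped silently.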

Example — Periodic Diagnostic Report:

from horus import DiagnosticReport, Topic

diag_topic = Topic(DiagnosticReport)

def publish_diagnostics(node, temp, voltage, calibrated):
    report = DiagnosticReport(component="imu_driver")
    report.add_float("temperature_c", temp)
    report.add_float("supply_voltage", voltage)
    report.add_bool("calibrated", calibrated)
    report.add_int("tick_count", node.tick)
    diag_topic.send(report, node)

Heartbeat

Simple "I'm alive" signal from nodes.

.update(uptime) — Tick the Heartbeat

hb = Heartbeat(node_name="controller", node_id=1)
hb.update(uptime=120.5)  # Increments sequence, sets uptime

Call once per tick and publish. The monitoring system watches for heartbeats — if a node stops publishing, it's considered dead.
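
On the receiving side, a monitor can use the sequence number to spot dropped heartbeats and the arrival time to spot dead nodes. This is a rough sketch of such a monitor, not the horus monitoring system; HeartbeatMonitor and the 3-second timeout are assumptions.

```python
class HeartbeatMonitor:
    """Hypothetical monitor that tracks the last heartbeat seen per node."""

    def __init__(self, timeout_s=3.0):
        self.timeout_s = timeout_s  # assumed timeout; tune to the node's rate
        self._last_seen = {}        # node name -> (sequence, receive time)

    def observe(self, node_name, sequence, now_s):
        """Record a heartbeat and return how many were missed since the last one."""
        previous = self._last_seen.get(node_name)
        self._last_seen[node_name] = (sequence, now_s)
        if previous is None:
            return 0
        return max(0, sequence - previous[0] - 1)

    def dead_nodes(self, now_s):
        """Return names of nodes whose last heartbeat is older than the timeout."""
        return sorted(
            name for name, (_, seen) in self._last_seen.items()
            if now_s - seen > self.timeout_s
        )
```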


NodeHeartbeat

Filesystem-based heartbeat for cross-process discovery. Written to shared memory, not published on topics.

.update_timestamp() — Refresh Timestamp

nhb = NodeHeartbeat(state=1, health=0)
nhb.update_timestamp()  # Sets to current time

.is_fresh(max_age_secs) — Check Staleness

if not nhb.is_fresh(max_age_secs=5):
    print("Node heartbeat is stale — node may have crashed")

Returns True if the timestamp is within max_age_secs of the current time. Use this in monitoring tools to detect crashed nodes.
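
To show how a file-based heartbeat lets one process check another without a topic, here is a self-contained sketch. The JSON file format, the file name, and both function names are assumptions made for illustration; they do not describe how horus stores NodeHeartbeat.

```python
import json
import os
import tempfile
import time


def write_heartbeat(path, state, health):
    """Atomically write a small JSON heartbeat file (assumed format)."""
    payload = {"state": state, "health": health, "timestamp": time.time()}
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(payload, f)
    os.replace(tmp_path, path)  # readers never see a half-written file


def is_fresh(path, max_age_secs, now=None):
    """Return True if the heartbeat file exists and is recent enough."""
    now = time.time() if now is None else now
    try:
        with open(path) as f:
            timestamp = json.load(f)["timestamp"]
    except (OSError, ValueError, KeyError):
        return False  # a missing or unreadable file counts as stale
    return now - timestamp <= max_age_secs


# Demo: write a heartbeat to a temporary directory and check it.
demo_path = os.path.join(tempfile.mkdtemp(), "controller.heartbeat")
write_heartbeat(demo_path, state=1, health=0)
print(is_fresh(demo_path, max_age_secs=5))
```

Treating a missing or unreadable file as stale means a node that never started is reported the same way as one that crashed.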


See Also