Diagnostics Messages
Diagnostics messages keep robots safe and observable in production. They report node health, trigger emergency stops, monitor resources, and track safety state. Every production robot needs these — even simple hobby robots benefit from battery monitoring and heartbeats.
from horus import (
    DiagnosticStatus, EmergencyStop, ResourceUsage, SafetyStatus,
    DiagnosticReport, DiagnosticValue, Heartbeat, NodeHeartbeat,
)
DiagnosticStatus
Node health reporting with severity-level factory methods. Instead of remembering that level 2 means ERROR, use the error() factory.
Constructor
ds = DiagnosticStatus(level=2, code=101, message="overheating", component="motor")
.ok(message) — Everything Is Fine
ds = DiagnosticStatus.ok("All systems nominal")
Level 0. Publish this periodically to confirm your node is alive and healthy. Monitoring dashboards show OK nodes in green. If a node stops publishing OK statuses, the watchdog knows something is wrong.
.warn(code, message) — Degraded But Functional
ds = DiagnosticStatus.warn(code=101, message="Temperature rising: 65°C")
Level 1. The node is still working but something needs attention. Examples: battery getting low, sensor noise increasing, CPU usage above 70%, communication latency above threshold.
When to use warn vs error: If the robot can still complete its mission, it's a warning. If the mission is compromised, it's an error.
.error(code, message) — Something Is Wrong
ds = DiagnosticStatus.error(code=201, message="Motor stalled on joint 3")
Level 2. The node cannot function correctly. Examples: motor stalled, sensor disconnected, localization lost, path blocked. An operator should investigate.
Common mistake: Using error() for recoverable conditions. If the motor stalls briefly then recovers, that's a warn(). error() should mean "this needs human intervention."
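One way to keep brief, self-recovering stalls at warning level is to escalate only after the condition has persisted for several consecutive ticks. The sketch below is plain Python and returns level names as strings instead of calling the horus API; `StallEscalator` and its `error_after_ticks` parameter are hypothetical names chosen for illustration.

```python
class StallEscalator:
    """Escalate a condition from warn to error only if it persists.

    Hypothetical helper for illustration; not part of horus.
    """

    def __init__(self, error_after_ticks=5):
        self.error_after_ticks = error_after_ticks
        self.consecutive = 0  # ticks the condition has been continuously active

    def update(self, stalled):
        """Return "ok", "warn", or "error" for the current tick."""
        if not stalled:
            self.consecutive = 0  # the condition cleared on its own
            return "ok"
        self.consecutive += 1
        if self.consecutive >= self.error_after_ticks:
            return "error"  # persistent: needs human intervention
        return "warn"  # brief so far: still treated as recoverable


escalator = StallEscalator(error_after_ticks=3)
levels = [escalator.update(s) for s in [True, True, False, True, True, True]]
print(levels)  # ['warn', 'warn', 'ok', 'warn', 'warn', 'error']
```

In a real node, the returned name would select which DiagnosticStatus factory to call.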
.fatal(code, message) — System Cannot Continue
ds = DiagnosticStatus.fatal(code=301, message="Hardware fault: CAN bus disconnected")
Level 3. Unrecoverable failure. The node should enter safe state and stop. Examples: hardware fault, firmware crash, safety violation. This often triggers an EmergencyStop.
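The four levels described above map naturally onto an ordered enum, which makes it easy to reduce several statuses to the single worst one. This is a minimal sketch in plain Python: `Level` and `worst` are stand-in names for illustration, and horus may expose its own constants for the same numbers.

```python
from enum import IntEnum


class Level(IntEnum):
    """Severity levels as numbered in this page (stand-in, not the horus type)."""
    OK = 0
    WARN = 1
    ERROR = 2
    FATAL = 3


def worst(levels):
    """Return the most severe level seen, or OK for an empty input."""
    return max(levels, default=Level.OK)


overall = worst([Level.OK, Level.WARN, Level.OK])
print(overall.name)  # WARN
```

Because the enum is ordered, comparisons such as `level >= Level.ERROR` can decide when an operator needs to be alerted.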
.with_component(name) — Set Component Name
ds = DiagnosticStatus.error(code=201, message="Overheating") \
    .with_component("left_drive_motor")
Returns a new DiagnosticStatus with the component name set. Always set this — monitoring dashboards group statuses by component, and without it, operators can't tell which motor is overheating.
.message_str() / .component_str() — Read Back as Strings
print(ds.message_str()) # "Overheating"
print(ds.component_str()) # "left_drive_motor"
The message and component are stored as fixed-size byte arrays internally. These methods convert them to Python strings.
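To illustrate what such a conversion involves, here is a rough sketch of packing a string into a fixed-size buffer and reading it back. It assumes NUL-padded UTF-8, which is a common layout but not confirmed for horus; `encode_fixed`, `decode_fixed`, and the buffer size of 32 are hypothetical.

```python
def encode_fixed(text, size):
    """Encode text as UTF-8 into exactly `size` bytes, padding with NULs.

    Text longer than the buffer is truncated.
    """
    raw = text.encode("utf-8")[:size]
    return raw.ljust(size, b"\x00")


def decode_fixed(buf):
    """Recover the string: cut at the first NUL byte, then decode."""
    end = buf.find(b"\x00")
    raw = buf if end == -1 else buf[:end]
    # errors="replace" tolerates a multi-byte character cut by truncation.
    return raw.decode("utf-8", errors="replace")


buf = encode_fixed("left_drive_motor", 32)
print(len(buf))           # 32
print(decode_fixed(buf))  # left_drive_motor
```

A practical consequence of fixed-size storage is that long messages can be truncated, so it helps to keep messages and component names short.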
Example — Node Health Reporter:
from horus import Node, run, DiagnosticStatus, Topic
diag_topic = Topic(DiagnosticStatus)
cpu_percent = 0.0 # Updated elsewhere
def report_health(node):
    if cpu_percent > 90:
        status = DiagnosticStatus.error(code=100, message=f"CPU at {cpu_percent:.0f}%")
    elif cpu_percent > 70:
        status = DiagnosticStatus.warn(code=100, message=f"CPU at {cpu_percent:.0f}%")
    else:
        status = DiagnosticStatus.ok(f"CPU at {cpu_percent:.0f}%")
    diag_topic.send(status.with_component("controller"), node)
run(Node(tick=report_health, rate=1, pubs=["diagnostics"]))
EmergencyStop
The panic button. engage() triggers an immediate stop; release() clears it after an operator confirms safe conditions.
.engage(reason) — Trigger E-Stop
estop = EmergencyStop.engage("Obstacle detected at 0.1m")
Creates an engaged emergency stop with a reason string. Publish this on the e-stop topic and all nodes should immediately enter safe state — stop motors, lock brakes, disable actuators.
.release() — Clear E-Stop
release = EmergencyStop.release()
Creates a release command. Publish this to clear the e-stop and allow normal operation to resume.
Common mistake: Auto-releasing the e-stop programmatically. E-stop release should always require human confirmation — a physical button, operator console acknowledgment, or at minimum a deliberate command. Auto-release defeats the purpose of safety systems.
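The rule above can be enforced with a small latch that refuses to clear unless an operator has explicitly confirmed. This is a sketch in plain Python, not the horus API: `EStopLatch` and `request_release` are hypothetical names, and how the confirmation flag is obtained (button, console) is left to the application.

```python
class EStopLatch:
    """Latched e-stop state that only a confirmed request can clear.

    Hypothetical helper for illustration; not part of horus.
    """

    def __init__(self):
        self.engaged = False
        self.reason = ""

    def engage(self, reason):
        """Latch the stop; it stays engaged until explicitly released."""
        self.engaged = True
        self.reason = reason

    def request_release(self, operator_confirmed):
        """Clear the latch only with operator confirmation.

        Returns True if the latch was released by this call.
        """
        if not self.engaged or not operator_confirmed:
            return False
        self.engaged = False
        self.reason = ""
        return True


latch = EStopLatch()
latch.engage("Obstacle detected at 0.1m")
print(latch.request_release(operator_confirmed=False))  # False
print(latch.engaged)                                    # True
print(latch.request_release(operator_confirmed=True))   # True
```

Only when `request_release` returns True would the node go on to publish a release message.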
.with_source(source) — Identify Who Triggered It
estop = EmergencyStop.engage("Collision detected") \
    .with_source("lidar_safety_node")
Returns a new EmergencyStop with a source identifier. When multiple nodes can trigger e-stops, the source tells operators which node detected the problem.
.reason_str() — Read the Reason
print(estop.reason_str()) # "Collision detected"
Example — Safety Controller:
from horus import Node, run, EmergencyStop, LaserScan, CmdVel, Topic
scan_topic = Topic(LaserScan)
estop_topic = Topic(EmergencyStop)
cmd_topic = Topic(CmdVel)
def safety_check(node):
    scan = scan_topic.recv(node)
    if scan is None:
        return
    closest = scan.min_range()
    if closest is not None and closest < 0.15:
        estop = EmergencyStop.engage(f"Object at {closest:.2f}m") \
            .with_source("safety_monitor")
        estop_topic.send(estop, node)
        cmd_topic.send(CmdVel.zero(), node)
run(Node(tick=safety_check, rate=50, pubs=["estop", "cmd_vel"], subs=["scan"]))
ResourceUsage
System resource monitoring with threshold checks.
Constructor
ru = ResourceUsage(cpu_percent=85.0, memory_bytes=4_000_000_000)
.is_cpu_high(threshold) — CPU Overload Check
if ru.is_cpu_high(80.0):
    print("CPU overloaded!")
Returns True if cpu_percent exceeds the given threshold. Typical thresholds:
- 70%: Warning — consider reducing processing load
- 85%: Error — system may miss deadlines
- 95%: Critical — risk of dropped messages and missed ticks
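The typical thresholds listed above can be folded into one classification function. The sketch below is plain Python and does not call horus; `classify_cpu` is a hypothetical helper, and it uses a strict "exceeds" comparison to match the description of is_cpu_high.

```python
def classify_cpu(cpu_percent, warn=70.0, error=85.0, critical=95.0):
    """Map a CPU percentage to a severity name.

    A level applies only when the value exceeds its threshold.
    """
    if cpu_percent > critical:
        return "critical"
    if cpu_percent > error:
        return "error"
    if cpu_percent > warn:
        return "warning"
    return "ok"


for value in (65.0, 72.0, 90.0, 96.0):
    print(value, classify_cpu(value))
# 65.0 ok
# 72.0 warning
# 90.0 error
# 96.0 critical
```

Checking the highest threshold first keeps the branches mutually exclusive, so each reading maps to exactly one level.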
.is_memory_high(threshold) — Memory Pressure
if ru.is_memory_high(90.0):
    print("Memory pressure! Consider releasing caches")
.is_temperature_high(threshold) — Thermal Check
if ru.is_temperature_high(75.0):
    print("Overheating! Reduce motor duty cycle")
Thresholds are hardware-specific: Raspberry Pi throttles at 80°C, Jetson limits at 97°C, and industrial PCs vary.
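Since the right threshold depends on the board, one option is to derive a warning threshold from the platform limit minus a safety margin. This is a sketch under assumptions: the limits are the two figures quoted above, while `warn_threshold`, the platform keys, and the 5°C default margin are hypothetical choices.

```python
# Throttle or limit points quoted in the text above, in degrees Celsius.
PLATFORM_LIMIT_C = {
    "raspberry_pi": 80.0,
    "jetson": 97.0,
}


def warn_threshold(platform, margin_c=5.0, limit_c=None):
    """Return a warning threshold that sits `margin_c` below the limit.

    Pass `limit_c` explicitly for hardware not in the table; an unknown
    platform without an explicit limit raises KeyError instead of guessing.
    """
    if limit_c is None:
        limit_c = PLATFORM_LIMIT_C[platform]
    return limit_c - margin_c


print(warn_threshold("raspberry_pi"))                 # 75.0
print(warn_threshold("jetson"))                       # 92.0
print(warn_threshold("industrial_pc", limit_c=70.0))  # 65.0
```

The result can be passed as the threshold argument to is_temperature_high.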
SafetyStatus
Safety system state machine with fault tracking.
Constructor
ss = SafetyStatus()
.is_safe() — All Clear?
if not ss.is_safe():
    print("Safety fault: entering safe state")
Returns True when no faults are active, e-stop is not engaged, and watchdog is healthy. Check this every tick — if it returns False, your node should stop actuators.
.set_fault(code) — Register a Fault
ss.set_fault(101) # Motor overcurrent fault
Registers a fault code. is_safe() will return False until all faults are cleared. Use fault codes consistently across your system — document what each code means.
.clear_faults() — Reset After Recovery
ss.clear_faults()
print(ss.is_safe()) # True (assuming no other issues)
Clears all registered faults. Call this only after the root cause has been fixed — not as a way to ignore problems.
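The behavior described in this section can be modeled with a set of active fault codes plus two flags. The sketch below is a plain-Python stand-in, not the horus implementation: `FaultTracker` and its attribute names are hypothetical. It shows why clearing faults does not make the system safe on its own when the e-stop is still engaged.

```python
class FaultTracker:
    """Toy model of the safety state described above (not the horus type)."""

    def __init__(self):
        self.faults = set()           # active fault codes
        self.estop_engaged = False
        self.watchdog_healthy = True

    def set_fault(self, code):
        self.faults.add(code)

    def clear_faults(self):
        self.faults.clear()

    def is_safe(self):
        """Safe only with no faults, no e-stop, and a healthy watchdog."""
        return (not self.faults
                and not self.estop_engaged
                and self.watchdog_healthy)


tracker = FaultTracker()
tracker.set_fault(101)
tracker.estop_engaged = True
tracker.clear_faults()
print(tracker.is_safe())  # False: the e-stop is still engaged
tracker.estop_engaged = False
print(tracker.is_safe())  # True
```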
DiagnosticReport
Structured diagnostic data with typed key-value pairs. More organized than free-text messages — monitoring tools can parse and chart the values.
Constructor
report = DiagnosticReport(component="sensor_hub")
.add_string(key, value) — Text Data
report.add_string("firmware_version", "2.1.3")
report.add_string("status", "calibrating")
.add_int(key, value) — Integer Data
report.add_int("retry_count", 3)
report.add_int("messages_dropped", 0)
.add_float(key, value) — Float Data
report.add_float("temperature_c", 42.5)
report.add_float("voltage", 24.1)
.add_bool(key, value) — Boolean Data
report.add_bool("calibrated", True)
report.add_bool("firmware_update_available", False)
All add_* methods raise ValueError if the report is full (max 16 values).
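When a component has more than 16 values to report, one approach is to split them into batches up front so that no single report overflows. This sketch is plain Python and only does the splitting; `chunk_items` is a hypothetical helper, and each batch would then be written into its own DiagnosticReport.

```python
MAX_VALUES = 16  # per-report limit stated above


def chunk_items(values, size=MAX_VALUES):
    """Split a dict into a list of dicts with at most `size` entries each."""
    items = list(values.items())
    return [dict(items[i:i + size]) for i in range(0, len(items), size)]


readings = {f"sensor_{i}": float(i) for i in range(40)}
batches = chunk_items(readings)
print([len(b) for b in batches])  # [16, 16, 8]
```

Splitting ahead of time avoids relying on ValueError for control flow, though catching it is still a sensible last line of defense.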
Example — Periodic Diagnostic Report:
from horus import DiagnosticReport, Topic
diag_topic = Topic(DiagnosticReport)
def publish_diagnostics(node, temp, voltage, calibrated):
    report = DiagnosticReport(component="imu_driver")
    report.add_float("temperature_c", temp)
    report.add_float("supply_voltage", voltage)
    report.add_bool("calibrated", calibrated)
    report.add_int("tick_count", node.tick)
    diag_topic.send(report, node)
Heartbeat
Simple "I'm alive" signal from nodes.
.update(uptime) — Tick the Heartbeat
hb = Heartbeat(node_name="controller", node_id=1)
hb.update(uptime=120.5) # Increments sequence, sets uptime
Call once per tick and publish. The monitoring system watches for heartbeats — if a node stops publishing, it's considered dead.
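On the monitoring side, detecting a dead node comes down to remembering when each node was last heard from. The following is a minimal sketch in plain Python, not the horus monitoring system: `HeartbeatWatchdog` is a hypothetical class, and times are passed in explicitly so the example is deterministic (a real monitor might use `time.monotonic()`).

```python
class HeartbeatWatchdog:
    """Track the last heartbeat time per node and report silent ones.

    Hypothetical helper for illustration; not part of horus.
    """

    def __init__(self, timeout_secs):
        self.timeout_secs = timeout_secs
        self.last_seen = {}  # node name -> time of the last heartbeat

    def record(self, node_name, now):
        """Call whenever a heartbeat from `node_name` arrives."""
        self.last_seen[node_name] = now

    def dead_nodes(self, now):
        """Return names of nodes silent for longer than the timeout."""
        return sorted(
            name for name, seen in self.last_seen.items()
            if now - seen > self.timeout_secs
        )


watchdog = HeartbeatWatchdog(timeout_secs=2.0)
watchdog.record("controller", now=10.0)
watchdog.record("lidar", now=10.0)
watchdog.record("controller", now=12.5)  # lidar has gone quiet
print(watchdog.dead_nodes(now=13.0))     # ['lidar']
```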
NodeHeartbeat
Filesystem-based heartbeat for cross-process discovery. Written to shared memory, not published on topics.
.update_timestamp() — Refresh Timestamp
nhb = NodeHeartbeat(state=1, health=0)
nhb.update_timestamp() # Sets to current time
.is_fresh(max_age_secs) — Check Staleness
if not nhb.is_fresh(max_age_secs=5):
    print("Node heartbeat is stale: node may have crashed")
Returns True if the timestamp is within max_age_secs of the current time. Use this in monitoring tools to detect crashed nodes.
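The freshness test described above is a simple age comparison. Here is a sketch of the idea in plain Python; this `is_fresh` is a standalone stand-in, not the horus method, and the use of wall-clock seconds from `time.time()` is an assumption.

```python
import time


def is_fresh(timestamp_secs, max_age_secs, now=None):
    """Return True if the timestamp is at most `max_age_secs` old.

    `now` defaults to the current wall-clock time; pass it explicitly
    in tests to get repeatable results.
    """
    if now is None:
        now = time.time()
    return (now - timestamp_secs) <= max_age_secs


print(is_fresh(100.0, max_age_secs=5, now=104.0))  # True
print(is_fresh(100.0, max_age_secs=5, now=106.0))  # False
```

Choosing max_age_secs as a few multiples of the expected update period helps avoid false alarms from a single late update.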
See Also
- Navigation Messages — NavGoal, path following (often paired with diagnostics)
- Sensor Messages — BatteryState for power monitoring
- Force Messages — WrenchStamped.exceeds_limits() for force safety
- Rust Diagnostics Messages — Rust API reference