Emergency Stop (Python)
Safety monitor node that subscribes to sensor topics, evaluates safety conditions every tick, and publishes EmergencyStop when hazards are detected. Sends zero CmdVel to override any active motion. Implements debounce on clear signals and fail-safe behavior when sensor data stops arriving.
Problem
You need a Python safety node that can halt the robot when sensors detect unsafe conditions (obstacles too close, battery low, communication lost).
When To Use
- Any robot with actuators (motors, servos, grippers)
- When safety regulations require guaranteed shutdown
- As a software safety layer alongside a hardware E-stop circuit
Prerequisites
- HORUS installed (Installation Guide)
- Upstream nodes publishing sensor data (
LaserScan,BatteryState)
horus.toml
[package]
name = "emergency-stop-py"
version = "0.1.0"
description = "E-stop monitor with multi-condition safety checks"
language = "python"
Complete Code
#!/usr/bin/env python3
"""Emergency stop monitor — multi-condition safety with debounce and fail-safe."""
import horus
from horus import Node, CmdVel, EmergencyStop, LaserScan, BatteryState, us, ms
# ── Safety thresholds ────────────────────────────────────────
MIN_OBSTACLE_DISTANCE = 0.2 # meters — hard stop if anything closer
LOW_BATTERY_VOLTAGE = 10.5 # volts — shutdown to prevent brownout
CLEAR_THRESHOLD = 50 # ticks of clear signals before releasing
STALE_THRESHOLD = 20 # ticks without sensor data = fault
# ── State ────────────────────────────────────────────────────
estop_active = [False]
consecutive_clears = [0]
ticks_since_scan = [0]
ticks_since_battery = [0]
total_ticks = [0]
trigger_reason = [""]
# ── Safety checks ────────────────────────────────────────────
def check_obstacle(scan):
"""Check if any LaserScan range is below the safety minimum."""
if scan is None:
return False, ""
for i, r in enumerate(scan.ranges):
if 0.01 < r < MIN_OBSTACLE_DISTANCE:
return True, f"obstacle at {r:.2f}m (ray {i})"
return False, ""
def check_battery(battery):
"""Check if battery voltage is critically low."""
if battery is None:
return False, ""
if battery.voltage > 0 and battery.voltage < LOW_BATTERY_VOLTAGE:
return True, f"battery low: {battery.voltage:.1f}V"
return False, ""
def check_stale_sensors():
"""Check if sensor data has stopped arriving (publisher crash)."""
if ticks_since_scan[0] > STALE_THRESHOLD:
return True, f"lidar stale ({ticks_since_scan[0]} ticks)"
if ticks_since_battery[0] > STALE_THRESHOLD:
return True, f"battery stale ({ticks_since_battery[0]} ticks)"
return False, ""
# ── Node callbacks ───────────────────────────────────────────
def estop_tick(node):
total_ticks[0] += 1
ticks_since_scan[0] += 1
ticks_since_battery[0] += 1
# IMPORTANT: always recv() ALL topics every tick to drain buffers
scan = node.recv("lidar.scan")
battery = node.recv("battery.state")
if scan is not None:
ticks_since_scan[0] = 0
if battery is not None:
ticks_since_battery[0] = 0
# Evaluate all safety conditions
triggered = False
reason = ""
obstacle_bad, obstacle_reason = check_obstacle(scan)
if obstacle_bad:
triggered = True
reason = obstacle_reason
battery_bad, battery_reason = check_battery(battery)
if battery_bad:
triggered = True
reason = battery_reason
stale_bad, stale_reason = check_stale_sensors()
if stale_bad:
triggered = True
reason = stale_reason
if triggered:
# SAFETY: immediately activate E-stop
estop_active[0] = True
consecutive_clears[0] = 0
trigger_reason[0] = reason
print(f"E-STOP TRIGGERED: {reason}")
else:
consecutive_clears[0] += 1
# Require N consecutive clear signals before releasing
if estop_active[0] and consecutive_clears[0] >= CLEAR_THRESHOLD:
estop_active[0] = False
print("E-STOP RELEASED after clear period")
if estop_active[0]:
# SAFETY: override cmd_vel with zero — stops all motion
node.send("cmd_vel", CmdVel(linear=0.0, angular=0.0))
# Publish E-stop status for monitoring
node.send("safety.estop", EmergencyStop(
engaged=estop_active[0],
reason=trigger_reason[0] if estop_active[0] else "",
))
def estop_shutdown(node):
# SAFETY: zero velocity on shutdown
node.send("cmd_vel", CmdVel(linear=0.0, angular=0.0))
node.send("safety.estop", EmergencyStop(engaged=True, reason="shutdown"))
print(f"EStop: shutdown after {total_ticks[0]} ticks")
# ── Main ─────────────────────────────────────────────────────
estop_node = Node(
name="EStop",
tick=estop_tick,
shutdown=estop_shutdown,
rate=100, # 100 Hz safety monitoring
order=100, # IMPORTANT: runs LAST — overrides cmd_vel from other nodes
subs=["lidar.scan", "battery.state"],
pubs=["cmd_vel", "safety.estop"],
budget=200 * us, # tight budget for safety node
deadline=500 * us, # tight deadline
on_miss="safe_mode", # force safe state on deadline miss
)
if __name__ == "__main__":
horus.run(
estop_node,
watchdog_ms=500,
)
Expected Output
[HORUS] Scheduler running — tick_rate: 1000 Hz
[HORUS] Node "EStop" started (100 Hz)
E-STOP TRIGGERED: obstacle at 0.15m (ray 42)
E-STOP RELEASED after clear period
^C
EStop: shutdown after 1500 ticks
[HORUS] Shutting down...
[HORUS] Node "EStop" shutdown complete
Key Points
- High
.order(100)ensures E-stop runs AFTER drive/planning nodes — it overrides theircmd_veloutput - Multi-condition checks: obstacle proximity, battery voltage, and sensor staleness
- Debounce with
CLEAR_THRESHOLDprevents flickering E-stop from bouncing on/off - No signal = fault: if sensors stop publishing, the node treats it as triggered (fail-safe design)
on_miss="safe_mode"is the strictest miss policy — any deadline overrun triggers safe state- 200 us budget keeps safety checks deterministic and fast
EmergencyStopmessage hasengagedandreasonfields for downstream monitoring
Variations
- Wireless E-stop: Subscribe to a network heartbeat topic; missing heartbeats trigger E-stop
- Multi-zone E-stop: Separate E-stop nodes per actuator group (arm vs. wheels)
- Graduated response: Reduce speed before full stop using distance-proportional scaling
- Hardware GPIO: Read a physical E-stop button via
RPi.GPIOand publish tosafety.estop
Common Errors
| Symptom | Cause | Fix |
|---|---|---|
| E-stop doesn't override motors | E-stop order lower than motor node | Set E-stop .order() HIGHER than motor node |
| Motors resume after E-stop | Only sending zero once | Send CmdVel(0, 0) every tick while active |
| E-stop flickers on/off | No debounce on clear signal | Use CLEAR_THRESHOLD consecutive clears |
| E-stop never releases | CLEAR_THRESHOLD too high or no clear signals | Reduce threshold or verify sensors are publishing |
| False triggers on startup | Sensors haven't published yet and stale threshold is too low | Increase STALE_THRESHOLD or skip checks for first N ticks |
See Also
- Emergency Stop (Rust) — Rust version with full system example
- LiDAR Obstacle Avoidance (Python) — Reactive avoidance