Emergency Stop (Python)

Safety monitor node that subscribes to sensor topics, evaluates safety conditions every tick, and publishes EmergencyStop when hazards are detected. Sends zero CmdVel to override any active motion. Implements debounce on clear signals and fail-safe behavior when sensor data stops arriving.

Problem

You need a Python safety node that can halt the robot when sensors detect unsafe conditions (obstacles too close, battery low, communication lost).

When To Use

  • Any robot with actuators (motors, servos, grippers)
  • When safety regulations require guaranteed shutdown
  • As a software safety layer alongside a hardware E-stop circuit

Prerequisites

  • HORUS installed (Installation Guide)
  • Upstream nodes publishing sensor data (LaserScan, BatteryState)

horus.toml

[package]
name = "emergency-stop-py"
version = "0.1.0"
description = "E-stop monitor with multi-condition safety checks"
language = "python"

Complete Code

#!/usr/bin/env python3
"""Emergency stop monitor — multi-condition safety with debounce and fail-safe."""

import horus
from horus import Node, CmdVel, EmergencyStop, LaserScan, BatteryState, us, ms

# ── Safety thresholds ────────────────────────────────────────

MIN_OBSTACLE_DISTANCE = 0.2    # meters — hard stop if anything closer
LOW_BATTERY_VOLTAGE = 10.5     # volts — shutdown to prevent brownout
CLEAR_THRESHOLD = 50           # ticks of clear signals before releasing
STALE_THRESHOLD = 20           # ticks without sensor data = fault

# ── State ────────────────────────────────────────────────────

estop_active = [False]
consecutive_clears = [0]
ticks_since_scan = [0]
ticks_since_battery = [0]
total_ticks = [0]
trigger_reason = [""]

# ── Safety checks ────────────────────────────────────────────

def check_obstacle(scan):
    """Check if any LaserScan range is below the safety minimum."""
    if scan is None:
        return False, ""
    for i, r in enumerate(scan.ranges):
        if 0.01 < r < MIN_OBSTACLE_DISTANCE:
            return True, f"obstacle at {r:.2f}m (ray {i})"
    return False, ""

def check_battery(battery):
    """Check if battery voltage is critically low."""
    if battery is None:
        return False, ""
    if battery.voltage > 0 and battery.voltage < LOW_BATTERY_VOLTAGE:
        return True, f"battery low: {battery.voltage:.1f}V"
    return False, ""

def check_stale_sensors():
    """Check if sensor data has stopped arriving (publisher crash)."""
    if ticks_since_scan[0] > STALE_THRESHOLD:
        return True, f"lidar stale ({ticks_since_scan[0]} ticks)"
    if ticks_since_battery[0] > STALE_THRESHOLD:
        return True, f"battery stale ({ticks_since_battery[0]} ticks)"
    return False, ""

# ── Node callbacks ───────────────────────────────────────────

def estop_tick(node):
    total_ticks[0] += 1
    ticks_since_scan[0] += 1
    ticks_since_battery[0] += 1

    # IMPORTANT: always recv() ALL topics every tick to drain buffers
    scan = node.recv("lidar.scan")
    battery = node.recv("battery.state")

    if scan is not None:
        ticks_since_scan[0] = 0
    if battery is not None:
        ticks_since_battery[0] = 0

    # Evaluate all safety conditions
    triggered = False
    reason = ""

    obstacle_bad, obstacle_reason = check_obstacle(scan)
    if obstacle_bad:
        triggered = True
        reason = obstacle_reason

    battery_bad, battery_reason = check_battery(battery)
    if battery_bad:
        triggered = True
        reason = battery_reason

    stale_bad, stale_reason = check_stale_sensors()
    if stale_bad:
        triggered = True
        reason = stale_reason

    if triggered:
        # SAFETY: immediately activate E-stop
        estop_active[0] = True
        consecutive_clears[0] = 0
        trigger_reason[0] = reason
        print(f"E-STOP TRIGGERED: {reason}")
    else:
        consecutive_clears[0] += 1

    # Require N consecutive clear signals before releasing
    if estop_active[0] and consecutive_clears[0] >= CLEAR_THRESHOLD:
        estop_active[0] = False
        print("E-STOP RELEASED after clear period")

    if estop_active[0]:
        # SAFETY: override cmd_vel with zero — stops all motion
        node.send("cmd_vel", CmdVel(linear=0.0, angular=0.0))

    # Publish E-stop status for monitoring
    node.send("safety.estop", EmergencyStop(
        engaged=estop_active[0],
        reason=trigger_reason[0] if estop_active[0] else "",
    ))

def estop_shutdown(node):
    # SAFETY: zero velocity on shutdown
    node.send("cmd_vel", CmdVel(linear=0.0, angular=0.0))
    node.send("safety.estop", EmergencyStop(engaged=True, reason="shutdown"))
    print(f"EStop: shutdown after {total_ticks[0]} ticks")

# ── Main ─────────────────────────────────────────────────────

estop_node = Node(
    name="EStop",
    tick=estop_tick,
    shutdown=estop_shutdown,
    rate=100,                      # 100 Hz safety monitoring
    order=100,                     # IMPORTANT: runs LAST — overrides cmd_vel from other nodes
    subs=["lidar.scan", "battery.state"],
    pubs=["cmd_vel", "safety.estop"],
    budget=200 * us,               # tight budget for safety node
    deadline=500 * us,             # tight deadline
    on_miss="safe_mode",           # force safe state on deadline miss
)

if __name__ == "__main__":
    horus.run(
        estop_node,
        watchdog_ms=500,
    )

Expected Output

[HORUS] Scheduler running — tick_rate: 1000 Hz
[HORUS] Node "EStop" started (100 Hz)
E-STOP TRIGGERED: obstacle at 0.15m (ray 42)
E-STOP RELEASED after clear period
^C
EStop: shutdown after 1500 ticks
[HORUS] Shutting down...
[HORUS] Node "EStop" shutdown complete

Key Points

  • High .order(100) ensures E-stop runs AFTER drive/planning nodes — it overrides their cmd_vel output
  • Multi-condition checks: obstacle proximity, battery voltage, and sensor staleness
  • Debounce with CLEAR_THRESHOLD prevents flickering E-stop from bouncing on/off
  • No signal = fault: if sensors stop publishing, the node treats it as triggered (fail-safe design)
  • on_miss="safe_mode" is the strictest miss policy — any deadline overrun triggers safe state
  • 200 us budget keeps safety checks deterministic and fast
  • EmergencyStop message has engaged and reason fields for downstream monitoring

Variations

  • Wireless E-stop: Subscribe to a network heartbeat topic; missing heartbeats trigger E-stop
  • Multi-zone E-stop: Separate E-stop nodes per actuator group (arm vs. wheels)
  • Graduated response: Reduce speed before full stop using distance-proportional scaling
  • Hardware GPIO: Read a physical E-stop button via RPi.GPIO and publish to safety.estop

Common Errors

SymptomCauseFix
E-stop doesn't override motorsE-stop order lower than motor nodeSet E-stop .order() HIGHER than motor node
Motors resume after E-stopOnly sending zero onceSend CmdVel(0, 0) every tick while active
E-stop flickers on/offNo debounce on clear signalUse CLEAR_THRESHOLD consecutive clears
E-stop never releasesCLEAR_THRESHOLD too high or no clear signalsReduce threshold or verify sensors are publishing
False triggers on startupSensors haven't published yet and stale threshold is too lowIncrease STALE_THRESHOLD or skip checks for first N ticks

See Also