Servo Controller (Python)

Controls a bus of servos (Dynamixel, hobby PWM, etc.). Subscribes to ServoCommand messages, enforces joint limits, writes to hardware, publishes JointState feedback. Returns all servos to home position on shutdown.

Problem

You need to control multiple servos on a bus with joint limit enforcement and safe shutdown from Python.

When To Use

  • Robot arms with 3-12 servos on a shared bus
  • Pan-tilt camera mounts
  • Any multi-actuator system requiring coordinated position control

Prerequisites

horus.toml

[package]
name = "servo-controller-py"
version = "0.1.0"
description = "Multi-servo bus controller with safe shutdown"
language = "python"

Complete Code

#!/usr/bin/env python3
"""Multi-servo controller with joint limits and safe shutdown."""

import math
import horus
from horus import Node, ServoCommand, JointState, us, ms

# ── Configuration ────────────────────────────────────────────

NUM_SERVOS = 6
HOME_POSITION = 0.0                        # radians — safe resting position
JOINT_LIMIT = math.pi                      # +/- pi radians
TEMP_WARNING = 70.0                        # celsius
SERVO_NAMES = [f"joint_{i}" for i in range(NUM_SERVOS)]

# ── State ────────────────────────────────────────────────────

positions = [HOME_POSITION] * NUM_SERVOS
velocities = [0.0] * NUM_SERVOS
temperatures = [35.0] * NUM_SERVOS

# ── Hardware stubs ───────────────────────────────────────────

def write_servo(servo_id, position):
    """Write position to a single servo — replace with real driver.

    For Dynamixel: use dynamixel_sdk
    For hobby PWM: use RPi.GPIO or pigpio
    """
    if 0 <= servo_id < NUM_SERVOS:
        positions[servo_id] = position

def read_feedback():
    """Read feedback from all servos — replace with real driver."""
    return positions[:], velocities[:], temperatures[:]

# ── Node callbacks ───────────────────────────────────────────

def servo_tick(node):
    # IMPORTANT: always recv() every tick to drain the buffer
    cmd = node.recv("servo.command")
    if cmd is not None:
        # SAFETY: enforce joint limits to prevent mechanical damage
        clamped_pos = max(-JOINT_LIMIT, min(JOINT_LIMIT, cmd.position))
        write_servo(cmd.servo_id, clamped_pos)

    # Read current state from all servos
    pos, vel, temp = read_feedback()

    # WARNING: check for overheating servos
    for i, t in enumerate(temp):
        if t > TEMP_WARNING:
            print(f"WARNING: servo {i} temperature {t:.1f}C exceeds limit")

    # Publish joint state feedback
    feedback = JointState(
        names=SERVO_NAMES,
        positions=pos,
        velocities=vel,
        efforts=[0.0] * NUM_SERVOS,
    )
    node.send("servo.feedback", feedback)

def servo_init(node):
    print(f"ServoController: initialized {NUM_SERVOS} servos")
    # Read current positions from hardware before accepting commands
    pos, _, _ = read_feedback()
    for i, p in enumerate(pos):
        positions[i] = p

def servo_shutdown(node):
    # SAFETY: return ALL servos to home position before exiting
    for i in range(NUM_SERVOS):
        write_servo(i, HOME_POSITION)

    feedback = JointState(
        names=SERVO_NAMES,
        positions=[HOME_POSITION] * NUM_SERVOS,
        velocities=[0.0] * NUM_SERVOS,
        efforts=[0.0] * NUM_SERVOS,
    )
    node.send("servo.feedback", feedback)
    print("ServoController: all servos returned to home position")

# ── Main ─────────────────────────────────────────────────────

servo_node = Node(
    name="ServoController",
    tick=servo_tick,
    init=servo_init,
    shutdown=servo_shutdown,
    rate=100,                      # 100 Hz servo update rate
    order=0,
    subs=["servo.command"],
    pubs=["servo.feedback"],
    budget=800 * us,               # 800 us budget for bus communication
    on_miss="warn",
)

if __name__ == "__main__":
    horus.run(servo_node)

Expected Output

[HORUS] Scheduler running — tick_rate: 1000 Hz
ServoController: initialized 6 servos
[HORUS] Node "ServoController" started (100 Hz)
^C
ServoController: all servos returned to home position
[HORUS] Shutting down...
[HORUS] Node "ServoController" shutdown complete

Key Points

  • ServoCommand has servo_id, position, speed, and enable fields — one command per servo per tick
  • JointState publishes all servo positions/velocities in a single message with named joints
  • Joint limit clamping in tick() prevents hardware damage regardless of upstream commands
  • Temperature monitoring catches overheating before servo damage
  • shutdown() returns to home — critical for robot arms that hold pose under gravity
  • init() reads current positions — prevents jerking to an unexpected position on startup
  • 800 us budget accounts for serial bus latency (Dynamixel at 1Mbps takes ~500 us for 6 servos)

Variations

  • Batch commands: Accept all servo positions in a single JointCommand message instead of per-servo ServoCommand
  • Velocity mode: Replace position commands with velocity targets for wheeled joints
  • Trajectory interpolation: Accept waypoints and interpolate between them over time
  • Current limiting: Track servo current draw and reduce torque if limits are exceeded

Common Errors

SymptomCauseFix
Servos jerk on startupNo initial position read before first commandRead current positions in init() before accepting commands
Overheating warnings constantlyServo loaded beyond continuous ratingReduce duty cycle or upgrade servos
Positions overshoot limitsClamp range too wide for physical jointTighten clamp values to match hardware stops
Bus timeout errorsBaud rate mismatch or cable issueVerify baud rate matches servo firmware
Servos do not return to home on Ctrl+Cshutdown() not implementedAlways implement shutdown() for actuator nodes

See Also