Servo Controller

Controls a bus of servos (e.g., Dynamixel or hobby PWM). Reads position commands from a topic, writes them to hardware, and publishes joint feedback. Implements ordered shutdown: all servos return to the home position before exit.

Problem

You need to control multiple servos on a bus (serial, CAN, or PWM) with joint limit enforcement and safe shutdown.

When To Use

  • Robot arms with 3-12 servos on a shared bus
  • Pan-tilt camera mounts
  • Any multi-actuator system requiring coordinated position control

Prerequisites

  • HORUS installed (Installation Guide)
  • Servo hardware or a simulated servo bus for testing

horus.toml

[package]
name = "servo-controller"
version = "0.1.0"
description = "Multi-servo bus with safe shutdown"

Complete Code

// simplified
use horus::prelude::*;

const NUM_SERVOS: usize = 6;
const HOME_POSITION: f32 = 0.0;   // radians — safe resting position

/// Command for all servos
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, LogSummary)]
#[repr(C)]
struct ServoGoals {
    positions: [f32; NUM_SERVOS],   // target positions in radians
}

/// Feedback from all servos
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, LogSummary)]
#[repr(C)]
struct ServoFeedback {
    positions: [f32; NUM_SERVOS],
    velocities: [f32; NUM_SERVOS],
    temperatures: [f32; NUM_SERVOS],
}

// ── Servo Node ──────────────────────────────────────────────

struct ServoNode {
    goal_sub: Topic<ServoGoals>,
    feedback_pub: Topic<ServoFeedback>,
    current_positions: [f32; NUM_SERVOS],
}

impl ServoNode {
    fn new() -> Result<Self> {
        Ok(Self {
            goal_sub: Topic::new("servo.goals")?,
            feedback_pub: Topic::new("servo.feedback")?,
            current_positions: [HOME_POSITION; NUM_SERVOS],
        })
    }

    /// Write positions to hardware bus (replace with real driver)
    fn write_hardware(&mut self, goals: &[f32; NUM_SERVOS]) {
        self.current_positions = *goals;
    }

    /// Read feedback from hardware bus (replace with real driver)
    fn read_hardware(&self) -> ServoFeedback {
        ServoFeedback {
            positions: self.current_positions,
            velocities: [0.0; NUM_SERVOS],
            temperatures: [35.0; NUM_SERVOS],
        }
    }
}

impl Node for ServoNode {
    fn name(&self) -> &str { "Servo" }

    fn tick(&mut self) {
        // IMPORTANT: always recv() every tick to drain the buffer
        if let Some(goals) = self.goal_sub.recv() {
            let mut clamped = goals.positions;
            for pos in &mut clamped {
                // SAFETY: enforce joint limits to prevent mechanical damage
                *pos = pos.clamp(-std::f32::consts::PI, std::f32::consts::PI);
            }
            self.write_hardware(&clamped);
        }

        let feedback = self.read_hardware();

        // WARNING: check for overheating servos
        for (i, temp) in feedback.temperatures.iter().enumerate() {
            if *temp > 70.0 {
                eprintln!("WARNING: servo {} temperature {:.1}C exceeds limit", i, temp);
            }
        }

        self.feedback_pub.send(feedback);
    }

    fn shutdown(&mut self) -> Result<()> {
        // SAFETY: return ALL servos to home position before exiting
        let home = [HOME_POSITION; NUM_SERVOS];
        self.write_hardware(&home);
        // Publish one last feedback sample read back from the bus so
        // subscribers see real values rather than placeholder temperatures
        let feedback = self.read_hardware();
        self.feedback_pub.send(feedback);
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();

    // Execution order: servo reads goals, writes hardware, publishes feedback
    scheduler.add(ServoNode::new()?)
        .order(0)
        .rate(100_u64.hz())     // 100Hz servo update rate — auto-enables RT
        .budget(800.us())       // 800μs budget for bus communication
        .on_miss(Miss::Warn)
        .build()?;

    scheduler.run()
}

Expected Output

[HORUS] Scheduler running — tick_rate: 100 Hz
[HORUS] Node "Servo" started (Rt, 100 Hz, budget: 800μs, deadline: 9.5ms)
^C
[HORUS] Shutting down...
[HORUS] Node "Servo" shutdown complete

write_hardware() and read_hardware() are placeholders. For a complete working example with serialport (Rust) or pyserial (Python), see the Real Hardware Recipe.
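Until a real driver is wired in, the placeholders can be backed by a small simulated bus so that feedback does not jump instantly to the goal. The following is a minimal sketch under stated assumptions: `SimServoBus`, its fields, and the slew-rate model are hypothetical names and behavior chosen for illustration, not part of HORUS or any servo SDK.

```rust
const NUM_SERVOS: usize = 6;

/// Hypothetical simulated bus: each servo moves toward its goal at a
/// fixed maximum speed instead of teleporting there.
struct SimServoBus {
    positions: [f32; NUM_SERVOS],  // simulated joint angles, radians
    velocities: [f32; NUM_SERVOS], // simulated joint speeds, rad/s
    goals: [f32; NUM_SERVOS],      // last commanded targets, radians
    max_speed: f32,                // slew limit, rad/s (assumed >= 0)
}

impl SimServoBus {
    fn new(max_speed: f32) -> Self {
        Self {
            positions: [0.0; NUM_SERVOS],
            velocities: [0.0; NUM_SERVOS],
            goals: [0.0; NUM_SERVOS],
            max_speed,
        }
    }

    /// Stands in for a bus write: only stores the new targets.
    fn write(&mut self, goals: &[f32; NUM_SERVOS]) {
        self.goals = *goals;
    }

    /// Advance the simulation by `dt` seconds.
    fn step(&mut self, dt: f32) {
        if dt <= 0.0 {
            return; // nothing to simulate, and avoids dividing by zero
        }
        let max_step = self.max_speed * dt;
        for i in 0..NUM_SERVOS {
            // Move toward the goal, but no further than the slew limit allows
            let delta = (self.goals[i] - self.positions[i]).clamp(-max_step, max_step);
            self.positions[i] += delta;
            self.velocities[i] = delta / dt;
        }
    }
}
```

In the node above, write_hardware() would call write(), and read_hardware() would call step(0.01) at the 100 Hz tick rate before copying positions and velocities into ServoFeedback.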

Key Points

  • Fixed-size arrays ([f32; NUM_SERVOS]) enable #[repr(C)] + Copy for zero-copy IPC
  • Joint limit clamping in tick() prevents hardware damage regardless of upstream commands
  • Temperature monitoring catches overheating before servo damage
  • shutdown() returns to home — critical for robot arms that hold pose under gravity
  • 800μs budget accounts for serial bus latency (Dynamixel at 1Mbps takes ~500μs for 6 servos)
  • 100Hz is typical for hobby servos; use 200-500Hz for industrial servos
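The budget figures above can be sanity-checked with simple wire-time arithmetic. This sketch assumes 8N1 UART framing (10 bits on the wire per byte) and ignores servo response delay and driver overhead; the function names and the 50-byte total are illustrative, not measured Dynamixel packet sizes.

```rust
/// Microseconds needed to move `bytes` over a UART running at `baud` bits/s,
/// assuming 8N1 framing: 1 start bit + 8 data bits + 1 stop bit per byte.
fn wire_time_us(bytes: u32, baud: u32) -> f64 {
    const BITS_PER_BYTE: f64 = 10.0;
    bytes as f64 * BITS_PER_BYTE * 1_000_000.0 / baud as f64
}

/// Fraction of each tick period that a budget occupies at a given tick rate.
fn budget_fraction(budget_us: f64, rate_hz: f64) -> f64 {
    budget_us * rate_hz / 1_000_000.0
}
```

Under these assumptions, wire_time_us(50, 1_000_000) gives 500 μs, so a ~500 μs exchange corresponds to roughly 50 bytes of total traffic at 1 Mbps. budget_fraction(800.0, 100.0) gives 0.08: the 800 μs budget is 8% of the 10 ms tick period at 100 Hz.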

Variations

  • Velocity mode: Replace position commands with velocity targets for wheeled joints
  • Trajectory interpolation: Accept waypoints and interpolate between them over time in tick()
  • Mixed bus: Use different NUM_SERVOS constants per bus and run multiple ServoNode instances
  • Current limiting: Read current_amps from feedback and reduce torque if limits are exceeded
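The trajectory interpolation variation could be sketched as a linear segment that is advanced once per tick. `LinearSegment` and its methods are hypothetical names for illustration; linear interpolation is only one option, and a real arm would likely also want velocity and acceleration limits.

```rust
const NUM_SERVOS: usize = 6;

/// Hypothetical helper: linear interpolation from `start` to `end`
/// over `duration_s` seconds.
struct LinearSegment {
    start: [f32; NUM_SERVOS],
    end: [f32; NUM_SERVOS],
    duration_s: f32,
    elapsed_s: f32,
}

impl LinearSegment {
    fn new(start: [f32; NUM_SERVOS], end: [f32; NUM_SERVOS], duration_s: f32) -> Self {
        Self { start, end, duration_s, elapsed_s: 0.0 }
    }

    /// Advance by `dt` seconds and return the setpoint for this tick.
    /// Once the segment is finished, keeps returning `end`.
    fn advance(&mut self, dt: f32) -> [f32; NUM_SERVOS] {
        self.elapsed_s = (self.elapsed_s + dt).min(self.duration_s);
        // A zero-length segment jumps straight to the end point
        let t = if self.duration_s > 0.0 {
            self.elapsed_s / self.duration_s
        } else {
            1.0
        };
        let mut out = [0.0; NUM_SERVOS];
        for i in 0..NUM_SERVOS {
            out[i] = self.start[i] + (self.end[i] - self.start[i]) * t;
        }
        out
    }

    fn is_done(&self) -> bool {
        self.elapsed_s >= self.duration_s
    }
}
```

In tick(), a received waypoint would start a new segment from the current positions, and each tick would pass the result of advance(0.01) through the joint limit clamp before writing it to hardware.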

Common Errors

Symptom | Cause | Fix
Servos jerk on startup | No initial position read before first command | Read current positions in init() before accepting commands
Overheating warnings constantly | Servo loaded beyond continuous rating | Reduce duty cycle or upgrade servos
Positions overshoot limits | Clamp range too wide for physical joint | Tighten clamp() values to match hardware stops
Bus timeout errors | Baud rate mismatch or cable issue | Verify baud rate in horus.toml matches servo firmware
Servos do not return to home on Ctrl+C | shutdown() not implemented | Always implement shutdown() for actuator nodes
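For the "Positions overshoot limits" row, one way to tighten the clamp is a per-joint limit table in place of a single range for every servo. This is a sketch only: `JointLimit`, `JOINT_LIMITS`, `clamp_goals`, and the limit values are hypothetical and must be replaced with the hardware stops of the actual arm. It also holds the previous position when a goal is NaN, because f32::clamp returns NaN for a NaN input.

```rust
const NUM_SERVOS: usize = 6;

#[derive(Debug, Clone, Copy)]
struct JointLimit {
    min: f32, // radians
    max: f32, // radians
}

// Hypothetical limits; take real values from the mechanical drawings
const JOINT_LIMITS: [JointLimit; NUM_SERVOS] = [
    JointLimit { min: -2.6, max: 2.6 },
    JointLimit { min: -1.5, max: 1.5 },
    JointLimit { min: -2.0, max: 2.0 },
    JointLimit { min: -1.7, max: 1.7 },
    JointLimit { min: -1.7, max: 1.7 },
    JointLimit { min: 0.0, max: 0.8 },
];

/// Clamp each goal to its own joint range. A NaN goal is replaced by the
/// matching entry of `hold` (for example the last commanded position).
fn clamp_goals(
    goals: &[f32; NUM_SERVOS],
    hold: &[f32; NUM_SERVOS],
    limits: &[JointLimit; NUM_SERVOS],
) -> [f32; NUM_SERVOS] {
    let mut out = [0.0; NUM_SERVOS];
    for i in 0..NUM_SERVOS {
        let target = if goals[i].is_nan() { hold[i] } else { goals[i] };
        out[i] = target.clamp(limits[i].min, limits[i].max);
    }
    out
}
```

In tick(), this would stand in for the single clamp loop, with hold set to self.current_positions.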

See Also