Servo Controller
Controls a bus of servos (e.g., Dynamixel, hobby PWM). Reads position commands from a topic, writes them to hardware, and publishes joint feedback. Implements an ordered shutdown: all servos are commanded to the home position before the process exits.
horus.toml
[package]
name = "servo-controller"
version = "0.1.0"
description = "Multi-servo bus with safe shutdown"
Complete Code
use horus::prelude::*;

const NUM_SERVOS: usize = 6;
const HOME_POSITION: f32 = 0.0; // radians; safe resting position

/// Command for all servos
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, LogSummary)]
#[repr(C)]
struct ServoGoals {
    positions: [f32; NUM_SERVOS], // target positions in radians
}

/// Feedback from all servos
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, LogSummary)]
#[repr(C)]
struct ServoFeedback {
    positions: [f32; NUM_SERVOS],
    velocities: [f32; NUM_SERVOS],
    temperatures: [f32; NUM_SERVOS],
}
// ── Servo Node ──────────────────────────────────────────────
struct ServoNode {
    goal_sub: Topic<ServoGoals>,
    feedback_pub: Topic<ServoFeedback>,
    current_positions: [f32; NUM_SERVOS],
}

impl ServoNode {
    fn new() -> Result<Self> {
        Ok(Self {
            goal_sub: Topic::new("servo.goals")?,
            feedback_pub: Topic::new("servo.feedback")?,
            current_positions: [HOME_POSITION; NUM_SERVOS],
        })
    }

    /// Write positions to hardware bus (replace with real driver)
    fn write_hardware(&mut self, goals: &[f32; NUM_SERVOS]) {
        self.current_positions = *goals;
    }

    /// Read feedback from hardware bus (replace with real driver)
    fn read_hardware(&self) -> ServoFeedback {
        ServoFeedback {
            positions: self.current_positions,
            velocities: [0.0; NUM_SERVOS],
            temperatures: [35.0; NUM_SERVOS],
        }
    }
}
impl Node for ServoNode {
    fn name(&self) -> &str { "Servo" }

    fn tick(&mut self) {
        // IMPORTANT: always recv() every tick to drain the buffer
        if let Some(goals) = self.goal_sub.recv() {
            let mut clamped = goals.positions;
            for pos in &mut clamped {
                // SAFETY: enforce joint limits to prevent mechanical damage
                *pos = pos.clamp(-std::f32::consts::PI, std::f32::consts::PI);
            }
            self.write_hardware(&clamped);
        }

        let feedback = self.read_hardware();

        // WARNING: check for overheating servos
        for (i, temp) in feedback.temperatures.iter().enumerate() {
            if *temp > 70.0 {
                eprintln!("WARNING: servo {} temperature {:.1}C exceeds limit", i, temp);
            }
        }

        self.feedback_pub.send(feedback);
    }
    fn shutdown(&mut self) -> Result<()> {
        // SAFETY: return ALL servos to home position before exiting
        let home = [HOME_POSITION; NUM_SERVOS];
        self.write_hardware(&home);

        // Publish feedback read back from the bus rather than placeholder
        // values (a hard-coded 0.0 temperature would mislead subscribers)
        let feedback = self.read_hardware();
        self.feedback_pub.send(feedback);
        Ok(())
    }
}
fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();

    // Execution order: servo reads goals, writes hardware, publishes feedback
    scheduler.add(ServoNode::new()?)
        .order(0)
        .rate(100_u64.hz())   // 100 Hz servo update rate; auto-enables RT
        .budget(800.us())     // 800μs budget for bus communication
        .on_miss(Miss::Warn)
        .build()?;

    scheduler.run()
}
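One gap in the clamp inside tick() is worth noting: f32::clamp returns NaN when its input is NaN, so a corrupted goal would pass straight through to the hardware write. The sketch below is one possible way to harden that step and is not part of the recipe. The helper name sanitize_goals and the per-joint limits table are hypothetical; the default limits simply mirror the ±π range used above. It depends only on the standard library, so it can be unit-tested without a bus attached.

```rust
use std::f32::consts::PI;

const NUM_SERVOS: usize = 6;

/// Assumed default: the same ±π range the recipe clamps to.
const DEFAULT_LIMITS: [(f32, f32); NUM_SERVOS] = [(-PI, PI); NUM_SERVOS];

/// Hypothetical helper: clamp each goal to its (min, max) limit.
/// Non-finite goals (NaN, ±infinity) are rejected and the joint holds
/// its last commanded position instead.
/// Note: f32::clamp panics if min > max, so the limits must be well-formed.
fn sanitize_goals(
    goals: &[f32; NUM_SERVOS],
    last: &[f32; NUM_SERVOS],
    limits: &[(f32, f32); NUM_SERVOS],
) -> [f32; NUM_SERVOS] {
    let mut out = *last; // start from the last known-safe command
    for i in 0..NUM_SERVOS {
        let (lo, hi) = limits[i];
        if goals[i].is_finite() {
            out[i] = goals[i].clamp(lo, hi);
        }
    }
    out
}
```

In the node above, the call would replace the inline loop, for example sanitize_goals(&goals.positions, &self.current_positions, &DEFAULT_LIMITS). Holding the last position is only one policy; rejecting the whole command when any joint is invalid may suit some arms better.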
Expected Output
[HORUS] Scheduler running — tick_rate: 100 Hz
[HORUS] Node "Servo" started (Rt, 100 Hz, budget: 800μs, deadline: 9.5ms)
^C
[HORUS] Shutting down...
[HORUS] Node "Servo" shutdown complete
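The shutdown() shown earlier writes the home position as a single command, which leaves the speed of the return move up to the servo's own profile. On an arm carrying load, a bounded ramp is often gentler. The following is a minimal sketch of that idea under stated assumptions: step_toward and ramp_home are hypothetical helpers, the step size and iteration cap are illustrative values, and a real driver would write to the bus and wait one control period inside the loop.

```rust
const NUM_SERVOS: usize = 6;
const HOME_POSITION: f32 = 0.0; // radians, as in the recipe

/// Hypothetical helper: move every joint at most `max_step` radians toward
/// `target`. Returns true once all joints have reached their targets.
fn step_toward(
    current: &mut [f32; NUM_SERVOS],
    target: &[f32; NUM_SERVOS],
    max_step: f32,
) -> bool {
    let mut done = true;
    for (cur, tgt) in current.iter_mut().zip(target.iter()) {
        let delta = *tgt - *cur;
        if delta.abs() <= max_step {
            *cur = *tgt; // close enough: snap to the target
        } else {
            *cur += max_step * delta.signum();
            done = false;
        }
    }
    done
}

/// Hypothetical helper: ramp all joints to home in bounded steps.
/// Returns Some(steps taken), or None if home was not reached within
/// `max_iters` steps or `max_step` is not a positive number.
fn ramp_home(
    current: &mut [f32; NUM_SERVOS],
    max_step: f32,
    max_iters: usize,
) -> Option<usize> {
    if !(max_step > 0.0) {
        return None; // also rejects NaN
    }
    let home = [HOME_POSITION; NUM_SERVOS];
    for i in 1..=max_iters {
        // A real driver would write `current` to the bus here and then
        // wait one control period before the next step.
        if step_toward(current, &home, max_step) {
            return Some(i);
        }
    }
    None
}
```

The iteration cap matters during shutdown: it bounds how long the process can block before exiting, so a stalled joint cannot hang the scheduler indefinitely.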
Key Points
- Fixed-size arrays ([f32; NUM_SERVOS]) enable #[repr(C)] + Copy for zero-copy IPC
- Joint limit clamping in tick() prevents hardware damage regardless of upstream commands
- Temperature monitoring catches overheating before servo damage
- shutdown() returns to home, which is critical for robot arms that hold pose under gravity
- 800μs budget accounts for serial bus latency (Dynamixel at 1Mbps takes ~500μs for 6 servos)
- 100Hz suits bus servos and digital hobby servos (analog PWM servos typically expect a 50Hz frame rate); use 200-500Hz for industrial servos
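The bus-latency figure behind the 800μs budget can be sanity-checked with a little arithmetic. The sketch below estimates the wire time of a single sync-write style packet. The frame layout (14 bytes of fixed overhead plus one ID byte and the data bytes per servo) is an assumption modeled on Dynamixel Protocol 2.0, and the 10 bits per byte assumes 8N1 UART framing; check both against the servo manual before relying on the numbers. The function names are hypothetical.

```rust
/// Assumed UART framing: 8 data bits + start + stop = 10 bits per byte.
const BITS_PER_BYTE: u64 = 10;

/// Assumed fixed overhead of one sync-write style packet, in bytes:
/// header (4) + id (1) + length (2) + instruction (1)
/// + start address (2) + data length (2) + CRC (2).
const FRAME_OVERHEAD_BYTES: u64 = 14;

/// Total packet size when writing `data_bytes` to each of `num_servos`.
/// Each servo contributes one ID byte plus its data.
fn sync_write_bytes(num_servos: u64, data_bytes: u64) -> u64 {
    FRAME_OVERHEAD_BYTES + num_servos * (1 + data_bytes)
}

/// Time on the wire in microseconds, rounded up. `baud` must be non-zero.
fn wire_time_us(bytes: u64, baud: u64) -> u64 {
    let bits = bytes * BITS_PER_BYTE;
    (bits * 1_000_000 + baud - 1) / baud
}
```

With 6 servos and a 4-byte goal position, sync_write_bytes(6, 4) gives 44 bytes, and wire_time_us(44, 1_000_000) gives 440μs, in line with the ~500μs estimate above. This counts only the outgoing write: reading feedback adds return packets and turnaround delays, which is why the budget leaves headroom.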