Control Messages
Command messages for motors, servos, and actuators. All are typed, zero-copy, and binary-compatible with Rust.
from horus import CmdVel, MotorCommand, ServoCommand, DifferentialDriveCommand, PidConfig, JointCommand
CmdVel
2D velocity command — the most common control message in robotics. Used for differential drive, holonomic, and Ackermann robots.
import horus
cmd = horus.CmdVel(linear=0.5, angular=0.3)
print(f"Forward: {cmd.linear} m/s")
print(f"Turn: {cmd.angular} rad/s")
Fields:
| Field | Type | Unit | Description |
|---|---|---|---|
linear | float | m/s | Forward/backward velocity |
angular | float | rad/s | Rotational velocity (counter-clockwise positive) |
ROS2 equivalent:
geometry_msgs/msg/Twist(2D simplified)Common rates: 10-1000 Hz depending on controller. Motor control at 100+ Hz, teleoperation at 10-30 Hz.
Safety patterns:
# ALWAYS clamp before sending to hardware
MAX_LINEAR = 1.0 # m/s
MAX_ANGULAR = 2.0 # rad/s
def safe_cmd(linear, angular):
return horus.CmdVel(
linear=max(-MAX_LINEAR, min(MAX_LINEAR, linear)),
angular=max(-MAX_ANGULAR, min(MAX_ANGULAR, angular)),
)
# ALWAYS send zero in shutdown
def motor_shutdown(node):
node.send("cmd_vel", horus.CmdVel(linear=0.0, angular=0.0))
# Stale command watchdog — zero motors if no command for 500ms
stale_ticks = [0]
def motor_tick(node):
cmd = node.recv("cmd_vel")
if cmd is None:
stale_ticks[0] += 1
if stale_ticks[0] > 50: # 500ms at 100Hz
node.send("motor", horus.CmdVel(linear=0.0, angular=0.0))
return
stale_ticks[0] = 0
node.send("motor", safe_cmd(cmd.linear, cmd.angular))
Common pitfalls:
- Sending unclamped values → motor damage. Always clamp to hardware limits.
- Missing
shutdown()→ motors keep running after program exits - No stale timeout → motors hold last command forever if publisher dies
Example: Teleoperation
def teleop_tick(node):
joy = node.recv("joystick")
if joy:
cmd = horus.CmdVel(
linear=joy.axes[1] * 1.0, # Left stick Y → forward/back
angular=joy.axes[0] * 2.0, # Left stick X → turn
)
node.send("cmd_vel", cmd)
MotorCommand
Direct motor control — position, velocity, or torque mode.
import horus
cmd = horus.MotorCommand(
motor_id=1, # Motor identifier
mode=1, # 0=position, 1=velocity, 2=torque
target=5.0, # Target value (units depend on mode)
max_velocity=10.0, # rad/s — velocity limit
max_acceleration=50.0, # rad/s^2 — acceleration limit
feed_forward=0.1, # Feed-forward term
enable=True, # Motor enabled
)
Constructor: MotorCommand(motor_id=0, mode=0, target=0.0, max_velocity=inf, max_acceleration=inf, feed_forward=0.0, enable=True, timestamp_ns=0)
Fields:
| Field | Type | Default | Description |
|---|---|---|---|
motor_id | int | 0 | Motor identifier |
mode | int | 0 | Control mode (0=position, 1=velocity, 2=torque) |
target | float | 0.0 | Target value (units depend on mode) |
max_velocity | float | inf | Maximum velocity limit (rad/s) |
max_acceleration | float | inf | Maximum acceleration limit (rad/s^2) |
feed_forward | float | 0.0 | Feed-forward term |
enable | bool | True | Whether motor is enabled |
timestamp_ns | int | 0 | Timestamp in nanoseconds |
ServoCommand
Position command for hobby or industrial servos.
import horus
cmd = horus.ServoCommand(
servo_id=3, # Servo identifier
position=1.57, # rad — target position
speed=0.8, # Movement speed (0.0-1.0)
enable=True, # Servo enabled
)
Constructor: ServoCommand(servo_id=0, position=0.0, speed=0.5, enable=True, timestamp_ns=0)
Fields:
| Field | Type | Default | Description |
|---|---|---|---|
servo_id | int | 0 | Servo identifier |
position | float | 0.0 | Target position (rad) |
speed | float | 0.5 | Movement speed (0.0-1.0) |
enable | bool | True | Whether servo is enabled |
timestamp_ns | int | 0 | Timestamp in nanoseconds |
DifferentialDriveCommand
Direct left/right wheel velocity command.
import horus
cmd = horus.DifferentialDriveCommand(
left_velocity=0.5, # m/s — left wheel
right_velocity=0.3, # m/s — right wheel
max_acceleration=2.0, # m/s^2 — acceleration limit
enable=True, # Drive enabled
)
Constructor: DifferentialDriveCommand(left_velocity=0.0, right_velocity=0.0, max_acceleration=inf, enable=True, timestamp_ns=0)
Fields:
| Field | Type | Default | Description |
|---|---|---|---|
left_velocity | float | 0.0 | Left wheel velocity (m/s) |
right_velocity | float | 0.0 | Right wheel velocity (m/s) |
max_acceleration | float | inf | Maximum acceleration limit (m/s^2) |
enable | bool | True | Whether drive is enabled |
timestamp_ns | int | 0 | Timestamp in nanoseconds |
Use
CmdVelfor high-level velocity commands. UseDifferentialDriveCommandwhen your motor driver accepts raw wheel speeds.
PidConfig
PID controller configuration — gains, limits, and tuning.
import horus
pid = horus.PidConfig(
kp=2.0, # Proportional gain
ki=0.1, # Integral gain
kd=0.05, # Derivative gain
controller_id=1, # Controller identifier
integral_limit=10.0, # Anti-windup integral limit
output_limit=1.0, # Output clamp (symmetric)
anti_windup=True, # Anti-windup enabled
)
Constructor: PidConfig(kp=1.0, ki=0.0, kd=0.0, controller_id=0, integral_limit=inf, output_limit=inf, anti_windup=True, timestamp_ns=0)
Fields:
| Field | Type | Default | Description |
|---|---|---|---|
kp | float | 1.0 | Proportional gain |
ki | float | 0.0 | Integral gain |
kd | float | 0.0 | Derivative gain |
controller_id | int | 0 | Controller identifier |
integral_limit | float | inf | Anti-windup integral limit |
output_limit | float | inf | Output clamp (symmetric +/-) |
anti_windup | bool | True | Whether anti-windup is enabled |
timestamp_ns | int | 0 | Timestamp in nanoseconds |
JointCommand
Multi-joint position/velocity/effort command for robot arms.
import horus
cmd = horus.JointCommand(
names=["shoulder", "elbow", "wrist_1", "wrist_2", "wrist_3", "gripper"],
positions=[0.0, 0.5, -0.3, 1.2, 0.0, 0.0],
velocities=[1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
efforts=[], # Empty = no effort control
modes=[1, 1, 1, 1, 1, 1], # Per-joint control modes
)
Constructor: JointCommand(names=None, positions=None, velocities=None, efforts=None, modes=None, timestamp_ns=0)
Fields:
| Field | Type | Default | Description |
|---|---|---|---|
names | list[str] | None | Joint names |
positions | list[float] | None | Target joint positions (rad) |
velocities | list[float] | None | Max joint velocities (rad/s) |
efforts | list[float] | None | Joint torques (Nm) |
modes | list[int] | None | Per-joint control modes |
timestamp_ns | int | 0 | Timestamp in nanoseconds |
Example: Motor Control Node
import horus
def motor_tick(node):
cmd = node.recv("cmd_vel")
if cmd is None:
return
# Differential drive kinematics
wheel_base = 0.3 # meters
left = cmd.linear - cmd.angular * wheel_base / 2
right = cmd.linear + cmd.angular * wheel_base / 2
# Clamp to safe range
max_speed = 1.0
left = max(-max_speed, min(max_speed, left))
right = max(-max_speed, min(max_speed, right))
node.send("drive", horus.DifferentialDriveCommand(
left_velocity=left,
right_velocity=right,
enable=True,
))
def motor_shutdown(node):
# SAFETY: stop both motors
node.send("drive", horus.DifferentialDriveCommand(
left_velocity=0.0,
right_velocity=0.0,
enable=False,
))
motor = horus.Node(
name="motor_ctrl",
subs=[horus.CmdVel],
pubs=[horus.DifferentialDriveCommand],
tick=motor_tick,
shutdown=motor_shutdown,
rate=100,
order=10,
on_miss="safe_mode",
)
horus.run(motor, rt=True)
See Also
- Sensor Messages — Imu, LaserScan, Odometry
- Geometry Messages — Pose2D, Twist, Vector3
- Standard Messages — Full message catalog
- Differential Drive Recipe — Complete kinematics example
- Rust Control Messages — Rust equivalent