Control Messages

Command messages for motors, servos, and actuators. All are typed, zero-copy, and binary-compatible with Rust.

from horus import CmdVel, MotorCommand, ServoCommand, DifferentialDriveCommand, PidConfig, JointCommand

CmdVel

2D velocity command — the most common control message in robotics. Used for differential drive, holonomic, and Ackermann robots.

import horus

cmd = horus.CmdVel(linear=0.5, angular=0.3)

print(f"Forward: {cmd.linear} m/s")
print(f"Turn: {cmd.angular} rad/s")

Fields:

FieldTypeUnitDescription
linearfloatm/sForward/backward velocity
angularfloatrad/sRotational velocity (counter-clockwise positive)

ROS2 equivalent: geometry_msgs/msg/Twist (2D simplified)

Common rates: 10-1000 Hz depending on controller. Motor control at 100+ Hz, teleoperation at 10-30 Hz.

Safety patterns:

# ALWAYS clamp before sending to hardware
MAX_LINEAR = 1.0   # m/s
MAX_ANGULAR = 2.0  # rad/s

def safe_cmd(linear, angular):
    return horus.CmdVel(
        linear=max(-MAX_LINEAR, min(MAX_LINEAR, linear)),
        angular=max(-MAX_ANGULAR, min(MAX_ANGULAR, angular)),
    )

# ALWAYS send zero in shutdown
def motor_shutdown(node):
    node.send("cmd_vel", horus.CmdVel(linear=0.0, angular=0.0))

# Stale command watchdog — zero motors if no command for 500ms
stale_ticks = [0]
def motor_tick(node):
    cmd = node.recv("cmd_vel")
    if cmd is None:
        stale_ticks[0] += 1
        if stale_ticks[0] > 50:  # 500ms at 100Hz
            node.send("motor", horus.CmdVel(linear=0.0, angular=0.0))
        return
    stale_ticks[0] = 0
    node.send("motor", safe_cmd(cmd.linear, cmd.angular))

Common pitfalls:

  • Sending unclamped values → motor damage. Always clamp to hardware limits.
  • Missing shutdown() → motors keep running after program exits
  • No stale timeout → motors hold last command forever if publisher dies

Example: Teleoperation

def teleop_tick(node):
    joy = node.recv("joystick")
    if joy:
        cmd = horus.CmdVel(
            linear=joy.axes[1] * 1.0,    # Left stick Y → forward/back
            angular=joy.axes[0] * 2.0,   # Left stick X → turn
        )
        node.send("cmd_vel", cmd)

MotorCommand

Direct motor control — position, velocity, or torque mode.

import horus

cmd = horus.MotorCommand(
    motor_id=1,              # Motor identifier
    mode=1,                  # 0=position, 1=velocity, 2=torque
    target=5.0,              # Target value (units depend on mode)
    max_velocity=10.0,       # rad/s — velocity limit
    max_acceleration=50.0,   # rad/s^2 — acceleration limit
    feed_forward=0.1,        # Feed-forward term
    enable=True,             # Motor enabled
)

Constructor: MotorCommand(motor_id=0, mode=0, target=0.0, max_velocity=inf, max_acceleration=inf, feed_forward=0.0, enable=True, timestamp_ns=0)

Fields:

FieldTypeDefaultDescription
motor_idint0Motor identifier
modeint0Control mode (0=position, 1=velocity, 2=torque)
targetfloat0.0Target value (units depend on mode)
max_velocityfloatinfMaximum velocity limit (rad/s)
max_accelerationfloatinfMaximum acceleration limit (rad/s^2)
feed_forwardfloat0.0Feed-forward term
enableboolTrueWhether motor is enabled
timestamp_nsint0Timestamp in nanoseconds

ServoCommand

Position command for hobby or industrial servos.

import horus

cmd = horus.ServoCommand(
    servo_id=3,         # Servo identifier
    position=1.57,      # rad — target position
    speed=0.8,          # Movement speed (0.0-1.0)
    enable=True,        # Servo enabled
)

Constructor: ServoCommand(servo_id=0, position=0.0, speed=0.5, enable=True, timestamp_ns=0)

Fields:

FieldTypeDefaultDescription
servo_idint0Servo identifier
positionfloat0.0Target position (rad)
speedfloat0.5Movement speed (0.0-1.0)
enableboolTrueWhether servo is enabled
timestamp_nsint0Timestamp in nanoseconds

DifferentialDriveCommand

Direct left/right wheel velocity command.

import horus

cmd = horus.DifferentialDriveCommand(
    left_velocity=0.5,       # m/s — left wheel
    right_velocity=0.3,      # m/s — right wheel
    max_acceleration=2.0,    # m/s^2 — acceleration limit
    enable=True,             # Drive enabled
)

Constructor: DifferentialDriveCommand(left_velocity=0.0, right_velocity=0.0, max_acceleration=inf, enable=True, timestamp_ns=0)

Fields:

FieldTypeDefaultDescription
left_velocityfloat0.0Left wheel velocity (m/s)
right_velocityfloat0.0Right wheel velocity (m/s)
max_accelerationfloatinfMaximum acceleration limit (m/s^2)
enableboolTrueWhether drive is enabled
timestamp_nsint0Timestamp in nanoseconds

Use CmdVel for high-level velocity commands. Use DifferentialDriveCommand when your motor driver accepts raw wheel speeds.


PidConfig

PID controller configuration — gains, limits, and tuning.

import horus

pid = horus.PidConfig(
    kp=2.0,              # Proportional gain
    ki=0.1,              # Integral gain
    kd=0.05,             # Derivative gain
    controller_id=1,     # Controller identifier
    integral_limit=10.0, # Anti-windup integral limit
    output_limit=1.0,    # Output clamp (symmetric)
    anti_windup=True,     # Anti-windup enabled
)

Constructor: PidConfig(kp=1.0, ki=0.0, kd=0.0, controller_id=0, integral_limit=inf, output_limit=inf, anti_windup=True, timestamp_ns=0)

Fields:

FieldTypeDefaultDescription
kpfloat1.0Proportional gain
kifloat0.0Integral gain
kdfloat0.0Derivative gain
controller_idint0Controller identifier
integral_limitfloatinfAnti-windup integral limit
output_limitfloatinfOutput clamp (symmetric +/-)
anti_windupboolTrueWhether anti-windup is enabled
timestamp_nsint0Timestamp in nanoseconds

JointCommand

Multi-joint position/velocity/effort command for robot arms.

import horus

cmd = horus.JointCommand(
    names=["shoulder", "elbow", "wrist_1", "wrist_2", "wrist_3", "gripper"],
    positions=[0.0, 0.5, -0.3, 1.2, 0.0, 0.0],
    velocities=[1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
    efforts=[],    # Empty = no effort control
    modes=[1, 1, 1, 1, 1, 1],  # Per-joint control modes
)

Constructor: JointCommand(names=None, positions=None, velocities=None, efforts=None, modes=None, timestamp_ns=0)

Fields:

FieldTypeDefaultDescription
nameslist[str]NoneJoint names
positionslist[float]NoneTarget joint positions (rad)
velocitieslist[float]NoneMax joint velocities (rad/s)
effortslist[float]NoneJoint torques (Nm)
modeslist[int]NonePer-joint control modes
timestamp_nsint0Timestamp in nanoseconds

Example: Motor Control Node

import horus

def motor_tick(node):
    cmd = node.recv("cmd_vel")
    if cmd is None:
        return

    # Differential drive kinematics
    wheel_base = 0.3  # meters
    left = cmd.linear - cmd.angular * wheel_base / 2
    right = cmd.linear + cmd.angular * wheel_base / 2

    # Clamp to safe range
    max_speed = 1.0
    left = max(-max_speed, min(max_speed, left))
    right = max(-max_speed, min(max_speed, right))

    node.send("drive", horus.DifferentialDriveCommand(
        left_velocity=left,
        right_velocity=right,
        enable=True,
    ))

def motor_shutdown(node):
    # SAFETY: stop both motors
    node.send("drive", horus.DifferentialDriveCommand(
        left_velocity=0.0,
        right_velocity=0.0,
        enable=False,
    ))

motor = horus.Node(
    name="motor_ctrl",
    subs=[horus.CmdVel],
    pubs=[horus.DifferentialDriveCommand],
    tick=motor_tick,
    shutdown=motor_shutdown,
    rate=100,
    order=10,
    on_miss="safe_mode",
)
horus.run(motor, rt=True)

See Also