Tutorial 2: Motor Controller (Python)

Looking for the Rust version? See Tutorial 2: Build a Motor Controller.

Build a motor controller that subscribes to velocity commands, simulates motor physics, and publishes position/velocity feedback. The controller also implements a safe shutdown that zeroes the motor command before exit.

What You'll Learn

  • Subscribing to command topics
  • Publishing state feedback
  • Managing state between ticks
  • Multiple topics per node (sub + pub)
  • Safe shutdown for actuators

Step 1: Create the Project

horus new motor-tutorial -p
cd motor-tutorial

Step 2: Write the Code

Replace the contents of main.py with:

import horus
import math

# ── Commander Node ───────────────────────────────────────────

def make_commander():
    """Generates sine-wave velocity commands."""
    t = [0.0]

    def tick(node):
        t[0] += horus.dt()
        velocity = 2.0 * math.sin(t[0] * 0.5)  # oscillate ±2 rad/s
        node.send("motor.cmd", {"velocity": velocity, "max_torque": 5.0})

    return horus.Node(name="Commander", tick=tick, rate=100, order=0,
                      pubs=["motor.cmd"])


# ── Motor Controller Node ───────────────────────────────────

def make_motor():
    """Simulates a motor: integrates velocity → position."""
    state = {"position": 0.0, "velocity": 0.0}

    def tick(node):
        cmd = node.recv("motor.cmd")
        if cmd is not None:
            state["velocity"] = cmd["velocity"]

        # Integrate velocity → position
        dt = horus.dt()
        state["position"] += state["velocity"] * dt

        # Publish state feedback
        node.send("motor.state", {
            "position": state["position"],
            "velocity": state["velocity"],
            "timestamp": horus.now(),
        })

    def shutdown(node):
        # SAFETY: stop the motor before exiting
        state["velocity"] = 0.0
        node.send("motor.cmd", {"velocity": 0.0, "max_torque": 0.0})
        print("Motor stopped safely")

    return horus.Node(name="MotorController", tick=tick, shutdown=shutdown,
                      rate=100, order=1,
                      subs=["motor.cmd"], pubs=["motor.state"])


# ── Display Node ────────────────────────────────────────────

def make_display():
    """Prints motor state every 50 samples."""
    count = [0]

    def tick(node):
        msg = node.recv("motor.state")
        if msg is not None:
            count[0] += 1
            if count[0] % 50 == 0:
                print(f"[#{count[0]}] pos={msg['position']:.2f} rad"
                      f"  vel={msg['velocity']:.2f} rad/s")

    return horus.Node(name="StateDisplay", tick=tick, rate=100, order=2,
                      subs=["motor.state"])


# ── Main ────────────────────────────────────────────────────

print("Starting motor controller tutorial...\n")
horus.run(make_commander(), make_motor(), make_display())

Step 3: Run It

horus run
Starting motor controller tutorial...

[#50] pos=0.12 rad  vel=0.50 rad/s
[#100] pos=0.95 rad  vel=1.73 rad/s
[#150] pos=2.84 rad  vel=1.98 rad/s
[#200] pos=4.76 rad  vel=0.97 rad/s

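Press Ctrl+C to stop. The shutdown callback prints "Motor stopped safely", confirming that it ran. The numbers above are illustrative; the exact values you see depend on the step size that horus.dt() reports on each tick.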

Understanding the Code

Safe Shutdown

The shutdown callback is critical for actuator nodes. When you press Ctrl+C, HORUS calls each node's shutdown callback, passing it the node handle, before exiting:

def shutdown(node):
    state["velocity"] = 0.0
    node.send("motor.cmd", {"velocity": 0.0, "max_torque": 0.0})
    print("Motor stopped safely")

Without this, a real motor would continue at its last commanded velocity.

Data Flow

Commander (order=0) → motor.cmd → MotorController (order=1) → motor.state → StateDisplay (order=2)

Lower order numbers run first each tick, ensuring the commander publishes before the controller reads.
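
The effect of ordering can be illustrated without HORUS at all. The sketch below is a toy stand-in, not the HORUS scheduler: MiniBus and run_ticks are hypothetical names invented for this example, and the only idea carried over from the tutorial is that nodes run lowest order first within each tick.

```python
from collections import deque

class MiniBus:
    """Toy in-memory topics (hypothetical); each topic is a FIFO queue."""
    def __init__(self):
        self.topics = {}

    def send(self, topic, msg):
        self.topics.setdefault(topic, deque()).append(msg)

    def recv(self, topic):
        queue = self.topics.get(topic)
        # A missing or empty queue means no message is waiting
        return queue.popleft() if queue else None


def run_ticks(nodes, bus, n_ticks):
    """Call each node's tick once per cycle, lowest order first."""
    ordered = sorted(nodes, key=lambda item: item[0])
    for _ in range(n_ticks):
        for _order, tick in ordered:
            tick(bus)


log = []

def commander(bus):
    bus.send("motor.cmd", {"velocity": 1.0})

def motor(bus):
    cmd = bus.recv("motor.cmd")
    log.append(cmd is not None)  # True when a command was waiting

bus = MiniBus()
# Registered out of order on purpose; sorting by order fixes the sequence.
run_ticks([(1, motor), (0, commander)], bus, n_ticks=3)
print(log)  # [True, True, True]
```

If the two order values are swapped, the motor in this toy model finds nothing on its first tick and then reads each command one cycle late.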

State Between Ticks

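The state dict is captured by the tick closure, so it persists across ticks. The Commander and StateDisplay nodes use the same technique with one-element lists (t = [0.0], count = [0]), which lets the closure update the value without a nonlocal declaration. Every tick, the motor integrates velocity into position: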

state["position"] += state["velocity"] * dt
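
This update is a fixed-step (Euler) integration. The standalone sketch below repeats it outside HORUS and compares the result with the closed-form integral of the sine command. It assumes a constant step of 0.01 s (matching rate=100); the real value returned by horus.dt() may differ, and simulate and exact_position are hypothetical helper names.

```python
import math

def simulate(n_ticks, dt=0.01):
    """Integrate the commanded velocity with a fixed step (assumed dt)."""
    t = 0.0
    position = 0.0
    for _ in range(n_ticks):
        t += dt
        velocity = 2.0 * math.sin(0.5 * t)  # same command as the Commander node
        position += velocity * dt           # same update as the motor tick
    return t, position

def exact_position(t):
    """Closed-form integral of 2*sin(0.5*t) from 0 to t."""
    return 4.0 * (1.0 - math.cos(0.5 * t))

t, approx = simulate(200)
error = abs(approx - exact_position(t))
print(f"t={t:.2f}s  euler={approx:.3f}  exact={exact_position(t):.3f}  error={error:.4f}")
```

Because the method is first order, the gap between the two values shrinks roughly in proportion to dt.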

Using a Class for State

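For complex nodes, a class is cleaner than closures. The following example is a separate proportional speed controller written as a class; it uses its own topics (cmd_vel, motor_cmd) rather than the ones from the tutorial above: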

import horus

class MotorController:
    """Proportional speed controller; state lives on the instance."""

    def __init__(self):
        self.kp = 1.5            # proportional gain
        self.target_speed = 0.0  # latest commanded speed

    def tick(self, node):
        cmd = node.recv("cmd_vel")
        if cmd is not None:      # only update when a new command arrived
            self.target_speed = cmd.linear
        error = self.target_speed - self.current_speed()
        node.send("motor_cmd", {"rpm": error * self.kp})

    def shutdown(self, node):
        # SAFETY: zero the motor command before exiting
        node.send("motor_cmd", {"rpm": 0.0})
        node.log_info("Motors zeroed")

    def current_speed(self):
        # In a real system, read from an encoder
        return 0.0

motor = MotorController()
node = horus.Node(
    name="motor",
    pubs=["motor_cmd"],
    subs=["cmd_vel"],
    tick=motor.tick,          # bound method: self travels with the callback
    shutdown=motor.shutdown,
    rate=100,
)
horus.run(node)

Passing motor.tick hands the scheduler a bound method: Python has already attached the instance to it as self. When the scheduler calls tick(node), the call runs as MotorController.tick(motor, node), so the callback receives both the instance state (self) and the HORUS node handle (node) without any extra wiring.
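
The binding behaviour is plain Python and can be checked without HORUS. In this minimal sketch, Counter and the "fake-node" string are placeholders invented for the example:

```python
class Counter:
    def __init__(self):
        self.calls = 0

    def tick(self, node):
        self.calls += 1
        return node

counter = Counter()
callback = counter.tick  # bound method: self is already attached

# A caller that only knows about `node` can invoke it with one argument.
result = callback("fake-node")

print(callback.__self__ is counter)  # True
print(counter.calls)                 # 1
```

The same mechanism is why motor.shutdown can be passed directly as the shutdown callback.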

Lifecycle: Shutdown Guarantees

The scheduler calls shutdown(node) on Ctrl+C, SIGTERM, or when the run duration expires. If shutdown raises an exception, it is caught and logged, and the other nodes still shut down normally. A failure in one node's cleanup therefore cannot prevent the others from sending their final commands, such as zeroing motors.
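
The guarantee described above can be pictured as a loop that isolates each callback. The sketch below is an illustration of that pattern in plain Python, not the HORUS implementation; shutdown_all and the two callbacks are hypothetical.

```python
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("shutdown-sketch")

def shutdown_all(callbacks):
    """Run every shutdown callback; log failures instead of propagating them."""
    failed = []
    for name, callback in callbacks:
        try:
            callback()
        except Exception:
            # Record the traceback, then continue with the remaining nodes
            logger.exception("shutdown of %s failed", name)
            failed.append(name)
    return failed

stopped = []

def broken_shutdown():
    raise RuntimeError("sensor bus already closed")

def motor_shutdown():
    stopped.append("motor")  # stands in for sending a zero command

failed = shutdown_all([("sensor", broken_shutdown), ("motor", motor_shutdown)])
print(failed, stopped)  # ['sensor'] ['motor']
```

Even though the first callback raises, the motor callback still runs, which is the property that matters for actuators.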
