Coordinate Transforms (Python)

Build a robot's coordinate frame tree using horus.TransformFrame. Register static sensor mounts (camera, LiDAR) and dynamic frames (base link, arm joint). A publisher node updates dynamic transforms each tick, and a consumer node queries transforms to convert points between frames.

Problem

You need to convert sensor data between coordinate frames (e.g., a camera detection into world coordinates) from Python.

When To Use

  • Robots with sensors mounted at known offsets from the base
  • Articulated arms where end-effector pose depends on joint angles
  • Any system that needs spatial reasoning between components

Prerequisites

  • HORUS installed (Installation Guide)
  • Basic understanding of 3D transforms (translation + rotation)

horus.toml

[package]
name = "transform-frames-py"
version = "0.1.0"
description = "Coordinate frame tree with sensor mounts"
language = "python"

Complete Code

#!/usr/bin/env python3
"""Coordinate frame tree: static mounts + dynamic joints + point transforms."""

import math
import horus
from horus import Node, Scheduler, TransformFrame, Transform, us, ms

# ── Shared TransformFrame instance ───────────────────────────

tf = TransformFrame()

# ============================================================================
# Node 1: FramePublisher — registers and updates the coordinate frame tree
# ============================================================================

tick_pub = [0]

def publisher_init(node):
    """Register the frame hierarchy.

    Frame tree:
        world (root)
          └── base_link (dynamic — robot moves in world)
                ├── camera_link (static — bolted to chassis)
                ├── lidar_link (static — mounted on top)
                └── arm_link (dynamic — joint rotates)
                      └── end_effector (dynamic — tool tip)
    """
    # Root frame
    tf.register_frame("world")

    # Robot base — dynamic, position changes as robot drives
    tf.register_frame("base_link", parent="world")

    # Camera — static mount: 10cm forward, 30cm up, tilted down 15 degrees
    camera_tf = Transform.from_euler(
        translation=[0.1, 0.0, 0.3],
        rpy=[0.0, 0.26, 0.0],        # roll=0, pitch=15deg, yaw=0
    )
    tf.register_static_frame("camera_link", camera_tf, parent="base_link")

    # LiDAR — static mount: centered on top, 40cm up
    lidar_tf = Transform.from_translation([0.0, 0.0, 0.4])
    tf.register_static_frame("lidar_link", lidar_tf, parent="base_link")

    # Arm link — dynamic, rotates around Z axis
    tf.register_frame("arm_link", parent="base_link")

    # End effector — dynamic, extends from arm
    tf.register_frame("end_effector", parent="arm_link")

    print(f"Frame tree registered — {tf.frame_count()} frames")
    print(tf.format_tree())

def publisher_tick(node):
    tick_pub[0] += 1
    t = tick_pub[0] * 0.01          # 100 Hz -> 10 ms per tick

    # Update base_link: robot drives in a circle
    radius = 2.0
    speed = 0.2                      # rad/s
    base_x = radius * math.cos(speed * t)
    base_y = radius * math.sin(speed * t)
    base_yaw = speed * t + math.pi / 2   # face tangent direction

    base_tf = Transform.from_euler(
        translation=[base_x, base_y, 0.0],
        rpy=[0.0, 0.0, base_yaw],
    )
    tf.update_transform("base_link", base_tf)

    # Update arm_link: joint sweeps back and forth
    arm_angle = 0.8 * math.sin(t * 0.5)  # +/- 0.8 rad
    arm_tf = Transform.from_euler(
        translation=[0.15, 0.0, 0.2],
        rpy=[0.0, 0.0, arm_angle],
    )
    tf.update_transform("arm_link", arm_tf)

    # Update end_effector: extends from arm tip
    ee_tf = Transform.from_translation([0.3, 0.0, 0.0])  # 30cm arm
    tf.update_transform("end_effector", ee_tf)

    if tick_pub[0] % 100 == 0:
        print(
            f"[FRAMES] base=({base_x:.2f}, {base_y:.2f}), "
            f"arm_angle={arm_angle:.2f} rad"
        )

# ============================================================================
# Node 2: FrameUser — queries transforms and converts sensor data
# ============================================================================

tick_user = [0]

def user_tick(node):
    tick_user[0] += 1

    # Simulate a detection at a fixed point in camera frame
    # (1.5m forward, 0.2m left from camera)
    detection_camera = [1.5, 0.2, 0.0]

    # Transform the detection from camera_link to world frame
    if tf.can_transform("camera_link", "world"):
        world_point = tf.transform_point("camera_link", "world", detection_camera)
        node.send("detection.world", {
            "x": world_point[0],
            "y": world_point[1],
            "z": world_point[2],
        })

        if tick_user[0] % 100 == 0:
            print(
                f"[USER] Camera ({detection_camera[0]:.2f}, "
                f"{detection_camera[1]:.2f}, {detection_camera[2]:.2f}) "
                f"-> World ({world_point[0]:.2f}, {world_point[1]:.2f}, "
                f"{world_point[2]:.2f})"
            )

    # Query end effector position in world frame
    if tick_user[0] % 100 == 0 and tf.can_transform("end_effector", "world"):
        ee_tf = tf.tf("end_effector", "world")
        ee_pos = ee_tf.translation
        print(
            f"[USER] End effector in world: "
            f"({ee_pos[0]:.2f}, {ee_pos[1]:.2f}, {ee_pos[2]:.2f})"
        )

        # Print the frame chain for debugging
        chain = tf.frame_chain("end_effector", "world")
        print(f"[USER] Frame chain: {' -> '.join(chain)}")

    # Check for stale transforms (sensor disconnect detection)
    if tick_user[0] % 200 == 0:
        if tf.is_stale("base_link", max_age_sec=0.5):
            print("WARNING: base_link transform is stale — odometry may be disconnected")

def user_shutdown(node):
    print(f"FrameUser: {tick_user[0]} ticks")
    print(tf.format_tree())

# ============================================================================
# Main — two nodes sharing the same TransformFrame
# ============================================================================

publisher_node = Node(
    name="FramePublisher",
    tick=publisher_tick,
    init=publisher_init,
    rate=100,                      # 100 Hz — updates dynamic transforms
    order=0,                       # Runs first
    pubs=[],
    subs=[],
    on_miss="skip",
)

user_node = Node(
    name="FrameUser",
    tick=user_tick,
    shutdown=user_shutdown,
    rate=50,                       # 50 Hz — queries are cheaper than updates
    order=1,                       # Runs after publisher
    pubs=["detection.world"],
    subs=[],
    on_miss="warn",
)

if __name__ == "__main__":
    horus.run(publisher_node, user_node)

Expected Output

Frame tree registered — 6 frames
world
  └── base_link
        ├── camera_link [static]
        ├── lidar_link [static]
        └── arm_link
              └── end_effector
[HORUS] Scheduler running — tick_rate: 1000 Hz
[HORUS] Node "FramePublisher" started (100 Hz)
[HORUS] Node "FrameUser" started (50 Hz)
[FRAMES] base=(2.00, 0.00), arm_angle=0.00 rad
[USER] Camera (1.50, 0.20, 0.00) -> World (2.07, 0.22, 0.30)
[USER] End effector in world: (2.43, 0.03, 0.20)
[USER] Frame chain: end_effector -> arm_link -> base_link -> world
[FRAMES] base=(1.96, 0.39), arm_angle=0.39 rad
[USER] Camera (1.50, 0.20, 0.00) -> World (1.89, 0.65, 0.30)
[USER] End effector in world: (2.18, 0.57, 0.20)
[USER] Frame chain: end_effector -> arm_link -> base_link -> world
^C
FrameUser: 500 ticks
world
  └── base_link
        ├── camera_link [static]
        ├── lidar_link [static]
        └── arm_link
              └── end_effector
[HORUS] Shutting down...

Key Points

  • TransformFrame is shared between nodes — both publisher and user reference the same tree. The underlying Rust implementation is lock-free, so concurrent access is safe.
  • Static vs dynamic frames: camera_link and lidar_link use register_static_frame() because they are bolted to the chassis. Dynamic frames use register_frame() + update_transform().
  • Transform.from_euler(translation, rpy) creates a transform from position and roll/pitch/yaw angles.
  • transform_point(src, dst, point) converts a 3D point from one frame to another, walking the tree automatically.
  • can_transform() checks before querying — avoids exceptions when frames are not yet updated.
  • is_stale() catches disconnected sensors or frozen publishers without polling.
  • frame_chain() returns the path between frames — useful for debugging unexpected transform results.

Variations

  • URDF loading: Parse a URDF file at init to build the frame tree automatically
  • Multi-process frames: Publish TransformStamped messages on a shared topic so multiple processes share transforms
  • Time-based queries: Use tf_at() to query transforms at a specific timestamp for interpolation
  • Visualization: Publish frame positions to a topic for rendering in horus-sim3d

Common Errors

SymptomCauseFix
HorusTransformErrorFrame not registered or parent chain incompleteVerify all frames are registered in init() with correct parent names
Stale transform warningsPublisher stopped updating a dynamic frameCheck that update_transform() runs every tick for dynamic frames
Wrong point coordinatesFrame chain goes through unexpected intermediate framesUse frame_chain() to debug the actual path
Static transforms changeUsing update_transform() on a static frameUse register_static_frame() for fixed sensor mounts
can_transform() returns FalseDynamic frame never received its first updateEnsure publisher init() or first tick() updates all dynamic frames

See Also