Navigation Messages

Navigation messages represent the "where do I go?" problem — goals, paths, maps, and planning. These types work together: set a NavGoal, follow a NavPath, build an OccupancyGrid, plan with CostMap.

When you need these: Any robot that moves autonomously. Mobile robots, delivery bots, drones with waypoint missions, cleaning robots with coverage plans.

from horus import (
    NavGoal, GoalResult, Waypoint, NavPath,
    PathPlan, OccupancyGrid, CostMap,
    VelocityObstacle, VelocityObstacles,
)

A navigation goal with tolerance-based arrival checking. This is the core of every "go to point" behavior — you set a target, then check is_reached() every tick until the robot arrives.

Constructor

goal = NavGoal(x=10.0, y=5.0, theta=0.0,
               position_tolerance=0.1, angle_tolerance=0.05)
  • position_tolerance: meters. The robot must be within this distance of (x, y).
  • angle_tolerance: radians. The robot's heading must be within this of theta.

.is_reached(current_pose) — Has the Robot Arrived?

from horus import Pose2D
current = Pose2D(x=9.95, y=5.02, theta=0.01)
if goal.is_reached(current):
    print("Goal reached!")

The single most important navigation method. Returns True when both conditions are met:

  1. Distance from current position to goal position < position_tolerance
  2. Angular difference between current heading and goal heading < angle_tolerance

This is what you call every tick in a navigation loop. When it returns True, send a stop command.

Common mistake: Using Pose2D.distance_to() instead of is_reached(). Distance only checks position — your robot can be at the right spot but facing the wrong direction. is_reached() checks both.

.is_position_reached(current_pose) — Position Only

if goal.is_position_reached(current):
    print("At the right position (heading may differ)")

Checks position tolerance only, ignores heading. Use this when heading doesn't matter — picking up an object (you just need to be close), reaching a charging dock (alignment handled by a separate docking controller).

.is_orientation_reached(current_pose) — Heading Only

if goal.is_orientation_reached(current):
    print("Facing the right direction (position may differ)")

Checks heading tolerance only, ignores position. Use this after position is already correct — for example, rotating in place to face a docking station before approaching.

.with_timeout(seconds) — Set Time Limit

goal = NavGoal(x=10.0, y=5.0, theta=0.0,
               position_tolerance=0.2).with_timeout(30.0)

Returns a new NavGoal with a timeout. The timeout value is stored on the goal — your navigation controller checks it and aborts if time runs out. This prevents the robot from circling forever trying to reach an unreachable goal.

.with_priority(level) — Set Priority

goal = goal.with_priority(1)  # High priority

For multi-goal systems where goals can preempt each other. Higher priority goals get executed first.

Common mistake: Setting position_tolerance too tight. With sensor noise and control imprecision, a tolerance of 0.01m (1cm) means the robot may never "arrive" — it oscillates around the target. Use 0.05-0.2m for wheeled robots, 0.01-0.05m for precise arms.

Example — Go-to-Goal Navigation Loop:

from horus import Node, run, NavGoal, Pose2D, CmdVel, Topic
import math

odom_topic = Topic(Pose2D)
cmd_topic = Topic(CmdVel)

goal = NavGoal(x=5.0, y=3.0, theta=0.0,
               position_tolerance=0.2,
               angle_tolerance=0.1).with_timeout(60.0)

def navigate(node):
    current = odom_topic.recv(node)
    if current is None:
        return

    if goal.is_reached(current):
        cmd_topic.send(CmdVel.zero(), node)  # Arrived — stop
        return

    # Simple proportional controller toward goal
    dx = goal.x - current.x
    dy = goal.y - current.y
    target_angle = math.atan2(dy, dx)
    angle_error = target_angle - current.theta

    # Normalize angle error to [-pi, pi]
    while angle_error > math.pi: angle_error -= 2 * math.pi
    while angle_error < -math.pi: angle_error += 2 * math.pi

    if abs(angle_error) > 0.3:
        # Turn toward goal first
        cmd_topic.send(CmdVel(linear=0.0, angular=0.5 * angle_error), node)
    else:
        # Drive toward goal
        dist = current.distance_to(Pose2D(x=goal.x, y=goal.y, theta=0.0))
        speed = min(0.5, dist)  # Slow down near goal
        cmd_topic.send(CmdVel(linear=speed, angular=0.3 * angle_error), node)

run(Node(tick=navigate, rate=20, pubs=["cmd_vel"], subs=["odom"]))

An ordered sequence of waypoints for path following. Includes progress tracking for monitoring and pure-pursuit controllers.

Constructor + Building

from horus import NavPath, Waypoint
path = NavPath()
path.add_waypoint(Waypoint(x=0.0, y=0.0))
path.add_waypoint(Waypoint(x=2.0, y=0.0))
path.add_waypoint(Waypoint(x=2.0, y=3.0).with_stop())  # Stop at end

.closest_waypoint_index(current_pose) — Nearest Waypoint

from horus import Pose2D
current = Pose2D(x=1.5, y=0.2, theta=0.0)
idx = path.closest_waypoint_index(current)
print(f"Nearest waypoint: #{idx}")

Returns the index of the waypoint closest to the current pose, or None if the path is empty. This is the "lookahead" for pure-pursuit path following — you steer toward the closest waypoint, then advance to the next one once you pass it.

.calculate_progress(current_pose) — How Far Along the Path?

progress = path.calculate_progress(current)
print(f"Path progress: {progress*100:.0f}%")

Returns a float from 0.0 (at start) to 1.0 (at end). The progress is based on which waypoint is closest and how far between waypoints the robot is. Use this for:

  • Progress bars in monitoring dashboards
  • Deciding when to start slowing down (e.g., if progress > 0.9, reduce speed)
  • Logging and analytics

Example — Path Following with Progress:

from horus import Node, run, NavPath, Waypoint, Pose2D, CmdVel, Topic

path = NavPath()
path.add_waypoint(Waypoint(x=0.0, y=0.0))
path.add_waypoint(Waypoint(x=3.0, y=0.0))
path.add_waypoint(Waypoint(x=3.0, y=4.0))
path.add_waypoint(Waypoint(x=0.0, y=4.0).with_stop())

odom = Topic(Pose2D)
cmd = Topic(CmdVel)

def follow_path(node):
    current = odom.recv(node)
    if current is None:
        return
    progress = path.calculate_progress(current)
    if progress >= 0.98:
        cmd.send(CmdVel.zero(), node)
        return
    # Steer toward closest waypoint (simplified)
    idx = path.closest_waypoint_index(current)
    if idx is not None:
        speed = 0.3 if progress < 0.9 else 0.1  # Slow near end
        cmd.send(CmdVel(linear=speed, angular=0.0), node)

run(Node(tick=follow_path, rate=10, pubs=["cmd_vel"], subs=["odom"]))

Waypoint

A single point on a path with optional velocity constraints.

Constructor

wp = Waypoint(x=5.0, y=3.0, theta=0.0)

.with_velocity(twist) — Set Desired Velocity

from horus import Twist
wp = Waypoint(x=5.0, y=3.0, theta=0.0).with_velocity(
    Twist.new_2d(linear_x=0.5, angular_z=0.0)
)

Returns a new Waypoint with a velocity constraint. The path follower should try to match this velocity when passing through the waypoint. Use for smooth trajectories where speed varies (e.g., slow down for turns).

.with_stop() — Must Stop Here

wp = Waypoint(x=5.0, y=3.0, theta=0.0).with_stop()

Returns a new Waypoint that requires the robot to come to a complete stop. Use for the final waypoint, or at intermediate points where the robot must pause (e.g., intersection, pickup location).


GoalResult

Feedback from the navigation system about goal execution.

.with_error(message) — Attach Error Message

result = GoalResult(goal_id=1, status=3)  # Aborted
result = result.with_error("Path blocked by obstacle")

Returns a new GoalResult with an error message. Use when the navigation fails — the message explains why (timeout, obstacle, invalid goal, etc.).


OccupancyGrid

A 2D grid map where each cell is free, occupied, or unknown. The foundation for obstacle avoidance and path planning.

Constructor

grid = OccupancyGrid(width=100, height=100, resolution=0.05)
# 100 × 100 cells at 5cm/cell = 5m × 5m map

Understanding the coordinate system:

  • resolution: meters per cell. 0.05 = each cell covers 5cm × 5cm.
  • width × height: number of cells. width * resolution = map width in meters.
  • Origin is at the bottom-left corner of the grid.

.world_to_grid(x, y) — Meters to Cell Indices

gx, gy = grid.world_to_grid(2.5, 2.5)
# Returns cell indices (50, 50) for a 0.05m resolution grid

Converts world coordinates (meters) to grid cell indices. The formula: gx = (x - origin.x) / resolution. Returns None if the point is outside the grid bounds.

You need this every time you have a sensor reading in meters and want to mark it on the map.

.grid_to_world(grid_x, grid_y) — Cell Indices to Meters

wx, wy = grid.grid_to_world(50, 50)
# Returns (2.5, 2.5) — the center of cell (50, 50)

The inverse — converts grid indices back to world coordinates. Returns the center of the cell in meters.

.set_occupancy(grid_x, grid_y, value) — Mark a Cell

grid.set_occupancy(50, 50, 100)   # Occupied
grid.set_occupancy(30, 30, 0)     # Free
grid.set_occupancy(10, 10, -1)    # Unknown

Takes grid indices (not meters). Values:

  • 100 = definitely occupied (wall, obstacle)
  • 0 = definitely free (empty space)
  • -1 = unknown (not yet observed)
  • Values 1-99 = probability of occupancy (higher = more likely occupied)

.is_free(x, y) — Can the Robot Go Here?

if grid.is_free(2.5, 2.5):
    print("Path is clear")

Takes world coordinates (meters), converts to grid internally, returns True if the cell value is 0 (free). This is the method you call in a path planner to check if a position is traversable.

Common mistake: Mixing up set_occupancy() (takes grid indices) and is_free() (takes world meters). They use different coordinate systems — set_occupancy(50, 50, ...) and is_free(2.5, 2.5) refer to the same cell when resolution=0.05.

.is_occupied(x, y) — Is There an Obstacle?

if grid.is_occupied(2.5, 2.5):
    print("Obstacle detected!")

Takes world coordinates. Returns True if the cell value is 100 (occupied).

.occupancy(grid_x, grid_y) — Raw Cell Value

value = grid.occupancy(50, 50)  # Returns 0, 100, -1, or probability

Takes grid indices. Returns the raw occupancy value for fine-grained decisions.

Example — Building a Map from LiDAR:

from horus import LaserScan, OccupancyGrid, Pose2D
import math

grid = OccupancyGrid(width=200, height=200, resolution=0.05)  # 10m × 10m

def update_map(scan, robot_pose):
    for i in range(len(scan.ranges)):
        if not scan.is_range_valid(i):
            continue
        angle = scan.angle_at(i) + robot_pose.theta
        r = scan.ranges[i]
        # Obstacle position in world frame
        ox = robot_pose.x + r * math.cos(angle)
        oy = robot_pose.y + r * math.sin(angle)
        result = grid.world_to_grid(ox, oy)
        if result is not None:
            gx, gy = result
            grid.set_occupancy(gx, gy, 100)  # Mark occupied

CostMap

Inflated cost map for path planning. Built from an OccupancyGrid by expanding obstacles with a safety margin.

Constructor

costmap = CostMap(grid=grid, inflation_radius=0.3)
# Inflates obstacles by 30cm — robot won't plan paths within 30cm of walls

.cost(x, y) — Query Cost at a Position

c = costmap.cost(2.0, 2.0)
if c is not None and c > 200:
    print("Too close to obstacle — find alternate path")

Takes world coordinates. Returns the cost at that position (0 = free, 254 = lethal/occupied, values in between = proximity to obstacles). Use this in a path planner to prefer paths that stay far from obstacles.

.compute_costs() — Recompute After Changes

costmap.compute_costs()

Recalculates the inflated cost layer from the underlying occupancy grid. Call this after modifying the grid (adding new obstacles from sensor data).


PathPlan

Planned path output from a path planning algorithm.

.add_waypoint(x, y, theta) — Add a Point

plan = PathPlan()
plan.add_waypoint(0.0, 0.0, 0.0)
plan.add_waypoint(3.0, 0.0, 0.0)
plan.add_waypoint(3.0, 4.0, 1.57)

.from_waypoints(waypoints, goal) — Build from Array

plan = PathPlan.from_waypoints(
    waypoints=[[0.0, 0.0, 0.0], [3.0, 0.0, 0.0], [3.0, 4.0, 1.57]],
    goal=[3.0, 4.0, 1.57],
)

.is_empty() — Check if Empty

if plan.is_empty():
    print("No path found!")

VelocityObstacle, VelocityObstacles

Dynamic obstacle avoidance using the velocity obstacle model. These are POD types with field access only — used by reactive collision avoidance algorithms.

from horus import VelocityObstacle, VelocityObstacles
vo = VelocityObstacle()  # position, velocity, radius fields

See Also