Navigation Messages
Navigation messages represent the "where do I go?" problem — goals, paths, maps, and planning. These types work together: set a NavGoal, follow a NavPath, build an OccupancyGrid, plan with CostMap.
When you need these: Any robot that moves autonomously. Mobile robots, delivery bots, drones with waypoint missions, cleaning robots with coverage plans.
from horus import (
NavGoal, GoalResult, Waypoint, NavPath,
PathPlan, OccupancyGrid, CostMap,
VelocityObstacle, VelocityObstacles,
)
NavGoal
A navigation goal with tolerance-based arrival checking. This is the core of every "go to point" behavior — you set a target, then check is_reached() every tick until the robot arrives.
Constructor
goal = NavGoal(x=10.0, y=5.0, theta=0.0,
position_tolerance=0.1, angle_tolerance=0.05)
position_tolerance: meters. The robot must be within this distance of (x, y).angle_tolerance: radians. The robot's heading must be within this of theta.
.is_reached(current_pose) — Has the Robot Arrived?
from horus import Pose2D
current = Pose2D(x=9.95, y=5.02, theta=0.01)
if goal.is_reached(current):
print("Goal reached!")
The single most important navigation method. Returns True when both conditions are met:
- Distance from current position to goal position <
position_tolerance - Angular difference between current heading and goal heading <
angle_tolerance
This is what you call every tick in a navigation loop. When it returns True, send a stop command.
Common mistake: Using
Pose2D.distance_to()instead ofis_reached(). Distance only checks position — your robot can be at the right spot but facing the wrong direction.is_reached()checks both.
.is_position_reached(current_pose) — Position Only
if goal.is_position_reached(current):
print("At the right position (heading may differ)")
Checks position tolerance only, ignores heading. Use this when heading doesn't matter — picking up an object (you just need to be close), reaching a charging dock (alignment handled by a separate docking controller).
.is_orientation_reached(current_pose) — Heading Only
if goal.is_orientation_reached(current):
print("Facing the right direction (position may differ)")
Checks heading tolerance only, ignores position. Use this after position is already correct — for example, rotating in place to face a docking station before approaching.
.with_timeout(seconds) — Set Time Limit
goal = NavGoal(x=10.0, y=5.0, theta=0.0,
position_tolerance=0.2).with_timeout(30.0)
Returns a new NavGoal with a timeout. The timeout value is stored on the goal — your navigation controller checks it and aborts if time runs out. This prevents the robot from circling forever trying to reach an unreachable goal.
.with_priority(level) — Set Priority
goal = goal.with_priority(1) # High priority
For multi-goal systems where goals can preempt each other. Higher priority goals get executed first.
Common mistake: Setting
position_tolerancetoo tight. With sensor noise and control imprecision, a tolerance of 0.01m (1cm) means the robot may never "arrive" — it oscillates around the target. Use 0.05-0.2m for wheeled robots, 0.01-0.05m for precise arms.
Example — Go-to-Goal Navigation Loop:
from horus import Node, run, NavGoal, Pose2D, CmdVel, Topic
import math
odom_topic = Topic(Pose2D)
cmd_topic = Topic(CmdVel)
goal = NavGoal(x=5.0, y=3.0, theta=0.0,
position_tolerance=0.2,
angle_tolerance=0.1).with_timeout(60.0)
def navigate(node):
current = odom_topic.recv(node)
if current is None:
return
if goal.is_reached(current):
cmd_topic.send(CmdVel.zero(), node) # Arrived — stop
return
# Simple proportional controller toward goal
dx = goal.x - current.x
dy = goal.y - current.y
target_angle = math.atan2(dy, dx)
angle_error = target_angle - current.theta
# Normalize angle error to [-pi, pi]
while angle_error > math.pi: angle_error -= 2 * math.pi
while angle_error < -math.pi: angle_error += 2 * math.pi
if abs(angle_error) > 0.3:
# Turn toward goal first
cmd_topic.send(CmdVel(linear=0.0, angular=0.5 * angle_error), node)
else:
# Drive toward goal
dist = current.distance_to(Pose2D(x=goal.x, y=goal.y, theta=0.0))
speed = min(0.5, dist) # Slow down near goal
cmd_topic.send(CmdVel(linear=speed, angular=0.3 * angle_error), node)
run(Node(tick=navigate, rate=20, pubs=["cmd_vel"], subs=["odom"]))
NavPath
An ordered sequence of waypoints for path following. Includes progress tracking for monitoring and pure-pursuit controllers.
Constructor + Building
from horus import NavPath, Waypoint
path = NavPath()
path.add_waypoint(Waypoint(x=0.0, y=0.0))
path.add_waypoint(Waypoint(x=2.0, y=0.0))
path.add_waypoint(Waypoint(x=2.0, y=3.0).with_stop()) # Stop at end
.closest_waypoint_index(current_pose) — Nearest Waypoint
from horus import Pose2D
current = Pose2D(x=1.5, y=0.2, theta=0.0)
idx = path.closest_waypoint_index(current)
print(f"Nearest waypoint: #{idx}")
Returns the index of the waypoint closest to the current pose, or None if the path is empty. This is the "lookahead" for pure-pursuit path following — you steer toward the closest waypoint, then advance to the next one once you pass it.
.calculate_progress(current_pose) — How Far Along the Path?
progress = path.calculate_progress(current)
print(f"Path progress: {progress*100:.0f}%")
Returns a float from 0.0 (at start) to 1.0 (at end). The progress is based on which waypoint is closest and how far between waypoints the robot is. Use this for:
- Progress bars in monitoring dashboards
- Deciding when to start slowing down (e.g., if progress > 0.9, reduce speed)
- Logging and analytics
Example — Path Following with Progress:
from horus import Node, run, NavPath, Waypoint, Pose2D, CmdVel, Topic
path = NavPath()
path.add_waypoint(Waypoint(x=0.0, y=0.0))
path.add_waypoint(Waypoint(x=3.0, y=0.0))
path.add_waypoint(Waypoint(x=3.0, y=4.0))
path.add_waypoint(Waypoint(x=0.0, y=4.0).with_stop())
odom = Topic(Pose2D)
cmd = Topic(CmdVel)
def follow_path(node):
current = odom.recv(node)
if current is None:
return
progress = path.calculate_progress(current)
if progress >= 0.98:
cmd.send(CmdVel.zero(), node)
return
# Steer toward closest waypoint (simplified)
idx = path.closest_waypoint_index(current)
if idx is not None:
speed = 0.3 if progress < 0.9 else 0.1 # Slow near end
cmd.send(CmdVel(linear=speed, angular=0.0), node)
run(Node(tick=follow_path, rate=10, pubs=["cmd_vel"], subs=["odom"]))
Waypoint
A single point on a path with optional velocity constraints.
Constructor
wp = Waypoint(x=5.0, y=3.0, theta=0.0)
.with_velocity(twist) — Set Desired Velocity
from horus import Twist
wp = Waypoint(x=5.0, y=3.0, theta=0.0).with_velocity(
Twist.new_2d(linear_x=0.5, angular_z=0.0)
)
Returns a new Waypoint with a velocity constraint. The path follower should try to match this velocity when passing through the waypoint. Use for smooth trajectories where speed varies (e.g., slow down for turns).
.with_stop() — Must Stop Here
wp = Waypoint(x=5.0, y=3.0, theta=0.0).with_stop()
Returns a new Waypoint that requires the robot to come to a complete stop. Use for the final waypoint, or at intermediate points where the robot must pause (e.g., intersection, pickup location).
GoalResult
Feedback from the navigation system about goal execution.
.with_error(message) — Attach Error Message
result = GoalResult(goal_id=1, status=3) # Aborted
result = result.with_error("Path blocked by obstacle")
Returns a new GoalResult with an error message. Use when the navigation fails — the message explains why (timeout, obstacle, invalid goal, etc.).
OccupancyGrid
A 2D grid map where each cell is free, occupied, or unknown. The foundation for obstacle avoidance and path planning.
Constructor
grid = OccupancyGrid(width=100, height=100, resolution=0.05)
# 100 × 100 cells at 5cm/cell = 5m × 5m map
Understanding the coordinate system:
resolution: meters per cell. 0.05 = each cell covers 5cm × 5cm.width×height: number of cells.width * resolution= map width in meters.- Origin is at the bottom-left corner of the grid.
.world_to_grid(x, y) — Meters to Cell Indices
gx, gy = grid.world_to_grid(2.5, 2.5)
# Returns cell indices (50, 50) for a 0.05m resolution grid
Converts world coordinates (meters) to grid cell indices. The formula: gx = (x - origin.x) / resolution. Returns None if the point is outside the grid bounds.
You need this every time you have a sensor reading in meters and want to mark it on the map.
.grid_to_world(grid_x, grid_y) — Cell Indices to Meters
wx, wy = grid.grid_to_world(50, 50)
# Returns (2.5, 2.5) — the center of cell (50, 50)
The inverse — converts grid indices back to world coordinates. Returns the center of the cell in meters.
.set_occupancy(grid_x, grid_y, value) — Mark a Cell
grid.set_occupancy(50, 50, 100) # Occupied
grid.set_occupancy(30, 30, 0) # Free
grid.set_occupancy(10, 10, -1) # Unknown
Takes grid indices (not meters). Values:
100= definitely occupied (wall, obstacle)0= definitely free (empty space)-1= unknown (not yet observed)- Values 1-99 = probability of occupancy (higher = more likely occupied)
.is_free(x, y) — Can the Robot Go Here?
if grid.is_free(2.5, 2.5):
print("Path is clear")
Takes world coordinates (meters), converts to grid internally, returns True if the cell value is 0 (free). This is the method you call in a path planner to check if a position is traversable.
Common mistake: Mixing up
set_occupancy()(takes grid indices) andis_free()(takes world meters). They use different coordinate systems —set_occupancy(50, 50, ...)andis_free(2.5, 2.5)refer to the same cell when resolution=0.05.
.is_occupied(x, y) — Is There an Obstacle?
if grid.is_occupied(2.5, 2.5):
print("Obstacle detected!")
Takes world coordinates. Returns True if the cell value is 100 (occupied).
.occupancy(grid_x, grid_y) — Raw Cell Value
value = grid.occupancy(50, 50) # Returns 0, 100, -1, or probability
Takes grid indices. Returns the raw occupancy value for fine-grained decisions.
Example — Building a Map from LiDAR:
from horus import LaserScan, OccupancyGrid, Pose2D
import math
grid = OccupancyGrid(width=200, height=200, resolution=0.05) # 10m × 10m
def update_map(scan, robot_pose):
for i in range(len(scan.ranges)):
if not scan.is_range_valid(i):
continue
angle = scan.angle_at(i) + robot_pose.theta
r = scan.ranges[i]
# Obstacle position in world frame
ox = robot_pose.x + r * math.cos(angle)
oy = robot_pose.y + r * math.sin(angle)
result = grid.world_to_grid(ox, oy)
if result is not None:
gx, gy = result
grid.set_occupancy(gx, gy, 100) # Mark occupied
CostMap
Inflated cost map for path planning. Built from an OccupancyGrid by expanding obstacles with a safety margin.
Constructor
costmap = CostMap(grid=grid, inflation_radius=0.3)
# Inflates obstacles by 30cm — robot won't plan paths within 30cm of walls
.cost(x, y) — Query Cost at a Position
c = costmap.cost(2.0, 2.0)
if c is not None and c > 200:
print("Too close to obstacle — find alternate path")
Takes world coordinates. Returns the cost at that position (0 = free, 254 = lethal/occupied, values in between = proximity to obstacles). Use this in a path planner to prefer paths that stay far from obstacles.
.compute_costs() — Recompute After Changes
costmap.compute_costs()
Recalculates the inflated cost layer from the underlying occupancy grid. Call this after modifying the grid (adding new obstacles from sensor data).
PathPlan
Planned path output from a path planning algorithm.
.add_waypoint(x, y, theta) — Add a Point
plan = PathPlan()
plan.add_waypoint(0.0, 0.0, 0.0)
plan.add_waypoint(3.0, 0.0, 0.0)
plan.add_waypoint(3.0, 4.0, 1.57)
.from_waypoints(waypoints, goal) — Build from Array
plan = PathPlan.from_waypoints(
waypoints=[[0.0, 0.0, 0.0], [3.0, 0.0, 0.0], [3.0, 4.0, 1.57]],
goal=[3.0, 4.0, 1.57],
)
.is_empty() — Check if Empty
if plan.is_empty():
print("No path found!")
VelocityObstacle, VelocityObstacles
Dynamic obstacle avoidance using the velocity obstacle model. These are POD types with field access only — used by reactive collision avoidance algorithms.
from horus import VelocityObstacle, VelocityObstacles
vo = VelocityObstacle() # position, velocity, radius fields
See Also
- Geometry Messages — Pose2D, CmdVel for goal checking and velocity commands
- Sensor Messages — LaserScan for building maps
- Control Messages — DifferentialDriveCommand for wheel-level control
- Rust Navigation Messages — Rust API reference