Sensor Messages
Sensor messages carry raw and processed data from your robot's sensors ā LiDAR scans, IMU readings, GPS positions, battery levels, and more. These are the inputs to every robotics pipeline: perception, navigation, monitoring.
When you need these: Every robot has sensors. If it moves, you need Odometry. If it has a lidar, you need LaserScan. If it has an IMU, you need Imu. If it runs on battery, you need BatteryState.
from horus import (
LaserScan, Imu, Odometry, JointState,
BatteryState, NavSatFix, RangeSensor,
MagneticField, Temperature, FluidPressure, Illuminance,
Clock, TimeReference,
)
LaserScan
2D LiDAR scan ā an array of range readings at known angles. The most common sensor for obstacle avoidance in mobile robotics.
Constructor
import math
scan = LaserScan(
angle_min=-math.pi, # Start angle (radians)
angle_max=math.pi, # End angle (radians)
range_min=0.1, # Minimum valid range (meters)
range_max=10.0, # Maximum valid range (meters)
ranges=[1.0, 1.5, 2.0], # Range readings
)
.angle_at(index) ā Angle for a Range Reading
angle = scan.angle_at(0) # Angle of the first reading
angle = scan.angle_at(100) # Angle of the 101st reading
Computes the angle (in radians) for a given index in the ranges array. The formula is: angle_min + index * angle_increment. You need this to convert polar (angle, range) readings to cartesian (x, y) coordinates:
import math
for i in range(len(scan.ranges)):
if scan.is_range_valid(i):
angle = scan.angle_at(i)
r = scan.ranges[i]
x = r * math.cos(angle) # In robot's frame
y = r * math.sin(angle)
.is_range_valid(index) ā Filter Bad Readings
if scan.is_range_valid(5):
print(f"Valid: {scan.ranges[5]:.2f} m")
else:
print("Invalid: too close, too far, or NaN")
Returns True if the range at the given index falls within [range_min, range_max]. LiDAR sensors return invalid readings for many reasons ā reflective surfaces, transparent objects, direct sunlight, readings below minimum range (too close to the sensor), or at maximum range (nothing detected). Always filter with this before using a range value.
Common mistake: Iterating
scan.rangeswithout checking validity. Invalid readings are often 0.0 orinf, which will corrupt your obstacle map if not filtered.
.min_range() ā Closest Valid Object
closest = scan.min_range()
if closest is not None and closest < 0.5:
print("DANGER: object at {:.2f} m!".format(closest))
Returns the minimum valid range reading, or None if all readings are invalid. This is the fastest way to check for close obstacles ā one call instead of iterating the entire array.
len(scan) ā Valid Reading Count
print(f"{len(scan)} valid readings out of {len(scan.ranges)} total")
Example ā LiDAR Obstacle Detection:
from horus import Node, run, LaserScan, CmdVel, Topic
import math
scan_topic = Topic(LaserScan)
cmd_topic = Topic(CmdVel)
def avoid_obstacles(node):
scan = scan_topic.recv(node)
if scan is None:
return
closest = scan.min_range()
if closest is not None and closest < 0.5:
# Too close ā stop and turn
cmd_topic.send(CmdVel(linear=0.0, angular=0.5), node)
else:
# Clear path ā drive forward
cmd_topic.send(CmdVel(linear=0.3, angular=0.0), node)
run(Node(tick=avoid_obstacles, rate=10, pubs=["cmd_vel"], subs=["scan"]))
Imu
Inertial Measurement Unit ā accelerometer (linear acceleration) + gyroscope (angular velocity) + optional orientation quaternion.
Constructor
imu = Imu(accel_x=0.0, accel_y=0.0, accel_z=9.81,
gyro_x=0.0, gyro_y=0.0, gyro_z=0.1)
.set_orientation_from_euler(roll, pitch, yaw) ā Set Orientation
import math
imu.set_orientation_from_euler(roll=0.0, pitch=0.05, yaw=math.pi/4)
Converts Euler angles (radians) to the internal quaternion representation and stores it. Use this when you have orientation estimates from a sensor fusion algorithm (complementary filter, Madgwick, EKF) and want to publish them with the IMU data.
- roll: tilt left/right (positive = right side down)
- pitch: tilt forward/backward (positive = nose down)
- yaw: compass heading (positive = counterclockwise from above)
Common mistake: Publishing orientation without actually computing it. If your IMU only has a 6-axis accelerometer+gyroscope (no magnetometer or fusion), don't set orientation ā call
has_orientation()to check, and let downstream nodes know the orientation field is empty.
.has_orientation() ā Is Orientation Data Present?
if imu.has_orientation():
# Safe to use orientation fields
print("Orientation available")
else:
# Only acceleration and angular velocity are valid
print("Raw IMU ā no orientation estimate")
Not all IMUs provide orientation. A raw 6-axis chip gives only acceleration and angular velocity ā orientation must be computed by a sensor fusion node. This method checks whether the orientation quaternion has been set (is non-identity).
.is_valid() ā Validation
if not imu.is_valid():
print("Bad IMU reading ā NaN or inf detected")
.angular_velocity_vec() ā Gyroscope as Vector3
gyro = imu.angular_velocity_vec() # Returns Vector3
print(f"Rotation rate: {gyro.magnitude():.3f} rad/s")
if gyro.magnitude() > 2.0:
print("Spinning too fast!")
Returns the gyroscope reading as a Vector3, giving you access to magnitude(), dot(), cross(), and other vector operations. Much more useful than raw gyro_x, gyro_y, gyro_z fields when you need to compute rotation rates or compare angular velocities.
.linear_acceleration_vec() ā Accelerometer as Vector3
accel = imu.linear_acceleration_vec()
gravity_removed = accel.magnitude() - 9.81
print(f"Dynamic acceleration: {gravity_removed:.2f} m/s²")
Same idea ā returns a Vector3 for the accelerometer. On a stationary robot, the magnitude should be ā 9.81 m/s² (gravity). Deviations indicate dynamic acceleration (movement, vibration, impact).
Example ā Tilt Detection:
from horus import Imu, Topic
import math
imu_topic = Topic(Imu)
def check_tilt(node):
imu = imu_topic.recv(node)
if imu is None or not imu.is_valid():
return
accel = imu.linear_acceleration_vec()
# Pitch angle from accelerometer (simplified, assumes no dynamic motion)
pitch = math.atan2(accel.x, math.sqrt(accel.y**2 + accel.z**2))
if abs(pitch) > math.radians(15):
print(f"WARNING: Robot tilted {math.degrees(pitch):.1f}° ā risk of tipping!")
Odometry
Robot odometry ā 2D pose + velocity, typically published by wheel encoders or visual odometry systems.
Constructor
odom = Odometry(x=1.0, y=2.0, theta=0.5,
linear_velocity=0.5, angular_velocity=0.1)
.set_frames(frame, child_frame) ā Set Coordinate Frames
odom.set_frames("odom", "base_link")
Sets the parent and child frame names. By convention:
frame= "odom" (the fixed odometry frame)child_frame= "base_link" (the robot's body frame)
These names are used by the TransformFrame system to build the transform tree.
.update(pose, twist) ā Update from New Measurements
from horus import Pose2D, Twist
new_pose = Pose2D(x=1.1, y=2.05, theta=0.52)
new_twist = Twist.new_2d(linear_x=0.5, angular_z=0.1)
odom.update(new_pose, new_twist)
Replaces the pose and twist with new measurements. Use this in a sensor fusion node where you compute updated estimates each tick.
.is_valid() ā Validation
if not odom.is_valid():
print("Odometry contains NaN ā encoder fault?")
JointState
Multi-joint state for robot arms and legged robots. Stores up to 16 joint names, positions, velocities, and efforts. The key feature is by-name lookup ā query individual joints without iterating arrays.
Constructor
js = JointState(
names=["shoulder", "elbow", "wrist"],
positions=[1.57, 0.785, 0.0],
velocities=[0.0, 0.1, 0.0],
efforts=[5.0, 2.0, 0.5],
)
.position(name) ā Look Up Joint Position
shoulder_angle = js.position("shoulder") # 1.57
gripper_angle = js.position("gripper") # None (not found)
Returns the position of the named joint, or None if that joint doesn't exist in this message. Positions are typically in radians for revolute joints or meters for prismatic joints.
.velocity(name) ā Look Up Joint Velocity
elbow_speed = js.velocity("elbow") # 0.1 rad/s
.effort(name) ā Look Up Joint Effort/Torque
wrist_torque = js.effort("wrist") # 0.5 Nm
Common mistake: Expecting the same joint names in
JointStateandJointCommand. If the publisher uses "joint_1" but the subscriber expects "shoulder", the by-name lookup returnsNone. Standardize joint names across your system.
Example ā Arm Joint Monitor:
from horus import JointState, Topic
js_topic = Topic(JointState)
def monitor_arm(node):
js = js_topic.recv(node)
if js is None:
return
for name in js.names:
pos = js.position(name)
eff = js.effort(name)
if eff is not None and abs(eff) > 10.0:
print(f"WARNING: {name} effort {eff:.1f} Nm exceeds limit!")
BatteryState
Battery monitoring with threshold checks and time estimation.
Constructor
battery = BatteryState(voltage=11.1, percentage=15.0, current=-2.5)
.is_critical() ā Below 10%
if battery.is_critical():
print("CRITICAL ā land immediately!")
Hardcoded threshold at 10%. Intended for "land now" or "return to dock immediately" decisions.
.is_low(threshold) ā Below Custom Threshold
if battery.is_low(20.0):
print("Low battery ā start heading home")
Check against your own threshold. Typical values: 20-30% for "start returning", 10% for critical.
.time_remaining() ā Estimated Remaining Time
remaining = battery.time_remaining()
if remaining is not None:
print(f"~{remaining/60:.0f} minutes remaining")
else:
print("Not discharging (charging or current=0)")
Returns estimated seconds of battery life based on current draw, or None if the battery isn't discharging (current ā„ 0 or current = 0). This is a simple linear estimate ā actual runtime depends on load variations.
Common mistake: Treating
Noneas "infinite battery".Nonemeans the estimate isn't available ā the battery might not be discharging, or the current sensor might not be connected.
NavSatFix
GPS/GNSS position with fix status and distance calculation.
.from_coordinates(lat, lon, alt) ā From GPS Coordinates
pos = NavSatFix.from_coordinates(lat=37.7749, lon=-122.4194, alt=10.0)
Factory method that creates a fix with STATUS_FIX and sets the coordinates. Simpler than setting fields individually.
.has_fix() ā Is GPS Valid?
if not gps.has_fix():
print("No GPS fix ā using dead reckoning only")
Returns True when the receiver has a valid position fix. Without a fix, latitude/longitude are meaningless.
.distance_to(other) ā Great-Circle Distance
home = NavSatFix.from_coordinates(lat=37.7750, lon=-122.4195, alt=10.0)
current = NavSatFix.from_coordinates(lat=37.7760, lon=-122.4180, alt=10.0)
dist = current.distance_to(home)
print(f"Distance to home: {dist:.1f} m")
Computes the great-circle distance between two GPS positions using the Haversine formula. Returns meters. This accounts for Earth's curvature ā accurate for any distance from centimeters to thousands of kilometers.
Common mistake: Using Euclidean distance on latitude/longitude values.
distance = sqrt((lat1-lat2)² + (lon1-lon2)²)is wrong ā it doesn't account for Earth's curvature or the fact that longitude degrees get shorter near the poles. Always usedistance_to().
.horizontal_accuracy() / .is_valid()
print(f"Accuracy: ±{gps.horizontal_accuracy():.1f} m")
print(gps.is_valid())
Clock
Time source for wall-clock, simulation, and replay modes. The factory methods create the right clock type for your use case.
.wall_clock() ā Real Time
clk = Clock.wall_clock()
Production time ā actual wall-clock nanoseconds. Use this for logging, timestamps, and real-world operation.
.sim_time(sim_ns, speed) ā Simulation Time
clk = Clock.sim_time(sim_ns=0, speed=2.0) # Start at t=0, 2x speed
Controlled time for simulation. The scheduler advances sim_time deterministically, so replaying the same sequence produces identical results. Use this for testing and development.
Common mistake: Using
wall_clock()during simulation. Wall time advances independently of the simulation ā your node will process data at the wrong rate. Always usesim_time()in simulation mode.
.replay_time(replay_ns, speed) ā Replay Time
clk = Clock.replay_time(replay_ns=1000000000, speed=1.0)
Playback from a recorded session. Time advances at speed multiplier (1.0 = real-time, 0.5 = half-speed).
.elapsed_since(earlier) ā Time Delta
start = Clock.wall_clock()
# ... do work ...
end = Clock.wall_clock()
elapsed_ns = end.elapsed_since(start)
print(f"Took {elapsed_ns / 1e6:.1f} ms")
Nanoseconds elapsed since another clock reading. Use this for profiling and timing measurements.
.is_paused() ā Check Pause State
if clk.is_paused():
print("Simulation paused")
TimeReference
External time source for synchronization (GPS time, NTP offset).
.correct_timestamp(local_ns) ā Apply Time Offset
tref = TimeReference(time_ref_ns=1000000000, source="gps", offset_ns=-500)
corrected = tref.correct_timestamp(local_ns=1000000500)
print(tref.source) # "gps"
Applies the offset correction to a local timestamp: corrected = local_ns + offset_ns. Use this when your system clock differs from an external reference (GPS, NTP server).
Simple Sensor Types
These types have constructors + field access but no helper methods.
from horus import MagneticField, Temperature, FluidPressure, Illuminance, RangeSensor
mag = MagneticField(x=0.25, y=-0.1, z=0.45) # Tesla
temp = Temperature(temperature=72.5, variance=0.1) # Celsius
pressure = FluidPressure(pressure=101325.0) # Pascals
lux = Illuminance(illuminance=500.0) # Lux
range_s = RangeSensor(range=2.5, min_range=0.02, max_range=10.0)
See Also
- Rust Sensor Messages ā Rust API reference
- Geometry Messages ā Pose2D, Vector3, Quaternion
- Navigation Messages ā NavGoal, OccupancyGrid
- Diagnostics Messages ā BatteryState monitoring patterns