Sensor Messages

Sensor messages carry raw and processed data from your robot's sensors — LiDAR scans, IMU readings, GPS positions, battery levels, and more. These are the inputs to every robotics pipeline: perception, navigation, monitoring.

When you need these: Every robot has sensors. If it moves, you need Odometry. If it has a lidar, you need LaserScan. If it has an IMU, you need Imu. If it runs on battery, you need BatteryState.

from horus import (
    LaserScan, Imu, Odometry, JointState,
    BatteryState, NavSatFix, RangeSensor,
    MagneticField, Temperature, FluidPressure, Illuminance,
    Clock, TimeReference,
)

LaserScan

2D LiDAR scan — an array of range readings at known angles. The most common sensor for obstacle avoidance in mobile robotics.

Constructor

import math
scan = LaserScan(
    angle_min=-math.pi,      # Start angle (radians)
    angle_max=math.pi,       # End angle (radians)
    range_min=0.1,            # Minimum valid range (meters)
    range_max=10.0,           # Maximum valid range (meters)
    ranges=[1.0, 1.5, 2.0],  # Range readings
)

.angle_at(index) — Angle for a Range Reading

angle = scan.angle_at(0)   # Angle of the first reading
angle = scan.angle_at(100) # Angle of the 101st reading

Computes the angle (in radians) for a given index in the ranges array. The formula is: angle_min + index * angle_increment. You need this to convert polar (angle, range) readings to cartesian (x, y) coordinates:

import math
for i in range(len(scan.ranges)):
    if scan.is_range_valid(i):
        angle = scan.angle_at(i)
        r = scan.ranges[i]
        x = r * math.cos(angle)  # In robot's frame
        y = r * math.sin(angle)

.is_range_valid(index) — Filter Bad Readings

if scan.is_range_valid(5):
    print(f"Valid: {scan.ranges[5]:.2f} m")
else:
    print("Invalid: too close, too far, or NaN")

Returns True if the range at the given index falls within [range_min, range_max]. LiDAR sensors return invalid readings for many reasons — reflective surfaces, transparent objects, direct sunlight, readings below minimum range (too close to the sensor), or at maximum range (nothing detected). Always filter with this before using a range value.

Common mistake: Iterating scan.ranges without checking validity. Invalid readings are often 0.0 or inf, which will corrupt your obstacle map if not filtered.

.min_range() — Closest Valid Object

closest = scan.min_range()
if closest is not None and closest < 0.5:
    print("DANGER: object at {:.2f} m!".format(closest))

Returns the minimum valid range reading, or None if all readings are invalid. This is the fastest way to check for close obstacles — one call instead of iterating the entire array.

len(scan) — Valid Reading Count

print(f"{len(scan)} valid readings out of {len(scan.ranges)} total")

Example — LiDAR Obstacle Detection:

from horus import Node, run, LaserScan, CmdVel, Topic
import math

scan_topic = Topic(LaserScan)
cmd_topic = Topic(CmdVel)

def avoid_obstacles(node):
    scan = scan_topic.recv(node)
    if scan is None:
        return

    closest = scan.min_range()
    if closest is not None and closest < 0.5:
        # Too close — stop and turn
        cmd_topic.send(CmdVel(linear=0.0, angular=0.5), node)
    else:
        # Clear path — drive forward
        cmd_topic.send(CmdVel(linear=0.3, angular=0.0), node)

run(Node(tick=avoid_obstacles, rate=10, pubs=["cmd_vel"], subs=["scan"]))

Imu

Inertial Measurement Unit — accelerometer (linear acceleration) + gyroscope (angular velocity) + optional orientation quaternion.

Constructor

imu = Imu(accel_x=0.0, accel_y=0.0, accel_z=9.81,
          gyro_x=0.0, gyro_y=0.0, gyro_z=0.1)

.set_orientation_from_euler(roll, pitch, yaw) — Set Orientation

import math
imu.set_orientation_from_euler(roll=0.0, pitch=0.05, yaw=math.pi/4)

Converts Euler angles (radians) to the internal quaternion representation and stores it. Use this when you have orientation estimates from a sensor fusion algorithm (complementary filter, Madgwick, EKF) and want to publish them with the IMU data.

  • roll: tilt left/right (positive = right side down)
  • pitch: tilt forward/backward (positive = nose down)
  • yaw: compass heading (positive = counterclockwise from above)

Common mistake: Publishing orientation without actually computing it. If your IMU only has a 6-axis accelerometer+gyroscope (no magnetometer or fusion), don't set orientation — call has_orientation() to check, and let downstream nodes know the orientation field is empty.

.has_orientation() — Is Orientation Data Present?

if imu.has_orientation():
    # Safe to use orientation fields
    print("Orientation available")
else:
    # Only acceleration and angular velocity are valid
    print("Raw IMU — no orientation estimate")

Not all IMUs provide orientation. A raw 6-axis chip gives only acceleration and angular velocity — orientation must be computed by a sensor fusion node. This method checks whether the orientation quaternion has been set (is non-identity).

.is_valid() — Validation

if not imu.is_valid():
    print("Bad IMU reading — NaN or inf detected")

.angular_velocity_vec() — Gyroscope as Vector3

gyro = imu.angular_velocity_vec()  # Returns Vector3
print(f"Rotation rate: {gyro.magnitude():.3f} rad/s")
if gyro.magnitude() > 2.0:
    print("Spinning too fast!")

Returns the gyroscope reading as a Vector3, giving you access to magnitude(), dot(), cross(), and other vector operations. Much more useful than raw gyro_x, gyro_y, gyro_z fields when you need to compute rotation rates or compare angular velocities.

.linear_acceleration_vec() — Accelerometer as Vector3

accel = imu.linear_acceleration_vec()
gravity_removed = accel.magnitude() - 9.81
print(f"Dynamic acceleration: {gravity_removed:.2f} m/s²")

Same idea — returns a Vector3 for the accelerometer. On a stationary robot, the magnitude should be ā‰ˆ 9.81 m/s² (gravity). Deviations indicate dynamic acceleration (movement, vibration, impact).

Example — Tilt Detection:

from horus import Imu, Topic
import math

imu_topic = Topic(Imu)

def check_tilt(node):
    imu = imu_topic.recv(node)
    if imu is None or not imu.is_valid():
        return
    accel = imu.linear_acceleration_vec()
    # Pitch angle from accelerometer (simplified, assumes no dynamic motion)
    pitch = math.atan2(accel.x, math.sqrt(accel.y**2 + accel.z**2))
    if abs(pitch) > math.radians(15):
        print(f"WARNING: Robot tilted {math.degrees(pitch):.1f}° — risk of tipping!")

Odometry

Robot odometry — 2D pose + velocity, typically published by wheel encoders or visual odometry systems.

Constructor

odom = Odometry(x=1.0, y=2.0, theta=0.5,
                linear_velocity=0.5, angular_velocity=0.1)

.set_frames(frame, child_frame) — Set Coordinate Frames

odom.set_frames("odom", "base_link")

Sets the parent and child frame names. By convention:

  • frame = "odom" (the fixed odometry frame)
  • child_frame = "base_link" (the robot's body frame)

These names are used by the TransformFrame system to build the transform tree.

.update(pose, twist) — Update from New Measurements

from horus import Pose2D, Twist
new_pose = Pose2D(x=1.1, y=2.05, theta=0.52)
new_twist = Twist.new_2d(linear_x=0.5, angular_z=0.1)
odom.update(new_pose, new_twist)

Replaces the pose and twist with new measurements. Use this in a sensor fusion node where you compute updated estimates each tick.

.is_valid() — Validation

if not odom.is_valid():
    print("Odometry contains NaN — encoder fault?")

JointState

Multi-joint state for robot arms and legged robots. Stores up to 16 joint names, positions, velocities, and efforts. The key feature is by-name lookup — query individual joints without iterating arrays.

Constructor

js = JointState(
    names=["shoulder", "elbow", "wrist"],
    positions=[1.57, 0.785, 0.0],
    velocities=[0.0, 0.1, 0.0],
    efforts=[5.0, 2.0, 0.5],
)

.position(name) — Look Up Joint Position

shoulder_angle = js.position("shoulder")  # 1.57
gripper_angle = js.position("gripper")    # None (not found)

Returns the position of the named joint, or None if that joint doesn't exist in this message. Positions are typically in radians for revolute joints or meters for prismatic joints.

.velocity(name) — Look Up Joint Velocity

elbow_speed = js.velocity("elbow")  # 0.1 rad/s

.effort(name) — Look Up Joint Effort/Torque

wrist_torque = js.effort("wrist")  # 0.5 Nm

Common mistake: Expecting the same joint names in JointState and JointCommand. If the publisher uses "joint_1" but the subscriber expects "shoulder", the by-name lookup returns None. Standardize joint names across your system.

Example — Arm Joint Monitor:

from horus import JointState, Topic

js_topic = Topic(JointState)

def monitor_arm(node):
    js = js_topic.recv(node)
    if js is None:
        return
    for name in js.names:
        pos = js.position(name)
        eff = js.effort(name)
        if eff is not None and abs(eff) > 10.0:
            print(f"WARNING: {name} effort {eff:.1f} Nm exceeds limit!")

BatteryState

Battery monitoring with threshold checks and time estimation.

Constructor

battery = BatteryState(voltage=11.1, percentage=15.0, current=-2.5)

.is_critical() — Below 10%

if battery.is_critical():
    print("CRITICAL — land immediately!")

Hardcoded threshold at 10%. Intended for "land now" or "return to dock immediately" decisions.

.is_low(threshold) — Below Custom Threshold

if battery.is_low(20.0):
    print("Low battery — start heading home")

Check against your own threshold. Typical values: 20-30% for "start returning", 10% for critical.

.time_remaining() — Estimated Remaining Time

remaining = battery.time_remaining()
if remaining is not None:
    print(f"~{remaining/60:.0f} minutes remaining")
else:
    print("Not discharging (charging or current=0)")

Returns estimated seconds of battery life based on current draw, or None if the battery isn't discharging (current ≄ 0 or current = 0). This is a simple linear estimate — actual runtime depends on load variations.

Common mistake: Treating None as "infinite battery". None means the estimate isn't available — the battery might not be discharging, or the current sensor might not be connected.


GPS/GNSS position with fix status and distance calculation.

.from_coordinates(lat, lon, alt) — From GPS Coordinates

pos = NavSatFix.from_coordinates(lat=37.7749, lon=-122.4194, alt=10.0)

Factory method that creates a fix with STATUS_FIX and sets the coordinates. Simpler than setting fields individually.

.has_fix() — Is GPS Valid?

if not gps.has_fix():
    print("No GPS fix — using dead reckoning only")

Returns True when the receiver has a valid position fix. Without a fix, latitude/longitude are meaningless.

.distance_to(other) — Great-Circle Distance

home = NavSatFix.from_coordinates(lat=37.7750, lon=-122.4195, alt=10.0)
current = NavSatFix.from_coordinates(lat=37.7760, lon=-122.4180, alt=10.0)
dist = current.distance_to(home)
print(f"Distance to home: {dist:.1f} m")

Computes the great-circle distance between two GPS positions using the Haversine formula. Returns meters. This accounts for Earth's curvature — accurate for any distance from centimeters to thousands of kilometers.

Common mistake: Using Euclidean distance on latitude/longitude values. distance = sqrt((lat1-lat2)² + (lon1-lon2)²) is wrong — it doesn't account for Earth's curvature or the fact that longitude degrees get shorter near the poles. Always use distance_to().

.horizontal_accuracy() / .is_valid()

print(f"Accuracy: ±{gps.horizontal_accuracy():.1f} m")
print(gps.is_valid())

Clock

Time source for wall-clock, simulation, and replay modes. The factory methods create the right clock type for your use case.

.wall_clock() — Real Time

clk = Clock.wall_clock()

Production time — actual wall-clock nanoseconds. Use this for logging, timestamps, and real-world operation.

.sim_time(sim_ns, speed) — Simulation Time

clk = Clock.sim_time(sim_ns=0, speed=2.0)  # Start at t=0, 2x speed

Controlled time for simulation. The scheduler advances sim_time deterministically, so replaying the same sequence produces identical results. Use this for testing and development.

Common mistake: Using wall_clock() during simulation. Wall time advances independently of the simulation — your node will process data at the wrong rate. Always use sim_time() in simulation mode.

.replay_time(replay_ns, speed) — Replay Time

clk = Clock.replay_time(replay_ns=1000000000, speed=1.0)

Playback from a recorded session. Time advances at speed multiplier (1.0 = real-time, 0.5 = half-speed).

.elapsed_since(earlier) — Time Delta

start = Clock.wall_clock()
# ... do work ...
end = Clock.wall_clock()
elapsed_ns = end.elapsed_since(start)
print(f"Took {elapsed_ns / 1e6:.1f} ms")

Nanoseconds elapsed since another clock reading. Use this for profiling and timing measurements.

.is_paused() — Check Pause State

if clk.is_paused():
    print("Simulation paused")

TimeReference

External time source for synchronization (GPS time, NTP offset).

.correct_timestamp(local_ns) — Apply Time Offset

tref = TimeReference(time_ref_ns=1000000000, source="gps", offset_ns=-500)
corrected = tref.correct_timestamp(local_ns=1000000500)
print(tref.source)  # "gps"

Applies the offset correction to a local timestamp: corrected = local_ns + offset_ns. Use this when your system clock differs from an external reference (GPS, NTP server).


Simple Sensor Types

These types have constructors + field access but no helper methods.

from horus import MagneticField, Temperature, FluidPressure, Illuminance, RangeSensor

mag = MagneticField(x=0.25, y=-0.1, z=0.45)        # Tesla
temp = Temperature(temperature=72.5, variance=0.1)   # Celsius
pressure = FluidPressure(pressure=101325.0)           # Pascals
lux = Illuminance(illuminance=500.0)                  # Lux
range_s = RangeSensor(range=2.5, min_range=0.02, max_range=10.0)

See Also