Sensor Messages
Standard sensor data formats for common robotics sensors. All are typed, zero-copy, and binary-compatible with Rust.
from horus import Imu, LaserScan, Odometry, NavSatFix, BatteryState, RangeSensor
Imu
IMU (Inertial Measurement Unit) — accelerometer + gyroscope data.
import horus
# All 6 accel/gyro fields are REQUIRED positional arguments
imu = horus.Imu(
0.0, # accel_x — m/s², linear acceleration X
0.0, # accel_y — m/s², linear acceleration Y
9.81, # accel_z — m/s², linear acceleration Z (gravity)
0.0, # gyro_x — rad/s, angular velocity X (roll rate)
0.0, # gyro_y — rad/s, angular velocity Y (pitch rate)
0.05, # gyro_z — rad/s, angular velocity Z (yaw rate)
timestamp_ns=0, # optional
)
# Access fields
print(f"Gravity: {imu.accel_z:.2f} m/s²")
print(f"Yaw rate: {imu.gyro_z:.3f} rad/s")
Constructor: Imu(accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z, timestamp_ns=0)
All 6 accel/gyro values are required positional arguments. timestamp_ns is optional (keyword-only).
Fields (getters/setters):
| Field | Type | Unit | Description |
|---|---|---|---|
accel_x, accel_y, accel_z | float | m/s² | Linear acceleration |
gyro_x, gyro_y, gyro_z | float | rad/s | Angular velocity |
timestamp_ns | int | ns | Nanoseconds since epoch |
Methods:
| Method | Returns | Description |
|---|---|---|
is_valid() | bool | Check for NaN/inf in accel and gyro fields |
has_orientation() | bool | True if orientation quaternion is set (not identity with covariance[0] == -1) |
set_orientation_from_euler(roll, pitch, yaw) | — | Set orientation quaternion from Euler angles (radians) |
angular_velocity_vec() | tuple | Returns (gyro_x, gyro_y, gyro_z) |
linear_acceleration_vec() | tuple | Returns (accel_x, accel_y, accel_z) |
Validation pattern:
def imu_tick(node):
imu = read_hardware()
# IMPORTANT: always validate before publishing
if not imu.is_valid():
node.log_warning("Invalid IMU reading (NaN/inf) — skipping")
return
node.send("imu.raw", imu)
Common pitfalls:
- All 6 accel/gyro fields are required positional args — you cannot use keyword-only construction
- Hardware fault produces NaN — always check
is_valid()before publishing - Orientation is NOT set by default —
has_orientation()returnsFalseuntil you callset_orientation_from_euler() - There are NO
mag_x/mag_y/mag_zfields onImuin Python — useMagneticFieldfor magnetometer data - Units: acceleration is m/s² (not g), angular velocity is rad/s (not deg/s)
ROS2 equivalent:
sensor_msgs/msg/Imu
LaserScan
2D LiDAR scan data — 360 range readings.
import horus
scan = horus.LaserScan(
angle_min=-3.14159, # rad — start angle
angle_max=3.14159, # rad — end angle
angle_increment=0.01745, # rad — step between readings
range_min=0.1, # m — minimum valid range
range_max=30.0, # m — maximum valid range
ranges=[1.5, 2.0, 0.0, 3.2], # optional list of range measurements
timestamp_ns=0, # optional
)
# Access range data
for i in range(len(scan)):
if scan.is_range_valid(i):
print(f"Reading {i}: {scan.ranges[i]:.2f}m at {scan.angle_at(i):.2f} rad")
Constructor: LaserScan(angle_min=0.0, angle_max=0.0, angle_increment=0.0, range_min=0.0, range_max=0.0, ranges=None, timestamp_ns=0)
All parameters are optional keyword arguments.
Fields:
| Field | Type | Unit | Description |
|---|---|---|---|
ranges | list[float] | m | Range measurements (0.0 = invalid) |
angle_min | float | rad | Start angle |
angle_max | float | rad | End angle |
angle_increment | float | rad | Step between readings |
range_min | float | m | Minimum valid range |
range_max | float | m | Maximum valid range |
timestamp_ns | int | ns | Nanoseconds since epoch |
Methods:
| Method | Returns | Description |
|---|---|---|
len(scan) | int | Count of valid range readings (via __len__) |
angle_at(index) | float | Get the angle (radians) for a specific range index |
is_range_valid(index) | bool | Check if range_min <= ranges[index] <= range_max |
min_range() | float or None | Minimum valid range reading (None if all invalid) |
Validation pattern:
def lidar_tick(node):
scan = node.recv("scan")
if scan is None:
return
# Find closest obstacle
min_dist = scan.min_range()
if min_dist is not None and min_dist < 0.5:
node.log_warning(f"Obstacle at {min_dist:.2f}m!")
node.send("cmd_vel", horus.CmdVel(linear=0.0, angular=0.0))
return
# Count valid readings — use len(), not .valid_count()
valid = len(scan)
node.log_debug(f"{valid}/360 valid readings")
Typical sensor configurations:
| Sensor | Ranges | angle_min | angle_max | range_max |
|---|---|---|---|---|
| RPLiDAR A1 | 360 | 0 | 2π | 12m |
| RPLiDAR A2 | 360 | 0 | 2π | 18m |
| Hokuyo UTM-30LX | 1081 | -π/2 | π/2 | 30m |
Common pitfalls:
ranges[i] == 0.0means invalid/no return — NOT zero distance- Always check
is_range_valid(i)before using a reading - Use
len(scan)to get the valid reading count — there is novalid_count()method angle_at(i)assumes uniform spacing — verifyangle_incrementmatches your sensor- There is no
scan_timefield in Python
ROS2 equivalent:
sensor_msgs/msg/LaserScan
Odometry
Robot odometry — pose + velocity as flat scalar fields.
import horus
odom = horus.Odometry(
x=1.0, # m — position X
y=2.0, # m — position Y
theta=0.5, # rad — heading
linear_velocity=0.5, # m/s — forward speed
angular_velocity=0.1, # rad/s — turn rate
timestamp_ns=0, # optional
)
print(f"Position: ({odom.x}, {odom.y})")
print(f"Heading: {odom.theta:.2f} rad")
print(f"Speed: {odom.linear_velocity:.2f} m/s")
Constructor: Odometry(x=0.0, y=0.0, theta=0.0, linear_velocity=0.0, angular_velocity=0.0, timestamp_ns=0)
All parameters are optional keyword arguments with defaults of 0.0.
Fields:
| Field | Type | Unit | Description |
|---|---|---|---|
x | float | m | Position X |
y | float | m | Position Y |
theta | float | rad | Heading (orientation) |
linear_velocity | float | m/s | Forward speed |
angular_velocity | float | rad/s | Turn rate |
timestamp_ns | int | ns | Nanoseconds since epoch |
Common pitfalls:
- Odometry uses flat scalar fields — there are no nested
Pose2DorTwistobjects thetais in radians, not degrees
ROS2 equivalent:
nav_msgs/msg/Odometry
NavSatFix
GPS/GNSS fix — latitude, longitude, altitude.
import horus
fix = horus.NavSatFix(
latitude=37.7749, # degrees
longitude=-122.4194, # degrees
altitude=10.5, # meters above WGS84 ellipsoid
timestamp_ns=0, # optional
)
print(f"Position: {fix.latitude}, {fix.longitude}")
print(f"Satellites: {fix.satellites_visible}")
print(f"HDOP: {fix.hdop}")
Constructor: NavSatFix(latitude=0.0, longitude=0.0, altitude=0.0, timestamp_ns=0)
All parameters are optional keyword arguments.
Fields:
| Field | Type | Unit | Description |
|---|---|---|---|
latitude | float | deg | WGS84 latitude |
longitude | float | deg | WGS84 longitude |
altitude | float | m | Height above ellipsoid |
timestamp_ns | int | ns | Nanoseconds since epoch |
Additional getters:
| Getter | Type | Description |
|---|---|---|
satellites_visible | int | Number of visible satellites |
hdop | float | Horizontal dilution of precision |
speed | float | Ground speed |
heading | float | Course heading |
status | int | Fix status (0=no fix, 1=fix, 2=DGPS, 3=RTK) |
ROS2 equivalent:
sensor_msgs/msg/NavSatFix
BatteryState
Battery monitoring — voltage, current, charge level.
import horus
battery = horus.BatteryState(
voltage=12.4, # V
percentage=0.75, # 0.0-1.0
current=-2.5, # A (negative = discharging)
temperature=35.0, # °C (default: 25.0)
power_supply_status=1, # u8 status code
timestamp_ns=0, # optional
)
if battery.percentage < 0.2:
print("LOW BATTERY WARNING")
# Check power supply status (u8), NOT a bool
print(f"Status code: {battery.power_supply_status}")
Constructor: BatteryState(voltage=0.0, percentage=0.0, current=0.0, temperature=25.0, power_supply_status=0, timestamp_ns=0)
All parameters are optional keyword arguments. Note that temperature defaults to 25.0 (room temperature).
Fields:
| Field | Type | Unit | Description |
|---|---|---|---|
voltage | float | V | Battery voltage |
percentage | float | 0-1 | Charge level (0.0 = empty, 1.0 = full) |
current | float | A | Current (negative = discharging) |
temperature | float | °C | Battery temperature (default: 25.0) |
power_supply_status | int | — | Power supply status code (u8) |
timestamp_ns | int | ns | Nanoseconds since epoch |
Common pitfalls:
- There is NO
is_chargingfield — usepower_supply_status(au8integer) instead temperaturedefaults to 25.0, not 0.0
ROS2 equivalent:
sensor_msgs/msg/BatteryState
RangeSensor
Single-point distance sensor (ultrasonic, IR, ToF).
import horus
dist = horus.RangeSensor(
range=1.5, # m — measured distance
sensor_type=0, # sensor type code
field_of_view=0.26, # rad (~15°)
min_range=0.02, # m (default: 0.02)
max_range=4.0, # m (default: 4.0)
timestamp_ns=0, # optional
)
Constructor: RangeSensor(range=0.0, sensor_type=0, field_of_view=0.1, min_range=0.02, max_range=4.0, timestamp_ns=0)
All parameters are optional keyword arguments.
Fields:
| Field | Type | Unit | Description |
|---|---|---|---|
range | float | m | Measured distance |
sensor_type | int | — | Sensor type code |
field_of_view | float | rad | Sensor beam width (default: 0.1) |
min_range | float | m | Minimum valid range (default: 0.02) |
max_range | float | m | Maximum valid range (default: 4.0) |
timestamp_ns | int | ns | Nanoseconds since epoch |
JointState
Joint positions, velocities, and efforts for articulated robots.
import horus
state = horus.JointState(
names=["shoulder", "elbow", "wrist", "gripper", "j5", "j6"],
positions=[0.0, 0.5, -0.3, 1.2, 0.0, 0.0],
velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
efforts=[0.1, 0.5, 0.3, 0.8, 0.0, 0.0],
timestamp_ns=0, # optional
)
print(f"Joint '{state.names[1]}' at {state.positions[1]:.2f} rad")
Constructor: JointState(names=None, positions=None, velocities=None, efforts=None, timestamp_ns=0)
All parameters are optional keyword arguments. names is the first parameter.
Fields:
| Field | Type | Unit | Description |
|---|---|---|---|
names | list[str] | — | Joint names |
positions | list[float] | rad | Joint positions |
velocities | list[float] | rad/s | Joint velocities |
efforts | list[float] | Nm | Joint torques/forces |
timestamp_ns | int | ns | Nanoseconds since epoch |
ROS2 equivalent:
sensor_msgs/msg/JointState
Environmental Sensors
Temperature
temp = horus.Temperature(temperature=22.5, variance=0.1, timestamp_ns=0)
Constructor: Temperature(temperature=0.0, variance=0.0, timestamp_ns=0)
| Field | Type | Unit | Description |
|---|---|---|---|
temperature | float | °C | Temperature reading |
variance | float | — | Measurement variance |
timestamp_ns | int | ns | Nanoseconds since epoch |
MagneticField
mag = horus.MagneticField(x=0.00002, y=0.00005, z=-0.00004, timestamp_ns=0) # Tesla
Constructor: MagneticField(x=0.0, y=0.0, z=0.0, timestamp_ns=0)
| Field | Type | Unit | Description |
|---|---|---|---|
x, y, z | float | T | Magnetic field components |
timestamp_ns | int | ns | Nanoseconds since epoch |
FluidPressure
pressure = horus.FluidPressure(pressure=101325.0, variance=10.0, timestamp_ns=0) # Pascals
Constructor: FluidPressure(pressure=0.0, variance=0.0, timestamp_ns=0)
| Field | Type | Unit | Description |
|---|---|---|---|
pressure | float | Pa | Fluid pressure |
variance | float | — | Measurement variance |
timestamp_ns | int | ns | Nanoseconds since epoch |
Illuminance
lux = horus.Illuminance(illuminance=500.0, variance=5.0, timestamp_ns=0) # Lux
Constructor: Illuminance(illuminance=0.0, variance=0.0, timestamp_ns=0)
| Field | Type | Unit | Description |
|---|---|---|---|
illuminance | float | lx | Illuminance reading |
variance | float | — | Measurement variance |
timestamp_ns | int | ns | Nanoseconds since epoch |
Example: IMU Sensor Node
import horus
def imu_tick(node):
imu = read_hardware() # Your I2C/SPI driver
# Validate before publishing
if not imu.is_valid():
return
node.send("imu.raw", imu)
sensor = horus.Node(
name="imu_reader",
pubs=[horus.Imu],
tick=imu_tick,
rate=100,
order=0,
)
horus.run(sensor, rt=True)
See Also
- Control Messages — CmdVel, MotorCommand, ServoCommand
- Geometry Messages — Pose2D, Twist, Quaternion
- Standard Messages — Full message catalog
- IMU Reader Recipe — Complete IMU example
- Rust Sensor Messages — Rust equivalent