Sensor Messages

Standard sensor data formats for common robotics sensors. All are typed, zero-copy, and binary-compatible with Rust.

from horus import Imu, LaserScan, Odometry, NavSatFix, BatteryState, RangeSensor

Imu

IMU (Inertial Measurement Unit) — accelerometer + gyroscope data.

import horus

# All 6 accel/gyro fields are REQUIRED positional arguments
imu = horus.Imu(
    0.0,    # accel_x — m/s², linear acceleration X
    0.0,    # accel_y — m/s², linear acceleration Y
    9.81,   # accel_z — m/s², linear acceleration Z (gravity)
    0.0,    # gyro_x  — rad/s, angular velocity X (roll rate)
    0.0,    # gyro_y  — rad/s, angular velocity Y (pitch rate)
    0.05,   # gyro_z  — rad/s, angular velocity Z (yaw rate)
    timestamp_ns=0,  # optional
)

# Access fields
print(f"Gravity: {imu.accel_z:.2f} m/s²")
print(f"Yaw rate: {imu.gyro_z:.3f} rad/s")

Constructor: Imu(accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z, timestamp_ns=0)

All 6 accel/gyro values are required positional arguments. timestamp_ns is optional (keyword-only).

Fields (getters/setters):

FieldTypeUnitDescription
accel_x, accel_y, accel_zfloatm/s²Linear acceleration
gyro_x, gyro_y, gyro_zfloatrad/sAngular velocity
timestamp_nsintnsNanoseconds since epoch

Methods:

MethodReturnsDescription
is_valid()boolCheck for NaN/inf in accel and gyro fields
has_orientation()boolTrue if orientation quaternion is set (not identity with covariance[0] == -1)
set_orientation_from_euler(roll, pitch, yaw)Set orientation quaternion from Euler angles (radians)
angular_velocity_vec()tupleReturns (gyro_x, gyro_y, gyro_z)
linear_acceleration_vec()tupleReturns (accel_x, accel_y, accel_z)

Validation pattern:

def imu_tick(node):
    imu = read_hardware()

    # IMPORTANT: always validate before publishing
    if not imu.is_valid():
        node.log_warning("Invalid IMU reading (NaN/inf) — skipping")
        return

    node.send("imu.raw", imu)

Common pitfalls:

  • All 6 accel/gyro fields are required positional args — you cannot use keyword-only construction
  • Hardware fault produces NaN — always check is_valid() before publishing
  • Orientation is NOT set by default — has_orientation() returns False until you call set_orientation_from_euler()
  • There are NO mag_x/mag_y/mag_z fields on Imu in Python — use MagneticField for magnetometer data
  • Units: acceleration is m/s² (not g), angular velocity is rad/s (not deg/s)

ROS2 equivalent: sensor_msgs/msg/Imu


LaserScan

2D LiDAR scan data — 360 range readings.

import horus

scan = horus.LaserScan(
    angle_min=-3.14159,      # rad — start angle
    angle_max=3.14159,       # rad — end angle
    angle_increment=0.01745, # rad — step between readings
    range_min=0.1,           # m — minimum valid range
    range_max=30.0,          # m — maximum valid range
    ranges=[1.5, 2.0, 0.0, 3.2],  # optional list of range measurements
    timestamp_ns=0,          # optional
)

# Access range data
for i in range(len(scan)):
    if scan.is_range_valid(i):
        print(f"Reading {i}: {scan.ranges[i]:.2f}m at {scan.angle_at(i):.2f} rad")

Constructor: LaserScan(angle_min=0.0, angle_max=0.0, angle_increment=0.0, range_min=0.0, range_max=0.0, ranges=None, timestamp_ns=0)

All parameters are optional keyword arguments.

Fields:

FieldTypeUnitDescription
rangeslist[float]mRange measurements (0.0 = invalid)
angle_minfloatradStart angle
angle_maxfloatradEnd angle
angle_incrementfloatradStep between readings
range_minfloatmMinimum valid range
range_maxfloatmMaximum valid range
timestamp_nsintnsNanoseconds since epoch

Methods:

MethodReturnsDescription
len(scan)intCount of valid range readings (via __len__)
angle_at(index)floatGet the angle (radians) for a specific range index
is_range_valid(index)boolCheck if range_min <= ranges[index] <= range_max
min_range()float or NoneMinimum valid range reading (None if all invalid)

Validation pattern:

def lidar_tick(node):
    scan = node.recv("scan")
    if scan is None:
        return

    # Find closest obstacle
    min_dist = scan.min_range()
    if min_dist is not None and min_dist < 0.5:
        node.log_warning(f"Obstacle at {min_dist:.2f}m!")
        node.send("cmd_vel", horus.CmdVel(linear=0.0, angular=0.0))
        return

    # Count valid readings — use len(), not .valid_count()
    valid = len(scan)
    node.log_debug(f"{valid}/360 valid readings")

Typical sensor configurations:

SensorRangesangle_minangle_maxrange_max
RPLiDAR A1360012m
RPLiDAR A2360018m
Hokuyo UTM-30LX1081-π/2π/230m

Common pitfalls:

  • ranges[i] == 0.0 means invalid/no return — NOT zero distance
  • Always check is_range_valid(i) before using a reading
  • Use len(scan) to get the valid reading count — there is no valid_count() method
  • angle_at(i) assumes uniform spacing — verify angle_increment matches your sensor
  • There is no scan_time field in Python

ROS2 equivalent: sensor_msgs/msg/LaserScan


Odometry

Robot odometry — pose + velocity as flat scalar fields.

import horus

odom = horus.Odometry(
    x=1.0,                # m — position X
    y=2.0,                # m — position Y
    theta=0.5,            # rad — heading
    linear_velocity=0.5,  # m/s — forward speed
    angular_velocity=0.1, # rad/s — turn rate
    timestamp_ns=0,       # optional
)

print(f"Position: ({odom.x}, {odom.y})")
print(f"Heading: {odom.theta:.2f} rad")
print(f"Speed: {odom.linear_velocity:.2f} m/s")

Constructor: Odometry(x=0.0, y=0.0, theta=0.0, linear_velocity=0.0, angular_velocity=0.0, timestamp_ns=0)

All parameters are optional keyword arguments with defaults of 0.0.

Fields:

FieldTypeUnitDescription
xfloatmPosition X
yfloatmPosition Y
thetafloatradHeading (orientation)
linear_velocityfloatm/sForward speed
angular_velocityfloatrad/sTurn rate
timestamp_nsintnsNanoseconds since epoch

Common pitfalls:

  • Odometry uses flat scalar fields — there are no nested Pose2D or Twist objects
  • theta is in radians, not degrees

ROS2 equivalent: nav_msgs/msg/Odometry


GPS/GNSS fix — latitude, longitude, altitude.

import horus

fix = horus.NavSatFix(
    latitude=37.7749,      # degrees
    longitude=-122.4194,   # degrees
    altitude=10.5,         # meters above WGS84 ellipsoid
    timestamp_ns=0,        # optional
)

print(f"Position: {fix.latitude}, {fix.longitude}")
print(f"Satellites: {fix.satellites_visible}")
print(f"HDOP: {fix.hdop}")

Constructor: NavSatFix(latitude=0.0, longitude=0.0, altitude=0.0, timestamp_ns=0)

All parameters are optional keyword arguments.

Fields:

FieldTypeUnitDescription
latitudefloatdegWGS84 latitude
longitudefloatdegWGS84 longitude
altitudefloatmHeight above ellipsoid
timestamp_nsintnsNanoseconds since epoch

Additional getters:

GetterTypeDescription
satellites_visibleintNumber of visible satellites
hdopfloatHorizontal dilution of precision
speedfloatGround speed
headingfloatCourse heading
statusintFix status (0=no fix, 1=fix, 2=DGPS, 3=RTK)

ROS2 equivalent: sensor_msgs/msg/NavSatFix


BatteryState

Battery monitoring — voltage, current, charge level.

import horus

battery = horus.BatteryState(
    voltage=12.4,              # V
    percentage=0.75,           # 0.0-1.0
    current=-2.5,              # A (negative = discharging)
    temperature=35.0,          # °C (default: 25.0)
    power_supply_status=1,     # u8 status code
    timestamp_ns=0,            # optional
)

if battery.percentage < 0.2:
    print("LOW BATTERY WARNING")

# Check power supply status (u8), NOT a bool
print(f"Status code: {battery.power_supply_status}")

Constructor: BatteryState(voltage=0.0, percentage=0.0, current=0.0, temperature=25.0, power_supply_status=0, timestamp_ns=0)

All parameters are optional keyword arguments. Note that temperature defaults to 25.0 (room temperature).

Fields:

FieldTypeUnitDescription
voltagefloatVBattery voltage
percentagefloat0-1Charge level (0.0 = empty, 1.0 = full)
currentfloatACurrent (negative = discharging)
temperaturefloat°CBattery temperature (default: 25.0)
power_supply_statusintPower supply status code (u8)
timestamp_nsintnsNanoseconds since epoch

Common pitfalls:

  • There is NO is_charging field — use power_supply_status (a u8 integer) instead
  • temperature defaults to 25.0, not 0.0

ROS2 equivalent: sensor_msgs/msg/BatteryState


RangeSensor

Single-point distance sensor (ultrasonic, IR, ToF).

import horus

dist = horus.RangeSensor(
    range=1.5,           # m — measured distance
    sensor_type=0,       # sensor type code
    field_of_view=0.26,  # rad (~15°)
    min_range=0.02,      # m (default: 0.02)
    max_range=4.0,       # m (default: 4.0)
    timestamp_ns=0,      # optional
)

Constructor: RangeSensor(range=0.0, sensor_type=0, field_of_view=0.1, min_range=0.02, max_range=4.0, timestamp_ns=0)

All parameters are optional keyword arguments.

Fields:

FieldTypeUnitDescription
rangefloatmMeasured distance
sensor_typeintSensor type code
field_of_viewfloatradSensor beam width (default: 0.1)
min_rangefloatmMinimum valid range (default: 0.02)
max_rangefloatmMaximum valid range (default: 4.0)
timestamp_nsintnsNanoseconds since epoch

JointState

Joint positions, velocities, and efforts for articulated robots.

import horus

state = horus.JointState(
    names=["shoulder", "elbow", "wrist", "gripper", "j5", "j6"],
    positions=[0.0, 0.5, -0.3, 1.2, 0.0, 0.0],
    velocities=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
    efforts=[0.1, 0.5, 0.3, 0.8, 0.0, 0.0],
    timestamp_ns=0,  # optional
)

print(f"Joint '{state.names[1]}' at {state.positions[1]:.2f} rad")

Constructor: JointState(names=None, positions=None, velocities=None, efforts=None, timestamp_ns=0)

All parameters are optional keyword arguments. names is the first parameter.

Fields:

FieldTypeUnitDescription
nameslist[str]Joint names
positionslist[float]radJoint positions
velocitieslist[float]rad/sJoint velocities
effortslist[float]NmJoint torques/forces
timestamp_nsintnsNanoseconds since epoch

ROS2 equivalent: sensor_msgs/msg/JointState


Environmental Sensors

Temperature

temp = horus.Temperature(temperature=22.5, variance=0.1, timestamp_ns=0)

Constructor: Temperature(temperature=0.0, variance=0.0, timestamp_ns=0)

FieldTypeUnitDescription
temperaturefloat°CTemperature reading
variancefloatMeasurement variance
timestamp_nsintnsNanoseconds since epoch

MagneticField

mag = horus.MagneticField(x=0.00002, y=0.00005, z=-0.00004, timestamp_ns=0)  # Tesla

Constructor: MagneticField(x=0.0, y=0.0, z=0.0, timestamp_ns=0)

FieldTypeUnitDescription
x, y, zfloatTMagnetic field components
timestamp_nsintnsNanoseconds since epoch

FluidPressure

pressure = horus.FluidPressure(pressure=101325.0, variance=10.0, timestamp_ns=0)  # Pascals

Constructor: FluidPressure(pressure=0.0, variance=0.0, timestamp_ns=0)

FieldTypeUnitDescription
pressurefloatPaFluid pressure
variancefloatMeasurement variance
timestamp_nsintnsNanoseconds since epoch

Illuminance

lux = horus.Illuminance(illuminance=500.0, variance=5.0, timestamp_ns=0)  # Lux

Constructor: Illuminance(illuminance=0.0, variance=0.0, timestamp_ns=0)

FieldTypeUnitDescription
illuminancefloatlxIlluminance reading
variancefloatMeasurement variance
timestamp_nsintnsNanoseconds since epoch

Example: IMU Sensor Node

import horus

def imu_tick(node):
    imu = read_hardware()  # Your I2C/SPI driver

    # Validate before publishing
    if not imu.is_valid():
        return

    node.send("imu.raw", imu)

sensor = horus.Node(
    name="imu_reader",
    pubs=[horus.Imu],
    tick=imu_tick,
    rate=100,
    order=0,
)
horus.run(sensor, rt=True)

See Also