Input & Audio Messages

Input messages handle human interaction — gamepad teleoperation, keyboard shortcuts, and audio capture. These are the human interface layer of your robot.

from horus import JoystickInput, KeyboardInput, AudioFrame

JoystickInput

Gamepad/joystick events with typed factories for each event kind. The factory methods create properly-typed events without you needing to remember raw integer constants.

Factory Methods

# Button press
btn = JoystickInput.new_button(joystick_id=0, button_id=1, name="A", pressed=True)

# Analog axis (stick/trigger) — value range: -1.0 to 1.0 for sticks, 0.0 to 1.0 for triggers
axis = JoystickInput.new_axis(joystick_id=0, axis_id=0, name="left_stick_x", value=0.75)

# D-pad/hat switch
hat = JoystickInput.new_hat(joystick_id=0, hat_id=0, name="dpad", value=1.0)

# Controller connected/disconnected
conn = JoystickInput.new_connection(joystick_id=0, connected=True)

Event Type Queries — Dispatch Pattern

The standard way to handle mixed joystick events is to check the type and dispatch:

def handle_input(joy):
    """Dispatch one joystick event to the handler for its event kind.

    Args:
        joy: A JoystickInput event (button, axis, hat, or connection).

    Button actions run on the press edge only. Buttons report both press
    and release, so acting on an unguarded event would run the action twice
    per physical button push.
    """
    if joy.is_button():
        # Ignore release events for every button, not just "A".
        if joy.pressed:
            if joy.element_name == "A":
                trigger_action()
            elif joy.element_name == "B":
                cancel_action()

    elif joy.is_axis():
        # Axis names are matched by substring so any "*stick_x" / "*trigger"
        # element is routed to the same handler.
        if "stick_x" in joy.element_name:
            steer(joy.value)
        elif "trigger" in joy.element_name:
            throttle(joy.value)

    elif joy.is_hat():
        handle_dpad(joy.value)

    elif joy.is_connection_event():
        if joy.is_connected():
            print("Controller connected")
        else:
            print("Controller disconnected — engaging e-stop!")

.is_button() — Is This a Button Event?

if joy.is_button():
    print(f"Button {joy.element_name}: {'pressed' if joy.pressed else 'released'}")

.is_axis() — Is This an Axis Event?

if joy.is_axis():
    print(f"Axis {joy.element_name}: {joy.value:.2f}")

Axis values are typically -1.0 to 1.0 for sticks (center = 0.0) and 0.0 to 1.0 for triggers (released = 0.0).

.is_hat() — Is This a Hat/D-pad Event?

if joy.is_hat():
    print(f"D-pad direction: {joy.value}")

.is_connection_event() — Is This a Hotplug Event?

if joy.is_connection_event():
    connected = joy.is_connected()

Handle controller hotplug — if the operator's gamepad disconnects mid-mission, you should trigger an emergency stop.

.is_connected() — Is the Controller Plugged In?

if joy.is_connection_event() and not joy.is_connected():
    estop_topic.send(EmergencyStop.engage("Controller disconnected"))

Example — Gamepad Teleoperation:

from horus import Node, run, JoystickInput, CmdVel, EmergencyStop, Topic

# Topics used by teleop(): joystick events are received from joy_topic;
# velocity commands and e-stop requests are sent on cmd_topic / estop_topic.
joy_topic = Topic(JoystickInput)
cmd_topic = Topic(CmdVel)
estop_topic = Topic(EmergencyStop)

# Module-level state that teleop() updates via `global`, so the last commanded
# values persist between ticks (each tick handles at most one joystick event).
speed_scale = 1.0
linear = 0.0
angular = 0.0

def teleop(node):
    """Turn one joystick event into a velocity command.

    Reads at most one event from ``joy_topic`` per tick and updates the
    module-level ``linear`` / ``angular`` / ``speed_scale`` state:

    * axis events set the linear/angular velocity or the speed scale,
    * a press of button "A" engages the e-stop and zeroes the velocities,
    * a controller disconnect engages the e-stop and zeroes the velocities.

    The resulting velocities are then published on ``cmd_topic``. Nothing is
    published on ticks where no joystick event was received.

    Args:
        node: The node running this tick; passed through to the topic calls.
    """
    global speed_scale, linear, angular
    joy = joy_topic.recv(node)
    if joy is None:
        return

    if joy.is_axis():
        name = joy.element_name
        if "left_stick_y" in name:
            linear = -joy.value * speed_scale    # Forward/backward
        elif "left_stick_x" in name:
            angular = -joy.value * speed_scale   # Turn left/right
        elif "right_trigger" in name:
            # Trigger at 0.0 gives 0.5x, fully pressed (1.0) gives 2.0x.
            # The new scale applies to subsequent stick events only.
            speed_scale = 0.5 + joy.value * 1.5  # Speed boost

    elif joy.is_button() and joy.pressed:
        if joy.element_name == "A":
            estop_topic.send(EmergencyStop.engage("Operator e-stop"))
            linear = angular = 0.0

    elif joy.is_connection_event() and not joy.is_connected():
        estop_topic.send(EmergencyStop.engage("Controller disconnected"))
        linear = angular = 0.0
        # Deliberately no early return: fall through and publish the zeroed
        # command, exactly like the operator e-stop path, so the last CmdVel
        # on the topic is never a stale non-zero velocity.

    # NOTE(review): this send passes `node` while the e-stop sends above do
    # not — confirm which form the Topic API expects.
    cmd_topic.send(CmdVel(linear=linear, angular=angular), node)

run(Node(tick=teleop, rate=50, pubs=["cmd_vel", "estop"], subs=["joystick"]))

KeyboardInput

Keyboard events with modifier key detection.

Constructor

key = KeyboardInput(key_name="A", code=65, pressed=True, modifiers=0)

.is_ctrl() / .is_shift() / .is_alt() — Modifier Checks

# Act on key-down only: key-up events carry the same key_name and modifiers,
# so an unguarded shortcut would run once on press and again on release.
if key.pressed:
    if key.is_ctrl() and key.key_name == "S":
        save_map()
    elif key.is_ctrl() and key.key_name == "Q":
        shutdown()
    elif key.key_name == "space":
        toggle_estop()

These check the modifier bit flags stored in the event's modifiers field. Use them for keyboard shortcuts in operator consoles and development tools.

.pressed — Key State

if key.pressed:
    print(f"Key down: {key.key_name}")
else:
    print(f"Key up: {key.key_name}")

Example — Keyboard Shortcuts:

from horus import KeyboardInput, Topic

key_topic = Topic(KeyboardInput)

def handle_keys(node):
    """Run the shortcut bound to a key-down event received on ``key_topic``.

    Args:
        node: The node running this tick; passed to ``key_topic.recv``.
    """
    event = key_topic.recv(node)
    # Nothing received, or a key-up event: there is no shortcut to run.
    if event is None or not event.pressed:
        return

    name = event.key_name
    if name == "space":
        toggle_pause()
        return
    if event.is_ctrl() and name == "Z":
        undo()
        return
    if name == "escape":
        emergency_stop()

AudioFrame

Audio data from microphones or audio sources. Factory methods handle channel layout — use mono() for single-mic, stereo() for stereo pair, multi_channel() for microphone arrays.

.mono(sample_rate, samples) — Single Channel

frame = AudioFrame.mono(sample_rate=16000, samples=[0.1, 0.2, -0.1, 0.0])

16 kHz is standard for speech recognition; use 44.1 kHz or 48 kHz for music-quality audio.

.stereo(sample_rate, samples) — Two Channels

# Interleaved L/R samples: [L0, R0, L1, R1, ...]
frame = AudioFrame.stereo(sample_rate=48000, samples=interleaved_data)

.multi_channel(sample_rate, channels, samples) — Microphone Array

# 4-channel microphone array at 16kHz
frame = AudioFrame.multi_channel(sample_rate=16000, channels=4, samples=array_data)

Multi-channel frames are used for sound source localization (beamforming) — determining which direction a sound comes from using multiple microphones.

.duration_ms() — Audio Duration

print(f"Frame duration: {frame.duration_ms():.1f} ms")

Computed from sample count and sample rate. Typical frame durations: 10-50ms for real-time processing, 100-500ms for batch processing.

.frame_count() — Number of Samples per Channel

print(f"Samples per channel: {frame.frame_count()}")

Example — Audio Recording Node:

from horus import Node, run, AudioFrame, Topic

audio_topic = Topic(AudioFrame)
recorded_samples = []
# Running total of samples *per channel* across every frame received so far.
recorded_frames = 0

def record_audio(node):
    """Append each received audio frame to the recording and report its length.

    Raw samples are accumulated in ``recorded_samples``. The recorded duration
    is tracked separately from ``frame.frame_count()`` (samples per channel):
    ``frame.samples`` holds every channel, so ``len(recorded_samples) /
    sample_rate`` would overstate the duration of stereo or multi-channel
    audio by the channel count.

    Args:
        node: The node running this tick; passed to ``audio_topic.recv``.
    """
    global recorded_frames
    frame = audio_topic.recv(node)
    if frame is None:
        return

    recorded_samples.extend(frame.samples)
    recorded_frames += frame.frame_count()
    # Skip the report for empty frames.
    # NOTE(review): this guard presumably also protects the division below
    # from a zero sample_rate — confirm.
    if frame.duration_ms() > 0:
        total_seconds = recorded_frames / frame.sample_rate
        if total_seconds > 10:
            print(f"Recorded {total_seconds:.1f}s of audio")

See Also