Input & Audio Messages
Input messages handle human interaction — gamepad teleoperation, keyboard shortcuts, and audio capture. These are the human interface layer of your robot.
from horus import JoystickInput, KeyboardInput, AudioFrame
JoystickInput
Gamepad/joystick events with typed factories for each event kind. The factory methods create properly-typed events without you needing to remember raw integer constants.
Factory Methods
# Button press
btn = JoystickInput.new_button(joystick_id=0, button_id=1, name="A", pressed=True)
# Analog axis (stick/trigger) — value range: -1.0 to 1.0 for sticks, 0.0 to 1.0 for triggers
axis = JoystickInput.new_axis(joystick_id=0, axis_id=0, name="left_stick_x", value=0.75)
# D-pad/hat switch
hat = JoystickInput.new_hat(joystick_id=0, hat_id=0, name="dpad", value=1.0)
# Controller connected/disconnected
conn = JoystickInput.new_connection(joystick_id=0, connected=True)
Event Type Queries — Dispatch Pattern
The standard way to handle mixed joystick events is to check the type and dispatch:
def handle_input(joy):
    """Dispatch one joystick event to the handler for its event kind.

    Args:
        joy: A ``JoystickInput`` event. Exactly one of the ``is_*`` type
            queries is expected to be true for a given event.

    Button actions run on the press edge only: a button produces both a
    press and a release event, and reacting to both would run the action
    twice per physical press.
    """
    if joy.is_button():
        if not joy.pressed:
            # Release events carry no action in this example.
            return
        if joy.element_name == "A":
            trigger_action()
        elif joy.element_name == "B":
            cancel_action()
    elif joy.is_axis():
        # Axis names are matched by substring so that e.g. "left_stick_x"
        # and "right_stick_x" both steer.
        if "stick_x" in joy.element_name:
            steer(joy.value)
        elif "trigger" in joy.element_name:
            throttle(joy.value)
    elif joy.is_hat():
        handle_dpad(joy.value)
    elif joy.is_connection_event():
        if joy.is_connected():
            print("Controller connected")
        else:
            print("Controller disconnected — engaging e-stop!")
.is_button() — Is This a Button Event?
if joy.is_button():
print(f"Button {joy.element_name}: {'pressed' if joy.pressed else 'released'}")
.is_axis() — Is This an Axis Event?
if joy.is_axis():
print(f"Axis {joy.element_name}: {joy.value:.2f}")
Axis values are typically -1.0 to 1.0 for sticks (center = 0.0) and 0.0 to 1.0 for triggers (released = 0.0).
.is_hat() — Is This a Hat/D-pad Event?
if joy.is_hat():
print(f"D-pad direction: {joy.value}")
.is_connection_event() — Is This a Hotplug Event?
if joy.is_connection_event():
connected = joy.is_connected()
Handle controller hotplug — if the operator's gamepad disconnects mid-mission, you should trigger an emergency stop.
.is_connected() — Is the Controller Plugged In?
if joy.is_connection_event() and not joy.is_connected():
estop_topic.send(EmergencyStop.engage("Controller disconnected"))
Example — Gamepad Teleoperation:
from horus import Node, run, JoystickInput, CmdVel, EmergencyStop, Topic
joy_topic = Topic(JoystickInput)
cmd_topic = Topic(CmdVel)
estop_topic = Topic(EmergencyStop)
speed_scale = 1.0
linear = 0.0
angular = 0.0
def teleop(node):
    """Translate the latest gamepad event into a velocity command.

    Reads at most one ``JoystickInput`` from ``joy_topic`` per tick and
    updates the module-level ``linear``, ``angular`` and ``speed_scale``
    state accordingly, then publishes the current velocity on
    ``cmd_topic``. Nothing is published when no event is available or
    when the controller has just disconnected.

    Args:
        node: The node handle passed in by the scheduler; forwarded to the
            topic ``recv``/``send`` calls.
    """
    global speed_scale, linear, angular

    event = joy_topic.recv(node)
    if event is None:
        return

    if event.is_axis():
        axis = event.element_name
        if "left_stick_y" in axis:
            # Forward/backward
            linear = -event.value * speed_scale
        elif "left_stick_x" in axis:
            # Turn left/right
            angular = -event.value * speed_scale
        elif "right_trigger" in axis:
            # Speed boost: a 0.0-1.0 trigger value maps to a 0.5-2.0 scale.
            speed_scale = 0.5 + event.value * 1.5
    elif event.is_button() and event.pressed:
        if event.element_name == "A":
            # Operator e-stop: raise the stop and zero the command that
            # is published below.
            estop_topic.send(EmergencyStop.engage("Operator e-stop"))
            linear, angular = 0.0, 0.0
    elif event.is_connection_event() and not event.is_connected():
        # Lost the controller: raise the stop and skip publishing.
        estop_topic.send(EmergencyStop.engage("Controller disconnected"))
        linear, angular = 0.0, 0.0
        return

    cmd_topic.send(CmdVel(linear=linear, angular=angular), node)
run(Node(tick=teleop, rate=50, pubs=["cmd_vel", "estop"], subs=["joystick"]))
KeyboardInput
Keyboard events with modifier key detection.
Constructor
key = KeyboardInput(key_name="A", code=65, pressed=True, modifiers=0)
.is_ctrl() / .is_shift() / .is_alt() — Modifier Checks
if key.is_ctrl() and key.key_name == "S":
save_map()
elif key.is_ctrl() and key.key_name == "Q":
shutdown()
elif key.key_name == "space" and key.pressed:
toggle_estop()
These check the modifier bit flags. Use them for keyboard shortcuts in operator consoles and development tools.
.pressed — Key State
if key.pressed:
print(f"Key down: {key.key_name}")
else:
print(f"Key up: {key.key_name}")
Example — Keyboard Shortcuts:
from horus import KeyboardInput, Topic
key_topic = Topic(KeyboardInput)
def handle_keys(node):
    """Run the shortcut bound to the most recent key-down event.

    Reads one ``KeyboardInput`` from ``key_topic``. Missing events and
    key-up events are ignored; otherwise the key is matched against the
    shortcuts below in order.

    Args:
        node: The node handle passed in by the scheduler; forwarded to
            ``key_topic.recv``.
    """
    event = key_topic.recv(node)
    if event is None:
        return
    if not event.pressed:
        # Shortcuts fire on key-down only.
        return

    if event.key_name == "space":
        toggle_pause()
        return
    if event.is_ctrl() and event.key_name == "Z":
        undo()
        return
    if event.key_name == "escape":
        emergency_stop()
AudioFrame
Audio data from microphones or audio sources. Factory methods handle channel layout — use mono() for single-mic, stereo() for stereo pair, multi_channel() for microphone arrays.
.mono(sample_rate, samples) — Single Channel
frame = AudioFrame.mono(sample_rate=16000, samples=[0.1, 0.2, -0.1, 0.0])
16 kHz is standard for speech recognition; use 44.1 kHz or 48 kHz for music-quality audio.
.stereo(sample_rate, samples) — Two Channels
# Interleaved L/R samples: [L0, R0, L1, R1, ...]
frame = AudioFrame.stereo(sample_rate=48000, samples=interleaved_data)
.multi_channel(sample_rate, channels, samples) — Microphone Array
# 4-channel microphone array at 16kHz
frame = AudioFrame.multi_channel(sample_rate=16000, channels=4, samples=array_data)
Used for sound source localization (beamforming) — determining which direction a sound comes from using multiple microphones.
.duration_ms() — Audio Duration
print(f"Frame duration: {frame.duration_ms():.1f} ms")
Computed from sample count and sample rate. Typical frame durations: 10-50ms for real-time processing, 100-500ms for batch processing.
.frame_count() — Number of Samples per Channel
print(f"Samples per channel: {frame.frame_count()}")
Example — Audio Recording Node:
from horus import Node, run, AudioFrame, Topic
audio_topic = Topic(AudioFrame)
recorded_samples = []
def record_audio(node):
    """Append the next audio frame to ``recorded_samples`` and report progress.

    Reads one ``AudioFrame`` from ``audio_topic``, extends the module-level
    ``recorded_samples`` buffer with its samples, and prints the total
    recorded time once it exceeds 10 seconds.

    Args:
        node: The node handle passed in by the scheduler; forwarded to
            ``audio_topic.recv``.
    """
    frame = audio_topic.recv(node)
    if frame is None:
        return

    recorded_samples.extend(frame.samples)

    frames_per_channel = frame.frame_count()
    if frame.duration_ms() <= 0 or frames_per_channel <= 0:
        # Empty frame: no channel count can be derived from it.
        return

    # Stereo / multi-channel frames carry one sample per channel for every
    # sampling instant, so dividing the raw buffer length by the sample
    # rate alone would overcount time by the channel count.
    # NOTE(review): assumes every frame on this topic has the same channel
    # layout — confirm against the publisher.
    channels = max(1, len(frame.samples) // frames_per_channel)
    total_seconds = len(recorded_samples) / (frame.sample_rate * channels)
    if total_seconds > 10:
        print(f"Recorded {total_seconds:.1f}s of audio")
See Also
- Control Messages — CmdVel for translating joystick input to velocity
- Diagnostics Messages — EmergencyStop for safety
- Rust Input Messages — Rust API reference