HORUS Python Bindings
🚧 Under Development: HORUS Python bindings are currently under active development. Features and APIs may change. Check back for updates!
Simple & Intuitive Python API for the HORUS robotics framework - makes creating distributed robotic systems as easy as writing a function.
Why HORUS Python?
- Zero Boilerplate: Working node in 10 lines
- Functional API: No class inheritance required
- Production Performance: ~500ns latency (same shared memory as Rust)
- Pythonic: Feels like native Python, not wrapped C++
- Rich Ecosystem: Use NumPy, OpenCV, scikit-learn, etc.
Quick Start
Installation
Automatic (Recommended)
Python bindings are automatically installed when you run the HORUS installer:
# From HORUS root directory
./install.sh
The installer will detect Python 3.9+ and automatically build and install the bindings.
Manual Installation
If you prefer to install manually or need to rebuild:
# Install maturin (Python/Rust build tool)
pip install maturin
# Build and install from source
cd horus_py
maturin develop --release
Requirements:
- Python 3.9+
- Rust 1.70+
- Linux (for shared memory support)
Minimal Example
import horus
def process(node):
node.send("output", "Hello HORUS!")
node = horus.Node(pubs="output", tick=process, rate=1)
horus.run(node, duration=3)
That's it! No classes, no boilerplate, just pure logic.
Core API
Creating a Node
node = horus.Node(
name="my_node", # Optional: auto-generated if not provided
pubs=["topic1", "topic2"], # Topics to publish to
subs=["input1", "input2"], # Topics to subscribe to
tick=my_function, # Function called repeatedly
rate=30, # Hz (default: 30)
init=setup_fn, # Optional: called once at start
shutdown=cleanup_fn # Optional: called once at end
)
Parameters:
name(str, optional): Node name (auto-generated if omitted)pubs(str | list[str], optional): Topics to publish tosubs(str | list[str], optional): Topics to subscribe fromtick(callable): Function called each cyclerate(int, optional): Execution rate in Hz (default: 30)init(callable, optional): Setup functionshutdown(callable, optional): Cleanup function
Node Functions
Your tick function receives the node as a parameter:
def my_tick(node):
# Check for messages
if node.has_msg("input"):
data = node.get("input") # Get one message
# Get all messages
all_msgs = node.get_all("input")
# Send messages
node.send("output", {"value": 42})
Node Methods:
node.send(topic, data)- Publish messagenode.get(topic)- Get one message (returns None if empty)node.get_all(topic)- Get all messages as listnode.has_msg(topic)- Check if messages available
Running Nodes
# Single node
horus.run(node)
# Multiple nodes
horus.run(node1, node2, node3, duration=10)
Examples
1. Simple Publisher
import horus
def publish_temperature(node):
node.send("temperature", 25.5)
sensor = horus.Node(
name="temp_sensor",
pubs="temperature",
tick=publish_temperature,
rate=1 # 1 Hz
)
horus.run(sensor, duration=10)
2. Subscriber
import horus
def display_temperature(node):
if node.has_msg("temperature"):
temp = node.get("temperature")
print(f"Temperature: {temp}°C")
display = horus.Node(
name="display",
subs="temperature",
tick=display_temperature
)
horus.run(display)
3. Pub/Sub Pipeline
import horus
def publish(node):
node.send("raw", 42.0)
def process(node):
if node.has_msg("raw"):
data = node.get("raw")
result = data * 2.0
node.send("processed", result)
def display(node):
if node.has_msg("processed"):
value = node.get("processed")
print(f"Result: {value}")
# Create pipeline
publisher = horus.Node("publisher", pubs="raw", tick=publish, rate=1)
processor = horus.Node("processor", subs="raw", pubs="processed", tick=process)
displayer = horus.Node("display", subs="processed", tick=display)
# Run all together
horus.run(publisher, processor, displayer, duration=5)
4. Using Lambda Functions
import horus
# Producer (inline)
producer = horus.Node(
pubs="numbers",
tick=lambda n: n.send("numbers", 42),
rate=1
)
# Transformer (inline)
doubler = horus.Node(
subs="numbers",
pubs="doubled",
tick=lambda n: n.send("doubled", n.get("numbers") * 2) if n.has_msg("numbers") else None
)
horus.run(producer, doubler, duration=5)
5. Multi-Topic Robot Controller
import horus
def robot_controller(node):
# Read from multiple sensors
lidar_data = None
camera_data = None
if node.has_msg("lidar"):
lidar_data = node.get("lidar")
if node.has_msg("camera"):
camera_data = node.get("camera")
# Compute commands
if lidar_data and camera_data:
cmd = compute_navigation(lidar_data, camera_data)
node.send("motors", cmd)
node.send("status", "navigating")
robot = horus.Node(
name="robot_controller",
subs=["lidar", "camera"],
pubs=["motors", "status"],
tick=robot_controller,
rate=50 # 50Hz control loop
)
6. Lifecycle Management
import horus
class Context:
def __init__(self):
self.count = 0
self.file = None
ctx = Context()
def init_handler(node):
print("Starting up!")
ctx.file = open("data.txt", "w")
def tick_handler(node):
ctx.count += 1
data = f"Tick {ctx.count}"
node.send("data", data)
ctx.file.write(data + "\n")
def shutdown_handler(node):
print(f"Processed {ctx.count} messages")
ctx.file.close()
node = horus.Node(
pubs="data",
init=init_handler,
tick=tick_handler,
shutdown=shutdown_handler,
rate=10
)
horus.run(node, duration=5)
Quick Helpers
HORUS Python provides convenience functions for common patterns:
Transform Node
# One-liner transform
doubler = horus.quick(
sub="input",
pub="output",
fn=lambda x: x * 2
)
horus.run(doubler)
Pipe
# Simple processing pipeline
horus.pipe("input", "output", lambda x: x ** 2)
Echo
# Mirror topic to another
horus.echo("sensor_raw", "sensor_backup")
Filter
# Filter messages
horus.filter_node("all_data", "positive_only", lambda x: x > 0)
Fanout (Broadcast)
# Send to multiple topics
horus.fanout("sensor", ["log", "display", "storage"])
Merge
# Combine multiple inputs
horus.merge(["sensor1", "sensor2", "sensor3"], "all_sensors")
Integration with Python Ecosystem
NumPy Integration
import horus
import numpy as np
def process_array(node):
if node.has_msg("raw_data"):
data = node.get("raw_data")
# Convert to NumPy array
arr = np.array(data)
# Process with NumPy
result = np.fft.fft(arr)
node.send("fft_result", result.tolist())
processor = horus.Node(
subs="raw_data",
pubs="fft_result",
tick=process_array
)
OpenCV Integration
import horus
import cv2
import numpy as np
def process_image(node):
if node.has_msg("camera"):
img_data = node.get("camera")
# Convert to OpenCV format
img = np.array(img_data, dtype=np.uint8).reshape((480, 640, 3))
# Apply OpenCV processing
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 50, 150)
# Publish result
node.send("edges", edges.flatten().tolist())
vision = horus.Node(
subs="camera",
pubs="edges",
tick=process_image,
rate=30
)
scikit-learn Integration
import horus
from sklearn.linear_model import LinearRegression
import numpy as np
model = LinearRegression()
def train_model(node):
if node.has_msg("training_data"):
data = node.get("training_data")
X = np.array(data['features'])
y = np.array(data['labels'])
# Train model
model.fit(X, y)
score = model.score(X, y)
node.send("model_score", score)
trainer = horus.Node(
subs="training_data",
pubs="model_score",
tick=train_model
)
Advanced Patterns
State Management
import horus
class RobotState:
def __init__(self):
self.position = {"x": 0.0, "y": 0.0}
self.velocity = 0.0
self.last_update = 0
state = RobotState()
def update_state(node):
if node.has_msg("velocity"):
state.velocity = node.get("velocity")
if node.has_msg("position"):
state.position = node.get("position")
# Publish combined state
node.send("robot_state", {
"pos": state.position,
"vel": state.velocity
})
state_manager = horus.Node(
subs=["velocity", "position"],
pubs="robot_state",
tick=update_state
)
Rate Limiting
import horus
import time
class RateLimiter:
def __init__(self, min_interval):
self.min_interval = min_interval
self.last_send = 0
limiter = RateLimiter(min_interval=0.1) # 100ms minimum
def rate_limited_publish(node):
current_time = time.time()
if current_time - limiter.last_send >= limiter.min_interval:
node.send("output", "data")
limiter.last_send = current_time
node = horus.Node(
pubs="output",
tick=rate_limited_publish,
rate=100 # Node runs at 100Hz, but publishes at max 10Hz
)
Error Handling
import horus
def safe_processing(node):
try:
if node.has_msg("input"):
data = node.get("input")
result = risky_operation(data)
node.send("output", result)
except Exception as e:
node.send("errors", str(e))
print(f"Error: {e}")
processor = horus.Node(
subs="input",
pubs=["output", "errors"],
tick=safe_processing
)
Performance Tips
1. Use Appropriate Tick Rates
# High-frequency control loop
controller = horus.Node(tick=control_fn, rate=100) # 100Hz
# Low-frequency monitoring
monitor = horus.Node(tick=monitor_fn, rate=1) # 1Hz
2. Batch Processing
def batch_processor(node):
# Process all available messages
messages = node.get_all("input")
if messages:
results = [process(msg) for msg in messages]
for result in results:
node.send("output", result)
3. Keep tick() Fast
# GOOD: Fast tick
def good_tick(node):
if node.has_msg("input"):
data = node.get("input")
result = quick_operation(data)
node.send("output", result)
# BAD: Slow tick
def bad_tick(node):
time.sleep(1) # Don't block!
data = requests.get("http://api.example.com") # Don't do I/O!
4. Offload Heavy Processing
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
def heavy_processing_node(node):
if node.has_msg("input"):
data = node.get("input")
# Offload to thread pool
future = executor.submit(expensive_operation, data)
# Don't block - check result later or use callback
Development
Building from Source
# Debug build (fast compile, slow runtime)
maturin develop
# Release build (slow compile, fast runtime)
maturin develop --release
# Build wheel for distribution
maturin build --release
Running Tests
# Install test dependencies
pip install pytest
# Run tests
pytest tests/
# With coverage
pytest --cov=horus tests/
Mock Mode
HORUS Python includes a mock mode for testing without Rust bindings:
# If Rust bindings aren't available, automatically falls back to mock
# You'll see: "Warning: Rust bindings not available. Running in mock mode."
# Use for unit testing Python logic without HORUS running
Interoperability
With Rust Nodes
Python and Rust nodes communicate seamlessly:
# Python node publishes
node.send("cmd_vel", {"linear": 1.0, "angular": 0.5})
// Rust node receives
let cmd: CmdVel = hub.recv(ctx)?;
With C Drivers
# Python processing node
def process_lidar(node):
if node.has_msg("laser_scan"):
scan = node.get("laser_scan") # From C driver
processed = process_scan(scan)
node.send("obstacles", processed)
// C hardware driver
send(scan_pub, &laser_scan); // Python receives this
Common Patterns
Producer-Consumer
# Producer
producer = horus.Node(
pubs="queue",
tick=lambda n: n.send("queue", generate_work())
)
# Consumer
consumer = horus.Node(
subs="queue",
tick=lambda n: process_work(n.get("queue")) if n.has_msg("queue") else None
)
horus.run(producer, consumer)
Request-Response
def request_node(node):
node.send("requests", {"id": 1, "query": "data"})
def response_node(node):
if node.has_msg("requests"):
req = node.get("requests")
response = handle_request(req)
node.send("responses", response)
req = horus.Node(pubs="requests", tick=request_node)
res = horus.Node(subs="requests", pubs="responses", tick=response_node)
Periodic Tasks
import time
class PeriodicTask:
def __init__(self, interval):
self.interval = interval
self.last_run = 0
task = PeriodicTask(interval=5.0) # Every 5 seconds
def periodic_tick(node):
current = time.time()
if current - task.last_run >= task.interval:
node.send("periodic", "task_executed")
task.last_run = current
node = horus.Node(pubs="periodic", tick=periodic_tick, rate=10)
Troubleshooting
Import Errors
# If you see: ModuleNotFoundError: No module named 'horus'
# Rebuild and install:
cd horus_py
maturin develop --release
Slow Performance
# Use release build (not debug)
maturin develop --release
# Check tick rate isn't too high
node = horus.Node(tick=fn, rate=30) # 30Hz is reasonable
Memory Issues
# Avoid accumulating data in closures
# BAD:
all_data = []
def bad_tick(node):
all_data.append(node.get("input")) # Memory leak!
# GOOD:
def good_tick(node):
data = node.get("input")
process_and_discard(data) # Process immediately
See Also
- Examples - More code examples
- Core Concepts - Understanding HORUS architecture
- Performance - Optimization guide
- C Bindings - C API documentation
Remember: With HORUS Python, you focus on what your robot does, not how the framework works!