HORUS Python Bindings

🚧 Under Development: HORUS Python bindings are currently under active development. Features and APIs may change. Check back for updates!

A simple, intuitive Python API for the HORUS robotics framework that makes creating distributed robotic systems as easy as writing a function.

Why HORUS Python?

  • Zero Boilerplate: Working node in 10 lines
  • Functional API: No class inheritance required
  • Production Performance: ~500ns latency (same shared memory as Rust)
  • Pythonic: Feels like native Python, not wrapped C++
  • Rich Ecosystem: Use NumPy, OpenCV, scikit-learn, etc.

Quick Start

Installation

Automatic (Recommended)

Python bindings are automatically installed when you run the HORUS installer:

# From HORUS root directory
./install.sh

The installer detects Python 3.9+, then builds and installs the bindings.

Manual Installation

If you prefer to install manually or need to rebuild:

# Install maturin (Python/Rust build tool)
pip install maturin

# Build and install from source
cd horus_py
maturin develop --release

Requirements:

  • Python 3.9+
  • Rust 1.70+
  • Linux (for shared memory support)

Minimal Example

import horus

def process(node):
    node.send("output", "Hello HORUS!")

node = horus.Node(pubs="output", tick=process, rate=1)
horus.run(node, duration=3)

That's it! No classes, no boilerplate, just pure logic.


Core API

Creating a Node

node = horus.Node(
    name="my_node",            # Optional: auto-generated if not provided
    pubs=["topic1", "topic2"], # Topics to publish to
    subs=["input1", "input2"], # Topics to subscribe to
    tick=my_function,          # Function called repeatedly
    rate=30,                   # Hz (default: 30)
    init=setup_fn,             # Optional: called once at start
    shutdown=cleanup_fn        # Optional: called once at end
)

Parameters:

  • name (str, optional): Node name (auto-generated if omitted)
  • pubs (str | list[str], optional): Topics to publish to
  • subs (str | list[str], optional): Topics to subscribe to
  • tick (callable): Function called each cycle
  • rate (int, optional): Execution rate in Hz (default: 30)
  • init (callable, optional): Setup function
  • shutdown (callable, optional): Cleanup function
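
Because pubs and subs accept either a single topic name or a list, code that builds these arguments dynamically may want to normalize both forms first. The helper below is a hypothetical sketch of that normalization (the name as_topic_list is made up for this example and is not part of the HORUS API):

```python
from typing import Sequence, Union

def as_topic_list(topics: Union[str, Sequence[str], None]) -> list[str]:
    """Normalize a pubs/subs-style argument to a list of topic names.

    Hypothetical helper for illustration; not part of the HORUS API.
    """
    if topics is None:
        return []
    if isinstance(topics, str):
        # A bare string is one topic, not a sequence of characters.
        return [topics]
    return list(topics)

print(as_topic_list("output"))             # ['output']
print(as_topic_list(["lidar", "camera"]))  # ['lidar', 'camera']
print(as_topic_list(None))                 # []
```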

Node Functions

Your tick function receives the node as a parameter:

def my_tick(node):
    # Check for messages
    if node.has_msg("input"):
        data = node.get("input")  # Get one message

    # Get all messages
    all_msgs = node.get_all("input")

    # Send messages
    node.send("output", {"value": 42})

Node Methods:

  • node.send(topic, data) - Publish message
  • node.get(topic) - Get one message (returns None if empty)
  • node.get_all(topic) - Get all messages as list
  • node.has_msg(topic) - Check if messages available
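
This document does not spell out the queueing semantics behind these methods, so the sketch below assumes one plausible reading: each subscribed topic behaves like a FIFO queue, get removes and returns the oldest message, and get_all drains the queue. FakeNode is a hypothetical in-memory stand-in written for this illustration, not a HORUS class; check the real bindings before relying on these details.

```python
from collections import defaultdict, deque

class FakeNode:
    """In-memory stand-in mirroring the four node methods (assumed semantics)."""

    def __init__(self):
        self.inbox = defaultdict(deque)   # topic -> pending incoming messages
        self.sent = []                    # (topic, data) pairs passed to send()

    def send(self, topic, data):
        self.sent.append((topic, data))

    def has_msg(self, topic):
        return len(self.inbox[topic]) > 0

    def get(self, topic):
        # Assumed: oldest message first; None when nothing is waiting.
        return self.inbox[topic].popleft() if self.inbox[topic] else None

    def get_all(self, topic):
        # Assumed: returns every pending message and empties the queue.
        msgs = list(self.inbox[topic])
        self.inbox[topic].clear()
        return msgs

node = FakeNode()
node.inbox["input"].extend([1, 2, 3])
print(node.get("input"))      # 1
print(node.get_all("input"))  # [2, 3]
print(node.get("input"))      # None
print(node.has_msg("input"))  # False
```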

Running Nodes

# Single node
horus.run(node)

# Multiple nodes
horus.run(node1, node2, node3, duration=10)

Examples

1. Simple Publisher

import horus

def publish_temperature(node):
    node.send("temperature", 25.5)

sensor = horus.Node(
    name="temp_sensor",
    pubs="temperature",
    tick=publish_temperature,
    rate=1  # 1 Hz
)

horus.run(sensor, duration=10)

2. Subscriber

import horus

def display_temperature(node):
    if node.has_msg("temperature"):
        temp = node.get("temperature")
        print(f"Temperature: {temp}°C")

display = horus.Node(
    name="display",
    subs="temperature",
    tick=display_temperature
)

horus.run(display)

3. Pub/Sub Pipeline

import horus

def publish(node):
    node.send("raw", 42.0)

def process(node):
    if node.has_msg("raw"):
        data = node.get("raw")
        result = data * 2.0
        node.send("processed", result)

def display(node):
    if node.has_msg("processed"):
        value = node.get("processed")
        print(f"Result: {value}")

# Create pipeline
publisher = horus.Node("publisher", pubs="raw", tick=publish, rate=1)
processor = horus.Node("processor", subs="raw", pubs="processed", tick=process)
displayer = horus.Node("display", subs="processed", tick=display)

# Run all together
horus.run(publisher, processor, displayer, duration=5)

4. Using Lambda Functions

import horus

# Producer (inline)
producer = horus.Node(
    pubs="numbers",
    tick=lambda n: n.send("numbers", 42),
    rate=1
)

# Transformer (inline)
doubler = horus.Node(
    subs="numbers",
    pubs="doubled",
    tick=lambda n: n.send("doubled", n.get("numbers") * 2) if n.has_msg("numbers") else None
)

horus.run(producer, doubler, duration=5)

5. Multi-Topic Robot Controller

import horus

def robot_controller(node):
    # Read from multiple sensors
    lidar_data = None
    camera_data = None

    if node.has_msg("lidar"):
        lidar_data = node.get("lidar")

    if node.has_msg("camera"):
        camera_data = node.get("camera")

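    # Compute commands once both sensors have reported.
    # Compare against None explicitly: valid readings such as 0, an empty
    # list, or a NumPy array would break a plain truthiness test.
    if lidar_data is not None and camera_data is not None: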
        cmd = compute_navigation(lidar_data, camera_data)
        node.send("motors", cmd)
        node.send("status", "navigating")

robot = horus.Node(
    name="robot_controller",
    subs=["lidar", "camera"],
    pubs=["motors", "status"],
    tick=robot_controller,
    rate=50  # 50Hz control loop
)

6. Lifecycle Management

import horus

class Context:
    def __init__(self):
        self.count = 0
        self.file = None

ctx = Context()

def init_handler(node):
    print("Starting up!")
    ctx.file = open("data.txt", "w")

def tick_handler(node):
    ctx.count += 1
    data = f"Tick {ctx.count}"
    node.send("data", data)
    ctx.file.write(data + "\n")

def shutdown_handler(node):
    print(f"Processed {ctx.count} messages")
    ctx.file.close()

node = horus.Node(
    pubs="data",
    init=init_handler,
    tick=tick_handler,
    shutdown=shutdown_handler,
    rate=10
)

horus.run(node, duration=5)

Quick Helpers

HORUS Python provides convenience functions for common patterns:

Transform Node

# One-liner transform
doubler = horus.quick(
    sub="input",
    pub="output",
    fn=lambda x: x * 2
)
horus.run(doubler)

Pipe

# Simple processing pipeline
horus.pipe("input", "output", lambda x: x ** 2)

Echo

# Mirror topic to another
horus.echo("sensor_raw", "sensor_backup")

Filter

# Filter messages
horus.filter_node("all_data", "positive_only", lambda x: x > 0)

Fanout (Broadcast)

# Send to multiple topics
horus.fanout("sensor", ["log", "display", "storage"])

Merge

# Combine multiple inputs
horus.merge(["sensor1", "sensor2", "sensor3"], "all_sensors")

Integration with Python Ecosystem

NumPy Integration

import horus
import numpy as np

def process_array(node):
    if node.has_msg("raw_data"):
        data = node.get("raw_data")
        # Convert to NumPy array
        arr = np.array(data)
        # Process with NumPy
        result = np.fft.fft(arr)
        node.send("fft_result", result.tolist())

processor = horus.Node(
    subs="raw_data",
    pubs="fft_result",
    tick=process_array
)

OpenCV Integration

import horus
import cv2
import numpy as np

def process_image(node):
    if node.has_msg("camera"):
        img_data = node.get("camera")
        # Convert to OpenCV format (assumes 640x480 BGR frames sent as a flat sequence)
        img = np.array(img_data, dtype=np.uint8).reshape((480, 640, 3))

        # Apply OpenCV processing
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)

        # Publish result
        node.send("edges", edges.flatten().tolist())

vision = horus.Node(
    subs="camera",
    pubs="edges",
    tick=process_image,
    rate=30
)

scikit-learn Integration

import horus
from sklearn.linear_model import LinearRegression
import numpy as np

model = LinearRegression()

def train_model(node):
    if node.has_msg("training_data"):
        data = node.get("training_data")
        X = np.array(data['features'])
        y = np.array(data['labels'])

        # Train model
        model.fit(X, y)
        score = model.score(X, y)

        node.send("model_score", score)

trainer = horus.Node(
    subs="training_data",
    pubs="model_score",
    tick=train_model
)

Advanced Patterns

State Management

import horus

class RobotState:
    def __init__(self):
        self.position = {"x": 0.0, "y": 0.0}
        self.velocity = 0.0
        self.last_update = 0

state = RobotState()

def update_state(node):
    if node.has_msg("velocity"):
        state.velocity = node.get("velocity")

    if node.has_msg("position"):
        state.position = node.get("position")

    # Publish combined state
    node.send("robot_state", {
        "pos": state.position,
        "vel": state.velocity
    })

state_manager = horus.Node(
    subs=["velocity", "position"],
    pubs="robot_state",
    tick=update_state
)

Rate Limiting

import horus
import time

class RateLimiter:
    def __init__(self, min_interval):
        self.min_interval = min_interval
        self.last_send = float("-inf")  # guarantees the first tick publishes

limiter = RateLimiter(min_interval=0.1)  # 100ms minimum

def rate_limited_publish(node):
    # monotonic() is unaffected by system clock adjustments
    current_time = time.monotonic()

    if current_time - limiter.last_send >= limiter.min_interval:
        node.send("output", "data")
        limiter.last_send = current_time

node = horus.Node(
    pubs="output",
    tick=rate_limited_publish,
    rate=100  # Node runs at 100Hz, but publishes at max 10Hz
)
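
The arithmetic behind that last comment: a 100 Hz node ticks every 10 ms, and a 100 ms minimum interval lets through at most one send per ten ticks, so at most 10 messages per second. The sketch below checks this with a simulated clock in integer milliseconds, which avoids floating-point drift. RateGate and simulate are illustrative names, not HORUS APIs.

```python
class RateGate:
    """Allows an action at most once per min_interval_ms milliseconds."""

    def __init__(self, min_interval_ms):
        self.min_interval_ms = min_interval_ms
        self.last_ms = None               # None means "never fired yet"

    def ready(self, now_ms):
        if self.last_ms is None or now_ms - self.last_ms >= self.min_interval_ms:
            self.last_ms = now_ms
            return True
        return False

def simulate(rate_hz, min_interval_ms, duration_ms):
    """Count how many ticks pass the gate over a simulated run."""
    gate = RateGate(min_interval_ms)
    period_ms = 1000 // rate_hz           # tick period, e.g. 10 ms at 100 Hz
    return sum(gate.ready(t) for t in range(0, duration_ms, period_ms))

print(simulate(rate_hz=100, min_interval_ms=100, duration_ms=1000))  # 10
```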

Error Handling

import horus

def safe_processing(node):
    try:
        if node.has_msg("input"):
            data = node.get("input")
            result = risky_operation(data)
            node.send("output", result)
    except Exception as e:
        node.send("errors", str(e))
        print(f"Error: {e}")

processor = horus.Node(
    subs="input",
    pubs=["output", "errors"],
    tick=safe_processing
)

Performance Tips

1. Use Appropriate Tick Rates

# High-frequency control loop
controller = horus.Node(tick=control_fn, rate=100)  # 100Hz

# Low-frequency monitoring
monitor = horus.Node(tick=monitor_fn, rate=1)  # 1Hz

2. Batch Processing

def batch_processor(node):
    # Process all available messages
    messages = node.get_all("input")
    if messages:
        results = [process(msg) for msg in messages]
        for result in results:
            node.send("output", result)

3. Keep tick() Fast

# GOOD: Fast tick
def good_tick(node):
    if node.has_msg("input"):
        data = node.get("input")
        result = quick_operation(data)
        node.send("output", result)

# BAD: Slow tick
def bad_tick(node):
    time.sleep(1)  # Don't block!
    data = requests.get("http://api.example.com")  # Don't do I/O!
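
"Fast" here is relative to the tick period: at a given rate in Hz, a tick has roughly 1/rate seconds before the next one is due (about 33 ms at the default 30 Hz). How HORUS reacts to an overrun is not described in this document, so the sketch below only measures. The wrapper with_budget is a hypothetical helper, not part of HORUS; it relies only on tick being an ordinary callable that receives the node.

```python
import time

def with_budget(tick, rate_hz, on_overrun, clock=time.perf_counter):
    """Wrap a tick function and report calls that exceed the 1/rate budget.

    Illustrative helper, not part of HORUS. on_overrun(elapsed, budget) is
    called with both values in seconds.
    """
    budget = 1.0 / rate_hz

    def wrapped(node):
        start = clock()
        tick(node)
        elapsed = clock() - start
        if elapsed > budget:
            on_overrun(elapsed, budget)

    return wrapped

# Demo with a fake clock so the result is deterministic: the "tick" appears
# to take 50 ms, which exceeds the ~33 ms budget of a 30 Hz node.
fake_times = iter([0.0, 0.05])
overruns = []
slow = with_budget(
    lambda node: None,
    rate_hz=30,
    on_overrun=lambda elapsed, budget: overruns.append((elapsed, budget)),
    clock=lambda: next(fake_times),
)
slow(None)
print(len(overruns))  # 1
```

In real use, the wrapped function would presumably be passed as the tick argument with the same rate given to horus.Node, and on_overrun could log a warning.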

4. Offload Heavy Processing

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def heavy_processing_node(node):
    if node.has_msg("input"):
        data = node.get("input")
        # Offload to thread pool
        future = executor.submit(expensive_operation, data)
        # Don't block - check result later or use callback
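
One way to "check the result later" is to keep the pending futures in a list and publish whichever have finished at the start of each tick, so results are always sent from the tick itself rather than from a worker thread. This is a sketch under assumptions: expensive_operation is a placeholder, the demo uses unittest.mock.Mock in place of a real node, and whether node.send may be called from other threads is not stated in this document, which is why the example avoids doing so.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from unittest.mock import Mock

executor = ThreadPoolExecutor(max_workers=4)
pending = []                              # futures submitted but not yet published

def expensive_operation(data):
    """Placeholder for real heavy work."""
    return sum(x * x for x in data)

def heavy_processing_tick(node):
    # 1. Publish results of any jobs that finished since the last tick.
    still_running = []
    for future in pending:
        if future.done():
            node.send("output", future.result())  # re-raises if the job failed
        else:
            still_running.append(future)
    pending[:] = still_running

    # 2. Hand new input to the pool without waiting for it.
    if node.has_msg("input"):
        pending.append(executor.submit(expensive_operation, node.get("input")))

# Demo with a stand-in node (a Mock), not a real HORUS node.
demo = Mock()
demo.has_msg.return_value = True
demo.get.return_value = [1, 2, 3]
heavy_processing_tick(demo)       # submits the job
wait(pending)                     # only for the demo; a real tick would not block
demo.has_msg.return_value = False
heavy_processing_tick(demo)       # publishes the finished result
print(demo.send.call_args)        # call('output', 14)
executor.shutdown()
```

Note that future.result() re-raises any exception from the worker, so a production tick would likely wrap it in try/except.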

Development

Building from Source

# Debug build (fast compile, slow runtime)
maturin develop

# Release build (slow compile, fast runtime)
maturin develop --release

# Build wheel for distribution
maturin build --release

Running Tests

# Install test dependencies
pip install pytest

# Run tests
pytest tests/

# With coverage
pytest --cov=horus tests/

Mock Mode

HORUS Python includes a mock mode for testing without Rust bindings:

# If Rust bindings aren't available, automatically falls back to mock
# You'll see: "Warning: Rust bindings not available. Running in mock mode."

# Use for unit testing Python logic without HORUS running
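
Independently of the built-in mock mode, tick functions are plain callables, so their logic can also be checked with the standard library alone. A minimal sketch, assuming only the node methods documented above (has_msg, get, send); the tick function double_tick and the topic names are made up for the example:

```python
from unittest.mock import Mock

def double_tick(node):
    """Example tick: doubles each value from 'raw' and publishes it on 'processed'."""
    if node.has_msg("raw"):
        node.send("processed", node.get("raw") * 2.0)

def test_double_tick_publishes_doubled_value():
    node = Mock()                       # stands in for the node passed to tick
    node.has_msg.return_value = True
    node.get.return_value = 21.0
    double_tick(node)
    node.send.assert_called_once_with("processed", 42.0)

def test_double_tick_is_silent_without_input():
    node = Mock()
    node.has_msg.return_value = False
    double_tick(node)
    node.send.assert_not_called()

# pytest would discover these automatically; they can also be called directly.
test_double_tick_publishes_doubled_value()
test_double_tick_is_silent_without_input()
```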

Interoperability

With Rust Nodes

Python and Rust nodes communicate seamlessly:

# Python node publishes
node.send("cmd_vel", {"linear": 1.0, "angular": 0.5})

// Rust node receives
let cmd: CmdVel = hub.recv(ctx)?;

With C Drivers

# Python processing node
def process_lidar(node):
    if node.has_msg("laser_scan"):
        scan = node.get("laser_scan")  # From C driver
        processed = process_scan(scan)
        node.send("obstacles", processed)

// C hardware driver
send(scan_pub, &laser_scan);  // Python receives this

Common Patterns

Producer-Consumer

# Producer
producer = horus.Node(
    pubs="queue",
    tick=lambda n: n.send("queue", generate_work())
)

# Consumer
consumer = horus.Node(
    subs="queue",
    tick=lambda n: process_work(n.get("queue")) if n.has_msg("queue") else None
)

horus.run(producer, consumer)

Request-Response

def request_node(node):
    node.send("requests", {"id": 1, "query": "data"})

def response_node(node):
    if node.has_msg("requests"):
        req = node.get("requests")
        response = handle_request(req)
        node.send("responses", response)

req = horus.Node(pubs="requests", tick=request_node)
res = horus.Node(subs="requests", pubs="responses", tick=response_node)

Periodic Tasks

import time

class PeriodicTask:
    def __init__(self, interval):
        self.interval = interval
        self.last_run = float("-inf")  # guarantees the task runs on the first tick

task = PeriodicTask(interval=5.0)  # Every 5 seconds

def periodic_tick(node):
    # monotonic() is unaffected by system clock adjustments
    current = time.monotonic()
    if current - task.last_run >= task.interval:
        node.send("periodic", "task_executed")
        task.last_run = current

node = horus.Node(pubs="periodic", tick=periodic_tick, rate=10)

Troubleshooting

Import Errors

# If you see: ModuleNotFoundError: No module named 'horus'
# Rebuild and install:
cd horus_py
maturin develop --release

Slow Performance

# Use release build (not debug)
maturin develop --release

# Check tick rate isn't too high
node = horus.Node(tick=fn, rate=30)  # 30Hz is reasonable

Memory Issues

# Avoid accumulating data without bound in long-lived state (globals, closures)
# BAD:
all_data = []
def bad_tick(node):
    all_data.append(node.get("input"))  # Memory leak!

# GOOD:
def good_tick(node):
    data = node.get("input")
    process_and_discard(data)  # Process immediately
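
When a tick genuinely needs recent history (for smoothing or debugging, say), a bounded buffer keeps memory flat. A minimal sketch using collections.deque with maxlen; the window size of 100 and the record helper are arbitrary choices for illustration:

```python
from collections import deque

recent = deque(maxlen=100)        # keeps only the 100 newest samples

def record(sample):
    """Store a sample; the oldest one is dropped automatically once full."""
    if sample is not None:        # node.get() returns None when no message is waiting
        recent.append(sample)

for i in range(250):
    record(i)

print(len(recent))                # 100
print(recent[0], recent[-1])      # 150 249
```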

Remember: With HORUS Python, you focus on what your robot does, not how the framework works!