Python Examples

All examples use the standard horus.Node callback API — one pattern, no inheritance.

import horus

def my_tick(node):
    """Publish one message on the "output" topic each tick.

    The original snippet referenced an undefined name ``data`` and would
    raise ``NameError`` if run; define a concrete placeholder payload so
    the example is copy-paste runnable.
    """
    data = {"value": 42}  # placeholder payload — replace with real data
    node.send("output", data)

# Wire the callback into a node: publishes on "output", ticks at 30 Hz.
node = horus.Node(name="my_node", pubs=["output"], tick=my_tick, rate=30)
horus.run(node)  # presumably blocks until stopped — see horus.run docs

Basic Node

A minimal node that reads sensor data and publishes motor commands:

import horus

def controller(node):
    """Simple obstacle avoidance: turn in place when something is close,
    otherwise drive straight ahead."""
    if not node.has_msg("sensor.distance"):
        return  # nothing new this tick
    reading = node.recv("sensor.distance")
    too_close = reading < 0.5
    command = (
        {"linear": 0.0, "angular": 0.5}   # rotate away from the obstacle
        if too_close
        else {"linear": 1.0, "angular": 0.0}  # clear path: full speed ahead
    )
    node.send("motor.cmd", command)

# Wiring: subscribe to the distance topic, publish motor commands,
# and run `controller` at 10 Hz.
node = horus.Node(
    name="obstacle_avoider",
    subs=["sensor.distance"],
    pubs=["motor.cmd"],
    tick=controller,
    rate=10
)

horus.run(node)

Typed Topics

Using typed messages for proper logging and cross-language compatibility:

import horus

def controller(node):
    """Receive a typed pose and publish a fixed typed velocity command."""
    if node.has_msg("localization.pose"):
        # `pose` is not used below — receiving it here demonstrates that
        # typed messages render readably in the framework logs.
        pose = node.recv("localization.pose")
        # Logs show: Pose2D { x: 2.31, y: 1.31, theta: 0.5 }
        cmd = horus.CmdVel(linear=1.0, angular=0.0)
        node.send("control.cmd", cmd)

# Dict-style subs/pubs attach a message type to each topic
# (vs. the plain list-of-names form used in the untyped examples).
node = horus.Node(
    name="controller",
    subs={"localization.pose": {"type": horus.Pose2D}},
    pubs={"control.cmd": {"type": horus.CmdVel}},
    tick=controller,
    rate=30
)

horus.run(node)

Available typed messages: CmdVel, Pose2D, Imu, Odometry, LaserScan, JointState, MotorCommand, and 50+ more.


Multi-Node System

Multiple nodes with execution order:

import horus

def sensor_tick(node):
    """Publish a fixed distance reading each tick (stand-in for a real sensor)."""
    reading = 1.5  # a real driver would sample hardware here
    node.send("sensor.distance", reading)

def controller_tick(node):
    """Turn in place when an obstacle is near; otherwise drive forward."""
    if not node.has_msg("sensor.distance"):
        return  # no fresh reading this tick
    dist = node.recv("sensor.distance")
    if dist < 0.5:
        cmd = horus.CmdVel(linear=0.0, angular=0.5)  # rotate away
    else:
        cmd = horus.CmdVel(linear=1.0, angular=0.0)  # path clear
    node.send("cmd_vel", cmd)

def logger_tick(node):
    """Print the most recent velocity command, if one has arrived."""
    if not node.has_msg("cmd_vel"):
        return
    cmd = node.recv("cmd_vel")
    print(f"Command: linear={cmd.linear:.1f} angular={cmd.angular:.1f}")

# Three nodes sharing topics: sensor -> controller -> logger.
# Only the controller's output topic is typed here.
sensor = horus.Node(name="sensor", pubs=["sensor.distance"], tick=sensor_tick, rate=30)
controller = horus.Node(
    name="controller",
    subs=["sensor.distance"],
    pubs={"cmd_vel": {"type": horus.CmdVel}},
    tick=controller_tick,
    rate=30
)
logger = horus.Node(name="logger", subs=["cmd_vel"], tick=logger_tick, rate=10)

# Quick run — no scheduler needed
horus.run(sensor, controller, logger, duration=10.0)  # stops after 10 s

Scheduler with Execution Control

When you need execution order, failure policies, or RT features:

import horus

# `order` controls per-tick execution order (lower runs earlier, per the
# comments below). sensor_tick / ctrl_tick / log_tick are assumed to be
# defined elsewhere.
sensor = horus.Node(name="sensor", pubs=["scan"], tick=sensor_tick, rate=100, order=0)
controller = horus.Node(name="ctrl", subs=["scan"], pubs=["cmd"], tick=ctrl_tick, rate=100, order=1)
logger = horus.Node(name="logger", subs=["cmd"], tick=log_tick, rate=10, order=10)

# Scheduler ticks at 100 Hz with a 500 ms watchdog.
scheduler = horus.Scheduler(tick_rate=100, watchdog_ms=500)

# Critical — runs first
scheduler.add(sensor)

# RT control — runs after sensor
scheduler.add(controller)

# Non-critical — runs last
scheduler.add(logger)

scheduler.run()  # presumably blocks until stopped — see Scheduler docs

Context Manager

# sensor_tick / ctrl_tick are assumed to be defined elsewhere.
sensor = horus.Node(name="sensor", tick=sensor_tick, rate=100, order=0)
controller = horus.Node(name="ctrl", tick=ctrl_tick, rate=100, order=1)

# Scheduler as a context manager: exiting the `with` block stops it.
with horus.Scheduler(tick_rate=100) as sched:
    sched.add(sensor)
    sched.add(controller)
    sched.run(duration=30.0)  # run for 30 s, then fall through
# auto-stop on exit

Advanced Node Configuration

For execution classes (compute, async I/O, event-driven), configure via Node kwargs:

# rt=True enables the scheduler's real-time features.
scheduler = horus.Scheduler(tick_rate=500, rt=True)

# Event-triggered: ticks when "lidar_scan" topic receives data
# (the `on=` kwarg replaces a fixed rate).
scheduler.add(horus.Node(tick=detect_fn, on="lidar_scan", order=0))

# Compute pool: CPU-bound work on separate thread pool
scheduler.add(horus.Node(tick=plan_fn, compute=True, rate=10))

# Async I/O: non-blocking network/file operations
# (assumes upload_fn is `async def`, which the framework auto-detects)
scheduler.add(horus.Node(tick=upload_fn, rate=1, failure_policy="ignore"))

scheduler.run()

Sensor Processing Pipeline

Processing laser scans for obstacle avoidance:

import horus

def obstacle_avoidance(node):
    """Steer away from the nearest obstacle seen in a laser scan.

    Turns in place when the closest valid return is under 0.5 m,
    otherwise drives forward.
    """
    if not node.has_msg("scan"):
        return
    scan = node.recv("scan")
    if not (scan and hasattr(scan, 'ranges') and scan.ranges):
        return
    # Returns <= 0.01 m are treated as invalid (dropouts). `default=` guards
    # the case where *every* return is filtered out — the original min()
    # raised ValueError on an empty generator; now we treat it as "no
    # obstacle" and drive forward.
    min_dist = min((r for r in scan.ranges if r > 0.01), default=float("inf"))
    if min_dist < 0.5:
        node.send("cmd_vel", horus.CmdVel(linear=0.0, angular=0.5))
    else:
        node.send("cmd_vel", horus.CmdVel(linear=1.0, angular=0.0))

# Typed input ("scan" carries LaserScan), typed output ("cmd_vel" carries
# CmdVel); the avoidance callback runs at 10 Hz.
node = horus.Node(
    name="obstacle_avoider",
    subs={"scan": {"type": horus.LaserScan}},
    pubs={"cmd_vel": {"type": horus.CmdVel}},
    tick=obstacle_avoidance,
    rate=10
)

horus.run(node)

Camera Image Pipeline

Send and receive images with zero-copy shared memory:

import horus
import numpy as np

# Create image backed by shared memory
img = horus.Image(480, 640, "rgb8")

# Fill from NumPy (zero-copy when possible)
pixels = np.zeros((480, 640, 3), dtype=np.uint8)
pixels[:, :, 2] = 255  # Blue
img = horus.Image.from_numpy(pixels)

# Send over topic
topic = horus.Topic("camera.rgb")
topic.send(img)

# Receive (in another node or process)
received = topic.recv()
if received is not None:
    arr = received.to_numpy()       # zero-copy NumPy
    tensor = received.to_torch()    # zero-copy PyTorch via DLPack
    print(f"Received {arr.shape[1]}x{arr.shape[0]} image")

Cross-Language Communication

Python and Rust nodes communicate via shared memory topics — same machine, automatic:

Python publisher:

import horus
import time

# Topic constructed from the message *type* rather than a name string —
# the topic name presumably derives from the type (the Rust subscriber
# below opens "cmd_vel"); confirm against the horus.Topic docs.
topic = horus.Topic(horus.CmdVel)

# Publish at ~10 Hz forever; stop with Ctrl-C.
while True:
    topic.send(horus.CmdVel(linear=1.0, angular=0.2))
    time.sleep(0.1)

Rust subscriber (separate process):

use horus::prelude::*;

// Open the same shared-memory topic the Python process publishes on.
let topic: Topic<CmdVel> = Topic::new("cmd_vel")?;
loop {
    // recv() yields Option<CmdVel> — presumably None when nothing new
    // has arrived; the loop just spins until a command shows up.
    if let Some(cmd) = topic.recv() {
        println!("linear={}, angular={}", cmd.linear, cmd.angular);
    }
}

Time API

Framework clock for deterministic-compatible code:

import horus

def physics_tick(node):
    """Integrate simple point-mass motion using the framework clock.

    The unused locals below are deliberate — they demonstrate the full
    Time API surface listed in the table that follows.
    """
    dt = horus.dt()                    # Fixed in deterministic mode
    elapsed = horus.elapsed()          # Time since start
    tick = horus.tick()                # Tick number
    rng = horus.rng_float()            # Deterministic random [0, 1)
    budget = horus.budget_remaining()  # Seconds left in budget

    # Physics integration — velocity/position/acceleration are user-defined
    # attributes stored on the node object (must be initialized elsewhere).
    node.velocity += node.acceleration * dt
    node.position += node.velocity * dt

node = horus.Node(name="physics", tick=physics_tick, rate=100)
horus.run(node)
| Function | Returns | Description |
| --- | --- | --- |
| `horus.now()` | `float` | Current time (seconds) |
| `horus.dt()` | `float` | Timestep (seconds) |
| `horus.elapsed()` | `float` | Time since start (seconds) |
| `horus.tick()` | `int` | Current tick number |
| `horus.budget_remaining()` | `float` | Budget left (seconds, `inf` if none) |
| `horus.rng_float()` | `float` | Random in `[0, 1)` |
| `horus.timestamp_ns()` | `int` | Nanosecond timestamp |

Async Node

Nodes with async def tick functions run on the async I/O thread pool — perfect for network requests and file I/O:

import horus

async def fetch_weather(node):
    """Fetch weather data from an API every tick.

    NOTE(review): a new ClientSession is created (and closed) on every
    tick; for a real deployment consider reusing one session if the node
    API offers a place to keep it.
    """
    import aiohttp  # third-party; imported lazily inside the callback
    async with aiohttp.ClientSession() as session:
        async with session.get("http://api.weather.local/current") as resp:
            data = await resp.json()
            node.send("weather", data)

weather = horus.Node(
    name="weather_fetcher",
    pubs=["weather"],
    tick=fetch_weather,  # async def auto-detected
    rate=1               # 1 Hz — fetch every second
)

horus.run(weather)

HORUS auto-detects async def and runs it on the async executor. No manual event loop setup needed.


ML Inference

ONNX model inference with performance monitoring:

import horus
import numpy as np

def detect_tick(node):
    """Run ONNX inference on the latest camera frame and publish detections.

    `preprocess`, `model`, and `parse_detections` are assumed to be
    defined elsewhere (model loaded once at startup).
    """
    if node.has_msg("camera.rgb"):
        img = node.recv("camera.rgb")
        arr = img.to_numpy()  # zero-copy view of the shared-memory image

        # Preprocess
        input_data = preprocess(arr)

        # Run inference
        results = model.run(None, {"input": input_data})[0]

        # Publish detections
        for det in parse_detections(results):
            node.send("detections", det)

# compute=True routes this CPU-bound node onto the compute thread pool;
# order=5 positions it in the per-tick execution order.
node = horus.Node(
    name="detector",
    subs={"camera.rgb": {"type": horus.Image}},
    pubs=["detections"],
    tick=detect_tick,
    rate=30,
    compute=True,
    order=5
)

scheduler = horus.Scheduler(tick_rate=30)
scheduler.add(node)  # Runs on compute pool
scheduler.run()

See Also