AI/ML Developer's Guide

You're an ML engineer. You have a trained model. You need it running on a robot at 30Hz, receiving camera frames and publishing detections — with minimal latency and zero unnecessary copies. This page is your complete guide.


Quick Start: YOLO on a Robot

import horus
import torch
import numpy as np

# Load the pretrained 'yolov5s' model once, at module load, and switch it to
# eval mode so that it is ready before the first tick runs.
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
model.eval()

def detect_tick(node):
    """Run YOLOv5 on the newest camera frame and publish confident detections.

    Receives an image on ``camera.rgb``; when none is available the tick is a
    no-op. Every detection whose confidence exceeds 0.5 is sent on
    ``detections`` as a dict with the keys ``class`` (label string),
    ``confidence`` (float) and ``bbox`` (the four box coordinates as floats).
    """
    img = node.recv("camera.rgb")
    if img is None:
        return

    # Zero-copy: shared memory → NumPy view (no pixel copying at this step).
    frame = img.to_numpy()                          # ~3μs (view into SHM)

    # Hand the (H, W, C) uint8 array straight to the hub model. The AutoShape
    # wrapper returned by torch.hub.load() performs resizing, normalization
    # and NMS itself and returns a Detections object. Passing a pre-built
    # torch.Tensor instead bypasses that wrapper and yields raw predictions,
    # which have no ``.xyxy`` attribute.
    with torch.no_grad():
        results = model(frame)

    # results.xyxy[0] holds one row per detection for the single input image:
    # four box coordinates followed by the confidence and the class index.
    for *box, conf, cls in results.xyxy[0].cpu().numpy():
        if conf > 0.5:
            node.send("detections", {
                "class": model.names[int(cls)],
                "confidence": float(conf),
                "bbox": [float(x) for x in box],
            })

# Wrap detect_tick in a node that ticks at 30 Hz and publishes on "detections".
detector = horus.Node(
    name="yolo",
    subs=[horus.Image],
    pubs=["detections"],
    tick=detect_tick,
    rate=30,
    compute=True,           # Run on thread pool (CPU-bound inference)
    budget=30 * horus.ms,   # 30ms budget per frame
    on_miss="skip",          # Drop frames if inference is slow
)

# Hand the node to the HORUS runtime; tick_rate is passed separately from the
# node's own rate=30.
horus.run(detector, tick_rate=100)

Data Flow: Camera → Model → Action

Camera Node (Rust, 30Hz)
    │
    │  Image via SHM pool (zero-copy, ~50ns)
    ▼
ML Node (Python)
    │
    ├── img.to_numpy()          ~3μs   (SHM → NumPy view, no copy)
    ├── torch.from_numpy()      ~1μs   (NumPy → PyTorch, shared memory)
    ├── tensor.cuda()           ~50μs  (CPU → GPU copy, unavoidable)
    ├── model(tensor)           ~10ms  (GPU inference)
    ├── results.cpu()           ~20μs  (GPU → CPU copy)
    └── node.send("det", data)  ~6μs   (GenericMessage via SHM)
    │
    ▼
Planner Node (Rust, 100Hz)
    │
    │  reads detections, plans path
    ▼
Motor Controller (Rust, 1kHz)

Key insight: The only unavoidable copies are CPU↔GPU transfers. Everything else — camera to ML node, ML node to planner — is zero-copy via shared memory.


Framework Integration

NumPy (always available)

Every HORUS domain type converts to NumPy with zero copy; converting from NumPy copies the data into the SHM pool:

import numpy as np

# Image → NumPy (zero-copy view into shared memory)
# The two assignments below are alternatives; use whichever you prefer.
frame = img.to_numpy()           # shape: (H, W, C), dtype: uint8
frame = np.from_dlpack(img)      # DLPack protocol — even faster (~1μs)

# NumPy → Image (copies data into SHM pool)
img = horus.Image.from_numpy(my_array)

# PointCloud → NumPy
points = cloud.to_numpy()        # shape: (N, fields_per_point), dtype: float32

# DepthImage → NumPy
depth = depth_img.to_numpy()     # shape: (H, W), dtype: float32

# Tensor (arbitrary shape)
# Note: Tensor exposes .numpy(), whereas the types above use .to_numpy().
data = tensor.numpy()            # zero-copy view
tensor = horus.Tensor.from_numpy(my_array)

PyTorch

import torch

# Image → PyTorch (via NumPy bridge)
# torch.from_numpy() shares memory with the NumPy view; the .float() / 255.0
# step then allocates a new float tensor holding the scaled pixel values.
frame = img.to_numpy()
tensor = torch.from_numpy(frame).permute(2, 0, 1).float() / 255.0  # HWC → CHW

# Or via DLPack (true zero-copy, no intermediate NumPy)
# NOTE(review): presumably this yields the same (H, W, C) uint8 layout as
# to_numpy(), so the permute/scale step above is still needed — confirm.
tensor = torch.from_dlpack(img)

# PyTorch → Image
# .cpu() brings the tensor to host memory before the NumPy conversion.
result_np = output_tensor.cpu().numpy()
result_img = horus.Image.from_numpy(result_np)

# Tensor → PyTorch
t = horus.Tensor.from_numpy(np.zeros((3, 3), dtype=np.float32))
pt = t.torch()                   # zero-copy PyTorch tensor

# PyTorch → Tensor
t = horus.Tensor.from_numpy(pt.cpu().numpy())

ONNX Runtime gives the fastest inference on both CPU and GPU, with no Python framework overhead:

import onnxruntime as ort
import numpy as np

# Load the model once, at module level, so that no tick pays the cost of
# creating the session. Providers are listed in order of preference: CUDA
# first, CPU as the fallback.
session = ort.InferenceSession("model.onnx", providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
# Name of the model's first input; used as the feed key in session.run().
input_name = session.get_inputs()[0].name

def inference_tick(node):
    """Run the ONNX session on the newest camera frame.

    Does nothing when no image is waiting on ``camera.rgb``. Otherwise the
    frame is converted to a float32 array of shape (1, C, H, W) scaled by
    1/255 and fed to the module-level ``session`` under ``input_name``.
    """
    image = node.recv("camera.rgb")
    if image is None:
        return

    # ONNX Runtime accepts NumPy directly
    hwc = image.to_numpy()
    chw = np.transpose(hwc.astype(np.float32), (2, 0, 1))  # HWC → CHW
    input_data = chw[np.newaxis] / 255.0                   # add batch axis, scale

    feeds = {input_name: input_data}
    outputs = session.run(None, feeds)
    # Process outputs...

# Register inference_tick as a 30 Hz node. compute=True runs the tick on the
# thread pool, as in the Quick Start example.
node = horus.Node(
    name="onnx_detector",
    subs=[horus.Image],
    pubs=["detections"],
    tick=inference_tick,
    rate=30,
    compute=True,
)

HuggingFace Transformers

from transformers import pipeline
import horus

# Load once — downloads model on first run.
# device=0 places the pipeline on the first GPU.
classifier = pipeline("image-classification", model="google/vit-base-patch16-224", device=0)

def classify_tick(node):
    """Classify the newest camera frame and publish the top three labels.

    Does nothing when no image is waiting on ``camera.rgb``. Otherwise the
    frame is wrapped as a PIL image, run through the module-level
    ``classifier`` and the resulting labels and scores are sent on
    ``classification`` as two parallel lists.
    """
    img = node.recv("camera.rgb")
    if img is None:
        return

    pixels = img.to_numpy()
    # HF pipeline accepts numpy arrays via PIL
    from PIL import Image as PILImage
    pil_img = PILImage.fromarray(pixels)

    labels = []
    scores = []
    for prediction in classifier(pil_img, top_k=3):
        labels.append(prediction["label"])
        scores.append(prediction["score"])

    node.send("classification", {"labels": labels, "scores": scores})

# Register classify_tick as a 10 Hz node. The 100 ms budget equals the full
# tick period at that rate.
node = horus.Node(
    name="hf_classifier",
    subs=[horus.Image],
    pubs=["classification"],
    tick=classify_tick,
    rate=10,
    compute=True,
    budget=100 * horus.ms,
)
horus.run(node)

OpenCV

import cv2
import numpy as np

# Built once at module load: the marker dictionary and detector are fixed
# configuration, so reconstructing them on every tick is wasted work.
_ARUCO_DETECTOR = cv2.aruco.ArucoDetector(
    cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
)


def vision_tick(node):
    """Detect 4x4 ArUco markers in the newest camera frame and publish them.

    Does nothing when no image is waiting on ``camera.rgb`` or when no marker
    is found. For each detected marker a dict with ``id`` (int) and the pixel
    coordinates ``x`` / ``y`` of the marker center (floats) is sent on
    ``markers``.
    """
    img = node.recv("camera.rgb")
    if img is None:
        return

    # to_numpy() returns RGB, whereas OpenCV's own convention is BGR. Marker
    # detection only needs grayscale, so convert RGB → GRAY in one step
    # instead of going through an intermediate BGR image.
    gray = cv2.cvtColor(img.to_numpy(), cv2.COLOR_RGB2GRAY)

    # ArUco marker detection
    corners, ids, _ = _ARUCO_DETECTOR.detectMarkers(gray)
    if ids is None:
        return

    for marker_id, corner in zip(ids.flatten(), corners):
        # corner[0] holds the marker's corner points; their mean is the center.
        center = corner[0].mean(axis=0)
        node.send("markers", {
            "id": int(marker_id),
            "x": float(center[0]),
            "y": float(center[1]),
        })

JAX

import jax
import jax.numpy as jnp

def jax_tick(node):
    """Receive a camera frame, convert it to a JAX array and run ``my_model``.

    The two ``frame = ...`` assignments are alternative conversion paths shown
    side by side. As written the second overwrites the first, so keep only
    the one you need.
    """
    img = node.recv("camera.rgb")
    if img is None:
        return

    # DLPack: HORUS → JAX (zero-copy on same device)
    frame = jnp.from_dlpack(img)

    # Or via NumPy
    frame = jnp.array(img.to_numpy())

    # JAX computation
    # NOTE(review): my_model is not defined in this snippet, and jax.jit() is
    # called again on every tick — consider wrapping the model once, outside
    # the tick function.
    processed = jax.jit(my_model)(frame)

Tensor for Matrix Math

horus.Tensor supports arbitrary shapes — use it for rotation matrices, Jacobians, homogeneous transforms, and any matrix computation:

import horus
import numpy as np

# 3x3 rotation matrix (identity, float32)
R = horus.Tensor.from_numpy(np.eye(3, dtype=np.float32))

# 4x4 homogeneous transform: identity rotation block with the values
# 0.5, 0.0 and 0.3 in the last column.
T = horus.Tensor.from_numpy(np.array([
    [1, 0, 0, 0.5],
    [0, 1, 0, 0.0],
    [0, 0, 1, 0.3],
    [0, 0, 0, 1.0],
], dtype=np.float32))

# 6x6 Jacobian (float64 here, unlike the float32 matrices above)
J = horus.Tensor.from_numpy(np.zeros((6, 6), dtype=np.float64))

# Share matrices between nodes via topics
topic = horus.Topic("jacobian")
topic.send(J)

# Receive and use
# NOTE(review): send and recv are shown on the same Topic object for brevity;
# presumably the receiver normally lives in a different node — confirm.
received = topic.recv()
J_np = received.numpy()  # zero-copy view

Supported Dtypes

| Dtype | NumPy | Bytes | Use case |
|---|---|---|---|
| float32 | np.float32 | 4 | Images, point clouds, most robotics |
| float64 | np.float64 | 8 | Precision: covariances, Jacobians |
| uint8 | np.uint8 | 1 | Raw images, masks |
| uint16 | np.uint16 | 2 | Depth images (millimeters) |
| int32 | np.int32 | 4 | Indices, labels |
| int64 | np.int64 | 8 | Timestamps, counters |
| bool | np.bool_ | 1 | Masks, occupancy |

Shape Operations

# Example tensor: 480 x 640 x 3, float32.
t = horus.Tensor.from_numpy(np.zeros((480, 640, 3), dtype=np.float32))

t.shape          # (480, 640, 3)
t.numel          # 921600  (480 * 640 * 3 elements)
t.nbytes         # 3686400 (921600 elements * 4 bytes per float32)
t.dtype          # 'float32'

# Reshape (zero-copy — same underlying data)
flat = t.reshape((921600,))
batched = t.view((1, 480, 640, 3))

# Slice (zero-copy view)
roi = t[100:200, 200:400, :]

# Arithmetic
# NOTE(review): presumably these produce new tensors rather than views — confirm.
scaled = t * 0.5
normalized = (t - t.mean()) / t.std()

Batch Inference (RL Vectorized Environments)

Process multiple observations in one forward pass:

import horus
import torch
import numpy as np

# Load the TorchScript policy once, move it to the GPU and put it in eval mode.
model = torch.jit.load("policy.pt").cuda().eval()

# Collect observations from N environments
# One observation topic per environment (16 here), created once at module load.
env_topics = [horus.Topic(f"env.{i}.obs") for i in range(16)]

# One action topic per environment, created once instead of on every tick.
_action_topics = [horus.Topic(f"env.{i}.action") for i in range(len(env_topics))]

# Newest not-yet-used observation per environment; None means "still waiting".
_pending_obs = [None] * len(env_topics)


def batch_tick(node):
    """Run the policy on one observation per environment in a single pass.

    Observations are buffered per environment across ticks. Once every
    environment has delivered one, they are stacked into a batch of shape
    (N, obs_dim), run through the module-level ``model`` on the GPU, and the
    resulting action for environment ``i`` is sent on ``env.{i}.action``.
    Until the batch is complete the tick returns without running inference.
    """
    for i, topic in enumerate(env_topics):
        obs = topic.recv()
        if obs is not None:
            # NOTE(review): assumes recv() consumes the message, so a partial
            # batch must be kept rather than dropped. Copy out of the message
            # because the value may be held across ticks, and to_numpy() is
            # presumably a view into shared memory — confirm.
            _pending_obs[i] = np.array(obs.to_numpy())

    if any(pending is None for pending in _pending_obs):
        return  # wait for all envs

    # Stack into batch: (N, obs_dim). Row i belongs to environment i.
    batch = np.stack(_pending_obs)
    # The batch owns its data now, so mark every slot as waiting again.
    _pending_obs[:] = [None] * len(_pending_obs)
    batch_tensor = torch.from_numpy(batch).cuda()

    with torch.no_grad():
        actions = model(batch_tensor).cpu().numpy()

    # Send actions back to each environment
    for action_topic, action in zip(_action_topics, actions):
        action_topic.send(horus.Tensor.from_numpy(action))

# Register batch_tick as a 100 Hz node. No subs/pubs are declared here: the
# snippet creates its topics explicitly with horus.Topic. The 10 ms budget
# equals the full tick period at that rate.
node = horus.Node(
    name="rl_policy",
    tick=batch_tick,
    rate=100,
    compute=True,
    budget=10 * horus.ms,
)
horus.run(node)

GPU Memory Management

Critical for embedded devices (Jetson: 4-8GB shared CPU/GPU RAM):

import torch

# Limit GPU memory (do this BEFORE loading any model)
torch.cuda.set_per_process_memory_fraction(0.5)  # use max 50% VRAM

# Always use no_grad for inference (saves ~30% memory)
with torch.no_grad():
    output = model(input_tensor)

# Periodically clear cache (every N ticks)
# NOTE(review): horus.tick() presumably returns the current tick counter, and
# this check is meant to live inside a tick function — confirm.
if horus.tick() % 100 == 0:
    torch.cuda.empty_cache()

# Monitor usage
# Prints memory currently allocated, then the peak allocation, both in MB.
print(f"GPU memory: {torch.cuda.memory_allocated() / 1e6:.1f}MB / {torch.cuda.max_memory_allocated() / 1e6:.1f}MB peak")

Model Warmup

First inference is 10-100x slower due to CUDA kernel compilation:

def my_init(node):
    """Load the TorchScript model onto the GPU and run one warmup inference.

    Stores the model in the module-level ``model`` so the tick function can
    use it, then pushes a zero-filled 1x3x640x640 input through it so that
    the slow first inference happens here rather than on the first tick.
    """
    global model
    loaded = torch.jit.load("model.pt")
    model = loaded.cuda().eval()

    # Warmup: run dummy inference in init (before tick loop starts)
    warmup_input = torch.zeros(1, 3, 640, 640, device="cuda")
    with torch.no_grad():
        model(warmup_input)

    node.log_info("Model warmed up")

Choosing the Right Precision

| Precision | Memory | Speed | When to use |
|---|---|---|---|
| FP32 | 100% | 1x | Training, debugging |
| FP16 | 50% | 2x | Most inference on GPU |
| INT8 | 25% | 4x | Edge deployment (TensorRT) |
| TF32 | 100% | 1.5x | Ampere+ GPUs, automatic |

# FP16 inference (halves memory, doubles speed on GPU)
# Both the weights and the input must be converted to the same dtype.
model = model.half()
input_tensor = input_tensor.half()

# Or use torch.autocast, which picks reduced precision per operation.
# torch.autocast(device_type="cuda") replaces the deprecated
# torch.cuda.amp.autocast() context manager.
with torch.autocast(device_type="cuda", dtype=torch.float16):
    output = model(input_tensor)

Production Deployment on Jetson

import horus
import numpy as np

# Use ONNX Runtime with TensorRT for maximum speed on Jetson
import onnxruntime as ort

def init_model(node):
    """Create the ONNX Runtime session and warm it up with a dummy input.

    Stores the session in the module-level ``session``. Providers are listed
    in order of preference: TensorRT (with a 2147483648-byte, i.e. 2 GiB,
    workspace limit), then CUDA, then CPU. The provider reported first by the
    session is logged, and a zero-filled 1x3x640x640 float32 input is run
    once before the tick loop starts.
    """
    global session
    tensorrt_options = {'trt_max_workspace_size': '2147483648'}
    session = ort.InferenceSession(
        "model.onnx",
        providers=[
            ('TensorrtExecutionProvider', tensorrt_options),
            'CUDAExecutionProvider',
            'CPUExecutionProvider',
        ],
    )
    active_provider = session.get_providers()[0]
    node.log_info(f"Using provider: {active_provider}")

    # Warmup
    warmup_input = np.zeros((1, 3, 640, 640), dtype=np.float32)
    first_input_name = session.get_inputs()[0].name
    session.run(None, {first_input_name: warmup_input})
    node.log_info("TensorRT engine built and warmed up")

def detect_tick(node):
    """Run the ONNX session on the newest camera frame.

    Does nothing when no image is waiting on ``camera.rgb``. Otherwise the
    frame is converted to a float32 array of shape (1, C, H, W) scaled by
    1/255 and fed to the first input of the module-level ``session``.
    """
    img = node.recv("camera.rgb")
    if img is None:
        return

    pixels = img.to_numpy().astype(np.float32)
    frame = pixels.transpose(2, 0, 1)[np.newaxis] / 255.0  # HWC → 1xCHW
    first_input_name = session.get_inputs()[0].name
    outputs = session.run(None, {first_input_name: frame})
    # ... process outputs

# Register detect_tick as a 30 Hz node. init=init_model builds and warms up
# the session; on_miss="skip" drops frames when a tick exceeds the 30 ms
# budget.
node = horus.Node(
    name="jetson_detector",
    subs=[horus.Image],
    pubs=["detections"],
    tick=detect_tick,
    init=init_model,
    rate=30,
    compute=True,
    budget=30 * horus.ms,
    on_miss="skip",
)
horus.run(node, tick_rate=100)

Performance Tips

| Tip | Why | Impact |
|---|---|---|
| Use img.to_numpy() not np.frombuffer(img.data, ...) | to_numpy() is zero-copy from SHM pool | 4-60x faster for large images |
| Use np.from_dlpack(img) for NumPy | DLPack is even faster than to_numpy() | 1.1μs vs 3μs |
| Use compute=True for CPU inference | Runs on thread pool, doesn't block scheduler | Prevents deadline misses |
| Use on_miss="skip" for ML nodes | ML inference is variable-latency | Drops frames gracefully |
| Warmup model in init() | First inference compiles CUDA kernels | Avoids 100x latency spike on first tick |
| Use ONNX Runtime for production | Fastest cross-platform inference | 2-5x faster than raw PyTorch |
| Use FP16/INT8 on edge devices | Halves memory, doubles throughput | Critical for Jetson |
| Use typed topics for cross-language | GenericMessage (dicts) don't cross to Rust | ~1.7μs vs ~6μs |
| Pre-allocate output tensors | Avoid allocation in tick hot path | Reduces GC pauses |
| Batch observations for RL | One forward pass for N envs | N× throughput |

See Also