AI/ML Developer's Guide
You're an ML engineer. You have a trained model. You need it running on a robot at 30Hz, receiving camera frames and publishing detections — with minimal latency and zero unnecessary copies. This page is your complete guide.
Quick Start: YOLO on a Robot
import horus
import torch
import numpy as np
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
model.eval()
def detect_tick(node):
    img = node.recv("camera.rgb")
    if img is None:
        return

    # Zero-copy: shared memory → numpy → torch (no pixel copying)
    frame = img.to_numpy()  # ~3μs (view into SHM)
    tensor = torch.from_numpy(frame).permute(2, 0, 1).unsqueeze(0).float() / 255.0

    with torch.no_grad():
        results = model(tensor)

    for *box, conf, cls in results.xyxy[0].cpu().numpy():
        if conf > 0.5:
            node.send("detections", {
                "class": model.names[int(cls)],
                "confidence": float(conf),
                "bbox": [float(x) for x in box],
            })
detector = horus.Node(
    name="yolo",
    subs=[horus.Image],
    pubs=["detections"],
    tick=detect_tick,
    rate=30,
    compute=True,           # Run on thread pool (CPU-bound inference)
    budget=30 * horus.ms,   # 30ms budget per frame
    on_miss="skip",         # Drop frames if inference is slow
)
horus.run(detector, tick_rate=100)
Data Flow: Camera → Model → Action
Camera Node (Rust, 30Hz)
│
│ Image via SHM pool (zero-copy, ~50ns)
▼
ML Node (Python)
│
├── img.to_numpy() ~3μs (SHM → NumPy view, no copy)
├── torch.from_numpy() ~1μs (NumPy → PyTorch, shared memory)
├── tensor.cuda() ~50μs (CPU → GPU copy, unavoidable)
├── model(tensor) ~10ms (GPU inference)
├── results.cpu() ~20μs (GPU → CPU copy)
└── node.send("det", data) ~6μs (GenericMessage via SHM)
│
▼
Planner Node (Rust, 100Hz)
│
│ reads detections, plans path
▼
Motor Controller (Rust, 1kHz)
Key insight: The only unavoidable copies are CPU↔GPU transfers. Everything else — camera to ML node, ML node to planner — is zero-copy via shared memory.
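You can check which steps copy with `np.shares_memory`. A quick sketch using a plain NumPy array as a stand-in for an SHM-backed frame (`to_numpy()` itself is HORUS-specific, so it is not called here):

```python
import numpy as np

# Stand-in for img.to_numpy(): an (H, W, C) uint8 frame
frame = np.zeros((480, 640, 3), dtype=np.uint8)

view = frame[100:200, 200:400]        # basic slicing: a view, no copy
chw = frame.transpose(2, 0, 1)        # transpose: also a view
as_float = frame.astype(np.float32)   # dtype conversion: allocates a new buffer

print(np.shares_memory(frame, view))      # True
print(np.shares_memory(frame, chw))       # True
print(np.shares_memory(frame, as_float))  # False
```

The same check works on the array returned by `to_numpy()`, which is useful when auditing a preprocessing pipeline for hidden copies.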
Framework Integration
NumPy (always available)
Every HORUS domain type converts to NumPy as a zero-copy view; converting back from NumPy copies once into the SHM pool:
import numpy as np
# Image → NumPy (zero-copy view into shared memory)
frame = img.to_numpy() # shape: (H, W, C), dtype: uint8
frame = np.from_dlpack(img) # DLPack protocol — even faster (~1μs)
# NumPy → Image (copies data into SHM pool)
img = horus.Image.from_numpy(my_array)
# PointCloud → NumPy
points = cloud.to_numpy() # shape: (N, fields_per_point), dtype: float32
# DepthImage → NumPy
depth = depth_img.to_numpy() # shape: (H, W), dtype: float32
# Tensor (arbitrary shape)
data = tensor.numpy() # zero-copy view
tensor = horus.Tensor.from_numpy(my_array)
PyTorch
import torch
# Image → PyTorch (via NumPy bridge)
frame = img.to_numpy()
tensor = torch.from_numpy(frame).permute(2, 0, 1).float() / 255.0 # HWC → CHW
# Or via DLPack (true zero-copy, no intermediate NumPy)
tensor = torch.from_dlpack(img)
# PyTorch → Image
result_np = output_tensor.cpu().numpy()
result_img = horus.Image.from_numpy(result_np)
# Tensor → PyTorch
t = horus.Tensor.from_numpy(np.zeros((3, 3), dtype=np.float32))
pt = t.torch() # zero-copy PyTorch tensor
# PyTorch → Tensor
t = horus.Tensor.from_numpy(pt.cpu().numpy())
ONNX Runtime (recommended for production)
Fastest inference on both CPU and GPU. No Python framework overhead:
import onnxruntime as ort
import numpy as np
# Load model once in init
session = ort.InferenceSession("model.onnx", providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
input_name = session.get_inputs()[0].name
def inference_tick(node):
    img = node.recv("camera.rgb")
    if img is None:
        return

    # ONNX Runtime accepts NumPy directly
    frame = img.to_numpy()
    input_data = frame.astype(np.float32).transpose(2, 0, 1)[np.newaxis] / 255.0
    outputs = session.run(None, {input_name: input_data})
    # Process outputs...

node = horus.Node(
    name="onnx_detector",
    subs=[horus.Image],
    pubs=["detections"],
    tick=inference_tick,
    rate=30,
    compute=True,
)
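The preprocessing line packs several steps into one expression. Broken apart on a dummy frame (480x640 is an assumed camera resolution, not a HORUS requirement):

```python
import numpy as np

frame = np.random.randint(0, 256, size=(480, 640, 3), dtype=np.uint8)  # HWC uint8

x = frame.astype(np.float32)   # uint8 → float32 (this step copies)
x = x.transpose(2, 0, 1)       # HWC → CHW (view, no copy)
x = x[np.newaxis]              # add batch dimension → NCHW
x = x / 255.0                  # scale pixel values to [0, 1]

print(x.shape)   # (1, 3, 480, 640)
```

The `astype` call is the only allocation; transpose and `np.newaxis` are views on the converted buffer.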
HuggingFace Transformers
from transformers import pipeline
import horus
# Load once — downloads model on first run
classifier = pipeline("image-classification", model="google/vit-base-patch16-224", device=0)
from PIL import Image as PILImage

def classify_tick(node):
    img = node.recv("camera.rgb")
    if img is None:
        return

    frame = img.to_numpy()
    # HF pipelines accept PIL images; fromarray wraps the NumPy frame
    pil_img = PILImage.fromarray(frame)
    results = classifier(pil_img, top_k=3)

    node.send("classification", {
        "labels": [r["label"] for r in results],
        "scores": [r["score"] for r in results],
    })
node = horus.Node(
    name="hf_classifier",
    subs=[horus.Image],
    pubs=["classification"],
    tick=classify_tick,
    rate=10,
    compute=True,
    budget=100 * horus.ms,
)
horus.run(node)
OpenCV
import cv2
import numpy as np
# Build the ArUco detector once (constructing it per tick wastes time)
aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
aruco_detector = cv2.aruco.ArucoDetector(aruco_dict)

def vision_tick(node):
    img = node.recv("camera.rgb")
    if img is None:
        return

    # to_numpy() returns RGB; OpenCV uses BGR
    frame_rgb = img.to_numpy()
    frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)

    # ArUco marker detection
    gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    corners, ids, _ = aruco_detector.detectMarkers(gray)

    if ids is not None:
        for marker_id, corner in zip(ids.flatten(), corners):
            center = corner[0].mean(axis=0)
            node.send("markers", {
                "id": int(marker_id),
                "x": float(center[0]),
                "y": float(center[1]),
            })
JAX
import jax
import jax.numpy as jnp
# Compile once at module level; avoid calling jax.jit inside the tick loop
my_model_jit = jax.jit(my_model)

def jax_tick(node):
    img = node.recv("camera.rgb")
    if img is None:
        return

    # DLPack: HORUS → JAX (zero-copy on same device)
    frame = jnp.from_dlpack(img)
    # Or via NumPy
    frame = jnp.array(img.to_numpy())

    # JAX computation
    processed = my_model_jit(frame)
Tensor for Matrix Math
horus.Tensor supports arbitrary shapes — use it for rotation matrices, Jacobians, homogeneous transforms, and any matrix computation:
import horus
import numpy as np
# 3x3 rotation matrix
R = horus.Tensor.from_numpy(np.eye(3, dtype=np.float32))
# 4x4 homogeneous transform
T = horus.Tensor.from_numpy(np.array([
    [1, 0, 0, 0.5],
    [0, 1, 0, 0.0],
    [0, 0, 1, 0.3],
    [0, 0, 0, 1.0],
], dtype=np.float32))
# 6x6 Jacobian
J = horus.Tensor.from_numpy(np.zeros((6, 6), dtype=np.float64))
# Share matrices between nodes via topics
topic = horus.Topic("jacobian")
topic.send(J)
# Receive and use
received = topic.recv()
J_np = received.numpy() # zero-copy view
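As a worked example, applying the 4x4 transform T above (translation by (0.5, 0, 0.3)) to a homogeneous point, in plain NumPy since `horus.Tensor` wraps the same data:

```python
import numpy as np

T = np.array([
    [1, 0, 0, 0.5],
    [0, 1, 0, 0.0],
    [0, 0, 1, 0.3],
    [0, 0, 0, 1.0],
], dtype=np.float32)

p = np.array([1.0, 2.0, 3.0, 1.0], dtype=np.float32)  # point (1, 2, 3), homogeneous
p_out = T @ p                                          # translated point

assert np.allclose(p_out[:3], [1.5, 2.0, 3.3])
```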
Supported Dtypes
| Dtype | NumPy | Bytes | Use case |
|---|---|---|---|
| float32 | np.float32 | 4 | Images, point clouds, most robotics |
| float64 | np.float64 | 8 | Precision: covariances, Jacobians |
| uint8 | np.uint8 | 1 | Raw images, masks |
| uint16 | np.uint16 | 2 | Depth images (millimeters) |
| int32 | np.int32 | 4 | Indices, labels |
| int64 | np.int64 | 8 | Timestamps, counters |
| bool | np.bool_ | 1 | Masks, occupancy |
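The byte widths map directly onto NumPy's `itemsize`; a quick sanity check:

```python
import numpy as np

for dt, nbytes in [(np.float32, 4), (np.float64, 8), (np.uint8, 1),
                   (np.uint16, 2), (np.int32, 4), (np.int64, 8), (np.bool_, 1)]:
    assert np.dtype(dt).itemsize == nbytes

# uint16 depth in millimeters tops out at ~65.5 m
print(np.iinfo(np.uint16).max)  # 65535
```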
Shape Operations
t = horus.Tensor.from_numpy(np.zeros((480, 640, 3), dtype=np.float32))
t.shape # (480, 640, 3)
t.numel # 921600
t.nbytes # 3686400
t.dtype # 'float32'
# Reshape (zero-copy — same underlying data)
flat = t.reshape((921600,))
batched = t.view((1, 480, 640, 3))
# Slice (zero-copy view)
roi = t[100:200, 200:400, :]
# Arithmetic
scaled = t * 0.5
normalized = (t - t.mean()) / t.std()
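The `numel`/`nbytes` figures follow from the shape and dtype. The same operations on the underlying NumPy array, used here as a stand-in for the Tensor API above:

```python
import numpy as np

t = np.zeros((480, 640, 3), dtype=np.float32)

assert t.size == 480 * 640 * 3 == 921_600
assert t.nbytes == t.size * 4 == 3_686_400   # 4 bytes per float32 element

flat = t.reshape(921_600)        # contiguous reshape is a zero-copy view
roi = t[100:200, 200:400, :]     # basic slicing is always a view
assert np.shares_memory(t, flat) and np.shares_memory(t, roi)
```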
Batch Inference (RL Vectorized Environments)
Process multiple observations in one forward pass:
import horus
import torch
import numpy as np
model = torch.jit.load("policy.pt").cuda().eval()
# Collect observations from N environments; create all topics once, outside tick
env_topics = [horus.Topic(f"env.{i}.obs") for i in range(16)]
action_topics = [horus.Topic(f"env.{i}.action") for i in range(16)]

def batch_tick(node):
    observations = []
    for topic in env_topics:
        obs = topic.recv()
        if obs is not None:
            observations.append(obs.to_numpy())
    if len(observations) < 16:
        return  # wait for all envs

    # Stack into batch: (N, obs_dim)
    batch = np.stack(observations)
    batch_tensor = torch.from_numpy(batch).cuda()
    with torch.no_grad():
        actions = model(batch_tensor).cpu().numpy()

    # Send actions back to each environment
    for topic, action in zip(action_topics, actions):
        topic.send(horus.Tensor.from_numpy(action))
node = horus.Node(
    name="rl_policy",
    tick=batch_tick,
    rate=100,
    compute=True,
    budget=10 * horus.ms,
)
horus.run(node)
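The tick above stalls whenever a single env is late. One alternative, not part of the HORUS API, is to cache the last observation per env and always run a full batch. A sketch with hypothetical sizes:

```python
import numpy as np

N_ENVS, OBS_DIM = 16, 8  # assumed sizes, for illustration only
last_obs = np.zeros((N_ENVS, OBS_DIM), dtype=np.float32)

def gather_batch(fresh):
    """fresh: {env_index: observation}; late envs reuse their cached obs."""
    for i, obs in fresh.items():
        last_obs[i] = obs
    return last_obs  # always a full (N_ENVS, OBS_DIM) batch

# Only env 3 delivered this tick; the other 15 rows keep their last values
batch = gather_batch({3: np.ones(OBS_DIM, dtype=np.float32)})
assert batch.shape == (N_ENVS, OBS_DIM)
```

The trade-off is acting on slightly stale observations instead of skipping the whole batch; whether that is acceptable depends on the policy's tolerance for latency.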
GPU Memory Management
Critical for embedded devices (Jetson: 4-8GB shared CPU/GPU RAM):
import torch
# Limit GPU memory (do this BEFORE loading any model)
torch.cuda.set_per_process_memory_fraction(0.5) # use max 50% VRAM
# Always use no_grad for inference (saves ~30% memory)
with torch.no_grad():
    output = model(input_tensor)

# Periodically clear cache (every N ticks)
if horus.tick() % 100 == 0:
    torch.cuda.empty_cache()
# Monitor usage
print(f"GPU memory: {torch.cuda.memory_allocated() / 1e6:.1f}MB / {torch.cuda.max_memory_allocated() / 1e6:.1f}MB peak")
Model Warmup
First inference is 10-100x slower due to CUDA kernel compilation:
def my_init(node):
    global model
    model = torch.jit.load("model.pt").cuda().eval()

    # Warmup: run dummy inference in init (before tick loop starts)
    dummy = torch.zeros(1, 3, 640, 640).cuda()
    with torch.no_grad():
        model(dummy)

    node.log_info("Model warmed up")
Choosing the Right Precision
| Precision | Memory | Speed | When to use |
|---|---|---|---|
| FP32 | 100% | 1x | Training, debugging |
| FP16 | 50% | 2x | Most inference on GPU |
| INT8 | 25% | 4x | Edge deployment (TensorRT) |
| TF32 | 100% | 1.5x | Ampere+ GPUs, automatic |
# FP16 inference (halves memory, roughly doubles speed on GPU)
model = model.half()
input_tensor = input_tensor.half()

# Or use autocast (torch.cuda.amp.autocast is deprecated in favor of torch.autocast)
with torch.autocast("cuda"):
    output = model(input_tensor)
Production Deployment on Jetson
import horus
import numpy as np
# Use ONNX Runtime with TensorRT for maximum speed on Jetson
import onnxruntime as ort
def init_model(node):
    global session
    providers = [
        ('TensorrtExecutionProvider', {'trt_max_workspace_size': '2147483648'}),
        'CUDAExecutionProvider',
        'CPUExecutionProvider',
    ]
    session = ort.InferenceSession("model.onnx", providers=providers)
    node.log_info(f"Using provider: {session.get_providers()[0]}")

    # Warmup
    dummy = np.zeros((1, 3, 640, 640), dtype=np.float32)
    session.run(None, {session.get_inputs()[0].name: dummy})
    node.log_info("TensorRT engine built and warmed up")

def detect_tick(node):
    img = node.recv("camera.rgb")
    if img is None:
        return

    frame = img.to_numpy().astype(np.float32).transpose(2, 0, 1)[np.newaxis] / 255.0
    outputs = session.run(None, {session.get_inputs()[0].name: frame})
    # ... process outputs
node = horus.Node(
    name="jetson_detector",
    subs=[horus.Image],
    pubs=["detections"],
    tick=detect_tick,
    init=init_model,
    rate=30,
    compute=True,
    budget=30 * horus.ms,
    on_miss="skip",
)
horus.run(node, tick_rate=100)
Performance Tips
| Tip | Why | Impact |
|---|---|---|
| Use img.to_numpy() not np.frombuffer(img.data, ...) | to_numpy() is zero-copy from SHM pool | 4-60x faster for large images |
| Use np.from_dlpack(img) for NumPy | DLPack is even faster than to_numpy() | 1.1μs vs 3μs |
| Use compute=True for CPU inference | Runs on thread pool, doesn't block scheduler | Prevents deadline misses |
| Use on_miss="skip" for ML nodes | ML inference is variable-latency | Drops frames gracefully |
| Warmup model in init() | First inference compiles CUDA kernels | Avoids 100x latency spike on first tick |
| Use ONNX Runtime for production | Fastest cross-platform inference | 2-5x faster than raw PyTorch |
| Use FP16/INT8 on edge devices | Halves memory, doubles throughput | Critical for Jetson |
| Use typed topics for cross-language | GenericMessage dicts don't cross to Rust | ~1.7μs vs ~6μs |
| Pre-allocate output tensors | Avoid allocation in tick hot path | Reduces GC pauses |
| Batch observations for RL | One forward pass for N envs | N× throughput |
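The pre-allocation tip in practice: write results into a buffer allocated once, outside the tick. Buffer size and dtype below are illustrative, not HORUS requirements:

```python
import numpy as np

MAX_DETS = 100
out_boxes = np.empty((MAX_DETS, 4), dtype=np.float32)  # allocated once, reused

def postprocess(raw_boxes):
    """Copy up to MAX_DETS rows into the pre-allocated buffer."""
    n = min(len(raw_boxes), MAX_DETS)
    out_boxes[:n] = raw_boxes[:n]
    return out_boxes[:n]  # a view into the buffer, no per-tick allocation

dets = postprocess(np.ones((10, 4), dtype=np.float32))
assert dets.shape == (10, 4)
```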
See Also
- Tensor API — Full Tensor class reference
- Image API — Camera frame handling with DLPack
- PointCloud API — 3D point cloud with ML interop
- DepthImage API — Depth maps with NumPy/PyTorch
- ML Utilities — ONNX, PyTorch, OpenCV, TensorFlow integration
- Benchmarks — Python binding latency numbers
- Python API — Complete Python reference