ML Integration
Use ML frameworks directly in horus nodes — no wrapper library needed. Import PyTorch, ONNX Runtime, TensorFlow, or OpenCV and use them in your tick function.
Zero-Copy Interop Matrix
horus data types integrate with the Python ML ecosystem via three protocols: `__array_interface__` (NumPy), `__dlpack__` (universal), and `__cuda_array_interface__` (GPU).
| horus type | NumPy | PyTorch | JAX | OpenCV | ONNX RT |
|---|---|---|---|---|---|
| Image | to_numpy() / from_numpy() | to_torch() / from_torch() | to_jax() | via to_numpy() | via to_numpy() |
| PointCloud | to_numpy() / from_numpy() | to_torch() / from_torch() | to_jax() | — | via to_numpy() |
| DepthImage | to_numpy() / from_numpy() | to_torch() / from_torch() | to_jax() | via to_numpy() | via to_numpy() |
All conversions are zero-copy (~3μs constant time, regardless of data size). The Python side gets a view into horus shared memory — no pixel data is copied.
```python
img = node.recv("camera")

# Any of these — all zero-copy, all ~3μs:
np_arr = img.to_numpy()       # NumPy ndarray
tensor = img.to_torch()       # PyTorch tensor
jax_arr = img.to_jax()        # JAX array
dlpack = np.from_dlpack(img)  # DLPack protocol (979ns)
```
Performance: A 1920×1080 RGB image (6MB) takes 3μs to access as NumPy vs 178μs to copy — 59x faster. See Benchmarks for full numbers.
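What "a view into shared memory" means in practice can be checked with plain NumPy (no horus required): an array imported via DLPack aliases the producer's buffer, so a write to the source is visible through the imported array.

```python
import numpy as np

a = np.arange(12, dtype=np.float32).reshape(3, 4)
b = np.from_dlpack(a)  # zero-copy: b aliases a's buffer

a[0, 0] = 99.0
print(b[0, 0])                 # 99.0 — same underlying memory
print(np.shares_memory(a, b))  # True
```

If `b` were a copy, the write to `a` would not show up in it; `np.shares_memory` confirms the two names point at one buffer.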
ONNX Runtime (Recommended for Production)
```python
import horus
import onnxruntime as ort
import numpy as np

session = ort.InferenceSession("yolov8n.onnx", providers=["CUDAExecutionProvider"])

def detect(node):
    if node.has_msg("camera"):
        img = node.recv("camera").to_numpy()
        img = img.astype(np.float32) / 255.0
        img = np.transpose(img, (2, 0, 1))[np.newaxis]  # HWC → NCHW
        output = session.run(None, {"images": img})
        node.send("detections", output[0])

horus.run(
    horus.Node(tick=detect, rate=30, subs=["camera"], pubs=["detections"], order=0),
)
```
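The normalize-and-transpose step in `detect` can be factored out and unit-tested without a model or GPU. A minimal sketch (the `preprocess` helper name is ours, not a horus or ONNX Runtime API):

```python
import numpy as np

def preprocess(img: np.ndarray) -> np.ndarray:
    """uint8 HWC image -> float32 NCHW batch of 1, scaled to [0, 1]."""
    x = img.astype(np.float32) / 255.0
    return np.transpose(x, (2, 0, 1))[np.newaxis]

frame = np.full((640, 640, 3), 255, dtype=np.uint8)  # fake all-white frame
batch = preprocess(frame)
print(batch.shape, batch.dtype, batch.max())  # (1, 3, 640, 640) float32 1.0
```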
PyTorch
```python
import horus
import torch

model = torch.jit.load("resnet50.pt", map_location="cuda:0")
model.eval()

def classify(node):
    if node.has_msg("camera"):
        img = node.recv("camera").to_torch()  # Zero-copy to PyTorch tensor
        with torch.no_grad():
            output = model(img.unsqueeze(0).cuda())
        class_id = output.argmax(dim=1).item()
        node.send("class", {"id": class_id, "confidence": output.max().item()})

horus.run(
    horus.Node(tick=classify, rate=10, subs=["camera"], pubs=["class"]),
)
```
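Note that `output.max()` above is a raw logit, not a probability. To report a probability-style confidence, apply softmax first; the math is framework-agnostic, sketched here in NumPy with the usual max-subtraction trick for numerical stability:

```python
import numpy as np

def softmax(logits: np.ndarray) -> np.ndarray:
    z = logits - logits.max()  # subtract max so exp() cannot overflow
    e = np.exp(z)
    return e / e.sum()

logits = np.array([2.0, 1.0, 0.1])
probs = softmax(logits)
print(probs.argmax())  # 0 — softmax preserves the argmax of the raw logits
print(probs.sum())     # 1.0
```

In the PyTorch node you would get the same result with `torch.softmax(output, dim=1)` before taking `.max()`.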
OpenCV
```python
import horus
import cv2
import numpy as np

def process_frame(node):
    if node.has_msg("camera"):
        img = node.recv("camera").to_numpy()
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        result = horus.Image.from_numpy(edges)
        node.send("edges", result)

horus.run(
    horus.Node(tick=process_frame, rate=30, subs=["camera"], pubs=["edges"]),
)
```
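For reference, `cv2.COLOR_RGB2GRAY` computes a fixed weighted sum of the channels (BT.601 luma: 0.299 R + 0.587 G + 0.114 B). An equivalent NumPy sketch, handy for testing the pipeline where OpenCV is not installed:

```python
import numpy as np

def rgb_to_gray(img: np.ndarray) -> np.ndarray:
    """uint8 HWC RGB -> uint8 grayscale, BT.601 luma weights (as in OpenCV)."""
    weights = np.array([0.299, 0.587, 0.114])
    return (img.astype(np.float64) @ weights).round().astype(np.uint8)

rgb = np.zeros((2, 2, 3), dtype=np.uint8)
rgb[..., 1] = 255  # pure green
gray = rgb_to_gray(rgb)
print(gray[0, 0])  # 150 (0.587 * 255, rounded)
```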
TensorFlow / TFLite
```python
import horus
import tensorflow as tf

model = tf.saved_model.load("saved_model")

def infer(node):
    if node.has_msg("input"):
        data = node.recv("input").to_numpy()  # horus type -> NumPy view
        tensor = tf.convert_to_tensor(data, dtype=tf.float32)
        output = model(tensor)
        node.send("output", output.numpy())

horus.run(horus.Node(tick=infer, rate=10, subs=["input"], pubs=["output"]))
```
Performance Tips
- Use `compute=True` for CPU-bound inference — runs on a thread pool and releases the GIL during C extension calls (NumPy, ONNX, PyTorch): `horus.Node(tick=detect, rate=30, compute=True, ...)`
- Set a realistic `budget` to detect slow inference: `horus.Node(tick=detect, rate=30, budget=50 * horus.ms, on_miss="skip")`
- Use `horus.Image.to_torch()` for zero-copy GPU transfer — no pixel data copied.
- Batch with `recv_all()` if messages queue up:

```python
def batch_infer(node):
    frames = node.recv_all("camera")
    if frames:
        batch = np.stack([f.to_numpy() for f in frames])
        outputs = session.run(None, {"images": batch})
        for det in outputs[0]:
            node.send("detections", det)
```
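The `recv_all()` batching tip relies on `np.stack` adding a leading batch axis over same-shaped frames; a quick shape check with plain NumPy (the fake frames stand in for `f.to_numpy()` results):

```python
import numpy as np

# Four fake 480x640 RGB frames standing in for received camera messages
frames = [np.zeros((480, 640, 3), dtype=np.uint8) for _ in range(4)]
batch = np.stack(frames)  # new leading axis: one batch of four frames
print(batch.shape)  # (4, 480, 640, 3)
```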
See Also
- Memory Types — Image, PointCloud zero-copy interop
- Async Nodes — Non-blocking inference with `async def`
- Perception Types — Detection, BoundingBox, Landmark types