ML Integration

Use ML frameworks directly in horus nodes — no wrapper library needed. Import PyTorch, ONNX Runtime, TensorFlow, or OpenCV and use them in your tick function.

Zero-Copy Interop Matrix

horus data types integrate with the Python ML ecosystem via three protocols: __array_interface__ (NumPy), __dlpack__ (universal), and __cuda_array_interface__ (GPU).

| horus type | NumPy | PyTorch | JAX | OpenCV | ONNX RT |
|---|---|---|---|---|---|
| Image | to_numpy() / from_numpy() | to_torch() / from_torch() | to_jax() | via to_numpy() | via to_numpy() |
| PointCloud | to_numpy() / from_numpy() | to_torch() / from_torch() | to_jax() | — | via to_numpy() |
| DepthImage | to_numpy() / from_numpy() | to_torch() / from_torch() | to_jax() | via to_numpy() | via to_numpy() |

All conversions are zero-copy (~3μs constant time, regardless of data size). The Python side gets a view into horus shared memory — no pixel data is copied.

img = node.recv("camera")  # horus Image; a view into shared memory, not a copy

# Any of these — all zero-copy, all ~3μs:
np_arr = img.to_numpy()          # NumPy ndarray (via __array_interface__)
tensor = img.to_torch()          # PyTorch tensor
jax_arr = img.to_jax()           # JAX array
dlpack = np.from_dlpack(img)     # DLPack protocol (979ns)

Performance: A 1920×1080 RGB image (6MB) takes 3μs to access as NumPy vs 178μs to copy — 59x faster. See Benchmarks for full numbers.

import horus
import onnxruntime as ort
import numpy as np

session = ort.InferenceSession("yolov8n.onnx", providers=["CUDAExecutionProvider"])

def detect(node):
    """Run YOLOv8 ONNX inference on each incoming camera frame.

    Publishes the first model output on the "detections" topic.
    """
    if not node.has_msg("camera"):
        return
    frame = node.recv("camera").to_numpy()  # zero-copy view of the frame
    # Normalize to [0, 1] and rearrange HWC -> NCHW with a leading batch dim.
    batch = np.transpose(frame.astype(np.float32) / 255.0, (2, 0, 1))[np.newaxis]
    preds = session.run(None, {"images": batch})
    node.send("detections", preds[0])

# Run the detector at 30 Hz, consuming "camera" and publishing "detections".
# NOTE(review): order=0 presumably fixes this node's position in the tick
# schedule — confirm against the horus scheduler docs.
horus.run(
    horus.Node(tick=detect, rate=30, subs=["camera"], pubs=["detections"], order=0),
)

PyTorch

import horus
import torch

# Load the TorchScript model directly onto the GPU once at startup, and
# switch to eval mode (disables dropout / batch-norm training behavior).
model = torch.jit.load("resnet50.pt", map_location="cuda:0")
model.eval()

def classify(node):
    """Classify the latest camera frame with a TorchScript ResNet-50.

    Publishes {"id": ..., "confidence": ...} on the "class" topic.
    """
    if not node.has_msg("camera"):
        return
    frame = node.recv("camera").to_torch()  # Zero-copy to PyTorch tensor
    with torch.no_grad():  # inference only — skip autograd bookkeeping
        logits = model(frame.unsqueeze(0).cuda())
    node.send("class", {
        "id": logits.argmax(dim=1).item(),
        "confidence": logits.max().item(),
    })

# Run the classifier at 10 Hz, consuming "camera" and publishing "class".
horus.run(
    horus.Node(tick=classify, rate=10, subs=["camera"], pubs=["class"]),
)

OpenCV

import horus
import cv2
import numpy as np

def process_frame(node):
    """Edge-detect each camera frame and republish it as a horus Image."""
    if not node.has_msg("camera"):
        return
    frame = node.recv("camera").to_numpy()  # zero-copy view for OpenCV
    grayscale = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    edge_map = cv2.Canny(grayscale, 50, 150)
    node.send("edges", horus.Image.from_numpy(edge_map))

# Run the edge detector at 30 Hz, consuming "camera" and publishing "edges".
horus.run(
    horus.Node(tick=process_frame, rate=30, subs=["camera"], pubs=["edges"]),
)

TensorFlow / TFLite

import horus
import tensorflow as tf

model = tf.saved_model.load("saved_model")

def infer(node):
    """Run the SavedModel on each incoming message and publish the result."""
    if not node.has_msg("input"):
        return
    payload = node.recv("input")
    features = tf.convert_to_tensor(payload, dtype=tf.float32)
    prediction = model(features)
    node.send("output", prediction.numpy())

horus.run(horus.Node(tick=infer, rate=10, subs=["input"], pubs=["output"]))

Performance Tips

  • Use compute=True for CPU-bound inference — runs on thread pool, releases GIL during C extension calls (NumPy, ONNX, PyTorch):

    horus.Node(tick=detect, rate=30, compute=True, ...)
    
  • Set realistic budget to detect slow inference:

    horus.Node(tick=detect, rate=30, budget=50 * horus.ms, on_miss="skip")
    
  • Use horus.Image.to_torch() for zero-copy GPU transfer — no pixel data copied.

  • Batch with recv_all() if messages queue up:

    def batch_infer(node):
        """Drain the camera queue and run a single batched inference pass."""
        frames = node.recv_all("camera")
        if not frames:
            return
        # One stacked NCHW-style batch instead of N single-frame calls.
        stacked = np.stack([frame.to_numpy() for frame in frames])
        results = session.run(None, {"images": stacked})
        for detection in results[0]:
            node.send("detections", detection)
    

See Also