Python CV Node
A Python node that reads camera images, runs OpenCV processing (e.g., ArUco marker detection), and publishes detection results. Uses horus.Node with NumPy-backed images for zero-copy interop.
horus.toml
[package]
name = "python-cv"
version = "0.1.0"
description = "Python computer vision with OpenCV"
language = "python"
[dependencies]
opencv-python = { version = ">=4.8", source = "pypi" }
numpy = { version = ">=1.24", source = "pypi" }
Complete Code
import horus
import numpy as np
# ── Detection result ─────────────────────────────────────────
class DetectionResult:
"""Detected marker with ID and pose."""
def __init__(self, marker_id=0, x=0.0, y=0.0, confidence=0.0):
self.marker_id = marker_id
self.x = x
self.y = y
self.confidence = confidence
# ── CV Node ──────────────────────────────────────────────────
def make_marker_detector():
frame_count = [0]
def tick(node):
# IMPORTANT: always call recv() every tick to drain the buffer
img = node.recv("camera.image")
if img is None:
return # no frame yet
frame_count[0] += 1
# Convert horus Image to NumPy array (zero-copy when possible)
frame = np.frombuffer(img.data, dtype=np.uint8).reshape(
img.height, img.width, 3
)
# --- OpenCV processing ---
# Convert to grayscale for detection
gray = frame[:, :, 0] # simplified — use cv2.cvtColor in production
# Simulated detection (replace with cv2.aruco.detectMarkers)
# In production:
# import cv2
# aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
# params = cv2.aruco.DetectorParameters()
# corners, ids, rejected = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=params)
detection = DetectionResult(
marker_id=42,
x=float(frame.shape[1] / 2),
y=float(frame.shape[0] / 2),
confidence=0.95,
)
node.send("vision.detections", detection)
def shutdown(node):
print(f"MarkerDetector: processed {frame_count[0]} frames")
return horus.Node(name="MarkerDetector", tick=tick, shutdown=shutdown,
subs=["camera.image"], pubs=["vision.detections"],
rate=30)
# ── Main ─────────────────────────────────────────────────────
if __name__ == "__main__":
horus.run(make_marker_detector())
Expected Output
[HORUS] Scheduler running — tick_rate: 30 Hz
[HORUS] Node "MarkerDetector" started (30 Hz)
^C
[HORUS] Shutting down...
MarkerDetector: processed 150 frames
[HORUS] Node "MarkerDetector" shutdown complete
Key Points
horus.Nodewraps the Rust scheduler — Python nodes get the same lifecycle (init/tick/shutdown)self.get("topic")is the Python equivalent oftopic.recv()— always call every tickself.send("topic", data)publishes to any topichorus.run(*nodes)is the one-liner to start the scheduler- NumPy zero-copy:
horus.Imagedata can be reshaped into NumPy arrays without copying - 30Hz matches most USB cameras — no benefit running faster than the sensor
- Pair with Rust nodes: Python CV node publishes detections, Rust control node subscribes at 100Hz+