Python CV Node
A Python node that reads camera images, runs OpenCV processing (e.g., ArUco marker detection), and publishes detection results. Uses horus.Node with NumPy-backed images for zero-copy interop.
Problem
You need to run computer vision (OpenCV, ML inference) in Python while integrating with the HORUS node lifecycle and shared memory topics.
When To Use
- ArUco marker detection, object detection, or lane following
- Any task that benefits from Python's ML/CV ecosystem (OpenCV, PyTorch, TensorFlow)
- Prototyping vision pipelines before porting hot paths to Rust
Prerequisites
- HORUS installed (Installation Guide)
- Python 3.8+ with opencv-python and numpy installed
- A camera driver publishing images to camera.image
horus.toml
[package]
name = "python-cv"
version = "0.1.0"
description = "Python computer vision with OpenCV"
language = "python"
[dependencies]
opencv-python = { version = ">=4.8", source = "pypi" }
numpy = { version = ">=1.24", source = "pypi" }
Complete Code
import horus
import numpy as np

# ── Detection result ─────────────────────────────────────────
class DetectionResult:
    """Detected marker with ID and pose."""
    def __init__(self, marker_id=0, x=0.0, y=0.0, confidence=0.0):
        self.marker_id = marker_id
        self.x = x
        self.y = y
        self.confidence = confidence

# ── CV Node ──────────────────────────────────────────────────
def make_marker_detector():
    frame_count = [0]  # one-element list so the nested functions can mutate it

    def tick(node):
        # IMPORTANT: always call recv() every tick to drain the buffer
        img = node.recv("camera.image")
        if img is None:
            return  # no frame yet
        frame_count[0] += 1

        # Convert horus Image to NumPy array (zero-copy via shared memory pool)
        frame = img.to_numpy()  # returns a (height, width, channels) view; no copy

        # --- OpenCV processing ---
        # Convert to grayscale for detection
        gray = frame[:, :, 0]  # simplified: takes one channel; use cv2.cvtColor in production

        # Simulated detection (replace with cv2.aruco.detectMarkers)
        # In production:
        #   import cv2
        #   aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
        #   params = cv2.aruco.DetectorParameters()
        #   corners, ids, rejected = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=params)
        detection = DetectionResult(
            marker_id=42,
            x=float(frame.shape[1] / 2),
            y=float(frame.shape[0] / 2),
            confidence=0.95,
        )
        node.send("vision.detections", detection)

    def shutdown(node):
        print(f"MarkerDetector: processed {frame_count[0]} frames")

    return horus.Node(name="MarkerDetector", tick=tick, shutdown=shutdown,
                      subs=["camera.image"], pubs=["vision.detections"],
                      rate=30)

# ── Main ─────────────────────────────────────────────────────
if __name__ == "__main__":
    horus.run(make_marker_detector())
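The simulated detection above always reports the frame center. Once real detection is swapped in, cv2.aruco.detectMarkers returns corners as a sequence of arrays shaped (1, 4, 2) and ids as an (N, 1) integer array, or None when nothing is found. The following is a minimal sketch, using only NumPy, of turning that output into per-marker centers. The helper name markers_to_centers is hypothetical, and synthetic corner data stands in for a real detector call.

```python
import numpy as np


def markers_to_centers(corners, ids):
    """Convert ArUco-style detector output into (marker_id, x, y) tuples.

    corners: sequence of float arrays shaped (1, 4, 2), one per marker
    ids:     int array shaped (N, 1), or None when nothing was detected
    """
    if ids is None or len(corners) == 0:
        return []
    results = []
    for quad, marker_id in zip(corners, np.asarray(ids).ravel()):
        # The marker center is the mean of its four corner points.
        cx, cy = np.asarray(quad, dtype=np.float64).reshape(4, 2).mean(axis=0)
        results.append((int(marker_id), float(cx), float(cy)))
    return results


# Synthetic stand-in for detector output: one 20x20 px marker (assumption).
fake_corners = (
    np.array([[[100, 50], [120, 50], [120, 70], [100, 70]]], dtype=np.float32),
)
fake_ids = np.array([[7]], dtype=np.int32)

centers = markers_to_centers(fake_corners, fake_ids)
print(centers)
```

Each tuple could populate a DetectionResult(marker_id=..., x=..., y=...). ArUco detection does not report a confidence score, so how to fill the confidence field is left to the application.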
Expected Output
[HORUS] Scheduler running — tick_rate: 30 Hz
[HORUS] Node "MarkerDetector" started (30 Hz)
^C
[HORUS] Shutting down...
MarkerDetector: processed 150 frames
[HORUS] Node "MarkerDetector" shutdown complete
Key Points
- horus.Node wraps the Rust scheduler -- Python nodes get the same lifecycle (init/tick/shutdown)
- node.recv("topic") is the Python API for receiving messages -- always call it every tick
- node.send("topic", data) publishes to any topic
- horus.run(*nodes) is the one-liner to start the scheduler
- NumPy zero-copy: horus.Image data can be reshaped into NumPy arrays without copying
- 30Hz matches most USB cameras -- no benefit running faster than the sensor
- Pair with Rust nodes: Python CV node publishes detections, Rust control node subscribes at 100Hz+
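The zero-copy point can be illustrated with plain NumPy. This is only a sketch of the general mechanism, not the HORUS implementation: a bytearray stands in for the shared-memory image buffer (an assumption), and np.frombuffer plus reshape build a view over it instead of copying the pixels.

```python
import numpy as np

height, width, channels = 4, 6, 3

# A bytearray stands in for the shared-memory image buffer (assumption).
raw = bytearray(height * width * channels)

# frombuffer + reshape create a view over the same bytes; nothing is copied.
frame = np.frombuffer(raw, dtype=np.uint8).reshape(height, width, channels)

# A write through the buffer is visible in the array, so storage is shared.
raw[0] = 255
shares_storage = bool(frame[0, 0, 0] == 255)

# An explicit copy is independent of later changes to the buffer.
snapshot = frame.copy()
raw[0] = 0
print(shares_storage, int(frame[0, 0, 0]), int(snapshot[0, 0, 0]))
```

The view follows the buffer while the snapshot does not. If a frame must be kept beyond the current tick, an explicit copy() is the safer choice, since this page does not state whether the underlying buffer is reused between ticks.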
Variations
- Object detection: Replace ArUco detection with a YOLO or SSD model for general object detection
- Stereo depth: Subscribe to two camera topics and compute a disparity map
- GPU acceleration: Use cv2.cuda or PyTorch GPU tensors for real-time inference on large frames
- Multi-camera: Create multiple horus.Node instances, each subscribed to a different camera topic
Common Errors
| Symptom | Cause | Fix |
|---|---|---|
| ImportError: No module named 'cv2' | OpenCV not installed | Run pip install opencv-python |
| Frame is all zeros | Image format mismatch (RGB vs BGR, wrong resolution) | Check img.width, img.height, and channel count before reshape |
| Node runs slower than camera rate | Processing takes longer than frame interval | Reduce resolution, use ROI cropping, or offload to GPU |
| node.recv() always returns None | Camera driver not running or wrong topic name | Verify topic name with horus monitor |
| Memory grows over time | NumPy arrays not freed between ticks | Avoid storing frame history; process and discard each frame |
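Several fixes in the table (checking shape and channel count, RGB vs BGR, reducing resolution, ROI cropping) come down to a few NumPy operations on the frame array. The sketch below assumes the (height, width, channels) layout described earlier. The helper names check_frame and shrink are hypothetical, and a synthetic frame stands in for camera data.

```python
import numpy as np


def check_frame(frame, expected_channels=3):
    """Return a list of problems found in a (height, width, channels) frame."""
    problems = []
    if frame.ndim != 3:
        problems.append(f"expected 3 dimensions, got {frame.ndim}")
    elif frame.shape[2] != expected_channels:
        problems.append(
            f"expected {expected_channels} channels, got {frame.shape[2]}"
        )
    if frame.size and not frame.any():
        problems.append("frame is all zeros")
    return problems


def shrink(frame, roi=None, step=2):
    """Crop to roi=(top, left, height, width), then keep every step-th pixel."""
    if roi is not None:
        top, left, h, w = roi
        frame = frame[top:top + h, left:left + w]
    return frame[::step, ::step]


# Synthetic 480x640 frame with one pure-red pixel in RGB order (assumption).
frame = np.zeros((480, 640, 3), dtype=np.uint8)
frame[10, 20] = (255, 0, 0)

bgr = frame[:, :, ::-1]  # reverse the channel axis: RGB <-> BGR, as a view
small = shrink(frame, roi=(0, 0, 240, 320), step=2)
print(check_frame(frame), bgr[10, 20].tolist(), small.shape)
```

Slicing and channel reversal return views, so they add little work per tick. Some OpenCV calls expect contiguous memory, in which case np.ascontiguousarray(bgr) makes the copy explicit.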
See Also
- Python Bindings — Python API reference
- Image — Image message type
- Image, PointCloud, DepthImage — zero-copy NumPy/PyTorch/JAX