Python CV Node

A Python node that reads camera images, runs OpenCV processing (e.g., ArUco marker detection), and publishes detection results. Uses horus.Node with NumPy-backed images for zero-copy interop.

Problem

You need to run computer vision (OpenCV, ML inference) in Python while integrating with the HORUS node lifecycle and shared memory topics.

When To Use

  • ArUco marker detection, object detection, or lane following
  • Any task that benefits from Python's ML/CV ecosystem (OpenCV, PyTorch, TensorFlow)
  • Prototyping vision pipelines before porting hot paths to Rust

Prerequisites

  • HORUS installed (Installation Guide)
  • Python 3.8+ with opencv-python and numpy installed
  • A camera driver publishing images to camera.image
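
A quick preflight check can confirm that the Python dependencies are importable before the node starts. The sketch below uses only the standard library; `missing_modules` is a hypothetical helper name and is not part of HORUS.

```python
import importlib.util


def missing_modules(names=("cv2", "numpy")):
    """Return the module names from `names` that cannot be found on this interpreter."""
    # find_spec returns None for a top-level module that is not installed.
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules()
    if missing:
        print("Missing Python modules:", ", ".join(missing))
    else:
        print("cv2 and numpy are importable")
```

Note that the import name (`cv2`) differs from the package name (`opencv-python`).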

horus.toml

[package]
name = "python-cv"
version = "0.1.0"
description = "Python computer vision with OpenCV"
language = "python"

[dependencies]
opencv-python = { version = ">=4.8", source = "pypi" }
numpy = { version = ">=1.24", source = "pypi" }

Complete Code

import horus
import numpy as np

# ── Detection result ─────────────────────────────────────────

class DetectionResult:
    """Detected marker with ID and pose."""
    def __init__(self, marker_id=0, x=0.0, y=0.0, confidence=0.0):
        self.marker_id = marker_id
        self.x = x
        self.y = y
        self.confidence = confidence

# ── CV Node ──────────────────────────────────────────────────

def make_marker_detector():
    frame_count = [0]

    def tick(node):
        # IMPORTANT: always call recv() every tick to drain the buffer
        img = node.recv("camera.image")
        if img is None:
            return  # no frame yet

        frame_count[0] += 1

        # Convert horus Image to NumPy array (zero-copy via shared memory pool)
        frame = img.to_numpy()  # returns (height, width, channels) view — no copy

        # --- OpenCV processing ---
        # Take the first channel as a stand-in for grayscale.
        # In production use cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # (or cv2.COLOR_RGB2GRAY, depending on the camera's channel order).
        gray = frame[:, :, 0]

        # Simulated detection; replace with real ArUco detection.
        # In production (OpenCV 4.7+ API):
        #   import cv2
        #   aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
        #   params = cv2.aruco.DetectorParameters()
        #   detector = cv2.aruco.ArucoDetector(aruco_dict, params)
        #   corners, ids, rejected = detector.detectMarkers(gray)
        detection = DetectionResult(
            marker_id=42,
            x=float(frame.shape[1] / 2),
            y=float(frame.shape[0] / 2),
            confidence=0.95,
        )

        node.send("vision.detections", detection)

    def shutdown(node):
        print(f"MarkerDetector: processed {frame_count[0]} frames")

    return horus.Node(name="MarkerDetector", tick=tick, shutdown=shutdown,
                      subs=["camera.image"], pubs=["vision.detections"],
                      rate=30)

# ── Main ─────────────────────────────────────────────────────

if __name__ == "__main__":
    horus.run(make_marker_detector())
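
Once the simulated detection is replaced with a real ArUco call, the returned corners and IDs still have to be turned into per-marker coordinates. The following is a minimal sketch of that step, assuming the layout OpenCV's ArUco detector returns: one entry per marker in `corners`, each holding a single quad of four (x, y) points, and `ids` as a sequence of one-element rows (or `None` when nothing is found). The helper names `quad_center` and `markers_to_detections` are hypothetical, and plain dicts are used here in place of `DetectionResult` to keep the example self-contained.

```python
def quad_center(quad):
    """Return the (x, y) centroid of a marker's four corner points."""
    xs = [float(point[0]) for point in quad]
    ys = [float(point[1]) for point in quad]
    return sum(xs) / len(xs), sum(ys) / len(ys)


def markers_to_detections(corners, ids):
    """Pair each marker ID with the centre of its corner quad."""
    if ids is None:  # the detector reports "no markers" as None
        return []
    detections = []
    for quad_wrapper, id_row in zip(corners, ids):
        # Each corners entry wraps one quad: shape (1, 4, 2) in OpenCV's output.
        x, y = quad_center(quad_wrapper[0])
        detections.append({"marker_id": int(id_row[0]), "x": x, "y": y})
    return detections


if __name__ == "__main__":
    # A hand-written 10x10 square standing in for detector output.
    corners = [[[(0, 0), (10, 0), (10, 10), (0, 10)]]]
    ids = [[7]]
    print(markers_to_detections(corners, ids))
```

Because the helpers only index and iterate, they accept nested lists as well as the NumPy arrays OpenCV produces.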

Expected Output

[HORUS] Scheduler running — tick_rate: 30 Hz
[HORUS] Node "MarkerDetector" started (30 Hz)
^C
[HORUS] Shutting down...
MarkerDetector: processed 150 frames
[HORUS] Node "MarkerDetector" shutdown complete

Key Points

  • horus.Node wraps the Rust scheduler -- Python nodes get the same lifecycle (init/tick/shutdown)
  • node.recv("topic") is the Python API for receiving messages -- call it on every tick so the buffer is drained, even when the frame is not processed
  • node.send("topic", data) publishes to any topic
  • horus.run(*nodes) is the one-liner to start the scheduler
  • NumPy zero-copy: horus.Image data can be reshaped into NumPy arrays without copying
  • 30 Hz matches most USB cameras -- running the node faster than the sensor produces no new frames
  • Pair with Rust nodes: the Python CV node publishes detections, and a Rust control node subscribes at 100 Hz or faster
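
The receive-every-tick rule can be exercised without the scheduler by passing a stand-in object to the tick function. The sketch below assumes only the `node.recv(topic)` and `node.send(topic, data)` calls used in the example; `FakeNode` and `make_tick` are hypothetical names, and the real queueing behaviour of `recv()` may differ from this simple list.

```python
class FakeNode:
    """Hypothetical stand-in for the node passed to tick(); not part of HORUS."""

    def __init__(self, frames):
        self._frames = list(frames)
        self.sent = []

    def recv(self, topic):
        # Hand out the next queued frame, or None when nothing is waiting.
        return self._frames.pop(0) if self._frames else None

    def send(self, topic, data):
        self.sent.append((topic, data))


def make_tick(process_every=2):
    """Build a tick that receives on every call but processes every Nth frame."""
    seen = [0]  # mutable closure state, same pattern as frame_count in the example

    def tick(node):
        frame = node.recv("camera.image")  # always receive first
        if frame is None:
            return
        seen[0] += 1
        if seen[0] % process_every != 0:
            return  # frame was received and dropped, so nothing piles up
        node.send("vision.detections", {"frame": frame})

    return tick


if __name__ == "__main__":
    node = FakeNode(["a", "b", "c", "d"])
    tick = make_tick(process_every=2)
    for _ in range(5):
        tick(node)
    print(node.sent)
```

Skipping frames after the receive call, rather than skipping the call itself, keeps the node current when processing is slower than the camera.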

Variations

  • Object detection: Replace ArUco detection with a YOLO or SSD model for general object detection
  • Stereo depth: Subscribe to two camera topics and compute a disparity map
  • GPU acceleration: Use cv2.cuda or PyTorch GPU tensors for real-time inference on large frames
  • Multi-camera: Create multiple horus.Node instances, each subscribed to a different camera topic
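
For the multi-camera variation, the per-node settings can be generated from a list of camera topics. This is a sketch under stated assumptions: the topic names (`camera_left.image`, `camera_right.image`), the output topic scheme, and the helper `camera_node_specs` are all hypothetical, and the dict keys simply mirror the `horus.Node` keyword arguments used in the example.

```python
def camera_node_specs(camera_topics, rate=30):
    """Return one dict of horus.Node keyword arguments per camera topic."""
    specs = []
    for topic in camera_topics:
        camera = topic.split(".")[0]  # "camera_left.image" -> "camera_left"
        specs.append({
            "name": f"MarkerDetector_{camera}",
            "subs": [topic],
            "pubs": [f"vision.{camera}.detections"],  # assumed naming scheme
            "rate": rate,
        })
    return specs


if __name__ == "__main__":
    for spec in camera_node_specs(["camera_left.image", "camera_right.image"]):
        print(spec)
    # With HORUS installed, each spec would be combined with a tick function:
    #   nodes = [horus.Node(tick=my_tick, **spec) for spec in specs]
    #   horus.run(*nodes)
```

Each node's tick function would need to read from its own topic rather than the hard-coded `camera.image`.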

Common Errors

| Symptom | Cause | Fix |
|---|---|---|
| ImportError: No module named 'cv2' | OpenCV not installed | Run pip install opencv-python |
| Frame is all zeros | Image format mismatch (RGB vs BGR, wrong resolution) | Check img.width, img.height, and channel count before reshape |
| Node runs slower than camera rate | Processing takes longer than frame interval | Reduce resolution, use ROI cropping, or offload to GPU |
| node.recv() always returns None | Camera driver not running or wrong topic name | Verify topic name with horus monitor |
| Memory grows over time | NumPy arrays not freed between ticks | Avoid storing frame history; process and discard each frame |
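
To tell whether processing is overrunning the frame interval, the per-frame work can be timed against the budget implied by the node rate. The sketch below uses only the standard library; `timed` and `FRAME_BUDGET_S` are hypothetical names, and the injectable `clock` parameter exists only so the behaviour can be checked deterministically.

```python
import time

FRAME_BUDGET_S = 1.0 / 30  # frame interval at the 30 Hz rate used in the example


def timed(process, budget_s=FRAME_BUDGET_S, clock=time.perf_counter):
    """Wrap process(frame) so each call also reports its duration and any overrun."""

    def wrapper(frame):
        start = clock()
        result = process(frame)
        elapsed = clock() - start
        return result, elapsed, elapsed > budget_s

    return wrapper


if __name__ == "__main__":
    def slow_process(frame):
        time.sleep(0.05)  # simulate 50 ms of work, longer than the 33 ms budget
        return frame

    result, elapsed, overran = timed(slow_process)("frame")
    print(f"took {elapsed * 1000:.1f} ms, overran budget: {overran}")
```

If overruns are frequent, the fixes in the table (lower resolution, ROI cropping, GPU offload) are the places to start.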

See Also