Python CV Node

A Python node that reads camera images, runs OpenCV processing (e.g., ArUco marker detection), and publishes detection results. Uses horus.Node with NumPy-backed images for zero-copy interop.

horus.toml

[package]
name = "python-cv"
version = "0.1.0"
description = "Python computer vision with OpenCV"
language = "python"

[dependencies]
opencv-python = { version = ">=4.8", source = "pypi" }
numpy = { version = ">=1.24", source = "pypi" }

Complete Code

import horus
import numpy as np

# ── Detection result ─────────────────────────────────────────

class DetectionResult:
    """Detected marker: ID, image-plane center (x, y) in pixels, and confidence."""
    def __init__(self, marker_id=0, x=0.0, y=0.0, confidence=0.0):
        self.marker_id = marker_id
        self.x = x
        self.y = y
        self.confidence = confidence

# ── CV Node ──────────────────────────────────────────────────

def make_marker_detector():
    frame_count = [0]

    def tick(node):
        # IMPORTANT: always call recv() every tick to drain the buffer
        img = node.recv("camera.image")
        if img is None:
            return  # no frame yet

        frame_count[0] += 1

        # Convert horus Image to NumPy array (zero-copy when possible)
        frame = np.frombuffer(img.data, dtype=np.uint8).reshape(
            img.height, img.width, 3
        )

        # --- OpenCV processing ---
        # Convert to grayscale for detection
        gray = frame[:, :, 0]  # simplified — use cv2.cvtColor in production

        # Simulated detection (replace with a real ArUco detector)
        # In production (class-based API, OpenCV 4.7+):
        #   import cv2
        #   aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
        #   params = cv2.aruco.DetectorParameters()
        #   detector = cv2.aruco.ArucoDetector(aruco_dict, params)
        #   corners, ids, rejected = detector.detectMarkers(gray)
        detection = DetectionResult(
            marker_id=42,
            x=float(frame.shape[1] / 2),
            y=float(frame.shape[0] / 2),
            confidence=0.95,
        )

        node.send("vision.detections", detection)

    def shutdown(node):
        print(f"MarkerDetector: processed {frame_count[0]} frames")

    return horus.Node(name="MarkerDetector", tick=tick, shutdown=shutdown,
                      subs=["camera.image"], pubs=["vision.detections"],
                      rate=30)

# ── Main ─────────────────────────────────────────────────────

if __name__ == "__main__":
    horus.run(make_marker_detector())
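
Turning real detections into results

The node above publishes a simulated detection at the frame center. A minimal sketch of how the corners and ids values from the commented-out ArUco call could be reduced to marker centers is shown below. It assumes the usual OpenCV output layout (one float array of shape (1, 4, 2) per marker, plus an (N, 1) integer array of IDs, or None when nothing is found). The helper name marker_centers is hypothetical and not part of horus or OpenCV, and the sample input is hand-built so the block runs without a camera or cv2.

```python
import numpy as np


def marker_centers(corners, ids):
    """Return [(marker_id, cx, cy), ...] from ArUco-style detector output.

    Hypothetical helper: assumes each entry of `corners` holds the four
    corner points of one marker and `ids` holds the matching marker IDs.
    """
    if ids is None or len(corners) == 0:
        return []  # nothing detected in this frame

    results = []
    for quad, marker_id in zip(corners, np.asarray(ids).ravel()):
        pts = np.asarray(quad, dtype=np.float64).reshape(4, 2)
        cx, cy = pts.mean(axis=0)  # center = mean of the four corners
        results.append((int(marker_id), float(cx), float(cy)))
    return results


# Hand-built stand-in for one detected marker with ID 7
fake_corners = [
    np.array([[[10, 20], [30, 20], [30, 40], [10, 40]]], dtype=np.float32)
]
fake_ids = np.array([[7]], dtype=np.int32)

centers = marker_centers(fake_corners, fake_ids)
```

Inside tick, each tuple could then be wrapped in a DetectionResult and passed to node.send("vision.detections", ...). ArUco detection does not report a confidence score, so the confidence field would need its own definition.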

Expected Output

[HORUS] Scheduler running — tick_rate: 30 Hz
[HORUS] Node "MarkerDetector" started (30 Hz)
^C
[HORUS] Shutting down...
MarkerDetector: processed 150 frames
[HORUS] Node "MarkerDetector" shutdown complete

Key Points

  • horus.Node wraps the Rust scheduler — Python nodes get the same lifecycle (init/tick/shutdown)
  • node.recv("topic") is the Python equivalent of topic.recv(): call it every tick so the buffer is drained
  • node.send("topic", data) publishes to any topic
  • horus.run(*nodes) is the one-liner to start the scheduler
  • NumPy zero-copy: horus.Image data can be reshaped into NumPy arrays without copying
  • 30 Hz matches most USB cameras, so there is no benefit in running faster than the sensor
  • Pair with Rust nodes: the Python CV node publishes detections, and a Rust control node subscribes at 100 Hz or more
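
The zero-copy point can be checked with plain NumPy. The sketch below uses a bytearray as a stand-in for img.data, on the assumption that horus exposes the pixel data through Python's buffer protocol (as the np.frombuffer call in the node suggests). The helper check_frame_size is hypothetical and only illustrates a guard against a buffer whose length does not match the expected height, width, and channel count.

```python
import numpy as np

height, width, channels = 2, 3, 3
raw = bytearray(height * width * channels)  # stand-in for img.data (assumption)


def check_frame_size(data, height, width, channels=3):
    """Hypothetical guard: fail early if the buffer cannot be reshaped."""
    expected = height * width * channels
    if len(data) != expected:
        raise ValueError(f"expected {expected} bytes, got {len(data)}")


check_frame_size(raw, height, width, channels)

# np.frombuffer creates a view over the existing memory; no pixels are copied
frame = np.frombuffer(raw, dtype=np.uint8).reshape(height, width, channels)

raw[0] = 255  # write through the original buffer
view_sees_write = int(frame[0, 0, 0]) == 255

# An explicit copy is independent of later writes to the buffer
copied = frame.copy()
raw[1] = 7
copy_is_independent = int(copied[0, 0, 1]) == 0

# A view over immutable bytes is read-only
readonly_view = np.frombuffer(bytes(raw), dtype=np.uint8)
is_writeable = bool(readonly_view.flags.writeable)
```

Because the array is a view, keep a frame.copy() if the pixels must outlive the current tick; whether horus reuses the underlying buffer between ticks is not stated here.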