Python Memory Types
Pool-backed types for images, point clouds, and tensors. Zero-copy interop with NumPy, PyTorch, and JAX.
Quick example — camera processing pipeline with numpy:
import horus
from horus import Image, Topic
import numpy as np
sub_rgb = Topic(Image, "camera.rgb")
pub_edges = Topic(Image, "camera.edges")
def edge_tick(node):
    img = sub_rgb.try_recv()
    if img is not None:
        # Zero-copy to numpy — no pixel data copied
        pixels = img.to_numpy()
        # Process with numpy (simple gradient-based edge detection)
        gray = np.mean(pixels, axis=2).astype(np.uint8)
        edges = np.abs(np.diff(gray, axis=1))
        # Zero-copy back to HORUS Image
        result = Image.from_numpy(edges)
        pub_edges.send(result)

detector = horus.Node(name="edge_detector", tick=edge_tick, rate=30,
                      subs=["camera.rgb"], pubs=["camera.edges"])
Image
Pool-backed camera image with zero-copy framework conversions.
Creating Images
from horus import Image
# Create empty RGB image (height, width, encoding)
img = Image(height=480, width=640, encoding="rgb8")
# From NumPy array (zero-copy when possible)
import numpy as np
pixels = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
img = Image.from_numpy(pixels, encoding="rgb8")
# From PyTorch tensor
import torch
tensor = torch.zeros(480, 640, 3, dtype=torch.uint8)
img = Image.from_torch(tensor, encoding="rgb8")
# From raw bytes
img = Image.from_bytes(raw_data, height=480, width=640, encoding="rgb8")
Supported Encodings
| Encoding | Channels | Bytes/Pixel | Description |
|---|---|---|---|
"mono8" | 1 | 1 | 8-bit grayscale |
"mono16" | 1 | 2 | 16-bit grayscale |
"rgb8" | 3 | 3 | 8-bit RGB |
"bgr8" | 3 | 3 | 8-bit BGR (OpenCV) |
"rgba8" | 4 | 4 | 8-bit RGBA |
"bgra8" | 4 | 4 | 8-bit BGRA |
"yuv422" | 2 | 2 | YUV 4:2:2 |
"mono32f" | 1 | 4 | 32-bit float mono |
"rgb32f" | 3 | 12 | 32-bit float RGB |
"bayer_rggb8" | 1 | 1 | Bayer raw |
"depth16" | 1 | 2 | 16-bit depth (mm) |
Properties
img.height # Image height in pixels
img.width # Image width in pixels
img.channels # Number of channels (e.g., 3 for RGB)
img.encoding # Encoding string (e.g., "rgb8")
img.dtype # Data type string
img.nbytes # Total data size in bytes
img.step # Row stride in bytes
img.frame_id # Sensor frame identifier
img.timestamp_ns # Timestamp in nanoseconds
Framework Conversions (Zero-Copy)
# To NumPy — zero-copy, shared memory
np_array = img.to_numpy() # Shape: (H, W, C) for color, (H, W) for mono
# To PyTorch — zero-copy via DLPack
torch_tensor = img.to_torch()
# To JAX — zero-copy via DLPack
jax_array = img.to_jax()
Pixel Access
# Read pixel at (x, y)
pixel = img.pixel(320, 240) # Returns list, e.g., [128, 64, 255]
# Write pixel
img.set_pixel(320, 240, [255, 0, 0]) # Red pixel
# Fill entire image with a color
img.fill([0, 0, 0]) # Black
# Copy data from bytes
img.copy_from(raw_bytes)
# Extract region of interest (raw bytes)
roi_data = img.roi(x=100, y=100, w=200, h=200)
Metadata
img.set_frame_id("camera_front")
img.set_timestamp_ns(1234567890)
# Device info
img.is_cpu() # True (always CPU-backed currently)
DLPack Protocol
Image implements the DLPack protocol for framework-agnostic zero-copy:
# NumPy array protocol
np_array = np.asarray(img) # Uses __array_interface__
# DLPack (PyTorch, JAX, CuPy, etc.)
capsule = img.__dlpack__()
device = img.__dlpack_device__()
PointCloud
Pool-backed 3D point cloud with zero-copy ML framework interop.
Creating Point Clouds
from horus import PointCloud
# Create XYZ point cloud (num_points, fields_per_point, dtype)
cloud = PointCloud(num_points=10000, fields=3, dtype="float32")
# From NumPy array — shape (N, F) where F = fields per point
import numpy as np
points = np.random.randn(10000, 3).astype(np.float32)
cloud = PointCloud.from_numpy(points)
# From PyTorch tensor
import torch
tensor = torch.randn(10000, 3)
cloud = PointCloud.from_torch(tensor)
Properties
cloud.point_count # Number of points
cloud.fields_per_point # Floats per point (3=XYZ, 4=XYZI, 6=XYZRGB)
cloud.dtype # Data type string
cloud.nbytes # Total data size in bytes
cloud.frame_id # Sensor frame identifier
cloud.timestamp_ns # Timestamp in nanoseconds
# Point format queries
cloud.is_xyz() # True if 3 fields (XYZ)
cloud.has_intensity() # True if 4+ fields (XYZI)
cloud.has_color() # True if 6+ fields (XYZRGB)
Framework Conversions
# To NumPy — shape (N, F), zero-copy
np_points = cloud.to_numpy()
# To PyTorch — zero-copy via DLPack
torch_points = cloud.to_torch()
# To JAX — zero-copy via DLPack
jax_points = cloud.to_jax()
Point Access
# Get i-th point as list of floats (float32 clouds only)
point = cloud.point_at(0) # e.g., [1.0, 2.0, 3.0]
Metadata and DLPack
cloud.set_frame_id("lidar_front")
cloud.set_timestamp_ns(1234567890)
cloud.is_cpu() # True (always CPU-backed currently)
# DLPack protocol
capsule = cloud.__dlpack__()
DepthImage
Pool-backed depth image supporting F32 (meters) and U16 (millimeters) formats.
Creating Depth Images
from horus import DepthImage
# Create F32 depth image (meters)
depth = DepthImage(height=480, width=640, dtype="float32")
# Create U16 depth image (millimeters)
depth_u16 = DepthImage(height=480, width=640, dtype="uint16")
# From NumPy — shape (H, W)
import numpy as np
depth_data = np.random.uniform(0.5, 5.0, (480, 640)).astype(np.float32)
depth = DepthImage.from_numpy(depth_data)
# From PyTorch
import torch
depth = DepthImage.from_torch(torch.randn(480, 640))
Properties
depth.height # Image height
depth.width # Image width
depth.dtype # "float32" or "uint16"
depth.nbytes # Total data size
depth.frame_id # Camera frame identifier
depth.timestamp_ns # Timestamp
depth.depth_scale # Scale factor
depth.is_meters() # True if F32 (meters)
depth.is_millimeters() # True if U16 (millimeters)
Depth Access
# Get depth at pixel (always returns meters as float)
d = depth.get_depth(320, 240)
print(f"Depth at center: {d:.3f}m")
# Set depth at pixel (value in meters)
depth.set_depth(100, 100, 1.5)
# Get statistics (min, max, mean) — None if no valid data
stats = depth.depth_statistics()
if stats:
    min_d, max_d, mean_d = stats
    print(f"Range: {min_d:.2f}-{max_d:.2f}m, mean: {mean_d:.2f}m")
Framework Conversions
np_depth = depth.to_numpy() # Shape: (H, W)
torch_depth = depth.to_torch()
jax_depth = depth.to_jax()
Memory Management
Image, PointCloud, and DepthImage are backed by a shared memory pool that handles allocation, reference counting, and cross-process transport automatically. You don't need to manage the pool directly — HORUS creates and sizes it for you.
For custom tensor shapes that don't fit Image/PointCloud/DepthImage, use `GenericMessage` or define a custom typed message with the `message!` macro.
Usage with Topics
All memory types work seamlessly with typed topics for zero-copy IPC:
from horus import Topic, Image, PointCloud, DepthImage
import numpy as np
# Publish an image
img_topic = Topic(Image)
img = Image.from_numpy(np.zeros((480, 640, 3), dtype=np.uint8), encoding="rgb8")
img_topic.send(img)
# Receive an image
received = img_topic.recv()
if received:
    np_img = received.to_numpy()  # Zero-copy access
    print(f"Received {received.width}x{received.height} image")
ML Pipeline Example
import horus
from horus import Image, Topic
import numpy as np
img_topic = Topic(Image)
def camera_tick(node):
    frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    img = Image.from_numpy(frame, encoding="rgb8")
    img.set_frame_id("camera_front")
    img_topic.send(img)

def inference_tick(node):
    img = img_topic.recv()
    if img:
        # Zero-copy to PyTorch for inference
        tensor = img.to_torch()  # No data copy!
        # ... run model ...
camera = horus.Node(name="camera", tick=camera_tick, rate=30, order=0)
model = horus.Node(name="model", tick=inference_tick, rate=30, order=1)
horus.run(camera, model)
See Also
- Python Bindings — Core Python API
- ML Utilities — ML framework integration
- Image API (Rust) — Rust Image reference
- PointCloud API (Rust) — Rust PointCloud reference
- DepthImage API (Rust) — Rust DepthImage reference