Building Your Second Application (Python)
Now that you have built your first HORUS application in Python, let's create something more practical: a 3-node sensor pipeline that reads temperature data, filters out noise, and displays the results.
Prerequisites
- HORUS installed (Installation Guide)
- Quick Start (Python) completed (you know how to create a Python project and run it)
- Python 3.9+ with HORUS bindings (python3 -c "import horus" works)
Time: ~20 minutes
What You'll Build
A real-time temperature monitoring system with three nodes:
- SensorNode: Publishes simulated temperature readings every second
- FilterNode: Subscribes to raw temperatures, filters noise, republishes clean data
- DisplayNode: Subscribes to filtered data, displays to console
This demonstrates:
- Multi-node communication patterns
- Data pipeline processing
- Real-time filtering with state
- CLI introspection tools
Architecture
SensorNode --(raw_temp)--> FilterNode --(filtered_temp)--> DisplayNode
Step 1: Create the Project
horus new temperature_pipeline -p
cd temperature_pipeline
The -p flag creates a Python project with src/main.py, horus.toml, and .horus/.
Step 2: Write the Code
Replace src/main.py with this complete, runnable code:
import math

import horus


# ============================================================================
# Node 1: SensorNode - publishes temperature readings
# ============================================================================
def make_sensor():
    state = {"reading": 0.0}

    def init(node):
        node.log_info("Temperature sensor initialized")

    def tick(node):
        # Simulate a realistic temperature signal with noise.
        # Base temperature oscillates between 20 and 30°C.
        base_temp = 25.0 + math.sin(state["reading"] * 0.1) * 5.0
        # Add deterministic pseudo-noise (+/- 2°C) from a faster sine wave
        noise = math.sin(state["reading"] * 0.7) * 2.0
        temperature = base_temp + noise

        # Publish raw temperature
        node.send("raw_temp", temperature)
        node.log_info(f"Published raw temp: {temperature:.2f}°C")
        state["reading"] += 1.0

    def shutdown(node):
        node.log_info("Sensor shutdown complete")

    return horus.Node(
        name="SensorNode",
        tick=tick,
        init=init,
        shutdown=shutdown,
        rate=1,   # 1 Hz: one reading per second
        order=0,  # Runs first in each tick cycle
        pubs=["raw_temp"],
    )


# ============================================================================
# Node 2: FilterNode - removes noise with an exponential moving average
# ============================================================================
def make_filter():
    state = {
        "filtered_value": None,
        "alpha": 0.3,  # Smoothing factor: 30% new data, 70% previous
    }

    def init(node):
        node.log_info(f"Filter initialized (alpha = {state['alpha']:.2f})")

    def tick(node):
        raw_temp = node.recv("raw_temp")
        if raw_temp is None:
            return

        alpha = state["alpha"]
        # Apply exponential moving average filter
        if state["filtered_value"] is None:
            filtered = raw_temp  # First reading, no previous value
        else:
            prev = state["filtered_value"]
            filtered = alpha * raw_temp + (1.0 - alpha) * prev
        state["filtered_value"] = filtered

        # Publish filtered temperature
        node.send("filtered_temp", filtered)
        noise_removed = raw_temp - filtered
        node.log_info(
            f"Filtered: {raw_temp:.2f}°C -> {filtered:.2f}°C "
            f"(removed {noise_removed:.2f}°C noise)"
        )

    def shutdown(node):
        node.log_info("Filter shutdown complete")

    return horus.Node(
        name="FilterNode",
        tick=tick,
        init=init,
        shutdown=shutdown,
        rate=1,
        order=1,  # Runs after SensorNode
        pubs=["filtered_temp"],
        subs=["raw_temp"],
    )


# ============================================================================
# Node 3: DisplayNode - shows filtered temperature on console
# ============================================================================
def make_display():
    state = {"count": 0}

    def init(node):
        node.log_info("Display initialized")
        print("\n========================================")
        print(" Temperature Monitor — Press Ctrl+C to stop")
        print("========================================\n")

    def tick(node):
        temp = node.recv("filtered_temp")
        if temp is None:
            return
        state["count"] += 1

        # Display temperature with status indicator
        if temp < 22.0:
            status = "COLD"
        elif temp > 28.0:
            status = "HOT"
        else:
            status = "NORMAL"
        print(f"[Reading #{state['count']}] Temperature: {temp:.1f}°C — Status: {status}")
        node.log_debug(f"Displayed reading #{state['count']}")

    def shutdown(node):
        print("\n========================================")
        print(f" Total readings displayed: {state['count']}")
        print("========================================\n")
        node.log_info("Display shutdown complete")

    return horus.Node(
        name="DisplayNode",
        tick=tick,
        init=init,
        shutdown=shutdown,
        rate=1,
        order=2,  # Runs last, after FilterNode
        subs=["filtered_temp"],
    )


# ============================================================================
# Main - configure and run the pipeline
# ============================================================================
if __name__ == "__main__":
    print("Starting Temperature Pipeline...\n")
    horus.run(make_sensor(), make_filter(), make_display())
Step 3: Run the Application
horus run
Expected output (the temperature values shown here are illustrative):
Starting Temperature Pipeline...
========================================
Temperature Monitor — Press Ctrl+C to stop
========================================
[Reading #1] Temperature: 23.4°C — Status: NORMAL
[Reading #2] Temperature: 24.1°C — Status: NORMAL
[Reading #3] Temperature: 25.8°C — Status: NORMAL
[Reading #4] Temperature: 27.2°C — Status: NORMAL
[Reading #5] Temperature: 28.6°C — Status: HOT
[Reading #6] Temperature: 27.9°C — Status: NORMAL
[Reading #7] Temperature: 26.3°C — Status: NORMAL
Press Ctrl+C to stop:
^C
Ctrl+C received! Shutting down HORUS scheduler...
========================================
Total readings displayed: 7
========================================
Always use horus run — do NOT run python src/main.py directly.
horus run sets up the SHM namespace, environment variables, and build pipeline before executing your code. Running python src/main.py directly means topics won't connect to other processes, and CLI introspection tools like horus topic echo won't work.
Step 4: Inspect with CLI Tools
While the application is running, open a second terminal to inspect the live data.
List Active Topics
horus topic list
You should see both topics:
raw_temp (active, 1 publisher)
filtered_temp (active, 1 publisher)
Echo Raw Sensor Data
Watch the noisy sensor readings in real time:
horus topic echo raw_temp
[1] 23.42
[2] 25.87
[3] 27.14
[4] 24.63
...
Press Ctrl+C to stop echoing.
Echo Filtered Data
Compare with the smoothed output:
horus topic echo filtered_temp
[1] 23.42
[2] 24.16
[3] 25.05
[4] 24.92
...
Notice how the filtered values change more gradually than the raw values.
Measure Publishing Rate
Verify each topic publishes at the expected rate:
horus topic hz raw_temp
average rate: 1.00 Hz
min: 0.998s, max: 1.002s, std dev: 0.001s
horus topic hz filtered_temp
The filtered topic should also show ~1 Hz since the FilterNode republishes every time it receives a new reading.
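To make the figures in that report concrete, here is a minimal sketch of how an average rate, minimum, maximum, and standard deviation can be derived from message arrival times. It is not how horus topic hz is implemented; the rate_stats helper and the arrival timestamps are invented for illustration.

```python
import statistics


def rate_stats(timestamps):
    """Summarize message timing from arrival timestamps given in seconds."""
    # Interval between each message and the one before it.
    intervals = [later - earlier for earlier, later in zip(timestamps, timestamps[1:])]
    mean_interval = statistics.mean(intervals)
    return {
        "rate_hz": 1.0 / mean_interval,
        "min_s": min(intervals),
        "max_s": max(intervals),
        "std_dev_s": statistics.pstdev(intervals),
    }


# Hypothetical arrival times for five messages published at roughly 1 Hz.
arrivals = [0.000, 1.001, 2.000, 3.002, 4.000]
stats = rate_stats(arrivals)

print(f"average rate: {stats['rate_hz']:.2f} Hz")
print(f"min: {stats['min_s']:.3f}s, max: {stats['max_s']:.3f}s, "
      f"std dev: {stats['std_dev_s']:.3f}s")
```

A small standard deviation relative to the mean interval indicates that the publisher is ticking steadily; a large one suggests jitter somewhere in the pipeline.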
Monitor Dashboard
For a full overview of nodes, topics, and metrics:
horus monitor
The monitor will show:
- Nodes tab: SensorNode, FilterNode, DisplayNode with their rates and states
- Topics tab: raw_temp and filtered_temp with message counts
- Metrics tab: IPC latency, tick duration, and message throughput
Understanding the Code
SensorNode
def tick(node):
    base_temp = 25.0 + math.sin(state["reading"] * 0.1) * 5.0
    noise = math.sin(state["reading"] * 0.7) * 2.0
    temperature = base_temp + noise
    node.send("raw_temp", temperature)
Key points:
- Publishes to the "raw_temp" topic at 1 Hz (set by rate=1)
- State lives in a dictionary captured by the closure
- node.send(topic, data) publishes any serializable value
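If you want to see what the sensor formula produces without starting HORUS, the same arithmetic can be run on its own. This is a standalone sketch; the helper name simulated_temperature is made up for this illustration and is not part of the HORUS API.

```python
import math


def simulated_temperature(reading: float) -> float:
    """Return the value SensorNode's tick computes for a given reading counter."""
    base_temp = 25.0 + math.sin(reading * 0.1) * 5.0  # slow swing, 20 to 30 °C
    noise = math.sin(reading * 0.7) * 2.0             # faster ripple, +/- 2 °C
    return base_temp + noise


# At reading 0 both sine terms are zero, so the first value is the 25 °C midpoint.
first = simulated_temperature(0.0)

# Sample one minute of readings (one per second) and look at the range.
samples = [simulated_temperature(float(n)) for n in range(60)]
lowest, highest = min(samples), max(samples)

print(f"first reading: {first:.2f} °C")
print(f"range over 60 readings: {lowest:.2f} to {highest:.2f} °C")
```

Because the "noise" is a sine wave rather than a random number, every run produces the same sequence, which makes the pipeline easy to reason about while you are learning.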
FilterNode
def tick(node):
    raw_temp = node.recv("raw_temp")
    if raw_temp is None:
        return
    filtered = alpha * raw_temp + (1.0 - alpha) * prev
    node.send("filtered_temp", filtered)
Key points:
- Subscribes to "raw_temp", publishes to "filtered_temp"
- Implements an exponential moving average (EMA) filter
- alpha = 0.3 balances responsiveness vs smoothness
- node.recv() returns None when no message is available (not an error)
Filter behavior:
- High alpha (0.8): Fast response, less smoothing
- Low alpha (0.2): Slow response, more smoothing
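The effect of alpha is easiest to see on a step input. The sketch below applies the same EMA formula as FilterNode to a temperature that jumps from 20°C to 30°C, once with alpha = 0.8 and once with alpha = 0.2. The ema helper is written for this example only and runs without HORUS.

```python
def ema(values, alpha):
    """Exponential moving average, seeded with the first value like FilterNode."""
    filtered = []
    prev = None
    for value in values:
        # First value passes through unchanged; later values are blended.
        prev = value if prev is None else alpha * value + (1.0 - alpha) * prev
        filtered.append(prev)
    return filtered


# A step input: the temperature jumps from 20 °C to 30 °C and stays there.
step = [20.0, 30.0, 30.0, 30.0, 30.0]

fast = ema(step, alpha=0.8)  # follows the jump quickly
slow = ema(step, alpha=0.2)  # approaches the new level gradually

for raw, f, s in zip(step, fast, slow):
    print(f"raw={raw:.1f}  alpha=0.8 -> {f:.2f}  alpha=0.2 -> {s:.2f}")
```

One reading after the jump, the alpha = 0.8 filter has covered 80% of the 10°C change (about 28°C), while the alpha = 0.2 filter has covered only 20% (about 22°C).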
DisplayNode
def tick(node):
    temp = node.recv("filtered_temp")
    if temp is None:
        return
    print(f"[Reading #{state['count']}] Temperature: {temp:.1f}°C — Status: {status}")
Key points:
- Subscribes to "filtered_temp" only
- Only prints when new data is available
- Uses init and shutdown callbacks for banner display
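The excerpt above leaves out how status is chosen. One way to make that logic easy to check is to pull it into a small pure function, as sketched here. The classify helper is hypothetical; the tutorial code in Step 2 keeps the same comparisons inline in tick.

```python
def classify(temp: float) -> str:
    """Map a temperature in °C to the status label DisplayNode prints."""
    if temp < 22.0:
        return "COLD"
    if temp > 28.0:
        return "HOT"
    return "NORMAL"


# Both comparisons are strict, so exactly 22.0 and exactly 28.0 count as NORMAL.
for temp in (21.9, 22.0, 25.0, 28.0, 28.1):
    print(f"{temp:.1f} -> {classify(temp)}")
```

A function like this has no dependency on the node object, so it can be tested without running the pipeline.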
State Management
Python nodes manage state through closures. Each make_*() factory function creates a state dictionary that the tick, init, and shutdown functions capture:
def make_sensor():
    state = {"reading": 0.0}  # Mutable state

    def tick(node):
        state["reading"] += 1.0  # Mutate via dict
        node.send("raw_temp", state["reading"])

    return horus.Node(name="SensorNode", tick=tick, ...)
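A dictionary works because tick closes over a reference to it and mutates it in place. Rebinding a plain local variable (for example a float) from inside tick would instead require a nonlocal declaration. You can also use a one-element list ([0.0]) for a single value or a class instance for complex state.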
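The following standalone sketch contrasts the dictionary approach with a plain variable plus nonlocal, and shows that each factory call gets its own state. The tick functions here take no node argument to keep the example free of HORUS; the factory names are invented for illustration.

```python
def make_counter_with_dict():
    state = {"count": 0}

    def tick():
        state["count"] += 1  # mutates the dict in place; no declaration needed
        return state["count"]

    return tick


def make_counter_with_nonlocal():
    count = 0

    def tick():
        nonlocal count  # required because "count += 1" rebinds the name
        count += 1
        return count

    return tick


dict_tick = make_counter_with_dict()
nonlocal_tick = make_counter_with_nonlocal()
other_tick = make_counter_with_dict()  # separate factory call, separate state

dict_results = [dict_tick() for _ in range(3)]
nonlocal_results = [nonlocal_tick() for _ in range(3)]
other_result = other_tick()

print(dict_results, nonlocal_results, other_result)
```

Without the nonlocal line, the second version raises UnboundLocalError on the first call, which is the usual reason tutorials reach for a dictionary.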
Execution Order
The order parameter controls when each node runs within a tick cycle:
make_sensor() # order=0 — runs first, produces data
make_filter() # order=1 — runs second, consumes and re-publishes
make_display() # order=2 — runs third, consumes final output
Lower order values run first. This ensures data flows through the pipeline in a single tick cycle without a one-tick delay between stages.
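To see why the ordering matters, here is a toy model of a tick cycle. It is not the HORUS scheduler: topics are modelled as single-slot mailboxes in a dictionary, and run_pipeline and its stage functions are invented for this sketch. It compares the pipeline order with the reverse order.

```python
def run_pipeline(stage_order, ticks):
    """Run a toy three-stage pipeline; return (tick, value) pairs that were displayed."""
    topics = {}     # topic name -> pending message (toy stand-in for pub/sub)
    displayed = []  # what the display stage saw, and on which tick
    counter = {"n": 0}

    def sensor(tick):
        counter["n"] += 1
        topics["raw_temp"] = counter["n"]

    def filt(tick):
        value = topics.pop("raw_temp", None)
        if value is not None:
            topics["filtered_temp"] = value

    def display(tick):
        value = topics.pop("filtered_temp", None)
        if value is not None:
            displayed.append((tick, value))

    stages = {"sensor": sensor, "filter": filt, "display": display}
    for tick in range(1, ticks + 1):
        for name in stage_order:  # stages run in the given order within each tick
            stages[name](tick)
    return displayed


in_order = run_pipeline(["sensor", "filter", "display"], ticks=3)
reversed_order = run_pipeline(["display", "filter", "sensor"], ticks=3)

print("in order:", in_order)
print("reversed:", reversed_order)
```

In this model the ordered pipeline displays reading 1 on tick 1, while the reversed pipeline does not display it until tick 3, because each stage boundary adds one tick of delay.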
Common Issues and Fixes
Issue: No Output Displayed
Symptom:
Starting Temperature Pipeline...
========================================
Temperature Monitor — Press Ctrl+C to stop
========================================
[Nothing appears]
Cause: Topic names don't match between publisher and subscriber.
Fix:
- Check topic names match exactly: "raw_temp" and "filtered_temp"
- Verify with horus topic list in a second terminal
- Ensure all three nodes are passed to horus.run()
Issue: Too Much or Too Little Smoothing
Symptom: Temperature changes too fast or too slow.
Fix: Adjust the alpha value in make_filter():
state = {
    "filtered_value": None,
    "alpha": 0.3,  # Current: moderate smoothing
    # Try these alternatives:
    # "alpha": 0.7,  # More responsive, less smooth
    # "alpha": 0.1,  # Very smooth, slower response
}
Issue: ModuleNotFoundError: horus
Cause: Python bindings not installed.
Fix:
pip install horus-robotics
For source builds:
cd horus_py && maturin develop --release
Issue: horus topic echo Shows Nothing
Cause: Application not running, or running with python src/main.py instead of horus run.
Fix:
- Start the application with horus run (not python src/main.py)
- In a separate terminal, run horus topic echo raw_temp
- If still empty, run horus topic list to check which topics exist
Issue: Stale Shared Memory
Symptom: Failed to create Topic or topics from a previous run interfere.
Fix:
horus clean --shm
Experiments to Try
Change the Update Rate
Make the sensor publish faster (2 Hz instead of 1 Hz):
    return horus.Node(
        name="SensorNode",
        tick=tick,
        rate=2,  # 2 Hz instead of 1 Hz
        ...
    )
Remember to update FilterNode and DisplayNode rates to match, or they will only process every other reading.
Add Temperature Alerts
Add a warning when the temperature exceeds a threshold:
def tick(node):
    temp = node.recv("filtered_temp")
    if temp is None:
        return
    state["count"] += 1
    print(f"[Reading #{state['count']}] Temperature: {temp:.1f}°C")
    if temp > 30.0:
        print(" WARNING: High temperature detected!")
        node.log_warning(f"High temp alert: {temp:.1f}°C")
Log Data to a File
Add file logging to the DisplayNode:
def make_display():
    state = {"count": 0, "logfile": None}

    def init(node):
        state["logfile"] = open("temperature_log.csv", "w")
        state["logfile"].write("reading,temperature\n")

    def tick(node):
        temp = node.recv("filtered_temp")
        if temp is None:
            return
        state["count"] += 1
        state["logfile"].write(f"{state['count']},{temp:.2f}\n")
        state["logfile"].flush()
        print(f"[#{state['count']}] {temp:.1f}°C (logged)")

    def shutdown(node):
        if state["logfile"]:
            state["logfile"].close()
        node.log_info("Log file closed")

    return horus.Node(name="DisplayNode", tick=tick, init=init,
                      shutdown=shutdown, rate=1, order=2,
                      subs=["filtered_temp"])
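Once a run has finished, you may want to summarize the log. The sketch below reads a CSV with the same reading,temperature header using only the standard library. The summarize_log helper is hypothetical, and the demo writes a small sample file to a temporary directory so it does not depend on a real run.

```python
import csv
import os
import statistics
import tempfile


def summarize_log(path):
    """Return (count, minimum, maximum, mean) for a temperature log CSV."""
    with open(path, newline="") as handle:
        temps = [float(row["temperature"]) for row in csv.DictReader(handle)]
    if not temps:
        return 0, None, None, None
    return len(temps), min(temps), max(temps), statistics.mean(temps)


# Demo with a sample file in the format the logging DisplayNode writes.
with tempfile.TemporaryDirectory() as tmp:
    sample_path = os.path.join(tmp, "temperature_log.csv")
    with open(sample_path, "w", newline="") as handle:
        handle.write("reading,temperature\n1,25.00\n2,25.54\n3,26.26\n")
    summary = summarize_log(sample_path)

count, low, high, mean = summary
print(f"{count} readings, min {low:.2f}, max {high:.2f}, mean {mean:.2f}")
```

To use it on your own data, call summarize_log("temperature_log.csv") from the project directory after stopping the pipeline.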
Use a Class for State
If you prefer classes over closures:
class SensorState:
    def __init__(self):
        self.reading = 0.0


sensor = SensorState()


def sensor_tick(node):
    sensor.reading += 1.0
    base_temp = 25.0 + math.sin(sensor.reading * 0.1) * 5.0
    noise = math.sin(sensor.reading * 0.7) * 2.0
    node.send("raw_temp", base_temp + noise)


sensor_node = horus.Node(
    name="SensorNode", tick=sensor_tick, rate=1, order=0,
    pubs=["raw_temp"],
)
Both approaches are valid. Closures keep state co-located with the factory function; classes work better when state is complex or shared.
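A variation on the class approach is to make tick a method, so the state and the logic live together. The sketch below exercises such a method with a stand-in node object. RecordingNode is a hypothetical test double written for this example, not a HORUS class, and passing sensor.tick as the tick argument of horus.Node assumes HORUS accepts any callable that takes the node, which you should confirm in the Python API Reference.

```python
import math


class SensorState:
    """Holds the reading counter and exposes tick as a bound method."""

    def __init__(self):
        self.reading = 0.0

    def tick(self, node):
        self.reading += 1.0
        base_temp = 25.0 + math.sin(self.reading * 0.1) * 5.0
        noise = math.sin(self.reading * 0.7) * 2.0
        node.send("raw_temp", base_temp + noise)


class RecordingNode:
    """Hypothetical stand-in for a node: records send() calls instead of publishing."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


sensor = SensorState()
fake_node = RecordingNode()
for _ in range(3):
    sensor.tick(fake_node)  # a bound method is an ordinary callable taking node

print(f"{len(fake_node.sent)} messages, first value {fake_node.sent[0][1]:.2f}")
```

Because the stand-in only records what was sent, the sensor logic can be checked in isolation before it is wired into a real node.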
Use the Scheduler Directly
For more control over execution, use horus.Scheduler instead of horus.run():
sched = horus.Scheduler(tick_rate=100, watchdog_ms=500)
sched.add(make_sensor())
sched.add(make_filter())
sched.add(make_display())
# Run for 30 seconds, then stop automatically
sched.run(duration=30.0)
This gives you access to runtime mutation, safety configuration, and introspection methods. See the Python API Reference for the full Scheduler API.
Key Takeaways
- Pipeline pattern: Data flows through stages (Sensor -> Filter -> Display), each a separate node
- Pub/sub decoupling: Nodes only know about topic names, not each other
- Execution order: order controls which node runs first in each tick cycle
- recv() is non-blocking: Returns None when no message is available -- not an error
- State in closures: Use dictionaries or class instances captured by tick functions
- CLI introspection: horus topic echo, horus topic hz, and horus monitor work on live data from any terminal
Next Steps
Now that you have built a 3-node pipeline, try:
- Python API Reference -- Full horus.Node, horus.Scheduler, and horus.Topic docs
- Testing -- Learn how to unit test your nodes with tick_once()
- Message Types -- Use typed messages (horus.CmdVel, horus.Imu) instead of primitives
- Using Pre-Built Nodes -- Use library nodes instead of writing from scratch
- Choosing a Language -- When to use Python vs Rust
Full Code
The code in Step 2 is complete and runnable as written. To save it:
- Copy the entire code block from Step 2
- Replace src/main.py in your project
- Run with horus run
For additional examples, see Python Examples.
See Also
- Quick Start (Python) -- Your first Python application
- Building Your Second Application (Rust) -- The same pipeline in Rust
- Tutorials -- Continue learning with guided tutorials
- Recipes -- Production-ready patterns to copy
- Core Concepts -- Understand how HORUS works