Telemetry Logger

Subscribes to topics and writes them to a CSV log file using the .async_io() execution class. File I/O happens on a Tokio blocking thread, so it never blocks or delays the real-time control loop.

horus.toml

[package]
name = "telemetry-logger"
version = "0.1.0"
description = "Non-blocking topic logger to CSV"

Complete Code

use horus::prelude::*;
use std::fs::File;
use std::io::Write as IoWrite;

/// Pose data to log
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, LogSummary)]
#[repr(C)]
struct FusedPose {
    x: f32,
    y: f32,
    theta: f32,
    speed: f32,
    confidence: f32,
}

/// Motor telemetry to log
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, LogSummary)]
#[repr(C)]
struct MotorTelemetry {
    left_rpm: f32,
    right_rpm: f32,
    battery_voltage: f32,
}

// ── Logger Node ─────────────────────────────────────────────

struct LoggerNode {
    pose_sub: Topic<FusedPose>,
    motor_sub: Topic<MotorTelemetry>,
    file: Option<File>,
    line_count: u64,
}

impl LoggerNode {
    fn new() -> Result<Self> {
        Ok(Self {
            pose_sub: Topic::new("pose.fused")?,
            motor_sub: Topic::new("motor.telemetry")?,
            file: None,
            line_count: 0,
        })
    }
}

impl Node for LoggerNode {
    fn name(&self) -> &str { "Logger" }

    fn init(&mut self) -> Result<()> {
        // Open log file in init() — runs once before tick loop
        let mut f = File::create("telemetry.csv")
            .map_err(|e| Error::Config(
                format!("Failed to create log file: {}", e)
            ))?;
        writeln!(f, "tick,x,y,theta,speed,confidence,left_rpm,right_rpm,battery_v")
            .map_err(|e| Error::Config(
                format!("Failed to write header: {}", e)
            ))?;
        self.file = Some(f);
        Ok(())
    }

    fn tick(&mut self) {
        self.line_count += 1;

        // IMPORTANT: always recv() every tick to drain buffers
        let pose = self.pose_sub.recv().unwrap_or_default();
        let motor = self.motor_sub.recv().unwrap_or_default();

        // Write CSV line — file I/O is safe here because we use async_io()
        if let Some(ref mut f) = self.file {
            let _ = writeln!(
                f,
                "{},{:.4},{:.4},{:.4},{:.4},{:.2},{:.1},{:.1},{:.2}",
                self.line_count,
                pose.x, pose.y, pose.theta, pose.speed, pose.confidence,
                motor.left_rpm, motor.right_rpm, motor.battery_voltage,
            );
        }
    }

    fn shutdown(&mut self) -> Result<()> {
        // Flush and close the file
        if let Some(ref mut f) = self.file {
            let _ = f.flush();
        }
        self.file = None;
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut scheduler = Scheduler::new();

    // Execution order: logger runs on async I/O thread pool — never blocks RT nodes
    scheduler.add(LoggerNode::new()?)
        .order(99)             // runs after all data-producing nodes
        .async_io()            // Tokio blocking pool — file I/O is safe
        .rate(10_u64.hz())     // 10Hz logging — enough for post-analysis
        .build()?;

    scheduler.run()
}
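
The row formatting in tick() can be pulled into a pure function so it can be checked without a scheduler or a file. The following is a minimal sketch under stated assumptions: PoseRow, MotorRow, and format_row are hypothetical names used only for illustration, and the structs are plain stand-ins with the same fields as FusedPose and MotorTelemetry rather than HORUS message types.

```rust
/// Stand-in for FusedPose (hypothetical; no HORUS derives needed here).
#[derive(Debug, Clone, Copy, Default)]
struct PoseRow {
    x: f32,
    y: f32,
    theta: f32,
    speed: f32,
    confidence: f32,
}

/// Stand-in for MotorTelemetry (hypothetical).
#[derive(Debug, Clone, Copy, Default)]
struct MotorRow {
    left_rpm: f32,
    right_rpm: f32,
    battery_voltage: f32,
}

/// Formats one CSV line with the same column order and precision
/// as the format string used in the recipe's tick().
fn format_row(tick: u64, pose: &PoseRow, motor: &MotorRow) -> String {
    format!(
        "{},{:.4},{:.4},{:.4},{:.4},{:.2},{:.1},{:.1},{:.2}",
        tick,
        pose.x, pose.y, pose.theta, pose.speed, pose.confidence,
        motor.left_rpm, motor.right_rpm, motor.battery_voltage,
    )
}
```

Keeping the format string in one place also makes it easier to keep the header line and the data columns in sync if a field is added later.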

Expected Output

[HORUS] Scheduler running — tick_rate: 10 Hz
[HORUS] Node "Logger" started (AsyncIo, 10 Hz)
^C
[HORUS] Shutting down...
[HORUS] Node "Logger" shutdown complete

Generated telemetry.csv:

tick,x,y,theta,speed,confidence,left_rpm,right_rpm,battery_v
1,0.0000,0.0000,0.0000,0.0000,0.00,0.0,0.0,0.00
2,0.0100,0.0000,0.0100,0.5000,0.90,45.0,47.0,12.40
...
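
For post-analysis, the file can be read back with the standard library alone. This is a rough sketch that assumes the nine-column layout shown above; TelemetryRow, parse_row, and parse_log are hypothetical names, not part of HORUS.

```rust
/// One parsed line of telemetry.csv (hypothetical type for illustration).
#[derive(Debug, Clone, PartialEq)]
struct TelemetryRow {
    tick: u64,
    x: f32,
    y: f32,
    theta: f32,
    speed: f32,
    confidence: f32,
    left_rpm: f32,
    right_rpm: f32,
    battery_v: f32,
}

/// Parses one data line. Returns None for the header, blank lines,
/// and anything that does not have nine numeric columns.
fn parse_row(line: &str) -> Option<TelemetryRow> {
    let cols: Vec<&str> = line.trim().split(',').collect();
    if cols.len() != 9 {
        return None;
    }
    // The header fails here because "tick" is not an integer.
    let tick = cols[0].parse::<u64>().ok()?;
    // The remaining eight columns are floats; give up if any fails to parse.
    let mut v = [0.0f32; 8];
    for (slot, col) in v.iter_mut().zip(&cols[1..]) {
        *slot = col.parse::<f32>().ok()?;
    }
    Some(TelemetryRow {
        tick,
        x: v[0],
        y: v[1],
        theta: v[2],
        speed: v[3],
        confidence: v[4],
        left_rpm: v[5],
        right_rpm: v[6],
        battery_v: v[7],
    })
}

/// Parses the contents of a whole log, skipping lines that do not parse.
fn parse_log(contents: &str) -> Vec<TelemetryRow> {
    contents.lines().filter_map(parse_row).collect()
}
```

A typical call would pass the result of std::fs::read_to_string("telemetry.csv") to parse_log. Silently skipping bad lines is a design choice for this sketch; a stricter tool might report the offending line number instead.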

Key Points

  • .async_io() runs the node on a Tokio blocking thread pool — file writes never block the RT scheduler
  • init() opens the file once before the tick loop starts
  • shutdown() flushes and closes the file — prevents data loss
  • unwrap_or_default() on recv() logs default values (zeros) whenever no message is available, for example before a topic has published
  • 10Hz logging is typical for post-flight analysis; use 100Hz+ for real-time debugging
  • Combine with any other recipe — just match the topic names and message types
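
If zero-filled rows are undesirable, one option is to hold the last received sample instead of falling back to the default on every gap. The following is a minimal sketch, assuming recv() yields an Option-like value as the unwrap_or_default() call in the recipe suggests; Latest is a hypothetical helper, not part of HORUS.

```rust
/// Remembers the most recent sample so that gaps repeat the last value
/// instead of writing zeros. Hypothetical helper for illustration.
#[derive(Debug, Default)]
struct Latest<T> {
    last: Option<T>,
}

impl<T: Copy + Default> Latest<T> {
    /// Stores `incoming` if present, then returns the newest known sample.
    /// Falls back to T::default() only before the first sample arrives.
    fn update(&mut self, incoming: Option<T>) -> T {
        if incoming.is_some() {
            self.last = incoming;
        }
        self.last.unwrap_or_default()
    }

    /// True once at least one real sample has been seen.
    fn has_data(&self) -> bool {
        self.last.is_some()
    }
}
```

Under that assumption, the node would keep one Latest per topic and call something like `self.pose_latest.update(self.pose_sub.recv())` in tick(), where pose_latest is a hypothetical extra field. Checking has_data() before writing would let the logger skip rows until every topic has published at least once.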