Services API

HORUS services provide synchronous request/response communication between nodes. Define a service with the service! macro, run a server with ServiceServerBuilder, and call it with ServiceClient or AsyncServiceClient.

Defining a Service

Use the service! macro to define request and response types:

use horus::prelude::*;

service! {
    /// Look up a robot's current pose by name.
    GetRobotPose {
        request {
            robot_name: String,
        }
        response {
            x: f64,
            y: f64,
            theta: f64,
            timestamp_ns: u64,
        }
    }
}

Service Trait

All services implement the Service trait:

| Method | Returns | Description |
|---|---|---|
| name() | &'static str | Service name (used as topic prefix) |
| request_topic() | String | Request channel name ("{name}.request") |
| response_topic() | String | Response channel name ("{name}.response") |
| request_type_name() | &'static str | Human-readable request type name |
| response_type_name() | &'static str | Human-readable response type name |
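
The topic naming convention in the table can be illustrated with a small stand-in trait. This is a sketch under assumptions, not the HORUS source: the local Service trait, its use of associated functions (rather than methods taking &self), and the choice of "GetRobotPose" as the service name are all hypothetical.

```rust
/// Hypothetical stand-in for the trait described above; not the HORUS definition.
pub trait Service {
    /// Service name, used as the topic prefix.
    fn name() -> &'static str;

    /// Request channel name: "{name}.request".
    fn request_topic() -> String {
        format!("{}.request", Self::name())
    }

    /// Response channel name: "{name}.response".
    fn response_topic() -> String {
        format!("{}.response", Self::name())
    }
}

/// Illustrative marker type; the name the real macro generates is an assumption.
pub struct GetRobotPose;

impl Service for GetRobotPose {
    fn name() -> &'static str {
        "GetRobotPose"
    }
}
```

Under these assumptions, GetRobotPose::request_topic() yields "GetRobotPose.request" and GetRobotPose::response_topic() yields "GetRobotPose.response".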

ServiceClient (Blocking)

Synchronous client that blocks until a response arrives or the timeout elapses.

Constructor

| Method | Returns | Description |
|---|---|---|
| ServiceClient::<S>::new() | Result<Self> | Create a client with default 1ms poll interval |
| ServiceClient::<S>::with_poll_interval(interval) | Result<Self> | Create a client with custom poll interval |

Calling

| Method | Returns | Description |
|---|---|---|
| call(request, timeout) | ServiceResult<S::Response> | Block until response or timeout |
| call_resilient(request, timeout) | ServiceResult<S::Response> | Auto-retry on transient errors (3 retries, 10ms backoff, 2x multiplier) |
| call_resilient_with(request, timeout, config) | ServiceResult<S::Response> | Auto-retry with custom RetryConfig |
| call_optional(request, timeout) | ServiceResult<Option<S::Response>> | Returns Ok(None) on timeout instead of Err |

Only transient errors (Timeout, Transport) are retried. Permanent errors (ServiceFailed, NoServer) propagate immediately.

Detailed Method Reference

call

pub fn call(&self, request: S::Request, timeout: Duration) -> ServiceResult<S::Response>

Send a request and block until a response arrives or the timeout elapses.

Parameters:

  • request: S::Request — The typed request message. S is the service type implementing ServiceDef.
  • timeout: Duration — Maximum time to wait for a response. Use 100_u64.ms(), 1_u64.secs(), etc.

Returns: ServiceResult<S::Response>, which is Ok(response) on success or Err(ServiceError) on failure.

Errors:

  • ServiceError::Timeout — No response within timeout
  • ServiceError::NoServer — No server is registered for this service
  • ServiceError::ServiceFailed(msg) — Server returned an error

Example:

let response = client.call(GetRobotPoseRequest { robot_name: "arm_1".into() }, 100_u64.ms())?;
println!("x={}, y={}", response.x, response.y);

call_resilient

pub fn call_resilient(&self, request: S::Request, timeout: Duration) -> ServiceResult<S::Response>

Like call(), but auto-retries on transient errors (timeout, transport). Uses default retry config: 3 retries, 10ms backoff, 2x multiplier.

Retry behavior: Only Timeout and Transport errors trigger retries. NoServer and ServiceFailed propagate immediately. Total time can exceed timeout due to retries.
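
The retry behavior described above can be sketched with the standard library alone. This is an illustration under assumptions, not the HORUS implementation: RetryPolicy, call_with_retry, and the local ServiceError enum are hypothetical stand-ins, and the exact point at which HORUS sleeps between attempts is not stated in this reference.

```rust
use std::thread;
use std::time::Duration;

/// Local mirror of the error variants listed in this reference.
#[derive(Debug, Clone, PartialEq)]
pub enum ServiceError {
    Timeout,
    ServiceFailed(String),
    NoServer,
    Transport(String),
}

impl ServiceError {
    /// Timeout and Transport are transient; the other variants are permanent.
    pub fn is_transient(&self) -> bool {
        matches!(self, ServiceError::Timeout | ServiceError::Transport(_))
    }
}

/// Hypothetical retry settings; the field names are illustrative only.
pub struct RetryPolicy {
    pub max_retries: u32,
    pub initial_backoff: Duration,
    pub multiplier: u32,
}

/// Run `attempt` once, then retry up to `max_retries` times on transient
/// errors, sleeping between attempts and multiplying the delay each time.
pub fn call_with_retry<T>(
    policy: &RetryPolicy,
    mut attempt: impl FnMut() -> Result<T, ServiceError>,
) -> Result<T, ServiceError> {
    let mut backoff = policy.initial_backoff;
    let mut retries_left = policy.max_retries;
    loop {
        match attempt() {
            Ok(value) => return Ok(value),
            // Transient failure with retries remaining: back off and try again.
            Err(err) if err.is_transient() && retries_left > 0 => {
                retries_left -= 1;
                thread::sleep(backoff);
                backoff *= policy.multiplier;
            }
            // Permanent failure, or retries exhausted: give up immediately.
            Err(err) => return Err(err),
        }
    }
}
```

With the documented defaults (3 retries, 10ms backoff, 2x multiplier), this sketch makes at most four attempts and sleeps 10ms, 20ms, and 40ms between them, which is why the total elapsed time can exceed a single timeout.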

call_optional

pub fn call_optional(&self, request: S::Request, timeout: Duration) -> ServiceResult<Option<S::Response>>

Like call(), but returns Ok(None) on timeout instead of Err(Timeout). Useful for non-critical lookups where missing data is acceptable.

Returns: Ok(Some(response)) on success, Ok(None) on timeout, Err(e) on other errors.
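
The mapping from call() results to call_optional() results can be written as a small conversion. The following is a sketch only: timeout_as_none and the local ServiceError enum are hypothetical names used for illustration, not part of the HORUS API.

```rust
/// Local mirror of the error variants listed in this reference.
#[derive(Debug, PartialEq)]
pub enum ServiceError {
    Timeout,
    ServiceFailed(String),
    NoServer,
    Transport(String),
}

/// Convert a plain call result into the optional form: success becomes
/// Ok(Some(..)), a timeout becomes Ok(None), and other errors are kept.
pub fn timeout_as_none<T>(result: Result<T, ServiceError>) -> Result<Option<T>, ServiceError> {
    match result {
        Ok(value) => Ok(Some(value)),
        Err(ServiceError::Timeout) => Ok(None),
        Err(other) => Err(other),
    }
}
```

Only the timeout case is softened; NoServer, ServiceFailed, and Transport still surface as errors, so callers can use ? and then match on the Option.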

Example

use horus::prelude::*;

// Create client for the GetRobotPose service
let client = ServiceClient::<GetRobotPose>::new()?;

// Blocking call with 1-second timeout
let response = client.call(
    GetRobotPoseRequest { robot_name: "arm_1".into() },
    1_u64.secs(),
)?;
println!("Robot at ({:.2}, {:.2})", response.x, response.y);

// Resilient call — retries on transient failures
let response = client.call_resilient(
    GetRobotPoseRequest { robot_name: "arm_1".into() },
    2_u64.secs(),
)?;

// Optional call — Ok(None) on timeout
match client.call_optional(
    GetRobotPoseRequest { robot_name: "arm_1".into() },
    100_u64.ms(),
)? {
    Some(res) => println!("Pose: ({:.2}, {:.2})", res.x, res.y),
    None => println!("Pose server not responding"),
}

AsyncServiceClient (Non-Blocking)

Non-blocking client that returns a PendingServiceCall handle. Check the handle each tick without blocking the scheduler.

Constructor

| Method | Returns | Description |
|---|---|---|
| AsyncServiceClient::<S>::new() | Result<Self> | Create with default 1ms poll interval |
| AsyncServiceClient::<S>::with_poll_interval(interval) | Result<Self> | Create with custom poll interval |

Calling

| Method | Returns | Description |
|---|---|---|
| call_async(request, timeout) | PendingServiceCall<S::Response> | Send request, return pending handle immediately |

PendingServiceCall

| Method | Returns | Description |
|---|---|---|
| check() | ServiceResult<Option<Res>> | Non-blocking check: Ok(Some(res)) if ready, Ok(None) if waiting, Err on timeout/failure |
| wait() | ServiceResult<Res> | Block until response arrives or timeout |
| is_expired() | bool | Whether the deadline has passed |
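
The three outcomes of check() can be modeled with a channel and a deadline. This is a minimal sketch assuming a std::sync::mpsc channel as the transport; PendingCall and CallError are hypothetical names and do not reflect how HORUS implements PendingServiceCall internally.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};
use std::time::{Duration, Instant};

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum CallError {
    Timeout,
    Transport(String),
}

/// Hypothetical pending-call handle backed by a std channel.
pub struct PendingCall<T> {
    rx: Receiver<T>,
    deadline: Instant,
}

impl<T> PendingCall<T> {
    /// Start the deadline clock at construction time.
    pub fn new(rx: Receiver<T>, timeout: Duration) -> Self {
        Self { rx, deadline: Instant::now() + timeout }
    }

    /// Whether the deadline has passed.
    pub fn is_expired(&self) -> bool {
        Instant::now() >= self.deadline
    }

    /// Non-blocking poll: Ok(Some) if ready, Ok(None) if still waiting,
    /// Err(Timeout) once the deadline has passed with no response.
    pub fn check(&mut self) -> Result<Option<T>, CallError> {
        match self.rx.try_recv() {
            Ok(value) => Ok(Some(value)),
            Err(TryRecvError::Empty) if self.is_expired() => Err(CallError::Timeout),
            Err(TryRecvError::Empty) => Ok(None),
            Err(TryRecvError::Disconnected) => {
                Err(CallError::Transport("sender dropped".to_string()))
            }
        }
    }
}
```

Because check() never sleeps, it is safe to call once per tick; a response that arrives before the deadline is returned even if the poll happens slightly later.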

Example

use horus::prelude::*;

service! {
    GetRobotPose {
        request { robot_name: String }
        response { x: f64, y: f64, theta: f64, timestamp_ns: u64 }
    }
}

struct PlannerNode {
    client: AsyncServiceClient<GetRobotPose>,
    pending: Option<PendingServiceCall<GetRobotPoseResponse>>,
}

impl Node for PlannerNode {
    fn name(&self) -> &str { "Planner" }

    fn tick(&mut self) {
        // Send request if none pending
        if self.pending.is_none() {
            self.pending = Some(self.client.call_async(
                GetRobotPoseRequest { robot_name: "arm_0".into() },
                500_u64.ms(),
            ));
        }

        // Check for response (non-blocking)
        if let Some(ref mut call) = self.pending {
            match call.check() {
                Ok(Some(pose)) => {
                    hlog!(info, "Robot at ({:.2}, {:.2})", pose.x, pose.y);
                    self.pending = None;
                }
                Ok(None) => {} // Still waiting
                Err(e) => {
                    hlog!(warn, "Service call failed: {}", e);
                    self.pending = None;
                }
            }
        }
    }
}

ServiceServerBuilder

Fluent builder for creating a service server.

| Method | Returns | Description |
|---|---|---|
| ServiceServerBuilder::<S>::new() | Self | Create a new builder |
| on_request(handler) | Self | Register the request handler (Fn(Req) -> Result<Res, String>) |
| poll_interval(interval) | Self | Override poll interval (default: 5ms) |
| build() | Result<ServiceServer<S>> | Build and start the server (spawns background thread) |

The handler receives the request payload and returns either a response (Ok) or an error message (Err).

The handler's type is:

type RequestHandler<Req, Res> = Box<dyn Fn(Req) -> Result<Res, String> + Send + Sync + 'static>;
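
To show what satisfies this handler type, here is a std-only sketch that boxes a closure with the same shape. The alias is copied from above; make_pose_handler and the use of a plain String request and tuple response are hypothetical simplifications, not HORUS types.

```rust
use std::collections::HashMap;

/// Same shape as the alias shown above.
pub type RequestHandler<Req, Res> =
    Box<dyn Fn(Req) -> Result<Res, String> + Send + Sync + 'static>;

/// Hypothetical helper: build a handler that looks up an (x, y) pose by name.
pub fn make_pose_handler(
    poses: HashMap<String, (f64, f64)>,
) -> RequestHandler<String, (f64, f64)> {
    // `move` transfers ownership of the map into the closure, which is what
    // lets the closure meet the 'static bound.
    Box::new(move |robot_name: String| {
        poses
            .get(&robot_name)
            .copied()
            .ok_or_else(|| format!("Unknown robot: {}", robot_name))
    })
}
```

The Send + Sync + 'static bounds follow from the handler running on the server's background thread: the closure must own its captured state and that state must be safe to share across threads. Mutable state would need a wrapper such as Arc<Mutex<...>>.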

ServiceServer

| Method | Returns | Description |
|---|---|---|
| stop() | () | Stop the server (also happens automatically on drop) |

The server runs in a background thread. Dropping the ServiceServer handle shuts it down.
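
The stop-on-drop pattern can be sketched with a shared flag and a joined thread. This is an assumption-laden illustration, not the HORUS implementation: BackgroundWorker, its spawn and is_running methods, and the &mut self receiver on stop are all hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical worker handle; illustrates stop-on-drop only.
pub struct BackgroundWorker {
    running: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl BackgroundWorker {
    /// Spawn a thread that loops until the shared flag is cleared.
    pub fn spawn(poll_interval: Duration) -> Self {
        let running = Arc::new(AtomicBool::new(true));
        let flag = Arc::clone(&running);
        let handle = thread::spawn(move || {
            while flag.load(Ordering::Acquire) {
                // A real server would poll for requests here.
                thread::sleep(poll_interval);
            }
        });
        Self { running, handle: Some(handle) }
    }

    /// Clear the flag and join the thread; calling it twice is harmless.
    pub fn stop(&mut self) {
        self.running.store(false, Ordering::Release);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }

    /// True until the thread has been joined.
    pub fn is_running(&self) -> bool {
        self.handle.is_some()
    }
}

impl Drop for BackgroundWorker {
    fn drop(&mut self) {
        self.stop();
    }
}
```

In this sketch the poll interval bounds how long stop() can block, since the loop only observes the flag between sleeps. It also shows why a server handle should be bound to a named variable such as _server rather than discarded with a bare underscore, which would drop it immediately.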

Example

use horus::prelude::*;
use std::collections::HashMap;

// Build and start server
let poses: HashMap<String, (f64, f64, f64)> = HashMap::from([
    ("arm_1".into(), (1.5, 2.0, 0.0)),
    ("arm_2".into(), (3.0, 1.0, 1.57)),
]);

let server = ServiceServerBuilder::<GetRobotPose>::new()
    .on_request(move |req| {
        match poses.get(&req.robot_name) {
            Some(&(x, y, theta)) => Ok(GetRobotPoseResponse {
                x, y, theta, timestamp_ns: horus::timestamp_now(),
            }),
            None => Err(format!("Unknown robot: {}", req.robot_name)),
        }
    })
    .poll_interval(1_u64.ms())
    .build()?;

// Server runs in background thread until dropped

ServiceRequest / ServiceResponse

Wrapper types that flow over the wire:

ServiceRequest<Req>

| Field | Type | Description |
|---|---|---|
| request_id | u64 | Unique correlation ID (auto-assigned by client) |
| payload | Req | The actual request data |

ServiceResponse<Res>

| Field | Type | Description |
|---|---|---|
| request_id | u64 | Echoes the request's correlation ID |
| ok | bool | true if handled successfully |
| payload | Option<Res> | Response data (Some when ok == true) |
| error | Option<String> | Error message (Some when ok == false) |

| Method | Returns | Description |
|---|---|---|
| ServiceResponse::success(request_id, payload) | Self | Create a successful response |
| ServiceResponse::failure(request_id, error) | Self | Create an error response |
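
The relationship between the ok, payload, and error fields can be seen in a local model of the response wrapper. This sketch reuses the field and constructor names from the tables above, but the struct itself is a stand-in, and into_result is a hypothetical helper that this reference does not document.

```rust
/// Local model of the response wrapper described above.
#[derive(Debug, Clone, PartialEq)]
pub struct ServiceResponse<Res> {
    pub request_id: u64,
    pub ok: bool,
    pub payload: Option<Res>,
    pub error: Option<String>,
}

impl<Res> ServiceResponse<Res> {
    /// Successful response: payload is Some, error is None.
    pub fn success(request_id: u64, payload: Res) -> Self {
        Self { request_id, ok: true, payload: Some(payload), error: None }
    }

    /// Failed response: error is Some, payload is None.
    pub fn failure(request_id: u64, error: impl Into<String>) -> Self {
        Self { request_id, ok: false, payload: None, error: Some(error.into()) }
    }

    /// Hypothetical helper: collapse the wire fields into a Result.
    pub fn into_result(self) -> Result<Res, String> {
        match (self.ok, self.payload, self.error) {
            (true, Some(payload), _) => Ok(payload),
            (_, _, Some(error)) => Err(error),
            _ => Err("malformed response".to_string()),
        }
    }
}
```

A client would match request_id against the ID it assigned to the outgoing request before unpacking the payload, so that a late response to an earlier call is not mistaken for the current one.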

ServiceError

| Variant | Description | Transient? |
|---|---|---|
| Timeout | Call timed out waiting for response | Yes |
| ServiceFailed(String) | Server returned an error | No |
| NoServer | No server registered for this service | No |
| Transport(String) | Topic I/O error | Yes |

| Method | Returns | Description |
|---|---|---|
| is_transient() | bool | Whether a retry may succeed (Timeout and Transport are transient) |

ServiceInfo

Metadata returned by horus service list:

| Field | Type | Description |
|---|---|---|
| name | String | Service name |
| request_type | String | Rust type name of request |
| response_type | String | Rust type name of response |
| servers | usize | Active server count (typically 0 or 1) |
| clients | usize | Known client count |

Complete Example

use horus::prelude::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Define a key-value store service
service! {
    /// Get or set values in a shared store.
    KeyValueStore {
        request {
            key: String,
            value: Option<String>,  // None = get, Some = set
        }
        response {
            value: Option<String>,
            found: bool,
        }
    }
}

fn main() -> Result<()> {
    let store = Arc::new(Mutex::new(HashMap::<String, String>::new()));
    let store_clone = store.clone();

    // Start server
    let _server = ServiceServerBuilder::<KeyValueStore>::new()
        .on_request(move |req| {
            let mut map = store_clone.lock().unwrap();
            match req.value {
                Some(val) => {
                    map.insert(req.key, val.clone());
                    Ok(KeyValueStoreResponse { value: Some(val), found: true })
                }
                None => {
                    let val = map.get(&req.key).cloned();
                    let found = val.is_some();
                    Ok(KeyValueStoreResponse { value: val, found })
                }
            }
        })
        .build()?;

    // Client: set a value
    let client = ServiceClient::<KeyValueStore>::new()?;
    client.call(
        KeyValueStoreRequest { key: "robot_id".into(), value: Some("arm_01".into()) },
        1_u64.secs(),
    )?;

    // Client: get it back
    let res = client.call(
        KeyValueStoreRequest { key: "robot_id".into(), value: None },
        1_u64.secs(),
    )?;
    assert_eq!(res.value, Some("arm_01".into()));
    assert!(res.found);

    Ok(())
}

See Also