Services API
HORUS services provide synchronous request/response communication between nodes. Define a service with the service! macro, run a server with ServiceServerBuilder, and call it with ServiceClient or AsyncServiceClient.
Defining a Service
Use the service! macro to define request and response types:
use horus::prelude::*;
service! {
/// Look up a robot's current pose by name.
GetRobotPose {
request {
robot_name: String,
}
response {
x: f64,
y: f64,
theta: f64,
timestamp_ns: u64,
}
}
}
Service Trait
All services implement the Service trait:
| Method | Returns | Description |
|---|---|---|
name() | &'static str | Service name (used as topic prefix) |
request_topic() | String | Request channel name ("{name}.request") |
response_topic() | String | Response channel name ("{name}.response") |
request_type_name() | &'static str | Human-readable request type name |
response_type_name() | &'static str | Human-readable response type name |
ServiceClient (Blocking)
Synchronous client that blocks until a response arrives or the timeout elapses.
Constructor
| Method | Returns | Description |
|---|---|---|
ServiceClient::<S>::new() | Result<Self> | Create a client with default 1ms poll interval |
ServiceClient::<S>::with_poll_interval(interval) | Result<Self> | Create a client with custom poll interval |
Calling
| Method | Returns | Description |
|---|---|---|
call(request, timeout) | ServiceResult<S::Response> | Block until response or timeout |
call_resilient(request, timeout) | ServiceResult<S::Response> | Auto-retry on transient errors (3 retries, 10ms backoff, 2x multiplier) |
call_resilient_with(request, timeout, config) | ServiceResult<S::Response> | Auto-retry with custom RetryConfig |
call_optional(request, timeout) | ServiceResult<Option<S::Response>> | Returns Ok(None) on timeout instead of Err |
Only transient errors (Timeout, Transport) are retried. Permanent errors (ServiceFailed, NoServer) propagate immediately.
Detailed Method Reference
call
pub fn call(&self, request: S::Request, timeout: Duration) -> ServiceResult<S::Response>
Send a request and block until a response arrives or the timeout elapses.
Parameters:
request: S::Request— The typed request message.Sis the service type implementingServiceDef.timeout: Duration— Maximum time to wait for a response. Use100_u64.ms(),1_u64.secs(), etc.
Returns: ServiceResult<S::Response> — Ok(response) or Err(ServiceError).
Errors:
ServiceError::Timeout— No response within timeoutServiceError::NoServer— No server is registered for this serviceServiceError::ServiceFailed(msg)— Server returned an error
Example:
let response = client.call(GetPose { frame: "base" }, 100.ms())?;
println!("x={}, y={}", response.x, response.y);
call_resilient
pub fn call_resilient(&self, request: S::Request, timeout: Duration) -> ServiceResult<S::Response>
Like call(), but auto-retries on transient errors (timeout, transport). Uses default retry config: 3 retries, 10ms backoff, 2x multiplier.
Retry behavior: Only Timeout and Transport errors trigger retries. NoServer and ServiceFailed propagate immediately. Total time can exceed timeout due to retries.
call_optional
pub fn call_optional(&self, request: S::Request, timeout: Duration) -> ServiceResult<Option<S::Response>>
Like call(), but returns Ok(None) on timeout instead of Err(Timeout). Useful for non-critical lookups where missing data is acceptable.
Returns: Ok(Some(response)) on success, Ok(None) on timeout, Err(e) on other errors.
Example
use horus::prelude::*;
// Create client for the GetRobotPose service
let mut client = ServiceClient::<GetRobotPose>::new()?;
// Blocking call with 1-second timeout
let response = client.call(
GetRobotPoseRequest { robot_name: "arm_1".into() },
1_u64.secs(),
)?;
println!("Robot at ({:.2}, {:.2})", response.x, response.y);
// Resilient call — retries on transient failures
let response = client.call_resilient(
GetRobotPoseRequest { robot_name: "arm_1".into() },
2_u64.secs(),
)?;
// Optional call — Ok(None) on timeout
match client.call_optional(
GetRobotPoseRequest { robot_name: "arm_1".into() },
100_u64.ms(),
)? {
Some(res) => println!("Pose: ({:.2}, {:.2})", res.x, res.y),
None => println!("Pose server not responding"),
}
AsyncServiceClient (Non-Blocking)
Non-blocking client that returns a PendingServiceCall handle. Check the handle each tick without blocking the scheduler.
Constructor
| Method | Returns | Description |
|---|---|---|
AsyncServiceClient::<S>::new() | Result<Self> | Create with default 1ms poll interval |
AsyncServiceClient::<S>::with_poll_interval(interval) | Result<Self> | Create with custom poll interval |
Calling
| Method | Returns | Description |
|---|---|---|
call_async(request, timeout) | PendingServiceCall<S::Response> | Send request, return pending handle immediately |
PendingServiceCall
| Method | Returns | Description |
|---|---|---|
check() | ServiceResult<Option<Res>> | Non-blocking check: Ok(Some(res)) if ready, Ok(None) if waiting, Err on timeout/failure |
wait() | ServiceResult<Res> | Block until response arrives or timeout |
is_expired() | bool | Whether the deadline has passed |
Example
use horus::prelude::*;
service! {
GetRobotPose {
request { robot_name: String }
response { x: f64, y: f64, theta: f64, timestamp_ns: u64 }
}
}
struct PlannerNode {
client: AsyncServiceClient<GetRobotPose>,
pending: Option<PendingServiceCall<GetRobotPoseResponse>>,
}
impl Node for PlannerNode {
fn name(&self) -> &str { "Planner" }
fn tick(&mut self) {
// Send request if none pending
if self.pending.is_none() {
self.pending = Some(self.client.call_async(
GetRobotPoseRequest { robot_name: "arm_0".into() },
500_u64.ms(),
));
}
// Check for response (non-blocking)
if let Some(ref mut call) = self.pending {
match call.check() {
Ok(Some(pose)) => {
hlog!(info, "Robot at ({:.2}, {:.2})", pose.x, pose.y);
self.pending = None;
}
Ok(None) => {} // Still waiting
Err(e) => {
hlog!(warn, "Service call failed: {}", e);
self.pending = None;
}
}
}
}
}
ServiceServerBuilder
Fluent builder for creating a service server.
| Method | Returns | Description |
|---|---|---|
ServiceServerBuilder::<S>::new() | Self | Create a new builder |
on_request(handler) | Self | Register the request handler (Fn(Req) -> Result<Res, String>) |
poll_interval(interval) | Self | Override poll interval (default: 5ms) |
build() | Result<ServiceServer<S>> | Build and start the server (spawns background thread) |
The handler receives the request payload and returns either a response (Ok) or an error message (Err).
The handler's type is:
type RequestHandler<Req, Res> = Box<dyn Fn(Req) -> Result<Res, String> + Send + Sync + 'static>;
ServiceServer
| Method | Returns | Description |
|---|---|---|
stop() | () | Stop the server (also happens automatically on drop) |
The server runs in a background thread. Dropping the ServiceServer handle shuts it down.
Example
use horus::prelude::*;
use std::collections::HashMap;
// Build and start server
let poses: HashMap<String, (f64, f64, f64)> = HashMap::from([
("arm_1".into(), (1.5, 2.0, 0.0)),
("arm_2".into(), (3.0, 1.0, 1.57)),
]);
let server = ServiceServerBuilder::<GetRobotPose>::new()
.on_request(move |req| {
match poses.get(&req.robot_name) {
Some(&(x, y, theta)) => Ok(GetRobotPoseResponse {
x, y, theta, timestamp_ns: horus::timestamp_now(),
}),
None => Err(format!("Unknown robot: {}", req.robot_name)),
}
})
.poll_interval(1_u64.ms())
.build()?;
// Server runs in background thread until dropped
ServiceRequest / ServiceResponse
Wrapper types that flow over the wire:
ServiceRequest<Req>
| Field | Type | Description |
|---|---|---|
request_id | u64 | Unique correlation ID (auto-assigned by client) |
payload | Req | The actual request data |
ServiceResponse<Res>
| Field | Type | Description |
|---|---|---|
request_id | u64 | Echoes the request's correlation ID |
ok | bool | true if handled successfully |
payload | Option<Res> | Response data (Some when ok == true) |
error | Option<String> | Error message (Some when ok == false) |
| Method | Returns | Description |
|---|---|---|
ServiceResponse::success(request_id, payload) | Self | Create a successful response |
ServiceResponse::failure(request_id, error) | Self | Create an error response |
ServiceError
| Variant | Description | Transient? |
|---|---|---|
Timeout | Call timed out waiting for response | Yes |
ServiceFailed(String) | Server returned an error | No |
NoServer | No server registered for this service | No |
Transport(String) | Topic I/O error | Yes |
| Method | Returns | Description |
|---|---|---|
is_transient() | bool | Whether a retry may succeed (Timeout and Transport are transient) |
ServiceInfo
Metadata returned by horus service list:
| Field | Type | Description |
|---|---|---|
name | String | Service name |
request_type | String | Rust type name of request |
response_type | String | Rust type name of response |
servers | usize | Active server count (typically 0 or 1) |
clients | usize | Known client count |
Complete Example
use horus::prelude::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
// Define a key-value store service
service! {
/// Get or set values in a shared store.
KeyValueStore {
request {
key: String,
value: Option<String>, // None = get, Some = set
}
response {
value: Option<String>,
found: bool,
}
}
}
fn main() -> Result<()> {
let store = Arc::new(Mutex::new(HashMap::<String, String>::new()));
let store_clone = store.clone();
// Start server
let _server = ServiceServerBuilder::<KeyValueStore>::new()
.on_request(move |req| {
let mut map = store_clone.lock().unwrap();
match req.value {
Some(val) => {
map.insert(req.key, val.clone());
Ok(KeyValueStoreResponse { value: Some(val), found: true })
}
None => {
let val = map.get(&req.key).cloned();
let found = val.is_some();
Ok(KeyValueStoreResponse { value: val, found })
}
}
})
.build()?;
// Client: set a value
let mut client = ServiceClient::<KeyValueStore>::new()?;
client.call(
KeyValueStoreRequest { key: "robot_id".into(), value: Some("arm_01".into()) },
1_u64.secs(),
)?;
// Client: get it back
let res = client.call(
KeyValueStoreRequest { key: "robot_id".into(), value: None },
1_u64.secs(),
)?;
assert_eq!(res.value, Some("arm_01".into()));
assert!(res.found);
Ok(())
}
See Also
- Services Concepts — Architecture and design patterns
- Actions API — Long-running tasks with feedback and cancellation
- Topic API — Streaming pub/sub communication
- Error Handling — RetryConfig for resilient calls