Creating Custom Channels
Channels are OneClaw’s communication interfaces for receiving and sending messages. This guide shows how to build custom channels with async message handling and proper integration.
Channel Trait
All channels implement the async `Channel` trait:
/// Interface every OneClaw channel implements.
///
/// Implementors must be `Send + Sync`, and all methods take `&self`, so any
/// mutable connection state has to live behind interior mutability (the
/// examples in this guide use `tokio::sync::Mutex`).
#[async_trait]
pub trait Channel: Send + Sync {
/// Return the name of this channel.
fn name(&self) -> &str;
/// Receive the next incoming message, if any (async).
///
/// The examples in this guide return `Ok(None)` when nothing deliverable
/// arrived (a blank CLI line, an ignored WebSocket frame) and `Err` when the
/// underlying input has ended.
async fn receive(&self) -> Result<Option<IncomingMessage>>;
/// Send an outgoing message through this channel (async).
async fn send(&self, message: &OutgoingMessage) -> Result<()>;
}
Message Types
IncomingMessage
/// A message produced by `Channel::receive`.
pub struct IncomingMessage {
/// The source identifier of the message.
///
/// The examples in this guide fill it with a channel label (`"cli"`,
/// `"webhook"`), the WebSocket URL, or the MQTT topic.
pub source: String,
/// The text content of the message.
pub content: String,
/// The timestamp when the message was received.
pub timestamp: chrono::DateTime<chrono::Utc>,
}
OutgoingMessage
/// A message handed to `Channel::send`.
pub struct OutgoingMessage {
/// The destination identifier for the message.
///
/// How it is interpreted is up to the channel: the MQTT example publishes
/// to it as a topic, while the CLI and WebSocket examples do not read it.
pub destination: String,
/// The text content of the message.
pub content: String,
}
Example: CLI Channel
Let’s look at the built-in CLI channel:
//! CLI Channel — Terminal-based I/O
//! Always included. The simplest channel for testing and direct interaction.
use oneclaw_core::channel::traits::{Channel, IncomingMessage, OutgoingMessage};
use oneclaw_core::error::{OneClawError, Result};
use async_trait::async_trait;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
/// CLI channel that reads from stdin and writes to stdout.
pub struct CliChannel {
prompt: String,
}
impl CliChannel {
/// Create a new CLI channel with default prompt "oneclaw> ".
pub fn new() -> Self {
Self { prompt: "oneclaw> ".to_string() }
}
/// Create a CLI channel with a custom prompt.
pub fn with_prompt(prompt: impl Into<String>) -> Self {
Self { prompt: prompt.into() }
}
}
impl Default for CliChannel {
fn default() -> Self { Self::new() }
}
#[async_trait]
impl Channel for CliChannel {
fn name(&self) -> &str { "cli" }
async fn receive(&self) -> Result<Option<IncomingMessage>> {
let mut stdout = tokio::io::stdout();
stdout.write_all(self.prompt.as_bytes()).await
.map_err(|e| OneClawError::Channel(format!("stdout write: {}", e)))?;
stdout.flush().await
.map_err(|e| OneClawError::Channel(format!("stdout flush: {}", e)))?;
let stdin = tokio::io::stdin();
let mut reader = BufReader::new(stdin);
let mut line = String::new();
match reader.read_line(&mut line).await {
Ok(0) => Err(OneClawError::Channel("EOF".into())), // EOF — signal loop to stop
Ok(_) => {
let content = line.trim().to_string();
if content.is_empty() {
return Ok(None);
}
Ok(Some(IncomingMessage {
source: "cli".into(),
content,
timestamp: chrono::Utc::now(),
}))
}
Err(e) => Err(OneClawError::Channel(format!("stdin read: {}", e))),
}
}
async fn send(&self, message: &OutgoingMessage) -> Result<()> {
let mut stdout = tokio::io::stdout();
let data = format!("{}\n", message.content);
stdout.write_all(data.as_bytes()).await
.map_err(|e| OneClawError::Channel(format!("stdout write: {}", e)))?;
stdout.flush().await
.map_err(|e| OneClawError::Channel(format!("stdout flush: {}", e)))?;
Ok(())
}
}
Key Points
- Async trait: Use the `#[async_trait]` macro
- Error handling: Return `Result<T>` with `OneClawError::Channel`
- EOF handling: `receive()` returns `Err` on EOF to signal loop termination
- Empty input: Return `Ok(None)` for blank lines
- Timestamps: Always set `timestamp` on incoming messages
Example: WebSocket Channel
A channel that communicates over WebSocket:
use oneclaw_core::channel::traits::{Channel, IncomingMessage, OutgoingMessage};
use oneclaw_core::error::{OneClawError, Result};
use async_trait::async_trait;
use tokio::sync::Mutex;
use tokio_tungstenite::{connect_async, WebSocketStream, MaybeTlsStream};
use tokio_tungstenite::tungstenite::Message;
use futures_util::{SinkExt, StreamExt};
use tokio::net::TcpStream;
pub struct WebSocketChannel {
ws: Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>,
url: String,
}
impl WebSocketChannel {
pub async fn connect(url: &str) -> Result<Self> {
let (ws_stream, _) = connect_async(url).await
.map_err(|e| OneClawError::Channel(format!("WebSocket connect failed: {}", e)))?;
Ok(Self {
ws: Mutex::new(ws_stream),
url: url.to_string(),
})
}
}
#[async_trait]
impl Channel for WebSocketChannel {
fn name(&self) -> &str { "websocket" }
async fn receive(&self) -> Result<Option<IncomingMessage>> {
let mut ws = self.ws.lock().await;
match ws.next().await {
Some(Ok(Message::Text(text))) => {
Ok(Some(IncomingMessage {
source: self.url.clone(),
content: text,
timestamp: chrono::Utc::now(),
}))
}
Some(Ok(Message::Close(_))) => {
Err(OneClawError::Channel("WebSocket closed".into()))
}
Some(Ok(_)) => {
// Ignore binary, ping, pong messages
Ok(None)
}
Some(Err(e)) => {
Err(OneClawError::Channel(format!("WebSocket error: {}", e)))
}
None => {
Err(OneClawError::Channel("WebSocket stream ended".into()))
}
}
}
async fn send(&self, message: &OutgoingMessage) -> Result<()> {
let mut ws = self.ws.lock().await;
ws.send(Message::Text(message.content.clone())).await
.map_err(|e| OneClawError::Channel(format!("WebSocket send failed: {}", e)))?;
Ok(())
}
}
Example: MQTT Channel
A channel for MQTT pub/sub:
use oneclaw_core::channel::traits::{Channel, IncomingMessage, OutgoingMessage};
use oneclaw_core::error::{OneClawError, Result};
use async_trait::async_trait;
use rumqttc::{AsyncClient, Event, EventLoop, Incoming, MqttOptions, QoS};
use tokio::sync::Mutex;
/// MQTT pub/sub channel bound to one subscription topic.
pub struct MqttChannel {
/// Client handle used to subscribe and to publish.
client: AsyncClient,
/// Event loop that `receive` locks and polls for incoming events.
eventloop: Mutex<EventLoop>,
/// Topic that `connect` subscribes to.
topic: String,
}
impl MqttChannel {
    /// Build a channel for the broker at `broker:port`, identified as
    /// `client_id`, that will listen on `topic`.
    ///
    /// Nothing is subscribed yet; call [`MqttChannel::connect`] afterwards.
    /// The keep-alive interval is set to 30 seconds.
    pub fn new(broker: &str, port: u16, client_id: &str, topic: &str) -> Result<Self> {
        let keep_alive = std::time::Duration::from_secs(30);
        let mut mqtt_options = MqttOptions::new(client_id, broker, port);
        mqtt_options.set_keep_alive(keep_alive);

        // NOTE(review): 10 is presumably the client's request-queue capacity —
        // confirm against the rumqttc `AsyncClient::new` documentation.
        let (client, eventloop) = AsyncClient::new(mqtt_options, 10);

        let channel = Self {
            client,
            eventloop: Mutex::new(eventloop),
            topic: topic.to_string(),
        };
        Ok(channel)
    }

    /// Subscribe to the configured topic with QoS "at least once".
    ///
    /// # Errors
    ///
    /// Returns `OneClawError::Channel` when the subscribe request fails.
    pub async fn connect(&self) -> Result<()> {
        let subscribed = self.client.subscribe(&self.topic, QoS::AtLeastOnce).await;
        subscribed
            .map_err(|err| OneClawError::Channel(format!("MQTT subscribe failed: {}", err)))?;
        Ok(())
    }
}
#[async_trait]
impl Channel for MqttChannel {
fn name(&self) -> &str { "mqtt" }
async fn receive(&self) -> Result<Option<IncomingMessage>> {
let mut eventloop = self.eventloop.lock().await;
loop {
match eventloop.poll().await {
Ok(Event::Incoming(Incoming::Publish(publish))) => {
let content = String::from_utf8(publish.payload.to_vec())
.map_err(|e| OneClawError::Channel(format!("Invalid UTF-8: {}", e)))?;
return Ok(Some(IncomingMessage {
source: publish.topic,
content,
timestamp: chrono::Utc::now(),
}));
}
Ok(_) => {
// Ignore other events (connack, puback, etc.)
continue;
}
Err(e) => {
return Err(OneClawError::Channel(format!("MQTT poll error: {}", e)));
}
}
}
}
async fn send(&self, message: &OutgoingMessage) -> Result<()> {
self.client
.publish(
&message.destination,
QoS::AtLeastOnce,
false,
message.content.as_bytes(),
)
.await
.map_err(|e| OneClawError::Channel(format!("MQTT publish failed: {}", e)))?;
Ok(())
}
}
Example: HTTP Webhook Channel
Receive messages via HTTP POST and send via webhooks:
use oneclaw_core::channel::traits::{Channel, IncomingMessage, OutgoingMessage};
use oneclaw_core::error::{OneClawError, Result};
use async_trait::async_trait;
use tokio::sync::mpsc;
use warp::Filter;
use reqwest::Client;
/// Channel that receives messages from a local HTTP endpoint and sends them
/// by POSTing JSON to a webhook URL.
pub struct WebhookChannel {
/// Queue of message texts filled by the HTTP handler and drained by `receive`.
receiver: tokio::sync::Mutex<mpsc::Receiver<String>>,
/// HTTP client used by `send`.
client: Client,
/// URL that `send` posts to.
webhook_url: String,
}
impl WebhookChannel {
    /// Start the inbound HTTP listener and build the channel.
    ///
    /// A warp server is spawned on `0.0.0.0:listen_port`. It accepts
    /// `POST /webhook` with a JSON body and queues the body's `"text"` string
    /// field (up to 100 entries) for `receive`. Bodies without a `"text"`
    /// string are acknowledged and ignored.
    ///
    /// `webhook_url` is the address `send` will post outgoing messages to.
    pub async fn new(listen_port: u16, webhook_url: &str) -> Result<Self> {
        let (tx, rx) = mpsc::channel(100);

        // Spawn HTTP server
        let routes = warp::post()
            .and(warp::path("webhook"))
            .and(warp::body::json())
            .map(move |body: serde_json::Value| {
                // The handler runs on the async runtime, where `blocking_send`
                // panics. `try_send` never blocks; a full or closed queue is
                // reported to the caller instead of being dropped silently.
                let queued = match body.get("text").and_then(|v| v.as_str()) {
                    Some(text) => tx.try_send(text.to_string()).is_ok(),
                    None => true,
                };
                let (code, status) = if queued {
                    (warp::http::StatusCode::OK, "ok")
                } else {
                    (warp::http::StatusCode::SERVICE_UNAVAILABLE, "unavailable")
                };
                warp::reply::with_status(
                    warp::reply::json(&serde_json::json!({ "status": status })),
                    code,
                )
            });

        tokio::spawn(async move {
            warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await;
        });

        Ok(Self {
            receiver: tokio::sync::Mutex::new(rx),
            client: Client::new(),
            webhook_url: webhook_url.to_string(),
        })
    }
}
#[async_trait]
impl Channel for WebhookChannel {
    fn name(&self) -> &str {
        "webhook"
    }

    /// Wait for the next text queued by the HTTP handler.
    ///
    /// # Errors
    ///
    /// Returns `OneClawError::Channel` when the queue has been closed, i.e.
    /// the sending side held by the HTTP handler no longer exists.
    async fn receive(&self) -> Result<Option<IncomingMessage>> {
        let mut rx = self.receiver.lock().await;
        match rx.recv().await {
            Some(content) => Ok(Some(IncomingMessage {
                source: "webhook".into(),
                content,
                timestamp: chrono::Utc::now(),
            })),
            None => Err(OneClawError::Channel("Webhook receiver closed".into())),
        }
    }

    /// POST the message as `{"destination": ..., "content": ...}` to the
    /// configured webhook URL.
    ///
    /// # Errors
    ///
    /// Returns `OneClawError::Channel` when the request cannot be sent or the
    /// server answers with a 4xx/5xx status.
    async fn send(&self, message: &OutgoingMessage) -> Result<()> {
        let body = serde_json::json!({
            "destination": message.destination,
            "content": message.content,
        });
        self.client
            .post(&self.webhook_url)
            .json(&body)
            .send()
            .await
            // A completed request is not a delivered message: reqwest only
            // fails on transport errors, so turn error statuses into errors.
            .and_then(|response| response.error_for_status())
            .map_err(|e| OneClawError::Channel(format!("Webhook send failed: {}", e)))?;
        Ok(())
    }
}
Message Handling Patterns
Buffered Receive
use tokio::sync::mpsc;
/// Channel whose incoming messages are produced elsewhere and handed over
/// through an mpsc queue.
pub struct BufferedChannel {
/// Receiving end of the queue; locked by `receive` while it waits.
buffer: Mutex<mpsc::Receiver<IncomingMessage>>,
}
#[async_trait]
impl Channel for BufferedChannel {
fn name(&self) -> &str { "buffered" }
async fn receive(&self) -> Result<Option<IncomingMessage>> {
let mut buffer = self.buffer.lock().await;
Ok(buffer.recv().await)
}
async fn send(&self, message: &OutgoingMessage) -> Result<()> {
// Send implementation...
Ok(())
}
}
Retry Logic
use tokio::time::{sleep, Duration};
/// Send `message`, retrying failed attempts with exponential backoff.
///
/// After the first failure the send is retried up to three more times,
/// sleeping 2, 4 and then 8 seconds before the respective retry. The error of
/// the final attempt is returned when all of them fail.
async fn send_with_retry(&self, message: &OutgoingMessage) -> Result<()> {
    const MAX_RETRIES: u32 = 3;
    let mut retries_used: u32 = 0;

    loop {
        let failure = match self.send_inner(message).await {
            Ok(()) => return Ok(()),
            Err(e) => e,
        };

        // Out of retries: surface the most recent error.
        if retries_used >= MAX_RETRIES {
            return Err(failure);
        }

        retries_used += 1;
        tracing::warn!(
            attempt = retries_used,
            error = %failure,
            "Send failed, retrying"
        );
        let backoff_secs = 2_u64.pow(retries_used);
        sleep(Duration::from_secs(backoff_secs)).await;
    }
}
Batching
use tokio::time::interval;
/// Channel that queues outgoing messages and delivers them in batches.
pub struct BatchedChannel {
/// Messages accepted by `send` and not yet delivered by `flush_batch`.
pending: Mutex<Vec<OutgoingMessage>>,
}
impl BatchedChannel {
async fn flush_batch(&self) -> Result<()> {
let mut pending = self.pending.lock().await;
if pending.is_empty() {
return Ok(());
}
// Send all pending messages
for message in pending.drain(..) {
self.send_inner(&message).await?;
}
Ok(())
}
pub async fn run_flush_loop(&self) {
let mut ticker = interval(Duration::from_secs(5));
loop {
ticker.tick().await;
let _ = self.flush_batch().await;
}
}
}
#[async_trait]
impl Channel for BatchedChannel {
    fn name(&self) -> &str {
        "batched"
    }

    async fn receive(&self) -> Result<Option<IncomingMessage>> {
        // Receive implementation...
        Ok(None)
    }

    /// Queue a copy of `message`; this method itself delivers nothing, it
    /// only appends to the pending list.
    async fn send(&self, message: &OutgoingMessage) -> Result<()> {
        let queued = message.clone();
        self.pending.lock().await.push(queued);
        Ok(())
    }
}
Channel Registration
Register channels with the channel manager:
use oneclaw_core::channel::manager::ChannelManager;
use oneclaw_channels::cli::CliChannel;
/// Register a CLI and a WebSocket channel, then echo every received message.
///
/// Runs until a receive or send returns an error, which is propagated out of
/// `main`.
#[tokio::main]
async fn main() -> Result<()> {
    let mut manager = ChannelManager::new();

    // Register channels
    let cli = CliChannel::new();
    manager.register(Box::new(cli));
    let websocket = WebSocketChannel::connect("ws://localhost:8080").await?;
    manager.register(Box::new(websocket));

    // Main loop
    loop {
        // Receive from any channel; nothing to do when no message arrived.
        let message = match manager.receive_any().await? {
            Some(message) => message,
            None => continue,
        };
        println!("[{}] {}", message.source, message.content);

        // Process message and send response. The reply always goes out
        // through the channel registered as "cli".
        let response = OutgoingMessage {
            destination: message.source.clone(),
            content: format!("Echo: {}", message.content),
        };
        manager.send("cli", &response).await?;
    }
}
Testing Your Channel
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{timeout, Duration};

    /// The channel reports the expected name.
    #[tokio::test]
    async fn test_channel_name() {
        let name_owner = MyCustomChannel::new();
        assert_eq!(name_owner.name(), "my_channel");
    }

    /// A sent message can be received back within five seconds.
    #[tokio::test]
    async fn test_send_receive() {
        let channel = MyCustomChannel::new();

        // Send a message
        let outgoing = OutgoingMessage {
            destination: "test".into(),
            content: "hello".into(),
        };
        channel.send(&outgoing).await.unwrap();

        // Receive it back
        let waited = timeout(Duration::from_secs(5), channel.receive()).await;
        let echoed = waited.expect("Timeout").unwrap().expect("No message");
        assert_eq!(echoed.content, "hello");
    }

    /// With nothing queued, `receive` is still pending after 100 ms.
    #[tokio::test]
    async fn test_receive_timeout() {
        let channel = MyCustomChannel::new();

        // Should timeout if no messages
        let short_wait = Duration::from_millis(100);
        let outcome = timeout(short_wait, channel.receive()).await;
        assert!(outcome.is_err()); // Timeout
    }

    /// A misconfigured channel fails to send.
    #[tokio::test]
    async fn test_error_handling() {
        let channel = MyCustomChannel::with_bad_config();
        let outgoing = OutgoingMessage {
            destination: "test".into(),
            content: "test".into(),
        };
        let outcome = channel.send(&outgoing).await;
        assert!(outcome.is_err());
    }
}
Best Practices
- Async all the way: Use tokio’s async I/O primitives
- Handle EOF gracefully: Return `Err` on connection close to signal loop termination
- Use Mutex for shared state: Wrap mutable state in `tokio::sync::Mutex`
- Set timeouts: Network operations should have reasonable timeouts
- Implement retry logic: Handle transient network failures
- Buffer incoming messages: Use mpsc channels for decoupling
- Add reconnection logic: Auto-reconnect on connection loss
- Log errors: Use `tracing` for debugging
- Test with real services: Use Docker for integration tests
- Document protocol details: Explain message formats and error codes
Error Handling
use oneclaw_core::error::OneClawError;
// Every channel failure in this guide is reported through the same variant,
// `OneClawError::Channel`, carrying a human-readable description.

// Connection errors
Err(OneClawError::Channel("Connection refused".into()))
// Protocol errors
Err(OneClawError::Channel("Invalid message format".into()))
// Timeout errors
Err(OneClawError::Channel("Receive timeout after 30s".into()))
// Authentication errors
Err(OneClawError::Channel("Authentication failed".into()))
// EOF / connection closed (returned from `receive` to stop the caller's loop)
Err(OneClawError::Channel("Connection closed by remote".into()))
See Also
- Architecture - Channel layer design
- API Reference - Full trait documentation
- CLI Channel - Built-in CLI channel reference