Creating Custom Channels

Channels are OneClaw’s communication interfaces for receiving and sending messages. This guide shows how to build custom channels with async message handling and proper integration.

Channel Trait

All channels implement the async Channel trait:
#[async_trait]
pub trait Channel: Send + Sync {
    /// Return the name of this channel.
    fn name(&self) -> &str;
    /// Receive the next incoming message, if any (async).
    async fn receive(&self) -> Result<Option<IncomingMessage>>;
    /// Send an outgoing message through this channel (async).
    async fn send(&self, message: &OutgoingMessage) -> Result<()>;
}

Message Types

IncomingMessage

pub struct IncomingMessage {
    /// The source identifier of the message.
    pub source: String,
    /// The text content of the message.
    pub content: String,
    /// The timestamp when the message was received.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}

OutgoingMessage

pub struct OutgoingMessage {
    /// The destination identifier for the message.
    pub destination: String,
    /// The text content of the message.
    pub content: String,
}
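
The two structs pair up naturally: a reply usually goes back to wherever the incoming message came from. The sketch below shows that mapping with local stand-in structs so it compiles without external crates. `reply_to` is a hypothetical helper, not part of oneclaw_core, and `SystemTime` stands in for `chrono::DateTime<chrono::Utc>`.

```rust
use std::time::SystemTime;

/// Stand-in for oneclaw_core's IncomingMessage (assumption: same field names,
/// with SystemTime replacing the chrono timestamp).
#[derive(Debug)]
pub struct IncomingMessage {
    pub source: String,
    pub content: String,
    pub timestamp: SystemTime,
}

/// Stand-in for oneclaw_core's OutgoingMessage.
#[derive(Debug, PartialEq)]
pub struct OutgoingMessage {
    pub destination: String,
    pub content: String,
}

/// Hypothetical helper: build a reply addressed to the sender of `msg`.
pub fn reply_to(msg: &IncomingMessage, content: impl Into<String>) -> OutgoingMessage {
    OutgoingMessage {
        destination: msg.source.clone(),
        content: content.into(),
    }
}
```

Whether `source` is a usable destination depends on the channel: the CLI example in this guide sets it to "cli", while the WebSocket example sets it to the connection URL.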

Example: CLI Channel

Let’s look at the built-in CLI channel:
//! CLI Channel — Terminal-based I/O
//! Always included. The simplest channel for testing and direct interaction.

use oneclaw_core::channel::traits::{Channel, IncomingMessage, OutgoingMessage};
use oneclaw_core::error::{OneClawError, Result};
use async_trait::async_trait;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

/// CLI channel that reads from stdin and writes to stdout.
pub struct CliChannel {
    prompt: String,
}

impl CliChannel {
    /// Create a new CLI channel with default prompt "oneclaw> ".
    pub fn new() -> Self {
        Self { prompt: "oneclaw> ".to_string() }
    }

    /// Create a CLI channel with a custom prompt.
    pub fn with_prompt(prompt: impl Into<String>) -> Self {
        Self { prompt: prompt.into() }
    }
}

impl Default for CliChannel {
    fn default() -> Self { Self::new() }
}

#[async_trait]
impl Channel for CliChannel {
    fn name(&self) -> &str { "cli" }

    async fn receive(&self) -> Result<Option<IncomingMessage>> {
        let mut stdout = tokio::io::stdout();
        stdout.write_all(self.prompt.as_bytes()).await
            .map_err(|e| OneClawError::Channel(format!("stdout write: {}", e)))?;
        stdout.flush().await
            .map_err(|e| OneClawError::Channel(format!("stdout flush: {}", e)))?;

        let stdin = tokio::io::stdin();
        let mut reader = BufReader::new(stdin);
        let mut line = String::new();
        match reader.read_line(&mut line).await {
            Ok(0) => Err(OneClawError::Channel("EOF".into())), // EOF — signal loop to stop
            Ok(_) => {
                let content = line.trim().to_string();
                if content.is_empty() {
                    return Ok(None);
                }
                Ok(Some(IncomingMessage {
                    source: "cli".into(),
                    content,
                    timestamp: chrono::Utc::now(),
                }))
            }
            Err(e) => Err(OneClawError::Channel(format!("stdin read: {}", e))),
        }
    }

    async fn send(&self, message: &OutgoingMessage) -> Result<()> {
        let mut stdout = tokio::io::stdout();
        let data = format!("{}\n", message.content);
        stdout.write_all(data.as_bytes()).await
            .map_err(|e| OneClawError::Channel(format!("stdout write: {}", e)))?;
        stdout.flush().await
            .map_err(|e| OneClawError::Channel(format!("stdout flush: {}", e)))?;
        Ok(())
    }
}

Key Points

  1. Async trait: Annotate the impl block with the #[async_trait] macro
  2. Error handling: Return Result<T> and wrap I/O failures in OneClawError::Channel
  3. EOF handling: receive() returns Err on EOF so the caller's loop terminates
  4. Empty input: Return Ok(None) for blank lines so the caller skips them without stopping
  5. Timestamps: Set timestamp on every incoming message at the moment it is read
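
The EOF and empty-input rules are easy to mix up, because both start from a line with no useful text. A minimal sketch of the distinction, using a hypothetical `classify_line` helper and `LineOutcome` enum that are not part of OneClaw: `bytes_read` is the count returned by `read_line`, which is zero only at end of input.

```rust
/// Hypothetical outcome of reading one line from an input stream.
#[derive(Debug, PartialEq)]
pub enum LineOutcome {
    /// read_line returned 0 bytes: the stream is finished (maps to Err).
    Eof,
    /// Only whitespace was read: nothing to deliver (maps to Ok(None)).
    Blank,
    /// A trimmed, non-empty line (maps to Ok(Some(message))).
    Text(String),
}

/// Classify the result of a read_line call.
pub fn classify_line(bytes_read: usize, line: &str) -> LineOutcome {
    if bytes_read == 0 {
        return LineOutcome::Eof;
    }
    let content = line.trim();
    if content.is_empty() {
        LineOutcome::Blank
    } else {
        LineOutcome::Text(content.to_string())
    }
}
```

A blank line still contains its newline, so `bytes_read` is at least 1 and the input is treated as skippable rather than as the end of the stream.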

Example: WebSocket Channel

A channel that communicates over WebSocket:
use oneclaw_core::channel::traits::{Channel, IncomingMessage, OutgoingMessage};
use oneclaw_core::error::{OneClawError, Result};
use async_trait::async_trait;
use tokio::sync::Mutex;
use tokio_tungstenite::{connect_async, WebSocketStream, MaybeTlsStream};
use tokio_tungstenite::tungstenite::Message;
use futures_util::{SinkExt, StreamExt};
use tokio::net::TcpStream;

pub struct WebSocketChannel {
    ws: Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>,
    url: String,
}

impl WebSocketChannel {
    pub async fn connect(url: &str) -> Result<Self> {
        let (ws_stream, _) = connect_async(url).await
            .map_err(|e| OneClawError::Channel(format!("WebSocket connect failed: {}", e)))?;

        Ok(Self {
            ws: Mutex::new(ws_stream),
            url: url.to_string(),
        })
    }
}

#[async_trait]
impl Channel for WebSocketChannel {
    fn name(&self) -> &str { "websocket" }

    async fn receive(&self) -> Result<Option<IncomingMessage>> {
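        // The lock is held while waiting for the next frame, so a concurrent
        // send() waits too. For full-duplex use, split the stream with
        // StreamExt::split() and guard each half separately.
        let mut ws = self.ws.lock().await;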

        match ws.next().await {
            Some(Ok(Message::Text(text))) => {
                Ok(Some(IncomingMessage {
                    source: self.url.clone(),
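                    // to_string() works whether Message::Text carries a String
                    // or the Utf8Bytes type used by newer tungstenite releases
                    content: text.to_string(),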
                    timestamp: chrono::Utc::now(),
                }))
            }
            Some(Ok(Message::Close(_))) => {
                Err(OneClawError::Channel("WebSocket closed".into()))
            }
            Some(Ok(_)) => {
                // Ignore binary, ping, pong messages
                Ok(None)
            }
            Some(Err(e)) => {
                Err(OneClawError::Channel(format!("WebSocket error: {}", e)))
            }
            None => {
                Err(OneClawError::Channel("WebSocket stream ended".into()))
            }
        }
    }

    async fn send(&self, message: &OutgoingMessage) -> Result<()> {
        let mut ws = self.ws.lock().await;
        // into() converts to whatever payload type Message::Text expects
        ws.send(Message::Text(message.content.clone().into())).await
            .map_err(|e| OneClawError::Channel(format!("WebSocket send failed: {}", e)))?;
        Ok(())
    }
}

Example: MQTT Channel

A channel for MQTT pub/sub:
use oneclaw_core::channel::traits::{Channel, IncomingMessage, OutgoingMessage};
use oneclaw_core::error::{OneClawError, Result};
use async_trait::async_trait;
use rumqttc::{AsyncClient, Event, EventLoop, Incoming, MqttOptions, QoS};
use tokio::sync::Mutex;

pub struct MqttChannel {
    client: AsyncClient,
    eventloop: Mutex<EventLoop>,
    topic: String,
}

impl MqttChannel {
    pub fn new(broker: &str, port: u16, client_id: &str, topic: &str) -> Result<Self> {
        let mut options = MqttOptions::new(client_id, broker, port);
        options.set_keep_alive(std::time::Duration::from_secs(30));

        let (client, eventloop) = AsyncClient::new(options, 10);

        Ok(Self {
            client,
            eventloop: Mutex::new(eventloop),
            topic: topic.to_string(),
        })
    }

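    /// Queue the subscription request. rumqttc only talks to the broker while
    /// the event loop is polled, so nothing is sent until receive() runs.
    pub async fn connect(&self) -> Result<()> {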
        self.client.subscribe(&self.topic, QoS::AtLeastOnce).await
            .map_err(|e| OneClawError::Channel(format!("MQTT subscribe failed: {}", e)))?;
        Ok(())
    }
}

#[async_trait]
impl Channel for MqttChannel {
    fn name(&self) -> &str { "mqtt" }

    async fn receive(&self) -> Result<Option<IncomingMessage>> {
        let mut eventloop = self.eventloop.lock().await;

        loop {
            match eventloop.poll().await {
                Ok(Event::Incoming(Incoming::Publish(publish))) => {
                    let content = String::from_utf8(publish.payload.to_vec())
                        .map_err(|e| OneClawError::Channel(format!("Invalid UTF-8: {}", e)))?;

                    return Ok(Some(IncomingMessage {
                        source: publish.topic,
                        content,
                        timestamp: chrono::Utc::now(),
                    }));
                }
                Ok(_) => {
                    // Ignore other events (connack, puback, etc.)
                    continue;
                }
                Err(e) => {
                    return Err(OneClawError::Channel(format!("MQTT poll error: {}", e)));
                }
            }
        }
    }

    async fn send(&self, message: &OutgoingMessage) -> Result<()> {
        self.client
            .publish(
                &message.destination,
                QoS::AtLeastOnce,
                false,
                message.content.as_bytes(),
            )
            .await
            .map_err(|e| OneClawError::Channel(format!("MQTT publish failed: {}", e)))?;
        Ok(())
    }
}
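
MQTT payloads are raw bytes, so the channel has to decide what to do with data that is not valid UTF-8. The example above rejects it. The sketch below puts that strict policy next to a lossy alternative. Both helper names are hypothetical, and `Result<String, String>` stands in for OneClaw's `Result` type.

```rust
/// Strict policy: fail on any invalid UTF-8, as the MQTT example does.
pub fn decode_strict(payload: &[u8]) -> Result<String, String> {
    String::from_utf8(payload.to_vec()).map_err(|e| format!("Invalid UTF-8: {}", e))
}

/// Lossy policy: replace invalid sequences with U+FFFD and keep the rest.
pub fn decode_lossy(payload: &[u8]) -> String {
    String::from_utf8_lossy(payload).into_owned()
}
```

Strict decoding turns one malformed publish into an `Err`, which by this guide's convention stops the receive loop. Lossy decoding keeps the loop running at the cost of delivering altered text.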

Example: HTTP Webhook Channel

Receive messages via HTTP POST and send via webhooks:
use oneclaw_core::channel::traits::{Channel, IncomingMessage, OutgoingMessage};
use oneclaw_core::error::{OneClawError, Result};
use async_trait::async_trait;
use tokio::sync::mpsc;
use warp::Filter;
use reqwest::Client;

pub struct WebhookChannel {
    receiver: tokio::sync::Mutex<mpsc::Receiver<String>>,
    client: Client,
    webhook_url: String,
}

impl WebhookChannel {
    pub async fn new(listen_port: u16, webhook_url: &str) -> Result<Self> {
        let (tx, rx) = mpsc::channel(100);

        // Spawn HTTP server
        let routes = warp::post()
            .and(warp::path("webhook"))
            .and(warp::body::json())
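            .map(move |body: serde_json::Value| {
                if let Some(text) = body.get("text").and_then(|v| v.as_str()) {
                    // try_send never blocks. blocking_send would panic here,
                    // because the handler runs inside the Tokio runtime.
                    // If the queue is full, the message is dropped.
                    let _ = tx.try_send(text.to_string());
                }
                warp::reply::json(&serde_json::json!({"status": "ok"}))
            });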

        tokio::spawn(async move {
            warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await;
        });

        Ok(Self {
            receiver: tokio::sync::Mutex::new(rx),
            client: Client::new(),
            webhook_url: webhook_url.to_string(),
        })
    }
}

#[async_trait]
impl Channel for WebhookChannel {
    fn name(&self) -> &str { "webhook" }

    async fn receive(&self) -> Result<Option<IncomingMessage>> {
        let mut rx = self.receiver.lock().await;
        match rx.recv().await {
            Some(content) => Ok(Some(IncomingMessage {
                source: "webhook".into(),
                content,
                timestamp: chrono::Utc::now(),
            })),
            None => Err(OneClawError::Channel("Webhook receiver closed".into())),
        }
    }

    async fn send(&self, message: &OutgoingMessage) -> Result<()> {
        let body = serde_json::json!({
            "destination": message.destination,
            "content": message.content,
        });

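        self.client
            .post(&self.webhook_url)
            .json(&body)
            .send()
            .await
            // send() only fails on transport errors; treat 4xx/5xx responses as failures too
            .and_then(|response| response.error_for_status())
            .map_err(|e| OneClawError::Channel(format!("Webhook send failed: {}", e)))?;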

        Ok(())
    }
}
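
The HTTP handler and `receive()` are connected by a bounded queue, so the handler needs a policy for when the queue is full. The sketch below shows non-blocking enqueueing with the standard library's `sync_channel`, used here as a stand-in so the code needs no external crates. Tokio's bounded `mpsc::Sender` offers a `try_send` with similar semantics. The `enqueue` helper is hypothetical.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Try to enqueue without blocking. Returns true if the message was accepted.
pub fn enqueue(tx: &SyncSender<String>, text: &str) -> bool {
    match tx.try_send(text.to_string()) {
        Ok(()) => true,
        // Queue is full: the consumer is behind, so the message is dropped.
        Err(TrySendError::Full(_)) => false,
        // Receiver was dropped: the channel is shutting down.
        Err(TrySendError::Disconnected(_)) => false,
    }
}
```

A handler could use the boolean to answer with an error status instead of "ok", so the sender knows the message was not accepted.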

Message Handling Patterns

Buffered Receive

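use oneclaw_core::channel::traits::{Channel, IncomingMessage, OutgoingMessage};
use oneclaw_core::error::{OneClawError, Result};
use async_trait::async_trait;
use tokio::sync::{mpsc, Mutex};

pub struct BufferedChannel {
    buffer: Mutex<mpsc::Receiver<IncomingMessage>>,
}

#[async_trait]
impl Channel for BufferedChannel {
    fn name(&self) -> &str { "buffered" }

    async fn receive(&self) -> Result<Option<IncomingMessage>> {
        let mut buffer = self.buffer.lock().await;
        // recv() yields None only after every sender is dropped. Report that
        // as an error so the caller's loop stops instead of spinning on Ok(None).
        match buffer.recv().await {
            Some(message) => Ok(Some(message)),
            None => Err(OneClawError::Channel("Buffer closed".into())),
        }
    }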

    async fn send(&self, message: &OutgoingMessage) -> Result<()> {
        // Send implementation...
        Ok(())
    }
}

Retry Logic

use tokio::time::{sleep, Duration};

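// A method for your channel's impl block; send_inner is assumed to be its
// single-attempt send. attempt is incremented before sleeping, so the
// delays are 2s, 4s, and 8s.
async fn send_with_retry(&self, message: &OutgoingMessage) -> Result<()> {
    let max_retries: u32 = 3;
    let mut attempt: u32 = 0;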

    loop {
        match self.send_inner(message).await {
            Ok(()) => return Ok(()),
            Err(e) if attempt < max_retries => {
                attempt += 1;
                tracing::warn!(
                    attempt = attempt,
                    error = %e,
                    "Send failed, retrying"
                );
                sleep(Duration::from_secs(2_u64.pow(attempt))).await;
            }
            Err(e) => return Err(e),
        }
    }
}
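
The delay in the loop above doubles on every attempt, which grows quickly if `max_retries` is raised. A minimal sketch of a capped delay calculation follows. The `backoff_delay` helper and its `cap` parameter are assumptions added for illustration, not part of OneClaw.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (1-based): 2^attempt seconds,
/// never longer than `cap`.
pub fn backoff_delay(attempt: u32, cap: Duration) -> Duration {
    // checked_pow avoids the overflow panic that 2_u64.pow(attempt)
    // would hit in debug builds once attempt reaches 64.
    let secs = 2u64.checked_pow(attempt).unwrap_or(u64::MAX);
    Duration::from_secs(secs).min(cap)
}
```

With a 30-second cap the sequence is 2s, 4s, 8s, 16s, and then 30s for every later attempt. Adding random jitter is a common refinement when many clients retry at once.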

Batching

use tokio::sync::Mutex;
use tokio::time::{interval, Duration};

pub struct BatchedChannel {
    pending: Mutex<Vec<OutgoingMessage>>,
}

impl BatchedChannel {
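    async fn flush_batch(&self) -> Result<()> {
        // Take the whole batch and release the lock before any network I/O,
        // so send() is not blocked while a flush is in progress.
        let batch = std::mem::take(&mut *self.pending.lock().await);

        // If a send fails, the rest of the batch is dropped;
        // re-queue it here if delivery matters.
        for message in &batch {
            self.send_inner(message).await?;
        }

        Ok(())
    }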

    pub async fn run_flush_loop(&self) {
        let mut ticker = interval(Duration::from_secs(5));
        loop {
            ticker.tick().await;
            if let Err(e) = self.flush_batch().await {
                tracing::warn!(error = %e, "Batch flush failed");
            }
        }
    }
}

#[async_trait]
impl Channel for BatchedChannel {
    fn name(&self) -> &str { "batched" }

    async fn receive(&self) -> Result<Option<IncomingMessage>> {
        // Receive implementation...
        Ok(None)
    }

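    async fn send(&self, message: &OutgoingMessage) -> Result<()> {
        let mut pending = self.pending.lock().await;
        // Copy the fields explicitly: the struct as listed above does not derive Clone.
        pending.push(OutgoingMessage {
            destination: message.destination.clone(),
            content: message.content.clone(),
        });
        Ok(())
    }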
}
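
A batching channel has to hand the pending messages to the flusher without holding the lock while they are sent. One way to do that is to swap the buffer for an empty one, sketched below with the standard library's `Mutex` and plain strings as stand-ins for `tokio::sync::Mutex` and `OutgoingMessage`. The `Outbox` type is hypothetical.

```rust
use std::sync::Mutex;

/// Hypothetical pending-message buffer.
pub struct Outbox {
    pending: Mutex<Vec<String>>,
}

impl Outbox {
    pub fn new() -> Self {
        Self { pending: Mutex::new(Vec::new()) }
    }

    /// Queue one message for the next flush.
    pub fn push(&self, message: &str) {
        self.pending.lock().unwrap().push(message.to_string());
    }

    /// Take every pending message and leave an empty buffer behind.
    /// The lock is released when this function returns, before any sending starts.
    pub fn take_batch(&self) -> Vec<String> {
        let mut guard = self.pending.lock().unwrap();
        std::mem::take(&mut *guard)
    }

    /// Number of messages still waiting.
    pub fn len(&self) -> usize {
        self.pending.lock().unwrap().len()
    }
}
```

Because the caller owns the returned `Vec`, new messages can be queued while the batch is being sent, and a failed batch can be pushed back if delivery matters.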

Channel Registration

Register channels with the channel manager:
use oneclaw_core::channel::manager::ChannelManager;
use oneclaw_core::channel::traits::OutgoingMessage;
use oneclaw_core::error::Result;
use oneclaw_channels::cli::CliChannel;

#[tokio::main]
async fn main() -> Result<()> {
    let mut manager = ChannelManager::new();

    // Register channels
    manager.register(Box::new(CliChannel::new()));
    manager.register(Box::new(
        WebSocketChannel::connect("ws://localhost:8080").await?
    ));

    // Main loop
    loop {
        // Receive from any channel
        if let Some(message) = manager.receive_any().await? {
            println!("[{}] {}", message.source, message.content);

            // Build an echo response and send it on the "cli" channel,
            // whichever channel the message arrived on
            let response = OutgoingMessage {
                destination: message.source.clone(),
                content: format!("Echo: {}", message.content),
            };
            manager.send("cli", &response).await?;
        }
    }
}

Testing Your Channel

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{timeout, Duration};

    #[tokio::test]
    async fn test_channel_name() {
        let channel = MyCustomChannel::new();
        assert_eq!(channel.name(), "my_channel");
    }

    #[tokio::test]
    async fn test_send_receive() {
        let channel = MyCustomChannel::new();

        // Send a message
        let out = OutgoingMessage {
            destination: "test".into(),
            content: "hello".into(),
        };
        channel.send(&out).await.unwrap();

        // Receive it back
        let received = timeout(
            Duration::from_secs(5),
            channel.receive()
        ).await
            .expect("Timeout")
            .unwrap()
            .expect("No message");

        assert_eq!(received.content, "hello");
    }

    #[tokio::test]
    async fn test_receive_timeout() {
        let channel = MyCustomChannel::new();

        // Should timeout if no messages
        let result = timeout(
            Duration::from_millis(100),
            channel.receive()
        ).await;

        assert!(result.is_err()); // Timeout
    }

    #[tokio::test]
    async fn test_error_handling() {
        let channel = MyCustomChannel::with_bad_config();
        let result = channel.send(&OutgoingMessage {
            destination: "test".into(),
            content: "test".into(),
        }).await;

        assert!(result.is_err());
    }
}

Best Practices

  1. Async all the way: Use tokio’s async I/O primitives
  2. Handle EOF gracefully: Return Err on connection close to signal loop termination
  3. Use Mutex for shared state: Wrap mutable state in tokio::sync::Mutex
  4. Set timeouts: Network operations should have reasonable timeouts
  5. Implement retry logic: Handle transient network failures
  6. Buffer incoming messages: Use mpsc channels for decoupling
  7. Add reconnection logic: Auto-reconnect on connection loss
  8. Log errors: Use tracing for debugging
  9. Test with real services: Use Docker for integration tests
  10. Document protocol details: Explain message formats and error codes

Error Handling

use oneclaw_core::error::OneClawError;

// Connection errors
Err(OneClawError::Channel("Connection refused".into()))

// Protocol errors
Err(OneClawError::Channel("Invalid message format".into()))

// Timeout errors
Err(OneClawError::Channel("Receive timeout after 30s".into()))

// Authentication errors
Err(OneClawError::Channel("Authentication failed".into()))

// EOF / connection closed
Err(OneClawError::Channel("Connection closed by remote".into()))
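
Since `OneClawError::Channel` carries only a string, a channel that needs to tell failure kinds apart (for example, to decide whether to retry) can keep its own typed error and convert it to a string at the boundary. The sketch below shows one way to do that. `ChannelFailure` and `is_transient` are hypothetical names, and the transient/permanent split is one plausible classification rather than OneClaw's.

```rust
use std::fmt;

/// Hypothetical channel-internal error, one variant per category above.
#[derive(Debug, PartialEq)]
pub enum ChannelFailure {
    Connection(String),
    Protocol(String),
    Timeout { seconds: u64 },
    Auth,
    Closed,
}

impl fmt::Display for ChannelFailure {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ChannelFailure::Connection(detail) => write!(f, "Connection failed: {}", detail),
            ChannelFailure::Protocol(detail) => write!(f, "Invalid message format: {}", detail),
            ChannelFailure::Timeout { seconds } => write!(f, "Receive timeout after {}s", seconds),
            ChannelFailure::Auth => write!(f, "Authentication failed"),
            ChannelFailure::Closed => write!(f, "Connection closed by remote"),
        }
    }
}

impl ChannelFailure {
    /// Whether retrying the same operation might succeed.
    pub fn is_transient(&self) -> bool {
        matches!(self, ChannelFailure::Connection(_) | ChannelFailure::Timeout { .. })
    }
}
```

At the trait boundary the typed error collapses into the string form, for example `OneClawError::Channel(failure.to_string())`.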
