Overview

The ChannelManager multiplexes multiple Channel implementations into a single unified message stream. It uses sequential polling: channels are polled in registration order, and the first channel with a message wins.
Location: crates/oneclaw-core/src/channel/manager.rs

ChannelManager Struct

pub struct ChannelManager {
    channels: Vec<Box<dyn Channel>>,
}
Design:
  • Sequential polling across registered channels
  • Each channel is polled in order until a message is found
  • If no channel has a message ready, receive_any() returns Ok(None) (caller should sleep briefly)
  • Supports sending to channels by index or name

Methods

new()

Creates a new empty channel manager.
pub fn new() -> Self
Returns: Empty ChannelManager with no registered channels
Example:
let mut manager = ChannelManager::new();

add_channel()

Registers a channel with the manager.
pub fn add_channel(&mut self, channel: Box<dyn Channel>)
Parameters:
  • channel: Box<dyn Channel> - Boxed channel implementation to register
Behavior:
  • Channels are polled in registration order
  • Logs an info message when a channel is registered
  • Channel name is obtained via channel.name()
Example:
use oneclaw_channels::CliChannel;
use oneclaw_core::channel::ChannelManager;

let mut manager = ChannelManager::new();
manager.add_channel(Box::new(CliChannel::new()));

receive_any()

Polls all channels sequentially and returns the first message found.
pub async fn receive_any(&self) -> Result<Option<(usize, IncomingMessage)>>
Returns:
  • Ok(Some((channel_idx, message))) - Message received from channel at index channel_idx
  • Ok(None) - No channel has a message ready (caller should sleep and retry)
  • Err(OneClawError) - Fatal error (rare; channel errors are logged and skipped)
Polling Strategy:
  1. Iterate through channels in registration order (index 0, 1, 2, …)
  2. Call channel.receive().await on each channel
  3. Return first Ok(Some(msg)) found along with channel index
  4. Skip channels that return Ok(None) or Err()
  5. If all channels return None or error, return Ok(None)
Error Handling:
  • Individual channel errors are logged (debug level) and skipped
  • Allows remaining channels to continue functioning
  • Only fatal errors propagate upward
Example:
use std::time::Duration;

loop {
    match manager.receive_any().await? {
        Some((idx, msg)) => {
            println!("Message from channel {}: {}", idx, msg.content);
            // Process message
        }
        None => {
            // No messages ready, sleep briefly
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
}
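
The polling strategy and error handling described above can be sketched in a simplified, synchronous form. This is not the code in manager.rs: the Channel trait here is a cut-down stand-in (the real one is async), QueueChannel and Manager are hypothetical names invented for illustration, and receive_any() takes &mut self only because this toy receive() mutates an in-memory queue.

```rust
use std::collections::VecDeque;

/// Simplified, synchronous stand-in for the real async `Channel` trait (assumption).
trait Channel {
    fn name(&self) -> &str;
    /// Ok(Some(msg)): message ready, Ok(None): nothing ready, Err: channel failure.
    fn receive(&mut self) -> Result<Option<String>, String>;
}

/// Hypothetical in-memory channel that replays a scripted list of results.
struct QueueChannel {
    name: String,
    queue: VecDeque<Result<String, String>>,
}

impl QueueChannel {
    fn new(name: &str, items: Vec<Result<String, String>>) -> Self {
        Self { name: name.to_string(), queue: items.into() }
    }
}

impl Channel for QueueChannel {
    fn name(&self) -> &str {
        &self.name
    }

    fn receive(&mut self) -> Result<Option<String>, String> {
        match self.queue.pop_front() {
            Some(Ok(msg)) => Ok(Some(msg)),
            Some(Err(e)) => Err(e),
            None => Ok(None),
        }
    }
}

/// Hypothetical manager holding channels in registration order.
struct Manager {
    channels: Vec<Box<dyn Channel>>,
}

impl Manager {
    /// Poll channels in registration order and return the first message
    /// together with the index of the channel that produced it.
    fn receive_any(&mut self) -> Option<(usize, String)> {
        for (idx, channel) in self.channels.iter_mut().enumerate() {
            match channel.receive() {
                Ok(Some(msg)) => return Some((idx, msg)),
                // Nothing ready on this channel: try the next one.
                Ok(None) => continue,
                // A failing channel is reported and skipped, not propagated.
                Err(e) => {
                    eprintln!("channel '{}' error: {}", channel.name(), e);
                    continue;
                }
            }
        }
        // Every channel was empty or failed.
        None
    }
}
```

In this model, a manager whose first channel fails and whose second channel holds one message returns that message with index 1 on the first call and None on the second.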

send_to()

Sends a message via a specific channel index.
pub async fn send_to(&self, channel_idx: usize, msg: &OutgoingMessage) -> Result<()>
Parameters:
  • channel_idx: usize - Index of the channel (0-based, order of registration)
  • msg: &OutgoingMessage - Message to send
Returns:
  • Ok(()) - Message sent successfully
  • Err(OneClawError::Channel) - Channel index out of range or send failed
Example:
let response = OutgoingMessage {
    destination: msg.source.clone(),
    content: "Acknowledged".to_string(),
};
manager.send_to(channel_idx, &response).await?;

send_by_name()

Sends a message to a channel by name.
pub async fn send_by_name(&self, name: &str, msg: &OutgoingMessage) -> Result<()>
Parameters:
  • name: &str - Name of the channel (e.g., “cli”, “tcp”, “telegram”)
  • msg: &OutgoingMessage - Message to send
Returns:
  • Ok(()) - Message sent successfully
  • Err(OneClawError::Channel) - Channel not found or send failed
Example:
let alert = OutgoingMessage {
    destination: "telegram:12345".to_string(),
    content: "Alert: Threshold exceeded".to_string(),
};
manager.send_by_name("telegram", &alert).await?;
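
The two lookup styles, by index in send_to() and by name in send_by_name(), can be illustrated with a small synchronous model. Everything here is an assumption made for the sketch: SendError stands in for OneClawError::Channel, the Channel trait is reduced to name() and a synchronous send(), and RecordingChannel is a hypothetical test double that appends to a shared log.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical error type standing in for `OneClawError::Channel`.
#[derive(Debug, PartialEq)]
enum SendError {
    IndexOutOfRange(usize),
    UnknownChannel(String),
}

/// Simplified, synchronous stand-in for the real `Channel` trait (assumption).
trait Channel {
    fn name(&self) -> &str;
    fn send(&self, content: &str) -> Result<(), SendError>;
}

/// Hypothetical channel that records every sent message in a shared log.
struct RecordingChannel {
    name: String,
    log: Rc<RefCell<Vec<String>>>,
}

impl Channel for RecordingChannel {
    fn name(&self) -> &str {
        &self.name
    }

    fn send(&self, content: &str) -> Result<(), SendError> {
        self.log.borrow_mut().push(format!("{}: {}", self.name, content));
        Ok(())
    }
}

struct Manager {
    channels: Vec<Box<dyn Channel>>,
}

impl Manager {
    /// Look the channel up by registration index; `get` avoids a panic
    /// when the index is out of range.
    fn send_to(&self, idx: usize, content: &str) -> Result<(), SendError> {
        match self.channels.get(idx) {
            Some(channel) => channel.send(content),
            None => Err(SendError::IndexOutOfRange(idx)),
        }
    }

    /// Look the channel up by name; the first registered match wins.
    fn send_by_name(&self, name: &str, content: &str) -> Result<(), SendError> {
        match self.channels.iter().find(|ch| ch.name() == name) {
            Some(channel) => channel.send(content),
            None => Err(SendError::UnknownChannel(name.to_string())),
        }
    }
}
```

Index lookup is constant-time but depends on registration order staying fixed. Name lookup is a linear scan, which is cheap at the small channel counts this design targets.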

count()

Returns the number of registered channels.
pub fn count(&self) -> usize
Example:
info!("Managing {} channels", manager.count());

list()

Returns a list of all registered channel names.
pub fn list(&self) -> Vec<&str>
Returns: Vector of channel names in registration order
Example:
let names = manager.list();
info!("Active channels: {:?}", names);
// Output: Active channels: ["cli", "tcp", "telegram", "mqtt"]

Usage Pattern

Basic Setup

use oneclaw_core::channel::ChannelManager;
use oneclaw_channels::{CliChannel, TcpChannel};

let mut manager = ChannelManager::new();

// Register channels in priority order
manager.add_channel(Box::new(CliChannel::new()));
manager.add_channel(Box::new(TcpChannel::new("0.0.0.0:8080").await?));

info!("Registered {} channels", manager.count());

Message Loop

use tokio::time::{sleep, Duration};

loop {
    match manager.receive_any().await {
        Ok(Some((channel_idx, incoming))) => {
            info!("Received from channel {}: {}", channel_idx, incoming.content);
            
            // Process message through OneClaw layers
            let response_text = process_message(&incoming.content).await?;
            
            // Send response back to same channel
            let response = OutgoingMessage {
                destination: incoming.source,
                content: response_text,
            };
            manager.send_to(channel_idx, &response).await?;
        }
        Ok(None) => {
            // No messages, sleep briefly to avoid busy-looping
            sleep(Duration::from_millis(100)).await;
        }
        Err(e) => {
            error!("Channel manager error: {}", e);
            break;
        }
    }
}

Multi-Channel Coordination

// Receive from any channel; receive_any() yields None when nothing is ready
if let Some((idx, msg)) = manager.receive_any().await? {
    // Build the reply addressed to the original sender
    let outgoing = OutgoingMessage {
        destination: msg.source.clone(),
        content: "Acknowledged".to_string(),
    };

    // Route response based on source
    if msg.source.starts_with("telegram:") {
        // Send back via Telegram
        manager.send_by_name("telegram", &outgoing).await?;
    } else if msg.source.starts_with("mqtt:") {
        // Publish to MQTT topic
        manager.send_by_name("mqtt", &outgoing).await?;
    } else {
        // Send back to original channel
        manager.send_to(idx, &outgoing).await?;
    }
}
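
One way to keep this routing logic testable is to separate the decision from the send. The sketch below is only an illustration: the Route enum and route_for() are hypothetical helpers that are not part of ChannelManager, and the "telegram:" and "mqtt:" prefixes are taken from the example above.

```rust
/// Hypothetical routing decision: send by channel name or by original index.
#[derive(Debug, PartialEq)]
enum Route {
    ByName(&'static str),
    ByIndex(usize),
}

/// Decide where a reply should go based on the message source prefix.
/// `origin_idx` is the index of the channel the message arrived on.
fn route_for(source: &str, origin_idx: usize) -> Route {
    if source.starts_with("telegram:") {
        Route::ByName("telegram")
    } else if source.starts_with("mqtt:") {
        Route::ByName("mqtt")
    } else {
        // Unknown prefix: reply on the channel the message arrived on.
        Route::ByIndex(origin_idx)
    }
}
```

The caller can then match on the returned Route and call send_by_name() or send_to() accordingly.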

Sequential Polling Behavior

Order matters: Channels registered first have priority. If multiple channels have messages ready, the first one in registration order wins.
// CLI has priority (receives user input immediately)
manager.add_channel(Box::new(CliChannel::new()));

// TCP polled second (IoT sensor data)
manager.add_channel(Box::new(TcpChannel::new("0.0.0.0:8080").await?));

// Telegram polled third (bot messages)
manager.add_channel(Box::new(TelegramChannel::new(&token, &[], 30)));

// MQTT polled last (background sensor streams)
manager.add_channel(Box::new(MqttChannel::from_config(&mqtt_config).await?));
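Performance: Sequential polling is simple and predictable, but receive_any() awaits each channel's receive() in turn, so a slow channel delays every channel registered after it. For high-throughput scenarios, keep the channel count low (typically 2-4 channels) and use short timeouts in channel implementations.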
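
The effect of registration order can be shown with a toy model in which each channel is just a queue of pending messages. This is a sketch under assumptions, not the crate's code: poll_first_ready() and drain_in_priority_order() are hypothetical helper names, and real channels are async rather than in-memory queues.

```rust
use std::collections::VecDeque;

/// Poll the queues in order and pop from the first one that has a message.
fn poll_first_ready(queues: &mut [VecDeque<&'static str>]) -> Option<(usize, &'static str)> {
    for (idx, queue) in queues.iter_mut().enumerate() {
        if let Some(msg) = queue.pop_front() {
            return Some((idx, msg));
        }
    }
    None
}

/// Poll repeatedly until every queue is empty, recording the delivery order.
fn drain_in_priority_order(
    mut queues: Vec<VecDeque<&'static str>>,
) -> Vec<(usize, &'static str)> {
    let mut delivered = Vec::new();
    while let Some(item) = poll_first_ready(&mut queues) {
        delivered.push(item);
    }
    delivered
}
```

With two pending messages on the first queue and one on the second, both messages from the first queue are delivered before the second queue is served. A consistently busy early channel can therefore delay later ones.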

Error Handling

The ChannelManager is designed to be resilient:
  • Individual channel errors: Logged and skipped (other channels continue)
  • Channel index out of range: Returns Err(OneClawError::Channel)
  • Channel not found by name: Returns Err(OneClawError::Channel)
  • Fatal errors: Propagated upward (rare)
match manager.receive_any().await {
    Ok(Some((idx, msg))) => { /* process */ }
    Ok(None) => { /* no messages, continue polling */ }
    Err(e) => {
        error!("Fatal channel error: {}", e);
        // Typically shutdown or restart
    }
}
