Skip to main content

Overview

OneClaw provides four built-in channel implementations covering common I/O scenarios: interactive CLI, lightweight TCP sockets, Telegram bot integration, and MQTT pub/sub for IoT. Location: crates/oneclaw-channels/src/

CliChannel

Interactive command-line channel for terminal-based I/O. Always included and the simplest channel for testing and direct interaction. Location: crates/oneclaw-channels/src/cli.rs

Struct

pub struct CliChannel {
    prompt: String,
}

Constructor

new()

Creates a CLI channel with default prompt "oneclaw> ".
pub fn new() -> Self

with_prompt()

Creates a CLI channel with a custom prompt.
pub fn with_prompt(prompt: impl Into<String>) -> Self
Example:
let cli = CliChannel::with_prompt("ai> ");

Behavior

  • receive(): Displays prompt, reads line from stdin, returns trimmed input as IncomingMessage
    • Returns Ok(None) for empty lines
    • Returns Err(OneClawError::Channel("EOF")) on EOF (signals loop to stop)
    • Blocks until user input is available (async-friendly blocking via tokio)
  • send(): Writes message content to stdout with newline
  • name(): Returns "cli"

Usage

use oneclaw_channels::CliChannel;

let mut manager = ChannelManager::new();
manager.add_channel(Box::new(CliChannel::new()));

// Typical interaction:
// User types: "what is the weather?"
// receive() returns IncomingMessage { source: "cli", content: "what is the weather?", ... }
// send() outputs: "The weather is sunny."

Thread Safety

CLI channel reads/writes are serialized via tokio async I/O. Safe for concurrent use, but only one instance should be active per process (multiple CLI channels would interleave stdin/stdout).

TcpChannel

Line-based TCP socket channel for lightweight IoT sensor communication. Location: crates/oneclaw-channels/src/tcp.rs

Struct

pub struct TcpChannel {
    listener: TcpListener,
    client: Mutex<Option<TcpStream>>,
    buffer: Mutex<Vec<String>>,
    port: u16,
}

Constructor

new()

Creates a TCP channel listening on the given address.
pub async fn new(bind_addr: &str) -> Result<Self>
Parameters:
  • bind_addr - Address to bind (e.g., "0.0.0.0:8080", "127.0.0.1:0" for random port)
Returns: Result<TcpChannel> Example:
let tcp = TcpChannel::new("0.0.0.0:8080").await?;
info!("TCP channel listening on port {}", tcp.port());

Methods

port()

Returns the port this channel is listening on.
pub fn port(&self) -> u16

Protocol

Line-based text (UTF-8):
  • Each line = one message
  • Newline (\n) terminates each message
  • Simple protocol suitable for IoT sensors and embedded devices
Example Communication:
# Client connects and sends:
temp:23.5
humidity:60

# Server responds:
ok
ok

Behavior

  • receive(): Non-blocking receive with short timeouts
    1. Checks internal buffer first (for multi-line bursts)
    2. Tries to accept new TCP connection (1ms timeout)
    3. Reads available lines from current client (10ms timeout per line)
    4. Returns first line as IncomingMessage, buffers remainder
    5. Returns Ok(None) if no data available
  • send(): Writes message to currently connected client
    • Appends newline (\n) automatically
    • If client disconnected, logs warning and clears client (does not error)
    • If no client connected, message is dropped (logged at debug level)
  • name(): Returns "tcp"
  • source format: "tcp:{port}" (e.g., "tcp:8080")

Single Client Mode

TcpChannel handles one client at a time. If a new client connects, the previous connection is replaced. This design is suitable for simple IoT scenarios where sensors connect, send data, and disconnect.

Usage

use oneclaw_channels::TcpChannel;

let mut manager = ChannelManager::new();
let tcp = TcpChannel::new("0.0.0.0:8080").await?;
manager.add_channel(Box::new(tcp));

// IoT sensor connects via TCP:
// $ echo "temperature:23.5" | nc localhost 8080
// receive() returns IncomingMessage { source: "tcp:8080", content: "temperature:23.5", ... }

Thread Safety

Internal state (client, buffer) is protected by Mutex. Safe for concurrent polling from single async task (as done by ChannelManager).

TelegramChannel

Telegram Bot API channel using long-polling for message reception and sendMessage API for responses. Location: crates/oneclaw-channels/src/telegram.rs

Struct

pub struct TelegramChannel {
    client: reqwest::Client,
    bot_token: String,
    whitelist: HashSet<i64>,
    last_update_id: Mutex<Option<i64>>,
    buffer: Mutex<Vec<IncomingMessage>>,
    last_chat_id: Mutex<Option<i64>>,
    polling_timeout: u64,
}

Constructor

new()

Creates a Telegram channel. No network call — connection is lazy (verified on first poll).
pub fn new(bot_token: &str, allowed_chat_ids: &[i64], polling_timeout: u64) -> Self
Parameters:
  • bot_token - Telegram Bot API token (format: "123456:ABC-DEF1234...")
  • allowed_chat_ids - Whitelist of allowed chat IDs (empty = allow all)
  • polling_timeout - Long-polling timeout in seconds (recommended: 30)
Whitelist: If non-empty, only messages from listed chat IDs are accepted. Messages from other chats are ignored (logged at debug level). Example:
let telegram = TelegramChannel::new(
    &bot_token,
    &[12345, 67890], // Only accept from these chat IDs
    30,              // 30-second long-polling
);

Methods

verify_token()

Verifies bot token by calling Telegram getMe API. Returns bot username on success.
pub async fn verify_token(&self) -> Result<String>
Example:
match telegram.verify_token().await {
    Ok(username) => info!("Telegram bot @{} ready", username),
    Err(e) => error!("Invalid Telegram token: {}", e),
}

Behavior

  • receive(): Long-polling via getUpdates API
    1. Checks internal buffer first (for multi-message updates)
    2. Calls Telegram getUpdates with offset (acknowledges previous updates)
    3. Filters messages by whitelist (if configured)
    4. Tracks last_chat_id for reply routing
    5. Returns first message, buffers remainder
    6. Returns Ok(None) if no messages (long-poll timed out)
  • send(): Sends message via sendMessage API
    • Auto-splits messages >4000 chars at newline boundaries
    • Routes to chat_id parsed from destination (format: "telegram:{chat_id}")
    • Falls back to last_chat_id if destination is not telegram-specific
    • If no chat_id available, message is dropped (logged at debug level)
  • name(): Returns "telegram"
  • source format: "telegram:{chat_id}" (e.g., "telegram:12345")

Message Auto-Splitting

Telegram has a 4096-character message limit. TelegramChannel automatically splits long messages:
  • Splits at newline boundaries when possible
  • If a single line exceeds 4000 chars, hard-cuts at 4000
  • Each part is sent sequentially as a separate message

Usage

use oneclaw_channels::TelegramChannel;

let telegram = TelegramChannel::new(&bot_token, &[], 30);
telegram.verify_token().await?;

let mut manager = ChannelManager::new();
manager.add_channel(Box::new(telegram));

// User sends message via Telegram:
// receive() returns IncomingMessage { source: "telegram:12345", content: "Hello bot", ... }
// send() routes response back to chat_id 12345

Standalone Alert Function

send_telegram_alert()

Sends a one-shot alert message without a full TelegramChannel instance. Useful for event handlers or pipelines.
pub async fn send_telegram_alert(
    bot_token: &str,
    chat_id: i64,
    message: &str,
) -> Result<()>
Example:
use oneclaw_channels::send_telegram_alert;

send_telegram_alert(
    &bot_token,
    12345,
    "Alert: Temperature threshold exceeded!",
).await?;

Thread Safety

Internal state (last_update_id, buffer, last_chat_id) is protected by Mutex. Safe for concurrent use.

MqttChannel

MQTT pub/sub channel for IoT sensor communication via MQTT broker (uses rumqttc async client). Location: crates/oneclaw-channels/src/mqtt.rs

Struct

pub struct MqttChannel {
    client: AsyncClient,
    eventloop: Mutex<rumqttc::EventLoop>,
    buffer: Mutex<Vec<IncomingMessage>>,
    publish_prefix: String,
}

Constructor

new()

Creates and connects MQTT channel, subscribing to given topics.
pub async fn new(
    host: &str,
    port: u16,
    client_id: &str,
    subscribe_topics: &[String],
    publish_prefix: String,
    username: Option<&str>,
    password: Option<&str>,
    keep_alive_secs: u64,
) -> Result<Self>
Parameters:
  • host - MQTT broker host (e.g., "mqtt.example.com")
  • port - MQTT broker port (e.g., 1883 for plain, 8883 for TLS)
  • client_id - Unique client identifier
  • subscribe_topics - Topics to subscribe to (e.g., ["sensors/+/temp", "sensors/+/humidity"])
  • publish_prefix - Prefix for outgoing messages (e.g., "oneclaw/alerts")
  • username / password - Optional broker authentication
  • keep_alive_secs - Keep-alive interval (recommended: 60)
Example:
let mqtt = MqttChannel::new(
    "mqtt.example.com",
    1883,
    "oneclaw-instance-1",
    &["sensors/+/temp".to_string(), "sensors/+/humidity".to_string()],
    "oneclaw/alerts".to_string(),
    Some("user"),
    Some("pass"),
    60,
).await?;

from_config()

Creates MQTT channel from MqttConfig struct.
pub async fn from_config(config: &oneclaw_core::config::MqttConfig) -> Result<Self>
Example:
let mqtt = MqttChannel::from_config(&config.mqtt).await?;

Methods

clone_client()

Returns a clone of the AsyncClient for independent publishing (e.g., alert dispatch from event handlers).
pub fn clone_client(&self) -> AsyncClient
Usage: AsyncClient is cheap to clone — it’s just a channel sender handle. Use this for parallel alert publishing from pipelines or event handlers without blocking the main channel loop. Example:
let mqtt_client = mqtt_channel.clone_client();

// In event handler:
mqtt_client.publish(
    "oneclaw/alerts",
    rumqttc::QoS::AtLeastOnce,
    false,
    b"Threshold exceeded",
).await?;

Behavior

  • receive(): Polls MQTT event loop for incoming publish packets
    1. Checks internal buffer first (for multi-message batches)
    2. Polls event loop with 100ms timeout
    3. Extracts topic and payload from Publish packets
    4. Skips empty payloads
    5. Buffers messages for sequential delivery
    6. Returns Ok(None) if no messages (timeout or no publish events)
  • send(): Publishes message to MQTT topic
    • Routing:
      • destination == "" or "mqtt" → use publish_prefix
      • destination.starts_with("mqtt:") → extract topic (e.g., "mqtt:sensors/response""sensors/response")
      • Otherwise → "{publish_prefix}/{destination}"
    • QoS: AtLeastOnce (reliable delivery)
    • Retain: false (messages not retained by broker)
  • name(): Returns "mqtt"
  • source format: "mqtt:{topic}" (e.g., "mqtt:sensors/garage/temp")

Topic Routing Examples

Incoming:
// Subscribed to "sensors/+/temp"
// Sensor publishes to "sensors/garage/temp"
// receive() returns IncomingMessage { source: "mqtt:sensors/garage/temp", content: "23.5", ... }
Outgoing:
// publish_prefix = "oneclaw/alerts"

// destination = "" → publish to "oneclaw/alerts"
OutgoingMessage { destination: "".into(), content: "Alert!".into() }

// destination = "mqtt:sensors/garage/control" → publish to "sensors/garage/control"
OutgoingMessage { destination: "mqtt:sensors/garage/control".into(), content: "ON".into() }

// destination = "threshold" → publish to "oneclaw/alerts/threshold"
OutgoingMessage { destination: "threshold".into(), content: "Exceeded".into() }

Usage

use oneclaw_channels::MqttChannel;

let mqtt = MqttChannel::from_config(&config.mqtt).await?;

let mut manager = ChannelManager::new();
manager.add_channel(Box::new(mqtt));

// Sensor publishes to "sensors/garage/temp":
// receive() returns IncomingMessage { source: "mqtt:sensors/garage/temp", content: "23.5", ... }

Thread Safety

Internal state (eventloop, buffer) is protected by Mutex. AsyncClient is Clone and safe to share. The event loop must be polled from a single task (as done by ChannelManager).

Channel Comparison

ChannelUse CaseProtocolBlockingConnection Model
CliChannelInteractive terminal I/Ostdin/stdoutYes (user input)Always connected
TcpChannelLightweight IoT sensorsLine-based TCPNo (1-10ms timeouts)Single client
TelegramChannelBot messagingTelegram Bot APINo (long-polling)Many users (whitelist)
MqttChannelIoT pub/subMQTT 3.1.1No (100ms poll)Persistent broker

See Also

Build docs developers (and LLMs) love