
Overview

Adding a new channel to nanobot allows users to interact with the agent through additional chat platforms. Channels are the interface between nanobot and external messaging services.

Channel Architecture

All channels implement the BaseChannel abstract class, which provides a consistent interface for:
  1. Starting the channel (connecting to the platform)
  2. Receiving messages from the platform
  3. Sending messages to the platform
  4. Stopping the channel (cleanup and disconnect)

The BaseChannel Interface

from abc import ABC, abstractmethod
from typing import Any

from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus

class BaseChannel(ABC):
    name: str = "base"  # Override with your channel name
    
    def __init__(self, config: Any, bus: MessageBus):
        self.config = config
        self.bus = bus
        self._running = False
    
    @abstractmethod
    async def start(self) -> None:
        """Connect to the platform and start listening."""
    
    @abstractmethod
    async def send(self, msg: OutboundMessage) -> None:
        """Send a message to the platform."""
    
    @abstractmethod
    async def stop(self) -> None:
        """Clean up and disconnect."""

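Because start(), send(), and stop() are abstract, Python refuses to instantiate a subclass that leaves any of them out. The sketch below illustrates that with a local stand-in class (ChannelSketch, MemoryChannel, and IncompleteChannel are hypothetical names, and send() takes a plain string here instead of an OutboundMessage) so it runs without nanobot installed.

```python
import asyncio
from abc import ABC, abstractmethod


class ChannelSketch(ABC):
    """Local stand-in for BaseChannel; not the real nanobot class."""

    def __init__(self) -> None:
        self._running = False

    @abstractmethod
    async def start(self) -> None: ...

    @abstractmethod
    async def send(self, text: str) -> None: ...

    @abstractmethod
    async def stop(self) -> None: ...


class IncompleteChannel(ChannelSketch):
    """Forgets to implement send() and stop()."""

    async def start(self) -> None:
        self._running = True


class MemoryChannel(ChannelSketch):
    """Implements all three methods and records outgoing text in a list."""

    def __init__(self) -> None:
        super().__init__()
        self.outbox = []

    async def start(self) -> None:
        self._running = True

    async def send(self, text: str) -> None:
        if self._running:
            self.outbox.append(text)

    async def stop(self) -> None:
        self._running = False


def incomplete_is_rejected() -> bool:
    """Return True if instantiating the incomplete subclass raises TypeError."""
    try:
        IncompleteChannel()
    except TypeError:
        return True
    return False


async def run_lifecycle() -> list:
    """Walk through start -> send -> stop and return what was recorded."""
    channel = MemoryChannel()
    await channel.start()
    await channel.send("hello")
    await channel.stop()
    await channel.send("dropped")  # ignored because the channel is stopped
    return channel.outbox
```

Calling asyncio.run(run_lifecycle()) exercises the same start, send, stop order that the channel manager is expected to follow.
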
Step-by-Step Guide

Step 1: Create the Channel File

Create a new file in nanobot/channels/, e.g., mychannel.py.

Step 2: Define the Config Schema

Edit nanobot/config/schema.py and add a configuration class:
class MyChannelConfig(Base):
    """MyChannel configuration."""
    
    enabled: bool = False
    api_key: str = ""  # Or token, credentials, etc.
    api_secret: str = ""
    allow_from: list[str] = Field(default_factory=list)
    # Add other platform-specific settings
Then add it to ChannelsConfig:
class ChannelsConfig(Base):
    telegram: TelegramConfig = TelegramConfig()
    discord: DiscordConfig = DiscordConfig()
    mychannel: MyChannelConfig = MyChannelConfig()  # Add this
    # ...

Step 3: Implement the Channel Class

from loguru import logger
from nanobot.channels.base import BaseChannel
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.config.schema import MyChannelConfig

class MyChannel(BaseChannel):
    """MyChannel integration."""
    
    name = "mychannel"
    
    def __init__(self, config: MyChannelConfig, bus: MessageBus):
        super().__init__(config, bus)
        self.config: MyChannelConfig = config
        # Initialize platform client/connection
        self.client = None
    
    async def start(self) -> None:
        """Connect to MyChannel platform."""
        if not self.config.api_key:
            logger.error("MyChannel API key not configured")
            return
        
        self._running = True
        logger.info("Starting MyChannel...")
        
        # 1. Initialize the platform client (MyChannelClient is a placeholder for your platform's SDK client)
        self.client = MyChannelClient(
            api_key=self.config.api_key,
            api_secret=self.config.api_secret
        )
        
        # 2. Connect to the platform
        await self.client.connect()
        
        # 3. Register event handlers
        self.client.on_message(self._on_message)
        logger.info("MyChannel connected")
        
        # 4. Start listening; if listen() blocks, it returns only after disconnect
        await self.client.listen()
    
    async def stop(self) -> None:
        """Disconnect from MyChannel."""
        self._running = False
        if self.client:
            await self.client.disconnect()
            logger.info("MyChannel disconnected")
    
    async def send(self, msg: OutboundMessage) -> None:
        """Send a message via MyChannel."""
        if not self.client:
            logger.warning("MyChannel client not initialized")
            return
        
        try:
            # Send text message
            if msg.content:
                await self.client.send_message(
                    chat_id=msg.chat_id,
                    text=msg.content
                )
            
            # Send media files
            for media_path in (msg.media or []):
                await self.client.send_file(
                    chat_id=msg.chat_id,
                    file_path=media_path
                )
        
        except Exception as e:
            logger.error("Failed to send message: {}", e)
    
    async def _on_message(self, event) -> None:
        """Handle incoming messages from the platform."""
        # Extract message data from platform event
        sender_id = str(event.user_id)
        chat_id = str(event.chat_id)
        content = event.text or ""
        
        # Check access control
        if not self.is_allowed(sender_id):
            logger.warning("Access denied for {}", sender_id)
            return
        
        # Forward to the message bus
        await self._handle_message(
            sender_id=sender_id,
            chat_id=chat_id,
            content=content,
            media=[],
            metadata={
                "user_id": event.user_id,
                "username": event.username,
            }
        )
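
MyChannelClient in the class above stands for whatever SDK your platform provides. To exercise the channel logic before wiring up a real service, an in-memory fake with the same method names can help. The sketch below is one possible shape: FakeMyChannelClient, FakeEvent, and inject() are hypothetical and not part of nanobot or any real SDK.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeEvent:
    """Stand-in for a platform event; fields mirror those read in _on_message."""
    user_id: int
    chat_id: int
    text: str
    username: str = "tester"


class FakeMyChannelClient:
    """Hypothetical in-memory client exposing the methods the channel calls."""

    def __init__(self, api_key: str, api_secret: str = "") -> None:
        self.api_key = api_key
        self.api_secret = api_secret
        self.connected = False
        self.sent = []  # (chat_id, text) pairs recorded by send_message()
        self._handler = None
        self._inbox = asyncio.Queue()

    async def connect(self) -> None:
        self.connected = True

    def on_message(self, handler) -> None:
        self._handler = handler

    async def listen(self) -> None:
        # Deliver queued events until disconnect() enqueues the None sentinel.
        while True:
            event = await self._inbox.get()
            if event is None:
                break
            if self._handler is not None:
                await self._handler(event)

    async def send_message(self, chat_id: str, text: str) -> None:
        self.sent.append((chat_id, text))

    async def disconnect(self) -> None:
        self.connected = False
        await self._inbox.put(None)

    async def inject(self, event: FakeEvent) -> None:
        """Test helper: simulate an incoming platform message."""
        await self._inbox.put(event)


async def demo() -> list:
    """Connect, deliver one event to a handler, then shut down."""
    client = FakeMyChannelClient(api_key="test")
    received = []

    async def handler(event: FakeEvent) -> None:
        received.append(event.text)
        await client.send_message(str(event.chat_id), "echo: " + event.text)

    await client.connect()
    client.on_message(handler)
    listener = asyncio.create_task(client.listen())
    await client.inject(FakeEvent(user_id=1, chat_id=2, text="hello"))
    await client.disconnect()
    await listener
    return [received, client.sent]
```

Because listen() blocks until disconnect() is called, the demo runs it in a task, which mirrors how the channel manager wraps channel.start().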

Step 4: Register the Channel

Edit nanobot/channels/manager.py to register your channel:
from nanobot.channels.mychannel import MyChannel

class ChannelManager:
    async def start_all(self, config: ChannelsConfig) -> None:
        # ... existing channels ...
        
        if config.mychannel.enabled:
            channel = MyChannel(config.mychannel, self.bus)
            self.channels.append(channel)
            self.tasks.append(asyncio.create_task(channel.start()))

Step 5: Test Your Channel

  1. Configure in ~/.nanobot/config.json:
{
  "channels": {
    "mychannel": {
      "enabled": true,
      "apiKey": "your-api-key",
      "apiSecret": "your-api-secret",
      "allowFrom": ["your-user-id"]
    }
  }
}
  2. Run the gateway:
nanobot gateway
  3. Send a test message from the platform.
  4. Verify nanobot responds.
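
Note that the JSON file uses camelCase keys (apiKey, allowFrom) while the schema fields in Step 2 are snake_case (api_key, allow_from). This page does not say how the two are reconciled; presumably the Base model accepts camelCase aliases. The sketch below only illustrates the naming correspondence with the standard library; camel_to_snake and normalize_keys are hypothetical helpers, not nanobot functions.

```python
import json
import re


def camel_to_snake(name: str) -> str:
    """Convert a camelCase key such as 'apiKey' to snake_case ('api_key')."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def normalize_keys(section: dict) -> dict:
    """Return a copy of one config section with snake_case keys."""
    return {camel_to_snake(key): value for key, value in section.items()}


raw = json.loads("""
{
  "channels": {
    "mychannel": {
      "enabled": true,
      "apiKey": "your-api-key",
      "apiSecret": "your-api-secret",
      "allowFrom": ["your-user-id"]
    }
  }
}
""")

# Keys now line up with the MyChannelConfig field names.
mychannel = normalize_keys(raw["channels"]["mychannel"])
```

If a setting seems to be ignored at runtime, a mismatch between the JSON key and the field name is a good first thing to check.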

Real-World Example: Telegram Channel

Let’s examine the Telegram channel implementation as a reference.

Key Features

  1. Long polling (no webhook needed)
  2. Media handling (photos, voice, documents)
  3. Typing indicators
  4. Command handling (/start, /new, /help)
  5. Media groups (multiple images in one message)
  6. Voice transcription (via Groq Whisper)

Code Structure

class TelegramChannel(BaseChannel):
    name = "telegram"
    
    def __init__(self, config: TelegramConfig, bus: MessageBus, groq_api_key: str = ""):
        super().__init__(config, bus)
        self.config: TelegramConfig = config
        self.groq_api_key = groq_api_key
        self._app: Application | None = None
        self._typing_tasks: dict[str, asyncio.Task] = {}
    
    async def start(self) -> None:
        """Start the Telegram bot with long polling."""
        self._running = True  # Otherwise the keep-alive loop at the end of start() exits immediately
        # Build application
        self._app = Application.builder().token(self.config.token).build()
        
        # Register handlers
        self._app.add_handler(CommandHandler("start", self._on_start))
        self._app.add_handler(CommandHandler("new", self._forward_command))
        self._app.add_handler(MessageHandler(
            filters.TEXT | filters.PHOTO | filters.VOICE,
            self._on_message
        ))
        
        # Start polling
        await self._app.initialize()
        await self._app.start()
        await self._app.updater.start_polling()
        
        # Keep running
        while self._running:
            await asyncio.sleep(1)
    
    async def send(self, msg: OutboundMessage) -> None:
        """Send message via Telegram."""
        chat_id = int(msg.chat_id)
        
        # Stop typing indicator
        if not msg.metadata.get("_progress", False):
            self._stop_typing(msg.chat_id)
        
        # Send media files
        for media_path in (msg.media or []):
            with open(media_path, 'rb') as f:
                await self._app.bot.send_photo(chat_id=chat_id, photo=f)
        
        # Send text content
        if msg.content:
            html = _markdown_to_telegram_html(msg.content)
            await self._app.bot.send_message(
                chat_id=chat_id,
                text=html,
                parse_mode="HTML"
            )
    
    async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle incoming messages."""
        message = update.message
        user = update.effective_user
        
        sender_id = str(user.id)
        chat_id = str(message.chat_id)
        content = message.text or message.caption or ""
        
        # Handle media
        media_paths = []
        if message.photo:
            file = await self._app.bot.get_file(message.photo[-1].file_id)
            file_path = await file.download_to_drive()
            media_paths.append(str(file_path))
        
        # Start typing indicator
        self._start_typing(chat_id)
        
        # Forward to bus
        await self._handle_message(
            sender_id=sender_id,
            chat_id=chat_id,
            content=content,
            media=media_paths,
            metadata={"message_id": message.message_id}
        )

Typing Indicator Implementation

def _start_typing(self, chat_id: str) -> None:
    """Start sending 'typing...' indicator."""
    self._stop_typing(chat_id)
    self._typing_tasks[chat_id] = asyncio.create_task(self._typing_loop(chat_id))

def _stop_typing(self, chat_id: str) -> None:
    """Stop the typing indicator."""
    task = self._typing_tasks.pop(chat_id, None)
    if task and not task.done():
        task.cancel()

async def _typing_loop(self, chat_id: str) -> None:
    """Repeatedly send 'typing' action."""
    try:
        while self._app:
            await self._app.bot.send_chat_action(chat_id=int(chat_id), action="typing")
            await asyncio.sleep(4)
    except asyncio.CancelledError:
        pass
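
The same start/stop/loop pattern can be tried without Telegram. The sketch below swaps the send_chat_action call for a list append so the behavior is observable; TypingTracker and its attribute names are made up for this illustration.

```python
import asyncio


class TypingTracker:
    """Keeps one repeating background task per chat, as in the methods above."""

    def __init__(self, interval: float = 4.0) -> None:
        self.interval = interval
        self.tasks = {}    # chat_id -> asyncio.Task
        self.actions = []  # chat_ids for which a "typing" action was emitted

    def start(self, chat_id: str) -> None:
        self.stop(chat_id)  # never run two loops for the same chat
        self.tasks[chat_id] = asyncio.create_task(self._loop(chat_id))

    def stop(self, chat_id: str) -> None:
        task = self.tasks.pop(chat_id, None)
        if task and not task.done():
            task.cancel()

    async def _loop(self, chat_id: str) -> None:
        try:
            while True:
                self.actions.append(chat_id)  # stands in for send_chat_action
                await asyncio.sleep(self.interval)
        except asyncio.CancelledError:
            pass


async def demo() -> tuple:
    tracker = TypingTracker(interval=0.01)
    tracker.start("42")
    await asyncio.sleep(0.035)
    tracker.stop("42")
    count_at_stop = len(tracker.actions)
    await asyncio.sleep(0.03)  # give a leaked loop time to show itself
    return count_at_stop, len(tracker.actions), len(tracker.tasks)
```

The action count stops growing once stop() is called, which is what prevents a stale "typing..." indicator after the reply is sent.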

Common Patterns

1. Access Control

Use the built-in is_allowed() method:
if not self.is_allowed(sender_id):
    logger.warning("Access denied for {}", sender_id)
    return
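
The body of is_allowed() is not shown on this page. As a rough mental model, it can be thought of as a membership test against the allow_from list from the config. The standalone function below is a sketch under one loud assumption: an empty list means "no restriction". The real method may treat an empty list differently, so check BaseChannel before relying on this.

```python
def is_allowed(sender_id: str, allow_from: list) -> bool:
    """Return True if sender_id may talk to the bot.

    Assumption (not confirmed by the source): an empty allow list
    means everyone is allowed.
    """
    if not allow_from:
        return True
    # Compare as strings so numeric IDs from the platform match config entries.
    return str(sender_id) in {str(entry) for entry in allow_from}
```

Comparing as strings matters because platforms often deliver numeric IDs while JSON config entries are usually quoted.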

2. Media Handling

Download media to ~/.nanobot/media/:
from pathlib import Path

media_dir = Path.home() / ".nanobot" / "media"
media_dir.mkdir(parents=True, exist_ok=True)

file_path = media_dir / f"{file_id}.jpg"
await download_file(url, file_path)

media_paths.append(str(file_path))
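
File IDs come from the platform, so it is worth sanitizing them before they become part of a local path. The helper below is a minimal sketch of that idea; media_path is a hypothetical name, and the directory is passed in so the function can be tested against a temporary folder.

```python
import re
from pathlib import Path


def media_path(media_dir: Path, file_id: str, suffix: str = ".jpg") -> Path:
    """Build a download target inside media_dir for a platform file ID.

    Any character outside [A-Za-z0-9_-] is replaced with '_' so an ID such
    as '../../etc/passwd' cannot escape the media directory.
    """
    safe_id = re.sub(r"[^A-Za-z0-9_-]", "_", file_id)
    media_dir.mkdir(parents=True, exist_ok=True)
    return media_dir / f"{safe_id}{suffix}"
```

In a channel this would be called as media_path(Path.home() / ".nanobot" / "media", file_id) before downloading.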

3. Voice Transcription

Use Groq for voice-to-text:
from nanobot.providers.transcription import GroqTranscriptionProvider

transcriber = GroqTranscriptionProvider(api_key=self.groq_api_key)
transcription = await transcriber.transcribe(file_path)

if transcription:
    content = f"[transcription: {transcription}]"

4. Long Polling vs WebSocket

Long Polling (Telegram):
await self._app.updater.start_polling()
while self._running:
    await asyncio.sleep(1)
WebSocket (Discord):
await self.client.connect(ws_url)
while self._running:
    event = await self.client.receive()
    await self._handle_event(event)

5. Message Formatting

Convert markdown to platform-specific format:
import re

def _markdown_to_platform_format(text: str) -> str:
    # Bold
    text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)
    # Italic (the lookarounds keep snake_case identifiers intact)
    text = re.sub(r'(?<!\w)_(.+?)_(?!\w)', r'<i>\1</i>', text)
    # Code blocks
    text = re.sub(r'```([\w]*)\n?([\s\S]*?)```', r'<pre>\2</pre>', text)
    return text
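
When the target format is HTML, two details are easy to miss: user text must be escaped before tags are added, and code blocks should be set aside first so their contents are not treated as bold or italic markup. The sketch below shows one way to order those steps using only the standard library. markdown_to_html is a hypothetical name and this is not nanobot's _markdown_to_telegram_html.

```python
import html
import re

FENCE = "`" * 3  # built at runtime so the example contains no literal fence
CODE_BLOCK = re.compile(re.escape(FENCE) + r"(\w*)\n?([\s\S]*?)" + re.escape(FENCE))


def markdown_to_html(text: str) -> str:
    """Convert a small Markdown subset (bold, italic, code blocks) to HTML."""
    blocks = []

    def stash(match) -> str:
        # Replace each code block with a placeholder that later steps ignore.
        blocks.append(match.group(2))
        return f"\x00{len(blocks) - 1}\x00"

    text = CODE_BLOCK.sub(stash, text)
    # Escape before inserting tags, so only our own tags are live markup.
    text = html.escape(text, quote=False)
    text = re.sub(r"\*\*(.+?)\*\*", r"<b>\1</b>", text)
    text = re.sub(r"(?<!\w)_(.+?)_(?!\w)", r"<i>\1</i>", text)
    # Restore code blocks, escaping their contents but applying no formatting.
    for index, code in enumerate(blocks):
        escaped = html.escape(code, quote=False)
        text = text.replace(f"\x00{index}\x00", f"<pre>{escaped}</pre>")
    return text
```

Escaping after the tag substitutions would turn the generated tags themselves into visible text, which is why the order matters.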

6. Group/Thread Support

For platforms with threads (Discord, Slack):
session_key = f"{chat_id}:{thread_id}"  # Unique per thread

await self._handle_message(
    sender_id=sender_id,
    chat_id=chat_id,
    content=content,
    session_key=session_key  # Isolate thread conversations
)

7. Mention Detection

For group chats, respond only when mentioned:
if chat_type == "group":
    if self.config.group_policy == "mention":
        if not self._is_mentioned(message):
            return  # Ignore non-mentions

await self._handle_message(...)
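
The _is_mentioned() helper is left to each channel. For platforms where a mention appears in the text as @name, a regex check may be enough; platforms that deliver mentions as structured entities need a different approach. The functions below are a sketch with hypothetical names and parameters.

```python
import re


def is_mentioned(text: str, bot_username: str) -> bool:
    """True if text contains @bot_username as a whole token (case-insensitive)."""
    pattern = rf"(?<![\w@])@{re.escape(bot_username)}(?!\w)"
    return re.search(pattern, text, flags=re.IGNORECASE) is not None


def should_handle(text: str, chat_type: str, group_policy: str, bot_username: str) -> bool:
    """Apply the group policy shown above: in 'mention' mode, groups need a mention."""
    if chat_type == "group" and group_policy == "mention":
        return is_mentioned(text, bot_username)
    return True
```

The lookarounds avoid false positives such as an email address ending in @nanobot or a different account named @nanobot2.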

8. Error Handling

Log errors and continue:
try:
    await self.client.send_message(chat_id, content)
except RateLimitError:
    logger.warning("Rate limited, waiting...")
    await asyncio.sleep(5)
    await self.client.send_message(chat_id, content)
except Exception as e:
    logger.error("Send failed: {}", e)
    # Don't crash the channel

Platform-Specific Considerations

Telegram

  • Uses long polling (no public IP needed)
  • Supports media groups (multiple images)
  • Has command menu (BotFather registration)
  • Requires HTML escaping for special characters

Discord

  • Uses WebSocket gateway
  • Requires intents (Message Content intent)
  • Has typing indicators and reactions
  • Supports threads (session isolation)

WhatsApp

  • Requires bridge server (Node.js)
  • Uses QR code login
  • Media sent as base64 or URLs
  • No group support (DMs only)

Slack

  • Uses Socket Mode (no public URL)
  • Requires app-level token
  • Supports threads and reactions
  • Uses mrkdwn formatting (not markdown)

Email

  • Uses IMAP (receive) and SMTP (send)
  • Polls the mailbox at a fixed interval (30s default)
  • Requires consent flag (mailbox access)
  • Supports attachments and HTML emails

Feishu/DingTalk/QQ

  • Use WebSocket long connection
  • No public IP required
  • Require app registration on platform
  • Use open_id / staff_id for users

Testing Checklist

Basic Functionality

  • Channel connects successfully
  • Receives text messages
  • Sends text responses
  • Access control works (allowFrom)
  • Channel disconnects cleanly

Media Support

  • Receives images
  • Sends images
  • Receives voice/audio (if applicable)
  • Transcribes voice (if Groq configured)
  • Receives documents/files

Advanced Features

  • Typing indicators (if applicable)
  • Reactions (if applicable)
  • Thread support (if applicable)
  • Group chat support
  • Mention detection (groups)
  • Command handling (/new, /help)

Error Handling

  • Handles network disconnects
  • Recovers from rate limits
  • Logs errors without crashing
  • Invalid chat_id handled gracefully

Configuration

  • Config schema documented
  • Required fields validated
  • Optional fields have defaults
  • README updated with setup instructions

Debugging Tips

Enable Debug Logging

from loguru import logger

logger.debug("Received message: {}", event)
logger.debug("Sending to chat_id: {}", chat_id)

Inspect Message Bus Events

logger.info("Publishing inbound: channel={}, sender={}, content={}",
            msg.channel, msg.sender_id, msg.content[:50])

Test with --logs Flag

nanobot gateway --logs
Shows detailed runtime logs.

Use Simple Test Message

await self._handle_message(
    sender_id="test_user",
    chat_id="test_chat",
    content="Hello from test",
)

Performance Considerations

Connection Pooling

Reuse HTTP connections:
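# Create the session once (e.g., in start()) and close it in stop() with `await self.session.close()`
self.session = aiohttp.ClientSession()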

# Use self.session for all requests
await self.session.post(url, json=data)

Rate Limiting

Implement backoff:
import asyncio

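max_retries = 3
for attempt in range(max_retries):
    try:
        await self.client.send(message)
        break
    except RateLimitError:
        await asyncio.sleep(2 ** attempt)  # 1s, 2s, 4s
else:
    # Runs only if the loop never hit `break`, i.e. every attempt was rate limited
    logger.error("Send failed after {} attempts", max_retries)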
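
The retry loop can also be factored into a reusable helper that caps the delay, adds jitter so several chats do not retry in lockstep, and reports whether the send finally succeeded. This is a sketch: RateLimitError here is a locally defined placeholder for your SDK's exception, and send_with_backoff is a hypothetical name.

```python
import asyncio
import random


class RateLimitError(Exception):
    """Placeholder; substitute your platform SDK's rate-limit exception."""


async def send_with_backoff(send, max_retries: int = 3,
                            base_delay: float = 1.0,
                            max_delay: float = 30.0) -> bool:
    """Call the zero-argument coroutine function `send`, retrying on rate limits.

    Returns True on success and False if every attempt was rate limited.
    """
    for attempt in range(max_retries):
        try:
            await send()
            return True
        except RateLimitError:
            if attempt == max_retries - 1:
                break  # no point sleeping after the final attempt
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
    return False


async def demo() -> tuple:
    calls = 0

    async def flaky_send() -> None:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise RateLimitError()

    ok = await send_with_backoff(flaky_send, max_retries=3, base_delay=0.01)
    return ok, calls
```

In a channel, `send` would typically be a lambda or functools.partial wrapping the client call, and a False result is the place to log the failure.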

Message Batching

Batch multiple media files:
media = msg.media or []  # msg.media may be None
if len(media) > 1:
    await self.client.send_media_group(
        chat_id=chat_id,
        media=media
    )
else:
    for item in media:
        await self.client.send_media(chat_id, item)
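
Platforms usually cap how many items one media group may hold (Telegram, for example, accepts 2 to 10 per group), so a long media list has to be split before batching. The helper below is a small sketch of that split; chunk_media and the default limit are illustrative, so check your platform's documented limit.

```python
def chunk_media(paths: list, max_group_size: int = 10) -> list:
    """Split paths into consecutive groups of at most max_group_size items."""
    if max_group_size < 1:
        raise ValueError("max_group_size must be at least 1")
    return [
        paths[start:start + max_group_size]
        for start in range(0, len(paths), max_group_size)
    ]
```

A trailing group with a single item generally needs the single-file send path, since a media group of one may be rejected.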

Security Best Practices

Validate Input

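# Numeric IDs can be negative (e.g., Telegram group chats), so ignore a leading minus sign
if not chat_id or not chat_id.lstrip("-").isdigit():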
    logger.error("Invalid chat_id: {}", chat_id)
    return

Sanitize Content

def _escape_html(text: str) -> str:
    return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")

Protect API Credentials

# Never log API keys
logger.debug("Connecting with token: ***")

# Store in config, not code
api_key = self.config.api_key
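
When a log line does need to identify which credential is in use, masking all but the last few characters is a common compromise. The helper below is a sketch; mask_secret is a hypothetical name, not a nanobot utility.

```python
def mask_secret(value: str, visible: int = 4) -> str:
    """Hide all but the last `visible` characters of a secret.

    Secrets no longer than `visible` are masked entirely.
    """
    if len(value) <= visible:
        return "*" * len(value)
    return "*" * (len(value) - visible) + value[-visible:]
```

For example, logger.debug("Connecting with token: {}", mask_secret(token)) keeps tokens distinguishable in logs without exposing them.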

Implement Access Control

if not self.is_allowed(sender_id):
    # Don't reveal channel exists
    return  # Silent ignore

Contributing Your Channel

When submitting a PR:
  1. Implement all BaseChannel methods
  2. Add config schema to schema.py
  3. Register in ChannelManager
  4. Add README section with setup instructions
  5. Test thoroughly (see checklist above)
  6. Include example config in PR description
  7. Document special requirements (API keys, permissions, etc.)
See the Contributing Guide for more details.
