Adapter Architecture
Every channel adapter is a TypeScript worker that:
- Registers webhook functions to receive platform events
- Resolves agents based on platform identifiers
- Triggers agent chat with the incoming message
- Sends responses back through the platform API
- Emits audit events for security tracking
Basic Template
Here’s a minimal channel adapter structure:
import { init } from "iii-sdk";
import { ENGINE_URL, createSecretGetter } from "../shared/config.js";
import { splitMessage, resolveAgent } from "../shared/utils.js";
// Connect this worker to the engine under the name "channel-myplatform".
const sdk = init(ENGINE_URL, { workerName: "channel-myplatform" });
const { registerFunction, registerTrigger, trigger, triggerVoid } = sdk;

// Secret lookups are built on top of `trigger`.
const getSecret = createSecretGetter(trigger);
// 1. Register webhook handler
registerFunction(
  {
    id: "channel::myplatform::webhook",
    description: "Handle MyPlatform webhook events"
  },
  /**
   * Webhook entry point: forwards an incoming message to the resolved agent,
   * relays the agent's reply, and records an audit event. Always answers
   * 200 `{ ok: true }`, including when the payload carries no message.
   */
  async (req) => {
    // Use `req.body` when present, otherwise treat the request itself as the payload.
    const payload = req.body || req;

    // 2. Extract message and sender
    const senderId = payload.senderId;
    const message = payload.message;
    const conversationId = payload.conversationId;

    if (message) {
      // 3. Resolve agent for this conversation
      const agentId = await resolveAgent(trigger, "myplatform", conversationId);

      // 4. Trigger agent chat
      const reply: any = await trigger("agent::chat", {
        agentId,
        message,
        sessionId: `myplatform:${conversationId}`
      });

      // 5. Send response
      await sendMessage(conversationId, reply.content);

      // 6. Audit log
      triggerVoid("security::audit", {
        type: "channel_message",
        agentId,
        detail: { channel: "myplatform", conversationId, senderId }
      });
    }

    return { status_code: 200, body: { ok: true } };
  }
);
// 7. Register HTTP trigger
// Bind the "channel::myplatform::webhook" function to an HTTP trigger:
// POST requests on the "webhook/myplatform" path invoke the handler above.
registerTrigger({
type: "http",
function_id: "channel::myplatform::webhook",
config: { api_path: "webhook/myplatform", http_method: "POST" }
});
// 8. Send message to platform API
/**
 * Delivers `text` to a MyPlatform conversation.
 *
 * The text is split into chunks of at most 2000 characters and posted one
 * request per chunk, in order. Sending stops at the first failure.
 *
 * @param conversationId - Target conversation on the platform.
 * @param text - Full reply text; may exceed the platform limit.
 * @throws Error when MYPLATFORM_TOKEN is not configured, when the platform
 *   answers with a non-2xx status, or when a request exceeds the 10s timeout.
 */
async function sendMessage(conversationId: string, text: string) {
  const token = await getSecret("MYPLATFORM_TOKEN");
  if (!token) throw new Error("MYPLATFORM_TOKEN not configured");

  const chunks = splitMessage(text, 2000); // Platform's char limit
  for (const chunk of chunks) {
    // Abort a hung request instead of blocking the webhook handler forever.
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), 10_000);
    try {
      const res = await fetch("https://api.myplatform.com/v1/messages", {
        method: "POST",
        headers: {
          "Authorization": `Bearer ${token}`,
          "Content-Type": "application/json"
        },
        body: JSON.stringify({
          conversation_id: conversationId,
          text: chunk
        }),
        signal: controller.signal
      });
      // fetch only rejects on network errors; HTTP failures must be checked explicitly.
      if (!res.ok) {
        throw new Error(`MyPlatform send failed (${res.status})`);
      }
    } finally {
      clearTimeout(timer);
    }
  }
}
Real-World Examples
Let’s examine actual channel adapters from AgentOS:
Example 1: Telegram (Simple)
Source: src/channels/telegram.ts:1
registerFunction(
  { id: "channel::telegram::webhook" },
  /**
   * Telegram webhook: rejects updates that fail verification, ignores updates
   * without message text, otherwise relays the text to the resolved agent and
   * sends the reply back to the originating chat.
   */
  async (req) => {
    // Verify webhook signature
    const expectedToken = await getSecret("TELEGRAM_SECRET_TOKEN");
    const authentic = verifyTelegramUpdate(expectedToken, req);
    if (!authentic) {
      return { status_code: 401, body: { error: "Invalid signature" } };
    }

    // New and edited messages are handled the same way.
    const update = req.body || req;
    const incoming = update.message || update.edited_message;
    const text = incoming?.text;
    if (!text) {
      return { status_code: 200, body: { ok: true } };
    }

    const chatId = incoming.chat.id;
    const agentId = await resolveAgent(trigger, "telegram", String(chatId));

    const reply: any = await trigger("agent::chat", {
      agentId,
      message: text,
      sessionId: `telegram:${chatId}`
    });
    await sendMessage(chatId, reply.content);

    triggerVoid("security::audit", {
      type: "channel_message",
      agentId,
      detail: { channel: "telegram", chatId, userId: incoming.from?.id }
    });

    return { status_code: 200, body: { ok: true } };
  }
);
/**
 * Sends `text` to a Telegram chat via the Bot API `sendMessage` method.
 *
 * The text is split into chunks of at most 4096 characters; each chunk is
 * posted with `parse_mode: "Markdown"` and a 10 second timeout.
 *
 * @param chatId - Telegram chat identifier.
 * @param text - Reply text of any length.
 * @throws Error when TELEGRAM_BOT_TOKEN is not configured, when Telegram
 *   answers with a non-2xx status, or when a request is aborted by the timeout.
 */
async function sendMessage(chatId: number, text: string) {
  const botToken = await getSecret("TELEGRAM_BOT_TOKEN");
  // Without this guard a missing secret would be interpolated as "botnull" in the URL.
  if (!botToken) throw new Error("TELEGRAM_BOT_TOKEN not configured");

  const endpoint = `https://api.telegram.org/bot${botToken}/sendMessage`;
  const chunks = splitMessage(text, 4096);
  for (const chunk of chunks) {
    const controller = new AbortController();
    const timeoutId = setTimeout(() => controller.abort(), 10_000);
    try {
      const res = await fetch(endpoint, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          chat_id: chatId,
          text: chunk,
          parse_mode: "Markdown"
        }),
        signal: controller.signal
      });
      if (!res.ok) {
        throw new Error(`Telegram send failed (${res.status})`);
      }
    } finally {
      // Always release the timer, whether the request succeeded, failed or was aborted.
      clearTimeout(timeoutId);
    }
  }
}
- Webhook signature verification
- Message/edited_message handling
- Markdown formatting
- Request timeout (10s)
- Error handling
Example 2: Bluesky (Authentication)
Source: src/channels/bluesky.ts:1
// Cached Bluesky session; null until `authenticate()` has succeeded.
let session: { accessJwt: string; did: string } | null = null;

registerFunction(
  { id: "channel::bluesky::webhook" },
  /**
   * Bluesky webhook: relays a mention's text to the agent resolved for the
   * author's DID and posts the agent's answer as a reply to that post.
   * Events without text, and events authored by the bot itself, are
   * acknowledged with 200 and otherwise ignored.
   */
  async (req) => {
    const { did, text, uri, cid } = req.body || req;

    if (!text) {
      return { status_code: 200, body: { ok: true } };
    }

    // Skip self-mentions. The bot's own DID is only known once a session
    // exists, so establish one first; otherwise the filter could never match
    // before the first outgoing message.
    if (!session) await authenticate();
    if (did === session?.did) {
      return { status_code: 200, body: { ok: true } };
    }

    const agentId = await resolveAgent(trigger, "bluesky", did);
    const response: any = await trigger("agent::chat", {
      agentId,
      message: text,
      sessionId: `bluesky:${did}`
    });

    // Reply in-thread to the post that triggered this event.
    await sendMessage(response.content, { uri, cid });
    return { status_code: 200, body: { ok: true } };
  }
);
/**
 * Creates a Bluesky session from the BLUESKY_HANDLE / BLUESKY_PASSWORD
 * secrets and stores it in the module-level `session`.
 *
 * @throws Error when either secret is missing, when `createSession` answers
 *   with a non-2xx status, or when the response lacks `accessJwt` / `did`.
 *   `session` is left untouched in all failure cases.
 */
async function authenticate() {
  const handle = await getSecret("BLUESKY_HANDLE");
  const password = await getSecret("BLUESKY_PASSWORD");
  if (!handle || !password) {
    throw new Error("BLUESKY_HANDLE and BLUESKY_PASSWORD must be configured");
  }

  const res = await fetch(
    "https://bsky.social/xrpc/com.atproto.server.createSession",
    {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ identifier: handle, password })
    }
  );
  // A failed login returns an error payload; never store that as a session.
  if (!res.ok) {
    throw new Error(`Bluesky authentication failed (${res.status})`);
  }

  const data = await res.json() as { accessJwt?: unknown; did?: unknown };
  if (typeof data.accessJwt !== "string" || typeof data.did !== "string") {
    throw new Error("Bluesky authentication response is missing accessJwt or did");
  }
  session = data as { accessJwt: string; did: string };
}
/**
 * Posts `text` to Bluesky as one or more `app.bsky.feed.post` records.
 *
 * The text is split into chunks of at most 300 characters. When `parent` is
 * given, every chunk is created as a reply whose root and parent are both
 * `parent`.
 *
 * @param text - Text to publish.
 * @param parent - Optional `{ uri, cid }` of the post being replied to.
 * @throws Error when no session can be established or when `createRecord`
 *   answers with a non-2xx status.
 */
async function sendMessage(
  text: string,
  parent?: { uri: string; cid: string }
) {
  if (!session) await authenticate();
  // Narrow once instead of asserting non-null on every access.
  const active = session;
  if (!active) throw new Error("Bluesky session unavailable");

  const chunks = splitMessage(text, 300);
  for (const chunk of chunks) {
    const res = await fetch(
      "https://bsky.social/xrpc/com.atproto.repo.createRecord",
      {
        method: "POST",
        headers: {
          "Authorization": `Bearer ${active.accessJwt}`,
          "Content-Type": "application/json"
        },
        body: JSON.stringify({
          repo: active.did,
          collection: "app.bsky.feed.post",
          record: {
            text: chunk,
            createdAt: new Date().toISOString(),
            ...(parent ? { reply: { root: parent, parent } } : {})
          }
        })
      }
    );
    // Surface rejected posts instead of silently dropping the reply.
    if (!res.ok) {
      throw new Error(`Bluesky post failed (${res.status})`);
    }
  }
}
- Session management
- Self-mention filtering
- Reply threading
- JWT authentication
Example 3: Slack (Signature Verification)
Source: src/channels/slack.ts:1
registerFunction(
  { id: "channel::slack::events" },
  /**
   * Slack Events API endpoint.
   *
   * Every request, including the `url_verification` handshake, must pass
   * signature verification before anything in it is trusted. Verified
   * non-bot `message` events are relayed to the agent resolved for the
   * channel, and the reply is posted into the same thread.
   *
   * Responds 401 with the verifier's error message when verification fails,
   * the challenge for `url_verification`, and 200 `{ ok: true }` otherwise.
   */
  async (req) => {
    // Verify request signature before looking at the payload.
    const signingSecret = await getSecret("SLACK_SIGNING_SECRET");
    try {
      verifySlackSignature(req, signingSecret);
    } catch (e: unknown) {
      const message = e instanceof Error ? e.message : String(e);
      return { status_code: 401, body: { error: message } };
    }

    const event = req.body || req;

    // Handle URL verification challenge
    if (event.type === "url_verification") {
      return { status_code: 200, body: { challenge: event.challenge } };
    }

    // Process message event; messages carrying a bot_id are skipped to avoid reply loops.
    if (event.event?.type === "message" && !event.event.bot_id) {
      const msg = event.event;
      // Thread replies share the parent's timestamp; top-level messages start their own thread.
      const threadTs = msg.thread_ts || msg.ts;
      const agentId = await resolveAgent(trigger, "slack", msg.channel);
      const response: any = await trigger("agent::chat", {
        agentId,
        message: msg.text,
        sessionId: `slack:${msg.channel}:${threadTs}`
      });
      await sendMessage(msg.channel, response.content, threadTs);
    }

    return { status_code: 200, body: { ok: true } };
  }
);
/**
 * Posts `text` to a Slack channel through `chat.postMessage`.
 *
 * The text is split into chunks of at most 4000 characters, sent in order.
 *
 * @param channel - Slack channel ID.
 * @param text - Reply text of any length.
 * @param threadTs - When set, chunks are posted as replies in that thread.
 * @throws Error when SLACK_BOT_TOKEN is not configured, when the HTTP status
 *   is not 2xx, or when Slack's JSON response has `ok: false`.
 */
async function sendMessage(
  channel: string,
  text: string,
  threadTs?: string
) {
  const botToken = await getSecret("SLACK_BOT_TOKEN");
  if (!botToken) throw new Error("SLACK_BOT_TOKEN not configured");

  const chunks = splitMessage(text, 4000);
  for (const chunk of chunks) {
    const res = await fetch("https://slack.com/api/chat.postMessage", {
      method: "POST",
      headers: {
        "Authorization": `Bearer ${botToken}`,
        "Content-Type": "application/json"
      },
      body: JSON.stringify({
        channel,
        text: chunk,
        ...(threadTs ? { thread_ts: threadTs } : {})
      })
    });
    if (!res.ok) {
      throw new Error(`Slack send failed (${res.status})`);
    }
    // Slack reports most API errors as HTTP 200 with `ok: false` in the body,
    // so the status code alone is not enough.
    const result = await res.json() as { ok?: boolean; error?: string };
    if (!result.ok) {
      throw new Error(`Slack send failed: ${result.error ?? "unknown error"}`);
    }
  }
}
- URL verification challenge
- HMAC signature verification
- Bot message filtering
- Thread support
Example 4: Generic Webhook
Source: src/channels/webhook.ts:1
registerFunction(
  { id: "channel::webhook::inbound" },
  /**
   * Generic inbound webhook: picks a channel ID from the path, the query
   * string, or "default"; forwards the first of `message` / `text` /
   * `content` to the agent; returns the agent's reply in the response body.
   * If the payload names a callback URL, the reply is also POSTed there
   * on a best-effort basis.
   */
  async (req) => {
    const body = req.body;
    const query_params = req.query_params;
    const path_params = req.path_params;

    const channelId = path_params?.channelId || query_params?.channel || "default";
    const agentId = await resolveAgent(trigger, "webhook", channelId);

    // Flexible payload parsing with a placeholder for unknown shapes.
    const message =
      body?.message || body?.text || body?.content || "[Unrecognized webhook payload]";

    const reply: any = await trigger("agent::chat", {
      agentId,
      message,
      sessionId: `webhook:${channelId}`
    });

    // Optional callback URL
    const callbackUrl = body?.callback_url || body?.response_url;
    if (!callbackUrl) {
      return { status_code: 200, body: { response: reply.content } };
    }

    await assertNoSsrf(callbackUrl); // Prevent SSRF attacks

    // Give the callback at most 30 seconds.
    const abort = new AbortController();
    const timeoutHandle = setTimeout(() => abort.abort(), 30_000);
    try {
      await fetch(callbackUrl, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ text: reply.content }),
        signal: abort.signal
      });
    } catch {
      // Ignore callback failures
    } finally {
      clearTimeout(timeoutHandle);
    }

    return { status_code: 200, body: { response: reply.content } };
  }
);
// Bind "channel::webhook::inbound" to POST requests on "webhook/:channelId".
// The handler reads the `:channelId` segment as `path_params.channelId`.
registerTrigger({
type: "http",
function_id: "channel::webhook::inbound",
config: { api_path: "webhook/:channelId", http_method: "POST" }
});
- Dynamic channel ID from URL path
- Flexible payload parsing
- SSRF protection
- Optional callback URL
- Timeout handling
Common Patterns
Pattern 1: Signature Verification
Many platforms require webhook signature verification:
import crypto from "crypto";
/**
 * Checks an HMAC-SHA256 webhook signature against the request body.
 *
 * The expected value is the hex HMAC of `JSON.stringify(req.body)` keyed with
 * `secret`; it is compared with the header value in constant time.
 *
 * @param req - Incoming request; `req.headers` and `req.body` are read.
 * @param secret - Shared signing secret.
 * @param headerName - Header carrying the hex signature. Both the exact name
 *   and its lower-cased form are tried.
 * @returns `true` only when a signature is present and matches. A missing,
 *   non-string, wrong-length or mismatching signature (or an empty secret)
 *   yields `false` rather than an exception.
 */
function verifyWebhookSignature(
  req: any,
  secret: string,
  headerName: string = "X-Signature"
): boolean {
  const headers = req?.headers ?? {};
  const signature = headers[headerName] ?? headers[headerName.toLowerCase()];
  if (typeof signature !== "string" || !secret) return false;

  // NOTE(review): this signs the re-serialized body. If the platform signs the
  // raw request bytes, verify against the raw body instead — confirm per platform.
  const body = JSON.stringify(req.body) ?? "";
  const expectedSignature = crypto
    .createHmac("sha256", secret)
    .update(body)
    .digest("hex");

  const received = Buffer.from(signature);
  const expected = Buffer.from(expectedSignature);
  // timingSafeEqual throws when the buffers differ in length, so reject those first.
  if (received.length !== expected.length) return false;

  // Timing-safe comparison
  return crypto.timingSafeEqual(received, expected);
}
Pattern 2: OAuth2 Token Refresh
Platforms using OAuth2 need token refresh logic:
let accessToken = "";
let tokenExpiry = 0;
/**
 * Returns a valid OAuth2 access token, refreshing it when needed.
 *
 * The cached `accessToken` is reused while `Date.now()` is before
 * `tokenExpiry`. Otherwise a `refresh_token` grant is sent to the token
 * endpoint and the cache is updated, expiring one minute before the
 * lifetime reported by the server.
 *
 * @returns The current access token.
 * @throws Error when any of the three PLATFORM_* secrets is missing, when the
 *   token endpoint answers with a non-2xx status, or when the response has
 *   no `access_token`.
 */
async function getAccessToken(): Promise<string> {
  if (accessToken && Date.now() < tokenExpiry) {
    return accessToken;
  }

  const clientId = await getSecret("PLATFORM_CLIENT_ID");
  const clientSecret = await getSecret("PLATFORM_CLIENT_SECRET");
  const refreshToken = await getSecret("PLATFORM_REFRESH_TOKEN");
  // getSecret may return null; URLSearchParams would serialize that as "null".
  if (!clientId || !clientSecret || !refreshToken) {
    throw new Error(
      "PLATFORM_CLIENT_ID, PLATFORM_CLIENT_SECRET and PLATFORM_REFRESH_TOKEN must be configured"
    );
  }

  const res = await fetch("https://platform.com/oauth/token", {
    method: "POST",
    headers: { "Content-Type": "application/x-www-form-urlencoded" },
    body: new URLSearchParams({
      grant_type: "refresh_token",
      client_id: clientId,
      client_secret: clientSecret,
      refresh_token: refreshToken
    })
  });
  if (!res.ok) {
    throw new Error(`Token refresh failed (${res.status})`);
  }

  const data = await res.json() as {
    access_token?: unknown;
    expires_in?: unknown;
  };
  if (typeof data.access_token !== "string" || data.access_token === "") {
    throw new Error("Token refresh response did not include an access_token");
  }

  // A missing or non-numeric lifetime is treated as 0, which forces a refresh
  // on the next call rather than caching with a NaN expiry.
  const reportedLifetime = Number(data.expires_in);
  const lifetimeSeconds = Number.isFinite(reportedLifetime) ? reportedLifetime : 0;

  accessToken = data.access_token;
  tokenExpiry = Date.now() + (lifetimeSeconds * 1000) - 60_000; // 1min buffer
  return accessToken;
}
Pattern 3: Message Chunking
Handle platform character limits:
import { splitMessage } from "../shared/utils.js";
/**
 * Sends `text` to `recipientId` in pieces no longer than `limit`, waiting
 * 100 ms after each piece.
 */
async function sendLongMessage(
  recipientId: string,
  text: string,
  limit: number
) {
  const pause = (ms: number) =>
    new Promise(resolve => setTimeout(resolve, ms));

  for (const part of splitMessage(text, limit)) {
    await sendChunk(recipientId, part);
    // Optional: Rate limit delay between chunks
    await pause(100);
  }
}
Pattern 4: Self-Message Filtering
Avoid infinite loops by ignoring the bot’s own messages:
let botUserId: string | null = null;
/**
 * Returns the bot's own user ID, fetching it from the platform's `/me`
 * endpoint on first use and caching it in `botUserId` afterwards.
 *
 * @throws Error when PLATFORM_TOKEN is not configured, when the lookup
 *   answers with a non-2xx status, or when the response carries no `id`.
 *   Nothing is cached in those cases, so a later call retries.
 */
async function getBotUserId(): Promise<string> {
  if (botUserId) return botUserId;

  const token = await getSecret("PLATFORM_TOKEN");
  if (!token) throw new Error("PLATFORM_TOKEN not configured");

  const res = await fetch("https://api.platform.com/me", {
    headers: { Authorization: `Bearer ${token}` }
  });
  if (!res.ok) {
    throw new Error(`Bot identity lookup failed (${res.status})`);
  }

  const data = await res.json();
  // Caching an undefined id would silently disable self-message filtering.
  if (data?.id == null || data.id === "") {
    throw new Error("Bot identity response did not include an id");
  }

  const id: string = data.id;
  botUserId = id;
  return id;
}
// In webhook handler:
// Acknowledge with 200 and stop when the sender is the bot itself, so the
// adapter never responds to its own messages.
if (message.from.id === await getBotUserId()) {
return { status_code: 200, body: { ok: true } };
}
Pattern 5: Retry Logic
/**
 * Performs `fetch(url, options)` with retries and exponential backoff.
 *
 * Network errors and 5xx responses are retried; any other non-2xx response
 * fails immediately. Between attempts the function waits
 * `baseDelayMs * 2^attempt` milliseconds.
 *
 * @param url - Request URL.
 * @param options - Options passed unchanged to every `fetch` call.
 * @param maxRetries - Total number of attempts (default 3).
 * @param baseDelayMs - Backoff base in milliseconds (default 1000).
 * @returns The first successful (2xx) response.
 * @throws Error `Request failed: <status>` for a non-retryable status or when
 *   the last attempt ends in a 5xx; the original network error when the last
 *   attempt fails at the network level; an Error when `maxRetries` < 1.
 */
async function sendWithRetry(
  url: string,
  options: RequestInit,
  maxRetries = 3,
  baseDelayMs = 1000
): Promise<Response> {
  let lastError: unknown = new Error("sendWithRetry: maxRetries must be at least 1");

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    let res: Response | undefined;
    try {
      res = await fetch(url, options);
    } catch (error) {
      // Only the fetch itself is guarded, so the throw for client errors
      // below can never be swallowed and retried by accident.
      lastError = error;
    }

    if (res) {
      if (res.ok) return res;
      if (res.status < 500) {
        // Don't retry client errors
        throw new Error(`Request failed: ${res.status}`);
      }
      // Retry on server errors
      lastError = new Error(`Request failed: ${res.status}`);
    }

    // Back off before the next attempt, but not after the final one.
    if (attempt < maxRetries - 1) {
      const delay = baseDelayMs * Math.pow(2, attempt);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }

  // Every attempt failed: surface the most recent failure instead of returning undefined.
  throw lastError;
}
Shared Utilities
AgentOS provides shared utilities in src/shared/utils.js:
resolveAgent()
/**
 * Looks up which agent serves a platform conversation.
 *
 * Reads the `channel_agents` state scope under the key
 * `<channel>:<platformId>` and returns the stored `agentId`, or "default"
 * when no (truthy) agent ID is stored.
 */
export async function resolveAgent(
  trigger: TriggerFunction,
  channel: string,
  platformId: string
): Promise<string> {
  const lookupKey = `${channel}:${platformId}`;
  const stored = await trigger("state::get", {
    scope: "channel_agents",
    key: lookupKey
  });

  const agentId = stored?.value?.agentId;
  return agentId ? agentId : "default";
}
splitMessage()
/**
 * Splits `text` into chunks of at most `maxLength` characters.
 *
 * Each cut is made at the last space at or before the limit when there is
 * one, otherwise exactly at the limit. The remainder is trimmed after every
 * cut, so whitespace around a cut point is dropped. Text that already fits
 * is returned unchanged as a single chunk.
 *
 * @param text - Text to split.
 * @param maxLength - Maximum chunk length; must be at least 1 when splitting
 *   is required.
 * @returns The chunks in order; never contains an empty chunk unless `text`
 *   itself fits within `maxLength`.
 * @throws RangeError when `text` needs splitting and `maxLength` is below 1
 *   or not a number (the loop could otherwise never make progress).
 */
export function splitMessage(text: string, maxLength: number): string[] {
  if (text.length <= maxLength) return [text];
  if (!(maxLength >= 1)) {
    throw new RangeError(`maxLength must be at least 1 (got ${maxLength})`);
  }

  const chunks: string[] = [];
  let remaining = text;

  while (remaining.length > maxLength) {
    // Find last space before limit
    let splitIndex = remaining.lastIndexOf(" ", maxLength);
    // A space at index 0 would yield an empty chunk, so treat it like
    // "no space found" and hard-split at the limit.
    if (splitIndex <= 0) splitIndex = maxLength;

    chunks.push(remaining.slice(0, splitIndex));
    remaining = remaining.slice(splitIndex).trim();
  }

  if (remaining.length > 0) chunks.push(remaining);
  return chunks;
}
createSecretGetter()
/**
 * Builds a secret lookup function on top of `trigger`.
 *
 * The returned function calls `vault::get` with the given key and resolves
 * to the stored value, or `null` when the result has no truthy `value`.
 */
export function createSecretGetter(trigger: TriggerFunction) {
  async function getSecret(key: string): Promise<string | null> {
    const entry = await trigger("vault::get", { key });
    const value = entry?.value;
    return value ? value : null;
  }

  return getSecret;
}
Testing Your Adapter
1. Unit Tests
import { describe, it, expect, vi } from "vitest";
import { splitMessage } from "../shared/utils.js";
describe("channel::myplatform", () => {
  // 5000 characters with a 2000-character limit must yield three chunks,
  // none longer than the limit.
  it("splits long messages", () => {
    const longText = "a".repeat(5000);
    const parts = splitMessage(longText, 2000);

    expect(parts.length).toBe(3);
    expect(parts[0].length).toBeLessThanOrEqual(2000);
  });

  it("handles webhook payload", async () => {
    // Sample inbound request carrying the three fields the handler reads.
    const body = {
      senderId: "user123",
      message: "Hello",
      conversationId: "conv456"
    };
    const req = { body };

    // Mock trigger functions
    const trigger = vi.fn().mockResolvedValue({ content: "Hi there!" });

    // Test webhook handler
    // ...
  });
});
2. Integration Tests
# Start AgentOS
agentos start
# Test webhook endpoint: POST a sample payload with the senderId, message and
# conversationId fields to the adapter's route on localhost:3111
curl -X POST http://localhost:3111/webhook/myplatform \
-H "Content-Type: application/json" \
-d '{
"senderId": "test-user",
"message": "test message",
"conversationId": "test-conv"
}'
# Check logs, keeping only lines that mention the adapter
agentos logs | grep myplatform
# Verify audit trail: list channel_message audit events that mention the adapter
agentos security audit --type channel_message | grep myplatform
3. CLI Testing
# Add your channel to the CLI
agentos channel setup myplatform
# Test the connection for the channel
agentos channel test myplatform
# Enable the channel for production
agentos channel enable myplatform
Deployment Checklist
- Webhook signature verification implemented
- Rate limiting considered
- Message chunking for platform limits
- Error handling and retries
- Timeout protection (10-30s)
- Self-message filtering
- Audit events emitted
- Secrets stored in vault
- Unit tests written
- Integration tests passing
- Documentation added
Advanced: Multi-Step Workflows
Some platforms require multi-step setup (e.g., OAuth2 flow):
registerFunction(
  { id: "channel::myplatform::oauth_callback" },
  /**
   * OAuth2 redirect target: exchanges the authorization `code` from the
   * query string for tokens and stores them in the vault.
   *
   * Responds 400 when no code is supplied, 502 when the token exchange fails
   * or returns no access token, and 200 once the tokens are stored.
   *
   * NOTE(review): no `state` parameter is validated here; confirm whether the
   * authorization request sends one and, if so, check it before the exchange.
   */
  async (req) => {
    const code = req.query_params?.code;
    if (!code) {
      return {
        status_code: 400,
        body: { success: false, message: "Missing authorization code" }
      };
    }

    const clientId = await getSecret("PLATFORM_CLIENT_ID");
    const clientSecret = await getSecret("PLATFORM_CLIENT_SECRET");
    if (!clientId || !clientSecret) {
      throw new Error("PLATFORM_CLIENT_ID and PLATFORM_CLIENT_SECRET must be configured");
    }

    // Exchange code for token
    const res = await fetch("https://platform.com/oauth/token", {
      method: "POST",
      // The body is JSON; without this header fetch labels a string body as text/plain.
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        code,
        client_id: clientId,
        client_secret: clientSecret,
        grant_type: "authorization_code"
      })
    });
    if (!res.ok) {
      return {
        status_code: 502,
        body: { success: false, message: `Token exchange failed (${res.status})` }
      };
    }

    const { access_token, refresh_token } = await res.json();
    if (!access_token) {
      return {
        status_code: 502,
        body: { success: false, message: "Token exchange returned no access token" }
      };
    }

    // Store tokens in vault
    await trigger("vault::set", {
      key: "PLATFORM_ACCESS_TOKEN",
      value: access_token
    });
    // Only overwrite the stored refresh token when a new one was issued.
    if (refresh_token) {
      await trigger("vault::set", {
        key: "PLATFORM_REFRESH_TOKEN",
        value: refresh_token
      });
    }

    return {
      status_code: 200,
      body: { success: true, message: "Authentication complete" }
    };
  }
);
// Bind "channel::myplatform::oauth_callback" to GET requests on
// "oauth/myplatform/callback", the path the OAuth2 provider redirects to.
registerTrigger({
type: "http",
function_id: "channel::myplatform::oauth_callback",
config: { api_path: "oauth/myplatform/callback", http_method: "GET" }
});
Example: Complete Custom Adapter
Let’s build a complete adapter for a fictional “ChatHub” platform:
// src/channels/chathub.ts
import { init } from "iii-sdk";
import { ENGINE_URL, createSecretGetter } from "../shared/config.js";
import { splitMessage, resolveAgent } from "../shared/utils.js";
import crypto from "crypto";
// Connect this worker to the engine under the name "channel-chathub".
const sdk = init(ENGINE_URL, { workerName: "channel-chathub" });
const { registerFunction, registerTrigger, trigger, triggerVoid } = sdk;

// Secret lookups are built on top of `trigger`.
const getSecret = createSecretGetter(trigger);

// Base URL for all ChatHub API calls.
const API_URL = "https://api.chathub.example.com/v1";
// Webhook handler
registerFunction(
  {
    id: "channel::chathub::webhook",
    description: "Handle ChatHub webhook events"
  },
  /**
   * ChatHub webhook: verifies the request signature, then relays
   * `message.created` events (except the bot's own) to the agent resolved for
   * the room, posts the reply and records an audit event.
   *
   * Responds 401 for a missing or invalid signature, 200 `{ ok: true }`
   * otherwise. Throws when CHATHUB_WEBHOOK_SECRET is not configured.
   */
  async (req) => {
    // Verify signature
    const secret = await getSecret("CHATHUB_WEBHOOK_SECRET");
    // Fail loudly on misconfiguration rather than computing an HMAC with a null key.
    if (!secret) throw new Error("CHATHUB_WEBHOOK_SECRET not configured");

    // Accept the header in its documented and lower-cased spelling.
    const headers = req.headers ?? {};
    const signature =
      headers["X-ChatHub-Signature"] ?? headers["x-chathub-signature"];

    let authentic = false;
    try {
      authentic =
        typeof signature === "string" &&
        verifySignature(req.body, signature, secret);
    } catch {
      // Any failure while verifying is treated as an invalid signature.
      authentic = false;
    }
    if (!authentic) {
      return { status_code: 401, body: { error: "Invalid signature" } };
    }

    const event = req.body;

    // Handle different event types
    if (event?.type === "message.created") {
      const { room_id, user_id, text } = event.data;

      // Skip bot's own messages
      const botUserId = await getSecret("CHATHUB_BOT_USER_ID");
      if (user_id === botUserId) {
        return { status_code: 200, body: { ok: true } };
      }

      // Resolve agent
      const agentId = await resolveAgent(trigger, "chathub", room_id);

      // Trigger agent chat
      const response: any = await trigger("agent::chat", {
        agentId,
        message: text,
        sessionId: `chathub:${room_id}`
      });

      // Send response
      await sendMessage(room_id, response.content);

      // Audit log
      triggerVoid("security::audit", {
        type: "channel_message",
        agentId,
        detail: { channel: "chathub", room_id, user_id }
      });
    }

    return { status_code: 200, body: { ok: true } };
  }
);
// Bind "channel::chathub::webhook" to POST requests on "webhook/chathub".
registerTrigger({
type: "http",
function_id: "channel::chathub::webhook",
config: { api_path: "webhook/chathub", http_method: "POST" }
});
// Helper: Send message to ChatHub
/**
 * Posts `text` to a ChatHub room, split into chunks of at most 5000
 * characters. Each request is aborted after 15 seconds.
 *
 * @throws Error when CHATHUB_API_TOKEN is not configured or when ChatHub
 *   answers with a non-2xx status (the response text is included).
 */
async function sendMessage(roomId: string, text: string) {
  const apiToken = await getSecret("CHATHUB_API_TOKEN");
  if (!apiToken) {
    throw new Error("CHATHUB_API_TOKEN not configured");
  }

  for (const part of splitMessage(text, 5000)) { // ChatHub limit
    const abort = new AbortController();
    const timeoutHandle = setTimeout(() => abort.abort(), 15_000);

    try {
      const res = await fetch(`${API_URL}/rooms/${roomId}/messages`, {
        method: "POST",
        headers: {
          "Authorization": `Bearer ${apiToken}`,
          "Content-Type": "application/json"
        },
        body: JSON.stringify({ text: part }),
        signal: abort.signal
      });

      if (res.ok) continue;

      const detail = await res.text();
      throw new Error(`ChatHub API error (${res.status}): ${detail}`);
    } finally {
      // Runs on success, failure and abort alike.
      clearTimeout(timeoutHandle);
    }
  }
}
// Helper: Verify webhook signature
/**
 * Verifies a ChatHub webhook signature.
 *
 * The expected value is the hex HMAC-SHA256 of `JSON.stringify(payload)`
 * keyed with `secret`, compared with `signature` in constant time.
 *
 * @param payload - Parsed webhook body.
 * @param signature - Hex signature supplied with the request.
 * @param secret - Webhook signing secret.
 * @returns `true` only for a matching signature. A missing, non-string or
 *   wrong-length signature, or an empty secret, yields `false` instead of
 *   an exception.
 */
function verifySignature(
  payload: any,
  signature: string,
  secret: string
): boolean {
  if (typeof signature !== "string" || !secret) return false;

  // NOTE(review): signs the re-serialized payload; if ChatHub signs the raw
  // request bytes, the raw body must be used instead — confirm.
  const expected = crypto
    .createHmac("sha256", secret)
    .update(JSON.stringify(payload) ?? "")
    .digest("hex");

  const receivedBytes = Buffer.from(signature);
  const expectedBytes = Buffer.from(expected);
  // timingSafeEqual throws when lengths differ, so reject those up front.
  if (receivedBytes.length !== expectedBytes.length) return false;

  return crypto.timingSafeEqual(receivedBytes, expectedBytes);
}
Next Steps
Messaging Platforms
See production examples: Telegram, Discord, Slack
Social Media
Learn from Bluesky, Reddit, Twitch adapters
Security
Implement signature verification and audit logging
API Reference
Full iii-sdk API documentation