Building Gateways
This guide teaches you how to build custom gateways that connect external platforms and interfaces to Solace Agent Mesh. Gateways handle the translation between platform-specific formats and the A2A protocol.Understanding Gateways
Gateways in Solace Agent Mesh:- Connect external platforms (Slack, REST APIs, custom UIs) to the agent mesh
- Translate platform-specific events into A2A tasks
- Stream agent responses back to the platform
- Handle authentication and authorization
- Manage platform-specific features (buttons, reactions, formatting)
Gateway Architecture
Gateways consist of two main components:- Gateway Adapter: Platform-specific logic (you implement this)
- Generic Gateway Component: A2A protocol handling (provided by SAM)
Creating a Gateway Adapter
from solace_agent_mesh.gateway.adapter.base import GatewayAdapter
from solace_agent_mesh.gateway.adapter.types import (
GatewayContext,
SamTask,
SamTextPart,
SamFilePart,
SamUpdate,
ResponseContext,
AuthClaims,
)
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
import asyncio
import logging
log = logging.getLogger(__name__)
class MyGatewayConfig(BaseModel):
"""Configuration for my custom gateway."""
api_key: str = Field(description="API key for the platform")
webhook_url: str = Field(description="Webhook URL to listen on")
enable_notifications: bool = Field(default=True)
class MyGatewayAdapter(GatewayAdapter[dict, dict, MyGatewayConfig]):
"""Custom gateway adapter for MyPlatform."""
ConfigModel = MyGatewayConfig
def __init__(self):
super().__init__()
self.gateway_context: Optional[GatewayContext] = None
self.platform_client = None
async def init(self, context: GatewayContext) -> None:
"""Initialize the gateway adapter."""
self.gateway_context = context
config: MyGatewayConfig = context.adapter_config
log.info(f"Initializing MyGateway with webhook: {config.webhook_url}")
# Initialize your platform client
self.platform_client = MyPlatformClient(
api_key=config.api_key,
webhook_url=config.webhook_url
)
# Set up event listener
asyncio.create_task(self._listen_for_events())
log.info("MyGateway initialized successfully")
async def cleanup(self) -> None:
"""Clean up resources on shutdown."""
log.info("Shutting down MyGateway")
if self.platform_client:
await self.platform_client.close()
log.info("MyGateway shutdown complete")
async def prepare_task(
self,
external_input: dict,
endpoint_context: Optional[Dict[str, Any]] = None
) -> SamTask:
"""Prepare a SAM task from platform input.
Args:
external_input: The platform-specific event data
endpoint_context: Optional context about the endpoint
Returns:
A SamTask ready for agent processing
"""
# Extract platform-specific data
user_id = external_input.get("user_id")
message_text = external_input.get("text", "")
attachments = external_input.get("attachments", [])
# Build task parts
parts = []
# Add text content
if message_text:
parts.append(SamTextPart(text=message_text))
# Add file attachments
for attachment in attachments:
file_data = await self._download_attachment(attachment)
parts.append(SamFilePart(
filename=attachment["filename"],
mime_type=attachment["mime_type"],
data=file_data
))
# Create the task
task = SamTask(
task_id=external_input.get("message_id"),
user_id=user_id,
parts=parts,
platform_context={
"channel_id": external_input.get("channel_id"),
"thread_id": external_input.get("thread_id"),
}
)
return task
The
platform_context is preserved and passed back in ResponseContext so you know where to send responses.async def handle_text_chunk(self, text: str, context: ResponseContext) -> None:
"""Handle streaming text from the agent."""
# Get platform-specific context
channel_id = context.platform_context.get("channel_id")
thread_id = context.platform_context.get("thread_id")
# Send or update message on platform
if not hasattr(context, "platform_message_id"):
# First chunk - create new message
message_id = await self.platform_client.send_message(
channel_id=channel_id,
thread_id=thread_id,
text=text
)
context.platform_message_id = message_id
else:
# Subsequent chunks - update existing message
await self.platform_client.update_message(
message_id=context.platform_message_id,
text=text
)
async def handle_file(
self,
file_part: SamFilePart,
context: ResponseContext
) -> None:
"""Handle file artifacts from the agent."""
channel_id = context.platform_context.get("channel_id")
# Upload file to platform
await self.platform_client.upload_file(
channel_id=channel_id,
filename=file_part.filename,
file_data=file_part.data,
mime_type=file_part.mime_type
)
async def handle_status_update(
self,
status_text: str,
context: ResponseContext
) -> None:
"""Handle status/progress updates from the agent."""
channel_id = context.platform_context.get("channel_id")
# Show typing indicator or status message
await self.platform_client.set_status(
channel_id=channel_id,
status=status_text
)
async def handle_task_complete(self, context: ResponseContext) -> None:
"""Handle task completion."""
# Clear typing indicators, update UI, etc.
channel_id = context.platform_context.get("channel_id")
await self.platform_client.clear_status(channel_id)
async def handle_error(
self,
error: SamError,
context: ResponseContext
) -> None:
"""Handle errors from the agent."""
channel_id = context.platform_context.get("channel_id")
# Send error message to user
await self.platform_client.send_message(
channel_id=channel_id,
text=f"❌ Error: {error.message}"
)
async def extract_auth_claims(
self,
external_input: dict,
endpoint_context: Optional[Dict[str, Any]] = None
) -> Optional[AuthClaims]:
"""Extract authentication claims from platform input."""
user_id = external_input.get("user_id")
if not user_id:
return None
# Fetch user details from platform
user_info = await self.platform_client.get_user_info(user_id)
return AuthClaims(
user_id=user_id,
email=user_info.get("email"),
name=user_info.get("name"),
roles=user_info.get("roles", []),
metadata={
"platform_user_id": user_id,
"username": user_info.get("username")
}
)
async def _listen_for_events(self):
"""Listen for events from the platform and submit to SAM."""
log.info("Starting event listener")
async for event in self.platform_client.event_stream():
try:
# Only process message events
if event["type"] != "message":
continue
# Submit task to SAM
await self.gateway_context.submit_task(
external_input=event,
agent_name=None # Use default agent
)
except Exception as e:
log.error(f"Error processing event: {e}", exc_info=True)
Gateway Configuration
Create a YAML configuration file for your gateway:my_gateway.yaml
Complete Example: Simple HTTP Gateway
Here’s a complete example of a simple HTTP webhook gateway:simple_http_gateway.py
Advanced Features
Agent Registry Updates
Handle agent discovery events:Session Management
Manage user sessions:Testing Your Gateway
import pytest
from unittest.mock import Mock, AsyncMock
from my_gateway import MyGatewayAdapter
@pytest.mark.asyncio
async def test_prepare_task():
adapter = MyGatewayAdapter()
external_input = {
"user_id": "user123",
"text": "Hello, agent!",
"message_id": "msg456"
}
task = await adapter.prepare_task(external_input)
assert task.user_id == "user123"
assert len(task.parts) == 1
assert task.parts[0].text == "Hello, agent!"
Best Practices
Troubleshooting
Next Steps
- Slack Integration - Example of a production gateway
- Creating Custom Agents - Build agents for your gateway
- MCP Integration - Add MCP tools to agents
- Workflow Development - Orchestrate multi-agent workflows
Real-World Examples
Explore production gateway implementations:examples/gateways/slack_gateway_example.yaml- Full Slack integrationexamples/gateways/rest_gateway_example.yaml- REST API gatewayexamples/gateways/mcp_gateway_example.yaml- MCP server gatewayexamples/gateways/webui_gateway_example.yaml- Web UI gateway