
Building Gateways

This guide teaches you how to build custom gateways that connect external platforms and interfaces to Solace Agent Mesh. Gateways handle the translation between platform-specific formats and the A2A protocol.

Understanding Gateways

Gateways in Solace Agent Mesh:
  • Connect external platforms (Slack, REST APIs, custom UIs) to the agent mesh
  • Translate platform-specific events into A2A tasks
  • Stream agent responses back to the platform
  • Handle authentication and authorization
  • Manage platform-specific features (buttons, reactions, formatting)

Gateway Architecture

Gateways consist of two main components:
  1. Gateway Adapter: Platform-specific logic (you implement this)
  2. Generic Gateway Component: A2A protocol handling (provided by SAM)
┌─────────────────┐
│  External       │
│  Platform       │
│  (Slack, etc)   │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Gateway        │  ◄── You implement this
│  Adapter        │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Generic        │  ◄── Provided by SAM
│  Gateway        │
│  Component      │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  A2A Protocol   │
│  (Agent Mesh)   │
└─────────────────┘
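
The division of labor between the two layers can be illustrated with a toy model. The sketch below does not use the SAM API: `ToyTask`, `ToyAdapter`, `ToyGenericComponent`, and `echo_agent` are hypothetical stand-ins, chosen only to show which layer owns which translation step.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    """Platform-neutral task handed from the adapter to the generic layer."""
    user_id: str
    text: str
    platform_context: dict = field(default_factory=dict)


class ToyAdapter:
    """Platform-specific layer: the part a gateway author writes."""

    def __init__(self) -> None:
        # (channel_id, text) pairs "sent" to the platform
        self.sent: list[tuple[str, str]] = []

    def prepare_task(self, event: dict) -> ToyTask:
        # Translate a platform event into the neutral task shape.
        return ToyTask(
            user_id=event["user"],
            text=event["text"],
            platform_context={"channel_id": event["channel"]},
        )

    async def handle_text(self, text: str, platform_context: dict) -> None:
        # Translate an agent response back into a platform call.
        self.sent.append((platform_context["channel_id"], text))


class ToyGenericComponent:
    """Protocol layer: stands in for the component SAM provides."""

    def __init__(self, adapter: ToyAdapter, agent) -> None:
        self.adapter = adapter
        self.agent = agent

    async def submit(self, event: dict) -> None:
        task = self.adapter.prepare_task(event)
        # In SAM this hop would travel over the A2A protocol.
        reply = await self.agent(task.text)
        await self.adapter.handle_text(reply, task.platform_context)


async def echo_agent(text: str) -> str:
    """Hypothetical agent that echoes its input."""
    return f"echo: {text}"


async def demo() -> list[tuple[str, str]]:
    adapter = ToyAdapter()
    component = ToyGenericComponent(adapter, echo_agent)
    await component.submit({"user": "u1", "text": "hi", "channel": "C42"})
    return adapter.sent
```

The adapter never talks to the agent directly, and the generic layer never sees platform field names such as `channel`; that separation is what lets one generic component serve many platforms.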

Creating a Gateway Adapter

Step 1: Define the Adapter Class

Create a Python class that extends GatewayAdapter:
from solace_agent_mesh.gateway.adapter.base import GatewayAdapter
from solace_agent_mesh.gateway.adapter.types import (
    GatewayContext,
    SamTask,
    SamTextPart,
    SamFilePart,
    SamUpdate,
    SamError,
    ResponseContext,
    AuthClaims,
)
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
import asyncio
import logging

log = logging.getLogger(__name__)

class MyGatewayConfig(BaseModel):
    """Configuration for my custom gateway."""
    api_key: str = Field(description="API key for the platform")
    webhook_url: str = Field(description="Webhook URL to listen on")
    enable_notifications: bool = Field(default=True)

class MyGatewayAdapter(GatewayAdapter[dict, dict, MyGatewayConfig]):
    """Custom gateway adapter for MyPlatform."""
    
    ConfigModel = MyGatewayConfig
    
    def __init__(self):
        super().__init__()
        self.gateway_context: Optional[GatewayContext] = None
        self.platform_client = None

Step 2: Implement Lifecycle Methods

Handle initialization and cleanup:
async def init(self, context: GatewayContext) -> None:
    """Initialize the gateway adapter."""
    self.gateway_context = context
    config: MyGatewayConfig = context.adapter_config
    
    log.info(f"Initializing MyGateway with webhook: {config.webhook_url}")
    
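    # Initialize your platform client
    # (MyPlatformClient is a placeholder for your platform's SDK)
    self.platform_client = MyPlatformClient(
        api_key=config.api_key,
        webhook_url=config.webhook_url
    )
    
    # Set up event listener; keep a reference so the task is not
    # garbage-collected and can be cancelled in cleanup()
    self._listener_task = asyncio.create_task(self._listen_for_events())
    
    log.info("MyGateway initialized successfully")

async def cleanup(self) -> None:
    """Clean up resources on shutdown."""
    log.info("Shutting down MyGateway")
    
    # Stop the event listener started in init()
    listener_task = getattr(self, "_listener_task", None)
    if listener_task:
        listener_task.cancel()
    
    if self.platform_client:
        await self.platform_client.close()
    
    log.info("MyGateway shutdown complete")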

Step 3: Handle Inbound Messages

Convert platform events to SAM tasks:
async def prepare_task(
    self,
    external_input: dict,
    endpoint_context: Optional[Dict[str, Any]] = None
) -> SamTask:
    """Prepare a SAM task from platform input.
    
    Args:
        external_input: The platform-specific event data
        endpoint_context: Optional context about the endpoint
    
    Returns:
        A SamTask ready for agent processing
    """
    # Extract platform-specific data
    user_id = external_input.get("user_id")
    message_text = external_input.get("text", "")
    attachments = external_input.get("attachments", [])
    
    # Build task parts
    parts = []
    
    # Add text content
    if message_text:
        parts.append(SamTextPart(text=message_text))
    
    # Add file attachments
    for attachment in attachments:
        file_data = await self._download_attachment(attachment)
        parts.append(SamFilePart(
            filename=attachment["filename"],
            mime_type=attachment["mime_type"],
            data=file_data
        ))
    
    # Create the task
    task = SamTask(
        task_id=external_input.get("message_id"),
        user_id=user_id,
        parts=parts,
        platform_context={
            "channel_id": external_input.get("channel_id"),
            "thread_id": external_input.get("thread_id"),
        }
    )
    
    return task

The platform_context is preserved and passed back in ResponseContext so you know where to send responses.
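
The prepare_task example reads every field with `.get`, so an event that lacks `user_id` or `message_id` silently produces a task with `None` in those fields. Whether SAM tolerates that is not stated here, so a defensive check before building the task may be worthwhile. A minimal sketch, assuming the event shape used in the example; `REQUIRED_KEYS` and `validate_event` are hypothetical names, not part of SAM:

```python
# Fields the example prepare_task relies on (assumed event shape).
REQUIRED_KEYS = ("user_id", "message_id", "channel_id")


def validate_event(event: dict) -> list[str]:
    """Return a list of problems; an empty list means the event is usable."""
    problems = [f"missing {key}" for key in REQUIRED_KEYS if not event.get(key)]
    # A task with neither text nor attachments has nothing for an agent to do.
    if not event.get("text") and not event.get("attachments"):
        problems.append("no text or attachments")
    return problems
```

An adapter could call this at the top of prepare_task and raise, or drop the event in the listener, when the list is non-empty.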

Step 4: Handle Outbound Responses

Implement handlers for agent updates:
async def handle_text_chunk(self, text: str, context: ResponseContext) -> None:
    """Handle streaming text from the agent."""
    # Get platform-specific context
    channel_id = context.platform_context.get("channel_id")
    thread_id = context.platform_context.get("thread_id")
    
    # Send or update message on platform
    if not hasattr(context, "platform_message_id"):
        # First chunk - create new message
        message_id = await self.platform_client.send_message(
            channel_id=channel_id,
            thread_id=thread_id,
            text=text
        )
        context.platform_message_id = message_id
    else:
        # Subsequent chunks - update existing message
        await self.platform_client.update_message(
            message_id=context.platform_message_id,
            text=text
        )

async def handle_file(
    self,
    file_part: SamFilePart,
    context: ResponseContext
) -> None:
    """Handle file artifacts from the agent."""
    channel_id = context.platform_context.get("channel_id")
    
    # Upload file to platform
    await self.platform_client.upload_file(
        channel_id=channel_id,
        filename=file_part.filename,
        file_data=file_part.data,
        mime_type=file_part.mime_type
    )

async def handle_status_update(
    self,
    status_text: str,
    context: ResponseContext
) -> None:
    """Handle status/progress updates from the agent."""
    channel_id = context.platform_context.get("channel_id")
    
    # Show typing indicator or status message
    await self.platform_client.set_status(
        channel_id=channel_id,
        status=status_text
    )

async def handle_task_complete(self, context: ResponseContext) -> None:
    """Handle task completion."""
    # Clear typing indicators, update UI, etc.
    channel_id = context.platform_context.get("channel_id")
    await self.platform_client.clear_status(channel_id)

async def handle_error(
    self,
    error: SamError,
    context: ResponseContext
) -> None:
    """Handle errors from the agent."""
    channel_id = context.platform_context.get("channel_id")
    
    # Send error message to user
    await self.platform_client.send_message(
        channel_id=channel_id,
        text=f"❌ Error: {error.message}"
    )
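
Many chat platforms replace a message's text on update rather than appending to it. If handle_text_chunk receives incremental chunks (an assumption to verify against your SAM version), the adapter has to accumulate them before each update call. A minimal sketch, with `ChunkBuffer` as a hypothetical helper keyed by whatever identifier your platform uses for the message:

```python
from collections import defaultdict


class ChunkBuffer:
    """Accumulates streamed text per message so each update carries the full text so far."""

    def __init__(self) -> None:
        self._parts: dict[str, list[str]] = defaultdict(list)

    def append(self, message_key: str, chunk: str) -> str:
        """Store a chunk and return the full text accumulated for this message."""
        self._parts[message_key].append(chunk)
        return "".join(self._parts[message_key])

    def finish(self, message_key: str) -> str:
        """Return the final text and forget the message (call on completion or error)."""
        return "".join(self._parts.pop(message_key, []))
```

In the handler, the value returned by `append` would be passed as `text` to the platform's update call, and `finish` would be called from handle_task_complete and handle_error so buffers do not accumulate.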

Step 5: Handle Authentication (Optional)

Extract authentication claims from platform input:
async def extract_auth_claims(
    self,
    external_input: dict,
    endpoint_context: Optional[Dict[str, Any]] = None
) -> Optional[AuthClaims]:
    """Extract authentication claims from platform input."""
    user_id = external_input.get("user_id")
    
    if not user_id:
        return None
    
    # Fetch user details from platform
    user_info = await self.platform_client.get_user_info(user_id)
    
    return AuthClaims(
        user_id=user_id,
        email=user_info.get("email"),
        name=user_info.get("name"),
        roles=user_info.get("roles", []),
        metadata={
            "platform_user_id": user_id,
            "username": user_info.get("username")
        }
    )
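
Claims extracted from external_input are only trustworthy if the request itself is authentic. Many platforms sign webhook payloads with an HMAC of the raw body; whether yours does, and which header and hash it uses, is platform-specific. A minimal sketch assuming HMAC-SHA256 with a hex-encoded signature (`sign` and `verify_signature` are hypothetical helper names):

```python
import hashlib
import hmac


def sign(secret: bytes, body: bytes) -> str:
    """Compute the hex HMAC-SHA256 of the raw request body."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def verify_signature(secret: bytes, body: bytes, signature_hex: str) -> bool:
    """Check a received signature using a constant-time comparison."""
    expected = sign(secret, body)
    # Encode both sides so non-ASCII input cannot raise TypeError.
    return hmac.compare_digest(expected.encode(), signature_hex.encode())
```

The comparison uses `hmac.compare_digest` rather than `==` so that the time taken does not reveal how many leading characters matched.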

Step 6: Set Up Event Listener

Listen for platform events and submit tasks:
async def _listen_for_events(self):
    """Listen for events from the platform and submit to SAM."""
    log.info("Starting event listener")
    
    async for event in self.platform_client.event_stream():
        try:
            # Only process message events
            if event["type"] != "message":
                continue
            
            # Submit task to SAM
            await self.gateway_context.submit_task(
                external_input=event,
                agent_name=None  # Use default agent
            )
        
        except Exception as e:
            log.error(f"Error processing event: {e}", exc_info=True)
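
The listener pattern can be exercised without a real platform. The sketch below is a stand-alone approximation: `fake_event_stream` stands in for platform_client.event_stream(), and `submit` for gateway_context.submit_task; both are hypothetical. It shows two properties worth preserving in a real adapter: a failing event does not stop the loop, and the task can be cancelled cleanly at shutdown.

```python
import asyncio
import contextlib


async def fake_event_stream():
    """Hypothetical stand-in for platform_client.event_stream()."""
    events = (
        {"type": "typing"},
        {"type": "message", "text": "hi"},
        {"type": "message", "text": "bye"},
    )
    for event in events:
        yield event
    await asyncio.Event().wait()  # then idle forever, like a live connection


async def listen(stream, submit) -> None:
    """Forward message events to submit(); one bad event must not stop the loop."""
    async for event in stream:
        if event.get("type") != "message":
            continue
        try:
            await submit(event)
        except Exception as exc:  # log and keep listening
            print(f"error processing event: {exc!r}")


async def demo() -> list[str]:
    received: list[str] = []

    async def submit(event: dict) -> None:
        received.append(event["text"])

    # Keep a reference to the task so it can be cancelled later.
    task = asyncio.create_task(listen(fake_event_stream(), submit))
    await asyncio.sleep(0.05)  # let the listener drain the fake stream
    task.cancel()  # what cleanup() would do on shutdown
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return received
```

Cancellation is raised as `asyncio.CancelledError`, which is not a subclass of `Exception` on current Python versions, so the broad `except Exception` inside the loop does not swallow it.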

Gateway Configuration

Create a YAML configuration file for your gateway:
my_gateway.yaml
log:
  stdout_log_level: INFO
  log_file_level: DEBUG
  log_file: my_gateway.log

!include shared_config.yaml

apps:
  - name: my_gateway_app
    app_base_path: .
    app_module: solace_agent_mesh.gateway.generic.app
    
    broker:
      <<: *broker_connection
    
    app_config:
      namespace: ${NAMESPACE}
      
      # Point to your adapter class
      gateway_adapter: my_gateway_package.adapter.MyGatewayAdapter
      
      # Adapter-specific configuration
      adapter_config:
        api_key: ${MY_PLATFORM_API_KEY}
        webhook_url: "https://myplatform.com/webhook"
        enable_notifications: true
      
      # Artifact service for file handling
      artifact_service:
        type: "filesystem"
        base_path: "/tmp/samv2"
        artifact_scope: "namespace"
      
      # Default agent to route tasks to
      default_agent_name: "OrchestratorAgent"
      
      # System instructions for agents
      system_purpose: |
        You are an AI assistant accessible via MyPlatform.
        Provide helpful, accurate responses and always return
        artifacts that users can download.
      
      response_format: |
        Format responses in markdown. Be concise and professional.
        Return files and artifacts to the user when created.
      
      # Embed resolution for dynamic content
      enable_embed_resolution: true
      gateway_artifact_content_limit_bytes: 10000000
      gateway_recursive_embed_depth: 3
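
The configuration above references environment variables such as `${NAMESPACE}` and `${MY_PLATFORM_API_KEY}`. A small pre-flight check can report unset variables before the gateway starts. This is a sketch that scans the file as plain text; it does not reproduce how SAM itself resolves placeholders, and `referenced_vars` and `missing_vars` are hypothetical helper names:

```python
import re

# Matches the variable name at the start of a ${NAME...} placeholder.
_VAR = re.compile(r"\$\{\s*([A-Za-z_][A-Za-z0-9_]*)")


def referenced_vars(config_text: str) -> list[str]:
    """Return the distinct ${NAME} placeholders in order of first use."""
    seen: dict[str, None] = {}
    for name in _VAR.findall(config_text):
        seen.setdefault(name, None)
    return list(seen)


def missing_vars(config_text: str, env: dict) -> list[str]:
    """Return referenced variables that are unset or empty in env."""
    return [name for name in referenced_vars(config_text) if not env.get(name)]
```

A typical call would be `missing_vars(open("my_gateway.yaml").read(), os.environ)`. Because the scan is textual, placeholders inside YAML comments are reported too.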

Complete Example: Simple HTTP Gateway

Here’s a complete example of a simple HTTP webhook gateway:
simple_http_gateway.py
from solace_agent_mesh.gateway.adapter.base import GatewayAdapter
from solace_agent_mesh.gateway.adapter.types import (
    GatewayContext, SamTask, SamTextPart, SamFilePart,
    ResponseContext, SamUpdate, SamError
)
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
from fastapi import FastAPI, Request, HTTPException
import uvicorn
import asyncio
import logging
import uuid

log = logging.getLogger(__name__)

class HttpGatewayConfig(BaseModel):
    """Configuration for HTTP gateway."""
    host: str = Field(default="0.0.0.0")
    port: int = Field(default=8000)
    auth_token: Optional[str] = Field(default=None)

class SimpleHttpGatewayAdapter(GatewayAdapter[dict, dict, HttpGatewayConfig]):
    """Simple HTTP webhook gateway adapter."""
    
    ConfigModel = HttpGatewayConfig
    
    def __init__(self):
        super().__init__()
        self.gateway_context: Optional[GatewayContext] = None
        self.app = FastAPI()
        self.server = None
        self.pending_responses: Dict[str, list] = {}
    
    async def init(self, context: GatewayContext) -> None:
        """Initialize the HTTP server."""
        self.gateway_context = context
        config: HttpGatewayConfig = context.adapter_config
        
        # Set up routes
        self.app.post("/webhook")(self._handle_webhook)
        
        # Start server
        server_config = uvicorn.Config(
            self.app,
            host=config.host,
            port=config.port,
            log_level="info"
        )
        self.server = uvicorn.Server(server_config)
        # Keep a reference so the server task is not garbage-collected
        self._server_task = asyncio.create_task(self.server.serve())
        
        log.info(f"HTTP Gateway listening on {config.host}:{config.port}")
    
    async def cleanup(self) -> None:
        """Shutdown the HTTP server."""
        if self.server:
            self.server.should_exit = True
    
    async def _handle_webhook(self, request: Request) -> dict:
        """Handle incoming webhook requests."""
        # Verify authentication
        config: HttpGatewayConfig = self.gateway_context.adapter_config
        if config.auth_token:
            auth_header = request.headers.get("Authorization")
            if auth_header != f"Bearer {config.auth_token}":
                raise HTTPException(status_code=401, detail="Unauthorized")
        
        # Parse request
        body = await request.json()
        
        # Generate a task ID if the caller did not supply one, and store it
        # in the body so prepare_task() sees the same ID
        task_id = body.get("task_id") or str(uuid.uuid4())
        body["task_id"] = task_id
        
        # Store response collector
        self.pending_responses[task_id] = []
        
        # Submit task to SAM
        await self.gateway_context.submit_task(
            external_input=body,
            agent_name=body.get("agent_name")
        )
        
        # Wait for completion (with timeout)
        try:
            await asyncio.wait_for(
                self._wait_for_completion(task_id),
                timeout=300
            )
        except asyncio.TimeoutError:
            # Drop the collector so timed-out tasks do not accumulate
            self.pending_responses.pop(task_id, None)
            return {"error": "Task timeout"}
        
        # Return collected responses, without the internal completion marker
        responses = self.pending_responses.pop(task_id, [])
        return {
            "task_id": task_id,
            "responses": [r for r in responses if r.get("type") != "complete"]
        }
    
    async def _wait_for_completion(self, task_id: str):
        """Poll until the task ends with a completion marker or an error."""
        while True:
            responses = self.pending_responses.get(task_id, [])
            if responses and responses[-1].get("type") in ("complete", "error"):
                return
            await asyncio.sleep(0.1)
    
    async def prepare_task(
        self,
        external_input: dict,
        endpoint_context: Optional[Dict[str, Any]] = None
    ) -> SamTask:
        """Prepare SAM task from HTTP request."""
        task_id = external_input.get("task_id")
        message = external_input.get("message", "")
        user_id = external_input.get("user_id", "http_user")
        
        return SamTask(
            task_id=task_id,
            user_id=user_id,
            parts=[SamTextPart(text=message)],
            platform_context={"task_id": task_id}
        )
    
    async def handle_text_chunk(self, text: str, context: ResponseContext) -> None:
        """Collect text responses."""
        task_id = context.platform_context.get("task_id")
        if task_id in self.pending_responses:
            self.pending_responses[task_id].append({
                "type": "text",
                "content": text
            })
    
    async def handle_file(
        self,
        file_part: SamFilePart,
        context: ResponseContext
    ) -> None:
        """Collect file responses."""
        task_id = context.platform_context.get("task_id")
        if task_id in self.pending_responses:
            self.pending_responses[task_id].append({
                "type": "file",
                "filename": file_part.filename,
                "mime_type": file_part.mime_type,
                "size": len(file_part.data)
            })
    
    async def handle_task_complete(self, context: ResponseContext) -> None:
        """Mark task as complete."""
        task_id = context.platform_context.get("task_id")
        if task_id in self.pending_responses:
            # Signal completion with a marker entry
            # (the waiter in _wait_for_completion detects this)
            self.pending_responses[task_id].append({"type": "complete"})
    
    async def handle_error(
        self,
        error: SamError,
        context: ResponseContext
    ) -> None:
        """Handle errors."""
        task_id = context.platform_context.get("task_id")
        if task_id in self.pending_responses:
            self.pending_responses[task_id].append({
                "type": "error",
                "message": error.message
            })
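
The example waits for completion by polling every 100 ms. An alternative that wakes the waiter immediately is to pair each pending task with an `asyncio.Event`. The sketch below is independent of SAM; `CompletionTracker` is a hypothetical helper whose methods would be called from the webhook handler and the handle_* callbacks:

```python
import asyncio


class CompletionTracker:
    """Collects responses per task and wakes the waiter when the task completes."""

    def __init__(self) -> None:
        self._responses: dict[str, list] = {}
        self._done: dict[str, asyncio.Event] = {}

    def start(self, task_id: str) -> None:
        """Register a task before it is submitted."""
        self._responses[task_id] = []
        self._done[task_id] = asyncio.Event()

    def add(self, task_id: str, item: dict) -> None:
        """Record a response; ignored if the task is unknown or already finished."""
        if task_id in self._responses:
            self._responses[task_id].append(item)

    def complete(self, task_id: str) -> None:
        """Signal that no more responses will arrive for this task."""
        event = self._done.get(task_id)
        if event is not None:
            event.set()

    async def wait(self, task_id: str, timeout: float) -> list:
        """Return collected responses; raises asyncio.TimeoutError on timeout."""
        try:
            await asyncio.wait_for(self._done[task_id].wait(), timeout)
            return self._responses[task_id]
        finally:
            # Always release per-task state, including on timeout.
            self._responses.pop(task_id, None)
            self._done.pop(task_id, None)


async def demo() -> list:
    tracker = CompletionTracker()
    tracker.start("t1")

    async def agent() -> None:
        tracker.add("t1", {"type": "text", "content": "hello"})
        tracker.complete("t1")

    agent_task = asyncio.create_task(agent())
    responses = await tracker.wait("t1", timeout=1.0)
    await agent_task
    return responses
```

Releasing state in `finally` means a timed-out request does not leave an entry behind, which is the usual source of slow memory growth in request/response bridges like this one.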

Advanced Features

Agent Registry Updates

Handle agent discovery events:
async def handle_agent_added(self, agent_card: AgentCard) -> None:
    """Called when a new agent joins the mesh."""
    log.info(f"New agent available: {agent_card.agent_name}")
    # Update UI, send notifications, etc.

async def handle_agent_removed(self, agent_name: str) -> None:
    """Called when an agent leaves the mesh."""
    log.info(f"Agent unavailable: {agent_name}")

Session Management

Manage user sessions:
async def prepare_task(
    self,
    external_input: dict,
    endpoint_context: Optional[Dict[str, Any]] = None
) -> SamTask:
    """Prepare task with session management."""
    user_id = external_input.get("user_id")
    channel_id = external_input.get("channel_id")
    
    # Generate session ID based on user and channel
    session_id = f"{user_id}:{channel_id}"
    
    return SamTask(
        task_id=external_input.get("message_id"),
        user_id=user_id,
        session_id=session_id,  # Maintains conversation context
        parts=[SamTextPart(text=external_input.get("text", ""))],
        platform_context={"channel_id": channel_id}
    )
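
The scope of a session is a design choice: the example above keys it on user and channel, so the same user in two channels gets two separate conversations. A sketch of a helper that also allows per-thread sessions; `session_id_for` is a hypothetical name, and the `:` separator assumes that platform IDs never contain a colon:

```python
from typing import Optional


def session_id_for(user_id: str, channel_id: str, thread_id: Optional[str] = None) -> str:
    """Build a session key; include the thread to isolate threaded conversations."""
    parts = [user_id, channel_id]
    if thread_id:
        parts.append(thread_id)
    return ":".join(parts)
```

Dropping `channel_id` from the key would instead give each user a single conversation that follows them across channels.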

Testing Your Gateway

Unit Tests

Test adapter methods in isolation:
import pytest
from unittest.mock import Mock, AsyncMock
from my_gateway_package.adapter import MyGatewayAdapter

@pytest.mark.asyncio
async def test_prepare_task():
    adapter = MyGatewayAdapter()
    
    external_input = {
        "user_id": "user123",
        "text": "Hello, agent!",
        "message_id": "msg456"
    }
    
    task = await adapter.prepare_task(external_input)
    
    assert task.user_id == "user123"
    assert len(task.parts) == 1
    assert task.parts[0].text == "Hello, agent!"
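
Outbound handlers can be tested the same way by replacing the platform client with `unittest.mock.AsyncMock`. The sketch below uses a stand-in class, `StatusAdapter`, that mirrors the shape of handle_status_update from Step 4 so that it runs without SAM installed; in a real test you would instantiate your own adapter and assign the mock to its platform_client attribute:

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock


class StatusAdapter:
    """Stand-in with the same shape as handle_status_update in Step 4."""

    def __init__(self, platform_client) -> None:
        self.platform_client = platform_client

    async def handle_status_update(self, status_text: str, context) -> None:
        channel_id = context.platform_context.get("channel_id")
        await self.platform_client.set_status(channel_id=channel_id, status=status_text)


def test_status_update_reaches_platform() -> None:
    client = AsyncMock()  # every attribute is an awaitable mock
    adapter = StatusAdapter(client)
    # SimpleNamespace stands in for ResponseContext here.
    context = SimpleNamespace(platform_context={"channel_id": "C42"})

    asyncio.run(adapter.handle_status_update("Thinking...", context))

    client.set_status.assert_awaited_once_with(channel_id="C42", status="Thinking...")
```

`assert_awaited_once_with` checks that the coroutine was actually awaited, which catches a missing `await` that a plain call assertion would not.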

Integration Tests

Test the complete gateway:
# Start your gateway
sam run configs/gateways/my_gateway.yaml

# Send test request (assumes an HTTP gateway listening on port 8000)
curl -X POST http://localhost:8000/webhook \
  -H "Content-Type: application/json" \
  -d '{"message": "Test message", "user_id": "test_user"}'

Best Practices

Gateway Development Tips:
  1. Error Handling: Always catch and log exceptions; send user-friendly error messages
  2. Rate Limiting: Implement rate limits to prevent abuse
  3. Authentication: Validate all incoming requests
  4. Async Operations: Use async/await for all I/O operations
  5. Graceful Shutdown: Clean up resources in the cleanup method
  6. Logging: Log important events and errors for debugging
  7. Testing: Write comprehensive tests for all adapter methods
  8. Documentation: Document platform-specific quirks and limitations
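
For the rate-limiting tip, one common approach is a token bucket per user or per API key. A minimal in-memory sketch follows; `TokenBucket` is a hypothetical helper, not something SAM provides, and a multi-process deployment would need shared state instead:

```python
import time


class TokenBucket:
    """Allows bursts of up to `capacity` requests, refilled at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self._clock = clock  # injectable for tests
        self._tokens = float(capacity)
        self._last = clock()

    def allow(self) -> bool:
        """Consume one token if available; return False when the caller should be throttled."""
        now = self._clock()
        # Refill in proportion to elapsed time, never above capacity.
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False
```

A gateway would keep one bucket per user ID and skip submit_task, or reply with a throttling message, whenever `allow()` returns False.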

Troubleshooting

Common Issues:
  1. Tasks not routing: Check default_agent_name in config
  2. Files not uploading: Verify artifact_service configuration
  3. Responses not streaming: Ensure platform supports streaming or buffer responses
  4. Authentication failing: Verify token extraction in extract_auth_claims
  5. Memory leaks: Clean up platform connections in cleanup

Next Steps

Real-World Examples

Explore production gateway implementations:
  • examples/gateways/slack_gateway_example.yaml - Full Slack integration
  • examples/gateways/rest_gateway_example.yaml - REST API gateway
  • examples/gateways/mcp_gateway_example.yaml - MCP server gateway
  • examples/gateways/webui_gateway_example.yaml - Web UI gateway
