Skip to main content
The GatewayClient class provides a WebSocket client for connecting to OpenClaw Gateway, enabling external integrations and command routing.

Overview

GatewayClient handles:
  • WebSocket connection management with automatic reconnection
  • Message sending via JSON-RPC 2.0 protocol
  • Heartbeat keepalive mechanism
  • Message queuing during disconnection
  • Exponential backoff for reconnection attempts

Constructor

from agenticai.gateway.client import GatewayClient

gateway = GatewayClient(
    url="ws://127.0.0.1:18789",
    max_reconnect_attempts=10,
    reconnect_base_delay=1.0,
    reconnect_max_delay=60.0
)
url
str
default:"ws://127.0.0.1:18789"
WebSocket URL for the OpenClaw Gateway.
max_reconnect_attempts
int
default:"10"
Maximum number of reconnection attempts before giving up.
reconnect_base_delay
float
default:"1.0"
Base delay in seconds for exponential backoff.
reconnect_max_delay
float
default:"60.0"
Maximum delay in seconds between reconnection attempts.

Properties

is_connected

@property
def is_connected(self) -> bool
Returns True if currently connected to the gateway.
is_connected
bool
Connection status.
Example:
if gateway.is_connected:
    print("Connected to gateway")
else:
    print("Not connected")

Methods

connect

async def connect(self) -> None
Connects to the gateway with automatic reconnection. This method runs continuously and handles reconnection logic. Example:
import asyncio
from agenticai.gateway.client import GatewayClient

async def main():
    gateway = GatewayClient(url="ws://localhost:18789")
    
    # Start connection (runs in background)
    connect_task = asyncio.create_task(gateway.connect())
    
    # Wait for connection
    await asyncio.sleep(1)
    
    if gateway.is_connected:
        print("Successfully connected!")
    
    # Do work...
    
    # Cleanup
    await gateway.disconnect()
    connect_task.cancel()

asyncio.run(main())

disconnect

async def disconnect(self) -> None
Disconnects from the gateway and cancels all background tasks. Example:
await gateway.disconnect()
print("Disconnected from gateway")

send_message

async def send_message(self, message: GatewayMessage) -> None
Sends a message to the gateway. If disconnected, the message is queued for later delivery.
message
GatewayMessage
required
Gateway message to send (e.g., CallStartedMessage, CallEndedMessage, HeartbeatMessage).
Example:
from agenticai.gateway.messages import CallStartedMessage

message = CallStartedMessage(
    call_id="call-123",
    to_number="+1234567890",
    prompt="You are a helpful assistant.",
    metadata={"purpose": "customer_support"}
)

await gateway.send_message(message)

Gateway Messages

CallStartedMessage

Sent when a call begins:
from agenticai.gateway.messages import CallStartedMessage

message = CallStartedMessage(
    call_id="550e8400-e29b-41d4-a716-446655440000",
    to_number="+1234567890",
    prompt="You are a helpful AI assistant.",
    metadata={
        "direction": "outbound",
        "campaign": "customer_outreach"
    }
)

await gateway.send_message(message)

CallEndedMessage

Sent when a call completes:
from agenticai.gateway.messages import CallEndedMessage

message = CallEndedMessage(
    call_id="550e8400-e29b-41d4-a716-446655440000",
    duration=125.5,
    outcome="completed",
    full_transcript="User: Hello\nAssistant: Hi! How can I help you?"
)

await gateway.send_message(message)

HeartbeatMessage

Sent automatically every 30 seconds to keep the connection alive:
from agenticai.gateway.messages import HeartbeatMessage

message = HeartbeatMessage()
await gateway.send_message(message)

JSON-RPC Protocol

All messages are sent using JSON-RPC 2.0 format via the sessions_send method:
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "sessions_send",
  "params": {
    "message": {
      "type": "call_started",
      "call_id": "call-123",
      "to_number": "+1234567890",
      "prompt": "You are a helpful assistant.",
      "metadata": {}
    }
  }
}

Response Format

Success:
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {"status": "ok"}
}
Error:
{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32600,
    "message": "Invalid request"
  }
}

Reconnection Logic

The client uses exponential backoff for reconnection:
delay = min(
    reconnect_base_delay * (2 ** attempt_number),
    reconnect_max_delay
)
Example progression (base=1.0, max=60.0):
  • Attempt 1: 1 second
  • Attempt 2: 2 seconds
  • Attempt 3: 4 seconds
  • Attempt 4: 8 seconds
  • Attempt 5: 16 seconds
  • Attempt 6: 32 seconds
  • Attempt 7+: 60 seconds (capped)

Message Queue

When disconnected, messages are queued (up to 1000 messages) and sent automatically upon reconnection:
# Messages are queued if disconnected
await gateway.send_message(message1)  # Queued
await gateway.send_message(message2)  # Queued

# Connection restored
# → All queued messages sent automatically

Complete Example

import asyncio
from agenticai.gateway.client import GatewayClient
from agenticai.gateway.messages import (
    CallStartedMessage,
    CallEndedMessage,
    HeartbeatMessage
)
from agenticai.core.config import get_config

async def gateway_example():
    # Load configuration
    config = get_config("config.yaml")
    
    # Initialize gateway client
    gateway = GatewayClient(
        url=config.gateway.url,
        max_reconnect_attempts=config.gateway.reconnect_max_attempts,
        reconnect_base_delay=config.gateway.reconnect_base_delay,
        reconnect_max_delay=config.gateway.reconnect_max_delay,
    )
    
    # Start connection in background
    connect_task = asyncio.create_task(gateway.connect())
    
    # Wait for connection
    await asyncio.sleep(2)
    
    if gateway.is_connected:
        print("✓ Connected to gateway")
        
        # Send call started event
        await gateway.send_message(CallStartedMessage(
            call_id="demo-call-001",
            to_number="+1234567890",
            prompt="You are a sales assistant.",
            metadata={"campaign": "q1_outreach"}
        ))
        print("→ Sent call started event")
        
        # Simulate call duration
        await asyncio.sleep(5)
        
        # Send call ended event
        await gateway.send_message(CallEndedMessage(
            call_id="demo-call-001",
            duration=5.2,
            outcome="completed",
            full_transcript="User: Hello\nAssistant: Hi there!"
        ))
        print("→ Sent call ended event")
        
        # Manual heartbeat (optional - sent automatically)
        await gateway.send_message(HeartbeatMessage())
        print("→ Sent heartbeat")
    else:
        print("⚠ Connection failed")
    
    # Cleanup
    await gateway.disconnect()
    connect_task.cancel()
    try:
        await connect_task
    except asyncio.CancelledError:
        pass
    
    print("✓ Disconnected")

asyncio.run(gateway_example())

Integration with CallManager

CallManager automatically creates and manages a GatewayClient instance:
from agenticai.core import CallManager
from agenticai.core.config import get_config

async def main():
    config = get_config("config.yaml")
    call_manager = CallManager(config)
    
    # Gateway client is initialized automatically
    await call_manager.start()
    
    # Gateway events are sent automatically:
    # - call_started when call initiates
    # - call_ended when call completes
    
    call_id = await call_manager.initiate_call(
        to_number="+1234567890",
        prompt="You are helpful.",
        webhook_base_url="https://example.com"
    )
    # → CallStartedMessage sent to gateway
    
    # Wait for call...
    
    await call_manager.end_call(call_id)
    # → CallEndedMessage sent to gateway
    
    await call_manager.stop()

Error Handling

try:
    await gateway.send_message(message)
except Exception as e:
    print(f"Failed to send message: {e}")
    # Message is automatically queued if disconnected
Connection errors are logged but don’t raise exceptions. The client automatically attempts to reconnect.

Build docs developers (and LLMs) love