The GatewayClient class provides a WebSocket client for connecting to OpenClaw Gateway, enabling external integrations and command routing.
Overview
GatewayClient handles:
- WebSocket connection management with automatic reconnection
- Message sending via JSON-RPC 2.0 protocol
- Heartbeat keepalive mechanism
- Message queuing during disconnection
- Exponential backoff for reconnection attempts
Constructor
from agenticai.gateway.client import GatewayClient
gateway = GatewayClient(
    url="ws://127.0.0.1:18789",
    max_reconnect_attempts=10,
    reconnect_base_delay=1.0,
    reconnect_max_delay=60.0
)
url
str
default:"ws://127.0.0.1:18789"
WebSocket URL for the OpenClaw Gateway.
max_reconnect_attempts
Maximum number of reconnection attempts before giving up.
reconnect_base_delay
Base delay in seconds for exponential backoff.
reconnect_max_delay
Maximum delay in seconds between reconnection attempts.
Properties
is_connected
@property
def is_connected(self) -> bool
Returns True if currently connected to the gateway.
Example:
if gateway.is_connected:
    print("Connected to gateway")
else:
    print("Not connected")
Methods
connect
async def connect(self) -> None
Connects to the gateway with automatic reconnection. This coroutine runs continuously and handles the reconnection logic, so it is normally started as a background task rather than awaited directly.
Example:
import asyncio
from agenticai.gateway.client import GatewayClient
async def main():
    gateway = GatewayClient(url="ws://localhost:18789")
    # Start connection (runs in background)
    connect_task = asyncio.create_task(gateway.connect())
    # Wait for connection
    await asyncio.sleep(1)
    if gateway.is_connected:
        print("Successfully connected!")
        # Do work...
    # Cleanup
    await gateway.disconnect()
    connect_task.cancel()
asyncio.run(main())
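A fixed `asyncio.sleep(1)` can be too short on a slow network and wastefully long on a fast one. The following is a minimal sketch of an alternative that polls `is_connected` until a timeout elapses. `wait_until_connected` is a hypothetical helper written for this page, not part of GatewayClient; it assumes only the `is_connected` property documented above.

```python
import asyncio


async def wait_until_connected(client, timeout: float = 5.0,
                               poll_interval: float = 0.05) -> bool:
    """Poll client.is_connected until it is True or the timeout elapses.

    Hypothetical helper: relies only on the documented is_connected property.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        if client.is_connected:
            return True
        await asyncio.sleep(poll_interval)
    # Final check in case the connection came up during the last sleep.
    return bool(client.is_connected)
```

Inside `main()`, `await asyncio.sleep(1)` could then be replaced with `connected = await wait_until_connected(gateway, timeout=5.0)`.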
disconnect
async def disconnect(self) -> None
Disconnects from the gateway and cancels all background tasks.
Example:
await gateway.disconnect()
print("Disconnected from gateway")
send_message
async def send_message(self, message: GatewayMessage) -> None
Sends a message to the gateway. If disconnected, the message is queued for later delivery.
message
GatewayMessage
Gateway message to send (e.g., CallStartedMessage, CallEndedMessage, HeartbeatMessage).
Example:
from agenticai.gateway.messages import CallStartedMessage
message = CallStartedMessage(
    call_id="call-123",
    to_number="+1234567890",
    prompt="You are a helpful assistant.",
    metadata={"purpose": "customer_support"}
)
await gateway.send_message(message)
Gateway Messages
CallStartedMessage
Sent when a call begins:
from agenticai.gateway.messages import CallStartedMessage
message = CallStartedMessage(
    call_id="550e8400-e29b-41d4-a716-446655440000",
    to_number="+1234567890",
    prompt="You are a helpful AI assistant.",
    metadata={
        "direction": "outbound",
        "campaign": "customer_outreach"
    }
)
await gateway.send_message(message)
CallEndedMessage
Sent when a call completes:
from agenticai.gateway.messages import CallEndedMessage
message = CallEndedMessage(
    call_id="550e8400-e29b-41d4-a716-446655440000",
    duration=125.5,
    outcome="completed",
    full_transcript="User: Hello\nAssistant: Hi! How can I help you?"
)
await gateway.send_message(message)
HeartbeatMessage
Sent automatically every 30 seconds to keep the connection alive:
from agenticai.gateway.messages import HeartbeatMessage
message = HeartbeatMessage()
await gateway.send_message(message)
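The client already sends heartbeats on its own, so the following is only an illustrative sketch of what a periodic keepalive loop can look like in asyncio. It is not GatewayClient's actual implementation. `heartbeat_loop`, `send`, and `make_heartbeat` are hypothetical names; the 30-second default mirrors the interval stated above.

```python
import asyncio


async def heartbeat_loop(send, make_heartbeat, interval: float = 30.0) -> None:
    """Call `await send(make_heartbeat())` every `interval` seconds.

    Hypothetical sketch: runs until the surrounding task is cancelled.
    """
    while True:
        await asyncio.sleep(interval)
        await send(make_heartbeat())
```

Under these assumptions, a caller could start it with `asyncio.create_task(heartbeat_loop(gateway.send_message, HeartbeatMessage))` and cancel the task on shutdown.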
JSON-RPC Protocol
All messages are sent using JSON-RPC 2.0 format via the sessions_send method:
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "sessions_send",
  "params": {
    "message": {
      "type": "call_started",
      "call_id": "call-123",
      "to_number": "+1234567890",
      "prompt": "You are a helpful assistant.",
      "metadata": {}
    }
  }
}
Success response:
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {"status": "ok"}
}
Error response:
{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32600,
    "message": "Invalid request"
  }
}
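To make the envelope concrete, here is a small sketch that builds a `sessions_send` request and interprets the two response shapes shown above. `build_request` and `parse_response` are hypothetical helpers for illustration; how GatewayClient assigns request ids or surfaces errors internally is not documented here, so the incrementing counter and the `RuntimeError` are assumptions.

```python
import itertools
import json

# Assumption: request ids are simple incrementing integers.
_request_ids = itertools.count(1)


def build_request(message: dict) -> str:
    """Wrap a message payload in a JSON-RPC 2.0 sessions_send request."""
    return json.dumps({
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": "sessions_send",
        "params": {"message": message},
    })


def parse_response(raw: str) -> dict:
    """Return the result object, or raise if the response carries an error."""
    response = json.loads(raw)
    if "error" in response:
        err = response["error"]
        raise RuntimeError(f"JSON-RPC error {err['code']}: {err['message']}")
    return response["result"]
```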
Reconnection Logic
The client uses exponential backoff for reconnection:
delay = min(
    reconnect_base_delay * (2 ** attempt_number),
    reconnect_max_delay
)
Example progression (base=1.0, max=60.0), where attempt_number is 0 for the first attempt:
- Attempt 1: 1 second
- Attempt 2: 2 seconds
- Attempt 3: 4 seconds
- Attempt 4: 8 seconds
- Attempt 5: 16 seconds
- Attempt 6: 32 seconds
- Attempt 7+: 60 seconds (capped)
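The progression above can be reproduced directly from the formula. This is a standalone sketch; `backoff_delay` is a hypothetical function name, and it assumes `attempt_number` starts at 0 for the first attempt, which is what the listed delays imply.

```python
def backoff_delay(attempt_number: int, base: float = 1.0,
                  max_delay: float = 60.0) -> float:
    """Exponential backoff delay in seconds, capped at max_delay."""
    return min(base * (2 ** attempt_number), max_delay)


# attempt_number 0..6 corresponds to "Attempt 1" through "Attempt 7".
delays = [backoff_delay(n) for n in range(7)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

The seventh value would be 64 seconds uncapped, so the cap at `reconnect_max_delay` takes effect from that attempt onward.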
Message Queue
When disconnected, messages are queued (up to 1000 messages) and sent automatically upon reconnection:
# Messages are queued if disconnected
await gateway.send_message(message1) # Queued
await gateway.send_message(message2) # Queued
# Connection restored
# → All queued messages sent automatically
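The queue-then-flush behavior can be sketched with a bounded deque. This is an illustration, not GatewayClient's code: `QueueingSender` and its methods are hypothetical, and what happens when the 1000-message limit is exceeded is not specified above, so dropping the oldest message is an assumption.

```python
import asyncio
from collections import deque


class QueueingSender:
    """Hypothetical sketch: buffer messages while offline, flush on reconnect."""

    def __init__(self, transport_send, max_queue: int = 1000):
        self._send = transport_send
        # Assumption: when the queue is full, the oldest message is dropped.
        self._pending = deque(maxlen=max_queue)
        self.connected = False

    async def send(self, message) -> None:
        if self.connected:
            await self._send(message)
        else:
            self._pending.append(message)

    async def on_reconnect(self) -> None:
        """Mark the link as up and send queued messages in FIFO order."""
        self.connected = True
        while self._pending:
            await self._send(self._pending.popleft())
```

Messages are flushed in the order they were queued, so a call_started event queued before a call_ended event is still delivered first.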
Complete Example
import asyncio
from agenticai.gateway.client import GatewayClient
from agenticai.gateway.messages import (
    CallStartedMessage,
    CallEndedMessage,
    HeartbeatMessage
)
from agenticai.core.config import get_config
async def gateway_example():
    # Load configuration
    config = get_config("config.yaml")
    # Initialize gateway client
    gateway = GatewayClient(
        url=config.gateway.url,
        max_reconnect_attempts=config.gateway.reconnect_max_attempts,
        reconnect_base_delay=config.gateway.reconnect_base_delay,
        reconnect_max_delay=config.gateway.reconnect_max_delay,
    )
    # Start connection in background
    connect_task = asyncio.create_task(gateway.connect())
    # Wait for connection
    await asyncio.sleep(2)
    if gateway.is_connected:
        print("✓ Connected to gateway")
        # Send call started event
        await gateway.send_message(CallStartedMessage(
            call_id="demo-call-001",
            to_number="+1234567890",
            prompt="You are a sales assistant.",
            metadata={"campaign": "q1_outreach"}
        ))
        print("→ Sent call started event")
        # Simulate call duration
        await asyncio.sleep(5)
        # Send call ended event
        await gateway.send_message(CallEndedMessage(
            call_id="demo-call-001",
            duration=5.2,
            outcome="completed",
            full_transcript="User: Hello\nAssistant: Hi there!"
        ))
        print("→ Sent call ended event")
        # Manual heartbeat (optional - sent automatically)
        await gateway.send_message(HeartbeatMessage())
        print("→ Sent heartbeat")
    else:
        print("⚠ Connection failed")
    # Cleanup
    await gateway.disconnect()
    connect_task.cancel()
    try:
        await connect_task
    except asyncio.CancelledError:
        pass
    print("✓ Disconnected")
asyncio.run(gateway_example())
Integration with CallManager
CallManager automatically creates and manages a GatewayClient instance:
import asyncio
from agenticai.core import CallManager
from agenticai.core.config import get_config
async def main():
    config = get_config("config.yaml")
    call_manager = CallManager(config)
    # Gateway client is initialized automatically
    await call_manager.start()
    # Gateway events are sent automatically:
    # - call_started when call initiates
    # - call_ended when call completes
    call_id = await call_manager.initiate_call(
        to_number="+1234567890",
        prompt="You are helpful.",
        webhook_base_url="https://example.com"
    )
    # → CallStartedMessage sent to gateway
    # Wait for call...
    await call_manager.end_call(call_id)
    # → CallEndedMessage sent to gateway
    await call_manager.stop()
asyncio.run(main())
Error Handling
try:
    await gateway.send_message(message)
except Exception as e:
    print(f"Failed to send message: {e}")
    # Message is automatically queued if disconnected
Connection errors are logged but don’t raise exceptions. The client automatically attempts to reconnect.