Skip to main content

Overview

While Solace Agent Mesh primarily uses Server-Sent Events (SSE) for streaming, WebSocket support is available for bidirectional real-time communication scenarios.

WebSocket vs SSE

Pros:
  • Simpler protocol (HTTP-based)
  • Automatic reconnection
  • Built-in event typing
  • Works through most proxies/firewalls
  • Lower overhead
Use When:
  • Agent sends updates to client (primary use case)
  • One-way streaming is sufficient
  • Maximum compatibility needed

WebSocket

Pros:
  • True bidirectional communication
  • Lower latency for rapid exchanges
  • Full-duplex messaging
Use When:
  • Client needs to send frequent updates during streaming
  • Interactive real-time features required
  • Both parties send high-frequency messages

Connection

WebSocket URL

ws://localhost:8080/ws
Or with SSL:
wss://localhost:8443/ws

Authentication

Include auth token in query parameter:
wss://localhost:8443/ws?token=YOUR_AUTH_TOKEN

Example Connection

const ws = new WebSocket(
  'ws://localhost:8080/ws?token=YOUR_TOKEN'
);

ws.onopen = () => {
  console.log('Connected to agent mesh');
};

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  console.log('Received:', message);
};

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};

ws.onclose = () => {
  console.log('Disconnected');
};

Message Protocol

All messages use JSON format with A2A protocol structure.

Message Structure

interface WebSocketMessage {
  type: string;           // Message type
  id?: string;           // Message ID (for request/response)
  payload: any;          // Message payload
  timestamp: string;     // ISO 8601 timestamp
}

Client → Server Messages

Execute Task

{
  "type": "task.execute",
  "id": "req_123",
  "payload": {
    "agentName": "FileSystemAgent",
    "sessionId": "user123_session",
    "message": {
      "text": "List all files in the current directory"
    }
  }
}

Cancel Task

{
  "type": "task.cancel",
  "id": "req_124",
  "payload": {
    "taskId": "task_abc123"
  }
}

Send Message to Running Task

{
  "type": "task.message",
  "id": "req_125",
  "payload": {
    "taskId": "task_abc123",
    "message": {
      "text": "Please include hidden files too"
    }
  }
}

Subscribe to Task Updates

{
  "type": "task.subscribe",
  "id": "req_126",
  "payload": {
    "taskId": "task_abc123"
  }
}

Unsubscribe from Task Updates

{
  "type": "task.unsubscribe",
  "id": "req_127",
  "payload": {
    "taskId": "task_abc123"
  }
}

Server → Client Messages

Task Status Update

{
  "type": "task.status",
  "payload": {
    "taskId": "task_abc123",
    "status": "running",
    "timestamp": "2024-01-15T10:30:00Z"
  }
}

Task Message (Agent Response)

{
  "type": "task.message",
  "payload": {
    "taskId": "task_abc123",
    "message": {
      "text": "I found 5 files in the directory:",
      "role": "agent"
    },
    "timestamp": "2024-01-15T10:30:05Z"
  }
}

Artifact Created

{
  "type": "artifact.created",
  "payload": {
    "taskId": "task_abc123",
    "artifact": {
      "filename": "file_list.txt",
      "version": 1,
      "mimeType": "text/plain",
      "size": 1024,
      "downloadUrl": "/api/v1/artifacts/file_list.txt"
    }
  }
}

Artifact Progress

{
  "type": "artifact.progress",
  "payload": {
    "taskId": "task_abc123",
    "filename": "large_report.pdf",
    "status": "in-progress",
    "bytesTransferred": 512000,
    "totalBytes": 1048576,
    "percentage": 48.8
  }
}

Task Completed

{
  "type": "task.completed",
  "payload": {
    "taskId": "task_abc123",
    "result": {
      "message": {
        "text": "Task completed successfully."
      },
      "artifacts": [
        {
          "filename": "file_list.txt",
          "version": 1
        }
      ]
    },
    "timestamp": "2024-01-15T10:30:15Z"
  }
}

Error

{
  "type": "error",
  "id": "req_123",
  "payload": {
    "code": "AGENT_NOT_FOUND",
    "message": "Agent 'UnknownAgent' not found",
    "details": {
      "agentName": "UnknownAgent"
    }
  }
}

Complete Example

Interactive Task Execution

class AgentMeshClient {
  constructor(url, token) {
    this.ws = new WebSocket(`${url}?token=${token}`);
    this.requestId = 0;
    this.pendingRequests = new Map();
    
    this.ws.onmessage = (event) => {
      const message = JSON.parse(event.data);
      this.handleMessage(message);
    };
  }
  
  // Send request and wait for response
  async request(type, payload) {
    return new Promise((resolve, reject) => {
      const id = `req_${++this.requestId}`;
      
      this.pendingRequests.set(id, { resolve, reject });
      
      this.ws.send(JSON.stringify({
        type,
        id,
        payload,
        timestamp: new Date().toISOString(),
      }));
      
      // Timeout after 30 seconds
      setTimeout(() => {
        if (this.pendingRequests.has(id)) {
          this.pendingRequests.delete(id);
          reject(new Error('Request timeout'));
        }
      }, 30000);
    });
  }
  
  // Handle incoming messages
  handleMessage(message) {
    // Handle responses to requests
    if (message.id && this.pendingRequests.has(message.id)) {
      const { resolve, reject } = this.pendingRequests.get(message.id);
      this.pendingRequests.delete(message.id);
      
      if (message.type === 'error') {
        reject(new Error(message.payload.message));
      } else {
        resolve(message.payload);
      }
      return;
    }
    
    // Handle push notifications
    switch (message.type) {
      case 'task.message':
        this.onTaskMessage(message.payload);
        break;
      case 'task.status':
        this.onTaskStatus(message.payload);
        break;
      case 'artifact.created':
        this.onArtifactCreated(message.payload);
        break;
      case 'task.completed':
        this.onTaskCompleted(message.payload);
        break;
    }
  }
  
  // Execute task
  async executeTask(agentName, sessionId, message) {
    return await this.request('task.execute', {
      agentName,
      sessionId,
      message,
    });
  }
  
  // Cancel task
  async cancelTask(taskId) {
    return await this.request('task.cancel', { taskId });
  }
  
  // Subscribe to task updates
  async subscribeToTask(taskId) {
    return await this.request('task.subscribe', { taskId });
  }
  
  // Event handlers (override these)
  onTaskMessage(payload) {
    console.log('Task message:', payload);
  }
  
  onTaskStatus(payload) {
    console.log('Task status:', payload.status);
  }
  
  onArtifactCreated(payload) {
    console.log('Artifact created:', payload.artifact.filename);
  }
  
  onTaskCompleted(payload) {
    console.log('Task completed:', payload.taskId);
  }
}

// Usage
const client = new AgentMeshClient(
  'ws://localhost:8080/ws',
  'YOUR_TOKEN'
);

// Override event handlers
client.onTaskMessage = (payload) => {
  console.log('Agent:', payload.message.text);
};

client.onArtifactCreated = (payload) => {
  console.log('Download:', payload.artifact.downloadUrl);
};

// Execute task
try {
  const result = await client.executeTask(
    'FileSystemAgent',
    'user123_session',
    { text: 'List all files' }
  );
  
  console.log('Task created:', result.taskId);
  
  // Subscribe to updates
  await client.subscribeToTask(result.taskId);
  
} catch (error) {
  console.error('Error:', error);
}

Connection Management

Heartbeat/Ping-Pong

WebSocket connection includes automatic ping/pong for keep-alive:
// Server sends ping every 30 seconds
// Client automatically responds with pong

// Manual ping (if needed)
ws.send(JSON.stringify({
  type: 'ping',
  timestamp: new Date().toISOString(),
}));

// Server responds with pong
{
  "type": "pong",
  "timestamp": "2024-01-15T10:30:00Z"
}

Reconnection

class ReconnectingWebSocket {
  constructor(url, token) {
    this.url = url;
    this.token = token;
    this.reconnectInterval = 1000;
    this.maxReconnectInterval = 30000;
    this.connect();
  }
  
  connect() {
    this.ws = new WebSocket(`${this.url}?token=${this.token}`);
    
    this.ws.onopen = () => {
      console.log('Connected');
      this.reconnectInterval = 1000; // Reset backoff
    };
    
    this.ws.onclose = () => {
      console.log('Disconnected, reconnecting...');
      setTimeout(() => {
        this.reconnectInterval = Math.min(
          this.reconnectInterval * 2,
          this.maxReconnectInterval
        );
        this.connect();
      }, this.reconnectInterval);
    };
  }
  
  send(data) {
    if (this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(data);
    } else {
      console.warn('WebSocket not ready, buffering message');
      // Buffer message for retry
    }
  }
}

Best Practices

  1. Use SSE for most scenarios: WebSocket adds complexity
  2. Handle reconnection: Implement exponential backoff
  3. Buffer messages: Queue sends when connection is down
  4. Validate messages: Always parse and validate incoming JSON
  5. Set timeouts: Don’t wait indefinitely for responses
  6. Clean up: Close connections when done
For most use cases, SSE provides a simpler alternative:
// Much simpler for one-way streaming
const eventSource = new EventSource(
  'http://localhost:8080/api/v1/tasks/task_abc123/stream'
);

eventSource.addEventListener('task.message', (event) => {
  const data = JSON.parse(event.data);
  console.log('Agent:', data.message.text);
});

// Automatic reconnection built-in
// No manual ping/pong needed
// Works through more proxies

See Also

Build docs developers (and LLMs) love