Overview

BioAgents uses WebSockets to provide real-time notifications following the “Notify + Fetch” pattern:
  • The WebSocket sends lightweight notifications (for example, job status changes)
  • The client fetches the actual data over HTTP after each notification
  • Keeping large payloads off the socket reduces bandwidth and improves scalability

Architecture

┌─────────────┐
│   Client    │
│ (Browser)   │
└──────┬──────┘
       │
       │ 1. WebSocket /api/ws
       │ 2. Auth with JWT
       │ 3. Subscribe to conversation
       ▼
┌─────────────┐     ┌─────────────┐
│  API Server │────▶│    Redis    │
│  (Elysia)   │     │  (Pub/Sub)  │
└─────────────┘     └──────┬──────┘
                           ▲
                           │ Notifications
                           │
                    ┌─────────────┐
                    │   Worker    │
                    │  (BullMQ)   │
                    └─────────────┘

WebSockets are only available when USE_JOB_QUEUE=true (requires Redis).

WebSocket Handler

Connection Endpoint

ws://localhost:3000/api/ws
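
The endpoint above uses the unencrypted ws:// scheme, which suits local development. As a minimal sketch, a client could derive the scheme from the page protocol so that HTTPS pages connect over wss://. The helper name `webSocketUrl` and its parameters are hypothetical and not part of BioAgents.

```typescript
// Hypothetical helper: builds the WebSocket URL for the /api/ws endpoint.
// pageProtocol is expected in the form used by window.location.protocol,
// for example "http:" or "https:".
function webSocketUrl(
  pageProtocol: string,
  host: string,
  path: string = "/api/ws",
): string {
  // HTTPS pages must use the secure scheme; everything else falls back to ws:
  const scheme = pageProtocol === "https:" ? "wss:" : "ws:";
  return `${scheme}//${host}${path}`;
}
```

In a browser this could be called as `webSocketUrl(window.location.protocol, window.location.host)`.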

Authentication Flow

1. Connect to WebSocket

Client establishes WebSocket connection:
const ws = new WebSocket('ws://localhost:3000/api/ws');
2. Server Sends Ready Message

Server responds with ready message:
{
  "type": "ready",
  "message": "Send auth message with JWT token"
}
3. Client Sends Auth

Client must authenticate within 10 seconds.

Production (JWT):
{
  "action": "auth",
  "token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
}
Development (AUTH_MODE=none):
{
  "action": "auth",
  "userId": "user-uuid-123"
}
4. Server Confirms Authentication

Server responds with confirmation:
{
  "type": "authenticated",
  "userId": "user-uuid-123"
}
Authentication Timeout: Connections that don’t authenticate within 10 seconds are automatically closed with code 4001.

Implementation

The WebSocket handler is implemented in Elysia:
// src/services/websocket/handler.ts
import { Elysia } from "elysia";
import { verifyJWT } from "../jwt";
import logger from "../../utils/logger";

const AUTH_TIMEOUT_MS = 10000;

export const websocketHandler = new Elysia().ws("/api/ws", {
  async open(ws) {
    // Initialize connection state
    (ws.data as any).userId = null;
    (ws.data as any).subscriptions = new Set<string>();

    // Set authentication timeout
    const timeout = setTimeout(() => {
      if (!(ws.data as any).userId) {
        ws.send(JSON.stringify({ 
          type: "error", 
          message: "Authentication timeout" 
        }));
        ws.close(4001, "Authentication timeout");
      }
    }, AUTH_TIMEOUT_MS);

    // Send ready message
    ws.send(JSON.stringify({ 
      type: "ready", 
      message: "Send auth message with JWT token" 
    }));
  },
  
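  async message(ws, message) {
    // Accept raw strings as well as payloads the framework has already parsed
    let data: any;
    try {
      data = typeof message === "string" ? JSON.parse(message) : message;
    } catch {
      ws.send(JSON.stringify({ type: "error", message: "Invalid JSON" }));
      return;
    }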
    
    // Handle authentication
    if (data.action === "auth") {
      // Verify JWT and set userId
      // ...
    }
    
    // Handle subscriptions (after auth)
    if (data.action === "subscribe") {
      // Subscribe to conversation
      // ...
    }
  },
  
  close(ws) {
    // Cleanup subscriptions
    // ...
  },
});
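
The auth branch in the handler is elided. As a rough sketch of what it might do, the function below resolves a user ID from an auth message in both modes described in the authentication flow. The names `resolveUserId`, `AuthResult`, and `TokenVerifier` are hypothetical, the verifier signature is an assumption (the real `verifyJWT` in `../jwt` may differ), and any `authMode` other than "none" is treated as JWT mode.

```typescript
type AuthMessage = { action: "auth"; token?: string; userId?: string };

type AuthResult =
  | { ok: true; userId: string }
  | { ok: false; error: string };

// Assumed shape of a JWT verifier: resolves with the user ID or rejects.
type TokenVerifier = (token: string) => Promise<{ userId: string }>;

async function resolveUserId(
  msg: AuthMessage,
  authMode: string,
  verifyToken: TokenVerifier,
): Promise<AuthResult> {
  // Development mode (AUTH_MODE=none): trust the userId sent by the client
  if (authMode === "none") {
    return typeof msg.userId === "string" && msg.userId.length > 0
      ? { ok: true, userId: msg.userId }
      : { ok: false, error: "userId is required when AUTH_MODE=none" };
  }

  // Any other mode: a token must be present and must verify
  if (typeof msg.token !== "string" || msg.token.length === 0) {
    return { ok: false, error: "token is required" };
  }
  try {
    const { userId } = await verifyToken(msg.token);
    return { ok: true, userId };
  } catch {
    return { ok: false, error: "Invalid token" };
  }
}
```

On success the handler would store the returned `userId` on the connection state and send the `authenticated` message; on failure it would send an `error` message instead.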

Subscribing to Conversations

Subscribe

After authentication, subscribe to conversation updates:
ws.send(JSON.stringify({
  action: 'subscribe',
  conversationId: 'conversation-uuid-123'
}));
Server confirms subscription:
{
  "type": "subscribed",
  "conversationId": "conversation-uuid-123"
}

Unsubscribe

ws.send(JSON.stringify({
  action: 'unsubscribe',
  conversationId: 'conversation-uuid-123'
}));
Server confirms:
{
  "type": "unsubscribed",
  "conversationId": "conversation-uuid-123"
}

Access Control

The server verifies that the user owns the conversation before allowing the subscription:
// Check user access
const { getUserConversations } = await import("../../db/operations");
const userConversations = await getUserConversations(userId);
const allowedConversations = new Set(userConversations.map(c => c.id));

if (!allowedConversations.has(conversationId)) {
  ws.send(JSON.stringify({
    type: "error",
    message: "Access denied to conversation"
  }));
  return;
}

Notification Types

Job Status Updates

{
  "type": "job-status",
  "jobId": "job-123",
  "status": "completed",
  "conversationId": "conversation-uuid-123",
  "messageId": "message-uuid-456"
}
Possible status values:
  • waiting - Job queued
  • active - Job processing
  • completed - Job finished successfully
  • failed - Job failed with error

Deep Research Progress

{
  "type": "research-progress",
  "jobId": "job-123",
  "conversationId": "conversation-uuid-123",
  "messageId": "message-uuid-456",
  "progress": {
    "currentLevel": 1,
    "completedTasks": 3,
    "totalTasks": 5,
    "phase": "execution"
  }
}

Task Completion

{
  "type": "task-completed",
  "jobId": "job-123",
  "conversationId": "conversation-uuid-123",
  "taskId": "task-abc",
  "taskType": "LITERATURE"
}
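
The three payloads above share a `type` field, so a client can model them as a discriminated union. The sketch below is one possible typing based only on the example payloads shown here; the type names and the helpers `parseNotification` and `messageToFetch` are hypothetical, and real payloads may carry additional fields.

```typescript
type JobStatus = "waiting" | "active" | "completed" | "failed";

interface JobStatusNotification {
  type: "job-status";
  jobId: string;
  status: JobStatus;
  conversationId: string;
  messageId: string;
}

interface ResearchProgressNotification {
  type: "research-progress";
  jobId: string;
  conversationId: string;
  messageId: string;
  progress: {
    currentLevel: number;
    completedTasks: number;
    totalTasks: number;
    phase: string;
  };
}

interface TaskCompletedNotification {
  type: "task-completed";
  jobId: string;
  conversationId: string;
  taskId: string;
  taskType: string;
}

type ServerNotification =
  | JobStatusNotification
  | ResearchProgressNotification
  | TaskCompletedNotification;

// Returns the notification if the frame is JSON with a known type, else null.
// Only the type tag is checked; individual fields are not validated.
function parseNotification(raw: string): ServerNotification | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof value !== "object" || value === null) return null;
  const type = (value as { type?: unknown }).type;
  if (type === "job-status" || type === "research-progress" || type === "task-completed") {
    return value as ServerNotification;
  }
  return null;
}

// Notify + Fetch: only a completed job triggers an HTTP fetch of the message.
function messageToFetch(n: ServerNotification): string | null {
  return n.type === "job-status" && n.status === "completed" ? n.messageId : null;
}
```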

Client Implementation

React/Preact Example

import { useEffect, useState } from 'preact/hooks';

interface WebSocketNotification {
  type: string;
  jobId?: string;
  status?: string;
  conversationId?: string;
  messageId?: string; // Used below to fetch the updated message
}

export function useWebSocket(conversationId: string, token: string) {
  const [notifications, setNotifications] = useState<WebSocketNotification[]>([]);
  const [connected, setConnected] = useState(false);

  useEffect(() => {
    const ws = new WebSocket('ws://localhost:3000/api/ws');

    ws.onopen = () => {
      console.log('WebSocket connected');
    };

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);

      // Handle ready message
      if (data.type === 'ready') {
        // Send authentication
        ws.send(JSON.stringify({
          action: 'auth',
          token: token,
        }));
      }

      // Handle authenticated
      if (data.type === 'authenticated') {
        setConnected(true);
        
        // Subscribe to conversation
        ws.send(JSON.stringify({
          action: 'subscribe',
          conversationId: conversationId,
        }));
      }

      // Handle notifications
      if (data.type === 'job-status' || data.type === 'research-progress') {
        setNotifications(prev => [...prev, data]);
        
        // Fetch updated data via HTTP
        if (data.type === 'job-status' && data.status === 'completed') {
          fetchMessageUpdate(data.messageId);
        }
      }
    };

    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };

    ws.onclose = () => {
      console.log('WebSocket disconnected');
      setConnected(false);
    };

    // Cleanup on unmount
    return () => {
      ws.close();
    };
  }, [conversationId, token]);

  return { notifications, connected };
}

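async function fetchMessageUpdate(messageId: string) {
  const response = await fetch(`/api/messages/${messageId}`);
  if (!response.ok) {
    // Don't try to parse an error response as a message
    console.error(`Failed to fetch message ${messageId}: ${response.status}`);
    return;
  }
  const message = await response.json();
  // Update UI with fresh message data
}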

Heartbeat/Ping-Pong

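Keep the connection alive with ping-pong:
// Client sends ping every 30 seconds
const pingInterval = setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ action: 'ping' }));
  }
}, 30000);

// Stop pinging once the socket closes
ws.addEventListener('close', () => clearInterval(pingInterval));

// Server responds with pong. addEventListener leaves any existing
// onmessage handler in place instead of overwriting it.
ws.addEventListener('message', (event) => {
  const data = JSON.parse(event.data);
  if (data.type === 'pong') {
    console.log('Pong received');
  }
});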
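
Logging the pong does not by itself detect a dead connection. One possible extension, shown here as a sketch, is to record when the last pong arrived and treat the socket as stale once too much time has passed. `HeartbeatMonitor` is a hypothetical helper, and the timeout is an assumption to tune (for example, two ping intervals).

```typescript
// Hypothetical helper: tracks the most recent pong and reports staleness.
// Timestamps are passed in (e.g. from Date.now()) to keep the class easy to test.
class HeartbeatMonitor {
  private lastPongAt: number;
  private readonly timeoutMs: number;

  constructor(startedAt: number, timeoutMs: number) {
    this.lastPongAt = startedAt;
    this.timeoutMs = timeoutMs;
  }

  // Call whenever a { type: "pong" } message arrives
  recordPong(now: number): void {
    this.lastPongAt = now;
  }

  // True when no pong has been seen for longer than the timeout
  isStale(now: number): boolean {
    return now - this.lastPongAt > this.timeoutMs;
  }
}
```

A client could check `isStale(Date.now())` before each ping and close the socket when it returns true, which hands control to its reconnection logic.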

Redis Pub/Sub Integration

Workers publish notifications to Redis, which the API server subscribes to:

Worker Side (Publishing)

// src/services/queue/notify.ts
import { getPublisher } from "./connection";

export async function notifyJobStatus(
  conversationId: string,
  notification: object
) {
  const publisher = getPublisher();
  const channel = `conversation:${conversationId}`;
  
  await publisher.publish(channel, JSON.stringify(notification));
}
Example usage in worker:
// Worker job handler
await notifyJobStatus(conversationId, {
  type: 'job-status',
  jobId: job.id,
  status: 'completed',
  conversationId,
  messageId,
});

API Server Side (Subscribing)

// src/services/websocket/subscribe.ts
import { getSubscriber } from "../queue/connection";
import { broadcastToConversation } from "./handler";

export async function startRedisSubscription() {
  const subscriber = getSubscriber();

  // Subscribe to all conversation channels
  await subscriber.psubscribe("conversation:*");

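  subscriber.on("pmessage", (pattern, channel, message) => {
    // channel = "conversation:abc123"; strip only the prefix so IDs containing ":" survive
    const conversationId = channel.slice("conversation:".length);

    let notification;
    try {
      notification = JSON.parse(message);
    } catch {
      return; // Ignore malformed payloads instead of crashing the subscriber
    }

    // Broadcast to all WebSocket clients in this conversation
    broadcastToConversation(conversationId, notification);
  });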
}

Broadcasting to Clients

// src/services/websocket/handler.ts
const conversationClients = new Map<string, Set<WebSocket>>();

export function broadcastToConversation(
  conversationId: string,
  message: object
) {
  const clients = conversationClients.get(conversationId);
  if (!clients) return;

  const payload = JSON.stringify(message);
  
  for (const client of clients) {
    try {
      client.send(payload);
    } catch (e) {
      // Client disconnected, will be cleaned up
    }
  }
}
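
The broadcast function reads from `conversationClients`, but how clients enter and leave that map is not shown. The sketch below is one way the bookkeeping might look, wrapped in a hypothetical `ConversationRegistry` class; the `Sendable` interface stands in for the server's WebSocket type, and none of these names come from BioAgents.

```typescript
// Minimal stand-in for a WebSocket connection: anything with send()
interface Sendable {
  send(payload: string): void;
}

class ConversationRegistry<C extends Sendable> {
  private readonly rooms = new Map<string, Set<C>>();

  // Call after the access check passes for a "subscribe" action
  subscribe(conversationId: string, client: C): void {
    let clients = this.rooms.get(conversationId);
    if (!clients) {
      clients = new Set<C>();
      this.rooms.set(conversationId, clients);
    }
    clients.add(client);
  }

  // Call for an "unsubscribe" action; drops the room once it is empty
  unsubscribe(conversationId: string, client: C): void {
    const clients = this.rooms.get(conversationId);
    if (!clients) return;
    clients.delete(client);
    if (clients.size === 0) this.rooms.delete(conversationId);
  }

  // Call from the close handler to remove the client from every room
  removeClient(client: C): void {
    this.rooms.forEach((clients, conversationId) => {
      clients.delete(client);
      if (clients.size === 0) this.rooms.delete(conversationId);
    });
  }

  // Sends the message to every client in the room; returns the delivered count
  broadcast(conversationId: string, message: object): number {
    const clients = this.rooms.get(conversationId);
    if (!clients) return 0;
    const payload = JSON.stringify(message);
    let delivered = 0;
    clients.forEach((client) => {
      try {
        client.send(payload);
        delivered++;
      } catch {
        // Send failed; the client is left for a later cleanup pass
      }
    });
    return delivered;
  }
}
```

Returning the delivered count makes it straightforward to log success and error totals per broadcast.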

Error Handling

Client-Side Reconnection

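function connectWebSocket(conversationId, token, maxRetries = 5) {
  let retries = 0;
  let ws;
  let closedByClient = false; // Set when the caller disposes the connection

  function connect() {
    ws = new WebSocket('ws://localhost:3000/api/ws');

    ws.onopen = () => {
      console.log('Connected');
      retries = 0; // Reset retry count
    };

    // Repeat the auth and subscribe handshake on every (re)connection
    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.type === 'ready') {
        ws.send(JSON.stringify({ action: 'auth', token }));
      } else if (data.type === 'authenticated') {
        ws.send(JSON.stringify({ action: 'subscribe', conversationId }));
      }
    };

    ws.onclose = (event) => {
      if (closedByClient) return; // Intentional close, don't reconnect

      if (event.code === 4001) {
        console.error('Authentication timeout');
        return; // Don't reconnect on auth failure
      }

      // Exponential backoff reconnection
      if (retries < maxRetries) {
        const delay = Math.min(1000 * Math.pow(2, retries), 30000);
        console.log(`Reconnecting in ${delay}ms...`);

        setTimeout(() => {
          retries++;
          if (!closedByClient) connect();
        }, delay);
      }
    };

    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };
  }

  connect();
  // Disposer: mark the close as intentional so onclose does not reconnect
  return () => {
    closedByClient = true;
    ws?.close();
  };
}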
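
To make the backoff schedule concrete, the delay expression from the reconnection code can be pulled out into a small pure function. `reconnectDelayMs` is a hypothetical name; the base of 1000 ms and cap of 30000 ms are taken from the snippet above.

```typescript
// Delay before reconnection attempt number `retries` (0-based):
// doubles from baseMs on each retry and never exceeds capMs.
function reconnectDelayMs(
  retries: number,
  baseMs: number = 1000,
  capMs: number = 30000,
): number {
  return Math.min(baseMs * Math.pow(2, retries), capMs);
}

// Schedule for the default maxRetries = 5 (retries 0 through 4)
const schedule = [0, 1, 2, 3, 4].map((r) => reconnectDelayMs(r));
// schedule is [1000, 2000, 4000, 8000, 16000]
```

With the default of five retries the 30-second cap is never reached; it only takes effect from the sixth attempt onward (32000 ms capped to 30000 ms), so it matters when `maxRetries` is raised.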

Server-Side Cleanup

Periodically clean up dead connections:
// src/index.ts
import { cleanupDeadConnections } from "./services/websocket/handler";

// Clean up every 60 seconds
setInterval(() => {
  cleanupDeadConnections();
}, 60000);
// src/services/websocket/handler.ts
export function cleanupDeadConnections() {
  for (const [conversationId, clients] of conversationClients) {
    for (const client of clients) {
      // Check if client is still connected (readyState 1 = OPEN)
      if ((client as any).readyState !== 1) {
        clients.delete(client);
      }
    }
    
    // Remove empty conversation rooms
    if (clients.size === 0) {
      conversationClients.delete(conversationId);
    }
  }
}

Security Considerations

Security Best Practices:
  • Always require authentication before allowing subscriptions
  • Validate user ownership of conversations
  • Set connection timeouts to prevent resource exhaustion
  • Use WSS (WebSocket Secure) in production
  • Implement rate limiting for WebSocket messages
  • Clean up dead connections regularly
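
As an illustration of the rate-limiting item above, the following sketch counts messages per connection in fixed time windows. `MessageRateLimiter` is a hypothetical class and not the BioAgents rate limiter; the limit and window values are placeholders to be chosen per deployment.

```typescript
// Hypothetical fixed-window limiter: allows at most `limit` messages
// per `windowMs` milliseconds. One instance is intended per connection.
class MessageRateLimiter {
  private readonly limit: number;
  private readonly windowMs: number;
  private windowStart = -Infinity;
  private count = 0;

  constructor(limit: number, windowMs: number) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  // Returns true if a message arriving at time `now` (ms) may be processed
  allow(now: number): boolean {
    // Start a fresh window once the current one has elapsed
    if (now - this.windowStart >= this.windowMs) {
      this.windowStart = now;
      this.count = 0;
    }
    if (this.count >= this.limit) return false;
    this.count++;
    return true;
  }
}
```

A server could create one limiter when a connection opens and, in the message handler, reply with an `error` message or close the socket whenever `allow(Date.now())` returns false.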

Monitoring

WebSocket events are logged with structured data:
// Connection events
logger.info({ userId }, "ws_client_connected_awaiting_auth");
logger.info({ userId }, "ws_client_authenticated");

// Subscription events
logger.info({ userId, conversationId }, "ws_client_subscribed");
logger.info({ userId, conversationId }, "ws_client_unsubscribed");

// Broadcast events
logger.info(
  { conversationId, successCount, errorCount },
  "ws_broadcast_completed"
);

// Error events
logger.warn("ws_auth_timeout");
logger.warn({ error }, "ws_invalid_message");

Testing WebSockets

Using wscat

# Install wscat
npm install -g wscat

# Connect to WebSocket
wscat -c ws://localhost:3000/api/ws

# Server sends ready message
# < {"type":"ready","message":"Send auth message with JWT token"}

# Send auth (dev mode)
# > {"action":"auth","userId":"user-123"}

# Server confirms
# < {"type":"authenticated","userId":"user-123"}

# Subscribe to conversation
# > {"action":"subscribe","conversationId":"conv-123"}

# Server confirms
# < {"type":"subscribed","conversationId":"conv-123"}

Browser Console

// Connect
const ws = new WebSocket('ws://localhost:3000/api/ws');

ws.onmessage = (event) => {
  console.log('Received:', JSON.parse(event.data));
};

// Wait for ready message, then auth
ws.send(JSON.stringify({
  action: 'auth',
  userId: 'user-123'
}));

// Subscribe
ws.send(JSON.stringify({
  action: 'subscribe',
  conversationId: 'conv-123'
}));

// Ping
ws.send(JSON.stringify({ action: 'ping' }));

Next Steps

Rate Limiting

Configure rate limits for WebSocket connections

Custom Agents

Send WebSocket notifications from custom agents

Job Queue

Learn about the BullMQ job queue system

API Routes

Explore HTTP API endpoints
