Overview
BioAgents uses WebSockets to deliver real-time notifications, following the “Notify + Fetch” pattern:
The WebSocket carries only lightweight notifications (for example, job status changes)
The client fetches the actual data over HTTP after receiving a notification
Keeping payloads off the socket reduces bandwidth and improves scalability
Architecture
┌─────────────┐
│   Client    │
│  (Browser)  │
└──────┬──────┘
       │
       │ 1. WebSocket /api/ws
       │ 2. Auth with JWT
       │ 3. Subscribe to conversation
       ▼
┌─────────────┐     ┌─────────────┐
│ API Server  │────▶│    Redis    │
│  (Elysia)   │     │  (Pub/Sub)  │
└─────────────┘     └──────┬──────┘
                           │
                           │ Notifications
                           ▼
                    ┌─────────────┐
                    │   Worker    │
                    │  (BullMQ)   │
                    └─────────────┘
WebSockets are only available when USE_JOB_QUEUE=true (requires Redis).
WebSocket Handler
Connection Endpoint
ws://localhost:3000/api/ws
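The endpoint above is the local development URL. In a deployed environment the scheme normally follows the HTTP origin (https maps to wss). The following is a minimal sketch of that mapping; `toWebSocketUrl` is a hypothetical helper for illustration, not part of BioAgents.

```typescript
// Hypothetical helper (an assumption, not a BioAgents API): derive the
// WebSocket endpoint from an HTTP(S) base URL.
function toWebSocketUrl(httpBaseUrl: string, path: string = "/api/ws"): string {
  const url = new URL(path, httpBaseUrl);
  // https origins must use wss; everything else falls back to ws
  const scheme = url.protocol === "https:" ? "wss:" : "ws:";
  return `${scheme}//${url.host}${url.pathname}`;
}

console.log(toWebSocketUrl("http://localhost:3000")); // ws://localhost:3000/api/ws
console.log(toWebSocketUrl("https://example.com")); // wss://example.com/api/ws
```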
Authentication Flow
Connect to WebSocket
Client establishes the WebSocket connection:
const ws = new WebSocket('ws://localhost:3000/api/ws');
Server Sends Ready Message
Server responds with a ready message:
{
  "type": "ready",
  "message": "Send auth message with JWT token"
}
Client Sends Auth
Client must authenticate within 10 seconds.
Production (JWT):
{
  "action": "auth",
  "token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
}
Development (AUTH_MODE=none):
{
  "action": "auth",
  "userId": "user-uuid-123"
}
Server Confirms Authentication
Server responds with a confirmation:
{
  "type": "authenticated",
  "userId": "user-uuid-123"
}
Authentication Timeout: Connections that don’t authenticate within 10 seconds are automatically closed with code 4001.
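The two auth message shapes above can be told apart before any JWT verification happens. The sketch below shows one way to validate an incoming auth message. `parseAuthMessage`, `AuthCredential`, and the `"jwt"` mode label are assumptions made for illustration; only `AUTH_MODE=none` is named in this document, and the real handler may be structured differently.

```typescript
// "none" comes from the docs above; "jwt" is an assumed label for production mode.
type AuthMode = "jwt" | "none";

type AuthCredential =
  | { kind: "token"; token: string }
  | { kind: "userId"; userId: string }
  | { kind: "invalid"; reason: string };

// Hypothetical validator: decides which credential an auth message carries.
// It does not verify the JWT itself.
function parseAuthMessage(raw: unknown, mode: AuthMode): AuthCredential {
  if (typeof raw !== "object" || raw === null) {
    return { kind: "invalid", reason: "message must be a JSON object" };
  }
  const msg = raw as Record<string, unknown>;
  if (msg.action !== "auth") {
    return { kind: "invalid", reason: "expected action 'auth'" };
  }
  if (mode === "none") {
    // Development: a plain userId is accepted
    return typeof msg.userId === "string" && msg.userId.length > 0
      ? { kind: "userId", userId: msg.userId }
      : { kind: "invalid", reason: "userId is required when AUTH_MODE=none" };
  }
  // Production: only a token is accepted, a bare userId is rejected
  return typeof msg.token === "string" && msg.token.length > 0
    ? { kind: "token", token: msg.token }
    : { kind: "invalid", reason: "token is required" };
}
```

Rejecting a bare `userId` outside development mode keeps the dev shortcut from becoming an authentication bypass.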
Implementation
The WebSocket handler is implemented in Elysia:
// src/services/websocket/handler.ts
import { Elysia } from "elysia";
import { verifyJWT } from "../jwt";
import logger from "../../utils/logger";

const AUTH_TIMEOUT_MS = 10000;

export const websocketHandler = new Elysia().ws("/api/ws", {
  async open(ws) {
    // Initialize connection state
    (ws.data as any).userId = null;
    (ws.data as any).subscriptions = new Set<string>();

    // Set authentication timeout. The timer handle is kept on ws.data
    // so it can be cleared once the client authenticates or disconnects.
    (ws.data as any).authTimeout = setTimeout(() => {
      if (!(ws.data as any).userId) {
        ws.send(JSON.stringify({
          type: "error",
          message: "Authentication timeout"
        }));
        ws.close(4001, "Authentication timeout");
      }
    }, AUTH_TIMEOUT_MS);

    // Send ready message
    ws.send(JSON.stringify({
      type: "ready",
      message: "Send auth message with JWT token"
    }));
  },

  async message(ws, message) {
    // Elysia may deliver JSON payloads already parsed, so only parse strings
    const data = typeof message === "string" ? JSON.parse(message) : (message as any);

    // Handle authentication
    if (data.action === "auth") {
      // Verify JWT and set userId
      // ...
    }

    // Handle subscriptions (after auth)
    if (data.action === "subscribe") {
      // Subscribe to conversation
      // ...
    }
  },

  close(ws) {
    // Stop the pending auth timer, if any
    clearTimeout((ws.data as any).authTimeout);
    // Cleanup subscriptions
    // ...
  },
});
Subscribing to Conversations
Subscribe
After authentication, subscribe to conversation updates:
ws.send(JSON.stringify({
  action: 'subscribe',
  conversationId: 'conversation-uuid-123'
}));
Server confirms subscription:
{
  "type": "subscribed",
  "conversationId": "conversation-uuid-123"
}
Unsubscribe
ws.send(JSON.stringify({
  action: 'unsubscribe',
  conversationId: 'conversation-uuid-123'
}));
Server confirms:
{
  "type": "unsubscribed",
  "conversationId": "conversation-uuid-123"
}
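On the server, subscribe and unsubscribe amount to adding and removing a socket from a per-conversation set. The sketch below illustrates that bookkeeping in isolation. `SubscriptionRegistry` and `SocketLike` are hypothetical names; the actual handler keeps its own map and uses Elysia's socket type.

```typescript
// Minimal client shape for this sketch (an assumption, not Elysia's type).
interface SocketLike {
  send(payload: string): void;
}

// Hypothetical registry: conversationId -> set of subscribed sockets.
class SubscriptionRegistry {
  private rooms = new Map<string, Set<SocketLike>>();

  subscribe(conversationId: string, client: SocketLike): void {
    let room = this.rooms.get(conversationId);
    if (!room) {
      room = new Set<SocketLike>();
      this.rooms.set(conversationId, room);
    }
    room.add(client);
  }

  unsubscribe(conversationId: string, client: SocketLike): void {
    const room = this.rooms.get(conversationId);
    if (!room) return;
    room.delete(client);
    // Drop empty rooms so the map does not grow without bound
    if (room.size === 0) this.rooms.delete(conversationId);
  }

  // Remove a client from every room, e.g. when its socket closes.
  removeClient(client: SocketLike): void {
    this.rooms.forEach((room, conversationId) => {
      room.delete(client);
      if (room.size === 0) this.rooms.delete(conversationId);
    });
  }

  clientCount(conversationId: string): number {
    return this.rooms.get(conversationId)?.size ?? 0;
  }
}
```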
Access Control
The server validates that the user owns the conversation before allowing the subscription:
// Check user access
const { getUserConversations } = await import("../../db/operations");
const userConversations = await getUserConversations(userId);
const allowedConversations = new Set(userConversations.map(c => c.id));

if (!allowedConversations.has(conversationId)) {
  ws.send(JSON.stringify({
    type: "error",
    message: "Access denied to conversation"
  }));
  return;
}
Notification Types
Job Status Updates
{
  "type": "job-status",
  "jobId": "job-123",
  "status": "completed",
  "conversationId": "conversation-uuid-123",
  "messageId": "message-uuid-456"
}
Possible status values:
waiting - Job queued
active - Job processing
completed - Job finished successfully
failed - Job failed with error
Deep Research Progress
{
  "type": "research-progress",
  "jobId": "job-123",
  "conversationId": "conversation-uuid-123",
  "messageId": "message-uuid-456",
  "progress": {
    "currentLevel": 1,
    "completedTasks": 3,
    "totalTasks": 5,
    "phase": "execution"
  }
}
Task Completion
{
  "type": "task-completed",
  "jobId": "job-123",
  "conversationId": "conversation-uuid-123",
  "taskId": "task-abc",
  "taskType": "LITERATURE"
}
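The three notification shapes above map naturally onto a TypeScript discriminated union keyed on `type`. The following sketch is derived only from the example payloads in this section. The type and function names are assumptions, and fields such as `phase` and `taskType` are typed as plain strings because their full value sets are not listed here.

```typescript
type JobStatus = "waiting" | "active" | "completed" | "failed";

interface JobStatusNotification {
  type: "job-status";
  jobId: string;
  status: JobStatus;
  conversationId: string;
  messageId: string;
}

interface ResearchProgressNotification {
  type: "research-progress";
  jobId: string;
  conversationId: string;
  messageId: string;
  progress: { currentLevel: number; completedTasks: number; totalTasks: number; phase: string };
}

interface TaskCompletedNotification {
  type: "task-completed";
  jobId: string;
  conversationId: string;
  taskId: string;
  taskType: string;
}

type WsNotification =
  | JobStatusNotification
  | ResearchProgressNotification
  | TaskCompletedNotification;

// A job receives no further status updates once it has completed or failed.
function isTerminal(status: JobStatus): boolean {
  return status === "completed" || status === "failed";
}

// Whole-number completion percentage; 0 when no tasks are planned yet.
function progressPercent(n: ResearchProgressNotification): number {
  const { completedTasks, totalTasks } = n.progress;
  return totalTasks > 0 ? Math.round((completedTasks * 100) / totalTasks) : 0;
}

// The switch narrows on `type`, so each branch sees only its own fields.
function summarize(n: WsNotification): string {
  switch (n.type) {
    case "job-status":
      return `job ${n.jobId} is ${n.status}`;
    case "research-progress":
      return `job ${n.jobId} is ${progressPercent(n)}% done`;
    case "task-completed":
      return `${n.taskType} task ${n.taskId} finished`;
  }
}
```

With the research-progress payload shown above (3 of 5 tasks), `summarize` returns `job job-123 is 60% done`.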
Client Implementation
React/Preact Example
import { useEffect, useState } from 'preact/hooks';

interface WebSocketNotification {
  type: string;
  jobId?: string;
  status?: string;
  conversationId?: string;
  messageId?: string;
}

export function useWebSocket(conversationId: string, token: string) {
  const [notifications, setNotifications] = useState<WebSocketNotification[]>([]);
  const [connected, setConnected] = useState(false);

  useEffect(() => {
    const ws = new WebSocket('ws://localhost:3000/api/ws');

    ws.onopen = () => {
      console.log('WebSocket connected');
    };

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);

      // Handle ready message
      if (data.type === 'ready') {
        // Send authentication
        ws.send(JSON.stringify({
          action: 'auth',
          token: token,
        }));
      }

      // Handle authenticated
      if (data.type === 'authenticated') {
        setConnected(true);
        // Subscribe to conversation
        ws.send(JSON.stringify({
          action: 'subscribe',
          conversationId: conversationId,
        }));
      }

      // Handle notifications
      if (data.type === 'job-status' || data.type === 'research-progress') {
        setNotifications(prev => [...prev, data]);

        // Fetch updated data via HTTP
        if (data.type === 'job-status' && data.status === 'completed' && data.messageId) {
          fetchMessageUpdate(data.messageId);
        }
      }
    };

    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };

    ws.onclose = () => {
      console.log('WebSocket disconnected');
      setConnected(false);
    };

    // Cleanup on unmount
    return () => {
      ws.close();
    };
  }, [conversationId, token]);

  return { notifications, connected };
}

async function fetchMessageUpdate(messageId: string) {
  const response = await fetch(`/api/messages/${messageId}`);
  if (!response.ok) {
    console.error('Failed to fetch message', messageId, response.status);
    return;
  }
  const message = await response.json();
  // Update UI with fresh message data
}
Heartbeat/Ping-Pong
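Keep the connection alive with ping-pong:
// Client sends ping every 30 seconds
const pingInterval = setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ action: 'ping' }));
  }
}, 30000);

// Server responds with pong.
// addEventListener is used so an existing ws.onmessage handler is not replaced.
ws.addEventListener('message', (event) => {
  const data = JSON.parse(event.data);
  if (data.type === 'pong') {
    console.log('Pong received');
  }
});

// Stop pinging once the socket closes
ws.addEventListener('close', () => clearInterval(pingInterval));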
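Logging the pong confirms the round trip, but a client can also use pongs to detect a silently dead connection. This document does not say how BioAgents clients react to missing pongs, so the following is only a sketch under an assumed policy: treat the connection as stale when no pong has arrived within a chosen timeout. `HeartbeatMonitor` is a hypothetical name, and timestamps are passed in so the logic can be exercised without real timers.

```typescript
// Hypothetical liveness tracker (an assumption, not a BioAgents API).
class HeartbeatMonitor {
  private lastPongAt: number;
  private readonly timeoutMs: number;

  constructor(timeoutMs: number, now: number = Date.now()) {
    this.timeoutMs = timeoutMs;
    // Treat the moment of creation as the first sign of life
    this.lastPongAt = now;
  }

  // Call whenever a {"type":"pong"} message arrives.
  recordPong(now: number = Date.now()): void {
    this.lastPongAt = now;
  }

  // True when more than timeoutMs has passed since the last pong.
  isStale(now: number = Date.now()): boolean {
    return now - this.lastPongAt > this.timeoutMs;
  }
}

// With a 30-second ping interval, a 60-second timeout tolerates one missed pong.
const monitor = new HeartbeatMonitor(60000, 0);
monitor.recordPong(30000);
console.log(monitor.isStale(90000)); // false
console.log(monitor.isStale(90001)); // true
```

A client could check `isStale()` inside its ping interval and close the socket when it returns true, which lets any reconnection logic take over.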
Redis Pub/Sub Integration
Workers publish notifications to Redis, which the API server subscribes to:
Worker Side (Publishing)
// src/services/queue/notify.ts
import { getPublisher } from "./connection";

export async function notifyJobStatus(
  conversationId: string,
  notification: object
) {
  const publisher = getPublisher();
  // Channel name must match the "conversation:*" pattern the API server subscribes to
  const channel = `conversation:${conversationId}`;
  await publisher.publish(channel, JSON.stringify(notification));
}
Example usage in worker:
// Worker job handler
await notifyJobStatus(conversationId, {
  type: 'job-status',
  jobId: job.id,
  status: 'completed',
  conversationId,
  messageId,
});
API Server Side (Subscribing)
// src/services/websocket/subscribe.ts
import { getSubscriber } from "../queue/connection";
import { broadcastToConversation } from "./handler";

export async function startRedisSubscription() {
  const subscriber = getSubscriber();

  // Register the handler before subscribing so no early message is missed
  subscriber.on("pmessage", (pattern, channel, message) => {
    // channel = "conversation:abc123"
    const conversationId = channel.slice("conversation:".length);

    let notification;
    try {
      notification = JSON.parse(message);
    } catch {
      // Skip malformed payloads instead of throwing inside the event handler
      return;
    }

    // Broadcast to all WebSocket clients in this conversation
    broadcastToConversation(conversationId, notification);
  });

  // Subscribe to all conversation channels
  await subscriber.psubscribe("conversation:*");
}
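The worker and the API server both depend on the same channel naming convention, `conversation:<id>`. Keeping the encode and decode steps next to each other makes that coupling explicit. The helpers below are a sketch with hypothetical names (`channelFor`, `conversationIdFrom`); the source files above build and parse the channel name inline.

```typescript
const CHANNEL_PREFIX = "conversation:";

// Hypothetical helper: build the Redis channel name for a conversation.
function channelFor(conversationId: string): string {
  return `${CHANNEL_PREFIX}${conversationId}`;
}

// Hypothetical helper: recover the conversation ID from a channel name.
// Returns null for channels that do not follow the convention.
function conversationIdFrom(channel: string): string | null {
  if (!channel.startsWith(CHANNEL_PREFIX)) return null;
  const id = channel.slice(CHANNEL_PREFIX.length);
  return id.length > 0 ? id : null;
}

console.log(channelFor("abc123")); // conversation:abc123
console.log(conversationIdFrom("conversation:abc123")); // abc123
```

Slicing off the prefix, rather than splitting on `:`, keeps the round trip intact even if an ID ever contains a colon.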
Broadcasting to Clients
// src/services/websocket/handler.ts
const conversationClients = new Map<string, Set<WebSocket>>();

export function broadcastToConversation(
  conversationId: string,
  message: object
) {
  const clients = conversationClients.get(conversationId);
  if (!clients) return;

  const payload = JSON.stringify(message);
  for (const client of clients) {
    try {
      client.send(payload);
    } catch (e) {
      // Client disconnected, will be cleaned up
    }
  }
}
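The broadcast loop above swallows send errors silently. A variant that counts successes and failures makes the outcome observable, for example for a structured log entry with `successCount` and `errorCount` fields. This is a sketch with hypothetical names (`broadcastWithCounts`, `SendTarget`), not the handler's actual code.

```typescript
// Minimal client shape for this sketch (an assumption, not Elysia's type).
interface SendTarget {
  send(payload: string): void;
}

// Hypothetical variant of the broadcast loop that reports how many sends
// succeeded and how many threw.
function broadcastWithCounts(
  clients: ReadonlyArray<SendTarget>,
  message: object
): { successCount: number; errorCount: number } {
  // Serialize once and reuse the payload for every client
  const payload = JSON.stringify(message);
  let successCount = 0;
  let errorCount = 0;
  for (const client of clients) {
    try {
      client.send(payload);
      successCount++;
    } catch {
      // A failed send usually means the socket is already gone
      errorCount++;
    }
  }
  return { successCount, errorCount };
}
```

A caller holding a `Set` of clients can pass `Array.from(clients)`.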
Error Handling
Client-Side Reconnection
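function connectWebSocket(conversationId, token, maxRetries = 5) {
  let retries = 0;
  let ws;
  let retryTimer;
  let stopped = false; // Set when the caller closes the connection on purpose

  function connect() {
    ws = new WebSocket('ws://localhost:3000/api/ws');

    ws.onopen = () => {
      console.log('Connected');
      retries = 0; // Reset retry count
    };

    // Repeat the auth and subscribe handshake on every (re)connection
    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.type === 'ready') {
        ws.send(JSON.stringify({ action: 'auth', token }));
      } else if (data.type === 'authenticated') {
        ws.send(JSON.stringify({ action: 'subscribe', conversationId }));
      }
    };

    ws.onclose = (event) => {
      if (stopped) return; // Closed intentionally, don't reconnect

      if (event.code === 4001) {
        console.error('Authentication timeout');
        return; // Don't reconnect on auth failure
      }

      // Exponential backoff reconnection
      if (retries < maxRetries) {
        const delay = Math.min(1000 * Math.pow(2, retries), 30000);
        console.log(`Reconnecting in ${delay}ms...`);
        retryTimer = setTimeout(() => {
          retries++;
          connect();
        }, delay);
      }
    };

    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };
  }

  connect();

  // Returned function stops reconnection attempts and closes the socket
  return () => {
    stopped = true;
    clearTimeout(retryTimer);
    ws?.close();
  };
}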
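The backoff schedule used above is easier to reason about when the delay formula is pulled out into a pure function. The sketch below uses the same formula (1 second base, doubling per retry, capped at 30 seconds); `backoffDelay` is a hypothetical name introduced for illustration.

```typescript
// Hypothetical helper: the same delay formula as the reconnection code above.
function backoffDelay(retries: number, baseMs: number = 1000, maxMs: number = 30000): number {
  return Math.min(baseMs * Math.pow(2, retries), maxMs);
}

// Delays for retry counts 0 through 5
const delays = [0, 1, 2, 3, 4, 5].map((n) => backoffDelay(n));
console.log(delays); // [1000, 2000, 4000, 8000, 16000, 30000]
```

With the default `maxRetries = 5`, only retry counts 0 through 4 are used, so a client waits at most 31 seconds in total across attempts and the 30-second cap is never reached. The cap only matters if `maxRetries` is raised.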
Server-Side Cleanup
Periodically clean up dead connections:
// src/index.ts
import { cleanupDeadConnections } from "./services/websocket/handler";

// Clean up every 60 seconds
setInterval(() => {
  cleanupDeadConnections();
}, 60000);

// src/services/websocket/handler.ts
export function cleanupDeadConnections() {
  for (const [conversationId, clients] of conversationClients) {
    for (const client of clients) {
      // Check if client is still connected (readyState 1 = OPEN)
      if ((client as any).readyState !== 1) {
        clients.delete(client);
      }
    }
    // Remove empty conversation rooms
    if (clients.size === 0) {
      conversationClients.delete(conversationId);
    }
  }
}
Security Considerations
Security Best Practices:
Always require authentication before allowing subscriptions
Validate user ownership of conversations
Set connection timeouts to prevent resource exhaustion
Use WSS (WebSocket Secure) in production
Implement rate limiting for WebSocket messages
Clean up dead connections regularly
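The rate-limiting recommendation in the list above can be met in several ways, and this document does not describe which one BioAgents uses. As one illustration, the sketch below implements a simple fixed-window counter that a server could keep per connection. `MessageRateLimiter` and its parameters are assumptions, not an existing API, and timestamps are injectable so the behavior can be checked without waiting.

```typescript
// Hypothetical per-connection limiter using a fixed-window counter.
class MessageRateLimiter {
  private windowStart: number;
  private count = 0;
  private readonly limit: number;
  private readonly windowMs: number;

  constructor(limit: number, windowMs: number, now: number = Date.now()) {
    this.limit = limit;
    this.windowMs = windowMs;
    this.windowStart = now;
  }

  // Returns true if the message may be processed, false if it should be dropped.
  allow(now: number = Date.now()): boolean {
    // Start a fresh window once the current one has elapsed
    if (now - this.windowStart >= this.windowMs) {
      this.windowStart = now;
      this.count = 0;
    }
    if (this.count >= this.limit) return false;
    this.count++;
    return true;
  }
}

// Allow at most 2 messages per 1000 ms window
const limiter = new MessageRateLimiter(2, 1000, 0);
console.log(limiter.allow(0)); // true
console.log(limiter.allow(1)); // true
console.log(limiter.allow(2)); // false
console.log(limiter.allow(1000)); // true (new window)
```

A fixed window is the simplest option but permits short bursts at window boundaries; a token bucket or sliding window smooths that out at the cost of slightly more state.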
Monitoring
WebSocket events are logged with structured data:
// Connection events
logger.info({ userId }, "ws_client_connected_awaiting_auth");
logger.info({ userId }, "ws_client_authenticated");

// Subscription events
logger.info({ userId, conversationId }, "ws_client_subscribed");
logger.info({ userId, conversationId }, "ws_client_unsubscribed");

// Broadcast events
logger.info(
  { conversationId, successCount, errorCount },
  "ws_broadcast_completed"
);

// Error events
logger.warn("ws_auth_timeout");
logger.warn({ error }, "ws_invalid_message");
Testing WebSockets
Using wscat
# Install wscat
npm install -g wscat
# Connect to WebSocket
wscat -c ws://localhost:3000/api/ws
# Server sends ready message
# < {"type":"ready","message":"Send auth message with JWT token"}
# Send auth (dev mode)
# > {"action":"auth","userId":"user-123"}
# Server confirms
# < {"type":"authenticated","userId":"user-123"}
# Subscribe to conversation
# > {"action":"subscribe","conversationId":"conv-123"}
# Server confirms
# < {"type":"subscribed","conversationId":"conv-123"}
Browser Console
// Connect
const ws = new WebSocket('ws://localhost:3000/api/ws');
ws.onmessage = (event) => {
  console.log('Received:', JSON.parse(event.data));
};

// Run each of the following steps separately.
// Wait until the ready message has been logged, then auth
ws.send(JSON.stringify({
  action: 'auth',
  userId: 'user-123'
}));

// Wait for the authenticated message, then subscribe
ws.send(JSON.stringify({
  action: 'subscribe',
  conversationId: 'conv-123'
}));

// Ping
ws.send(JSON.stringify({ action: 'ping' }));
Next Steps
Rate Limiting: Configure rate limits for WebSocket connections
Custom Agents: Send WebSocket notifications from custom agents
Job Queue: Learn about the BullMQ job queue system
API Routes: Explore HTTP API endpoints