
WebSocket Server

The WebSocket server is implemented using Bun’s native WebSocket support in src/index.ts.

Server Configuration

WebSocket Endpoint: ws://localhost:3000/ws

The server upgrades HTTP connections to WebSocket when clients connect to the /ws path:
if (url.pathname === "/ws") {
  const success = server.upgrade(req);
  if (success) {
    return undefined;
  }
  return new Response("WebSocket upgrade failed", { status: 500 });
}
Source: src/index.ts:27-33

WebSocket Event Handlers

Connection Open (src/index.ts:54-67)

When a client connects:
open(ws: any) {
  clients.add(ws);
  console.log("WebSocket client connected. Total:", clients.size);

  // Send all existing changes to the new client
  if (allChanges.length > 0) {
    ws.send(
      JSON.stringify({
        type: "initial",
        data: allChanges,
      })
    );
  }
}
New clients immediately receive all accumulated changes in an “initial” message, so they see the complete history. If no changes have been recorded yet, no initial message is sent.

Message Received (src/index.ts:50-53)

message(ws: any, message: string | Buffer) {
  // Clients can send messages if needed
  console.log("Message received:", message);
}
Currently, the server only logs client messages. This handler could be extended to support client commands like filtering or requesting specific data ranges.
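As a rough illustration of that kind of extension, the sketch below parses a raw client message into a typed command. The ClientCommand shape, the "subscribe" and "unsubscribe" command names, and the parseClientCommand helper are all hypothetical; nothing like them exists in src/index.ts.

```typescript
// Hypothetical command shapes; the server in this document defines no client commands.
type ClientCommand =
  | { type: "subscribe"; tables: string[] }
  | { type: "unsubscribe" };

// Returns a typed command, or null when the payload is not valid JSON
// or does not match one of the known command shapes.
function parseClientCommand(raw: string): ClientCommand | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof parsed !== "object" || parsed === null) return null;
  const candidate = parsed as { type?: unknown; tables?: unknown };

  if (candidate.type === "unsubscribe") return { type: "unsubscribe" };
  if (
    candidate.type === "subscribe" &&
    Array.isArray(candidate.tables) &&
    candidate.tables.every((t) => typeof t === "string")
  ) {
    return { type: "subscribe", tables: candidate.tables as string[] };
  }
  return null;
}
```

Inside the message handler, a Buffer payload would first need converting to a string (for example with message.toString()) before being passed to a parser like this one.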

Connection Close (src/index.ts:68-71)

close(ws: any) {
  clients.delete(ws);
  console.log("WebSocket client disconnected. Total:", clients.size);
}

Broadcasting Changes

The broadcast() function sends changes to all connected clients (src/index.ts:12-19):
function broadcast(data: any) {
  const message = JSON.stringify(data);
  clients.forEach((client: any) => {
    if (client.readyState === 1) { // OPEN
      client.send(message);
    }
  });
}
The readyState === 1 check ensures messages are only sent to clients with an OPEN connection, preventing errors when clients are connecting or disconnecting.
Usage (src/index.ts:110-114):
broadcast({
  type: "change",
  data: newChanges,
  total: allChanges.length,
});
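
To make the effect of the readyState guard concrete, here is a minimal sketch that runs the same logic against stub clients. SocketLike, ClientCollection, and broadcastTo are names invented for this example; unlike the original broadcast(), this version takes the client collection as a parameter and returns how many clients were sent the message.

```typescript
// Minimal structural type for illustration; the real socket object has more members.
interface SocketLike {
  readyState: number;
  send(message: string): void;
}

// Anything with forEach works here, so both a Set and an array are accepted.
interface ClientCollection {
  forEach(callback: (client: SocketLike) => void): void;
}

const READY_STATE_OPEN = 1; // the same numeric value the original check compares against

// Serializes the payload once, then sends it only to clients that are open.
function broadcastTo(clients: ClientCollection, data: unknown): number {
  const message = JSON.stringify(data);
  let delivered = 0;
  clients.forEach((client) => {
    if (client.readyState === READY_STATE_OPEN) {
      client.send(message);
      delivered += 1;
    }
  });
  return delivered;
}

// Stub clients standing in for real connections.
const received: string[] = [];
const openClient: SocketLike = { readyState: 1, send: (m) => { received.push(m); } };
const closingClient: SocketLike = { readyState: 2, send: (m) => { received.push(m); } };

// sentTo is 1: only openClient is in the OPEN state.
const sentTo = broadcastTo([openClient, closingClient], { type: "change", data: [], total: 0 });
```

Serializing once outside the loop, as the original function also does, avoids repeating JSON.stringify for every client.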

WebSocket Client

The React frontend implements a robust WebSocket client with auto-reconnection in src/App.tsx.

Connection Management

Initial Connection (src/App.tsx:44-46)

// Connect to WebSocket
ws = new WebSocket("ws://localhost:3000/ws");
wsRef.current = ws;

Connection State Tracking

The client maintains connection state via React hooks:
const [connected, setConnected] = useState(false);
const wsRef = useRef<WebSocket | null>(null);
Source: src/App.tsx:19-21

Event Handlers

onopen (src/App.tsx:48-53)

ws.onopen = () => {
  if (isMounted) {
    console.log("Connected to WebSocket server");
    setConnected(true);
  }
};

onmessage (src/App.tsx:55-75)

ws.onmessage = (event) => {
  if (!isMounted) return;
  
  try {
    const message: WebSocketMessage = JSON.parse(event.data);

    if (message.type === "initial") {
      // Initial load of all changes
      setChanges(message.data);
      setTotalChanges(message.data.length);
    } else if (message.type === "change") {
      // New changes - add to state
      setChanges((prev) => [...prev, ...message.data]);
      if (message.total) {
        setTotalChanges(message.total);
      }
    }
  } catch (error) {
    console.error("Error parsing WebSocket message:", error);
  }
};
The client handles two message types:
  • initial: Complete history sent on connection
  • change: Incremental updates for new database changes
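
The two branches can also be written as a pure function, which makes the behavior easy to check without a live socket. This is a sketch only: ServerMessage, FeedState, and applyMessage are hypothetical names, and the message type is an assumed definition based on the shapes described in this document rather than the one declared in src/App.tsx.

```typescript
interface Change {
  operation: "INSERT" | "UPDATE" | "DELETE";
  table: string;
  [key: string]: unknown;
}

// Assumed definition, mirroring the two message shapes the client handles.
type ServerMessage =
  | { type: "initial"; data: Change[] }
  | { type: "change"; data: Change[]; total?: number };

interface FeedState {
  changes: Change[];
  totalChanges: number;
}

// Pure equivalent of the onmessage branches: "initial" replaces the list,
// "change" appends to it. Note that the original only updates the total when
// message.total is truthy; this sketch updates it whenever total is present.
function applyMessage(state: FeedState, message: ServerMessage): FeedState {
  switch (message.type) {
    case "initial":
      return { changes: message.data, totalChanges: message.data.length };
    case "change":
      return {
        changes: [...state.changes, ...message.data],
        totalChanges: message.total ?? state.totalChanges,
      };
  }
}
```

Because "initial" replaces the list rather than appending to it, a client that reconnects and receives the history again does not end up with duplicated entries.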

onerror (src/App.tsx:77-82)

ws.onerror = (error) => {
  if (isMounted) {
    console.error("WebSocket error:", error);
    setConnected(false);
  }
};

onclose with Auto-Reconnection (src/App.tsx:84-99)

ws.onclose = (event) => {
  if (!isMounted) return;
  
  console.log("Disconnected from WebSocket server", event.code, event.reason);
  setConnected(false);
  wsRef.current = null;

  // Try to reconnect after 3 seconds only if it wasn't an intentional close
  if (event.code !== 1000 && isMounted) {
    reconnectTimeout = setTimeout(() => {
      if (isMounted && (!wsRef.current || wsRef.current.readyState === WebSocket.CLOSED)) {
        connect();
      }
    }, 3000);
  }
};
Auto-reconnection is triggered for all close events except code 1000 (normal closure), ensuring the client reconnects after network issues or server restarts.
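
The handler above always waits a fixed 3 seconds. One possible alternative is to grow the delay with each failed attempt. The sketch below is not part of src/App.tsx; backoffDelay, withJitter, and their default values are illustrative assumptions.

```typescript
// Delay before a given reconnection attempt (attempt 0 is the first retry).
// baseMs and maxMs are illustrative defaults; the source uses a fixed 3000 ms.
function backoffDelay(attempt: number, baseMs: number = 1000, maxMs: number = 30000): number {
  const exponential = baseMs * Math.pow(2, attempt);
  return Math.min(exponential, maxMs);
}

// Optional jitter: picks a delay in [delayMs / 2, delayMs) so that many clients
// do not all reconnect at the same instant after a server restart.
// The random source is injectable to keep the function deterministic in tests.
function withJitter(delayMs: number, random: () => number = Math.random): number {
  return delayMs / 2 + random() * (delayMs / 2);
}
```

To use it, the client would keep an attempt counter, pass withJitter(backoffDelay(attempt)) to setTimeout in place of 3000, increment the counter on each close, and reset it to 0 in onopen.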

Connection Lifecycle Management

The client prevents multiple concurrent connections (src/App.tsx:30-42):
// Avoid creating multiple connections
if (wsRef.current?.readyState === WebSocket.OPEN || 
    wsRef.current?.readyState === WebSocket.CONNECTING) {
  return;
}

// Clean up previous connection if it exists
if (wsRef.current) {
  try {
    wsRef.current.close();
  } catch (e) {
    // Ignore errors when closing
  }
}

Cleanup on Unmount (src/App.tsx:105-122)

return () => {
  isMounted = false;
  
  // Clear reconnection timeout
  if (reconnectTimeout) {
    clearTimeout(reconnectTimeout);
  }

  // Close WebSocket connection
  if (wsRef.current) {
    try {
      wsRef.current.close(1000, "Component unmounting");
    } catch (e) {
      // Ignore errors
    }
    wsRef.current = null;
  }
};
The cleanup function uses close code 1000 to indicate normal closure, preventing auto-reconnection when the component unmounts.

Message Protocol

Server to Client Messages

Initial State Message

Sent when a client first connects:
{
  type: "initial",
  data: Change[]  // Array of all changes
}
Example:
{
  "type": "initial",
  "data": [
    {
      "operation": "INSERT",
      "table": "public.users",
      "id": 1,
      "name": "Alice",
      "email": "[email protected]"
    },
    {
      "operation": "UPDATE",
      "table": "public.users",
      "id": 1,
      "name": "Alice Smith",
      "email": "[email protected]"
    }
  ]
}

Change Event Message

Sent when database changes occur:
{
  type: "change",
  data: Change[],  // Array of new changes (usually one)
  total: number    // Total number of changes accumulated
}
Example:
{
  "type": "change",
  "data": [
    {
      "operation": "DELETE",
      "table": "public.users",
      "id": 1
    }
  ],
  "total": 3
}

Change Object Structure

Each change object includes:
interface Change {
  operation: "INSERT" | "UPDATE" | "DELETE";
  table: string;  // Format: "schema.table_name"
  [key: string]: any;  // All column values from the row
}
Field Details:
  • operation: The SQL command type (INSERT, UPDATE, DELETE)
  • table: Fully qualified table name with schema
  • …columns: All column values from the affected row
For UPDATE operations, only the new values are included. For DELETE operations, the row data may be limited based on PostgreSQL’s REPLICA IDENTITY setting.
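
The interface above only describes the shape at compile time; JSON.parse returns unchecked data. A minimal runtime check, assuming the shapes documented in this section, could look like the following. isRecord, isChange, isServerMessage, and the ServerMessage type are hypothetical helpers written for this example, not code from the project.

```typescript
interface Change {
  operation: "INSERT" | "UPDATE" | "DELETE";
  table: string;
  [key: string]: unknown;
}

// Assumed combined shape of both server-to-client messages.
type ServerMessage = { type: "initial" | "change"; data: Change[]; total?: number };

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Checks the two required fields; column values are left unchecked
// because they vary per table.
function isChange(value: unknown): value is Change {
  if (!isRecord(value)) return false;
  const op = value.operation;
  return (
    (op === "INSERT" || op === "UPDATE" || op === "DELETE") &&
    typeof value.table === "string"
  );
}

// Checks the envelope, every entry in data, and the optional total.
function isServerMessage(value: unknown): value is ServerMessage {
  if (!isRecord(value)) return false;
  if (value.type !== "initial" && value.type !== "change") return false;
  if (!Array.isArray(value.data) || !value.data.every(isChange)) return false;
  return value.total === undefined || typeof value.total === "number";
}
```

A client could call isServerMessage on the result of JSON.parse(event.data) and ignore or log anything that fails the check before touching React state.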

Connection Status UI

The UI displays real-time connection status (src/App.tsx:137-160):
<div style={{
  width: "10px",
  height: "10px",
  borderRadius: "50%",
  backgroundColor: connected ? "#10b981" : "#ef4444",
  boxShadow: connected ? "0 0 10px #10b981" : "none",
  transition: "all 0.3s",
}} />
<span>{connected ? "Connected" : "Disconnected"}</span>

Performance Characteristics

Server-Side

  • Broadcasting Complexity: O(n) where n = number of connected clients
  • Memory Usage: O(m) where m = total number of changes (unbounded growth)
  • Connection Limit: Bounded by Bun’s WebSocket capacity
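
If the unbounded growth noted above becomes a problem, one option is to cap the retained history. The sketch below is an assumption-laden illustration, not the project's behavior: MAX_HISTORY and appendBounded are hypothetical, and the source currently keeps every change in allChanges.

```typescript
// Hypothetical cap; the source keeps every change.
const MAX_HISTORY = 1000;

// Appends new changes and drops the oldest entries beyond the cap.
// Returns a new array and leaves both inputs untouched.
function appendBounded<T>(
  history: readonly T[],
  incoming: readonly T[],
  max: number = MAX_HISTORY
): T[] {
  const combined = [...history, ...incoming];
  return combined.length > max ? combined.slice(combined.length - max) : combined;
}
```

With a cap in place, allChanges.length would no longer equal the number of changes seen, so the total field in "change" messages would need its own counter.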

Client-Side

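  • Message Parsing: Linear in the size of each message; the initial message grows with the full history
  • State Update: O(n + k) where n = changes already held and k = new changes per message, because the spread copies the existing array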
  • Re-render: Optimized by React’s virtual DOM diffing

Best Practices

Connection Management

Always close WebSocket connections and clear pending reconnection timers in the effect’s cleanup function to prevent leaks.

Error Handling

In production, replace the fixed 3-second reconnection delay with exponential backoff.

Message Validation

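Validate incoming messages at runtime, for example with type guards. TypeScript interfaces are erased at compile time and do not check the result of JSON.parse.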

State Management

Use functional updates (prev => ...) whenever new state depends on previous state, to avoid stale closure issues.
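
The difference is easiest to see in isolation. The sketch below uses a tiny stand-in for a state setter, not React itself; createState is a hypothetical helper that only mimics the "value or updater function" calling convention.

```typescript
// Tiny stand-in for a React state setter; this is not React's implementation.
type Update<T> = T | ((prev: T) => T);

function createState<T>(initial: T) {
  let current = initial;
  return {
    get: (): T => current,
    set(update: Update<T>): void {
      if (typeof update === "function") {
        current = (update as (prev: T) => T)(current);
      } else {
        current = update as T;
      }
    },
  };
}

// A handler created once captures the value that existed when it was created.
const stale = createState<number[]>([]);
const captured = stale.get(); // snapshot, like a value closed over inside an effect
stale.set([...captured, 1]);
stale.set([...captured, 2]); // built from the old snapshot, so the 1 is lost

// Functional updates always receive the latest value.
const fresh = createState<number[]>([]);
fresh.set((prev) => [...prev, 1]);
fresh.set((prev) => [...prev, 2]); // both entries are kept

// stale.get() is [2]; fresh.get() is [1, 2]
```

This is the situation the onmessage handler is in: it is created once per connection, so appending with setChanges((prev) => [...prev, ...message.data]) is what keeps earlier changes from being overwritten.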

Potential Enhancements

  1. Heartbeat Mechanism: Add periodic ping/pong to detect stale connections
  2. Message Acknowledgment: Implement ack system to ensure reliable delivery
  3. Client-Side Filtering: Allow clients to subscribe to specific tables or operations
  4. Compression: Use WebSocket compression for large change sets
  5. Authentication: Add token-based auth during WebSocket upgrade
  6. Rate Limiting: Prevent clients from overwhelming the server with messages

