Overview

RestAI uses Bun’s native WebSocket implementation combined with Redis Pub/Sub to deliver real-time updates across the system. This enables instant notifications for orders, table status changes, and kitchen operations.

Architecture

┌─────────────────────────────────────────────────────────────┐
│                    WebSocket Clients                        │
├──────────────┬──────────────┬──────────────┬───────────────┤
│   Kitchen    │   Waiters    │   Cashiers   │   Customers   │
│   Display    │   (Mobile)   │    (POS)     │   (Web/QR)    │
└──────┬───────┴──────┬───────┴──────┬───────┴───────┬───────┘
       │              │              │               │
       └──────────────┴──────────────┴───────────────┘
                      │ WebSocket
                      ▼
       ┌──────────────────────────────────────┐
       │      API Server (:3001/ws)           │
       │  ┌────────────────────────────────┐  │
       │  │   WebSocketManager             │  │
       │  │  • Client tracking             │  │
       │  │  • Room management             │  │
       │  │  • Message routing             │  │
       │  └────────────┬───────────────────┘  │
       └───────────────┼──────────────────────┘
                       │ Pub/Sub
                       ▼
       ┌──────────────────────────────────────┐
       │         Redis Pub/Sub                │
       │  • branch:uuid                       │
       │  • table:uuid                        │
       │  • session:uuid                      │
       └──────────────────────────────────────┘
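
The three channel patterns in the diagram can be built with small helpers so that room names are not assembled by hand at each call site. This is a minimal sketch; RoomKind, roomName, and parseRoom are hypothetical names used for illustration and are not taken from the RestAI codebase.

```typescript
// Hypothetical helpers for the channel patterns shown in the diagram.
type RoomKind = "branch" | "table" | "session";

const ROOM_KINDS: readonly RoomKind[] = ["branch", "table", "session"];

// Build a channel name such as "branch:<uuid>".
function roomName(kind: RoomKind, id: string): string {
  return `${kind}:${id}`;
}

// Split a channel name back into its parts; returns null for unknown shapes.
function parseRoom(room: string): { kind: RoomKind; id: string } | null {
  const separator = room.indexOf(":");
  if (separator <= 0) return null;
  const kind = room.slice(0, separator);
  const id = room.slice(separator + 1);
  if (id.length === 0) return null;
  const known = ROOM_KINDS.find((candidate) => candidate === kind);
  return known ? { kind: known, id } : null;
}
```

Keeping the prefix list in one place also gives a single spot to validate a room name before subscribing to it.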

WebSocket Manager

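The WebSocket manager handles client connections, room subscriptions, and message broadcasting. It subscribes to a Redis channel when the first local client joins a room and unsubscribes when the last one leaves.
apps/api/src/ws/manager.ts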
import { redis, createSubscriber } from "../lib/redis.js";
import { logger } from "../lib/logger.js";

export interface WsClient {
  ws: any;
  rooms: Set<string>;
  userId?: string;
  sessionId?: string;
}

export class WebSocketManager {
  private clients = new Map<string, WsClient>();
  private rooms = new Map<string, Set<string>>();
  private subscriber;

  constructor() {
    this.subscriber = createSubscriber();
    this.setupSubscriber();
  }

  private setupSubscriber() {
    this.subscriber.on("message", (channel: string, message: string) => {
      this.broadcastToRoom(channel, message);
    });
  }

  getClient(id: string): WsClient | undefined {
    return this.clients.get(id);
  }

  addClient(id: string, ws: any, userId?: string, sessionId?: string) {
    this.clients.set(id, { ws, rooms: new Set(), userId, sessionId });
  }

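  async removeClient(id: string) {
    const client = this.clients.get(id);
    if (client) {
      // Await each leaveRoom so a failed Redis unsubscribe is reported to the
      // caller rather than becoming an unhandled promise rejection. Iterate
      // over a copy because leaveRoom mutates client.rooms.
      for (const room of [...client.rooms]) {
        await this.leaveRoom(id, room);
      }
      this.clients.delete(id);
    }
  }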

  async joinRoom(clientId: string, room: string) {
    const client = this.clients.get(clientId);
    if (!client) return;

    client.rooms.add(room);

    if (!this.rooms.has(room)) {
      this.rooms.set(room, new Set());
      await this.subscriber.subscribe(room);
    }
    this.rooms.get(room)!.add(clientId);
  }

  async leaveRoom(clientId: string, room: string) {
    const client = this.clients.get(clientId);
    if (client) client.rooms.delete(room);

    const roomClients = this.rooms.get(room);
    if (roomClients) {
      roomClients.delete(clientId);
      if (roomClients.size === 0) {
        this.rooms.delete(room);
        await this.subscriber.unsubscribe(room);
      }
    }
  }

  private broadcastToRoom(room: string, message: string) {
    const roomClients = this.rooms.get(room);
    if (!roomClients) return;

    for (const clientId of roomClients) {
      const client = this.clients.get(clientId);
      if (client?.ws?.readyState === 1) {
        client.ws.send(message);
      }
    }
  }

  async publish(room: string, data: object) {
    const payload = JSON.stringify(data);
    try {
      await redis.publish(room, payload);
    } catch (err: unknown) {
      const message = err instanceof Error ? err.message : "Unknown error";
      logger.error("WebSocket publish failed, falling back to local", {
        room,
        error: message,
      });
      // Fallback to local broadcast if Redis is unavailable
      this.broadcastToRoom(room, payload);
    }
  }
}

export const wsManager = new WebSocketManager();
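
The manager only tracks clients it is told about, so something has to call addClient on connect and removeClient on disconnect. The sketch below shows one way to build the open, message, and close callbacks that Bun's websocket handler option expects. It assumes the connection id is stored on ws.data.id during the upgrade, which is where the message handler reads it from. createWsLifecycle, ManagerLike, and SocketLike are hypothetical names for illustration.

```typescript
// Minimal shapes so the sketch type-checks on its own; the real manager and
// Bun's ServerWebSocket have more members than shown here.
interface SocketLike {
  data: { id: string };
  send(message: string): void;
}

interface ManagerLike {
  addClient(id: string, ws: SocketLike): void;
  removeClient(id: string): void | Promise<void>;
}

type MessageHandler = (ws: SocketLike, raw: string) => void | Promise<void>;

// Returns callbacks shaped like the `websocket` option of Bun.serve.
function createWsLifecycle(manager: ManagerLike, onMessage: MessageHandler) {
  return {
    open(ws: SocketLike) {
      // Register the connection; rooms are joined later, after "auth".
      manager.addClient(ws.data.id, ws);
    },
    message(ws: SocketLike, message: string | Uint8Array) {
      // Text frames are expected; binary frames are decoded as UTF-8.
      const raw =
        typeof message === "string" ? message : new TextDecoder().decode(message);
      return onMessage(ws, raw);
    },
    close(ws: SocketLike) {
      // Drops the client from every room it had joined.
      return manager.removeClient(ws.data.id);
    },
  };
}
```

With Bun, these callbacks would typically be passed as the websocket option of Bun.serve, with the id attached in the fetch handler via server.upgrade(req, { data: { id: crypto.randomUUID() } }). How RestAI actually wires this up is not shown in this page.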

Message Handlers

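Incoming messages are JSON objects dispatched on their type field: auth verifies a JWT and joins the client to its rooms, ping replies with pong, and any other type returns an error.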
apps/api/src/ws/handlers.ts
import { verifyAccessToken } from "../lib/jwt.js";
import type { WebSocketManager } from "./manager.js";

export async function handleWsMessage(
  ws: any,
  rawMessage: string,
  manager: WebSocketManager,
) {
  let data: any;
  try {
    data = JSON.parse(rawMessage);
  } catch {
    ws.send(JSON.stringify({ type: "error", message: "Invalid JSON" }));
    return;
  }

  const clientId = (ws.data as any)?.id;

  switch (data.type) {
    case "auth": {
      if (!data.token) {
        ws.send(JSON.stringify({ type: "error", message: "Token required" }));
        return;
      }

      try {
        const payload: any = await verifyAccessToken(data.token);
        
        // Associate user with client
        const client = manager.getClient(clientId);
        if (client) {
          client.userId = payload.sub;
          client.sessionId = payload.sub;
        }

        // Auto-join relevant rooms based on role
        if (payload.role === "customer") {
          // Customer joins their specific rooms
          await manager.joinRoom(clientId, `branch:${payload.branch}`);
          await manager.joinRoom(clientId, `table:${payload.table}`);
          await manager.joinRoom(clientId, `session:${payload.sub}`);
        } else if (payload.branches) {
          // Staff joins all their branches
          for (const branchId of payload.branches) {
            await manager.joinRoom(clientId, `branch:${branchId}`);
          }
        }

        ws.send(JSON.stringify({ 
          type: "auth:success", 
          userId: payload.sub,
          timestamp: Date.now() 
        }));
      } catch {
        ws.send(JSON.stringify({ 
          type: "auth:error", 
          message: "Invalid token",
          timestamp: Date.now() 
        }));
      }
      break;
    }

    case "ping": {
      ws.send(JSON.stringify({ type: "pong", timestamp: Date.now() }));
      break;
    }

    default:
      ws.send(JSON.stringify({ 
        type: "error", 
        message: `Unknown type: ${data.type}` 
      }));
  }
}
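
The handler types the parsed message as any, so a payload such as {"type":"auth","token":42} reaches verifyAccessToken unchecked. One option is to validate the shape first with a discriminated union. This is a sketch only; ClientMessage, ParseResult, and parseClientMessage are hypothetical names, and the two message types are the ones the handler above recognizes.

```typescript
// Messages a client may send, as recognized by the handler above.
type ClientMessage =
  | { type: "auth"; token: string }
  | { type: "ping" };

type ParseResult =
  | { ok: true; message: ClientMessage }
  | { ok: false; error: string };

// Parse and validate a raw text frame without resorting to `any`.
function parseClientMessage(raw: string): ParseResult {
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return { ok: false, error: "Invalid JSON" };
  }
  if (typeof data !== "object" || data === null) {
    return { ok: false, error: "Message must be a JSON object" };
  }
  const record = data as Record<string, unknown>;
  switch (record["type"]) {
    case "auth": {
      const token = record["token"];
      if (typeof token !== "string" || token.length === 0) {
        return { ok: false, error: "Token required" };
      }
      return { ok: true, message: { type: "auth", token } };
    }
    case "ping":
      return { ok: true, message: { type: "ping" } };
    default:
      return { ok: false, error: `Unknown type: ${String(record["type"])}` };
  }
}
```

The error strings match the ones the handler already sends, so a caller could forward result.error to the client unchanged.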

Room-Based Broadcasting

RestAI delivers messages through rooms: each room name doubles as a Redis channel, and a published message reaches only the clients that have joined that room.

Room Types

Pattern: branch:{branch_id}
Subscribers:
  • All staff assigned to the branch
  • Kitchen displays
  • POS terminals
Events:
  • New orders created
  • Order status changes
  • Table status updates
  • Menu item availability changes
// Broadcast to all branch staff
await wsManager.publish(`branch:${branchId}`, {
  type: "order:created",
  orderId: order.id,
  orderNumber: order.order_number,
  status: order.status,
});
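
Which rooms a connection may join is decided from the token claims in the auth handler. Pulling that decision into a pure function makes room scoping easy to unit test. The claim names (sub, role, branch, table, branches) come from the handler code; TokenClaims and roomsForToken are hypothetical names. Unlike the handler, this sketch skips a room when its claim is missing instead of joining a room named after undefined.

```typescript
// Claims read by the auth handler; optional ones depend on the role.
interface TokenClaims {
  sub: string;
  role: string;
  branch?: string;
  table?: string;
  branches?: string[];
}

// Compute the rooms a freshly authenticated connection should join.
function roomsForToken(claims: TokenClaims): string[] {
  if (claims.role === "customer") {
    // Customers are scoped to their own session, plus their branch and table.
    const rooms: string[] = [`session:${claims.sub}`];
    if (claims.branch) rooms.push(`branch:${claims.branch}`);
    if (claims.table) rooms.push(`table:${claims.table}`);
    return rooms;
  }
  // Staff: one room per assigned branch, de-duplicated.
  const uniqueBranches = Array.from(new Set(claims.branches ?? []));
  return uniqueBranches.map((branchId) => `branch:${branchId}`);
}
```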

Client Integration

React Hook for WebSocket

import { useEffect, useRef, useState } from "react";
import { useAuthStore } from "@/stores/auth";

export function useWebSocket() {
  const wsRef = useRef<WebSocket | null>(null);
  const [isConnected, setIsConnected] = useState(false);
  const { accessToken } = useAuthStore();

  useEffect(() => {
    if (!accessToken) return;

    const ws = new WebSocket("ws://localhost:3001/ws");

    ws.onopen = () => {
      console.log("WebSocket connected");
      setIsConnected(true);

      // Authenticate immediately
      ws.send(JSON.stringify({ type: "auth", token: accessToken }));
    };

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      console.log("WebSocket message:", data);

      // Dispatch custom events for components to listen to
      window.dispatchEvent(
        new CustomEvent("ws-message", { detail: data })
      );
    };

    ws.onerror = (error) => {
      console.error("WebSocket error:", error);
    };

    ws.onclose = () => {
      console.log("WebSocket disconnected");
      setIsConnected(false);
    };

    wsRef.current = ws;

    return () => {
      ws.close();
    };
  }, [accessToken]);

  return { isConnected, ws: wsRef.current };
}
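
The hook re-dispatches every server message as a ws-message event, so components need a way to listen for one message type and clean up afterwards. The helper below is a sketch of that pattern; onWsMessage and WsMessage are hypothetical names, and the target is passed in explicitly (window in the browser) so the function can also be exercised outside a DOM.

```typescript
// Shape of the `detail` payload the hook attaches to each "ws-message" event.
type WsMessage = { type: string; [key: string]: unknown };

// Subscribe to a single message type; returns a function that unsubscribes.
function onWsMessage(
  type: string,
  handler: (message: WsMessage) => void,
  target: EventTarget,
): () => void {
  const listener = (event: Event) => {
    // CustomEvent carries the payload on `detail`.
    const detail = (event as Event & { detail?: WsMessage }).detail;
    if (detail && detail.type === type) handler(detail);
  };
  target.addEventListener("ws-message", listener);
  return () => target.removeEventListener("ws-message", listener);
}
```

Inside a component, the returned function fits a useEffect cleanup, for example useEffect(() => onWsMessage("order:created", refetchOrders, window), []), where refetchOrders stands in for whatever the component does with the event.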

Publishing Events from API

When data changes on the server, publish WebSocket events to notify connected clients.
import { eq } from "drizzle-orm";
import { wsManager } from "../ws/manager.js";
import { db } from "@restai/db";
import { orders, tableSessions } from "@restai/db/schema";

app.post("/orders", async (c) => {
  const data = await c.req.json();
  
  // Create order in database
  const [order] = await db.insert(orders).values(data).returning();

  // Broadcast to branch room
  await wsManager.publish(`branch:${order.branch_id}`, {
    type: "order:created",
    orderId: order.id,
    orderNumber: order.order_number,
    status: order.status,
    total: order.total,
    timestamp: Date.now(),
  });

  return c.json(order, 201);
});

app.patch("/orders/:id/status", async (c) => {
  const orderId = c.req.param("id");
  const { status } = await c.req.json();

  // Update order
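  const [updated] = await db
    .update(orders)
    .set({ status, updated_at: new Date() })
    .where(eq(orders.id, orderId))
    .returning();

  // No row matched the ID: return 404 instead of dereferencing undefined below
  if (!updated) {
    return c.json({ error: "Order not found" }, 404);
  }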

  // Broadcast status change
  await wsManager.publish(`branch:${updated.branch_id}`, {
    type: "order:updated",
    orderId: updated.id,
    status: updated.status,
    timestamp: Date.now(),
  });

  // If table session exists, notify table
  if (updated.table_session_id) {
    const session = await db.query.tableSessions.findFirst({
      where: eq(tableSessions.id, updated.table_session_id),
    });

    if (session) {
      await wsManager.publish(`table:${session.table_id}`, {
        type: "order:updated",
        orderId: updated.id,
        status: updated.status,
        timestamp: Date.now(),
      });
    }
  }

  return c.json(updated);
});
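
The status route builds the same order:updated payload twice, once for the branch room and once for the table room, each with its own timestamp. A small pure function can produce both messages from one payload. This is a sketch under the column names used above (id, status, branch_id); OrderRow, OutboundMessage, and orderUpdatedMessages are hypothetical names.

```typescript
// Only the columns this sketch needs from an updated order row.
interface OrderRow {
  id: string;
  status: string;
  branch_id: string;
}

interface OutboundMessage {
  room: string;
  payload: {
    type: "order:updated";
    orderId: string;
    status: string;
    timestamp: number;
  };
}

// Build one message per target room, all sharing the same payload and timestamp.
function orderUpdatedMessages(
  order: OrderRow,
  tableId: string | null,
  now: number = Date.now(),
): OutboundMessage[] {
  const payload = {
    type: "order:updated" as const,
    orderId: order.id,
    status: order.status,
    timestamp: now,
  };
  const rooms = [`branch:${order.branch_id}`];
  // The table room is only targeted when the order belongs to a table session.
  if (tableId) rooms.push(`table:${tableId}`);
  return rooms.map((room) => ({ room, payload }));
}
```

The route would then publish each entry, for example with await Promise.all(messages.map((m) => wsManager.publish(m.room, m.payload))).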

Event Types

Order Events

interface OrderCreatedEvent {
  type: "order:created";
  orderId: string;
  orderNumber: string;
  status: string;
  total: number;
  timestamp: number;
}

interface OrderUpdatedEvent {
  type: "order:updated";
  orderId: string;
  status: string;
  timestamp: number;
}

Table Events

interface TableOccupiedEvent {
  type: "table:occupied";
  tableId: string;
  sessionId: string;
  timestamp: number;
}

interface TableAvailableEvent {
  type: "table:available";
  tableId: string;
  timestamp: number;
}

Session Events

interface SessionApprovedEvent {
  type: "session:approved";
  sessionId: string;
  tableId: string;
  timestamp: number;
}

interface SessionRejectedEvent {
  type: "session:rejected";
  sessionId: string;
  reason?: string;
  timestamp: number;
}
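
Because every event carries a literal type field, the interfaces above combine naturally into a discriminated union that clients can switch over. The sketch below restates the six shapes inline so it stands alone; RealtimeEvent and describeEvent are hypothetical names.

```typescript
// Union of the event shapes listed above, discriminated by `type`.
type RealtimeEvent =
  | { type: "order:created"; orderId: string; orderNumber: string; status: string; total: number; timestamp: number }
  | { type: "order:updated"; orderId: string; status: string; timestamp: number }
  | { type: "table:occupied"; tableId: string; sessionId: string; timestamp: number }
  | { type: "table:available"; tableId: string; timestamp: number }
  | { type: "session:approved"; sessionId: string; tableId: string; timestamp: number }
  | { type: "session:rejected"; sessionId: string; reason?: string; timestamp: number };

// Turn an event into a short human-readable line, e.g. for a toast.
function describeEvent(event: RealtimeEvent): string {
  switch (event.type) {
    case "order:created":
      return `Order ${event.orderNumber} created (${event.status})`;
    case "order:updated":
      return `Order ${event.orderId} is now ${event.status}`;
    case "table:occupied":
      return `Table ${event.tableId} occupied by session ${event.sessionId}`;
    case "table:available":
      return `Table ${event.tableId} is available`;
    case "session:approved":
      return `Session ${event.sessionId} approved for table ${event.tableId}`;
    case "session:rejected":
      return `Session ${event.sessionId} rejected${event.reason ? `: ${event.reason}` : ""}`;
    default: {
      // Exhaustiveness check: adding an event type without a case above
      // makes this assignment fail to type-check.
      const unreachable: never = event;
      return unreachable;
    }
  }
}
```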

Redis Configuration

apps/api/src/lib/redis.ts
import Redis from "ioredis";

export const redis = new Redis(process.env.REDIS_URL || "redis://localhost:6379", {
  maxRetriesPerRequest: 3,
  enableReadyCheck: true,
  retryStrategy(times) {
    const delay = Math.min(times * 50, 2000);
    return delay;
  },
});

export function createSubscriber() {
  return new Redis(process.env.REDIS_URL || "redis://localhost:6379", {
    lazyConnect: false,
  });
}

redis.on("error", (err) => {
  console.error("Redis error:", err);
});

redis.on("connect", () => {
  console.log("Redis connected");
});

Horizontal Scaling

Redis Pub/Sub enables multiple API server instances to share WebSocket events:
Client A ──► API Server 1 ──┐
                             ├──► Redis Pub/Sub ──┐
Client B ──► API Server 2 ──┘                     ├──► All Servers

Client C ──► API Server 3 ◄────────────────────────┘
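When API Server 1 publishes an event, every server subscribed to that room's channel (including Server 1 itself) receives it via Redis and forwards it to its locally connected clients in that room. A server with no local clients in the room is not subscribed and receives nothing.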
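
The fan-out can be simulated in memory to see which clients receive a message. In this sketch FakeBus stands in for Redis Pub/Sub and ServerInstance for one API server; both are hypothetical classes written for this illustration, and the only behavior assumed of Redis is that a published message is delivered to every subscriber of the channel.

```typescript
// In-memory stand-in for Redis Pub/Sub.
class FakeBus {
  private subscribers = new Map<string, Set<(message: string) => void>>();

  subscribe(channel: string, listener: (message: string) => void): void {
    if (!this.subscribers.has(channel)) this.subscribers.set(channel, new Set());
    this.subscribers.get(channel)!.add(listener);
  }

  // Returns the number of subscribers that received the message.
  publish(channel: string, message: string): number {
    const listeners = this.subscribers.get(channel);
    if (!listeners) return 0;
    listeners.forEach((listener) => listener(message));
    return listeners.size;
  }
}

// One API server instance with its own locally connected clients.
class ServerInstance {
  readonly delivered: string[] = [];
  private name: string;
  private bus: FakeBus;
  private localRooms = new Map<string, string[]>(); // room -> client ids

  constructor(name: string, bus: FakeBus) {
    this.name = name;
    this.bus = bus;
  }

  join(clientId: string, room: string): void {
    if (!this.localRooms.has(room)) {
      this.localRooms.set(room, []);
      // First local member: subscribe this instance to the channel.
      this.bus.subscribe(room, (message) => this.broadcast(room, message));
    }
    this.localRooms.get(room)!.push(clientId);
  }

  publish(room: string, message: string): void {
    this.bus.publish(room, message);
  }

  private broadcast(room: string, message: string): void {
    for (const clientId of this.localRooms.get(room) ?? []) {
      this.delivered.push(`${this.name}->${clientId}: ${message}`);
    }
  }
}
```

If client a joins branch:1 on one instance and client b joins it on another, a publish from either instance is recorded in the delivered list of both, while a client that only joined branch:2 sees nothing.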

Error Handling & Resilience

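If Redis is unavailable, the system degrades gracefully: publish falls back to a local broadcast, so events still reach clients connected to the same server instance. Clients connected to other instances miss those events until Redis is reachable again.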
// From WebSocketManager
async publish(room: string, data: object) {
  const payload = JSON.stringify(data);
  try {
    await redis.publish(room, payload);
  } catch (err: unknown) {
    // Fallback to local broadcast
    logger.error("Redis unavailable, using local broadcast");
    this.broadcastToRoom(room, payload);
  }
}

Best Practices

  1. Always Authenticate: Verify JWT tokens before joining rooms
  2. Room Scoping: Only join rooms the user has permission to access
  3. Heartbeat/Ping: Send periodic ping messages to keep connections alive
  4. Reconnection Logic: Implement exponential backoff for reconnections
  5. Event Versioning: Include version/timestamp in events for idempotency
  6. Clean Disconnects: Always unsubscribe from rooms on disconnect
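
For the reconnection item, the delay calculation is the part that is easy to get wrong. The function below is one possible sketch of exponential backoff with a cap and jitter; backoffDelay and its default values (500 ms base, 30 s cap) are assumptions chosen for illustration, not RestAI settings.

```typescript
// Delay before reconnect attempt `attempt` (0-based): base * 2^attempt, capped
// at maxMs, then randomized so many clients do not reconnect in lockstep.
function backoffDelay(
  attempt: number,
  baseMs: number = 500,
  maxMs: number = 30000,
  random: () => number = Math.random,
): number {
  const exponential = Math.min(maxMs, baseMs * Math.pow(2, attempt));
  // Keep half of the delay fixed and randomize the other half,
  // giving a value between exponential / 2 and exponential.
  return Math.round(exponential / 2 + (exponential / 2) * random());
}
```

A client would typically call this from its onclose handler, schedule the next connection attempt with setTimeout, and reset the attempt counter to zero once onopen fires.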

Monitoring

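// Track WebSocket metrics.
// Note: clients and rooms are private on WebSocketManager, so this assumes the
// class exposes read-only accessors such as clientCount and roomCount
// (hypothetical names, not shown in the manager above).
function collectMetrics() {
  return {
    totalClients: wsManager.clientCount,
    totalRooms: wsManager.roomCount,
    messageRate: 0, // messages per second (placeholder)
  };
}

// Log a fresh snapshot every minute
setInterval(() => {
  logger.info("WebSocket metrics", collectMetrics());
}, 60000);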
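
The messageRate field in the snippet above is a placeholder. One way to fill it is a small counter that is incremented per message and read once per logging interval. RateCounter is a hypothetical class sketched for this purpose; the clock is injectable so the calculation can be tested without waiting.

```typescript
// Counts events and reports an average rate per second for each window.
class RateCounter {
  private count = 0;
  private windowStart: number;
  private now: () => number;

  constructor(now: () => number = Date.now) {
    this.now = now;
    this.windowStart = now();
  }

  // Call once per message sent or received.
  record(n: number = 1): void {
    this.count += n;
  }

  // Messages per second since the previous call; starts a new window.
  takeRate(): number {
    const current = this.now();
    const elapsedSeconds = (current - this.windowStart) / 1000;
    const rate = elapsedSeconds > 0 ? this.count / elapsedSeconds : 0;
    this.count = 0;
    this.windowStart = current;
    return rate;
  }
}
```

The manager could call record() wherever it sends a message, and the periodic logger could report takeRate() as messageRate.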

Next Steps

System Overview

Learn about the overall system architecture

Database Schema

Understand the data model and relationships

API Reference

Explore REST API endpoints

Authentication

JWT authentication and WebSocket auth flow
