The MultiplexWebSocket class allows multiple subscriptions to share a single WebSocket connection to the same URL, improving efficiency and reducing the number of open connections. It includes automatic reconnection with exponential backoff and optional heartbeat monitoring.

Import

import { MultiplexWebSocket } from '@drift-labs/common';

Core Concepts

Multiplexing

Multiple subscriptions to the same WebSocket URL share a single underlying connection:
  • Efficient: Only one connection per URL
  • Automatic: Connection management is handled transparently
  • Safe: Connection closes only when all subscriptions unsubscribe
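
The sharing behavior above can be pictured as a registry keyed by URL that counts subscribers. The following is a minimal sketch, not the library's implementation: `FakeConnection` and `ConnectionRegistry` are hypothetical names, no real socket is opened, and the 2-second delayed close described under Connection Lifecycle is left out.

```typescript
// Hypothetical stand-in for a real WebSocket; it only records whether it is open.
class FakeConnection {
  isOpen = true;
  readonly url: string;

  constructor(url: string) {
    this.url = url;
  }

  shutdown(): void {
    this.isOpen = false;
  }
}

// Minimal registry: one connection per URL, shared by every subscription ID.
class ConnectionRegistry {
  private readonly connections = new Map<string, FakeConnection>();
  private readonly subscribers = new Map<string, Set<string>>();

  subscribe(url: string, subscriptionId: string): FakeConnection {
    let connection = this.connections.get(url);
    if (!connection) {
      // First subscriber for this URL: open the shared connection.
      connection = new FakeConnection(url);
      this.connections.set(url, connection);
      this.subscribers.set(url, new Set());
    }
    this.subscribers.get(url)!.add(subscriptionId);
    return connection;
  }

  unsubscribe(url: string, subscriptionId: string): void {
    const ids = this.subscribers.get(url);
    if (!ids) return;
    ids.delete(subscriptionId);
    if (ids.size === 0) {
      // Last subscriber left: close and forget the connection.
      this.connections.get(url)?.shutdown();
      this.connections.delete(url);
      this.subscribers.delete(url);
    }
  }

  connectionCount(): number {
    return this.connections.size;
  }
}

const registry = new ConnectionRegistry();
const tradesConn = registry.subscribe('wss://api.example.com/ws', 'trades-btc');
const bookConn = registry.subscribe('wss://api.example.com/ws', 'orderbook-btc');
// tradesConn and bookConn are the same object; only one connection exists.
```

Keying on the exact URL string is the design choice that matters here: two subscriptions share a connection only when their URLs match.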

Reconnection

Automatic reconnection with exponential backoff:
  • Backoff: 1s, 2s, 4s, 8s (capped at 8s)
  • Rate Limiting: Max 5 attempts within 60 seconds (configurable)
  • Graceful Failure: Notifies all subscribers once the maximum number of attempts is reached
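
The backoff schedule listed above (1s, 2s, 4s, 8s, capped at 8s) can be written as a small function. This is a sketch of the arithmetic only; `backoffDelayMs` and the two constants are hypothetical names, not part of the library's API.

```typescript
const BASE_DELAY_MS = 1000; // first retry waits 1 second
const MAX_DELAY_MS = 8000; // delays never grow beyond 8 seconds

// attempt is 1-based: attempt 1 -> 1000 ms, attempt 2 -> 2000 ms, and so on.
function backoffDelayMs(attempt: number): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be a positive integer');
  }
  // Double the delay on each attempt, then clamp to the cap.
  return Math.min(BASE_DELAY_MS * 2 ** (attempt - 1), MAX_DELAY_MS);
}

const delays = [1, 2, 3, 4, 5].map(backoffDelayMs);
// delays is [1000, 2000, 4000, 8000, 8000]
```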

Heartbeat Monitoring

Optional connection health monitoring:
  • Detection: Identifies dead connections
  • Timeout: Configurable (default 11 seconds)
  • Recovery: Automatic reconnection on timeout

Creating Subscriptions

createWebSocketSubscription

Create a new WebSocket subscription, or add one to an existing connection for the same URL.
Parameters:
  • wsUrl (string, required): WebSocket URL to connect to
  • subscriptionId (string, required): Unique identifier for this subscription
  • subscribeMessage (string, required): Message to send when subscribing (typically JSON)
  • unsubscribeMessage (string, required): Message to send when unsubscribing
  • onMessage ((message: T) => void, required): Callback for receiving messages
  • onError ((error?: any) => void, required): Callback for handling errors
  • messageFilter ((message: T) => boolean, optional): Filter to process only specific messages
  • errorMessageFilter ((message: T) => boolean, optional): Filter to detect error messages
  • onClose (() => void, optional): Callback invoked when the subscription closes
  • enableHeartbeatMonitoring (boolean, default: false): Enable heartbeat monitoring for connection health
  • heartbeatTimeoutMs (number, default: 11000): Heartbeat timeout in milliseconds
Returns:
  • unsubscribe (() => void): Function to call to unsubscribe

Basic Usage

Simple Subscription

import { MultiplexWebSocket } from '@drift-labs/common';

const { unsubscribe } = MultiplexWebSocket.createWebSocketSubscription({
  wsUrl: 'wss://api.example.com/ws',
  subscriptionId: 'trades-btc',
  subscribeMessage: JSON.stringify({
    action: 'subscribe',
    channel: 'trades',
    market: 'BTC-PERP'
  }),
  unsubscribeMessage: JSON.stringify({
    action: 'unsubscribe',
    channel: 'trades',
    market: 'BTC-PERP'
  }),
  onMessage: (message) => {
    console.log('Received trade:', message);
  },
  onError: (error) => {
    console.error('WebSocket error:', error);
  }
});

// Later, unsubscribe
unsubscribe();

Multiple Subscriptions (Same URL)

import { MultiplexWebSocket } from '@drift-labs/common';

// First subscription - creates new WebSocket
const { unsubscribe: tradesUnsub } = MultiplexWebSocket.createWebSocketSubscription({
  wsUrl: 'wss://api.example.com/ws',
  subscriptionId: 'trades-btc',
  subscribeMessage: JSON.stringify({ action: 'subscribe', channel: 'trades' }),
  unsubscribeMessage: JSON.stringify({ action: 'unsubscribe', channel: 'trades' }),
  onMessage: (msg) => console.log('Trade:', msg),
  onError: (err) => console.error('Trade error:', err)
});

// Second subscription - reuses existing WebSocket!
const { unsubscribe: orderbookUnsub } = MultiplexWebSocket.createWebSocketSubscription({
  wsUrl: 'wss://api.example.com/ws', // Same URL
  subscriptionId: 'orderbook-btc',
  subscribeMessage: JSON.stringify({ action: 'subscribe', channel: 'orderbook' }),
  unsubscribeMessage: JSON.stringify({ action: 'unsubscribe', channel: 'orderbook' }),
  onMessage: (msg) => console.log('Orderbook:', msg),
  onError: (err) => console.error('Orderbook error:', err)
});

// Both subscriptions share the same underlying WebSocket connection

Advanced Features

Message Filtering

Filter messages to process only relevant ones:
MultiplexWebSocket.createWebSocketSubscription({
  wsUrl: 'wss://api.example.com/ws',
  subscriptionId: 'trades-btc-large',
  subscribeMessage: JSON.stringify({ action: 'subscribe', channel: 'trades' }),
  unsubscribeMessage: JSON.stringify({ action: 'unsubscribe', channel: 'trades' }),
  
  // Only process messages for BTC-PERP with size > 10
  messageFilter: (message: any) => {
    return message.market === 'BTC-PERP' && message.size > 10;
  },
  
  onMessage: (message) => {
    console.log('Large BTC trade:', message);
  },
  
  onError: (error) => {
    console.error('Error:', error);
  }
});

Error Message Detection

Detect and handle error messages from the server:
MultiplexWebSocket.createWebSocketSubscription({
  wsUrl: 'wss://api.example.com/ws',
  subscriptionId: 'trades-btc',
  subscribeMessage: JSON.stringify({ action: 'subscribe', channel: 'trades' }),
  unsubscribeMessage: JSON.stringify({ action: 'unsubscribe', channel: 'trades' }),
  
  // Detect error messages from server
  errorMessageFilter: (message: any) => {
    return message.type === 'error' || message.error !== undefined;
  },
  
  onMessage: (message) => {
    console.log('Trade:', message);
  },
  
  onError: (error) => {
    console.error('Error detected:', error);
    // Error callback will be triggered for error messages
  }
});
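
To see how messageFilter and errorMessageFilter might interact for a single incoming message, here is a minimal routing sketch. It is not the library's code: `Subscriber`, `dispatch`, and `DemoMessage` are hypothetical names, and the order of checks (error detection first, then the regular filter) is an assumption.

```typescript
interface Subscriber<T> {
  onMessage: (message: T) => void;
  onError: (error?: unknown) => void;
  messageFilter?: (message: T) => boolean;
  errorMessageFilter?: (message: T) => boolean;
}

// Route one parsed message to one subscriber.
// Assumption: error detection runs before the regular filter.
function dispatch<T>(
  subscriber: Subscriber<T>,
  message: T
): 'error' | 'delivered' | 'skipped' {
  if (subscriber.errorMessageFilter?.(message)) {
    subscriber.onError(message);
    return 'error';
  }
  if (subscriber.messageFilter && !subscriber.messageFilter(message)) {
    return 'skipped'; // filtered out, onMessage is never called
  }
  subscriber.onMessage(message);
  return 'delivered';
}

type DemoMessage = { type?: string; market?: string; size?: number };
const received: DemoMessage[] = [];
const errors: unknown[] = [];

const largeBtcTrades: Subscriber<DemoMessage> = {
  messageFilter: (m) => m.market === 'BTC-PERP' && (m.size ?? 0) > 10,
  errorMessageFilter: (m) => m.type === 'error',
  onMessage: (m) => { received.push(m); },
  onError: (e) => { errors.push(e); },
};

const outcomes = [
  dispatch(largeBtcTrades, { market: 'BTC-PERP', size: 25 }),
  dispatch(largeBtcTrades, { market: 'BTC-PERP', size: 1 }),
  dispatch(largeBtcTrades, { type: 'error' }),
];
// outcomes is ['delivered', 'skipped', 'error']
```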

Heartbeat Monitoring

Enable heartbeat monitoring to detect dead connections:
MultiplexWebSocket.createWebSocketSubscription({
  wsUrl: 'wss://api.example.com/ws',
  subscriptionId: 'trades-btc',
  subscribeMessage: JSON.stringify({ action: 'subscribe', channel: 'trades' }),
  unsubscribeMessage: JSON.stringify({ action: 'unsubscribe', channel: 'trades' }),
  
  // Enable heartbeat monitoring
  enableHeartbeatMonitoring: true,
  heartbeatTimeoutMs: 15000, // 15 seconds
  
  onMessage: (message) => {
    console.log('Trade:', message);
  },
  
  onError: (error) => {
    console.error('Error or timeout:', error);
  }
});
Note: Heartbeat monitoring expects the server to send heartbeat messages with { channel: 'heartbeat' }.
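
The timeout check can be sketched as a small watchdog that records when the last heartbeat arrived. This is an illustration under stated assumptions rather than the library's logic: `HeartbeatWatchdog` is a hypothetical name, timestamps are passed in explicitly so the behavior is deterministic, and it assumes only `{ channel: 'heartbeat' }` messages reset the timer.

```typescript
// Tracks the time of the last heartbeat and reports when it is overdue.
class HeartbeatWatchdog {
  private lastHeartbeatAt: number;
  private readonly timeoutMs: number;

  constructor(startedAt: number, timeoutMs = 11000) {
    this.lastHeartbeatAt = startedAt;
    this.timeoutMs = timeoutMs;
  }

  // Call for every incoming message; only heartbeats reset the timer (assumption).
  observe(message: { channel?: string }, now: number): void {
    if (message.channel === 'heartbeat') {
      this.lastHeartbeatAt = now;
    }
  }

  // True once more than timeoutMs has passed since the last heartbeat.
  isTimedOut(now: number): boolean {
    return now - this.lastHeartbeatAt > this.timeoutMs;
  }
}

const watchdog = new HeartbeatWatchdog(0, 15000);
watchdog.observe({ channel: 'heartbeat' }, 10000);
watchdog.observe({ channel: 'trades' }, 20000); // not a heartbeat, timer keeps running
const healthyAt24s = watchdog.isTimedOut(24000); // false: 14 s since the last heartbeat
const timedOutAt26s = watchdog.isTimedOut(26000); // true: 16 s since the last heartbeat
```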

Typed Messages

Use TypeScript generics for type-safe messages:
interface TradeMessage {
  type: 'trade';
  market: string;
  price: number;
  size: number;
  timestamp: number;
}

const { unsubscribe } = MultiplexWebSocket.createWebSocketSubscription<TradeMessage>({
  wsUrl: 'wss://api.example.com/ws',
  subscriptionId: 'trades-btc',
  subscribeMessage: JSON.stringify({ action: 'subscribe', channel: 'trades' }),
  unsubscribeMessage: JSON.stringify({ action: 'unsubscribe', channel: 'trades' }),
  
  onMessage: (message) => {
    // message is typed as TradeMessage
    console.log(`Trade at ${message.price} for ${message.size}`);
  },
  
  onError: (error) => {
    console.error('Error:', error);
  }
});
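
TypeScript generics are erased at compile time, so the type parameter alone does not verify what the server actually sends. A runtime type guard is one way to close that gap. The sketch below redeclares `TradeMessage` to stay self-contained; `isTradeMessage` is a hypothetical helper, not part of the library.

```typescript
interface TradeMessage {
  type: 'trade';
  market: string;
  price: number;
  size: number;
  timestamp: number;
}

// Runtime check that an unknown value has the TradeMessage shape.
function isTradeMessage(value: unknown): value is TradeMessage {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    candidate.type === 'trade' &&
    typeof candidate.market === 'string' &&
    typeof candidate.price === 'number' &&
    typeof candidate.size === 'number' &&
    typeof candidate.timestamp === 'number'
  );
}

const validTrade: unknown = JSON.parse(
  '{"type":"trade","market":"BTC-PERP","price":50000,"size":2,"timestamp":1700000000}'
);
const malformedTrade: unknown = JSON.parse('{"type":"trade","price":"50000"}');
// isTradeMessage(validTrade) is true; isTradeMessage(malformedTrade) is false
```

Because the guard takes a single argument and returns a boolean, it should also fit the messageFilter signature, which would keep malformed payloads away from onMessage.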

Complete Examples

Drift WebSocket Subscription

import { MultiplexWebSocket } from '@drift-labs/common';

interface DriftTradeMessage {
  channel: 'trades';
  marketIndex: number;
  marketType: 'perp' | 'spot';
  data: {
    price: string;
    size: string;
    side: 'buy' | 'sell';
    timestamp: number;
  };
}

class DriftTradesSubscriber {
  private unsubscribe?: () => void;

  subscribe(marketIndex: number, marketType: 'perp' | 'spot') {
    const subscriptionId = `trades-${marketType}-${marketIndex}`;
    
    const { unsubscribe } = MultiplexWebSocket.createWebSocketSubscription<DriftTradeMessage>({
      wsUrl: 'wss://master.dlob.drift.trade/ws',
      subscriptionId,
      
      subscribeMessage: JSON.stringify({
        type: 'subscribe',
        marketType,
        channel: 'trades',
        market: `${marketType}-${marketIndex}`
      }),
      
      unsubscribeMessage: JSON.stringify({
        type: 'unsubscribe',
        marketType,
        channel: 'trades',
        market: `${marketType}-${marketIndex}`
      }),
      
      messageFilter: (message) => {
        return (
          message.channel === 'trades' &&
          message.marketIndex === marketIndex &&
          message.marketType === marketType
        );
      },
      
      onMessage: (message) => {
        this.handleTrade(message.data);
      },
      
      onError: (error) => {
        console.error('Trade subscription error:', error);
        this.reconnect(marketIndex, marketType);
      },
      
      enableHeartbeatMonitoring: true,
      heartbeatTimeoutMs: 11000
    });
    
    this.unsubscribe = unsubscribe;
  }

  private handleTrade(trade: DriftTradeMessage['data']) {
    console.log('New trade:', trade);
    // Process trade data
  }

  private reconnect(marketIndex: number, marketType: 'perp' | 'spot') {
    // Reconnection is automatic, but you can add custom logic here
    console.log('Reconnecting...');
  }

  close() {
    if (this.unsubscribe) {
      this.unsubscribe();
      this.unsubscribe = undefined;
    }
  }
}

// Usage
const subscriber = new DriftTradesSubscriber();
subscriber.subscribe(0, 'perp'); // Subscribe to SOL-PERP trades

// Later
subscriber.close();

Multi-Channel Subscription Manager

import { MultiplexWebSocket } from '@drift-labs/common';

class WebSocketManager {
  private subscriptions = new Map<string, () => void>();
  private wsUrl: string;

  constructor(wsUrl: string) {
    this.wsUrl = wsUrl;
  }

  subscribe(
    channel: string,
    params: Record<string, any>,
    onMessage: (msg: any) => void,
    onError: (err: any) => void
  ) {
    const subscriptionId = `${channel}-${JSON.stringify(params)}`;
    
    if (this.subscriptions.has(subscriptionId)) {
      console.warn('Already subscribed to', subscriptionId);
      return;
    }

    const { unsubscribe } = MultiplexWebSocket.createWebSocketSubscription({
      wsUrl: this.wsUrl,
      subscriptionId,
      
      subscribeMessage: JSON.stringify({
        action: 'subscribe',
        channel,
        ...params
      }),
      
      unsubscribeMessage: JSON.stringify({
        action: 'unsubscribe',
        channel,
        ...params
      }),
      
      messageFilter: (message: any) => {
        return message.channel === channel;
      },
      
      onMessage,
      onError,
      
      enableHeartbeatMonitoring: true
    });
    
    this.subscriptions.set(subscriptionId, unsubscribe);
  }

  unsubscribe(channel: string, params: Record<string, any>) {
    const subscriptionId = `${channel}-${JSON.stringify(params)}`;
    const unsubscribe = this.subscriptions.get(subscriptionId);
    
    if (unsubscribe) {
      unsubscribe();
      this.subscriptions.delete(subscriptionId);
    }
  }

  unsubscribeAll() {
    for (const unsubscribe of this.subscriptions.values()) {
      unsubscribe();
    }
    this.subscriptions.clear();
  }
}

// Usage
const manager = new WebSocketManager('wss://api.example.com/ws');

// Subscribe to multiple channels
manager.subscribe(
  'trades',
  { market: 'BTC-PERP' },
  (msg) => console.log('BTC Trade:', msg),
  (err) => console.error('BTC Error:', err)
);

manager.subscribe(
  'trades',
  { market: 'ETH-PERP' },
  (msg) => console.log('ETH Trade:', msg),
  (err) => console.error('ETH Error:', err)
);

manager.subscribe(
  'orderbook',
  { market: 'BTC-PERP', depth: 10 },
  (msg) => console.log('BTC Orderbook:', msg),
  (err) => console.error('Orderbook Error:', err)
);

// All three subscriptions share the same WebSocket connection!

// Later, clean up
manager.unsubscribeAll();
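
One caveat with the manager above: JSON.stringify serializes properties in insertion order, so { market, depth } and { depth, market } produce different subscription IDs, and the duplicate check would miss them. A possible fix is to sort the keys first. `stableSubscriptionId` is a hypothetical helper, sketched for flat params only.

```typescript
// Build a subscription ID that does not depend on the order of keys in params.
// Sketch for flat params only; nested objects would need recursive sorting.
function stableSubscriptionId(
  channel: string,
  params: Record<string, unknown>
): string {
  const sortedEntries = Object.keys(params)
    .sort()
    .map((key) => [key, params[key]] as const);
  return `${channel}-${JSON.stringify(sortedEntries)}`;
}

const idA = stableSubscriptionId('orderbook', { market: 'BTC-PERP', depth: 10 });
const idB = stableSubscriptionId('orderbook', { depth: 10, market: 'BTC-PERP' });
// idA === idB, whereas JSON.stringify on the two raw objects would differ
```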

Connection Lifecycle

States

  1. CONNECTING: Initial connection being established
  2. CONNECTED: Connection open and ready
  3. DISCONNECTING: Connection closing
  4. DISCONNECTED: Connection closed
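
These four states line up naturally with the standard WebSocket readyState values (0 CONNECTING, 1 OPEN, 2 CLOSING, 3 CLOSED). The mapping below is a sketch under that assumption; `ConnectionState` and `toConnectionState` are hypothetical names, and the library may track state differently.

```typescript
type ConnectionState =
  | 'CONNECTING'
  | 'CONNECTED'
  | 'DISCONNECTING'
  | 'DISCONNECTED';

// Translate a standard WebSocket readyState number into the state names used here.
function toConnectionState(readyState: number): ConnectionState {
  switch (readyState) {
    case 0:
      return 'CONNECTING';
    case 1:
      return 'CONNECTED';
    case 2:
      return 'DISCONNECTING';
    case 3:
      return 'DISCONNECTED';
    default:
      throw new RangeError(`Unknown readyState: ${readyState}`);
  }
}

const stateNames = [0, 1, 2, 3].map(toConnectionState);
// stateNames is ['CONNECTING', 'CONNECTED', 'DISCONNECTING', 'DISCONNECTED']
```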

Automatic Reconnection

Reconnection occurs when:
  • Connection closes unexpectedly
  • Connection error occurs
  • Heartbeat times out (if monitoring is enabled)
Reconnection strategy:
// Attempt 1: Reconnect after 1 second
// Attempt 2: Reconnect after 2 seconds
// Attempt 3: Reconnect after 4 seconds
// Attempt 4: Reconnect after 8 seconds
// Attempt 5: Reconnect after 8 seconds
// After 5 attempts in 60s window: Give up and notify subscribers

Delayed Close

When the last subscription unsubscribes, the connection waits 2 seconds before closing. This allows for quick resubscription without creating a new connection.
// User unsubscribes
unsubscribe();

// Connection stays open for 2 seconds
// If a new subscription is created within 2 seconds, connection is reused
// Otherwise, connection closes after 2 seconds
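
The delayed-close behavior can be modeled with a pending close deadline that a new subscription cancels. This is a simplified sketch, not the library's implementation: `DelayedCloseConnection` is a hypothetical name, and time is passed in explicitly instead of using real timers so the example stays deterministic.

```typescript
const CLOSE_DELAY_MS = 2000;

// Models a shared connection that closes 2 seconds after its last subscriber leaves.
class DelayedCloseConnection {
  isOpen = true;
  private subscriberCount = 0;
  private closeAt: number | null = null;

  subscribe(): void {
    if (!this.isOpen) throw new Error('connection already closed');
    this.subscriberCount += 1;
    this.closeAt = null; // a new subscriber cancels any pending close
  }

  unsubscribe(now: number): void {
    if (this.subscriberCount === 0) return;
    this.subscriberCount -= 1;
    if (this.subscriberCount === 0) {
      this.closeAt = now + CLOSE_DELAY_MS; // schedule the close
    }
  }

  // Call periodically (or from a timer) with the current time.
  tick(now: number): void {
    if (this.closeAt !== null && now >= this.closeAt) {
      this.isOpen = false;
      this.closeAt = null;
    }
  }
}

const sharedConn = new DelayedCloseConnection();
sharedConn.subscribe();
sharedConn.unsubscribe(0); // close scheduled for t = 2000 ms
sharedConn.subscribe(); // resubscribed before the deadline: close cancelled
sharedConn.tick(5000);
const stillOpen = sharedConn.isOpen; // true, the connection was reused

sharedConn.unsubscribe(6000); // close scheduled for t = 8000 ms
sharedConn.tick(7999);
const openBeforeDeadline = sharedConn.isOpen; // true
sharedConn.tick(8000);
const closedAfterDeadline = !sharedConn.isOpen; // true
```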

Reconnection Configuration

The ReconnectionManager limits reconnection attempts with two configurable values, shown here with their defaults:
// Default configuration
const maxAttemptsCount = 5;           // Max 5 attempts
const maxAttemptsWindowMs = 60 * 1000; // Within 60 seconds
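
A limit of 5 attempts within 60 seconds can be enforced with a sliding window over attempt timestamps. The sketch below shows one way to do it; `AttemptLimiter` is a hypothetical name, and whether the real ReconnectionManager uses a sliding or a fixed window is an assumption.

```typescript
// Sliding-window limiter: allows at most maxAttempts within windowMs.
class AttemptLimiter {
  private attemptTimes: number[] = [];
  private readonly maxAttempts: number;
  private readonly windowMs: number;

  constructor(maxAttempts = 5, windowMs = 60 * 1000) {
    this.maxAttempts = maxAttempts;
    this.windowMs = windowMs;
  }

  // Records an attempt at time `now` and returns true if it is allowed.
  tryAttempt(now: number): boolean {
    // Drop attempts that have fallen out of the window.
    this.attemptTimes = this.attemptTimes.filter((t) => now - t < this.windowMs);
    if (this.attemptTimes.length >= this.maxAttempts) {
      return false; // budget exhausted: give up and notify subscribers
    }
    this.attemptTimes.push(now);
    return true;
  }
}

const limiter = new AttemptLimiter();
const firstFive = [0, 1000, 3000, 7000, 15000].map((t) => limiter.tryAttempt(t));
// firstFive is [true, true, true, true, true]
const sixthWithinWindow = limiter.tryAttempt(23000); // false: 5 attempts in the last 60 s
const afterWindow = limiter.tryAttempt(61000); // true: the attempts at t=0 and t=1000 expired
```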

Performance Considerations

Connection Pooling

Subscriptions to the same URL share connections:
// ✅ Good: One connection
MultiplexWebSocket.createWebSocketSubscription({ wsUrl: 'wss://api.com/ws', ... });
MultiplexWebSocket.createWebSocketSubscription({ wsUrl: 'wss://api.com/ws', ... });

// ❌ Not shared: Two connections (different URLs cannot share a connection)
MultiplexWebSocket.createWebSocketSubscription({ wsUrl: 'wss://api.com/ws', ... });
MultiplexWebSocket.createWebSocketSubscription({ wsUrl: 'wss://api2.com/ws', ... });

Message Filtering

Use messageFilter for efficient message routing:
// ✅ Good: Filter at subscription level
messageFilter: (msg) => msg.market === 'BTC-PERP'

// ❌ Less efficient: Filter in callback
onMessage: (msg) => {
  if (msg.market === 'BTC-PERP') {
    // process
  }
}

Source Code

Location: ~/workspace/source/common-ts/src/utils/MultiplexWebSocket.ts
Key classes:
  • MultiplexWebSocket (line 136) - Main multiplexing class
  • ReconnectionManager (line 53) - Handles reconnection logic
