The MultiplexWebSocket class allows multiple subscriptions to share a single WebSocket connection to the same URL, improving efficiency and reducing the number of open connections. It includes automatic reconnection with exponential backoff and optional heartbeat monitoring.
Import
import { MultiplexWebSocket } from '@drift-labs/common';
Core Concepts
Multiplexing
Multiple subscriptions to the same WebSocket URL share a single underlying connection:
- Efficient: Only one connection per URL
- Automatic: Connection management is handled transparently
- Safe: Connection closes only when all subscriptions unsubscribe
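The sharing behavior described above can be pictured as a registry keyed by URL that counts subscribers. The following is a minimal sketch under stated assumptions, not the library's implementation: `ConnectionRegistry`, `FakeConnection`, and `RegistryEntry` are hypothetical names, the connection is a plain object rather than a real socket, and the close happens immediately when the last subscriber leaves.

```typescript
// Hypothetical stand-in for a real socket; it only records whether it is open.
interface FakeConnection {
  url: string;
  open: boolean;
}

interface RegistryEntry {
  conn: FakeConnection;
  subscribers: Set<string>;
}

class ConnectionRegistry {
  private entries = new Map<string, RegistryEntry>();
  // Number of connections created so far (for illustration only).
  public opened = 0;

  // Registers a subscriber and returns its unsubscribe function.
  // A connection is created only for the first subscriber of a URL.
  subscribe(url: string, subscriptionId: string): () => void {
    let entry = this.entries.get(url);
    if (!entry) {
      entry = { conn: { url, open: true }, subscribers: new Set<string>() };
      this.entries.set(url, entry);
      this.opened += 1;
    }
    entry.subscribers.add(subscriptionId);
    const current = entry;
    return () => {
      current.subscribers.delete(subscriptionId);
      // Close only when no subscribers remain and this entry is still the live one.
      if (current.subscribers.size === 0 && this.entries.get(url) === current) {
        current.conn.open = false;
        this.entries.delete(url);
      }
    };
  }

  isOpen(url: string): boolean {
    const entry = this.entries.get(url);
    return entry !== undefined && entry.conn.open;
  }
}
```

In this sketch, two `subscribe` calls with the same URL leave `opened` at 1, and `isOpen` stays true until both returned functions have been called.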
Reconnection
Automatic reconnection with exponential backoff:
- Backoff: 1s, 2s, 4s, 8s (capped at 8s)
- Rate Limiting: Max 5 attempts within 60 seconds (configurable)
- Graceful Failure: Notifies all subscribers once the maximum number of attempts is reached
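The documented schedule (1s, 2s, 4s, 8s, capped at 8s) is consistent with doubling a 1-second base delay and clamping the result at 8 seconds. A minimal sketch of that calculation, assuming a 1-based attempt counter; `backoffDelayMs` and the two constants are illustrative names, not identifiers from the library:

```typescript
const BASE_DELAY_MS = 1000; // first retry waits 1 second
const MAX_DELAY_MS = 8000; // delays never exceed 8 seconds

// Returns the wait before the given reconnection attempt (1-based).
function backoffDelayMs(attempt: number): number {
  if (attempt < 1 || Math.floor(attempt) !== attempt) {
    throw new RangeError('attempt must be a positive integer');
  }
  // Double the base delay for each prior attempt, then clamp to the cap.
  return Math.min(BASE_DELAY_MS * Math.pow(2, attempt - 1), MAX_DELAY_MS);
}
```

Attempts 1 through 5 give 1000, 2000, 4000, 8000, and 8000 milliseconds, matching the schedule listed above.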
Heartbeat Monitoring
Optional connection health monitoring:
- Detection: Identifies dead connections
- Timeout: Configurable (default 11 seconds)
- Recovery: Automatic reconnection on timeout
Creating Subscriptions
createWebSocketSubscription
Creates a new WebSocket subscription, or adds one to the existing connection for the same URL.
Parameters
- wsUrl: WebSocket URL to connect to
- subscriptionId: Unique identifier for this subscription
- subscribeMessage: Message to send when subscribing (typically JSON)
- unsubscribeMessage: Message to send when unsubscribing
- onMessage ((message: T) => void, required): Callback for receiving messages
- onError ((error?: any) => void, required): Callback for handling errors
- messageFilter: Optional filter to process only specific messages
- errorMessageFilter: Optional filter to detect error messages
- Optional callback invoked when the subscription closes
- enableHeartbeatMonitoring: Enable heartbeat monitoring for connection health
- heartbeatTimeoutMs: Heartbeat timeout in milliseconds (default 11 seconds)
Returns
- unsubscribe: Function to call to unsubscribe
Basic Usage
Simple Subscription
import { MultiplexWebSocket } from '@drift-labs/common';
const { unsubscribe } = MultiplexWebSocket.createWebSocketSubscription({
wsUrl: 'wss://api.example.com/ws',
subscriptionId: 'trades-btc',
subscribeMessage: JSON.stringify({
action: 'subscribe',
channel: 'trades',
market: 'BTC-PERP'
}),
unsubscribeMessage: JSON.stringify({
action: 'unsubscribe',
channel: 'trades',
market: 'BTC-PERP'
}),
onMessage: (message) => {
console.log('Received trade:', message);
},
onError: (error) => {
console.error('WebSocket error:', error);
}
});
// Later, unsubscribe
unsubscribe();
Multiple Subscriptions (Same URL)
import { MultiplexWebSocket } from '@drift-labs/common';
// First subscription - creates new WebSocket
const { unsubscribe: tradesUnsub } = MultiplexWebSocket.createWebSocketSubscription({
wsUrl: 'wss://api.example.com/ws',
subscriptionId: 'trades-btc',
subscribeMessage: JSON.stringify({ action: 'subscribe', channel: 'trades' }),
unsubscribeMessage: JSON.stringify({ action: 'unsubscribe', channel: 'trades' }),
onMessage: (msg) => console.log('Trade:', msg),
onError: (err) => console.error('Trade error:', err)
});
// Second subscription - reuses existing WebSocket!
const { unsubscribe: orderbookUnsub } = MultiplexWebSocket.createWebSocketSubscription({
wsUrl: 'wss://api.example.com/ws', // Same URL
subscriptionId: 'orderbook-btc',
subscribeMessage: JSON.stringify({ action: 'subscribe', channel: 'orderbook' }),
unsubscribeMessage: JSON.stringify({ action: 'unsubscribe', channel: 'orderbook' }),
onMessage: (msg) => console.log('Orderbook:', msg),
onError: (err) => console.error('Orderbook error:', err)
});
// Both subscriptions share the same underlying WebSocket connection
Advanced Features
Message Filtering
Filter messages to process only relevant ones:
MultiplexWebSocket.createWebSocketSubscription({
wsUrl: 'wss://api.example.com/ws',
subscriptionId: 'trades-btc-large',
subscribeMessage: JSON.stringify({ action: 'subscribe', channel: 'trades' }),
unsubscribeMessage: JSON.stringify({ action: 'unsubscribe', channel: 'trades' }),
// Only process messages for BTC-PERP with size > 10
messageFilter: (message: any) => {
return message.market === 'BTC-PERP' && message.size > 10;
},
onMessage: (message) => {
console.log('Large BTC trade:', message);
},
onError: (error) => {
console.error('Error:', error);
}
});
Error Message Detection
Detect and handle error messages from the server:
MultiplexWebSocket.createWebSocketSubscription({
wsUrl: 'wss://api.example.com/ws',
subscriptionId: 'trades-btc',
subscribeMessage: JSON.stringify({ action: 'subscribe', channel: 'trades' }),
unsubscribeMessage: JSON.stringify({ action: 'unsubscribe', channel: 'trades' }),
// Detect error messages from server
errorMessageFilter: (message: any) => {
return message.type === 'error' || message.error !== undefined;
},
onMessage: (message) => {
console.log('Trade:', message);
},
onError: (error) => {
console.error('Error detected:', error);
// Error callback will be triggered for error messages
}
});
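To make the interplay between the two filters concrete, here is a hedged sketch of how an incoming message could be routed to subscribers. The order shown (error filter first, then message filter) is an assumption made for illustration, and `Subscriber` and `dispatch` are hypothetical names; the library's actual dispatch logic may differ.

```typescript
interface Subscriber<T> {
  onMessage: (message: T) => void;
  onError: (error?: unknown) => void;
  messageFilter?: (message: T) => boolean;
  errorMessageFilter?: (message: T) => boolean;
}

// Routes one incoming message to every subscriber on the shared connection.
function dispatch<T>(message: T, subscribers: Subscriber<T>[]): void {
  for (const sub of subscribers) {
    // Assumed order: a message flagged as an error goes to onError only.
    if (sub.errorMessageFilter && sub.errorMessageFilter(message)) {
      sub.onError(message);
      continue;
    }
    // A subscriber without a messageFilter receives every non-error message.
    if (sub.messageFilter && !sub.messageFilter(message)) {
      continue;
    }
    sub.onMessage(message);
  }
}
```

Under these assumptions, a subscriber that defines no filters sees every message, including ones that another subscriber would classify as errors.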
Heartbeat Monitoring
Enable heartbeat monitoring to detect dead connections:
MultiplexWebSocket.createWebSocketSubscription({
wsUrl: 'wss://api.example.com/ws',
subscriptionId: 'trades-btc',
subscribeMessage: JSON.stringify({ action: 'subscribe', channel: 'trades' }),
unsubscribeMessage: JSON.stringify({ action: 'unsubscribe', channel: 'trades' }),
// Enable heartbeat monitoring
enableHeartbeatMonitoring: true,
heartbeatTimeoutMs: 15000, // 15 seconds
onMessage: (message) => {
console.log('Trade:', message);
},
onError: (error) => {
console.error('Error or timeout:', error);
}
});
Note: Heartbeat monitoring expects the server to send heartbeat messages with { channel: 'heartbeat' }.
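The timeout check can be sketched as a small watchdog that records when the last heartbeat arrived. This is an illustrative sketch only: `isHeartbeat` and `HeartbeatWatchdog` are hypothetical names, timestamps are passed in explicitly so the logic is easy to test, and it assumes that only messages shaped like `{ channel: 'heartbeat' }` reset the timer.

```typescript
// True for messages shaped like { channel: 'heartbeat' }.
function isHeartbeat(message: unknown): boolean {
  return (
    typeof message === 'object' &&
    message !== null &&
    (message as { channel?: unknown }).channel === 'heartbeat'
  );
}

class HeartbeatWatchdog {
  private lastSeenMs: number;

  constructor(private timeoutMs: number, startMs: number) {
    this.lastSeenMs = startMs;
  }

  // Call for every incoming message; only heartbeats reset the timer.
  observe(message: unknown, nowMs: number): void {
    if (isHeartbeat(message)) {
      this.lastSeenMs = nowMs;
    }
  }

  // True once more than timeoutMs has passed since the last heartbeat.
  isTimedOut(nowMs: number): boolean {
    return nowMs - this.lastSeenMs > this.timeoutMs;
  }
}
```

A caller would poll `isTimedOut(Date.now())` on an interval and trigger reconnection when it returns true.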
Typed Messages
Use TypeScript generics for type-safe messages:
interface TradeMessage {
type: 'trade';
market: string;
price: number;
size: number;
timestamp: number;
}
const { unsubscribe } = MultiplexWebSocket.createWebSocketSubscription<TradeMessage>({
wsUrl: 'wss://api.example.com/ws',
subscriptionId: 'trades-btc',
subscribeMessage: JSON.stringify({ action: 'subscribe', channel: 'trades' }),
unsubscribeMessage: JSON.stringify({ action: 'unsubscribe', channel: 'trades' }),
onMessage: (message) => {
// message is typed as TradeMessage
console.log(`Trade at ${message.price} for ${message.size}`);
},
onError: (error) => {
console.error('Error:', error);
}
});
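TypeScript generics are erased at compile time, so the type parameter alone does not verify what the server actually sends. A runtime type guard can close that gap. The sketch below repeats the `TradeMessage` interface so it is self-contained; `isTradeMessage` is a hypothetical helper, not part of the library.

```typescript
interface TradeMessage {
  type: 'trade';
  market: string;
  price: number;
  size: number;
  timestamp: number;
}

// Checks at runtime that a parsed value has the TradeMessage shape.
function isTradeMessage(value: unknown): value is TradeMessage {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const v = value as Record<string, unknown>;
  return (
    v.type === 'trade' &&
    typeof v.market === 'string' &&
    typeof v.price === 'number' &&
    typeof v.size === 'number' &&
    typeof v.timestamp === 'number'
  );
}
```

Such a guard could be called at the top of `onMessage` before the payload is used, so malformed messages are skipped instead of causing errors further down.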
Complete Examples
Drift WebSocket Subscription
import { MultiplexWebSocket } from '@drift-labs/common';
interface DriftTradeMessage {
channel: 'trades';
marketIndex: number;
marketType: 'perp' | 'spot';
data: {
price: string;
size: string;
side: 'buy' | 'sell';
timestamp: number;
};
}
class DriftTradesSubscriber {
private unsubscribe?: () => void;
subscribe(marketIndex: number, marketType: 'perp' | 'spot') {
const subscriptionId = `trades-${marketType}-${marketIndex}`;
const { unsubscribe } = MultiplexWebSocket.createWebSocketSubscription<DriftTradeMessage>({
wsUrl: 'wss://master.dlob.drift.trade/ws',
subscriptionId,
subscribeMessage: JSON.stringify({
type: 'subscribe',
marketType,
channel: 'trades',
market: `${marketType}-${marketIndex}`
}),
unsubscribeMessage: JSON.stringify({
type: 'unsubscribe',
marketType,
channel: 'trades',
market: `${marketType}-${marketIndex}`
}),
messageFilter: (message) => {
return (
message.channel === 'trades' &&
message.marketIndex === marketIndex &&
message.marketType === marketType
);
},
onMessage: (message) => {
this.handleTrade(message.data);
},
onError: (error) => {
console.error('Trade subscription error:', error);
this.reconnect(marketIndex, marketType);
},
enableHeartbeatMonitoring: true,
heartbeatTimeoutMs: 11000
});
this.unsubscribe = unsubscribe;
}
private handleTrade(trade: DriftTradeMessage['data']) {
console.log('New trade:', trade);
// Process trade data
}
private reconnect(marketIndex: number, marketType: 'perp' | 'spot') {
// Reconnection is automatic, but you can add custom logic here
console.log('Reconnecting...');
}
close() {
if (this.unsubscribe) {
this.unsubscribe();
this.unsubscribe = undefined;
}
}
}
// Usage
const subscriber = new DriftTradesSubscriber();
subscriber.subscribe(0, 'perp'); // Subscribe to SOL-PERP trades
// Later
subscriber.close();
Multi-Channel Subscription Manager
import { MultiplexWebSocket } from '@drift-labs/common';
class WebSocketManager {
private subscriptions = new Map<string, () => void>();
private wsUrl: string;
constructor(wsUrl: string) {
this.wsUrl = wsUrl;
}
subscribe(
channel: string,
params: Record<string, any>,
onMessage: (msg: any) => void,
onError: (err: any) => void
) {
const subscriptionId = `${channel}-${JSON.stringify(params)}`;
if (this.subscriptions.has(subscriptionId)) {
console.warn('Already subscribed to', subscriptionId);
return;
}
const { unsubscribe } = MultiplexWebSocket.createWebSocketSubscription({
wsUrl: this.wsUrl,
subscriptionId,
subscribeMessage: JSON.stringify({
action: 'subscribe',
channel,
...params
}),
unsubscribeMessage: JSON.stringify({
action: 'unsubscribe',
channel,
...params
}),
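messageFilter: (message: any) => {
// Matches on channel only: subscriptions that share a channel (for example
// two 'trades' markets) each receive every message on that channel.
// Narrow further, e.g. on message.market, if the server includes such fields.
return message.channel === channel;
},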
onMessage,
onError,
enableHeartbeatMonitoring: true
});
this.subscriptions.set(subscriptionId, unsubscribe);
}
unsubscribe(channel: string, params: Record<string, any>) {
const subscriptionId = `${channel}-${JSON.stringify(params)}`;
const unsubscribe = this.subscriptions.get(subscriptionId);
if (unsubscribe) {
unsubscribe();
this.subscriptions.delete(subscriptionId);
}
}
unsubscribeAll() {
for (const unsubscribe of this.subscriptions.values()) {
unsubscribe();
}
this.subscriptions.clear();
}
}
// Usage
const manager = new WebSocketManager('wss://api.example.com/ws');
// Subscribe to multiple channels
manager.subscribe(
'trades',
{ market: 'BTC-PERP' },
(msg) => console.log('BTC Trade:', msg),
(err) => console.error('BTC Error:', err)
);
manager.subscribe(
'trades',
{ market: 'ETH-PERP' },
(msg) => console.log('ETH Trade:', msg),
(err) => console.error('ETH Error:', err)
);
manager.subscribe(
'orderbook',
{ market: 'BTC-PERP', depth: 10 },
(msg) => console.log('BTC Orderbook:', msg),
(err) => console.error('Orderbook Error:', err)
);
// All three subscriptions share the same WebSocket connection!
// Later, clean up
manager.unsubscribeAll();
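One caveat with the manager above: `JSON.stringify(params)` depends on key insertion order, so `{ market, depth }` and `{ depth, market }` produce different subscription IDs for the same logical subscription. A possible fix is to sort the keys first. This is a sketch with a hypothetical helper name, and it only normalizes top-level keys:

```typescript
// Builds an ID that does not depend on the order of keys in params.
function stableSubscriptionId(channel: string, params: Record<string, unknown>): string {
  const sortedPairs = Object.keys(params)
    .sort()
    .map((key) => [key, params[key]]);
  return `${channel}-${JSON.stringify(sortedPairs)}`;
}
```

With this helper, `subscribe` and `unsubscribe` would resolve to the same map key regardless of how the caller ordered the fields of `params`.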
Connection Lifecycle
States
- CONNECTING: Initial connection being established
- CONNECTED: Connection open and ready
- DISCONNECTING: Connection closing
- DISCONNECTED: Connection closed
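The four states can be modeled as a string union with a table of permitted transitions. The state names come from the list above; the transition table is an assumption made for illustration (the source does not spell out which moves are legal), and `ConnectionState`, `ALLOWED_TRANSITIONS`, and `canTransition` are hypothetical names.

```typescript
type ConnectionState = 'CONNECTING' | 'CONNECTED' | 'DISCONNECTING' | 'DISCONNECTED';

// Assumed transitions; the library may permit a different set.
const ALLOWED_TRANSITIONS: Record<ConnectionState, ConnectionState[]> = {
  CONNECTING: ['CONNECTED', 'DISCONNECTED'], // opened, or failed to open
  CONNECTED: ['DISCONNECTING', 'DISCONNECTED'], // graceful close, or unexpected drop
  DISCONNECTING: ['DISCONNECTED'],
  DISCONNECTED: ['CONNECTING'], // reconnect or new subscription
};

function canTransition(from: ConnectionState, to: ConnectionState): boolean {
  return ALLOWED_TRANSITIONS[from].indexOf(to) !== -1;
}
```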
Automatic Reconnection
Reconnection occurs when:
- Connection closes unexpectedly
- Connection error occurs
- Heartbeat timeout (if enabled)
Reconnection strategy:
// Attempt 1: Reconnect after 1 second
// Attempt 2: Reconnect after 2 seconds
// Attempt 3: Reconnect after 4 seconds
// Attempt 4: Reconnect after 8 seconds
// Attempt 5: Reconnect after 8 seconds
// After 5 attempts in 60s window: Give up and notify subscribers
Delayed Close
When the last subscription unsubscribes, the connection waits 2 seconds before closing. This allows for quick resubscription without creating a new connection.
// User unsubscribes
unsubscribe();
// Connection stays open for 2 seconds
// If a new subscription is created within 2 seconds, connection is reused
// Otherwise, connection closes after 2 seconds
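A delayed close of this kind is commonly built from a cancellable timer. The sketch below shows one way to do it, assuming a 2000 ms delay as described; `DelayedCloser`, `Scheduler`, and `timeoutScheduler` are hypothetical names, and the scheduler is injectable so the logic can be exercised without waiting on real timers.

```typescript
type Cancel = () => void;
type Scheduler = (fn: () => void, delayMs: number) => Cancel;

// Default scheduler backed by setTimeout.
const timeoutScheduler: Scheduler = (fn, delayMs) => {
  const handle = setTimeout(fn, delayMs);
  return () => clearTimeout(handle);
};

class DelayedCloser {
  private cancelPending?: Cancel;

  constructor(
    private close: () => void,
    private delayMs: number = 2000,
    private schedule: Scheduler = timeoutScheduler
  ) {}

  // Called when the last subscriber leaves: start (or restart) the close timer.
  onLastUnsubscribe(): void {
    if (this.cancelPending) {
      this.cancelPending();
    }
    this.cancelPending = this.schedule(() => {
      this.cancelPending = undefined;
      this.close();
    }, this.delayMs);
  }

  // Called when a subscriber arrives: cancel any pending close so the
  // existing connection is reused.
  onSubscribe(): void {
    if (this.cancelPending) {
      this.cancelPending();
      this.cancelPending = undefined;
    }
  }
}
```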
Reconnection Configuration
The ReconnectionManager can be configured:
// Default configuration
const maxAttemptsCount = 5; // Max 5 attempts
const maxAttemptsWindowMs = 60 * 1000; // Within 60 seconds
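One way to read "max 5 attempts within 60 seconds" is as a sliding window over recent attempt timestamps. The sketch below implements that reading; it is an assumption about the semantics rather than the ReconnectionManager's actual code, and `AttemptWindow` and `tryRecord` are hypothetical names.

```typescript
class AttemptWindow {
  private timestamps: number[] = [];

  constructor(
    private maxAttempts: number = 5,
    private windowMs: number = 60 * 1000
  ) {}

  // Records an attempt at nowMs and returns true, or returns false
  // (recording nothing) if the window already holds maxAttempts entries.
  tryRecord(nowMs: number): boolean {
    // Drop attempts that have aged out of the window.
    this.timestamps = this.timestamps.filter((t) => nowMs - t < this.windowMs);
    if (this.timestamps.length >= this.maxAttempts) {
      return false;
    }
    this.timestamps.push(nowMs);
    return true;
  }
}
```

With the defaults, five attempts in quick succession are accepted, a sixth is refused, and attempts are accepted again once the oldest entries are more than 60 seconds old.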
Connection Pooling
Subscriptions to the same URL share connections:
// ✅ Good: One connection
MultiplexWebSocket.createWebSocketSubscription({ wsUrl: 'wss://api.com/ws', ... });
MultiplexWebSocket.createWebSocketSubscription({ wsUrl: 'wss://api.com/ws', ... });
// ❌ Bad: Two connections (different URLs)
MultiplexWebSocket.createWebSocketSubscription({ wsUrl: 'wss://api.com/ws', ... });
MultiplexWebSocket.createWebSocketSubscription({ wsUrl: 'wss://api2.com/ws', ... });
Message Filtering
Use messageFilter for efficient message routing:
// ✅ Good: Filter at subscription level
messageFilter: (msg) => msg.market === 'BTC-PERP'
// ❌ Less efficient: Filter in callback
onMessage: (msg) => {
if (msg.market === 'BTC-PERP') {
// process
}
}
Source Code
Location: common-ts/src/utils/MultiplexWebSocket.ts
Key classes:
- MultiplexWebSocket (line 136): Main multiplexing class
- ReconnectionManager (line 53): Handles reconnection logic