Learn how the Exchange streams real-time market data to clients using WebSocket and Redis pub/sub
The Exchange provides real-time market data to clients through WebSocket connections, enabling users to receive instant updates about trades, orderbook changes, and market tickers. The system uses Redis pub/sub as a message broker to distribute updates across multiple WebSocket server instances.
The manager keeps four pieces of state:
users: Maps user IDs to User structs containing their WebSocket connection
subscriptions: Maps user IDs to lists of subscription IDs they’re subscribed to
reverse_subscriptions: Maps subscription IDs to lists of user IDs subscribed to them
redis_connection: Connection to Redis for pub/sub operations
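As a rough illustration of how these fields fit together, here is a minimal sketch of the manager state using only the standard library. The struct name `UserManager`, the fields inside `User`, and the `add_user` and `subscribe` helpers are assumptions made for the example. The `redis_connection` field is left out so the sketch compiles without external crates.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the real `User` struct. The real one holds the
/// user's WebSocket connection; that part is omitted here.
#[derive(Debug)]
pub struct User {
    pub id: String,
}

/// Sketch of the manager state (assumed name). `redis_connection` is omitted
/// so the example builds with std only.
#[derive(Debug, Default)]
pub struct UserManager {
    /// user ID -> User
    pub users: HashMap<String, User>,
    /// user ID -> subscription IDs the user holds
    pub subscriptions: HashMap<String, Vec<String>>,
    /// subscription ID -> user IDs subscribed to it
    pub reverse_subscriptions: HashMap<String, Vec<String>>,
}

impl UserManager {
    /// Registers a user (hypothetical helper).
    pub fn add_user(&mut self, user_id: &str) {
        self.users
            .insert(user_id.to_string(), User { id: user_id.to_string() });
    }

    /// Records a subscription in both maps. Returns `true` when this user is
    /// the first subscriber of the channel, which is the point where a real
    /// manager would also need to subscribe to the channel in Redis.
    pub fn subscribe(&mut self, user_id: &str, subscription_id: &str) -> bool {
        let held = self.subscriptions.entry(user_id.to_string()).or_default();
        if held.iter().any(|id| id == subscription_id) {
            return false; // already subscribed, nothing changes
        }
        held.push(subscription_id.to_string());

        let users = self
            .reverse_subscriptions
            .entry(subscription_id.to_string())
            .or_default();
        let is_first = users.is_empty();
        users.push(user_id.to_string());
        is_first
    }
}
```

Both maps are updated in the same call so that they cannot drift apart; every forward entry has a matching reverse entry.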
Why reverse subscriptions?
The reverse_subscriptions map enables efficient message delivery. When a Redis message arrives for trade.SOL_USDC, the manager looks up that channel once and gets the list of users subscribed to it. Without this reverse mapping, we’d need to iterate through every user and search their subscription list for each message, which is O(n) in the number of connected users instead of an O(1) lookup followed by work proportional to the channel’s subscribers.
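The difference between the two lookups can be sketched as follows. Both function names are hypothetical, and the maps are passed in directly rather than read from the manager, so that the example stays self-contained.

```rust
use std::collections::HashMap;

/// With the reverse map: one hash lookup, then the caller only touches the
/// users that are actually subscribed to `channel`.
pub fn recipients_indexed<'a>(
    reverse_subscriptions: &'a HashMap<String, Vec<String>>,
    channel: &str,
) -> &'a [String] {
    match reverse_subscriptions.get(channel) {
        Some(users) => users.as_slice(),
        None => &[],
    }
}

/// Without the reverse map: every user is visited and their subscription
/// list is searched, even if only a handful of users care about `channel`.
pub fn recipients_scan<'a>(
    subscriptions: &'a HashMap<String, Vec<String>>,
    channel: &str,
) -> Vec<&'a str> {
    let mut out: Vec<&str> = subscriptions
        .iter()
        .filter(|(_, held)| held.iter().any(|id| id == channel))
        .map(|(user, _)| user.as_str())
        .collect();
    out.sort(); // HashMap iteration order is unspecified
    out
}
```

Both functions return the same set of users; the indexed version simply avoids touching users who are not subscribed to the channel.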
    pub async fn unsubscribe(&mut self, user_id: &str, message: WsMessage) {
        if message.method == "UNSUBSCRIBE" {
            let (subscription_type, asset_pair) = match message.parse_subscription() {
                Some(result) => result,
                None => {
                    eprintln!("Invalid unsubscription format: {:?}", message.params);
                    return;
                }
            };
            let subscription_id = format!("{:?}.{:?}", subscription_type, asset_pair);

            // Forward map: drop the channel from this user's list.
            if let Some(subscriptions) = self.subscriptions.get_mut(user_id) {
                subscriptions.retain(|id| id != &subscription_id);
            }

            // Reverse map: drop the user from the channel's subscriber list.
            if let Some(users) = self.reverse_subscriptions.get_mut(&subscription_id) {
                users.retain(|id| id != user_id);
                // Last subscriber gone: release the Redis subscription as well.
                if users.is_empty() {
                    self.reverse_subscriptions.remove(&subscription_id);
                    self.redis_connection
                        .unsubscribe(subscription_id.as_str())
                        .await
                        .expect("Failed to unsubscribe in redis");
                }
            }
        }
    }
The system automatically unsubscribes from Redis channels when the last user unsubscribes. This prevents unnecessary Redis subscriptions and reduces server load.
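The same bookkeeping would plausibly apply when a user disconnects without sending UNSUBSCRIBE for each channel. The source does not show that path, so the following is only a sketch under that assumption: the `Subscriptions` struct and `remove_user` are hypothetical names, and instead of calling Redis the function returns the channels that ended up with no subscribers, leaving the actual Redis unsubscribe to the caller.

```rust
use std::collections::HashMap;

/// Hypothetical container holding just the two subscription maps.
#[derive(Debug, Default)]
pub struct Subscriptions {
    pub subscriptions: HashMap<String, Vec<String>>,
    pub reverse_subscriptions: HashMap<String, Vec<String>>,
}

impl Subscriptions {
    /// Removes every subscription held by `user_id` and returns the channels
    /// that now have no subscribers. A caller would unsubscribe from those
    /// channels in Redis.
    pub fn remove_user(&mut self, user_id: &str) -> Vec<String> {
        // Forward map tells us exactly which channels to visit.
        let channels = self.subscriptions.remove(user_id).unwrap_or_default();
        let mut emptied = Vec::new();

        for channel in channels {
            if let Some(users) = self.reverse_subscriptions.get_mut(&channel) {
                users.retain(|id| id != user_id);
                if users.is_empty() {
                    self.reverse_subscriptions.remove(&channel);
                    emptied.push(channel);
                }
            }
        }
        emptied
    }
}
```

Returning the emptied channels rather than unsubscribing inside the function keeps the map updates synchronous and lets the caller decide how to handle a failed Redis call.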
Orderbook depth changes are published after order matching:
engine/ws_stream.rs
    async fn publish_ws_depth_updates(
        &mut self,
        market: String,
        price: Decimal,
        side: OrderSide,
        fills: &Vec<Fill>,
        redis_conn: &RedisManager,
    ) {
        // Locate the orderbook for this market; bail out if there is none.
        let orderbook = match self
            .orderbooks
            .iter_mut()
            .find(|orderbook| orderbook.ticker() == market)
        {
            Some(ob) => ob,
            None => {
                eprintln!("No matching orderbook found for market: {}", market);
                return;
            }
        };

        let depth = orderbook.get_depth();
        let depth_bids = depth.0;
        let depth_asks = depth.1;

        match side {
            OrderSide::BUY => {
                // Ask levels whose price matches one of this order's fills.
                let updated_asks = depth_asks
                    .into_iter()
                    .filter(|ask| fills.iter().any(|fill| fill.price == ask.0))
                    .collect::<Vec<(Decimal, Decimal)>>();
                // Bid level at the incoming order's price.
                let updated_bids = depth_bids
                    .into_iter()
                    .filter(|bid| bid.0 == price)
                    .collect::<Vec<(Decimal, Decimal)>>();

                let stream = format!("depth.{}", market);
                let data = serde_json::json!({
                    "e": "depth",
                    "s": market,
                    "b": updated_bids,
                    "a": updated_asks,
                });
                let ws_response = WsResponse {
                    stream: stream.clone(),
                    data,
                };
                let ws_response_string = serde_json::to_string(&ws_response).unwrap();

                let result = redis_conn
                    .publish(stream.as_str(), ws_response_string)
                    .await;
                if let Err(e) = result {
                    eprintln!("Error publishing to redis: {}", e);
                }
            }
            // SELL side follows similar pattern
        }
    }
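Depth updates only include price levels that changed. For buy orders, asks are filled (so their quantities change), and if the order is not fully filled, its remainder may rest as a bid at the order price. For sell orders, the opposite occurs: bids are filled and a new ask may appear at the order price.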
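The level selection for both sides can be sketched as a pure function. This is an illustration under stated assumptions, not the engine's code: prices and quantities are plain `u64` placeholders for `Decimal`, `changed_levels`, `levels_matching`, and `DepthUpdate` are hypothetical names, and the sell branch is written as the mirror image of the buy branch.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OrderSide {
    Buy,
    Sell,
}

/// (price, quantity) level. Integers stand in for `Decimal` here.
pub type Level = (u64, u64);

#[derive(Debug, PartialEq, Eq)]
pub struct DepthUpdate {
    pub bids: Vec<Level>,
    pub asks: Vec<Level>,
}

/// Keeps only the levels whose price appears in `prices`.
fn levels_matching(levels: &[Level], prices: &[u64]) -> Vec<Level> {
    levels
        .iter()
        .copied()
        .filter(|(price, _)| prices.contains(price))
        .collect()
}

/// Selects the levels to publish from the current depth snapshot:
/// the opposite side is filtered by fill prices, the order's own side by the
/// order price.
pub fn changed_levels(
    side: OrderSide,
    order_price: u64,
    fill_prices: &[u64],
    bids: &[Level],
    asks: &[Level],
) -> DepthUpdate {
    match side {
        OrderSide::Buy => DepthUpdate {
            bids: levels_matching(bids, &[order_price]),
            asks: levels_matching(asks, fill_prices),
        },
        OrderSide::Sell => DepthUpdate {
            bids: levels_matching(bids, fill_prices),
            asks: levels_matching(asks, &[order_price]),
        },
    }
}
```

Note that in this sketch a level that was fully consumed no longer appears in the snapshot, so it is absent from the update as well. How the real engine signals removed levels to clients is not shown in the source.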
Using Redis as a message broker provides several advantages:
Scalability: Multiple WebSocket servers can subscribe to the same channels
Decoupling: The matching engine doesn’t need to know about WebSocket connections
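Delivery: Redis fans each published message out to every currently connected subscriber. Pub/sub is fire-and-forget, so a subscriber that is disconnected when a message is published does not receive it later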
Flexibility: Easy to add new message types or consumers
In a production deployment, you might run multiple instances of both the matching engine and WebSocket servers. Because publishers and subscribers only share channel names, Redis pub/sub lets you add WebSocket server instances without changing the engine’s publishing code.
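To make the fan-out concrete, here is a small in-memory stand-in for a pub/sub broker built on standard library channels. It is not Redis and does not use a Redis client; `Broker`, `subscribe`, and `publish` are hypothetical names chosen for the sketch. Each receiver plays the role of one WebSocket server instance, and the publisher plays the role of the engine.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// In-memory stand-in for a pub/sub broker (NOT Redis).
#[derive(Default)]
pub struct Broker {
    channels: HashMap<String, Vec<Sender<String>>>,
}

impl Broker {
    /// A "WebSocket server instance" subscribes and gets its own receiver.
    pub fn subscribe(&mut self, channel_name: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.channels
            .entry(channel_name.to_string())
            .or_default()
            .push(tx);
        rx
    }

    /// The "engine" publishes without knowing who is listening.
    /// Returns the number of subscribers the message reached. A message
    /// published to a channel with no subscribers is simply dropped.
    pub fn publish(&mut self, channel_name: &str, payload: &str) -> usize {
        match self.channels.get_mut(channel_name) {
            Some(subscribers) => {
                // Send to everyone; forget subscribers whose receiver is gone.
                subscribers.retain(|tx| tx.send(payload.to_string()).is_ok());
                subscribers.len()
            }
            None => 0,
        }
    }
}
```

Adding another subscriber requires no change on the publishing side, which is the property the scaling argument relies on. The stand-in also drops messages that have no listener at publish time, mirroring the fire-and-forget delivery of Redis pub/sub.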