The Exchange provides real-time market data to clients through WebSocket connections, enabling users to receive instant updates about trades, orderbook changes, and market tickers. The system uses Redis pub/sub as a message broker to distribute updates across multiple WebSocket server instances.

Architecture overview

The WebSocket streaming system consists of three main components:
  1. Matching Engine: Publishes market data updates to Redis channels
  2. Redis Pub/Sub: Acts as a message broker for distributing updates
  3. WebSocket Manager: Maintains client connections and subscriptions
┌─────────────────┐
│ Matching Engine │
│                 │
│ - Creates fills │
│ - Calculates    │
│   depth changes │
└────────┬────────┘
         │ Publish
         ▼
┌─────────────────┐
│  Redis Pub/Sub  │
│                 │
│ - trade.SOL_USDC│
│ - depth.SOL_USDC│
└────────┬────────┘
         │ Subscribe
         ▼
┌─────────────────┐
│  WS Manager     │
│                 │
│ - User conns    │
│ - Subscriptions │
└─────────────────┘

WebSocket manager

The WsManager struct manages all WebSocket connections and subscription mappings:
ws_manager.rs
pub struct WsManager {
    pub users: HashMap<String, User>,
    pub subscriptions: HashMap<String, Vec<String>>,
    pub reverse_subscriptions: HashMap<String, Vec<String>>,
    pub redis_connection: RedisManager,
}

Data structure explanation

  • users: Maps user IDs to User structs containing their WebSocket connection
  • subscriptions: Maps user IDs to lists of subscription IDs they’re subscribed to
  • reverse_subscriptions: Maps subscription IDs to lists of user IDs subscribed to them
  • redis_connection: Connection to Redis for pub/sub operations
The reverse_subscriptions map enables efficient message delivery. When a Redis message arrives for trade.SOL_USDC, the manager can look up every user subscribed to that channel directly and send them the update. Without this reverse mapping, the manager would have to scan every user's subscription list for each message, which is O(n) in the number of connected users. With it, finding the recipients is a single hash lookup, O(1) on average.
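The difference between the two lookups can be sketched with plain HashMaps. This is a minimal illustration, not the exchange's code: SubscriptionIndex and its methods are hypothetical names, and the sockets and Redis connection are left out.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the two subscription maps (no sockets, no Redis).
#[derive(Default)]
struct SubscriptionIndex {
    /// user ID -> channels the user follows
    forward: HashMap<String, Vec<String>>,
    /// channel -> user IDs following it
    reverse: HashMap<String, Vec<String>>,
}

impl SubscriptionIndex {
    /// Records the subscription in both directions.
    fn add(&mut self, user_id: &str, channel: &str) {
        self.forward
            .entry(user_id.to_string())
            .or_default()
            .push(channel.to_string());
        self.reverse
            .entry(channel.to_string())
            .or_default()
            .push(user_id.to_string());
    }

    /// One hash lookup, independent of the total number of users.
    fn recipients(&self, channel: &str) -> &[String] {
        match self.reverse.get(channel) {
            Some(users) => users.as_slice(),
            None => &[],
        }
    }

    /// What delivery would cost without the reverse map: scan every user.
    fn recipients_by_scan(&self, channel: &str) -> Vec<&str> {
        let mut found: Vec<&str> = self
            .forward
            .iter()
            .filter(|(_, channels)| channels.iter().any(|c| c == channel))
            .map(|(user_id, _)| user_id.as_str())
            .collect();
        found.sort(); // HashMap iteration order is unspecified
        found
    }
}
```

Both methods return the same set of users; only the amount of work per incoming message differs.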

User connections

Each connected user is represented by a User struct:
user.rs
pub struct User {
    pub id: String,
    pub ws_stream: SplitSink<WebSocketStream<TcpStream>, Message>,
}

impl User {
    pub fn new(id: String, ws_stream: SplitSink<WebSocketStream<TcpStream>, Message>) -> Self {
        Self { id, ws_stream }
    }
}
The ws_stream is a SplitSink, which is the sending half of a WebSocket connection. This allows the manager to send messages to the client at any time.

Subscription mechanism

Clients subscribe to market data streams using a JSON message format:
{"method":"SUBSCRIBE","params":["trade.BTC_USDT"],"id":1}

Subscribe implementation

ws_manager.rs
pub async fn subscribe(&mut self, user_id: &str, message: WsMessage) {
    if message.method == "SUBSCRIBE" {
        let (subscription_type, asset_pair) = match message.parse_subscription() {
            Some(result) => result,
            None => {
                eprintln!("Invalid subscription format: {:?}", message.params);
                return;
            }
        };
        let subscription_id = format!("{:?}.{:?}", subscription_type, asset_pair);

        if let Some(subscriptions) = self.subscriptions.get_mut(user_id) {
            subscriptions.push(subscription_id.clone());
        } else {
            self.subscriptions
                .insert(user_id.to_string(), vec![subscription_id.clone()]);
        }

        if let Some(users) = self.reverse_subscriptions.get_mut(&subscription_id) {
            users.push(user_id.to_string());
        } else {
            self.reverse_subscriptions
                .insert(subscription_id.clone(), vec![user_id.to_string()]);

            self.redis_connection
                .subscribe(subscription_id.as_str())
                .await
                .expect("Failed to subscribe in redis");
        }
    }
}
  1. Parse the subscription message to extract type (trade/depth/ticker) and asset pair
  2. Create subscription ID by combining type and pair (e.g., “trade.BTC_USDT”)
  3. Add to user’s subscriptions in the forward mapping
  4. Add user to reverse subscriptions for this channel
  5. Subscribe to Redis channel if this is the first user subscribing to this channel (optimization)
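The bookkeeping in steps 3 to 5 can also be written with the HashMap entry API. The sketch below is a variant under stated assumptions rather than the implementation above: the Subscriptions type is hypothetical, the Redis call is replaced by a boolean return value, and a repeated SUBSCRIBE from the same user is ignored instead of being recorded twice.

```rust
use std::collections::HashMap;

/// Hypothetical container for the forward and reverse maps.
#[derive(Default)]
struct Subscriptions {
    by_user: HashMap<String, Vec<String>>,
    by_channel: HashMap<String, Vec<String>>,
}

impl Subscriptions {
    /// Records the subscription and returns true when the caller should
    /// subscribe to the Redis channel (the channel just gained its first user).
    fn subscribe(&mut self, user_id: &str, channel: &str) -> bool {
        let channels = self.by_user.entry(user_id.to_string()).or_default();
        if channels.iter().any(|c| c == channel) {
            return false; // repeated SUBSCRIBE: nothing to change
        }
        channels.push(channel.to_string());

        let users = self.by_channel.entry(channel.to_string()).or_default();
        users.push(user_id.to_string());
        users.len() == 1
    }
}
```

Returning a flag keeps the map updates synchronous and easy to test; the async Redis subscribe would then run only when the flag is true.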

Message parsing

The WsMessage struct provides parsing logic:
types.rs
pub struct WsMessage {
    pub method: String,
    pub params: Vec<String>,
    pub id: u32,
}

impl WsMessage {
    pub fn parse_subscription(&self) -> Option<(SubscriptionType, SupportedAssetPairs)> {
        if self.params.is_empty() {
            return None;
        }

        let subscription_id = &self.params[0];
        let parts: Vec<&str> = subscription_id.split('.').collect();

        if parts.len() != 2 {
            return None;
        }

        let subscription_type_str = parts[0];
        let asset_pair_str = parts[1];

        let subscription_type = SubscriptionType::from_str(subscription_type_str)?;
        let asset_pair = SupportedAssetPairs::from_str(asset_pair_str).ok()?;

        Some((subscription_type, asset_pair))
    }
}
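The splitting step on its own can be sketched without the enum types. split_channel is a hypothetical helper, not part of types.rs; it accepts the same shape of input as the length check above (exactly one dot) and additionally rejects empty halves up front.

```rust
/// Splits "trade.SOL_USDC" into ("trade", "SOL_USDC").
/// Returns None unless there is exactly one '.' with text on both sides.
fn split_channel(channel: &str) -> Option<(&str, &str)> {
    let (kind, pair) = channel.split_once('.')?;
    if kind.is_empty() || pair.is_empty() || pair.contains('.') {
        return None;
    }
    Some((kind, pair))
}
```

For example, "trade.BTC_USDT" splits cleanly, while "trade", "trade.", and "trade.BTC.USDT" are all rejected before any enum lookup happens.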

Subscription types

The Exchange supports three types of market data subscriptions:
types.rs
pub enum SubscriptionType {
    depth,   // Orderbook depth updates
    trade,   // Trade executions
    ticker,  // Price ticker/statistics
}

impl SubscriptionType {
    pub fn from_str(s: &str) -> Option<Self> {
        match s {
            "depth" => Some(SubscriptionType::depth),
            "trade" => Some(SubscriptionType::trade),
            "ticker" => Some(SubscriptionType::ticker),
            _ => None,
        }
    }
}
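The subscribe and unsubscribe functions build the channel name with the {:?} formatter, which ties the wire format to the enum's variant names. One alternative, sketched here with a hypothetical StreamKind enum rather than the SubscriptionType shown above, is to implement the standard FromStr and Display traits so that parsing and formatting are explicit inverses.

```rust
use std::fmt;
use std::str::FromStr;

/// Hypothetical counterpart of SubscriptionType with conventional variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamKind {
    Depth,
    Trade,
    Ticker,
}

impl StreamKind {
    /// The exact string used on the wire and in Redis channel names.
    fn as_str(self) -> &'static str {
        match self {
            StreamKind::Depth => "depth",
            StreamKind::Trade => "trade",
            StreamKind::Ticker => "ticker",
        }
    }
}

impl FromStr for StreamKind {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "depth" => Ok(StreamKind::Depth),
            "trade" => Ok(StreamKind::Trade),
            "ticker" => Ok(StreamKind::Ticker),
            other => Err(format!("unknown subscription type: {other}")),
        }
    }
}

impl fmt::Display for StreamKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
```

With these impls, "trade".parse::<StreamKind>() yields the variant and format!("{}.{}", kind, pair) produces the channel name, so renaming a variant no longer changes what clients see.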

Unsubscribe mechanism

Clients can unsubscribe from streams:
ws_manager.rs
pub async fn unsubscribe(&mut self, user_id: &str, message: WsMessage) {
    if message.method == "UNSUBSCRIBE" {
        let (subscription_type, asset_pair) = match message.parse_subscription() {
            Some(result) => result,
            None => {
                eprintln!("Invalid unsubscription format: {:?}", message.params);
                return;
            }
        };
        let subscription_id = format!("{:?}.{:?}", subscription_type, asset_pair);

        if let Some(subscriptions) = self.subscriptions.get_mut(user_id) {
            subscriptions.retain(|id| id != &subscription_id);
        }

        if let Some(users) = self.reverse_subscriptions.get_mut(&subscription_id) {
            users.retain(|id| id != user_id);

            if users.is_empty() {
                self.reverse_subscriptions.remove(&subscription_id);
                self.redis_connection
                    .unsubscribe(subscription_id.as_str())
                    .await
                    .expect("Failed to unsubscribe in redis");
            }
        }
    }
}
The system automatically unsubscribes from Redis channels when the last user unsubscribes. This prevents unnecessary Redis subscriptions and reduces server load.

Connection management

Users are added and removed from the manager as they connect and disconnect:
ws_manager.rs
pub fn add_user(&mut self, user: User) {
    self.users.insert(user.id.clone(), user);
}

pub fn remove_user(&mut self, id: &str) {
    self.users.remove(id);
    self.subscriptions.remove(id);

    for (_, subscriptions) in self.reverse_subscriptions.iter_mut() {
        subscriptions.retain(|user_id| user_id != id);
    }
}
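When a user disconnects, they are removed from every subscription list so that no further messages are routed to the closed connection. Unlike unsubscribe, remove_user as shown keeps channels whose subscriber list has become empty and does not unsubscribe from the corresponding Redis channels.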
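A disconnect handler that also cleans up channels with no remaining subscribers could look like the sketch below. It is an illustration under assumptions, not the code in ws_manager.rs: the function takes the two maps as parameters instead of operating on WsManager, and it returns the emptied channel names so the caller can issue the Redis unsubscribe calls.

```rust
use std::collections::HashMap;

/// Removes `user_id` from both maps and returns the channels left with no
/// subscribers, which the caller would then unsubscribe from in Redis.
fn remove_user(
    subscriptions: &mut HashMap<String, Vec<String>>,
    reverse_subscriptions: &mut HashMap<String, Vec<String>>,
    user_id: &str,
) -> Vec<String> {
    let mut emptied = Vec::new();
    // Only visit the channels this user actually followed.
    for channel in subscriptions.remove(user_id).unwrap_or_default() {
        if let Some(users) = reverse_subscriptions.get_mut(&channel) {
            users.retain(|id| id != user_id);
            if users.is_empty() {
                reverse_subscriptions.remove(&channel);
                emptied.push(channel);
            }
        }
    }
    emptied
}
```

Walking the user's own subscription list also avoids touching every channel in the reverse map on each disconnect.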

Publishing trade updates

The matching engine publishes trade events to Redis:
engine/ws_stream.rs
async fn publish_ws_trades(
    &self,
    market: String,
    user_id: String,
    fills: &Vec<Fill>,
    timestamp: i64,
    redis_conn: &RedisManager,
) {
    for fill in fills.iter() {
        let stream = format!("trade.{}", market);
        let data = serde_json::json!({
            "e": "trade",
            "t": fill.trade_id,
            "m": fill.other_user_id == user_id,
            "p": fill.price,
            "q": fill.quantity,
            "s": market,
            "T": timestamp,
        });

        let ws_response = WsResponse {
            stream: stream.clone(),
            data,
        };
        let ws_response_string = serde_json::to_string(&ws_response).unwrap();

        let result = redis_conn
            .publish(stream.as_str(), ws_response_string)
            .await;

        if let Err(e) = result {
            eprintln!("Error publishing to redis: {}", e);
        }
    }
}

Trade message format

Each trade generates a message with:
  • e: Event type (“trade”)
  • t: Trade ID
  • m: Maker flag (set to fill.other_user_id == user_id, so it is true when the fill's counterparty ID matches the user_id passed to publish_ws_trades)
  • p: Execution price
  • q: Filled quantity
  • s: Symbol/market
  • T: Timestamp

Publishing depth updates

Orderbook depth changes are published after order matching:
engine/ws_stream.rs
async fn publish_ws_depth_updates(
    &mut self,
    market: String,
    price: Decimal,
    side: OrderSide,
    fills: &Vec<Fill>,
    redis_conn: &RedisManager,
) {
    let orderbook = match self
        .orderbooks
        .iter_mut()
        .find(|orderbook| orderbook.ticker() == market)
    {
        Some(ob) => ob,
        None => {
            eprintln!("No matching orderbook found for market: {}", market);
            return;
        }
    };

    let depth = orderbook.get_depth();
    let depth_bids = depth.0;
    let depth_asks = depth.1;

    match side {
        OrderSide::BUY => {
            let updated_asks = depth_asks
                .into_iter()
                .filter(|ask| fills.iter().any(|fill| fill.price == ask.0))
                .collect::<Vec<(Decimal, Decimal)>>();
            let updated_bids = depth_bids
                .into_iter()
                .filter(|bid| bid.0 == price)
                .collect::<Vec<(Decimal, Decimal)>>();

            let stream = format!("depth.{}", market);
            let data = serde_json::json!({
                "e": "depth",
                "s": market,
                "b": updated_bids,
                "a": updated_asks,
            });

            let ws_response = WsResponse {
                stream: stream.clone(),
                data,
            };

            let ws_response_string = serde_json::to_string(&ws_response).unwrap();

            let result = redis_conn
                .publish(stream.as_str(), ws_response_string)
                .await;

            if let Err(e) = result {
                eprintln!("Error publishing to redis: {}", e);
            }
        }
        // SELL side follows similar pattern
    }
}
Depth updates only include price levels that changed. For buy orders, asks are filled (so their quantities change) and a new bid might be added at the order price. For sell orders, the opposite occurs.
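The filtering for the BUY side can be isolated into a small pure function. This sketch makes several assumptions: prices and quantities are integer ticks and lots standing in for Decimal, Level and changed_levels_for_buy are hypothetical names, and the depth snapshot is passed in as slices.

```rust
/// (price, quantity) in integer ticks and lots; stand-in for (Decimal, Decimal).
type Level = (i64, i64);

/// Levels a BUY order could have changed: asks at any fill price,
/// and the bid at the order's own limit price.
fn changed_levels_for_buy(
    bids: &[Level],
    asks: &[Level],
    fill_prices: &[i64],
    order_price: i64,
) -> (Vec<Level>, Vec<Level>) {
    let updated_bids: Vec<Level> = bids
        .iter()
        .copied()
        .filter(|bid| bid.0 == order_price)
        .collect();
    let updated_asks: Vec<Level> = asks
        .iter()
        .copied()
        .filter(|ask| fill_prices.contains(&ask.0))
        .collect();
    (updated_bids, updated_asks)
}
```

Because the filter only keeps levels that are still present in the snapshot, a level that a fill consumed entirely appears in the update only if the snapshot still lists it with a zero quantity.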

Message delivery

When Redis messages arrive, the manager distributes them to subscribed clients:
ws_manager.rs
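pub async fn send_to_ws_stream(&mut self, message: String) {
    // A malformed payload should not take down the whole manager.
    let ws_message: WsResponse = match serde_json::from_str(message.as_str()) {
        Ok(parsed) => parsed,
        Err(e) => {
            eprintln!("Dropping malformed Redis message: {}", e);
            return;
        }
    };

    if let Some(users) = self.reverse_subscriptions.get(ws_message.stream.as_str()) {
        for user_id in users {
            if let Some(user) = self.users.get_mut(user_id) {
                // A failed send usually means the client has gone away; log it
                // and keep delivering to the remaining subscribers.
                if let Err(e) = user.ws_stream.send(Message::Text(message.clone())).await {
                    eprintln!("Failed to send to user {}: {}", user_id, e);
                }
            }
        }
    }
}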
This function:
  1. Parses the Redis message to extract the stream name
  2. Looks up all users subscribed to that stream
  3. Sends the message to each user’s WebSocket connection
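One way to learn which clients have gone away during delivery is to have the fan-out step report failed sends back to the caller. The sketch below shows the idea with synchronous std::sync::mpsc senders standing in for the WebSocket sinks. That substitution is an assumption made to keep the example self-contained, and fan_out is a hypothetical name.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Sends `message` to every subscriber of `stream` and returns the IDs of
/// users that could not be reached, so the caller can remove them.
fn fan_out(
    users: &HashMap<String, Sender<String>>,
    reverse_subscriptions: &HashMap<String, Vec<String>>,
    stream: &str,
    message: &str,
) -> Vec<String> {
    let mut dead = Vec::new();
    let Some(subscribers) = reverse_subscriptions.get(stream) else {
        return dead; // nobody follows this stream
    };
    for user_id in subscribers {
        match users.get(user_id) {
            // Delivered: nothing else to do for this user.
            Some(tx) if tx.send(message.to_string()).is_ok() => {}
            // Unknown user or closed receiver: report it instead of panicking.
            _ => dead.push(user_id.clone()),
        }
    }
    dead
}
```

The caller can then pass each returned ID to its disconnect handling, which keeps one broken connection from interrupting delivery to everyone else.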

WebSocket response format

All messages sent to clients follow this structure:
types.rs
pub struct WsResponse {
    pub stream: String,
    pub data: serde_json::Value,
}
Example message:
{
  "stream": "trade.SOL_USDC",
  "data": {
    "e": "trade",
    "t": 12345,
    "m": false,
    "p": "150.25",
    "q": "10.5",
    "s": "SOL_USDC",
    "T": 1638360000000
  }
}

Redis pub/sub benefits

Using Redis as a message broker provides several advantages:
  • Scalability: Multiple WebSocket servers can subscribe to the same channels
  • Decoupling: The matching engine doesn’t need to know about WebSocket connections
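  • Low latency: Redis pushes each published message to all currently connected subscribers immediately. Delivery is at-most-once: pub/sub messages are not persisted, so a WebSocket server that is disconnected when an update is published misses it.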
  • Flexibility: Easy to add new message types or consumers
In a production deployment, you might run multiple instances of both the matching engine and the WebSocket servers. Because publishers and subscribers share only channel names, WebSocket servers can be added without changing the publishing code.

Supported asset pairs

The system supports specific trading pairs:
types.rs
pub enum SupportedAssetPairs {
    BTC_USDT,
    ETH_USDT,
    SOL_USDT,
    SOL_USDC,
}

impl SupportedAssetPairs {
    pub fn from_str(asset_pair_str: &str) -> Result<SupportedAssetPairs, &'static str> {
        match asset_pair_str {
            "BTC_USDT" => Ok(SupportedAssetPairs::BTC_USDT),
            "ETH_USDT" => Ok(SupportedAssetPairs::ETH_USDT),
            "SOL_USDT" => Ok(SupportedAssetPairs::SOL_USDT),
            "SOL_USDC" => Ok(SupportedAssetPairs::SOL_USDC),
            _ => Err("Unsupported asset pair"),
        }
    }
}

Performance considerations

  • Message batching: The system sends individual messages for each trade/update. Production systems might batch updates for efficiency.
  • Subscription optimization: The manager only subscribes to Redis channels when at least one user needs them.
  • Persistent connections: Each user keeps a single long-lived WebSocket connection, avoiding the overhead of reconnecting for every update.
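  • Async operations: All I/O operations are asynchronous, so waiting on one socket does not block a thread. Within send_to_ws_stream, however, each send is awaited in turn, so a slow client delays delivery of that message to the subscribers after it.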
