Skip to main content

Overview

The Price Poller service is the market data gateway for the Exness Trading Platform. It connects to Binance’s WebSocket API to receive real-time trade data, calculates bid/ask prices with spreads, and distributes this information to other services via Redis Pub/Sub (for real-time updates) and Redis Streams (for Engine processing). Location: apps/Price_Poller/src/index.ts:1 Data Source: Binance WebSocket API Update Frequency: Real-time trades + 3-second batch updates

Key Features

  • Real-time Data: Connects to Binance WebSocket for live trade feeds
  • Multi-channel Distribution: Publishes via Redis Pub/Sub and Redis Streams
  • Bid/Ask Calculation: Computes spreads from trade prices (±0.5%)
  • Multiple Assets: Supports BTC, ETH, SOL perpetual contracts
  • Resilient: Automatic reconnection on WebSocket failures
  • Batched Updates: Aggregates and sends updates every 3 seconds

Architecture

Service Initialization

import { pubsubClient, config, redisClient } from "@repo/config";
import WebSocket from "ws";
import "dotenv/config";
import { redisStreams, constant } from "@repo/config";
import { type PriceUpdate } from "@repo/types";

// Connect to Binance WebSocket
const ws = new WebSocket(config.BINANCE_WS_URL);

// Connect Redis Pub/Sub
const PubsubClient = pubsubClient(config.REDIS_URL);
await PubsubClient.connect();

// Connect Redis Queue
const RedisClient = redisClient(config.REDIS_URL);
await RedisClient.connect();

// Connect Redis Streams
const RedisStreams = redisStreams(config.REDIS_URL);
await RedisStreams.connect();

const price_updates: PriceUpdate[] = [];
const crypto_trades = ["ETH_USDC_PERP", "SOL_USDC_PERP", "BTC_USDC_PERP"];
let bidAskMap: Record<string, { bid: number; ask: number }> = {};

WebSocket Subscription

ws.on("open", function open() {
  // Subscribe to trade streams for each asset
  crypto_trades.forEach((element) => {
    ws.send(`{"method":"SUBSCRIBE","params":["trade.${element}"],"id":4}`);
  });

  ws.on("message", async (data) => {
    try {
      const msg = JSON.parse(data.toString());

      // 1. Extract trade data
      const decimal = 4;
      const asset = msg.data.s.toString();
      const price = Math.floor(Number(msg.data.p) * 10 ** decimal);
      const bidValue = Math.floor((price - price * 0.005) * 10 ** decimal);
      const askValue = Math.floor((price + price * 0.005) * 10 ** decimal);

      // 2. Push raw trade to Redis queue (for Batch Upload)
      await RedisClient.pushData(constant.redisQueue, JSON.stringify(msg));

      // 3. Send bid/ask to WebSocket clients via Pub/Sub
      if (crypto_trades.includes(asset)) {
        const BidAsk = {
          asset,
          bid: bidValue,
          ask: askValue,
        };
        bidAskMap[asset] = BidAsk;
        await PubsubClient.publish(constant.pubsubKey, JSON.stringify(BidAsk));
      }

      // 4. Update price_updates array
      const idx = price_updates.findIndex((u) => u.asset === asset);
      if (idx !== -1 && price_updates[idx]) {
        price_updates[idx].price = price;
        price_updates[idx].bidValue = bidValue;
        price_updates[idx].askValue = askValue;
        price_updates[idx].decimal = decimal;
      } else {
        price_updates.push({ asset, price, bidValue, askValue, decimal });
      }
    } catch (err) {
      console.error("Error parsing message:", err);
    }
  });
});

ws.on("error", (err) => {
  console.error("WebSocket error:", err);
});

Batch Price Updates

// Send data to Engine via Redis Streams every 3 seconds
setInterval(async () => {
  console.log(JSON.stringify(price_updates));
  await RedisStreams.addToRedisStream(
    constant.redisStream,
    { 
      function: "pricePoller",
      message: JSON.stringify(price_updates) 
    }
  );
}, 3000);
Price updates are sent to the Engine every 3 seconds via Redis Streams. This allows the Engine to check take profit/stop loss conditions without being overwhelmed by individual trade updates.

Price Calculation

Bid/Ask Spread Formula

The service calculates bid and ask prices from the trade price with a 0.5% spread on each side:
const decimal = 4; // 4 decimal places for prices
const tradePrice = Number(msg.data.p); // e.g., 96000.00 for BTC

// Convert to integer representation (multiply by 10^4)
const price = Math.floor(tradePrice * Math.pow(10, decimal));
// Example: 96000.00 * 10000 = 960000000

// Calculate bid: 0.5% below trade price
const bidValue = Math.floor((price - price * 0.005) * Math.pow(10, decimal));
// Example: (960000000 - 4800000) * 10000 = 955200000

// Calculate ask: 0.5% above trade price
const askValue = Math.floor((price + price * 0.005) * Math.pow(10, decimal));
// Example: (960000000 + 4800000) * 10000 = 964800000

Why Integer Representation?

Prices are stored as integers to avoid floating-point precision errors:
// Problem with floats:
const price1 = 0.1 + 0.2; // 0.30000000000000004 ❌

// Solution with integers:
const price2 = 1000 + 2000; // 3000 ✅
const actualPrice = price2 / 10000; // 0.3 when needed

Data Distribution

1. Redis Pub/Sub (Real-time WebSocket)

Published immediately on each trade:
const BidAsk = {
  asset: "BTC_USDC_PERP",
  bid: 955200000,  // $95,520.00
  ask: 964800000   // $96,480.00
};

await PubsubClient.publish(
  constant.pubsubKey, 
  JSON.stringify(BidAsk)
);
Consumers:
  • WebSocket Server → Broadcasts to connected clients

2. Redis Queue (Historical Storage)

Raw trade data pushed for database storage:
const rawTrade = {
  data: {
    s: "BTC_USDC_PERP",    // symbol
    p: "96000.00",         // price
    q: "0.5",              // quantity
    t: 1234567890,         // trade ID
    T: 1678901234567,      // timestamp
    m: false               // is_buyer_maker
  }
};

await RedisClient.pushData(
  constant.redisQueue,
  JSON.stringify(rawTrade)
);
Consumers:
  • Batch Upload Service → Stores in TimescaleDB for candle generation

3. Redis Streams (Engine Updates)

Batched price updates every 3 seconds:
const priceUpdates = [
  {
    asset: "BTC_USDC_PERP",
    price: 960000000,
    bidValue: 955200000,
    askValue: 964800000,
    decimal: 4
  },
  {
    asset: "ETH_USDC_PERP",
    price: 35600000,
    bidValue: 35422000,
    askValue: 35778000,
    decimal: 4
  }
];

await RedisStreams.addToRedisStream(
  constant.redisStream,
  {
    function: "pricePoller",
    message: JSON.stringify(priceUpdates)
  }
);
Consumers:
  • Trading Engine → Updates in-memory prices, checks TP/SL triggers

Supported Assets

const crypto_trades = [
  "ETH_USDC_PERP",  // Ethereum Perpetual
  "SOL_USDC_PERP",  // Solana Perpetual
  "BTC_USDC_PERP"   // Bitcoin Perpetual
];

Adding New Assets

To support additional trading pairs:
  1. Add the Binance symbol to crypto_trades array
  2. Ensure the symbol exists on Binance WebSocket API
  3. Update the supported assets in Backend service
  4. Deploy and restart the Price Poller
const crypto_trades = [
  "ETH_USDC_PERP",
  "SOL_USDC_PERP",
  "BTC_USDC_PERP",
  "AVAX_USDC_PERP"  // New asset
];

Error Handling

WebSocket Reconnection

ws.on("error", (err) => {
  console.error("WebSocket error:", err);
  // Implement exponential backoff reconnection
  setTimeout(() => {
    console.log("Attempting to reconnect...");
    reconnect();
  }, 5000);
});

ws.on("close", () => {
  console.log("WebSocket connection closed");
  setTimeout(() => reconnect(), 5000);
});

function reconnect() {
  const ws = new WebSocket(config.BINANCE_WS_URL);
  // Re-subscribe to all streams
  crypto_trades.forEach((symbol) => {
    ws.send(`{"method":"SUBSCRIBE","params":["trade.${symbol}"],"id":4}`);
  });
}

Message Parsing Errors

ws.on("message", async (data) => {
  try {
    const msg = JSON.parse(data.toString());
    
    // Validate message structure
    if (!msg.data || !msg.data.s || !msg.data.p) {
      console.warn("Invalid message format:", msg);
      return;
    }
    
    // Process message...
  } catch (err) {
    console.error("Error parsing message:", err);
    // Continue processing other messages
  }
});

Configuration

Environment Variables

BINANCE_WS_URL
string
required
Binance WebSocket API endpoint (e.g., wss://stream.binance.com:9443/ws)
REDIS_URL
string
required
Redis connection URL for pub/sub, queue, and streams

Redis Configuration

constant.pubsubKey
string
default:"exness:price-updates"
Redis Pub/Sub channel for real-time price broadcasts
constant.redisQueue
string
default:"exness:trade-queue"
Redis queue for raw trade data
constant.redisStream
string
default:"exness:engine-stream"
Redis stream for Engine price updates

Deployment

price-poller:
  build:
    context: .
    dockerfile: apps/docker/Price_Poller.Dockerfile
  container_name: exness-price-poller
  environment:
    REDIS_URL: redis://redis:6379
    BINANCE_WS_URL: wss://stream.binance.com:9443/ws
  depends_on:
    redis:
      condition: service_healthy
  restart: unless-stopped
External Dependency: This service relies on Binance’s WebSocket API. If Binance is down or rate-limits connections, the entire platform loses price feeds. Consider implementing:
  • Multiple data source failover (e.g., Coinbase, Kraken)
  • Price cache with staleness detection
  • Circuit breaker pattern for API failures

Data Flow

Performance Characteristics

  • Message Rate: 10-100 trades/second per asset (varies by market activity)
  • Latency: Less than 50ms from Binance trade to Redis publish
  • Memory: ~10MB (price cache for 3 assets)
  • CPU: Less than 5% (message parsing and forwarding)
  • Network: ~1 KB/s outbound to Redis

Monitoring

Health Checks

let lastUpdateTime = Date.now();
let messageCount = 0;

ws.on("message", async (data) => {
  messageCount++;
  lastUpdateTime = Date.now();
  // Process message...
});

// Staleness detection
setInterval(() => {
  const timeSinceUpdate = Date.now() - lastUpdateTime;
  if (timeSinceUpdate > 30000) {
    console.error(`No price updates for ${timeSinceUpdate}ms - possible connection issue`);
    // Trigger alert or reconnection
  }
  console.log(`Messages processed: ${messageCount}`);
  messageCount = 0;
}, 60000);

Testing

Mock Binance WebSocket

import { WebSocketServer } from 'ws';

// Create mock Binance server
const mockServer = new WebSocketServer({ port: 9443 });

mockServer.on('connection', (socket) => {
  // Send mock trade every second
  setInterval(() => {
    const mockTrade = {
      data: {
        s: "BTC_USDC_PERP",
        p: "96000.00",
        q: "0.5",
        t: Date.now(),
        T: Date.now(),
        m: false
      }
    };
    socket.send(JSON.stringify(mockTrade));
  }, 1000);
});

Build docs developers (and LLMs) love