Skip to main content

Overview

The Trading Engine is the core service responsible for processing trading operations in the Exness Trading Platform. It consumes messages from Redis Streams, validates orders, manages user positions, calculates profit/loss, and enforces risk management rules including slippage control, take profit, and stop loss. Location: apps/Engine/src/index.ts:1 Processing Model: Parallel consumer with configurable concurrency

Key Features

  • Parallel Processing: Handles up to 10 concurrent messages with worker-based architecture
  • Order Management: Create, track, and close trading positions
  • Risk Management: Slippage protection, take profit, stop loss enforcement
  • Balance Management: Real-time margin calculation and balance updates
  • Price Polling: Continuous monitoring of open positions against market prices
  • Consumer Group: Uses Redis consumer groups for reliable message processing

Architecture

Service Initialization

import { config, redisStreams, constant } from "@repo/config";
import { tradeFunction } from "./functions/tradeFunction.js";

// Connect Redis Streams
const RedisStreams = redisStreams(config.REDIS_URL);
await RedisStreams.connect();

// Configuration for parallel processing
const CONCURRENCY_LIMIT = 10; // Process up to 10 messages concurrently
const CONSUMER_GROUP = "engine-group";
const CONSUMER_NAME = `engine-${process.pid}-${Date.now()}`; // Unique consumer name

let activeTasks = 0;
const taskQueue: Array<() => Promise<void>> = [];

// Process a single message
async function processMessage(result: any) {
  try {
    await tradeFunction(result);
  } catch (error) {
    console.error("Error processing message:", error);
  }
}

// Worker function to process queued tasks
async function worker() {
  while (true) {
    if (taskQueue.length > 0 && activeTasks < CONCURRENCY_LIMIT) {
      const task = taskQueue.shift();
      if (task) {
        activeTasks++;
        task().finally(() => {
          activeTasks--;
        });
      }
    } else {
      await new Promise(resolve => setTimeout(resolve, 10));
    }
  }
}

// Start worker threads
const WORKER_COUNT = Math.min(CONCURRENCY_LIMIT, 5);
for (let i = 0; i < WORKER_COUNT; i++) {
  worker();
}

console.log(`Engine started with ${WORKER_COUNT} workers, max concurrency: ${CONCURRENCY_LIMIT}`);

Message Consumer Loop

// Continuously consume messages and queue them for processing
while (true) {
  try {
    const result = await RedisStreams.readNextFromRedisStream(
      constant.redisStream,
      0, // Block indefinitely
      {
        consumerGroup: CONSUMER_GROUP,
        consumerName: CONSUMER_NAME
      }
    );
    
    if (result) {
      // Queue the task for parallel processing
      taskQueue.push(() => processMessage(result));
      console.log(`Queued message for processing. Queue length: ${taskQueue.length}, Active tasks: ${activeTasks}`);
    }
  } catch (error) {
    console.error("Error reading from Redis stream:", error);
    await new Promise(resolve => setTimeout(resolve, 1000));
  }
}
The Engine uses a consumer group pattern to ensure each message is processed exactly once, even with multiple Engine instances running in parallel.

Core Functions

Trade Function Router

The main router that dispatches messages to appropriate handlers:
export async function tradeFunction(result: any) {
  console.log("tradeFunction received:", result);
  console.log("Function type:", result.function);

  if (result.function === "createCloseOrder") {
    await createCloseOrderFunction(result);
  }
  if (result.function === "createUser") {
    await createUserFunction(result);
  }
  if (result.function === "createOrder") {
    await createOrderFunction(result);
  }
  if (result.function === "getOpenOrder") {
    await getOpenOrderFunction(result);
  }
  if (result.function === "pricePoller") {
    pricePollerFunction(result);
  }
}

Create Order Function

Handles new order creation with comprehensive validation and risk management:
export async function createOrderFunction(result: any) {
  // 1. Validate user exists
  const user = users.find((user: any) => user.userId === result.userId);
  if (!user) {
    await RedisStreams.addToRedisStream(
      constant.secondaryRedisStream,
      { 
        function: "createOrder", 
        message: JSON.stringify({ 
          error: "User not found",
          success: false 
        }) 
      }
    );
    return;
  }

  // 2. Normalize symbol (BTCUSDT -> btc)
  const normalizedSymbol = normalizeSymbol(result.symbol);
  const priceAssetName = getPriceAssetName(normalizedSymbol);

  // 3. Fetch current price data
  const priceData = prices.find((p: any) => p.asset === priceAssetName);
  if (!priceData) {
    await RedisStreams.addToRedisStream(
      constant.secondaryRedisStream,
      { 
        function: "createOrder", 
        message: JSON.stringify({ 
          error: `Price data not found for symbol: ${result.symbol}`,
          success: false 
        }) 
      }
    );
    return;
  }

  // 4. Calculate expected price based on order type
  const orderType = result.type?.toLowerCase();
  let expectedPrice: number;
  if (orderType === "buy") {
    expectedPrice = priceData.askValue;
  } else {
    expectedPrice = priceData.bidValue;
  }

  // 5. Validate balance
  const quantity = parseFloat(result.quantity);
  const leverage = parseInt(result.leverage) || 1;
  const marginRequired = (quantity * expectedPrice) / leverage;
  
  const dbUser = await prisma.user.findUnique({
    where: { userID: result.userId },
    select: { balance: true }
  });
  
  if (dbUser.balance < marginRequired) {
    await RedisStreams.addToRedisStream(
      constant.secondaryRedisStream,
      { 
        function: "createOrder", 
        message: JSON.stringify({ 
          error: `Insufficient balance. Required: $${marginRequired.toFixed(2)}, Available: $${dbUser.balance.toFixed(2)}`,
          success: false 
        }) 
      }
    );
    return;
  }

  // 6. Create order
  const orderId = uuid();
  const newOrder = {
    userId: result.userId,
    orderId: orderId,
    symbol: normalizedSymbol,
    type: orderType,
    quantity: quantity,
    leverage: leverage,
    openPrice: expectedPrice,
    openTime: new Date(),
    takeProfit: result.takeProfit,
    stopLoss: result.stopLoss,
    stippage: result.slippage,
  };
  
  openOrders.push(newOrder);

  // 7. Check slippage
  const currentPriceData = prices.find((p: any) => p.asset === priceAssetName);
  let currentExecutionPrice = orderType === "buy" 
    ? currentPriceData.askValue 
    : currentPriceData.bidValue;

  if (result.slippage !== undefined && result.slippage !== null) {
    const slippageTolerance = parseFloat(String(result.slippage));
    const priceDifference = Math.abs(currentExecutionPrice - expectedPrice);
    const priceDifferencePercent = (priceDifference / expectedPrice) * 100;

    if (priceDifferencePercent > slippageTolerance) {
      // Remove order due to slippage
      const orderIndex = openOrders.findIndex(o => o.orderId === orderId);
      if (orderIndex !== -1) {
        openOrders.splice(orderIndex, 1);
      }
      
      await RedisStreams.addToRedisStream(
        constant.secondaryRedisStream,
        { 
          function: "createOrder", 
          message: JSON.stringify({ 
            error: `Order cancelled due to slippage. Slippage: ${priceDifferencePercent.toFixed(4)}% (Tolerance: ${slippageTolerance}%)`,
            success: false 
          }) 
        }
      );
      return;
    }
  }

  // 8. Update order with actual execution price
  const orderIndex = openOrders.findIndex(o => o.orderId === orderId);
  if (orderIndex !== -1) {
    openOrders[orderIndex].openPrice = currentExecutionPrice;
  }

  // 9. Deduct margin from balance
  const actualMarginRequired = (quantity * currentExecutionPrice) / leverage;
  const newBalance = dbUser.balance - actualMarginRequired;
  
  await prisma.user.update({
    where: { userID: result.userId },
    data: { balance: newBalance }
  });

  // 10. Send success response
  await RedisStreams.addToRedisStream(
    constant.secondaryRedisStream,
    { 
      function: "createOrder", 
      message: JSON.stringify({ 
        success: true,
        orderId: orderId,
        message: "Order created successfully"
      }),
      requestId: result.requestId
    }
  );
}
Slippage Protection: If the current market price deviates from the expected price by more than the slippage tolerance percentage, the order is automatically cancelled and margin is not deducted.

Close Order Function

Closes an open position and calculates profit/loss:
export async function createCloseOrderFunction(result: any) {
  const { orderId, userId } = result;
  
  // Find the order
  const orderIndex = openOrders.findIndex(
    (order: any) => order.orderId === orderId && order.userId === userId
  );
  
  if (orderIndex === -1) {
    await RedisStreams.addToRedisStream(
      constant.secondaryRedisStream,
      { 
        function: "createCloseOrder",
        message: JSON.stringify({ error: "Order not found" })
      }
    );
    return;
  }
  
  const order = openOrders[orderIndex];
  
  // Get current price
  const priceAssetName = getPriceAssetName(order.symbol);
  const priceData = prices.find((p: any) => p.asset === priceAssetName);
  
  const closePrice = order.type === "buy" 
    ? priceData.bidValue 
    : priceData.askValue;
  
  // Calculate profit/loss
  let profitLoss;
  if (order.type === "buy") {
    profitLoss = (closePrice - order.openPrice) * order.quantity * order.leverage;
  } else {
    profitLoss = (order.openPrice - closePrice) * order.quantity * order.leverage;
  }
  
  // Calculate margin return
  const marginReturn = (order.quantity * order.openPrice) / order.leverage;
  
  // Update user balance
  const dbUser = await prisma.user.findUnique({
    where: { userID: userId }
  });
  
  const newBalance = dbUser.balance + marginReturn + profitLoss;
  
  await prisma.user.update({
    where: { userID: userId },
    data: { balance: newBalance }
  });
  
  // Remove from open orders
  openOrders.splice(orderIndex, 1);
  
  // Send to DBStorage for persistence
  const closedOrder = {
    ...order,
    closePrice,
    closeTime: new Date(),
    profitLoss
  };
  
  await RedisStreams.addToRedisStream(
    constant.dbStorageStream,
    {
      function: "createCloseOrder",
      message: closedOrder
    }
  );
  
  // Send response to Backend
  await RedisStreams.addToRedisStream(
    constant.secondaryRedisStream,
    { 
      function: "createCloseOrder",
      message: JSON.stringify(closedOrder),
      requestId: result.requestId
    }
  );
}

Price Poller Function

Continuously monitors open positions for take profit/stop loss triggers:
export function pricePollerFunction(result: any) {
  try {
    const priceUpdates = JSON.parse(result.message);
    
    // Update global prices array
    priceUpdates.forEach((update: PriceUpdate) => {
      const idx = prices.findIndex((p: any) => p.asset === update.asset);
      if (idx !== -1) {
        prices[idx] = update;
      } else {
        prices.push(update);
      }
    });
    
    // Check all open orders for TP/SL triggers
    openOrders.forEach((order: any) => {
      const priceAssetName = getPriceAssetName(order.symbol);
      const priceData = prices.find((p: any) => p.asset === priceAssetName);
      
      if (!priceData) return;
      
      const currentPrice = order.type === "buy" 
        ? priceData.bidValue 
        : priceData.askValue;
      
      // Check Take Profit
      if (order.takeProfit && shouldTriggerTakeProfit(order, currentPrice)) {
        console.log(`Take Profit triggered for order ${order.orderId}`);
        createCloseOrderFunction({ orderId: order.orderId, userId: order.userId });
      }
      
      // Check Stop Loss
      if (order.stopLoss && shouldTriggerStopLoss(order, currentPrice)) {
        console.log(`Stop Loss triggered for order ${order.orderId}`);
        createCloseOrderFunction({ orderId: order.orderId, userId: order.userId });
      }
    });
  } catch (error) {
    console.error("Error in pricePollerFunction:", error);
  }
}
The Price Poller function is triggered every 3 seconds by the Price Poller service, ensuring near-real-time monitoring of take profit and stop loss conditions.

Data Structures

In-Memory Storage

export const users: Array<{
  userId: string;
  email: string;
  balance: number;
}> = [];
Memory-Based Storage: The Engine stores open orders and current prices in memory for ultra-low latency. This means open orders are lost if the service restarts. For production, consider implementing snapshot/restore from Redis or MongoDB.

Configuration

Environment Variables

REDIS_URL
string
required
Redis connection URL for streams and pub/sub
DATABASE_URL
string
required
PostgreSQL connection string for balance updates

Performance Tuning

CONCURRENCY_LIMIT
number
default:"10"
Maximum concurrent message processing tasks
WORKER_COUNT
number
default:"5"
Number of worker threads for task processing

Deployment

engine:
  build:
    context: .
    dockerfile: apps/docker/Engine.Dockerfile
  container_name: exness-engine
  environment:
    REDIS_URL: redis://redis:6379
    DATABASE_URL: postgresql://postgresql:postgresql@postgres:5432/exness
  depends_on:
    postgres:
      condition: service_healthy
    redis:
      condition: service_healthy
  restart: unless-stopped

Data Flow

Performance Characteristics

  • Throughput: ~100 orders/second with 10 concurrent workers
  • Latency: 50-200ms per order (including DB operations)
  • Memory: ~50MB base + 1KB per open order
  • TP/SL Check Frequency: Every 3 seconds via Price Poller

Build docs developers (and LLMs) love