Overview
The Trading Engine is the core service responsible for processing trading operations in the Exness Trading Platform. It consumes messages from Redis Streams, validates orders, manages user positions, calculates profit/loss, and enforces risk management rules including slippage control, take profit, and stop loss.
Location: apps/Engine/src/index.ts:1
Processing Model: Parallel consumer with configurable concurrency
Key Features
Parallel Processing : Handles up to 10 concurrent messages with worker-based architecture
Order Management : Create, track, and close trading positions
Risk Management : Slippage protection, take profit, stop loss enforcement
Balance Management : Real-time margin calculation and balance updates
Price Polling : Continuous monitoring of open positions against market prices
Consumer Group : Uses Redis consumer groups for reliable message processing
Architecture
Service Initialization
import { config , redisStreams , constant } from "@repo/config" ;
import { tradeFunction } from "./functions/tradeFunction.js" ;
// Connect Redis Streams
const RedisStreams = redisStreams ( config . REDIS_URL );
await RedisStreams . connect ();
// Configuration for parallel processing
const CONCURRENCY_LIMIT = 10 ; // Process up to 10 messages concurrently
const CONSUMER_GROUP = "engine-group" ;
const CONSUMER_NAME = `engine- ${ process . pid } - ${ Date . now () } ` ; // Unique consumer name
let activeTasks = 0 ;
const taskQueue : Array <() => Promise < void >> = [];
// Process a single message
async function processMessage ( result : any ) {
try {
await tradeFunction ( result );
} catch ( error ) {
console . error ( "Error processing message:" , error );
}
}
// Worker function to process queued tasks
async function worker () {
while ( true ) {
if ( taskQueue . length > 0 && activeTasks < CONCURRENCY_LIMIT ) {
const task = taskQueue . shift ();
if ( task ) {
activeTasks ++ ;
task (). finally (() => {
activeTasks -- ;
});
}
} else {
await new Promise ( resolve => setTimeout ( resolve , 10 ));
}
}
}
// Start worker threads
const WORKER_COUNT = Math . min ( CONCURRENCY_LIMIT , 5 );
for ( let i = 0 ; i < WORKER_COUNT ; i ++ ) {
worker ();
}
console . log ( `Engine started with ${ WORKER_COUNT } workers, max concurrency: ${ CONCURRENCY_LIMIT } ` );
Message Consumer Loop
// Continuously consume messages and queue them for processing
while ( true ) {
try {
const result = await RedisStreams . readNextFromRedisStream (
constant . redisStream ,
0 , // Block indefinitely
{
consumerGroup: CONSUMER_GROUP ,
consumerName: CONSUMER_NAME
}
);
if ( result ) {
// Queue the task for parallel processing
taskQueue . push (() => processMessage ( result ));
console . log ( `Queued message for processing. Queue length: ${ taskQueue . length } , Active tasks: ${ activeTasks } ` );
}
} catch ( error ) {
console . error ( "Error reading from Redis stream:" , error );
await new Promise ( resolve => setTimeout ( resolve , 1000 ));
}
}
The Engine uses a consumer group pattern to ensure each message is processed exactly once, even with multiple Engine instances running in parallel.
Core Functions
Trade Function Router
The main router that dispatches messages to appropriate handlers:
apps/Engine/src/functions/tradeFunction.ts
export async function tradeFunction ( result : any ) {
console . log ( "tradeFunction received:" , result );
console . log ( "Function type:" , result . function );
if ( result . function === "createCloseOrder" ) {
await createCloseOrderFunction ( result );
}
if ( result . function === "createUser" ) {
await createUserFunction ( result );
}
if ( result . function === "createOrder" ) {
await createOrderFunction ( result );
}
if ( result . function === "getOpenOrder" ) {
await getOpenOrderFunction ( result );
}
if ( result . function === "pricePoller" ) {
pricePollerFunction ( result );
}
}
Create Order Function
Handles new order creation with comprehensive validation and risk management:
apps/Engine/src/functions/createOrder.ts
export async function createOrderFunction ( result : any ) {
// 1. Validate user exists
const user = users . find (( user : any ) => user . userId === result . userId );
if ( ! user ) {
await RedisStreams . addToRedisStream (
constant . secondaryRedisStream ,
{
function: "createOrder" ,
message: JSON . stringify ({
error: "User not found" ,
success: false
})
}
);
return ;
}
// 2. Normalize symbol (BTCUSDT -> btc)
const normalizedSymbol = normalizeSymbol ( result . symbol );
const priceAssetName = getPriceAssetName ( normalizedSymbol );
// 3. Fetch current price data
const priceData = prices . find (( p : any ) => p . asset === priceAssetName );
if ( ! priceData ) {
await RedisStreams . addToRedisStream (
constant . secondaryRedisStream ,
{
function: "createOrder" ,
message: JSON . stringify ({
error: `Price data not found for symbol: ${ result . symbol } ` ,
success: false
})
}
);
return ;
}
// 4. Calculate expected price based on order type
const orderType = result . type ?. toLowerCase ();
let expectedPrice : number ;
if ( orderType === "buy" ) {
expectedPrice = priceData . askValue ;
} else {
expectedPrice = priceData . bidValue ;
}
// 5. Validate balance
const quantity = parseFloat ( result . quantity );
const leverage = parseInt ( result . leverage ) || 1 ;
const marginRequired = ( quantity * expectedPrice ) / leverage ;
const dbUser = await prisma . user . findUnique ({
where: { userID: result . userId },
select: { balance: true }
});
if ( dbUser . balance < marginRequired ) {
await RedisStreams . addToRedisStream (
constant . secondaryRedisStream ,
{
function: "createOrder" ,
message: JSON . stringify ({
error: `Insufficient balance. Required: $ ${ marginRequired . toFixed ( 2 ) } , Available: $ ${ dbUser . balance . toFixed ( 2 ) } ` ,
success: false
})
}
);
return ;
}
// 6. Create order
const orderId = uuid ();
const newOrder = {
userId: result . userId ,
orderId: orderId ,
symbol: normalizedSymbol ,
type: orderType ,
quantity: quantity ,
leverage: leverage ,
openPrice: expectedPrice ,
openTime: new Date (),
takeProfit: result . takeProfit ,
stopLoss: result . stopLoss ,
stippage: result . slippage ,
};
openOrders . push ( newOrder );
// 7. Check slippage
const currentPriceData = prices . find (( p : any ) => p . asset === priceAssetName );
let currentExecutionPrice = orderType === "buy"
? currentPriceData . askValue
: currentPriceData . bidValue ;
if ( result . slippage !== undefined && result . slippage !== null ) {
const slippageTolerance = parseFloat ( String ( result . slippage ));
const priceDifference = Math . abs ( currentExecutionPrice - expectedPrice );
const priceDifferencePercent = ( priceDifference / expectedPrice ) * 100 ;
if ( priceDifferencePercent > slippageTolerance ) {
// Remove order due to slippage
const orderIndex = openOrders . findIndex ( o => o . orderId === orderId );
if ( orderIndex !== - 1 ) {
openOrders . splice ( orderIndex , 1 );
}
await RedisStreams . addToRedisStream (
constant . secondaryRedisStream ,
{
function: "createOrder" ,
message: JSON . stringify ({
error: `Order cancelled due to slippage. Slippage: ${ priceDifferencePercent . toFixed ( 4 ) } % (Tolerance: ${ slippageTolerance } %)` ,
success: false
})
}
);
return ;
}
}
// 8. Update order with actual execution price
const orderIndex = openOrders . findIndex ( o => o . orderId === orderId );
if ( orderIndex !== - 1 ) {
openOrders [ orderIndex ]. openPrice = currentExecutionPrice ;
}
// 9. Deduct margin from balance
const actualMarginRequired = ( quantity * currentExecutionPrice ) / leverage ;
const newBalance = dbUser . balance - actualMarginRequired ;
await prisma . user . update ({
where: { userID: result . userId },
data: { balance: newBalance }
});
// 10. Send success response
await RedisStreams . addToRedisStream (
constant . secondaryRedisStream ,
{
function: "createOrder" ,
message: JSON . stringify ({
success: true ,
orderId: orderId ,
message: "Order created successfully"
}),
requestId: result . requestId
}
);
}
Slippage Protection : If the current market price deviates from the expected price by more than the slippage tolerance percentage, the order is automatically cancelled and margin is not deducted.
Close Order Function
Closes an open position and calculates profit/loss:
apps/Engine/src/functions/createCloseOrder.ts
export async function createCloseOrderFunction ( result : any ) {
const { orderId , userId } = result ;
// Find the order
const orderIndex = openOrders . findIndex (
( order : any ) => order . orderId === orderId && order . userId === userId
);
if ( orderIndex === - 1 ) {
await RedisStreams . addToRedisStream (
constant . secondaryRedisStream ,
{
function: "createCloseOrder" ,
message: JSON . stringify ({ error: "Order not found" })
}
);
return ;
}
const order = openOrders [ orderIndex ];
// Get current price
const priceAssetName = getPriceAssetName ( order . symbol );
const priceData = prices . find (( p : any ) => p . asset === priceAssetName );
const closePrice = order . type === "buy"
? priceData . bidValue
: priceData . askValue ;
// Calculate profit/loss
let profitLoss ;
if ( order . type === "buy" ) {
profitLoss = ( closePrice - order . openPrice ) * order . quantity * order . leverage ;
} else {
profitLoss = ( order . openPrice - closePrice ) * order . quantity * order . leverage ;
}
// Calculate margin return
const marginReturn = ( order . quantity * order . openPrice ) / order . leverage ;
// Update user balance
const dbUser = await prisma . user . findUnique ({
where: { userID: userId }
});
const newBalance = dbUser . balance + marginReturn + profitLoss ;
await prisma . user . update ({
where: { userID: userId },
data: { balance: newBalance }
});
// Remove from open orders
openOrders . splice ( orderIndex , 1 );
// Send to DBStorage for persistence
const closedOrder = {
... order ,
closePrice ,
closeTime: new Date (),
profitLoss
};
await RedisStreams . addToRedisStream (
constant . dbStorageStream ,
{
function: "createCloseOrder" ,
message: closedOrder
}
);
// Send response to Backend
await RedisStreams . addToRedisStream (
constant . secondaryRedisStream ,
{
function: "createCloseOrder" ,
message: JSON . stringify ( closedOrder ),
requestId: result . requestId
}
);
}
Price Poller Function
Continuously monitors open positions for take profit/stop loss triggers:
apps/Engine/src/functions/pricePoller.ts
export function pricePollerFunction ( result : any ) {
try {
const priceUpdates = JSON . parse ( result . message );
// Update global prices array
priceUpdates . forEach (( update : PriceUpdate ) => {
const idx = prices . findIndex (( p : any ) => p . asset === update . asset );
if ( idx !== - 1 ) {
prices [ idx ] = update ;
} else {
prices . push ( update );
}
});
// Check all open orders for TP/SL triggers
openOrders . forEach (( order : any ) => {
const priceAssetName = getPriceAssetName ( order . symbol );
const priceData = prices . find (( p : any ) => p . asset === priceAssetName );
if ( ! priceData ) return ;
const currentPrice = order . type === "buy"
? priceData . bidValue
: priceData . askValue ;
// Check Take Profit
if ( order . takeProfit && shouldTriggerTakeProfit ( order , currentPrice )) {
console . log ( `Take Profit triggered for order ${ order . orderId } ` );
createCloseOrderFunction ({ orderId: order . orderId , userId: order . userId });
}
// Check Stop Loss
if ( order . stopLoss && shouldTriggerStopLoss ( order , currentPrice )) {
console . log ( `Stop Loss triggered for order ${ order . orderId } ` );
createCloseOrderFunction ({ orderId: order . orderId , userId: order . userId });
}
});
} catch ( error ) {
console . error ( "Error in pricePollerFunction:" , error );
}
}
The Price Poller function is triggered every 3 seconds by the Price Poller service, ensuring near-real-time monitoring of take profit and stop loss conditions.
Data Structures
In-Memory Storage
apps/Engine/src/data/users.ts
apps/Engine/src/data/orders.ts
apps/Engine/src/data/price.ts
export const users : Array <{
userId : string ;
email : string ;
balance : number ;
}> = [];
Memory-Based Storage : The Engine stores open orders and current prices in memory for ultra-low latency. This means open orders are lost if the service restarts. For production, consider implementing snapshot/restore from Redis or MongoDB.
Configuration
Environment Variables
Redis connection URL for streams and pub/sub
PostgreSQL connection string for balance updates
Maximum concurrent message processing tasks
Number of worker threads for task processing
Deployment
engine :
build :
context : .
dockerfile : apps/docker/Engine.Dockerfile
container_name : exness-engine
environment :
REDIS_URL : redis://redis:6379
DATABASE_URL : postgresql://postgresql:postgresql@postgres:5432/exness
depends_on :
postgres :
condition : service_healthy
redis :
condition : service_healthy
restart : unless-stopped
Data Flow
Throughput : ~100 orders/second with 10 concurrent workers
Latency : 50-200ms per order (including DB operations)
Memory : ~50MB base + 1KB per open order
TP/SL Check Frequency : Every 3 seconds via Price Poller