Skip to main content

Overview

The Database Storage (DBStorage) service is responsible for persisting trading data and managing user records in PostgreSQL. It consumes messages from Redis Streams, stores closed orders in the database, handles user creation/updates, and serves queries for historical order data. Location: apps/DBstorage/src/index.ts:1 Database: PostgreSQL with Prisma ORM Message Source: Redis Streams (dbStorageStream)

Key Features

  • Closed Order Persistence: Stores completed trades with P/L calculations
  • User Management: Creates and updates user records from Engine
  • Historical Queries: Retrieves closed order history for users
  • Data Integrity: Ensures orders match Prisma schema constraints
  • Idempotent Operations: Handles duplicate messages gracefully
  • Async Processing: Consumes Redis Streams continuously

Architecture

Service Initialization

import { config, redisStreams, constant } from "@repo/config";
import { dbStorageFunction } from "./functions/dbStorageFunction.js";

// Connect Redis Streams
const RedisStreams = redisStreams(config.REDIS_URL);
await RedisStreams.connect();

// Start consuming messages
await RedisStreams.readRedisStream(
  constant.dbStorageStream,
  dbStorageFunction
);
The DBStorage service uses a single consumer that processes messages sequentially to maintain data consistency and avoid race conditions in the database.

Core Functions

Database Storage Function

Main message router that handles different operation types:
import { redisStreams, config, constant } from "@repo/config";
import { prisma } from "@repo/db";

let RedisStreamsInstance: ReturnType<typeof redisStreams> | null = null;

async function getRedisStreams() {
  if (!RedisStreamsInstance) {
    RedisStreamsInstance = redisStreams(config.REDIS_URL);
    await RedisStreamsInstance.connect();
  }
  return RedisStreamsInstance;
}

export async function dbStorageFunction(result: any) {
  console.log("dbStorageFunction called with:", result);
  
  if (result.function === "createCloseOrder") {
    // Handle closed order storage
  }
  
  if (result.function === "getCloseOrders") {
    // Handle closed order queries
  }
  
  if (result.function === "createUser") {
    // Handle user creation/update
  }
}

Create Close Order

Stores a closed trading position with profit/loss data:
if (result.function === "createCloseOrder") {
  const orderData = result?.message;
  
  if (!orderData) {
    console.error("createCloseOrder: Missing order data", result);
    return;
  }
  
  const userId = orderData.userId;
  
  // Verify user exists
  const user = await prisma.user.findUnique({ 
    where: { userID: userId } 
  });
  
  if (!user) {
    console.error("createCloseOrder aborted: user not found", userId);
    return;
  }
  
  try {
    // Normalize symbol to lowercase (btc, sol, eth)
    const symbol = orderData.symbol?.toLowerCase();
    if (!symbol || !["btc", "sol", "eth"].includes(symbol)) {
      console.error("createCloseOrder: Invalid symbol", orderData.symbol);
      return;
    }
    
    // Normalize type to lowercase (buy, sell)
    const type = orderData.type?.toLowerCase();
    if (!type || !["buy", "sell"].includes(type)) {
      console.error("createCloseOrder: Invalid type", orderData.type);
      return;
    }
    
    // Convert timestamps to Date objects
    const openTime = orderData.openTime instanceof Date 
      ? orderData.openTime 
      : new Date(orderData.openTime);
    const closeTime = orderData.closeTime instanceof Date 
      ? orderData.closeTime 
      : new Date(orderData.closeTime);
    
    // Create order in database
    const createdOrder = await prisma.orders.create({
      data: {
        orderId: orderData.orderId,
        userId: userId,
        symbol: symbol as "btc" | "sol" | "eth",
        type: type as "buy" | "sell",
        quantity: parseFloat(orderData.quantity) || 0,
        leverage: parseInt(orderData.leverage) || 1,
        takeProfit: orderData.takeProfit ? parseFloat(orderData.takeProfit) : null,
        stopLoss: orderData.stopLoss ? parseFloat(orderData.stopLoss) : null,
        stippage: orderData.stippage ? parseFloat(orderData.stippage) : null,
        openPrice: parseFloat(orderData.openPrice) || 0,
        closePrice: parseFloat(orderData.closePrice) || 0,
        openTime: openTime,
        closeTime: closeTime,
        profitLoss: parseFloat(orderData.profitLoss) || 0,
      },
    });
    
    console.log("createCloseOrder: Successfully saved to database", {
      orderId: createdOrder.orderId,
      userId: createdOrder.userId,
      symbol: createdOrder.symbol,
      profitLoss: createdOrder.profitLoss,
    });
  } catch (error) {
    console.error("createCloseOrder: Error saving to database", error);
    console.error("Failed order data:", orderData);
  }
}

Get Close Orders

Retrieves historical closed orders for a user:
if (result.function === "getCloseOrders") {
  const userId = result?.userId || result?.message;
  
  if (!userId) {
    console.error("getCloseOrders: Missing userId");
    const RedisStreams = await getRedisStreams();
    await RedisStreams.addToRedisStream(constant.secondaryRedisStream, {
      function: "getCloseOrders",
      message: JSON.stringify([]),
    });
    return;
  }
  
  try {
    const RedisStreams = await getRedisStreams();
    
    // Fetch all closed orders for the user
    const closeOrders = await prisma.orders.findMany({
      where: {
        userId: userId,
      },
      orderBy: {
        closeTime: "desc",
      },
    });
    
    console.log(`Found ${closeOrders.length} closed orders for user ${userId}`);
    
    // Format response to match frontend expectations
    const formattedCloseOrders = closeOrders.map((order: any) => ({
      orderId: order.orderId || order.id,
      symbol: order.symbol,
      type: order.type,
      quantity: order.quantity,
      openPrice: order.openPrice,
      closePrice: order.closePrice,
      openTime: order.openTime?.toISOString() || order.openTime,
      closeTime: order.closeTime?.toISOString() || order.closeTime,
      profitLoss: order.profitLoss || 0,
      status: "closed"
    }));
    
    await RedisStreams.addToRedisStream(constant.secondaryRedisStream, {
      function: "getCloseOrders",
      message: JSON.stringify(formattedCloseOrders),
    });
  } catch (error) {
    console.error("Error fetching closeOrders:", error);
    const RedisStreams = await getRedisStreams();
    await RedisStreams.addToRedisStream(constant.secondaryRedisStream, {
      function: "getCloseOrders",
      message: JSON.stringify([]),
    });
  }
}

Create User

Handles user creation and updates with conflict resolution:
if (result.function === "createUser") {
  const { userId, userEmail } = result.message || {};
  
  if (!userId || !userEmail) {
    console.log("createUser missing fields", result.message);
    return;
  }
  
  try {
    // Check if user exists by email
    const existingUserByEmail = await prisma.user.findUnique({
      where: { email: userEmail }
    });
    
    if (existingUserByEmail) {
      // User exists with this email, update userID if different
      if (existingUserByEmail.userID !== userId) {
        await prisma.user.update({
          where: { email: userEmail },
          data: { userID: userId }
        });
        console.log("Updated existing user's userID", { userId, userEmail });
      } else {
        console.log("User already exists with same userID", { userId, userEmail });
      }
    } else {
      // Check if userID already exists
      const existingUserByID = await prisma.user.findUnique({
        where: { userID: userId }
      });
      
      if (existingUserByID) {
        // Update email for existing userID
        await prisma.user.update({
          where: { userID: userId },
          data: { email: userEmail }
        });
        console.log("Updated existing user's email", { userId, userEmail });
      } else {
        // Create new user
        await prisma.user.create({
          data: { userID: userId, email: userEmail }
        });
        console.log("Created new user", { userId, userEmail });
      }
    }
  } catch (error) {
    console.error("Error in createUser:", error);
    
    // Conflict resolution: find and update
    const existingUser = await prisma.user.findFirst({
      where: {
        OR: [
          { email: userEmail },
          { userID: userId }
        ]
      }
    });
    
    if (existingUser) {
      await prisma.user.update({
        where: { id: existingUser.id },
        data: { userID: userId, email: userEmail }
      });
      console.log("Updated existing user to resolve conflict", { userId, userEmail });
    }
  }
}
Unique Constraint Handling: The user creation logic handles race conditions where the same user might be created by both Engine and DBStorage simultaneously. It uses conflict resolution to ensure data consistency.

Database Schema

Orders Table

model Orders {
  id          Int      @id @default(autoincrement())
  orderId     String   @unique
  userId      String
  symbol      String   // "btc", "eth", "sol"
  type        String   // "buy", "sell"
  quantity    Float
  leverage    Int
  takeProfit  Float?
  stopLoss    Float?
  stippage    Float?
  openPrice   Float
  closePrice  Float
  openTime    DateTime
  closeTime   DateTime
  profitLoss  Float
  
  user        User     @relation(fields: [userId], references: [userID])
  
  @@index([userId])
  @@index([closeTime])
}

Users Table

model User {
  id      Int      @id @default(autoincrement())
  userID  String   @unique
  email   String   @unique
  balance Float    @default(10000.0)
  
  orders  Orders[]
  
  @@index([userID])
  @@index([email])
}

Data Flow

Configuration

Environment Variables

REDIS_URL
string
required
Redis connection URL for streams
DATABASE_URL
string
required
PostgreSQL connection string for Prisma

Redis Streams

constant.dbStorageStream
string
default:"exness:dbstorage-stream"
Stream for incoming storage requests
constant.secondaryRedisStream
string
default:"exness:response-stream"
Stream for outgoing query responses

Deployment

dbstorage:
  build:
    context: .
    dockerfile: apps/docker/DBstorage.Dockerfile
  container_name: exness-dbstorage
  environment:
    REDIS_URL: redis://redis:6379
    DATABASE_URL: postgresql://postgresql:postgresql@postgres:5432/exness
  depends_on:
    postgres:
      condition: service_healthy
    redis:
      condition: service_healthy
    db-migrate:
      condition: service_completed_successfully
  restart: unless-stopped
DBStorage must wait for database migrations to complete before starting to ensure the schema is up to date.

Error Handling

Validation Errors

// Symbol validation
const symbol = orderData.symbol?.toLowerCase();
if (!symbol || !["btc", "sol", "eth"].includes(symbol)) {
  console.error("createCloseOrder: Invalid symbol", orderData.symbol);
  return; // Skip invalid order
}

// Type validation
const type = orderData.type?.toLowerCase();
if (!type || !["buy", "sell"].includes(type)) {
  console.error("createCloseOrder: Invalid type", orderData.type);
  return; // Skip invalid order
}

// Date validation
const openTime = new Date(orderData.openTime);
if (isNaN(openTime.getTime())) {
  console.error("Invalid openTime", orderData.openTime);
  return;
}

Database Errors

try {
  await prisma.orders.create({ data: orderData });
} catch (error: any) {
  if (error.code === 'P2002') {
    // Unique constraint violation
    console.warn("Order already exists:", orderData.orderId);
  } else if (error.code === 'P2003') {
    // Foreign key constraint violation
    console.error("User not found:", orderData.userId);
  } else {
    console.error("Database error:", error);
  }
}

Performance Considerations

Indexing Strategy

-- Index on userId for fast user-specific queries
CREATE INDEX idx_orders_userId ON Orders(userId);

-- Index on closeTime for chronological sorting
CREATE INDEX idx_orders_closeTime ON Orders(closeTime DESC);

-- Composite index for filtered queries
CREATE INDEX idx_orders_user_time ON Orders(userId, closeTime DESC);

Query Optimization

// Only select needed fields
const closeOrders = await prisma.orders.findMany({
  where: { userId },
  select: {
    orderId: true,
    symbol: true,
    type: true,
    quantity: true,
    openPrice: true,
    closePrice: true,
    openTime: true,
    closeTime: true,
    profitLoss: true
  },
  orderBy: { closeTime: "desc" },
  take: 100 // Limit to recent 100 orders
});

Monitoring

Performance Metrics

let ordersStored = 0;
let queriesProcessed = 0;
let errors = 0;

setInterval(() => {
  console.log({
    ordersStored,
    queriesProcessed,
    errors,
    uptime: process.uptime()
  });
  ordersStored = 0;
  queriesProcessed = 0;
  errors = 0;
}, 60000);

Build docs developers (and LLMs) love