The Batch Upload service consumes raw trade data from a Redis queue and efficiently stores it in TimescaleDB. This historical trade data is used to generate candlestick charts at various time intervals (1m, 5m, 15m, 30m, 1h, 4h, 1d) via TimescaleDB's continuous aggregates.

- Location: `apps/Batch_Upload/src/index.ts:1`
- Database: TimescaleDB (PostgreSQL extension)
- Data Source: Redis Queue (populated by the Price Poller)
```ts
import { constant, config, redisClient } from "@repo/config";
import "dotenv/config";
import { timeScaleDB } from "@repo/timescaledb";

const db = timeScaleDB();

async function main() {
  // Connect to TimescaleDB
  await db.connect();
  await db.setupTimescale();

  // Connect to the Redis queue
  const RedisClient = redisClient(config.REDIS_URL);
  await RedisClient.connect();

  let BATCH_SIZE = 0;
  const BATCH_LIMIT = 100;

  // Continuous processing loop
  while (true) {
    try {
      const msg = await RedisClient.popData(constant.redisQueue);
      if (msg) {
        const trade = JSON.parse(msg);
        await processTrade(trade);
        BATCH_SIZE++;
      } else {
        // Queue empty, wait before checking again
        await new Promise((resolve) => setTimeout(resolve, 100));
      }
      if (BATCH_SIZE >= BATCH_LIMIT) {
        BATCH_SIZE = 0;
      }
    } catch (err) {
      console.error("Error processing trade:", err);
    }
  }
}

main();
```
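`processTrade` itself is not shown above. A minimal sketch of the mapping step it would need, assuming hypothetical exchange-style field names (`s`, `p`, `q`, `t`, `T`, `m`) in the queued message; only `data.s` as the symbol is confirmed by the scaling example later in this document:

```typescript
// Hypothetical trade message shape; the field names below (p, q, t, T, m)
// are assumptions for illustration, not confirmed by the service code.
interface TradeMessage {
  data: {
    s: string;  // symbol
    p: string;  // price
    q: string;  // quantity (volume)
    t: number;  // trade id
    T: number;  // trade timestamp (ms since epoch)
    m: boolean; // true if the buyer is the maker (i.e. a sell-side taker)
  };
}

// Map a parsed queue message onto a row for the `trades` table.
// Kept pure so it can be unit-tested without a database connection.
function tradeToRow(trade: TradeMessage) {
  const { s, p, q, t, T, m } = trade.data;
  return {
    time: new Date(T).toISOString(),
    symbol: s,
    price: p,
    volume: q,
    trade_id: t,
    side: m ? "sell" : "buy",
  };
}
```

`processTrade` could then execute a parameterized `INSERT ... ON CONFLICT DO NOTHING` with these values, letting the `UNIQUE (symbol, trade_id)` constraint deduplicate any replayed messages.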
The service uses a simple counter-based batching strategy. Trades are currently written with individual INSERT statements, which TimescaleDB's storage engine handles reasonably well; a future optimization could replace them with true multi-row batch INSERT statements.
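A sketch of what that batch INSERT could look like: a pure helper that builds one multi-row, parameterized statement for N trades. The `$1..$n` placeholder style assumes a node-postgres-like client; the row shape mirrors the `trades` table below.

```typescript
interface TradeRow {
  time: string;
  symbol: string;
  price: string;
  volume: string;
  trade_id: number;
  side: "buy" | "sell";
}

// Build a single parameterized INSERT covering all rows in the batch.
// ON CONFLICT DO NOTHING relies on the table's unique constraint to
// silently skip duplicates (e.g. replayed queue messages).
function buildBatchInsert(rows: TradeRow[]): { text: string; values: unknown[] } {
  const cols = ["time", "symbol", "price", "volume", "trade_id", "side"];
  const values: unknown[] = [];
  const tuples = rows.map((row, i) => {
    values.push(row.time, row.symbol, row.price, row.volume, row.trade_id, row.side);
    const base = i * cols.length;
    // Placeholders for this row: ($base+1, ..., $base+6)
    const placeholders = cols.map((_, j) => `$${base + j + 1}`);
    return `(${placeholders.join(", ")})`;
  });
  const text =
    `INSERT INTO trades (${cols.join(", ")}) VALUES ${tuples.join(", ")} ` +
    `ON CONFLICT DO NOTHING`;
  return { text, values };
}
```

The main loop would accumulate rows up to `BATCH_LIMIT`, then issue one call instead of a hundred round trips.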
```sql
CREATE TABLE trades (
    time     TIMESTAMPTZ NOT NULL,
    symbol   TEXT        NOT NULL,
    price    NUMERIC     NOT NULL,
    volume   NUMERIC     NOT NULL,
    trade_id BIGINT      NOT NULL,
    side     TEXT        NOT NULL,  -- 'buy' or 'sell'
    -- Unique indexes on a hypertable must include the partitioning
    -- column, so `time` is part of the constraint.
    UNIQUE (symbol, trade_id, time)
);

-- Convert to hypertable (partitioned by time)
SELECT create_hypertable('trades', 'time');

-- Create indexes for fast queries
CREATE INDEX idx_trades_symbol_time ON trades (symbol, time DESC);
CREATE INDEX idx_trades_time ON trades (time DESC);
```
TimescaleDB automatically maintains materialized views for candlestick data:
```sql
CREATE MATERIALIZED VIEW candles_1m
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', time) AS bucket,
    symbol,
    FIRST(price, time) AS open,
    MAX(price)         AS high,
    MIN(price)         AS low,
    LAST(price, time)  AS close,
    SUM(volume)        AS volume,
    COUNT(*)           AS trade_count
FROM trades
GROUP BY bucket, symbol;
```
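A continuous aggregate is only materialized when a refresh runs, so each view also needs a refresh policy. A sketch for `candles_1m` using TimescaleDB's `add_continuous_aggregate_policy`; the offsets and schedule here are illustrative, not taken from this repository:

```sql
-- Refresh candles_1m every minute, materializing buckets that are
-- between 1 hour and 1 minute old (offsets are illustrative).
SELECT add_continuous_aggregate_policy('candles_1m',
  start_offset      => INTERVAL '1 hour',
  end_offset        => INTERVAL '1 minute',
  schedule_interval => INTERVAL '1 minute');
```

The other intervals (5m, 15m, 30m, 1h, 4h, 1d) would each get their own view and policy with correspondingly larger buckets.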
Continuous aggregates are refreshed incrementally as new trades arrive, so candlestick queries read precomputed buckets instead of scanning raw trades. Query cost therefore scales with the number of buckets requested, not with the underlying trade volume.
```ts
let BATCH_SIZE = 0;
const BATCH_LIMIT = 100; // Process up to 100 trades before checkpoint

if (BATCH_SIZE >= BATCH_LIMIT) {
  BATCH_SIZE = 0;
  // Future: Could implement COMMIT here if using transactions
}
```
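A sketch of what the checkpoint could become: buffer rows in memory and hand back a full batch when the limit is reached, so the caller can write it with one INSERT inside a single `BEGIN`/`COMMIT`. The buffering logic is kept pure and database-free; `BatchBuffer` is a hypothetical helper, not part of the current service.

```typescript
// Accumulates rows until `limit` is reached, then releases them as one
// batch. The caller decides how to flush (e.g. one transaction per batch).
class BatchBuffer<T> {
  private rows: T[] = [];
  constructor(private readonly limit: number) {}

  // Returns the full batch when the limit is reached, otherwise null.
  push(row: T): T[] | null {
    this.rows.push(row);
    if (this.rows.length >= this.limit) {
      const batch = this.rows;
      this.rows = [];
      return batch;
    }
    return null;
  }

  get pending(): number {
    return this.rows.length;
  }
}
```

In the main loop, `buffer.push(row)` would replace the bare counter; a non-null return triggers the transactional flush, and any remainder can be flushed on shutdown or after an idle timeout.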
Single Consumer Pattern: The current implementation uses a single consumer (LPOP), which prevents horizontal scaling. To scale, implement consumer groups or partition the work by symbol.
```ts
// Option 1: Partition by symbol
const symbols = ["BTC_USDC_PERP", "ETH_USDC_PERP", "SOL_USDC_PERP"];
// process.env values are strings, so coerce WORKER_ID before the modulo
const workerId = Number(process.env.WORKER_ID ?? 0);
const assignedSymbol = symbols[workerId % symbols.length];

// Only process trades for the assigned symbol
if (trade.data.s === assignedSymbol) {
  await processTrade(trade);
}

// Option 2: Use Redis Streams instead of a queue
const result = await RedisStreams.readNextFromRedisStream("trades-stream", 0, {
  consumerGroup: "batch-upload-group",
  consumerName: "worker-1",
});
```
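Option 1 hard-codes the symbol list into every worker. A variant that avoids this, hashing each symbol to a worker index so any worker count partitions the stream deterministically; the `WORKER_ID`/`WORKER_COUNT` environment variables are assumptions for illustration:

```typescript
// Deterministic symbol -> worker assignment via an unsigned 32-bit
// rolling hash. Every worker computes the same mapping independently.
function symbolToWorker(symbol: string, workerCount: number): number {
  let hash = 0;
  for (const ch of symbol) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0;
  }
  return hash % workerCount;
}

// Hypothetical deployment config: each worker knows its id and the total.
const workerId = Number(process.env.WORKER_ID ?? 0);
const workerCount = Number(process.env.WORKER_COUNT ?? 1);

function shouldProcess(symbol: string): boolean {
  return symbolToWorker(symbol, workerCount) === workerId;
}
```

Unlike round-robin over a fixed array, this keeps each symbol pinned to one worker (preserving per-symbol ordering) while new symbols are absorbed without redeploying a list.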