Skip to main content
The @repo/timescaledb package provides a specialized PostgreSQL client for time-series trading data using TimescaleDB extensions.

Installation

This package is internal to the monorepo and installed automatically:
"dependencies": {
  "@repo/timescaledb": "workspace:*"
}

Features

  • Hypertables: Automatic partitioning for time-series data
  • Continuous Aggregates: Pre-computed candlestick data at multiple intervals
  • Data Compression: Automatic compression of older data
  • Retention Policies: Automatic cleanup of old data
  • Performance Optimized: Indexes and policies for fast queries

Quick Start

import { timeScaleDB } from '@repo/timescaledb';

const db = timeScaleDB();
await db.connect();
await db.setupTimescale();

// Now ready to insert and query trade data

Database Schema

Trades Table (Hypertable)

The main table stores individual trade data:
CREATE TABLE trades (
  time        TIMESTAMPTZ       NOT NULL,
  symbol      VARCHAR(20)       NOT NULL,
  price       NUMERIC(20,8)     NOT NULL,
  volume      NUMERIC(20,8)     NOT NULL,
  trade_id    BIGINT            NOT NULL,
  side        VARCHAR(4)        NOT NULL CHECK (side IN ('buy', 'sell')),
  PRIMARY KEY (time, symbol, trade_id)
);
This table is automatically converted to a hypertable for optimal time-series performance.

Continuous Aggregates (Candlesticks)

Pre-computed candlestick data at multiple intervals:
  • 1 minute (candles_1m) - 7 days retention
  • 5 minutes (candles_5m) - 14 days retention
  • 15 minutes (candles_15m) - 1 month retention
  • 30 minutes (candles_30m) - 2 months retention
  • 1 hour (candles_1h) - 6 months retention
  • 4 hours (candles_4h) - 1 year retention
  • 1 day (candles_1d) - 2 years retention
  • 1 week (candles_1w) - 5 years retention
  • 1 month (candles_1mo) - 10 years retention
  • 1 year (candles_1y) - 50 years retention
Each aggregate contains:
  • symbol: Trading pair
  • bucket: Time bucket
  • open: Opening price
  • high: Highest price
  • low: Lowest price
  • close: Closing price
  • volume: Total volume
  • trade_count: Number of trades

Setup and Initialization

Connect to Database

import { timeScaleDB } from '@repo/timescaledb';

const db = timeScaleDB();

try {
  await db.connect();
  console.log('Connected to TimescaleDB');
} catch (error) {
  console.error('Failed to connect:', error);
}

Initialize Schema

// Creates hypertable, continuous aggregates, and policies
await db.setupTimescale();

// This will:
// 1. Enable TimescaleDB extension
// 2. Create trades hypertable
// 3. Add compression policy (30 days)
// 4. Add retention policy (90 days)
// 5. Create 10 continuous aggregates
// 6. Add refresh policies for each aggregate
// 7. Refresh aggregates if data exists
The setupTimescale() method is idempotent - it’s safe to run multiple times.

Inserting Trade Data

Single Trade Insert

const client = db.getClient();

await client.query(`
  INSERT INTO trades (time, symbol, price, volume, trade_id, side)
  VALUES ($1, $2, $3, $4, $5, $6)
`, [
  new Date(),
  'BTCUSDT',
  50000.00,
  1.5,
  Date.now(),
  'buy'
]);

Batch Insert

import { timeScaleDB } from '@repo/timescaledb';

const db = timeScaleDB();
await db.connect();
const client = db.getClient();

// Prepare batch data
const trades = [
  { time: new Date(), symbol: 'BTCUSDT', price: 50000, volume: 1.0, side: 'buy' },
  { time: new Date(), symbol: 'ETHUSDT', price: 3000, volume: 5.0, side: 'sell' },
  { time: new Date(), symbol: 'SOLUSDT', price: 100, volume: 50.0, side: 'buy' }
];

// Insert batch
for (const trade of trades) {
  await client.query(`
    INSERT INTO trades (time, symbol, price, volume, trade_id, side)
    VALUES ($1, $2, $3, $4, $5, $6)
  `, [
    trade.time,
    trade.symbol,
    trade.price,
    trade.volume,
    Date.now() + Math.random() * 1000,
    trade.side
  ]);
}

console.log(`Inserted ${trades.length} trades`);

Querying Data

Query Recent Trades

const client = db.getClient();

const result = await client.query(`
  SELECT time, symbol, price, volume, side
  FROM trades
  WHERE time > NOW() - INTERVAL '1 hour'
  ORDER BY time DESC
  LIMIT 100
`);

console.log(`Found ${result.rows.length} recent trades`);

Query Candlestick Data

const candles = await client.query(`
  SELECT 
    bucket as time,
    symbol,
    open,
    high,
    low,
    close,
    volume,
    trade_count
  FROM candles_1m
  WHERE symbol = $1
    AND bucket > NOW() - INTERVAL '1 hour'
  ORDER BY bucket ASC
`, ['BTCUSDT']);

console.log(`Retrieved ${candles.rows.length} 1m candles`);

Aggregation Queries

// Calculate average price over time
const avgPrice = await client.query(`
  SELECT 
    time_bucket('1 hour', time) as hour,
    symbol,
    AVG(price) as avg_price,
    SUM(volume) as total_volume
  FROM trades
  WHERE time > NOW() - INTERVAL '24 hours'
  GROUP BY hour, symbol
  ORDER BY hour DESC
`);

// Find high/low for the day
const dayStats = await client.query(`
  SELECT 
    symbol,
    MIN(price) as low,
    MAX(price) as high,
    FIRST(price, time) as open,
    LAST(price, time) as close
  FROM trades
  WHERE time > date_trunc('day', NOW())
  GROUP BY symbol
`);

Refreshing Aggregates

Manual Refresh

Continuous aggregates are refreshed automatically, but you can trigger manual refreshes:
// Refresh all aggregates with existing data
const result = await db.refreshAllContinuousAggregates();

console.log('Refreshed aggregates:', result.refreshed);
if (result.errors.length > 0) {
  console.log('Errors:', result.errors);
}

Refresh Specific Time Range

// Refresh specific time range
const result = await db.refreshAllContinuousAggregates({
  from: '2024-01-01T00:00:00Z',
  to: '2024-01-31T23:59:59Z'
});

console.log(`Refreshed ${result.refreshed.length} aggregates`);
console.log(`Time range: ${result.timeRange.from} to ${result.timeRange.to}`);

Complete Trading Example

Here’s a complete example integrating TimescaleDB with real-time trading:
import { timeScaleDB } from '@repo/timescaledb';
import { redisStreams, config, constant } from '@repo/config';

// Initialize
const db = timeScaleDB();
const streams = redisStreams(config.REDIS_URL);

await db.connect();
await streams.connect();
await db.setupTimescale();

const client = db.getClient();

// Process incoming trades from Redis stream
while (true) {
  const trade = await streams.readNextFromRedisStream(
    constant.dbStorageStream,
    0 // Block indefinitely for next trade
  );
  
  if (trade) {
    // Insert into TimescaleDB
    await client.query(`
      INSERT INTO trades (time, symbol, price, volume, trade_id, side)
      VALUES ($1, $2, $3, $4, $5, $6)
    `, [
      new Date(trade.timestamp),
      trade.symbol,
      trade.price,
      trade.volume,
      trade.tradeId,
      trade.side
    ]);
    
    console.log(`Stored ${trade.symbol} trade: $${trade.price}`);
  }
}

// Cleanup
process.on('SIGTERM', async () => {
  await db.disconnect();
  await streams.disconnect();
});

Data Policies

Compression Policy

Data older than 30 days is automatically compressed:
-- Applied automatically by setupTimescale()
ALTER TABLE trades SET (
  timescaledb.compress,
  timescaledb.compress_segmentby = 'symbol',
  timescaledb.compress_orderby = 'time DESC'
);

SELECT add_compression_policy('trades', INTERVAL '30 days');

Retention Policy

Data older than 90 days is automatically deleted:
-- Applied automatically by setupTimescale()
SELECT add_retention_policy('trades', INTERVAL '90 days');
Adjust retention policies based on your storage requirements and compliance needs.

Performance Tips

Choose the right continuous aggregate for your use case:
  • Real-time charts: Use 1m or 5m candles
  • Historical analysis: Use 1h, 4h, or 1d candles
  • Long-term trends: Use 1w, 1mo, or 1y candles
This avoids querying raw trades data unnecessarily.
Always include both symbol and time filters:
// Good - uses indexes efficiently
WHERE symbol = 'BTCUSDT' AND time > NOW() - INTERVAL '1 hour'

// Avoid - slower without symbol filter
WHERE time > NOW() - INTERVAL '1 hour'
Instead of querying raw data, use time_bucket:
SELECT 
  time_bucket('5 minutes', time) as bucket,
  AVG(price) as avg_price
FROM trades
WHERE symbol = 'BTCUSDT'
GROUP BY bucket
Insert multiple trades in a single transaction:
await client.query('BEGIN');
for (const trade of trades) {
  await client.query(insertQuery, params);
}
await client.query('COMMIT');

API Reference

timeScaleDB()

Creates a new TimescaleDB client instance. Methods:
connect()
Promise<void>
Connect to TimescaleDB using config from @repo/config
getClient()
pg.Client
Get the underlying PostgreSQL client for queries
setupTimescale()
Promise<void>
Initialize schema, hypertables, continuous aggregates, and policies
refreshAllContinuousAggregates(timeRange?)
Promise<object>
Manually refresh all continuous aggregatesParameters:
  • timeRange (optional): { from: string, to: string }
Returns:
{
  refreshed: string[],
  errors: Array<{ aggregate: string, error: string }>,
  timeRange: { from: string, to: string }
}
disconnect()
Promise<void>
Close the database connection

@repo/config

Provides TimescaleDB connection configuration

@repo/db

PostgreSQL client for relational data

Build docs developers (and LLMs) love