Skip to main content

Overview

Duckling enables microservices to query analytical data with 5-15ms latency through its WebSocket SDK. This guide shows you how to integrate Duckling into a microservices architecture for real-time analytics without overloading your operational MySQL database.

Architecture Pattern

┌─────────────────┐
│  Operational    │
│  MySQL DB       │
└────────┬────────┘
         │ Replication

┌─────────────────┐
│  Duckling       │
│  DuckDB Server  │
└────────┬────────┘
         │ WebSocket (5-15ms)

┌─────────────────────────────────────┐
│         Microservices Layer         │
├──────────┬──────────┬───────────────┤
│  API     │  Report  │  Dashboard    │
│  Service │  Service │  Service      │
└──────────┴──────────┴───────────────┘

Key Benefits

  • Isolate analytical queries from operational database
  • 5-10x faster queries through columnar storage
  • Near-real-time sync: data is refreshed every 15 minutes by default (configurable)
  • No operational impact on transactional workload
  • WebSocket connections for low latency

Service Integration Patterns

Pattern 1: Single Shared Client

Create one client instance shared across all request handlers:
// services/duckdb.service.ts
import { DucklingClient } from '@chittihq/duckling';

/**
 * Singleton wrapper around one DucklingClient.
 *
 * The constructor is private, so the only way to obtain the service is
 * `DuckDBService.getInstance()`, which builds the client on first use and
 * hands back the same object on every later call.
 */
class DuckDBService {
  private static instance: DuckDBService;
  private client: DucklingClient;

  private constructor() {
    // Endpoint comes from the environment, with an in-cluster default.
    const wsUrl = process.env.DUCKLING_WS_URL || 'ws://duckling-server:3001/ws';

    const ducklingClient = new DucklingClient({
      url: wsUrl,
      apiKey: process.env.DUCKLING_API_KEY,
      databaseName: 'default',
      autoReconnect: true,
      maxReconnectAttempts: 5
    });

    // Connection lifecycle logging
    ducklingClient.on('connected', () => {
      console.log('Connected to DuckDB server');
    });
    ducklingClient.on('error', (error: Error) => {
      console.error('DuckDB connection error:', error.message);
    });

    this.client = ducklingClient;
  }

  /** Returns the shared service, creating it on the first call. */
  public static getInstance(): DuckDBService {
    const existing = DuckDBService.instance;
    if (existing) {
      return existing;
    }

    const created = new DuckDBService();
    DuckDBService.instance = created;
    return created;
  }

  /**
   * Runs one statement on the shared client.
   *
   * @param sql    Statement text.
   * @param params Optional positional parameters forwarded to the client.
   * @returns Whatever rows the client resolves with.
   */
  public async query<T>(sql: string, params?: any[]): Promise<T[]> {
    const rows = await this.client.query<T>(sql, params);
    return rows;
  }

  /**
   * Forwards a list of statements to the client's `queryBatch`.
   *
   * @param queries Statement texts, passed through unchanged.
   * @returns The client's batch result (one row array per statement).
   */
  public async queryBatch<T>(queries: string[]): Promise<T[][]> {
    const resultSets = await this.client.queryBatch(queries);
    return resultSets;
  }

  /** Closes the underlying client. */
  public close(): void {
    this.client.close();
  }
}

export default DuckDBService.getInstance();

Pattern 2: Connection Pool for High Concurrency

For services handling thousands of concurrent requests:
// services/duckdb-pool.service.ts
import { DucklingClient } from '@chittihq/duckling';

/**
 * Fixed-size pool of DucklingClient connections with round-robin dispatch.
 *
 * Call `initialize()` once before issuing queries. Using the pool before it
 * is initialized, or after `close()`, rejects with a descriptive error
 * instead of failing on an undefined client.
 */
class DuckDBPoolService {
  private clients: DucklingClient[] = [];
  private currentIndex = 0;
  private poolSize: number;

  /**
   * @param poolSize Number of connections to open; must be a positive integer.
   * @throws RangeError when `poolSize` is not a positive integer.
   */
  constructor(poolSize: number = 5) {
    if (!Number.isInteger(poolSize) || poolSize < 1) {
      throw new RangeError(`poolSize must be a positive integer, received ${poolSize}`);
    }
    this.poolSize = poolSize;
  }

  /**
   * Opens all connections in parallel.
   *
   * Clients are published to the pool only after every connection has
   * succeeded. If any connection fails, all clients created by this call are
   * closed and the original error is rethrown, so nothing is leaked.
   * Calling this again on an already-initialized pool is a no-op.
   */
  async initialize(): Promise<void> {
    if (this.clients.length > 0) {
      return;
    }

    console.log(`Initializing connection pool with ${this.poolSize} connections...`);

    const candidates = Array.from({ length: this.poolSize }, () =>
      new DucklingClient({
        url: process.env.DUCKLING_WS_URL || 'ws://duckling-server:3001/ws',
        apiKey: process.env.DUCKLING_API_KEY,
        autoReconnect: true
      })
    );

    try {
      await Promise.all(
        candidates.map(async (client, i) => {
          await client.connect();
          console.log(`Connection ${i + 1}/${this.poolSize} established`);
        })
      );
    } catch (error) {
      // Best-effort cleanup of every client from this attempt, including ones
      // still connecting. NOTE(review): assumes close() is safe on a client
      // that never finished connecting — confirm against the SDK.
      for (const client of candidates) {
        try {
          client.close();
        } catch {
          // Ignore cleanup failures; the connection error below is what matters.
        }
      }
      throw error;
    }

    this.clients = candidates;
    this.currentIndex = 0;
    console.log('Connection pool ready');
  }

  /** Returns the client at `index` (wrapped to the pool size) or throws if the pool is empty. */
  private clientAt(index: number): DucklingClient {
    // With an empty pool `index % 0` is NaN, so the lookup yields undefined.
    const client = this.clients[index % this.clients.length];
    if (!client) {
      throw new Error('Connection pool is not initialized; call initialize() first');
    }
    return client;
  }

  /** Picks the next client in round-robin order and advances the cursor. */
  private getNextClient(): DucklingClient {
    const client = this.clientAt(this.currentIndex);
    this.currentIndex = (this.currentIndex + 1) % this.clients.length;
    return client;
  }

  /**
   * Runs one statement on the next client in rotation.
   *
   * @param sql    Statement text.
   * @param params Optional positional parameters forwarded to the client.
   * @throws Error (as a rejection) when the pool has no connections.
   */
  async query<T>(sql: string, params?: any[]): Promise<T[]> {
    const client = this.getNextClient();
    return client.query<T>(sql, params);
  }

  /**
   * Runs the statements concurrently, assigning statement `i` to client
   * `i % poolSize`. Results are returned in the order of `queries`.
   *
   * @throws Error (as a rejection) when `queries` is non-empty and the pool has no connections.
   */
  async queryBatch<T>(queries: string[]): Promise<T[][]> {
    return Promise.all(
      queries.map((sql, index) => this.clientAt(index).query<T>(sql))
    );
  }

  /** Closes every pooled client and empties the pool. */
  close(): void {
    this.clients.forEach(client => client.close());
    this.clients = [];
    this.currentIndex = 0;
  }
}

export default new DuckDBPoolService(5);

Use Case Examples

API Service: User Analytics Endpoint

// api-service/routes/analytics.ts
import express from 'express';
import duckdb from '../services/duckdb.service';

const router = express.Router();

/** Per-user order aggregates produced by the analytics queries in this router. */
interface UserStats {
  /** User primary key (`u.id`). */
  userId: number;
  /** `u.name` from the User table. */
  name: string;
  /** `u.email` from the User table. */
  email: string;
  /** Number of joined orders (`COUNT(o.id)`). */
  totalOrders: number;
  /** Sum of `o.total` over the joined orders. */
  totalRevenue: number;
  /**
   * Most recent `o.createdAt` (`MAX`). Only the single-user query selects it.
   * NOTE(review): with the LEFT JOIN a user without orders would presumably
   * yield NULL here, which the `string` type does not reflect — confirm.
   */
  lastOrderDate: string;
}

// Get user analytics
//
// GET /users/:id/analytics
//   200 -> aggregated order statistics for the user
//   400 -> :id is not a plain decimal integer
//   404 -> no user with that id
//   500 -> the analytics query failed
router.get('/users/:id/analytics', async (req, res) => {
  // parseInt alone would turn "abc" into NaN and "12abc" into 12; either value
  // would otherwise reach the database unvalidated.
  const rawId = req.params.id;
  const userId = /^\d+$/.test(rawId) ? Number.parseInt(rawId, 10) : Number.NaN;
  if (!Number.isSafeInteger(userId)) {
    return res.status(400).json({ error: 'Invalid user id' });
  }

  try {
    const stats = await duckdb.query<UserStats>(`
      SELECT
        u.id as userId,
        u.name,
        u.email,
        COUNT(o.id) as totalOrders,
        COALESCE(SUM(o.total), 0) as totalRevenue,
        MAX(o.createdAt) as lastOrderDate
      FROM User u
      LEFT JOIN \`Order\` o ON u.id = o.userId
      WHERE u.id = ?
      GROUP BY u.id, u.name, u.email
    `, [userId]);

    // The GROUP BY yields at most one row per user; no row means no such user.
    const [userStats] = stats;
    if (!userStats) {
      return res.status(404).json({ error: 'User not found' });
    }

    res.json(userStats);
  } catch (error) {
    console.error('Analytics query failed:', error);
    res.status(500).json({ error: 'Internal server error' });
  }
});

// Get top customers
//
// GET /customers/top?limit=N
//   Returns customers ordered by total revenue, highest first.
//   `limit` defaults to 10 when missing or not a number, and is clamped to
//   1..100 so a client cannot request a negative or unbounded result set.
const DEFAULT_TOP_CUSTOMERS_LIMIT = 10;
const MAX_TOP_CUSTOMERS_LIMIT = 100; // upper bound chosen for this example; tune per service

router.get('/customers/top', async (req, res) => {
  try {
    // req.query values may be arrays or nested objects; only a plain string is parsed.
    const rawLimit = req.query.limit;
    const parsedLimit = typeof rawLimit === 'string' ? Number.parseInt(rawLimit, 10) : Number.NaN;
    const requestedLimit = Number.isNaN(parsedLimit) ? DEFAULT_TOP_CUSTOMERS_LIMIT : parsedLimit;
    const limit = Math.min(Math.max(requestedLimit, 1), MAX_TOP_CUSTOMERS_LIMIT);

    // This query does not select lastOrderDate, so it is omitted from the row type.
    const topCustomers = await duckdb.query<Omit<UserStats, 'lastOrderDate'>>(`
      SELECT
        u.id as userId,
        u.name,
        u.email,
        COUNT(o.id) as totalOrders,
        SUM(o.total) as totalRevenue
      FROM User u
      JOIN \`Order\` o ON u.id = o.userId
      GROUP BY u.id, u.name, u.email
      ORDER BY totalRevenue DESC
      LIMIT ?
    `, [limit]);

    res.json(topCustomers);
  } catch (error) {
    console.error('Top customers query failed:', error);
    res.status(500).json({ error: 'Internal server error' });
  }
});

export default router;

Report Service: Daily Sales Report

// report-service/generators/daily-sales.ts
import duckdb from '../services/duckdb.service';

/** One row of the daily sales report: per-day order aggregates plus a top-product pair. */
interface DailySales {
  /** Day bucket (`DATE_TRUNC('day', createdAt)`). */
  date: string;
  /** Number of orders on that day (`COUNT(*)`). */
  orderCount: number;
  /** Sum of order totals on that day (`SUM(total)`). */
  totalRevenue: number;
  /** Mean order total on that day (`AVG(total)`). */
  avgOrderValue: number;
  /** Product name taken from the top-products query; 'N/A' when no row is available. */
  topProduct: string;
  /** `SUM(oi.quantity * oi.price)` for that product; 0 when no row is available. */
  topProductRevenue: number;
}

/**
 * Accepts `YYYY-MM-DD`, optionally followed by a time (`T` or space separator,
 * optional seconds/fraction, optional `Z` or numeric offset). Anything else is
 * rejected before it can be spliced into SQL text.
 */
const REPORT_DATE_PATTERN =
  /^\d{4}-\d{2}-\d{2}(?:[ T]\d{2}:\d{2}(?::\d{2}(?:\.\d{1,9})?)?(?:Z|[+-]\d{2}:?\d{2})?)?$/;

/** Throws a TypeError unless `value` is a date/timestamp literal matching REPORT_DATE_PATTERN. */
function assertReportDate(value: string, label: string): void {
  if (typeof value !== 'string' || !REPORT_DATE_PATTERN.test(value)) {
    throw new TypeError(`${label} must be an ISO-8601 date or timestamp, received: ${String(value)}`);
  }
}

/**
 * Normalizes a result-set date into a map key.
 * NOTE(review): assumes the client returns day buckets as strings or Date
 * objects — confirm against the SDK's row format.
 */
function reportDayKey(value: unknown): string {
  return value instanceof Date ? value.toISOString() : String(value);
}

/**
 * Builds a per-day sales report for the given range.
 *
 * Two statements are sent in one batch: daily order aggregates, and revenue
 * per product per day. The product rows are ordered by day and then by revenue
 * descending, so the first row seen for a day is that day's top product.
 *
 * @param startDate Inclusive range start (date or timestamp literal).
 * @param endDate   Inclusive range end (date or timestamp literal).
 * @returns One entry per day that had orders, in date order. Days with no
 *          product row get `topProduct: 'N/A'` and `topProductRevenue: 0`.
 * @throws TypeError (as a rejection) when either bound is not a date literal.
 */
export async function generateDailySalesReport(
  startDate: string,
  endDate: string
): Promise<DailySales[]> {
  // queryBatch takes plain SQL strings with no bind parameters, so the bounds
  // are validated strictly before being interpolated below.
  assertReportDate(startDate, 'startDate');
  assertReportDate(endDate, 'endDate');

  const queries = [
    // Query 1: Daily aggregates
    `SELECT
      DATE_TRUNC('day', createdAt) as date,
      COUNT(*) as orderCount,
      SUM(total) as totalRevenue,
      AVG(total) as avgOrderValue
    FROM \`Order\`
    WHERE createdAt BETWEEN '${startDate}' AND '${endDate}'
    GROUP BY date
    ORDER BY date`,

    // Query 2: Top products per day
    `SELECT
      DATE_TRUNC('day', o.createdAt) as date,
      p.name as topProduct,
      SUM(oi.quantity * oi.price) as topProductRevenue
    FROM \`Order\` o
    JOIN OrderItem oi ON o.id = oi.orderId
    JOIN Product p ON oi.productId = p.id
    WHERE o.createdAt BETWEEN '${startDate}' AND '${endDate}'
    GROUP BY date, p.id, p.name
    ORDER BY date, topProductRevenue DESC`
  ];

  const [dailyStats = [], topProducts = []] = await duckdb.queryBatch(queries);

  // Query 2 returns many rows per day, so rows cannot be paired with the daily
  // aggregates by array index. Keep only the first (highest-revenue) row per day.
  const topProductByDay = new Map<string, { topProduct?: string; topProductRevenue?: number }>();
  for (const row of topProducts as any[]) {
    const key = reportDayKey(row.date);
    if (!topProductByDay.has(key)) {
      topProductByDay.set(key, row);
    }
  }

  // Combine results
  return (dailyStats as any[]).map((day: any) => {
    const best = topProductByDay.get(reportDayKey(day.date));
    return {
      date: day.date,
      orderCount: day.orderCount,
      totalRevenue: day.totalRevenue,
      avgOrderValue: day.avgOrderValue,
      topProduct: best?.topProduct ?? 'N/A',
      topProductRevenue: best?.topProductRevenue ?? 0
    };
  });
}

Dashboard Service: Real-Time Metrics

// dashboard-service/controllers/metrics.ts
import duckdb from '../services/duckdb.service';

/** Headline numbers shown on the dashboard; each field comes from its own query. */
interface DashboardMetrics {
  /** Row count of the User table. */
  totalUsers: number;
  /** Row count of the Order table. */
  totalOrders: number;
  /** Sum of `total` over all orders (0 when there are none). */
  totalRevenue: number;
  /** Average of `total` over all orders (0 when there are none). */
  avgOrderValue: number;
  /** Number of orders whose `createdAt` date is the current date. */
  ordersToday: number;
  /** Sum of `total` for orders created on the current date (0 when there are none). */
  revenueToday: number;
}

/**
 * Collects the dashboard's headline numbers.
 *
 * Six single-value statements are submitted as one batch; the first row of
 * each result set supplies the corresponding metric.
 *
 * @returns The six metrics, read from the `count` / `sum` / `avg` columns.
 */
export async function getDashboardMetrics(): Promise<DashboardMetrics> {
  const totalUsersSql = 'SELECT COUNT(*) as count FROM User';
  const totalOrdersSql = 'SELECT COUNT(*) as count FROM `Order`';
  const totalRevenueSql = 'SELECT COALESCE(SUM(total), 0) as sum FROM `Order`';
  const avgOrderValueSql = 'SELECT COALESCE(AVG(total), 0) as avg FROM `Order`';
  const ordersTodaySql = `SELECT COUNT(*) as count FROM \`Order\`
     WHERE DATE(createdAt) = CURRENT_DATE`;
  const revenueTodaySql = `SELECT COALESCE(SUM(total), 0) as sum FROM \`Order\`
     WHERE DATE(createdAt) = CURRENT_DATE`;

  // Result sets come back in the same order as the statements below.
  const resultSets = await duckdb.queryBatch([
    totalUsersSql,
    totalOrdersSql,
    totalRevenueSql,
    avgOrderValueSql,
    ordersTodaySql,
    revenueTodaySql
  ]);

  // Each statement is an aggregate, so only the first row is read.
  const firstRowOf = (position: number): any => resultSets[position][0];

  return {
    totalUsers: firstRowOf(0).count,
    totalOrders: firstRowOf(1).count,
    totalRevenue: firstRowOf(2).sum,
    avgOrderValue: firstRowOf(3).avg,
    ordersToday: firstRowOf(4).count,
    revenueToday: firstRowOf(5).sum
  };
}

Multi-Database Architecture

Support multiple databases in a single microservice:
// services/multi-db.service.ts
import { DucklingClient } from '@chittihq/duckling';

/**
 * Holds one DucklingClient per configured database and routes each query to
 * the client registered under the requested database id.
 */
class MultiDatabaseService {
  private clients: Map<string, DucklingClient> = new Map();

  /**
   * @param databases Databases to register. `id` is used both as the lookup
   *                  key and as the client's `databaseName`; `name` is only logged.
   */
  constructor(databases: Array<{ id: string; name: string }>) {
    for (const { id, name } of databases) {
      const ducklingClient = new DucklingClient({
        url: process.env.DUCKLING_WS_URL || 'ws://duckling-server:3001/ws',
        apiKey: process.env.DUCKLING_API_KEY,
        databaseName: id,
        autoReconnect: true
      });

      this.clients.set(id, ducklingClient);
      console.log(`Initialized connection for database: ${name}`);
    }
  }

  /**
   * Runs a statement against the named database.
   *
   * @param databaseId Id given at construction time.
   * @param sql        Statement text.
   * @param params     Optional positional parameters forwarded to the client.
   * @throws Error (as a rejection) when no client is registered for `databaseId`.
   */
  async query<T>(databaseId: string, sql: string, params?: any[]): Promise<T[]> {
    const target = this.clients.get(databaseId);
    if (target) {
      return target.query<T>(sql, params);
    }
    throw new Error(`Database ${databaseId} not found`);
  }

  /** Closes every registered client and forgets them. */
  close(): void {
    for (const registered of this.clients.values()) {
      registered.close();
    }
    this.clients.clear();
  }
}

// Initialize with your databases.
// `id` selects the database on the server; `name` is a human-readable label for logs.
const databaseCatalog: Array<{ id: string; name: string }> = [
  { id: 'lms', name: 'Learning Management System' },
  { id: 'chitti_common', name: 'Common Database' }
];

const multiDB = new MultiDatabaseService(databaseCatalog);

export default multiDB;

// Usage in route handler
//
// Each handler catches query failures itself: a rejected promise inside an
// async handler is otherwise not turned into an HTTP response, and the request
// would hang or surface as an unhandled rejection.
router.get('/lms/users', async (req, res) => {
  try {
    const users = await multiDB.query('lms', 'SELECT * FROM User LIMIT 10');
    res.json(users);
  } catch (error) {
    console.error('LMS users query failed:', error);
    res.status(500).json({ error: 'Internal server error' });
  }
});

router.get('/common/actions', async (req, res) => {
  try {
    const actions = await multiDB.query('chitti_common', 'SELECT * FROM Action LIMIT 10');
    res.json(actions);
  } catch (error) {
    console.error('Common actions query failed:', error);
    res.status(500).json({ error: 'Internal server error' });
  }
});

Docker Compose Setup

Integrate Duckling into your microservices stack:
# Example stack: one MySQL instance, the Duckling server that reads from it,
# and three services that talk to Duckling over WebSocket. All containers share
# the `microservices` bridge network and reach each other by service name.
#
# NOTE(review): recent Docker Compose releases treat the top-level `version`
# key as obsolete and ignore it — confirm before relying on it.
version: '3.8'

services:
  # Operational MySQL database
  # No ports are published, so it is reachable only from inside the network.
  mysql:
    image: mysql:8.0
    environment:
      # Example-only credentials. The same root/root pair is embedded in
      # MYSQL_CONNECTION_STRING below; change both together.
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: myapp
    volumes:
      - mysql-data:/var/lib/mysql
    networks:
      - microservices

  # Duckling DuckDB server
  duckling-server:
    image: your-registry/duckling-server:latest
    environment:
      - MYSQL_CONNECTION_STRING=mysql://root:root@mysql:3306/myapp
      # Substituted by Compose from the host environment or an .env file; the
      # client services below receive the same value.
      - DUCKLING_API_KEY=${DUCKLING_API_KEY}
      # Presumably the MySQL -> DuckDB sync interval, in minutes.
      - SYNC_INTERVAL_MINUTES=15
    volumes:
      - duckling-data:/app/data
    networks:
      - microservices
    # Short-form depends_on only orders container start-up; it does not wait
    # for MySQL to accept connections.
    depends_on:
      - mysql

  # API Service
  api-service:
    build: ./api-service
    environment:
      - DUCKLING_WS_URL=ws://duckling-server:3001/ws
      - DUCKLING_API_KEY=${DUCKLING_API_KEY}
    ports:
      # host:container
      - "4000:4000"
    networks:
      - microservices
    depends_on:
      - duckling-server

  # Report Service
  report-service:
    build: ./report-service
    environment:
      - DUCKLING_WS_URL=ws://duckling-server:3001/ws
      - DUCKLING_API_KEY=${DUCKLING_API_KEY}
    ports:
      - "4001:4001"
    networks:
      - microservices
    depends_on:
      - duckling-server

  # Dashboard Service
  dashboard-service:
    build: ./dashboard-service
    environment:
      - DUCKLING_WS_URL=ws://duckling-server:3001/ws
      - DUCKLING_API_KEY=${DUCKLING_API_KEY}
    ports:
      - "4002:4002"
    networks:
      - microservices
    depends_on:
      - duckling-server

# Single user-defined bridge network shared by every service above.
networks:
  microservices:
    driver: bridge

# Named volumes so MySQL and Duckling data survive container re-creation.
volumes:
  mysql-data:
  duckling-data:

Error Handling & Resilience

Fail-Fast Protection with a Circuit Breaker

// utils/circuit-breaker.ts
import duckdb from '../services/duckdb.service';

/** Optional tuning knobs for CircuitBreaker; every field has a default. */
interface CircuitBreakerOptions {
  /** Consecutive failures (while CLOSED) that open the circuit. Default 5. */
  failureThreshold?: number;
  /** Consecutive successes (while HALF_OPEN) that close the circuit. Default 2. */
  successThreshold?: number;
  /** Milliseconds the circuit stays OPEN before a trial call is allowed. Default 60000. */
  timeout?: number;
  /** Clock returning milliseconds; injectable for tests. Default Date.now. */
  now?: () => number;
}

/**
 * Three-state circuit breaker.
 *
 * - CLOSED: calls pass through; `failureThreshold` consecutive failures open the circuit.
 * - OPEN: calls are rejected immediately until `timeout` ms have passed since
 *   the last failure, after which the next call is let through as a trial.
 * - HALF_OPEN: `successThreshold` consecutive successes close the circuit;
 *   any failure opens it again straight away.
 */
class CircuitBreaker {
  private failureCount = 0;
  private successCount = 0;
  private lastFailureTime = 0;
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';

  private readonly failureThreshold: number;
  private readonly successThreshold: number;
  private readonly timeout: number;
  private readonly now: () => number;

  constructor(options: CircuitBreakerOptions = {}) {
    this.failureThreshold = options.failureThreshold ?? 5;
    this.successThreshold = options.successThreshold ?? 2;
    this.timeout = options.timeout ?? 60000; // 1 minute
    this.now = options.now ?? Date.now;
  }

  /**
   * Runs `fn` under the breaker.
   *
   * @param fn Operation to protect.
   * @returns Whatever `fn` resolves with.
   * @throws Error('Circuit breaker is OPEN') without calling `fn` while the
   *         circuit is open; otherwise rethrows whatever `fn` rejects with.
   */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'OPEN') {
      if (this.now() - this.lastFailureTime > this.timeout) {
        this.state = 'HALF_OPEN';
        // Start the trial period from zero; successes from before the outage must not count.
        this.successCount = 0;
        console.log('Circuit breaker: HALF_OPEN');
      } else {
        throw new Error('Circuit breaker is OPEN');
      }
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Records a success; closes the circuit after enough trial successes. */
  private onSuccess(): void {
    this.failureCount = 0;

    // Successes only matter for recovery, so they are counted in HALF_OPEN alone.
    if (this.state !== 'HALF_OPEN') {
      return;
    }

    this.successCount++;
    if (this.successCount >= this.successThreshold) {
      this.state = 'CLOSED';
      this.successCount = 0;
      console.log('Circuit breaker: CLOSED');
    }
  }

  /** Records a failure; opens the circuit on a failed trial or when the threshold is hit. */
  private onFailure(): void {
    this.failureCount++;
    this.successCount = 0;
    this.lastFailureTime = this.now();

    // A failed trial call means the dependency is still unhealthy: reopen at once.
    if (this.state === 'HALF_OPEN' || this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
      console.log('Circuit breaker: OPEN');
    }
  }
}

export const circuitBreaker = new CircuitBreaker();

// Usage
/**
 * Runs a DuckDB query through the shared circuit breaker.
 *
 * @param sql    Statement text.
 * @param params Optional positional parameters forwarded to `duckdb.query`.
 * @returns The rows produced by the query.
 * @throws Whatever `circuitBreaker.execute` rejects with.
 */
export async function queryWithCircuitBreaker<T>(
  sql: string,
  params?: any[]
): Promise<T[]> {
  // Deferred so the breaker decides whether the query is attempted at all.
  const runQuery = (): Promise<T[]> => duckdb.query<T>(sql, params);
  const rows = await circuitBreaker.execute(runQuery);
  return rows;
}

Graceful Degradation

// Fallback to cached data if DuckDB is unavailable
import { Redis } from 'ioredis';
import duckdb from '../services/duckdb.service';

const redis = new Redis(process.env.REDIS_URL);

/**
 * Returns dashboard metrics, preferring a live query and falling back to the
 * last cached copy in Redis when the query fails.
 *
 * Cache problems never hide a good result: if the live query succeeds but the
 * cache write fails, the fresh metrics are still returned. A cache read error
 * during fallback is treated as a cache miss.
 *
 * @returns Fresh metrics, or the cached copy when the live query fails.
 * @throws Error('DuckDB unavailable and no cached data') when the live query
 *         fails and no usable cached copy exists.
 */
export async function getMetricsWithFallback(): Promise<DashboardMetrics> {
  const cacheKey = 'dashboard:metrics';

  let metrics: DashboardMetrics;
  try {
    // Try DuckDB first
    metrics = await getDashboardMetrics();
  } catch (error) {
    console.error('DuckDB query failed, using cached data:', error);

    // Fallback to cache
    let cached: string | null = null;
    try {
      cached = await redis.get(cacheKey);
    } catch (cacheError) {
      console.error('Failed to read cached dashboard metrics:', cacheError);
    }

    if (cached) {
      try {
        // The payload was written below via JSON.stringify of a DashboardMetrics object.
        return JSON.parse(cached) as DashboardMetrics;
      } catch (parseError) {
        console.error('Cached dashboard metrics are not valid JSON:', parseError);
      }
    }

    throw new Error('DuckDB unavailable and no cached data');
  }

  try {
    // Cache for 5 minutes
    await redis.setex(cacheKey, 300, JSON.stringify(metrics));
  } catch (cacheError) {
    // Best effort: a cache outage must not discard metrics we already have.
    console.error('Failed to cache dashboard metrics:', cacheError);
  }

  return metrics;
}

Performance Monitoring

Query Performance Tracking

// middleware/performance.ts
import { performance } from 'perf_hooks';

/**
 * Times an async operation and logs how long it took.
 *
 * @param queryName Label used in the log lines.
 * @param fn        Operation to run and time.
 * @returns Whatever `fn` resolves with.
 * @throws Rethrows whatever `fn` rejects with, after logging the elapsed time.
 */
export async function trackQueryPerformance<T>(
  queryName: string,
  fn: () => Promise<T>
): Promise<T> {
  const startedAt = performance.now();
  // Milliseconds since the call began, formatted to two decimals for the logs.
  const elapsedMs = (): string => (performance.now() - startedAt).toFixed(2);

  try {
    const outcome = await fn();

    console.log(`Query ${queryName} completed in ${elapsedMs()}ms`);

    // Send to metrics service (e.g., Prometheus, DataDog)
    // metricsClient.histogram('duckdb.query.duration', duration, { query: queryName });

    return outcome;
  } catch (error) {
    console.error(`Query ${queryName} failed after ${elapsedMs()}ms`);
    throw error;
  }
}

// Usage: wrap the query in a thunk so that timing starts when the tracker invokes it.
const fetchTopCustomers = () =>
  duckdb.query<User>('SELECT * FROM User ORDER BY revenue DESC LIMIT 10');

const users = await trackQueryPerformance('get_top_customers', fetchTopCustomers);

Best Practices

Key Recommendations:
  1. Reuse connections: Create one client per database and reuse it across requests
  2. Use connection pools: For high-concurrency services (1000+ req/sec)
  3. Batch queries: Use queryBatch() for multiple related queries
  4. Handle errors: Implement circuit breakers and graceful degradation
  5. Monitor performance: Track query latency and throughput
  6. Cache strategically: Cache hot queries with short TTL
  7. Use TypeScript types: Define schema interfaces for type safety

Next Steps

Build docs developers (and LLMs) love