
Overview

Duckling supports three sync modes optimized for different use cases:
  1. Full Sync - Complete data refresh using Appender API (60K+ rows/sec)
  2. Incremental Sync - Watermark-based delta processing (10K rows/sec)
  3. CDC (Change Data Capture) - Real-time binlog streaming (sub-second latency)

Full Sync

Use Cases

  • Initial database setup
  • Schema changes requiring rebuild
  • Recovery from sync failures
  • Tables without timestamp columns

How It Works

1. Create staging table with same schema
2. Stream all records from MySQL → Append to staging
3. Atomic swap: DELETE + INSERT in transaction
4. CHECKPOINT to ensure durability
5. Update watermark for next incremental sync
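
The steps above (minus the streaming itself) can be sketched as the SQL sequence the service would issue. This is an illustrative sketch, not Duckling's actual implementation: table names are assumptions, and the real service runs these statements through its DuckDB client with the Appender filling the staging table between creation and the swap.

```typescript
// Illustrative sketch of the staging + atomic-swap sequence above.
function buildFullSyncSwap(table: string): string[] {
  const staging = `${table}_staging`;
  return [
    // 1. Staging table with the same schema and zero rows
    `CREATE OR REPLACE TABLE ${staging} AS SELECT * FROM ${table} LIMIT 0;`,
    // 2. (Appender streams all MySQL rows into the staging table here)
    // 3. Atomic swap: readers never observe a half-empty production table
    `BEGIN TRANSACTION;`,
    `DELETE FROM ${table};`,
    `INSERT INTO ${table} SELECT * FROM ${staging};`,
    `COMMIT;`,
    // 4. Flush the WAL so the swap survives a crash
    `CHECKPOINT;`,
    `DROP TABLE ${staging};`,
  ];
}
```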

API Endpoint

curl -X POST 'http://localhost:3001/api/sync/full?db=lms' \
  -H "Authorization: Bearer ${DUCKLING_API_KEY}"

Performance

  • Speed: 60,000+ rows/sec using the Appender API
  • Memory: <500 MB for datasets up to 60M records
  • Atomicity: ACID transactions prevent partial states
  • Crash-safe: staging tables protect production data

Example Output

{
  "totalTables": 45,
  "successfulTables": 45,
  "failedTables": 0,
  "totalRecords": 12500000,
  "totalDuration": 320000,
  "syncDetails": {
    "sequential": 45,
    "watermark": 0
  }
}
Full sync for a 60M record database completes in ~16 minutes (62,500 rows/sec average). The same operation with traditional row-by-row INSERT statements would take 1.6+ hours.

Incremental Sync

Use Cases

  • Regular scheduled updates (default: every 15 minutes)
  • Tables with updatedAt/createdAt columns
  • Efficient delta processing
  • Production steady-state operation

Watermark-Based Tracking

Incremental sync uses watermarks to track the last processed record:
CREATE TABLE appender_watermarks (
  table_name VARCHAR PRIMARY KEY,
  last_processed_id BIGINT,
  last_processed_timestamp TIMESTAMP,
  primary_key_column VARCHAR,
  timestamp_column VARCHAR,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Timestamp Detection Priority

The system automatically detects the best timestamp column:
  1. updatedAt / updated_at / modifiedAt / modified_at (highest priority)
    • Captures all record modifications
    • Best for transactional tables (User, Order, Product)
  2. createdAt / created_at (fallback)
    • For append-only tables (Event, Log, Audit)
    • Tracks new records only
  3. timestamp (final fallback)
    • Generic timestamp column
    • Legacy systems
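
The priority above can be sketched as a small helper. The function name and matching rules here are illustrative, not Duckling's actual implementation; column names are matched case-insensitively, and `null` means the table has no usable timestamp column (so full sync applies).

```typescript
// Illustrative sketch of the timestamp-column detection priority above.
const TIMESTAMP_PRIORITY: string[][] = [
  ["updatedat", "updated_at", "modifiedat", "modified_at"], // all modifications
  ["createdat", "created_at"],                              // append-only tables
  ["timestamp"],                                            // legacy fallback
];

function detectTimestampColumn(columns: string[]): string | null {
  // Map lowercased name → original casing so we return the real column name
  const byLower = new Map(columns.map((c) => [c.toLowerCase(), c]));
  for (const tier of TIMESTAMP_PRIORITY) {
    for (const candidate of tier) {
      const match = byLower.get(candidate);
      if (match) return match;
    }
  }
  return null;
}
```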

Query Pattern

-- Incremental query (note: uses >= not >)
SELECT * FROM TableName 
WHERE updatedAt >= '2025-10-30 12:00:00'
ORDER BY updatedAt ASC
The system uses >= (not >) to prevent data loss at timestamp boundaries. The last record is re-processed each sync but safely handled by INSERT OR REPLACE.

INSERT OR REPLACE Behavior

Incremental sync uses upsert for all records:
INSERT OR REPLACE INTO TableName (col1, col2, ...) VALUES (?, ?, ...)
Scenario                   | Behavior
---------------------------|---------------------
New record (PK not exists) | INSERT
Updated record (PK exists) | REPLACE (update)
Duplicate re-processing    | REPLACE (idempotent)
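
These semantics boil down to a keyed upsert. A minimal in-memory model (illustrative names and row shape; the real statement is DuckDB's INSERT OR REPLACE):

```typescript
// In-memory model of upsert semantics: rows are keyed by primary key,
// so re-applying the same batch is a no-op.
type Row = { id: number; name: string };

function upsertBatch(table: Map<number, Row>, rows: Row[]): void {
  for (const row of rows) {
    table.set(row.id, row); // INSERT if the PK is new, REPLACE otherwise
  }
}

const users = new Map<number, Row>();
upsertBatch(users, [{ id: 1, name: "Ada" }]);          // INSERT
upsertBatch(users, [{ id: 1, name: "Ada Lovelace" }]); // REPLACE (update)
upsertBatch(users, [{ id: 1, name: "Ada Lovelace" }]); // REPLACE (idempotent)
```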

API Endpoint

curl -X POST 'http://localhost:3001/api/sync/incremental?db=lms' \
  -H "Authorization: Bearer ${DUCKLING_API_KEY}"

Performance

  • Speed: 10,000 rows/sec using INSERT OR REPLACE
  • Efficiency: only changed records are processed
  • Idempotent: safe to re-run without duplicates
  • Fallback: auto-switches to full sync if no watermark exists

Example Output

{
  "totalTables": 45,
  "successfulTables": 45,
  "failedTables": 0,
  "totalRecords": 1250,
  "totalDuration": 8500,
  "syncDetails": {
    "sequential": 3,
    "watermark": 42
  }
}
Incremental sync processes only changed records. For a database with 1,000 updates/15min, sync completes in ~8 seconds vs 16 minutes for full sync.

CDC (Change Data Capture)

Use Cases

  • Real-time data replication (sub-second latency)
  • Event-driven architectures
  • Live dashboards and analytics
  • Minimal replication lag requirements

How It Works

CDC streams MySQL binlog events in real-time:
MySQL Binlog → @vlasky/zongji → Event Queue → DuckDB
     ↓              ↓              ↓           ↓
  WriteRows    Parse Event    Serialize   INSERT
  UpdateRows   + Sanitize     Processing  UPDATE
  DeleteRows   + Filter       (FIFO)      DELETE
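
The FIFO step above can be sketched as a queue drained strictly in arrival order, so DuckDB applies events in the same sequence MySQL committed them. This is a simplified illustration, not cdcService.ts itself; `applied` stands in for the real INSERT/UPDATE/DELETE handling.

```typescript
// Minimal sketch of serialized FIFO event processing: a later UPDATE can
// never be applied before the INSERT it depends on.
type BinlogEvent = {
  type: "WriteRows" | "UpdateRows" | "DeleteRows";
  table: string;
};

class EventQueue {
  private queue: BinlogEvent[] = [];
  readonly applied: BinlogEvent[] = []; // stand-in for writes to DuckDB

  push(event: BinlogEvent): void {
    this.queue.push(event);
    this.drain();
  }

  private drain(): void {
    while (this.queue.length > 0) {
      const event = this.queue.shift()!;
      this.applied.push(event); // real service routes by event type here
    }
  }
}
```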

Implementation (cdcService.ts)

import ZongJi from '@vlasky/zongji';

const zongji = new ZongJi({
  host: config.mysqlHost,
  port: config.mysqlPort,
  user: config.mysqlUser,
  password: config.mysqlPassword,
  ssl: { rejectUnauthorized: true }
});

zongji.on('binlog', async (event) => {
  const eventName = event.getTypeName();
  const tableName = event.tableMap[event.tableId].tableName;
  
  switch (eventName) {
    case 'WriteRows':
      await this.handleInsert(tableName, event.rows);
      break;
    case 'UpdateRows':
      await this.handleUpdate(tableName, event.rows);
      break;
    case 'DeleteRows':
      await this.handleDelete(tableName, event.rows);
      break;
  }
  
  // Save binlog position for resume
  await this.savePosition(event.binlogName, event.nextPosition);
});

Event Processing

INSERT (WriteRows)

await this.handleInsert(tableName, rows);
// → INSERT OR REPLACE INTO table (...) VALUES (...)

UPDATE (UpdateRows)

await this.handleUpdate(tableName, rows);
// → INSERT OR REPLACE INTO table (...) VALUES (row.after)

DELETE (DeleteRows)

await this.handleDelete(tableName, rows);
// → DELETE FROM table WHERE id = ?

Binlog Position Tracking

CDC tracks binlog position for crash recovery:
CREATE TABLE cdc_binlog_position (
  database_id VARCHAR PRIMARY KEY,
  filename VARCHAR NOT NULL,
  position BIGINT NOT NULL,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Resume on restart:
const lastPosition = await this.getLastPosition();
if (lastPosition) {
  zongji.start({
    filename: lastPosition.filename,
    position: lastPosition.position
  });
} else {
  zongji.start({ startAtEnd: true });
}

Backpressure & Queue Management

CDC uses an event queue with backpressure to prevent memory exhaustion:
// Pause binlog stream when queue is full
if (this.eventQueue.length >= MAX_QUEUE_SIZE) {
  this.isPaused = true;
  this.pauseBinlogStream();
}

// Resume when queue drains below 50%
if (this.isPaused && this.eventQueue.length < MAX_QUEUE_SIZE / 2) {
  this.isPaused = false;
  this.resumeBinlogStream();
}

API Endpoints

# Start CDC
curl -X POST 'http://localhost:3001/api/cdc/start?db=lms' \
  -H "Authorization: Bearer ${DUCKLING_API_KEY}"

# Stop CDC
curl -X POST 'http://localhost:3001/api/cdc/stop?db=lms' \
  -H "Authorization: Bearer ${DUCKLING_API_KEY}"

# Get CDC stats
curl 'http://localhost:3001/api/cdc/stats?db=lms' \
  -H "Authorization: Bearer ${DUCKLING_API_KEY}"

CDC Statistics

{
  "isRunning": true,
  "connectedAt": "2025-11-06T10:30:00Z",
  "lastEventAt": "2025-11-06T12:45:30Z",
  "eventsProcessed": 125000,
  "insertsProcessed": 45000,
  "updatesProcessed": 65000,
  "deletesProcessed": 15000,
  "errors": 0,
  "currentPosition": {
    "filename": "mysql-bin.000042",
    "position": 1234567890,
    "timestamp": "2025-11-06T12:45:30Z"
  },
  "queueSize": 12,
  "queueHighWaterMark": 450
}

Performance

  • Latency: sub-second replication lag
  • Throughput: handles 1,000+ writes/sec
  • Ordering: sequential processing preserves event order
  • Recovery: auto-resume from the last binlog position

CDC provides near real-time replication with typical latency of 100-500ms. Perfect for live dashboards and event-driven applications.

Sync Mode Comparison

Feature      | Full Sync     | Incremental Sync  | CDC
-------------|---------------|-------------------|----------------
Speed        | 60K+ rows/sec | 10K rows/sec      | Sub-second
Use Case     | Initial load  | Scheduled updates | Real-time
Memory       | <500 MB       | <100 MB           | <50 MB
Latency      | Minutes       | Seconds           | Milliseconds
Requirements | None          | Timestamp column  | Binlog enabled
Idempotent   | ✅ Yes        | ✅ Yes            | ✅ Yes
ACID         | ✅ Yes        | ✅ Yes            | ⚠️ Per-event

Sync Strategy Recommendations

Production Setup

1. Initial Load: Full Sync (one-time)
2. Steady State: Incremental Sync (every 15 minutes)
3. Real-Time (optional): CDC for critical tables

Configuration

# Enable incremental sync (default: true)
ENABLE_INCREMENTAL_SYNC=true

# Auto-start sync on boot (default: true)
AUTO_START_SYNC=true

# Sync interval in minutes (default: 15)
SYNC_INTERVAL_MINUTES=15

# Batch size for streaming (default: 1000)
BATCH_SIZE=1000
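
These variables could be read with their documented defaults as sketched below. `loadSyncConfig` and its helpers are hypothetical, not Duckling's actual config loader; in a Node app you would pass `process.env`.

```typescript
// Hypothetical loader for the environment variables above, with the
// documented defaults applied when a variable is unset or malformed.
type Env = Record<string, string | undefined>;

function envInt(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  const parsed = raw === undefined ? NaN : Number.parseInt(raw, 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

function envBool(env: Env, name: string, fallback: boolean): boolean {
  const raw = env[name];
  return raw === undefined ? fallback : raw === "true";
}

function loadSyncConfig(env: Env) {
  return {
    enableIncrementalSync: envBool(env, "ENABLE_INCREMENTAL_SYNC", true),
    autoStartSync: envBool(env, "AUTO_START_SYNC", true),
    syncIntervalMinutes: envInt(env, "SYNC_INTERVAL_MINUTES", 15),
    batchSize: envInt(env, "BATCH_SIZE", 1000),
  };
}
```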

Watermark Management Best Practices

Checkpoint Frequency

Incremental sync checkpoints the watermark every 10 batches, so a crash loses at most 10 batches of progress:
const WATERMARK_CHECKPOINT_INTERVAL = 10;
// batchesSinceCheckpoint is incremented once per streamed batch
if (batchesSinceCheckpoint >= WATERMARK_CHECKPOINT_INTERVAL) {
  await this.updateWatermark(tableName, watermark);
  batchesSinceCheckpoint = 0;
}

Boundary Handling

Critical: Use >= (not >) in incremental queries to prevent data loss at timestamp boundaries. INSERT OR REPLACE handles duplicate re-processing idempotently.
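
A two-row example makes the boundary case concrete (data and row shape are illustrative; string comparison suffices because the timestamps share one sortable format):

```typescript
// Two rows share the watermark timestamp, but only row 1 was visible when
// the previous sync saved its watermark. A strict `>` filter skips row 2
// forever; `>=` re-reads both, and the upsert makes the repeat of row 1
// harmless.
type Rec = { id: number; updatedAt: string };

const rows: Rec[] = [
  { id: 1, updatedAt: "2025-10-30 12:00:00" }, // processed by previous sync
  { id: 2, updatedAt: "2025-10-30 12:00:00" }, // committed just afterwards
];
const watermark = "2025-10-30 12:00:00";

const withGt = rows.filter((r) => r.updatedAt > watermark);   // misses both rows
const withGte = rows.filter((r) => r.updatedAt >= watermark); // reads both rows
```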

Fallback Strategy

If the watermark is invalid or missing, the system falls back to a full table sync:
if (!watermark || !watermark.timestampColumn) {
  logger.warn(`No valid watermark, falling back to full sync`);
  return await this.syncTableSequentialWithAppender(tableName);
}

Monitoring Sync Operations

Sync Logs

Every sync operation is logged:
SELECT 
  table_name,
  sync_type,
  records_processed,
  duration_ms,
  status,
  created_at
FROM sync_log
WHERE status = 'success'
ORDER BY created_at DESC
LIMIT 10;

API Endpoint

curl 'http://localhost:3001/api/sync-logs?limit=20&status=success' \
  -H "Authorization: Bearer ${DUCKLING_API_KEY}"

Next Steps

  • Architecture: understand the complete system architecture
  • Sequential Appender: deep dive into 60K+ rows/sec performance
  • Multi-Database: managing multiple isolated replicas
