
Overview

The Sequential Appender is Duckling’s core innovation, providing 60,000+ rows/sec bulk loading performance using DuckDB’s native Appender API. This is 6x faster than traditional INSERT statements and enables syncing massive datasets (60M+ records) in minutes instead of hours.

Performance Comparison

| Method | Speed | Use Case |
|---|---|---|
| Appender API | 60,000+ rows/sec | Full sync (bulk loading) |
| Traditional INSERT | ~10,000 rows/sec | Incremental sync (upserts) |
| CDC (real-time) | Sub-second latency | Live replication |
The Appender API is 6x faster than bulk INSERT because it writes directly to DuckDB’s binary format without SQL parsing overhead.

Implementation (sequentialAppenderService.ts)

Unified Architecture

Duckling uses a unified @duckdb/node-api architecture for all database operations:
```typescript
import { DuckDBConnection } from '@duckdb/node-api';

// Single package for queries + appends
const { appender, connection } = await this.duckdb.createAppender(tableName);
```

Benefits

60K+ rows/sec

Direct binary append, 6x faster than INSERT

No Argument Limits

INSERT limited to 65K args (V8), Appender has no limit

Lower Memory

Binary append, no SQL string construction

All Types Supported

JSON, BLOB, BINARY, all numeric/date types
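The argument ceiling above is why bulk INSERT batches must be capped while the Appender never is. A rough sketch of the arithmetic (the constant and helper name are illustrative assumptions, not Duckling's code):

```typescript
// Hypothetical helper: how many rows fit in one parameterized INSERT
// under V8's ~65K bound-argument ceiling.
const V8_ARG_LIMIT = 65_000;

function maxRowsPerInsert(columnCount: number): number {
  if (columnCount <= 0) throw new Error('columnCount must be positive');
  // Each row consumes one bound argument per column.
  return Math.floor(V8_ARG_LIMIT / columnCount);
}
```

A 20-column table caps out at 3,250 rows per INSERT; the Appender's batch size is bounded only by memory and the flush interval.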

Full Sync Workflow

1. Crash-Safe Staging Table

Full sync uses staging tables to prevent data loss on crashes:
```typescript
// Build unique staging table name
const stagingTable = `__full_sync_staging_${tableName}_${randomUUID()}`;

// Create staging table with same schema
await this.createTable(stagingTable, schema);
```
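The well-known prefix also makes staging tables stranded by a crash easy to identify on the next run. A minimal sketch, assuming table names can be listed (the helper is hypothetical, not Duckling's actual cleanup code):

```typescript
// Hypothetical sketch: staging tables stranded by a crashed sync share a
// well-known prefix and can be dropped on the next startup.
const STAGING_PREFIX = '__full_sync_staging_';

function findOrphanStagingTables(allTables: string[]): string[] {
  return allTables.filter((name) => name.startsWith(STAGING_PREFIX));
}
```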

2. Appender-Based Insert

Stream records from MySQL and append to staging table:
```typescript
const { appender, connection } = await this.duckdb.createAppender(stagingTable);

for await (const batch of this.mysql.streamTableData(tableName, fetchBatchSize)) {
  // Sanitize rows in worker thread pool
  const sanitizedRows = await pool.sanitizeBatch(batch, columns, columnTypes);

  // Append each row
  for (const row of sanitizedRows) {
    for (let i = 0; i < columns.length; i++) {
      this.appendValueByType(appender, row[i], columnTypes[columns[i]]);
    }
    appender.endRow();
  }

  // Flush periodically to free memory
  if (recordsProcessed - lastFlushed >= FLUSH_INTERVAL) {
    appender.flushSync();
    lastFlushed = recordsProcessed;
  }
}

appender.flushSync();
appender.closeSync();
```

3. Atomic Swap

Replace production table in a single transaction:
```typescript
await this.duckdb.run('BEGIN TRANSACTION');
try {
  await this.duckdb.run(`DELETE FROM ${this.q(tableName)}`);
  await this.duckdb.run(`INSERT INTO ${this.q(tableName)} SELECT * FROM ${this.q(stagingTable)}`);
  await this.duckdb.run('COMMIT');
} catch (error) {
  await this.duckdb.run('ROLLBACK');
  throw error;
} finally {
  await this.dropTableIfExists(stagingTable);
}
```

4. Durability Checkpoint

Force WAL flush to ensure data persists:
```typescript
await this.duckdb.checkpoint();
```
The staging table swap is atomic. If the server crashes mid-sync, the production table remains unchanged and the orphan staging table is automatically cleaned up on next sync.

Type Conversion

Supported MySQL Types

| MySQL Type | DuckDB Mapping | Appender Method | Notes |
|---|---|---|---|
| INTEGER, BIGINT, TINYINT | BIGINT | appendBigInt() | All INT types map to BIGINT |
| VARCHAR, TEXT | VARCHAR | appendVarchar() | String types |
| BLOB, BINARY, VARBINARY | BLOB | appendBlob() | Binary data (verified) |
| JSON | JSON | appendVarchar() | Stringified via JSON.stringify() |
| DATE | DATE | appendVarchar() | ISO format (YYYY-MM-DD) |
| DATETIME, TIMESTAMP | TIMESTAMP | appendTimestamp() | Parsed to DuckDBTimestampValue |
| TIME | TIME | appendTime() | Parsed to DuckDBTimeValue |
| DECIMAL, NUMERIC | DECIMAL | appendVarchar() | String representation |
| BOOLEAN | BOOLEAN | appendBoolean() | Boolean type |
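The dispatch in the table above can be expressed as a small lookup. This is an illustrative sketch of the mapping, not Duckling's actual resolver; note that DATETIME must be checked before the bare TIME and INT substrings:

```typescript
// Illustrative sketch: resolve a MySQL column type to the Appender method
// prescribed by the mapping table. Order matters: 'datetime' contains 'time',
// and 'tinyint'/'bigint' contain 'int'.
function appenderMethodFor(mysqlType: string): string {
  const t = mysqlType.toLowerCase();
  if (t.includes('timestamp') || t.includes('datetime')) return 'appendTimestamp';
  if (t.includes('time')) return 'appendTime';
  if (t.includes('int')) return 'appendBigInt';
  if (t.includes('blob') || t.includes('binary')) return 'appendBlob';
  if (t.includes('bool')) return 'appendBoolean';
  // JSON, DATE, DECIMAL/NUMERIC, and string types all go through appendVarchar
  return 'appendVarchar';
}
```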

Type Conversion Implementation

```typescript
// DuckDBTimestampValue is imported from '@duckdb/node-api'
private appendValueByType(appender: any, value: any, mysqlType: string): void {
  const lowerType = mysqlType.toLowerCase();

  if (value === null || value === undefined) {
    appender.appendNull();
    return;
  }

  // Integer types
  if (lowerType.includes('bigint')) {
    appender.appendBigInt(BigInt(value));
  }
  // TIMESTAMP/DATETIME - use typed method to prevent corruption
  else if (lowerType.includes('timestamp') || lowerType.includes('datetime')) {
    const m = String(value).match(/^(\d{4})-(\d{2})-(\d{2})[T ](\d{2}):(\d{2}):(\d{2})(?:\.(\d+))?/);
    if (m) {
      const micros = m[7] ? parseInt(m[7].padEnd(6, '0').slice(0, 6), 10) : 0;
      appender.appendTimestamp(DuckDBTimestampValue.fromParts({
        date: { year: parseInt(m[1], 10), month: parseInt(m[2], 10), day: parseInt(m[3], 10) },
        time: { hour: parseInt(m[4], 10), min: parseInt(m[5], 10), sec: parseInt(m[6], 10), micros },
      }));
    } else {
      appender.appendNull();
    }
  }
  // ... (see source for full implementation)
}
```
Critical: Use typed append methods (appendTimestamp, appendTime) for fractional-second values. Using appendVarchar for TIMESTAMP/TIME columns with fractional seconds silently corrupts the Appender.
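The fractional-second handling is the subtle part: MySQL fractions can carry 1-9 digits, while DuckDB timestamps store microseconds. A standalone sketch of the normalization used in the snippet above:

```typescript
// Pad or truncate a DATETIME fraction (the digits after the dot) to
// microseconds, as in the timestamp parser above: '.5' means 500000 µs.
function fractionToMicros(fraction: string | undefined): number {
  if (!fraction) return 0;
  return parseInt(fraction.padEnd(6, '0').slice(0, 6), 10);
}

// fractionToMicros('5')         → 500000
// fractionToMicros('123')       → 123000
// fractionToMicros('123456789') → 123456 (truncated past microseconds)
```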

Data Sanitization

Worker Pool Offloading

Sanitization is CPU-intensive and offloaded to worker threads:
```typescript
const pool = WorkerPool.getInstance();
const sanitizedRows = await pool.sanitizeBatch(batch, columns, columnTypes);
```

Invalid Value Handling

```typescript
private sanitizeValue(value: any, columnType: string): any {
  const lowerType = columnType.toLowerCase();

  // Handle MySQL invalid "zero" timestamps
  if (value === '0000-00-00 00:00:00' || value === '0000-00-00') {
    return null;
  }

  // Handle invalid TIME values (hours >= 24)
  if (lowerType.includes('time') && typeof value === 'string') {
    const m = value.match(/^(\d+):(\d+):(\d+)/);
    if (m && parseInt(m[1], 10) >= 24) {
      logger.warn(`Invalid time value "${value}" (hours >= 24) - converting to NULL`);
      return null;
    }
  }

  // JSON stringification
  if (lowerType.includes('json')) {
    return typeof value === 'string' ? value : JSON.stringify(value);
  }

  return value;
}
```
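For reference, these rules condense into the following self-contained version (logging omitted), shown with the values each check targets:

```typescript
// Self-contained illustration of the sanitization rules above.
function sanitize(value: any, columnType: string): any {
  const lowerType = columnType.toLowerCase();
  // MySQL "zero" dates have no valid DuckDB representation
  if (value === '0000-00-00 00:00:00' || value === '0000-00-00') return null;
  // TIME values with hours >= 24 are legal in MySQL but not in DuckDB
  if (lowerType.includes('time') && typeof value === 'string') {
    const m = value.match(/^(\d+):/);
    if (m && parseInt(m[1], 10) >= 24) return null;
  }
  // JSON columns are appended as strings
  if (lowerType.includes('json')) {
    return typeof value === 'string' ? value : JSON.stringify(value);
  }
  return value;
}

// sanitize('0000-00-00', 'date') → null
// sanitize('25:00:00', 'time')   → null
// sanitize({ a: 1 }, 'json')     → '{"a":1}'
```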

Memory Management

Periodic Flushing

Flush appender every 5,000 records (configurable via APPENDER_FLUSH_INTERVAL):
```typescript
if (recordsProcessed - lastFlushed >= FLUSH_INTERVAL) {
  logger.info(`Flushing appender at ${recordsProcessed} records to free memory...`);
  appender.flushSync();
  lastFlushed = recordsProcessed;

  // Force GC if available
  if (global.gc) {
    global.gc();
  }
}
```

Progress Logging

Log progress for large tables (every 10,000 records):
```typescript
const percent = Math.min((recordsProcessed / totalRecords) * 100, 100).toFixed(1);
const memUsage = process.memoryUsage();
const heapUsedMB = (memUsage.heapUsed / 1024 / 1024).toFixed(1);
const heapTotalMB = (memUsage.heapTotal / 1024 / 1024).toFixed(1);

logger.info(`${tableName}: ${recordsProcessed}/${totalRecords} (${percent}%) | Memory: ${heapUsedMB}/${heapTotalMB} MB`);
```
Memory usage remains flat regardless of table size thanks to streaming batches and periodic flushing. A 60M row table processes with <500MB memory footprint.

Automatic Fallback

If Appender fails for any reason, automatically fall back to INSERT:
```typescript
try {
  return await this.syncTableSequentialWithAppender(tableName);
} catch (error) {
  logger.warn(`Appender failed, falling back to INSERT method:`, error);
  return await this.syncTableSequential(tableName);
}
```

Watermark Management

After successful sync, update watermark for incremental sync:
```typescript
const primaryKeyColumn = await this.detectPrimaryKeyColumn(tableName, schema);
const maxResult = await this.duckdb.execute(
  `SELECT MAX(${this.q(primaryKeyColumn)}) as max_id FROM ${this.q(tableName)}`
);

await this.updateWatermark(tableName, {
  lastProcessedId: maxResult[0]?.max_id,
  lastProcessedTimestamp: new Date(),
  primaryKeyColumn: primaryKeyColumn,
  timestampColumn: await this.detectTimestampColumn(tableName, schema)
});
```
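The stored watermark is what lets the next incremental sync skip already-loaded rows. A hedged sketch of how it gets used (the `Watermark` shape and helper are assumptions for illustration, not Duckling's actual code):

```typescript
// Hypothetical: build the predicate an incremental sync would append to its
// MySQL SELECT, fetching only rows beyond the stored watermark.
interface Watermark {
  lastProcessedId: number | null;
  primaryKeyColumn: string;
}

function incrementalPredicate(w: Watermark): string {
  if (w.lastProcessedId === null) return ''; // no watermark yet → full fetch
  return `WHERE \`${w.primaryKeyColumn}\` > ${w.lastProcessedId}`;
}

// incrementalPredicate({ lastProcessedId: 60_000_000, primaryKeyColumn: 'id' })
// → 'WHERE `id` > 60000000'
```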

Real-World Performance

Case Study: 60M Records

| Metric | Value |
|---|---|
| Total Records | 60,000,000 |
| Sync Time | ~16 minutes |
| Average Speed | 62,500 rows/sec |
| Memory Usage | <500 MB |
| CPU Usage | 40-60% (4 cores) |
Traditional INSERT would take ~1.6 hours for the same dataset. The Appender API reduces sync time by 83%.
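A quick sanity check of these numbers (the ~83% figure quotes against an INSERT baseline rounded to ~1.6 hours):

```typescript
// Back-of-the-envelope check of the case study above.
const rows = 60_000_000;

const appenderMinutes = rows / 62_500 / 60; // 16 minutes
const insertMinutes = rows / 10_000 / 60;   // 100 minutes (~1.6 hours)
const savings = 1 - appenderMinutes / insertMinutes; // ≈ 0.84
```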

Configuration

Environment Variables

```bash
# Batch size for MySQL streaming (default: 1000)
BATCH_SIZE=1000

# Flush interval for Appender (default: 5000)
APPENDER_FLUSH_INTERVAL=5000

# Insert batch size for fallback INSERT (default: 5000)
INSERT_BATCH_SIZE=5000
```
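A minimal sketch of reading these variables with their documented defaults (the loader shown is an illustration, not Duckling's actual config code):

```typescript
// Read an integer environment variable, falling back to a default when the
// variable is unset or not a number.
function intFromEnv(
  env: Record<string, string | undefined>,
  name: string,
  fallback: number,
): number {
  const raw = env[name];
  const parsed = raw === undefined ? NaN : parseInt(raw, 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

const config = {
  batchSize: intFromEnv(process.env, 'BATCH_SIZE', 1000),
  appenderFlushInterval: intFromEnv(process.env, 'APPENDER_FLUSH_INTERVAL', 5000),
  insertBatchSize: intFromEnv(process.env, 'INSERT_BATCH_SIZE', 5000),
};
```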

Tuning Tips

BATCH_SIZE

Larger batches reduce MySQL round-trips but increase memory

APPENDER_FLUSH_INTERVAL

Lower intervals reduce memory usage; higher intervals improve throughput

Worker Threads

Match CPU cores for optimal sanitization performance

Connection Pools

Increase pools for multi-database setups

Next Steps

Sync Modes

Learn about full sync, incremental sync, and CDC

Architecture

Understand the complete system architecture
