This guide demonstrates how to set up a high-performance data replication pipeline from PostgreSQL to MotherDuck using dlt (data load tool). The pipeline leverages ConnectorX for efficient extraction and the Parquet format for interim storage.

Overview

The DLT pipeline allows you to:
  • Extract data from specified PostgreSQL tables in parallel
  • Transform data during the normalization phase
  • Load data into MotherDuck with configurable batch sizes
  • Track detailed metrics about each pipeline run

Requirements

  • Python 3.11 or higher
  • PostgreSQL database (source)
  • MotherDuck account and token

Installation

Step 1: Install uv (recommended)

uv is a fast Python package installer and resolver, written in Rust:
curl -LsSf https://astral.sh/uv/install.sh | sh
Step 2: Install dependencies

Create a pyproject.toml file with the following dependencies:
pyproject.toml
[project]
name = "dlt-db-replication"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "dlt[motherduck]>=1.7.0",
    "connectorx<0.4.2",
    "humanize>=4.12.1",
    "psycopg2-binary>=2.9.10",
    "sqlalchemy>=2.0.38",
]
Why these dependencies?
  • dlt[motherduck]: Core DLT library with MotherDuck support
  • connectorx: High-performance database connector for PostgreSQL
  • psycopg2-binary: PostgreSQL adapter for Python
  • sqlalchemy: SQL toolkit and ORM
  • humanize: Human-readable output formatting
Install with:
uv sync
Step 3: Configure credentials

Create a .dlt/secrets.toml file with your database credentials:
.dlt/secrets.toml
# PostgreSQL credentials
[sources.sql_database.credentials]
drivername = "postgresql"
database = "your_database_name"
host = "your_postgres_host"
password = "your_postgres_password"
port = 5432
username = "your_postgres_username"

# MotherDuck credentials
[destination.motherduck.credentials]
token = "your_motherduck_token"
Never commit .dlt/secrets.toml to version control. Add it to your .gitignore file.
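As an alternative to secrets.toml, dlt can also read the same values from environment variables, with section names joined by double underscores. This keeps secrets out of files entirely, which is useful in CI environments:

```shell
export SOURCES__SQL_DATABASE__CREDENTIALS__PASSWORD="your_postgres_password"
export DESTINATION__MOTHERDUCK__CREDENTIALS__TOKEN="your_motherduck_token"
```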

Configuration

Create a .dlt/config.toml file to control pipeline behavior:
Control log levels and telemetry:
.dlt/config.toml
[runtime]
log_level = "WARNING"  # Options: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
dlthub_telemetry = true
Define which PostgreSQL schema and tables to extract:
.dlt/config.toml
[sources.sql_database]
schema = "my_pg"

# List of tables to replicate
tables = [
    # Customer-related tables
    "customer",
    "customer_address",
    "customer_demographics",
    "household_demographics",
    "income_band",
    
    # Time-related tables
    "date_dim",
    "time_dim",
    
    # Product-related tables
    "item",
    "inventory",
    "warehouse",
    
    # Sales and returns tables
    "store_sales",
    "store_returns",
    "catalog_sales",
    "catalog_returns",
    "web_sales",
    "web_returns",
    
    # Store and catalog tables
    "store",
    "call_center",
    "catalog_page",
    
    # Web-related tables
    "web_page",
    "web_site",
    
    # Other dimension tables
    "promotion",
    "reason",
    "ship_mode"
]

# Number of parallel workers for data extraction
workers = 6
Match the pool size to your worker count:
.dlt/config.toml
[postgres]
# Should match the number of workers for optimal performance
pool_size = 6
Configure interim data storage format:
.dlt/config.toml
[data_writer]
# Parquet provides good compression and performance
format = "parquet"
Configure batch size for loading:
.dlt/config.toml
[destination.motherduck]
# Number of rows per batch - larger sizes improve performance
batch_size = 1000000
Start with 1,000,000 rows per batch and adjust based on your available memory and performance requirements.
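As a rough starting point, you can back into a batch size from a memory budget. The helper below is a hypothetical heuristic, not part of the pipeline, and the 512-byte average row width is an assumption; measure your own tables before relying on it:

```python
def batch_size_for(memory_budget_mb: int, avg_row_bytes: int) -> int:
    """Largest batch that fits the given memory budget at an average row width."""
    return (memory_budget_mb * 1024 * 1024) // avg_row_bytes

# With a 512 MB budget and ~512-byte rows, roughly a million rows per batch:
print(batch_size_for(512, 512))  # → 1048576
```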
Configure workers for each pipeline stage:
.dlt/config.toml
# Extract stage configuration
[extract]
workers = 8

# Normalize stage configuration (data transformation)
[normalize]
workers = 4

# Load stage configuration (loading to MotherDuck)
[load]
workers = 4
Higher worker counts improve performance but require more system resources. Adjust based on your infrastructure.

Pipeline Implementation

Create sql_database_pipeline.py with the following code:
sql_database_pipeline.py
import dlt
from dlt.sources.sql_database import sql_database

def use_config_tables() -> None:
    """
    Load tables specified in .dlt/config.toml into MotherDuck using replace mode.

    This function:
    1. Creates a DLT pipeline with MotherDuck as the destination
    2. Retrieves table configuration from the DLT config file
    3. Sets up a SQL database source with those tables
    4. Executes the pipeline to extract, transform, and load the data
    5. Logs detailed runtime metrics about the pipeline execution

    Raises:
        ValueError: If no tables are configured in the configuration file
    """
    # Initialize a DLT pipeline with MotherDuck as the destination
    pipeline = dlt.pipeline(
        pipeline_name="pg2md", 
        destination="motherduck", 
        dataset_name="pg2md_data"
    )

    # Retrieve the configured tables from the DLT config file
    tables = dlt.config.get("sources.sql_database.tables")

    # Validate that tables are configured
    if not tables:
        raise ValueError(
            "No tables configured in .dlt/config.toml under [sources.sql_database.tables]"
        )

    # Create the SQL database source with:
    # - backend="connectorx": Uses ConnectorX for efficient data extraction
    # - parallelize(): Enables parallel processing for improved performance
    # - with_resources(*tables): Specifies which tables to extract
    source = sql_database(backend="connectorx").parallelize().with_resources(*tables)

    # Execute the pipeline with write_disposition="replace".
    # This drops existing tables and recreates them with fresh data
    load_info = pipeline.run(source, write_disposition="replace")

    # Surface runtime metrics about the completed load packages
    print(load_info)

if __name__ == "__main__":
    use_config_tables()
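pipeline.run() returns a LoadInfo object you can print for per-package metrics. If you want a compact one-line throughput summary instead (the humanize dependency exists for exactly this kind of friendly formatting), a minimal stdlib-only sketch might look like this; the row count and timing below are illustrative, not measured:

```python
def summarize_run(rows: int, seconds: float) -> str:
    # Compact throughput summary; the humanize package offers friendlier
    # output (e.g. "2.9 million rows") if you prefer.
    return f"{rows:,} rows in {seconds:.1f}s ({rows / seconds:,.0f} rows/s)"

print(summarize_run(2_880_404, 95.0))  # → 2,880,404 rows in 95.0s (30,320 rows/s)
```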

Write Disposition Options

The pipeline uses write_disposition="replace" which drops and recreates tables on each run. Other options include:
  • append: Add new rows to existing tables
  • merge: Update existing rows and add new ones (requires primary key)
Learn more about write dispositions in the DLT documentation.

Running the Pipeline

# Run directly with uv - handles environment creation automatically
uv run sql_database_pipeline.py
The pipeline will:
  1. Connect to your PostgreSQL database
  2. Extract the configured tables in parallel using ConnectorX
  3. Transform the data during normalization
  4. Load the data into MotherDuck in batches
  5. Output detailed metrics about the run

Performance Optimization

The pipeline is configured for high performance with:
1. Parallel Extraction

Extract data from PostgreSQL using multiple workers:
[sources.sql_database]
workers = 6

[postgres]
pool_size = 6  # Match worker count
This allows multiple tables to be extracted simultaneously.
2. Efficient Backend

Use ConnectorX for fast data extraction:
source = sql_database(backend="connectorx").parallelize()
ConnectorX is written in Rust and reads query results directly into Arrow, making it significantly faster than row-by-row Python adapters such as psycopg2.
3. Parquet Format

Use Parquet for interim storage:
[data_writer]
format = "parquet"
Parquet provides efficient compression and columnar storage.
4. Large Batch Sizes

Load data to MotherDuck in large batches:
[destination.motherduck]
batch_size = 1000000
Larger batches reduce network overhead and improve throughput.
5. Stage Parallelization

Configure workers for each pipeline stage:
[extract]
workers = 8

[normalize]
workers = 4

[load]
workers = 4
This enables parallel processing across the entire pipeline.
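There is no universally right split across stages. One hypothetical sizing heuristic is to key the counts off the machine's core count, keeping extract widest since it is usually I/O-bound; treat these numbers as a starting point, not a recommendation from dlt:

```python
import os

cores = os.cpu_count() or 4  # fall back if the count is undetectable

# Hypothetical sizing: extract is mostly I/O-bound so it can use all cores;
# normalize and load are CPU- and memory-bound, so keep them smaller.
extract_workers = cores
normalize_workers = max(2, cores // 2)
load_workers = max(2, cores // 2)

print(extract_workers, normalize_workers, load_workers)
```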

Troubleshooting

If you encounter connection errors:
  1. Verify PostgreSQL credentials in .dlt/secrets.toml
  2. Check network connectivity to PostgreSQL host
  3. Ensure PostgreSQL allows connections from your IP
  4. Verify MotherDuck token is valid
Test your PostgreSQL connection:
psql -h your_host -U your_username -d your_database
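Before blaming credentials, it can also help to confirm the host and port are reachable at all. A small stdlib-only check (the host below is a placeholder):

```python
import socket

def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

print(port_open("your_postgres_host", 5432))
```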
If you experience out-of-memory errors:
  1. Reduce batch size in MotherDuck configuration:
    [destination.motherduck]
    batch_size = 500000  # Reduced from 1000000
    
  2. Decrease worker counts:
    [sources.sql_database]
    workers = 4  # Reduced from 6
    
    [extract]
    workers = 4  # Reduced from 8
    
  3. Process fewer tables at once by splitting tables configuration
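Splitting the tables configuration can be scripted: run the pipeline once per chunk instead of passing every table at once, so memory peaks stay bounded. The chunking itself is trivial (the table names below are an illustrative subset of the configured list):

```python
def chunks(items: list[str], size: int) -> list[list[str]]:
    """Split a list into consecutive groups of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]

tables = ["customer", "store_sales", "web_sales", "item", "inventory"]
for group in chunks(tables, 2):
    # In practice you would call with_resources(*group) and pipeline.run()
    # once per group instead of printing.
    print(group)
```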
If the pipeline is slower than expected:
  1. Increase parallelization settings:
    [sources.sql_database]
    workers = 8  # Increased
    
    [extract]
    workers = 12  # Increased
    
  2. Verify ConnectorX backend is being used:
    source = sql_database(backend="connectorx")
    
  3. Check PostgreSQL server load and connection limits
  4. Monitor network bandwidth between PostgreSQL and your runner
Enable debug logging to identify bottlenecks:
[runtime]
log_level = "DEBUG"
If specific tables cannot be found:
  1. Verify the schema name in .dlt/config.toml:
    [sources.sql_database]
    schema = "my_pg"  # Must match your PostgreSQL schema
    
  2. Check table names are correct and exist in PostgreSQL:
    SELECT table_name FROM information_schema.tables 
    WHERE table_schema = 'my_pg';
    
  3. Ensure the PostgreSQL user has SELECT permissions on the tables

Next Steps

  • Explore DLT incremental loading for efficient updates
  • Learn about DLT transformations to modify data during pipeline execution
  • Set up monitoring and alerting for production pipelines
  • Consider scheduling the pipeline with cron, Airflow, or other orchestration tools
