
SQL Database Source

The sql_database source loads data from SQL databases using SQLAlchemy. It supports any database with a SQLAlchemy driver, including PostgreSQL, MySQL, SQL Server, Oracle, and SQLite.

Quick Start

Load all tables from a PostgreSQL database:
import dlt
from dlt.sources.sql_database import sql_database

# Create the source
source = sql_database(
    credentials="postgresql://user:password@localhost:5432/mydb",
)

# Load all tables
pipeline = dlt.pipeline(
    pipeline_name="postgres_pipeline",
    destination="duckdb",
    dataset_name="postgres_data"
)

load_info = pipeline.run(source)
print(load_info)

Connection Methods

Connect to your database by passing a SQLAlchemy connection string as credentials:
source = sql_database(
    credentials="postgresql://user:password@localhost:5432/mydb"
)
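If the password contains characters that are special in URLs (such as @ or :), it must be percent-encoded before being placed in the connection string. A small stdlib sketch (the helper name is illustrative, not part of dlt):

```python
from urllib.parse import quote_plus

def build_url(driver, user, password, host, port, db):
    """Build a SQLAlchemy-style connection string, percent-encoding the password."""
    return f"{driver}://{user}:{quote_plus(password)}@{host}:{port}/{db}"

url = build_url("postgresql", "user", "p@ss:word", "localhost", 5432, "mydb")
print(url)  # postgresql://user:p%40ss%3Aword@localhost:5432/mydb
```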

Selecting Tables

Control which tables to load:
# Load all tables in the default schema
source = sql_database(
    credentials="postgresql://user:password@localhost:5432/mydb"
)

# Load only specific tables
source = sql_database(
    credentials="postgresql://user:password@localhost:5432/mydb",
    table_names=["customers", "orders"]
)

Loading Individual Tables

Use sql_table to load a single table with more control:
from dlt.sources.sql_database import sql_table

# Load a single table
table = sql_table(
    credentials="postgresql://user:password@localhost:5432/mydb",
    table="customers",
    schema="public"
)

pipeline.run(table)

Incremental Loading

Load only new or updated records using incremental cursors:
Step 1: Define the cursor column

Choose a column that indicates when records were created or updated:
from dlt.sources.sql_database import sql_table

orders = sql_table(
    credentials="postgresql://user:password@localhost:5432/mydb",
    table="orders",
    incremental=dlt.sources.incremental(
        cursor_path="updated_at",
        initial_value="2024-01-01T00:00:00Z"
    )
)
Step 2: Run the pipeline

On first run, loads all records where updated_at >= initial_value:
pipeline.run(orders)
Step 3: Subsequent runs

On subsequent runs, only loads records updated since the last run:
# Only loads new/updated records
pipeline.run(orders)
The incremental state is automatically persisted between pipeline runs. dlt tracks the maximum cursor value and uses it as the starting point for the next run.
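The bookkeeping behind this can be sketched with the standard library. This is a simplified model (dlt also deduplicates rows that share the boundary cursor value, and persists state in the pipeline working directory); the table and data here are hypothetical:

```python
import sqlite3

# In-memory table standing in for the source database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, updated_at TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [
    (1, "2024-01-05"), (2, "2024-02-10"), (3, "2024-03-01"),
])

def load_incremental(conn, last_value):
    """Fetch rows past the cursor and return them with the new max cursor value."""
    rows = conn.execute(
        "SELECT id, updated_at FROM orders WHERE updated_at > ? ORDER BY updated_at",
        (last_value,),
    ).fetchall()
    new_cursor = rows[-1][1] if rows else last_value
    return rows, new_cursor

# First run: start from the initial value, pick up everything after it.
rows, cursor = load_incremental(conn, "2024-01-01")
# Second run: the cursor advanced, so nothing new is loaded.
rows2, _ = load_incremental(conn, cursor)
```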

Backend Options

Choose different backends for data extraction:
# Returns data as Python dictionaries
source = sql_database(
    credentials="postgresql://...",
    backend="sqlalchemy",
    chunk_size=10000
)
  • Returns Python dictionaries
  • No additional dependencies
  • Good for general use
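Whatever the backend, extraction is streamed in chunks of chunk_size rows rather than materializing the whole table. The pattern can be sketched with the standard library (an illustrative analogue, not dlt's internals):

```python
import sqlite3

def stream_chunks(conn, query, chunk_size):
    """Yield result rows in fixed-size batches instead of loading the full result set."""
    cursor = conn.execute(query)
    while True:
        batch = cursor.fetchmany(chunk_size)
        if not batch:
            break
        yield batch

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER)")
conn.executemany("INSERT INTO events VALUES (?)", [(i,) for i in range(25)])

# 25 rows with chunk_size=10 -> batches of 10, 10, and 5
chunks = list(stream_chunks(conn, "SELECT id FROM events", 10))
```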

Schema Reflection

Control how much schema information is reflected from the source database:
# Only table names, nullability, and primary keys
# Data types inferred from data
source = sql_database(
    credentials="postgresql://...",
    reflection_level="minimal"
)

Advanced Configuration

Filtering Columns

Include or exclude specific columns:
table = sql_table(
    credentials="postgresql://...",
    table="users",
    included_columns=["id", "name", "email", "created_at"],
)

# Or exclude columns
table = sql_table(
    credentials="postgresql://...",
    table="users",
    excluded_columns=["password_hash", "salt"],
)

Custom Query Adapter

Modify the SELECT query before execution:
from sqlalchemy import Select, Table

def query_adapter(select: Select, table: Table) -> Select:
    """Add WHERE clause to filter records"""
    return select.where(table.c.status == "active")

table = sql_table(
    credentials="postgresql://...",
    table="users",
    query_adapter_callback=query_adapter
)

Table Adapter

Modify the table schema before loading:
from sqlalchemy import Table

def table_adapter(table: Table) -> Table:
    """Drop sensitive columns before the table is loaded"""
    # Note: _columns is private SQLAlchemy API; verify against your version
    if "password" in table.columns:
        table._columns.remove(table.columns["password"])
    return table

source = sql_database(
    credentials="postgresql://...",
    table_adapter_callback=table_adapter
)

Write Disposition

Control how data is written to the destination:
# Replace entire table on each run
table = sql_table(
    credentials="postgresql://...",
    table="daily_summary",
    write_disposition="replace"
)

# Merge/upsert based on primary key
table = sql_table(
    credentials="postgresql://...",
    table="customers",
    write_disposition="merge",
    primary_key="customer_id"
)

# Append new records (default)
table = sql_table(
    credentials="postgresql://...",
    table="logs",
    write_disposition="append"
)
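The semantics of merge (upsert on the primary key) can be illustrated in plain SQL with the standard library; this is a conceptual sketch of what the destination ends up with, not dlt's actual load mechanism:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, name TEXT)")

def merge_rows(conn, rows):
    """Insert new rows; update existing rows on primary-key conflict (merge/upsert)."""
    conn.executemany(
        "INSERT INTO customers VALUES (?, ?) "
        "ON CONFLICT(customer_id) DO UPDATE SET name = excluded.name",
        rows,
    )

merge_rows(conn, [(1, "Ada"), (2, "Bo")])
merge_rows(conn, [(2, "Bob"), (3, "Cy")])  # updates 2, inserts 3

rows = conn.execute("SELECT * FROM customers ORDER BY customer_id").fetchall()
print(rows)  # [(1, 'Ada'), (2, 'Bob'), (3, 'Cy')]
```

With replace, the second run would instead leave only rows 2 and 3; with append, row 2 would appear twice.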

Complete Example: PostgreSQL to DuckDB

A comprehensive example loading data from PostgreSQL:
import dlt
from dlt.sources.sql_database import sql_database, sql_table

# Load specific tables with different configurations
@dlt.source
def postgres_source():
    """Load data from PostgreSQL with custom configurations"""

    # Full table replacement
    yield sql_table(
        credentials=dlt.secrets["postgres_credentials"],
        table="product_catalog",
        write_disposition="replace"
    )

    # Incremental loading for orders
    yield sql_table(
        credentials=dlt.secrets["postgres_credentials"],
        table="orders",
        incremental=dlt.sources.incremental(
            cursor_path="updated_at",
            initial_value="2024-01-01T00:00:00Z"
        ),
        write_disposition="merge",
        primary_key="order_id"
    )

    # Incremental loading for customers
    yield sql_table(
        credentials=dlt.secrets["postgres_credentials"],
        table="customers",
        incremental=dlt.sources.incremental(
            cursor_path="updated_at",
            initial_value="2024-01-01T00:00:00Z"
        ),
        write_disposition="merge",
        primary_key="customer_id",
        excluded_columns=["password_hash"]  # Exclude sensitive data
    )

# Create and run the pipeline
pipeline = dlt.pipeline(
    pipeline_name="postgres_to_duckdb",
    destination="duckdb",
    dataset_name="ecommerce"
)

load_info = pipeline.run(postgres_source())
print(load_info)

Supported Databases

The sql_database source supports any database with a SQLAlchemy driver:

PostgreSQL

"postgresql://user:pass@host:5432/db"

MySQL

"mysql+pymysql://user:pass@host:3306/db"

SQL Server

"mssql+pyodbc://user:pass@host/db?driver=ODBC+Driver+17+for+SQL+Server"

Oracle

"oracle+cx_oracle://user:pass@host:1521/db"

SQLite

"sqlite:///path/to/database.db"

Redshift

"redshift+psycopg2://user:pass@host:5439/db"
Make sure to install the appropriate database driver (e.g., psycopg2 for PostgreSQL, pymysql for MySQL).

Best Practices

Always use incremental loading for tables that grow over time:
table = sql_table(
    credentials="postgresql://...",
    table="events",
    incremental=dlt.sources.incremental(
        cursor_path="created_at",
        initial_value="2024-01-01T00:00:00Z"
    )
)
This avoids reprocessing all historical data on each run.
Pick the backend that matches your workload:
  • Use sqlalchemy for general-purpose loading
  • Use pyarrow for better type preservation and performance
  • Use connectorx for maximum speed on large tables
  • Use pandas if you need DataFrame operations
Balance memory usage and performance with appropriate chunk sizes:
# Smaller chunks for memory-constrained environments
source = sql_database(credentials="...", chunk_size=5000)

# Larger chunks for better throughput
source = sql_database(credentials="...", chunk_size=50000)
Never hardcode credentials. Use secrets.toml or environment variables:
# secrets.toml
[sources.sql_database.credentials]
drivername = "postgresql"
username = "user"
password = "secure_password"
host = "localhost"
database = "mydb"
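Environment variables follow dlt's convention of joining config sections with double underscores. Assuming the secrets.toml layout above, the equivalent variable would be:

```python
import os

# Equivalent of [sources.sql_database.credentials] in secrets.toml,
# supplied as a single connection string via the environment
os.environ["SOURCES__SQL_DATABASE__CREDENTIALS"] = (
    "postgresql://user:secure_password@localhost:5432/mydb"
)
```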

Troubleshooting

Ensure the connection string format is correct and the database is accessible:
# Test connection
from sqlalchemy import create_engine, text
engine = create_engine("postgresql://user:pass@host:5432/db")
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))  # text() is required in SQLAlchemy 2.0
    print(result.fetchone())
Check that table names and schema are correct:
from sqlalchemy import MetaData
metadata = MetaData()
metadata.reflect(bind=engine)
print(metadata.tables.keys())  # List all available tables
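For a quick local check without SQLAlchemy, the same idea can be tried against SQLite's catalog table (a stdlib analogue of MetaData.reflect, with hypothetical tables):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER)")
conn.execute("CREATE TABLE orders (id INTEGER)")

# sqlite_master plays the role of schema reflection here
tables = [r[0] for r in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
)]
print(tables)  # ['customers', 'orders']
```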
If type reflection causes errors, use reflection_level="minimal" so data types are inferred from the data instead of the database schema:
source = sql_database(
    credentials="...",
    reflection_level="minimal"
)
