SQL Database Source
Load data from any SQL database supported by SQLAlchemy. The sql_database source automatically reflects table schemas, handles incremental loading, and supports multiple backends.

Quick Start

Load specific tables from a MySQL database:
import dlt
from dlt.sources.sql_database import sql_database
from dlt.sources.credentials import ConnectionStringCredentials

pipeline = dlt.pipeline(
    pipeline_name="rfam",
    destination="duckdb",
    dataset_name="rfam_data",
)

credentials = ConnectionStringCredentials(
    "mysql+pymysql://[email protected]:4497/Rfam"
)

# Load specific tables
source = sql_database(credentials).with_resources("family", "clan")

load_info = pipeline.run(source, write_disposition="replace")
print(load_info)

Loading Strategies

Step 1: Select Specific Tables

Choose which tables to load:
# Load only specific tables
source = sql_database(credentials).with_resources(
    "family",
    "clan",
    "genome"
)

pipeline.run(source)
Step 2: Load Entire Database

Reflect and load all tables:
# Load all tables from the database
source = sql_database(credentials)

pipeline.run(source, write_disposition="replace")
For large databases, this may take a while. Consider loading specific tables first.
Step 3: Configure Incremental Loading

Add incremental loading to track changes:
source = sql_database(credentials).with_resources("family", "clan")

# Add incremental config
source.family.apply_hints(
    incremental=dlt.sources.incremental("updated")
)
source.clan.apply_hints(
    incremental=dlt.sources.incremental("updated")
)

# Merge updates into existing data
pipeline.run(source, write_disposition="merge")

Using sql_table for Single Tables

Load individual tables with fine-grained control:
from dlt.sources.sql_database import sql_table

pipeline = dlt.pipeline(
    pipeline_name="rfam_database",
    destination="duckdb",
    dataset_name="rfam_data",
)

# Load a single table with incremental loading
family = sql_table(
    credentials=ConnectionStringCredentials(
        "mysql+pymysql://[email protected]:4497/Rfam"
    ),
    table="family",
    incremental=dlt.sources.incremental("updated"),
    reflection_level="full_with_precision",
    defer_table_reflect=True,
)

# Load another table
genome = sql_table(
    credentials="mysql+pymysql://[email protected]:4497/Rfam",
    table="genome",
    reflection_level="full_with_precision",
)

# Load both tables together
pipeline.run([family, genome], write_disposition="merge")

Incremental Loading Patterns

# For tables with updates to existing rows
source = sql_database(credentials).with_resources("family", "clan")

source.family.apply_hints(
    incremental=dlt.sources.incremental("updated")
)
source.clan.apply_hints(
    incremental=dlt.sources.incremental("updated")
)

pipeline.run(source, write_disposition="merge")

Date Range Loading

Load data within a specific date range:
from dlt.common import pendulum

start_date = pendulum.now().subtract(years=1)
end_date = pendulum.now()

family = sql_table(
    credentials="mysql+pymysql://[email protected]:4497/Rfam",
    table="family",
    incremental=dlt.sources.incremental(
        "updated",
        initial_value=start_date,
        end_value=end_date,
        row_order="desc",  # Rows are ordered descending
    ),
    chunk_size=10,
)

# Load only one chunk (10 records)
pipeline.run(family.add_limit(1))

Backend Options

PyArrow Backend (Fast)

Use PyArrow for better performance:
import sqlalchemy as sa

def _double_as_decimal_adapter(table: sa.Table) -> sa.Table:
    """Clear ``asdecimal`` on Double columns so they load as plain floats
    rather than ``decimal.Decimal`` values.

    Passed to ``sql_database`` as ``table_adapter_callback``; returns the
    (mutated) table object.
    """
    # ``sa.Double`` only exists on newer SQLAlchemy versions, hence the guard.
    if hasattr(sa, "Double"):
        for col in table.columns.values():
            if isinstance(col.type, sa.Double):
                col.type.asdecimal = False
    return table

source = sql_database(
    "mysql+pymysql://[email protected]:4497/Rfam",
    backend="pyarrow",
    table_adapter_callback=_double_as_decimal_adapter,
).with_resources("family", "genome")

pipeline.run(source)

Pandas Backend

Use Pandas for specific data processing needs:
source = sql_database(
    "mysql+pymysql://[email protected]:4497/Rfam",
    backend="pandas",
    chunk_size=100000,
    backend_kwargs={
        "coerce_float": False,
        "dtype_backend": "numpy_nullable"
    },
    reflection_level="full_with_precision",
).with_resources("family", "genome")

pipeline.run(source)

ConnectorX Backend (Ultra Fast)

For maximum performance on large datasets:
unsw_table = sql_table(
    "postgresql://loader:loader@localhost:5432/dlt_data",
    "unsw_flow_7",
    "speed_test",
    backend="connectorx",
    reflection_level="full_with_precision",
    backend_kwargs={
        "conn": "postgresql://loader:loader@localhost:5432/dlt_data"
    },
)

pipeline = dlt.pipeline(
    pipeline_name="unsw_download",
    destination="filesystem",
    progress="log",
)

pipeline.run(
    unsw_table,
    dataset_name="speed_test",
    loader_file_format="parquet",
)

Column Selection

Load only specific columns from tables:
import os

# Specify columns via environment variable
os.environ["SOURCES__SQL_DATABASE__FAMILY__INCLUDED_COLUMNS"] = '["rfam_acc", "description"]'

source = sql_database(
    "mysql+pymysql://[email protected]:4497/Rfam",
    backend="pyarrow",
    reflection_level="full_with_precision",
).with_resources("family", "genome")

pipeline.run(source)
Or in .dlt/config.toml:
[sources.sql_database.family]
included_columns = ["rfam_acc", "description"]

Type Adapters

Customize how SQL types are converted:
import sqlalchemy as sa

def type_adapter(sql_type):
    """Map SQL ARRAY column types to JSON; pass every other type through.

    Used as ``type_adapter_callback`` so array columns are stored as JSON
    instead of failing reflection on destinations without array support.
    """
    return sa.JSON() if isinstance(sql_type, sa.ARRAY) else sql_type

source = sql_database(
    "postgresql://loader:loader@localhost:5432/dlt_data",
    backend="pyarrow",
    type_adapter_callback=type_adapter,
    reflection_level="full_with_precision",
).with_resources("table_with_array_column")

pipeline.run(source)

Complete Example: Mixed Load

import dlt
from dlt.sources.sql_database import sql_database
from dlt.sources.credentials import ConnectionStringCredentials

def load_database():
    """Run three loads against the public Rfam MySQL database.

    Demonstrates the three write dispositions side by side:
    ``merge`` (incremental updates), ``replace`` (full refresh) and
    ``append`` (insert-only).
    """
    pipeline = dlt.pipeline(
        pipeline_name="rfam",
        destination="duckdb",
        dataset_name="rfam_data"
    )

    credentials = ConnectionStringCredentials(
        "mysql+pymysql://[email protected]:4497/Rfam"
    )

    # Tables whose existing rows change: track the "updated" cursor column
    # and merge new state into what was loaded before.
    merge_source = sql_database(credentials).with_resources("family", "clan")
    merge_source.family.apply_hints(incremental=dlt.sources.incremental("updated"))
    merge_source.clan.apply_hints(incremental=dlt.sources.incremental("updated"))
    pipeline.run(merge_source, write_disposition="merge")

    # Reference tables: wipe and reload in full on every run.
    replace_source = sql_database(credentials).with_resources("features", "author")
    pipeline.run(replace_source, write_disposition="replace")

    # Insert-only table: cursor on "created", new rows are appended.
    append_source = sql_database(credentials).with_resources("genome")
    append_source.genome.apply_hints(incremental=dlt.sources.incremental("created"))
    pipeline.run(append_source, write_disposition="append")


if __name__ == "__main__":
    load_database()

Next Steps

Incremental Loading

Configure incremental loading strategies

Schema Evolution

Handle schema changes over time

Build docs developers (and LLMs) love