# SQL Database Source

The `sql_database` source loads data from SQL databases using SQLAlchemy. It supports any database with a SQLAlchemy driver, including PostgreSQL, MySQL, SQL Server, Oracle, and SQLite.
## Quick Start

Load all tables from a PostgreSQL database:

```python
import dlt
from dlt.sources.sql_database import sql_database

# Create the source
source = sql_database(
    credentials="postgresql://user:password@localhost:5432/mydb",
)

# Create and run the pipeline to load all tables
pipeline = dlt.pipeline(
    pipeline_name="postgres_pipeline",
    destination="duckdb",
    dataset_name="postgres_data",
)
load_info = pipeline.run(source)
print(load_info)
```
## Connection Methods

Connect to your database using any of the following credential formats.

### Connection String

```python
source = sql_database(
    credentials="postgresql://user:password@localhost:5432/mydb"
)
```

### Credentials Object

```python
from dlt.sources.credentials import ConnectionStringCredentials

credentials = ConnectionStringCredentials(
    "postgresql://user:password@localhost:5432/mydb"
)
source = sql_database(credentials=credentials)
```

### From Configuration

```toml
# secrets.toml
[sources.sql_database.credentials]
drivername = "postgresql"
username = "user"
password = "password"
host = "localhost"
port = 5432
database = "mydb"
```

```python
source = sql_database()  # Reads credentials from configuration
```

### SQLAlchemy Engine

```python
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:password@localhost:5432/mydb")
source = sql_database(credentials=engine)
```
## Selecting Tables

Control which tables to load: all tables in the default schema, specific tables, a specific schema, or views.

```python
# Load all tables in the default schema
source = sql_database(
    credentials="postgresql://user:password@localhost:5432/mydb"
)
```
## Loading Individual Tables

Use `sql_table` to load a single table with more control:

```python
from dlt.sources.sql_database import sql_table

# Load a single table
table = sql_table(
    credentials="postgresql://user:password@localhost:5432/mydb",
    table="customers",
    schema="public",
)
pipeline.run(table)
```
## Incremental Loading

Load only new or updated records using incremental cursors.

### Define the cursor column

Choose a column that indicates when records were created or updated:

```python
import dlt
from dlt.sources.sql_database import sql_table

orders = sql_table(
    credentials="postgresql://user:password@localhost:5432/mydb",
    table="orders",
    incremental=dlt.sources.incremental(
        cursor_path="updated_at",
        initial_value="2024-01-01T00:00:00Z",
    ),
)
```

### Run the pipeline

On the first run, this loads all records where `updated_at >= initial_value`. On subsequent runs, it loads only records updated since the last run:

```python
# Only loads new/updated records
pipeline.run(orders)
```

The incremental state is automatically persisted between pipeline runs: dlt tracks the maximum cursor value and uses it as the starting point for the next run.
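Conceptually, the cursor bookkeeping works like this minimal, dlt-free sketch. The `state` dict stands in for the pipeline state dlt persists between runs; this is an illustration of the idea, not dlt's actual implementation (dlt additionally deduplicates rows that share the boundary cursor value).

```python
# Minimal sketch of incremental-cursor bookkeeping, independent of dlt.
def load_new_rows(rows, state, cursor_path="updated_at",
                  initial_value="2024-01-01T00:00:00Z"):
    """Return rows at/after the last seen cursor value; advance the cursor."""
    last = state.get("last_value", initial_value)
    new_rows = [r for r in rows if r[cursor_path] >= last]
    if new_rows:
        state["last_value"] = max(r[cursor_path] for r in new_rows)
    return new_rows

state = {}  # persisted between runs in real dlt pipelines
first_run = load_new_rows(
    [{"id": 1, "updated_at": "2024-02-01T00:00:00Z"},
     {"id": 2, "updated_at": "2023-12-01T00:00:00Z"}],  # older than initial_value
    state,
)
# first run keeps only row 1; state now remembers its cursor value
```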
## Backend Options

Choose the backend used to extract data.

### SQLAlchemy (Default)

```python
# Returns data as Python dictionaries
source = sql_database(
    credentials="postgresql://...",
    backend="sqlalchemy",
    chunk_size=10000,
)
```

- Returns Python dictionaries
- No additional dependencies
- Good for general use

### PyArrow

```python
# Returns data as Arrow tables
source = sql_database(
    credentials="postgresql://...",
    backend="pyarrow",
    chunk_size=10000,
)
```

- Returns Arrow tables
- Better type preservation
- Faster for large datasets
- Requires `pip install pyarrow`

### Pandas

```python
# Returns data as Pandas DataFrames
source = sql_database(
    credentials="postgresql://...",
    backend="pandas",
    chunk_size=10000,
)
```

- Returns Pandas DataFrames
- Useful if you need DataFrame operations
- Requires `pip install pandas`

### ConnectorX

```python
# Fastest extraction using ConnectorX
source = sql_database(
    credentials="postgresql://...",
    backend="connectorx",
)
```

- Fastest extraction speed
- Ignores `chunk_size` (loads the full table at once)
- Best for large tables
- Requires `pip install connectorx`
## Schema Reflection

Control how much schema information is reflected from the source database. The `reflection_level` argument accepts `"minimal"`, `"full"` (the default), and `"full_with_precision"`.

```python
# Only table names, nullability, and primary keys;
# data types are inferred from the data itself
source = sql_database(
    credentials="postgresql://...",
    reflection_level="minimal",
)
```
## Advanced Configuration

### Filtering Columns

Include or exclude specific columns:

```python
table = sql_table(
    credentials="postgresql://...",
    table="users",
    included_columns=["id", "name", "email", "created_at"],
)

# Or exclude columns
table = sql_table(
    credentials="postgresql://...",
    table="users",
    excluded_columns=["password_hash", "salt"],
)
```
### Custom Query Adapter

Modify the SELECT query before execution:

```python
from sqlalchemy import Select, Table

def query_adapter(select: Select, table: Table) -> Select:
    """Add a WHERE clause to filter records."""
    return select.where(table.c.status == "active")

table = sql_table(
    credentials="postgresql://...",
    table="users",
    query_adapter_callback=query_adapter,
)
```
### Table Adapter

Modify the reflected table schema before loading:

```python
from sqlalchemy import Table

def table_adapter(table: Table) -> Table:
    """Drop sensitive columns from the reflected table."""
    if "password" in table.columns:
        # Note: _columns is a private SQLAlchemy collection
        table._columns.remove(table.columns["password"])
    return table

source = sql_database(
    credentials="postgresql://...",
    table_adapter_callback=table_adapter,
)
```
### Write Disposition

Control how data is written to the destination:

```python
# Replace the entire table on each run
table = sql_table(
    credentials="postgresql://...",
    table="daily_summary",
    write_disposition="replace",
)

# Merge/upsert based on a primary key
table = sql_table(
    credentials="postgresql://...",
    table="customers",
    write_disposition="merge",
    primary_key="customer_id",
)

# Append new records (default)
table = sql_table(
    credentials="postgresql://...",
    table="logs",
    write_disposition="append",
)
```
## Complete Example: PostgreSQL to DuckDB

A comprehensive example loading data from PostgreSQL:

```python
import dlt
from dlt.sources.sql_database import sql_table

# Load specific tables with different configurations
@dlt.source
def postgres_source():
    """Load data from PostgreSQL with custom configurations."""
    # Full table replacement
    yield sql_table(
        credentials=dlt.secrets["postgres_credentials"],
        table="product_catalog",
        write_disposition="replace",
    )

    # Incremental loading for orders
    yield sql_table(
        credentials=dlt.secrets["postgres_credentials"],
        table="orders",
        incremental=dlt.sources.incremental(
            cursor_path="updated_at",
            initial_value="2024-01-01T00:00:00Z",
        ),
        write_disposition="merge",
        primary_key="order_id",
    )

    # Incremental loading for customers
    yield sql_table(
        credentials=dlt.secrets["postgres_credentials"],
        table="customers",
        incremental=dlt.sources.incremental(
            cursor_path="updated_at",
            initial_value="2024-01-01T00:00:00Z",
        ),
        write_disposition="merge",
        primary_key="customer_id",
        excluded_columns=["password_hash"],  # Exclude sensitive data
    )

# Create and run the pipeline
pipeline = dlt.pipeline(
    pipeline_name="postgres_to_duckdb",
    destination="duckdb",
    dataset_name="ecommerce",
)
load_info = pipeline.run(postgres_source())
print(load_info)
```
## Supported Databases

The `sql_database` source supports any database with a SQLAlchemy driver:

| Database | Connection string |
| --- | --- |
| PostgreSQL | `postgresql://user:pass@host:5432/db` |
| MySQL | `mysql+pymysql://user:pass@host:3306/db` |
| SQL Server | `mssql+pyodbc://user:pass@host/db?driver=ODBC+Driver+17+for+SQL+Server` |
| Oracle | `oracle+cx_oracle://user:pass@host:1521/db` |
| SQLite | `sqlite:///path/to/database.db` |
| Redshift | `redshift+psycopg2://user:pass@host:5439/db` |

Make sure to install the appropriate database driver (e.g., `psycopg2` for PostgreSQL, `pymysql` for MySQL).
## Best Practices

### Use incremental loading for large tables

Always use incremental loading for tables that grow over time:

```python
table = sql_table(
    credentials="postgresql://...",
    table="events",
    incremental=dlt.sources.incremental(
        cursor_path="created_at",
        initial_value="2024-01-01T00:00:00Z",
    ),
)
```

This avoids reprocessing all historical data on each run.

### Choose the right backend

- Use `sqlalchemy` for general-purpose loading
- Use `pyarrow` for better type preservation and performance
- Use `connectorx` for maximum speed on large tables
- Use `pandas` if you need DataFrame operations

### Adjust chunk size for performance

Larger chunks mean fewer round trips to the database but higher memory use per batch; tune `chunk_size` to your row width and available memory.
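What chunking does under the hood can be illustrated with plain `sqlite3`: rows are fetched in batches of `chunk_size` rather than materialized all at once. This is a dlt-free sketch of the idea, not dlt's actual implementation.

```python
import sqlite3

def fetch_in_chunks(db_path, query, chunk_size=10000):
    """Yield result rows in batches of chunk_size, bounding memory use."""
    con = sqlite3.connect(db_path)
    try:
        cur = con.execute(query)
        while True:
            batch = cur.fetchmany(chunk_size)
            if not batch:
                break
            yield batch
    finally:
        con.close()

# Demo: 25 rows read in chunks of 10 arrive as batches of 10, 10, and 5
con = sqlite3.connect("chunk_demo.db")
con.execute("CREATE TABLE IF NOT EXISTS events (id INTEGER PRIMARY KEY)")
con.executemany("INSERT OR IGNORE INTO events VALUES (?)", [(i,) for i in range(25)])
con.commit()
con.close()

sizes = [len(batch) for batch in
         fetch_in_chunks("chunk_demo.db", "SELECT * FROM events", chunk_size=10)]
```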
### Secure database credentials

Never hardcode credentials. Use `secrets.toml` or environment variables:

```toml
# secrets.toml
[sources.sql_database.credentials]
drivername = "postgresql"
username = "user"
password = "secure_password"
host = "localhost"
database = "mydb"
```
## Troubleshooting

### Connection failures

Ensure the connection string format is correct and the database is accessible:

```python
# Test the connection
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@host:5432/db")
with engine.connect() as conn:
    # SQLAlchemy 2.x requires wrapping raw SQL strings in text()
    result = conn.execute(text("SELECT 1"))
    print(result.fetchone())
```

### Missing tables

Check that the table names and schema are correct:

```python
from sqlalchemy import MetaData

metadata = MetaData()
metadata.reflect(bind=engine)
print(metadata.tables.keys())  # List all available tables
```

### Reflection errors

Use `reflection_level="minimal"` to infer types from the data instead of the database schema:

```python
source = sql_database(
    credentials="...",
    reflection_level="minimal",
)
```
## Next Steps