## Overview

dlt is built around four core concepts that work together to move data from source to destination:

- **Pipelines** orchestrate the entire data loading process
- **Sources** define where and how to extract data
- **Resources** are individual data endpoints that yield data
- **Destinations** are where your data gets loaded

Understanding these concepts is essential to building effective data pipelines with dlt.
## Pipelines

A pipeline is the main object that orchestrates the data loading process. It manages state, configuration, and the flow of data from source to destination.

### Creating a Pipeline

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name='chess_pipeline',
    destination='duckdb',
    dataset_name='player_data'
)
```
**Type signature:**

```python
def pipeline(
    pipeline_name: str = None,
    pipelines_dir: str = None,
    pipeline_salt: TSecretStrValue = None,
    destination: TDestinationReferenceArg = None,
    staging: TDestinationReferenceArg = None,
    dataset_name: str = None,
    import_schema_path: str = None,
    export_schema_path: str = None,
    dev_mode: bool = False,
    refresh: TRefreshMode = None,
    progress: TCollectorArg = _NULL_COLLECTOR,
) -> Pipeline
```
### Pipeline Parameters

- **`pipeline_name`**: A unique name for your pipeline. Used to identify the pipeline in monitoring and to restore state on subsequent runs. Defaults to the script filename with a `dlt_` prefix.
- **`destination`**: The destination where data will be loaded. Can be a string name (e.g., `'duckdb'`, `'postgres'`) or a destination module imported from `dlt.destinations`.
- **`dataset_name`**: The name of the dataset (schema in relational databases). Defaults to `pipeline_name` if not specified.
- **`dev_mode`**: When `True`, each run creates a fresh dataset with a datetime suffix. Useful during development to start clean on each run.
- **`staging`**: Optional staging destination for two-stage loading (e.g., staging to S3 before loading to Redshift).
- **`pipelines_dir`**: Working directory for pipeline state and temp files. Defaults to `~/.dlt/pipelines/`.
### Running a Pipeline

The `run` method executes the pipeline:

```python
info = pipeline.run(
    data,
    table_name='player',
    write_disposition='append'
)
```

**Returns:** a `LoadInfo` object with execution details:

```python
print(info)
# Pipeline chess_pipeline load step completed in 0.89 seconds
# 1 load package(s) were loaded to destination duckdb
# Load package 1234567890 is LOADED and contains no failed jobs
```
### Pipeline State

dlt automatically manages state across runs:

```python
from datetime import datetime

# Access pipeline state
state = pipeline.state

# State is persisted between runs
state['last_run_time'] = datetime.now()
```

Pipeline state is stored locally in the `.dlt` directory by default. This enables incremental loading and resuming from failures.
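Conceptually, the state behaves like a dictionary that is serialized at the end of a run and restored at the start of the next one. A minimal stand-alone sketch of that persistence pattern (illustrative only, not dlt's actual storage code; uses a temp file instead of the `.dlt` directory):

```python
import json
import os
import tempfile

def load_state(path):
    """Return previously saved state, or an empty dict on the first run."""
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return {}

def save_state(path, state):
    """Persist state so the next run can pick up where this one left off."""
    with open(path, "w") as f:
        json.dump(state, f)

state_path = os.path.join(tempfile.mkdtemp(), "state.json")

# First "run": state starts empty, record a watermark
state = load_state(state_path)
state["last_run_time"] = "2024-01-01T00:00:00Z"
save_state(state_path, state)

# Second "run": the watermark from the previous run is visible again
restored = load_state(state_path)
```

This is exactly what makes incremental loading possible: the next run can read the watermark instead of re-fetching everything.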
## Sources

A source is a logical grouping of resources that represents a complete data extraction from an API, database, or other data provider.

### Defining a Source

Use the `@dlt.source` decorator to create a source:

```python
import dlt
from dlt.sources.helpers import requests

@dlt.source
def chess(
    chess_url: str = dlt.config.value,
    title: str = "GM",
    max_players: int = 2,
    year: int = 2022,
    month: int = 10,
):
    """Chess.com data source"""

    @dlt.resource(write_disposition="replace")
    def players():
        """Fetch top players by title"""
        r = requests.get(f"{chess_url}/titled/{title}")
        r.raise_for_status()
        for player in r.json()["players"][:max_players]:
            yield player

    @dlt.transformer(data_from=players, write_disposition="replace")
    def players_profiles(username: str):
        """Fetch detailed profiles for each player"""
        r = requests.get(f"{chess_url}/player/{username}")
        r.raise_for_status()
        return r.json()

    return players, players_profiles
```
**Type signature:**

```python
def source(
    func: Optional[Callable] = None,
    name: str = None,
    section: str = None,
    max_table_nesting: int = None,
    root_key: bool = None,
    schema: Schema = None,
    schema_contract: TSchemaContract = None,
    spec: Type[BaseConfiguration] = None,
) -> DltSource
```
### Source Parameters

- **`name`**: Source name. Defaults to the decorated function name.
- **`max_table_nesting`**: Maximum depth for nested data normalization. Controls how many levels deep dlt will create separate tables for nested structures.

```python
@dlt.source(max_table_nesting=2)
def my_source():
    # Nested data will create tables up to 2 levels deep
    pass
```

- **`schema_contract`**: Defines how schema changes are handled:
  - `"evolve"`: allow new tables and columns (default)
  - `"freeze"`: reject any schema changes
  - `"discard_row"`: discard rows with new columns
  - `"discard_value"`: discard new column values only
### Using Sources

```python
# Create pipeline
pipeline = dlt.pipeline(
    pipeline_name='chess_games',
    destination='postgres',
    dataset_name='chess'
)

# Run with source
info = pipeline.run(chess(max_players=5, month=9))
print(info)
```
## Resources

A resource is a function that yields data items. Resources are the fundamental building blocks that produce data to be loaded.

### Defining a Resource

```python
@dlt.resource(
    primary_key="id",
    write_disposition="append",
    columns={"created_at": {"data_type": "timestamp"}}
)
def events():
    """Yield event data"""
    for event in fetch_events():
        yield event
```
**Type signature:**

```python
def resource(
    data: Optional[Any] = None,
    name: str = None,
    primary_key: TColumnNames = None,
    write_disposition: TWriteDisposition = None,
    columns: TAnySchemaColumns = None,
    schema: Schema = None,
    max_table_nesting: int = None,
    schema_contract: TSchemaContract = None,
    table_name: str = None,
    file_format: TLoaderFileFormat = None,
    selected: bool = True,
) -> DltResource
```
### Resource Parameters

- **`primary_key`**: Column(s) that uniquely identify each row. Used for deduplication and updates.

```python
@dlt.resource(primary_key="user_id")
def users():
    yield {"user_id": 1, "name": "Alice"}

# Composite primary key
@dlt.resource(primary_key=["tenant_id", "user_id"])
def multi_tenant_users():
    yield {"tenant_id": "org1", "user_id": 1, "name": "Alice"}
```
- **`write_disposition`** (`'append' | 'replace' | 'merge'`): How data should be written to the destination:
  - `"append"`: add new rows to existing data (default)
  - `"replace"`: replace all existing data
  - `"merge"`: update existing rows and insert new ones (requires `primary_key`)

```python
@dlt.resource(write_disposition="replace")
def full_snapshot():
    yield fetch_all_data()

@dlt.resource(write_disposition="merge", primary_key="id")
def updates():
    yield fetch_changed_records()
```
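Conceptually, `merge` is an upsert on the primary key: incoming rows whose key already exists replace the stored row, and the rest are inserted. A simplified illustration of that semantics on plain lists (not dlt's actual loader logic):

```python
def merge_rows(existing, incoming, primary_key):
    """Upsert: replace rows with a matching key, append the rest."""
    by_key = {row[primary_key]: row for row in existing}
    for row in incoming:
        by_key[row[primary_key]] = row
    return list(by_key.values())

existing = [{"id": 1, "status": "open"}, {"id": 2, "status": "open"}]
incoming = [{"id": 2, "status": "closed"}, {"id": 3, "status": "open"}]
merged = merge_rows(existing, incoming, "id")
# row 2 is updated in place, row 3 is inserted, row 1 is untouched
```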
- **`columns`**: Explicit column schema definitions. Useful for specifying data types, nullability, or hints.

```python
@dlt.resource(columns={
    "id": {"data_type": "bigint", "nullable": False},
    "created_at": {"data_type": "timestamp"},
    "email": {"data_type": "text", "unique": True}
})
def users():
    yield {"id": 1, "created_at": "2024-01-01", "email": "[email protected]"}
```
### Resource Variants

**Standalone resource:**

```python
@dlt.resource
def standalone_data():
    yield {"id": 1, "value": "data"}

pipeline.run(standalone_data())
```

**Transformer:** transformers take data from other resources:

```python
@dlt.resource
def users():
    yield from fetch_users()

@dlt.transformer(data_from=users)
def user_emails(user):
    # Process each user
    return {"email": user["email"], "verified": check_email(user["email"])}
```
## Incremental Loading

Load only new or changed data efficiently:

```python
import dlt
from dlt.sources.helpers import requests

@dlt.resource(
    primary_key="id",
    write_disposition="append"
)
def ticket_events(
    timestamp: dlt.sources.incremental[int] = dlt.sources.incremental(
        "timestamp",
        initial_value=0
    )
):
    """Load events incrementally based on timestamp"""
    response = requests.get(
        "https://api.example.com/events",
        params={"start_time": timestamp.last_value}
    )
    yield response.json()["events"]
```
**Type signature:**

```python
class incremental(Generic[TCursorValue]):
    def __init__(
        self,
        cursor_path: str,
        initial_value: Optional[TCursorValue] = None,
        end_value: Optional[TCursorValue] = None,
        allow_external_schedulers: bool = False,
        row_order: TSortOrder = None,
    )
```
### Incremental Parameters

- **`cursor_path`**: JSONPath to the field used as the incremental cursor (e.g., `"timestamp"`, `"updated_at"`).
- **`initial_value`**: Starting value for the first run. On subsequent runs, dlt uses the last seen value.
- **`end_value`**: Optional end value for backfilling. When set, loading stops when this value is reached.
- **`allow_external_schedulers`**: Allow external schedulers (like Airflow) to control incremental state.

dlt tracks incremental state automatically. The state is stored in the pipeline's local storage and persists across runs.
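What the `incremental` object does each run can be pictured as filtering rows against a stored watermark and then advancing it. A stand-alone sketch of that bookkeeping (illustrative only, not dlt's implementation):

```python
def take_new_rows(rows, cursor_field, last_value):
    """Keep rows past the watermark, and advance it to the max cursor seen."""
    new_rows = [r for r in rows if r[cursor_field] > last_value]
    new_last = max((r[cursor_field] for r in new_rows), default=last_value)
    return new_rows, new_last

events = [{"id": 1, "timestamp": 100}, {"id": 2, "timestamp": 250}]

batch1, cursor = take_new_rows(events, "timestamp", 0)       # first run: everything is new
batch2, cursor = take_new_rows(events, "timestamp", cursor)  # second run: nothing new
```

The `initial_value` plays the role of the watermark on the very first run; afterwards, the persisted `last_value` takes over.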
## Destinations

Destinations are where your data gets loaded. dlt supports 20+ destinations out of the box.

### Configuring Destinations

In code:

```python
pipeline = dlt.pipeline(
    destination='postgres',
    dataset_name='my_data'
)
```

Using destination factories:

```python
from dlt.destinations import postgres, bigquery

# PostgreSQL with custom config
pipeline = dlt.pipeline(
    destination=postgres(
        credentials="postgresql://user:pass@localhost:5432/db"
    ),
    dataset_name='my_data'
)

# BigQuery
pipeline = dlt.pipeline(
    destination=bigquery(
        location="US"
    ),
    dataset_name='my_data'
)
```
### Destination Configuration Files

Store credentials in `.dlt/secrets.toml`:

```toml
# PostgreSQL
[destination.postgres.credentials]
host = "localhost"
port = 5432
username = "myuser"
password = "mypassword"
database = "mydatabase"

# BigQuery
[destination.bigquery.credentials]
project_id = "my-project"
private_key = "-----BEGIN PRIVATE KEY-----\n..."
client_email = "[email protected]"
```
### Available Destinations

- **DuckDB**: local analytics database
- **PostgreSQL**: popular relational database
- **BigQuery**: Google Cloud data warehouse
- **Snowflake**: cloud data platform
- **Redshift**: Amazon data warehouse
- **Databricks**: lakehouse platform
- **MotherDuck**: serverless DuckDB
- **Filesystem**: S3, GCS, Azure Blob
## Schema Management

dlt automatically infers and manages schemas:

```python
# Access pipeline schema
schema = pipeline.default_schema

# Get table schema
table_schema = schema.get_table("users")
print(table_schema["columns"])

# Export schema as YAML (to_pretty_yaml returns a string)
with open("schema.yaml", "w") as f:
    f.write(pipeline.default_schema.to_pretty_yaml())
```
### Schema Evolution

By default, dlt evolves schemas automatically:

```python
# First run: creates table with columns: id, name
pipeline.run([{"id": 1, "name": "Alice"}], table_name="users")

# Second run: automatically adds 'email' column
pipeline.run([{"id": 2, "name": "Bob", "email": "[email protected]"}], table_name="users")
```

### Schema Contracts

Control schema evolution behavior:

```python
@dlt.source(schema_contract={"tables": "freeze", "columns": "evolve"})
def strict_source():
    # New tables rejected, new columns allowed
    pass

@dlt.source(schema_contract="freeze")
def frozen_source():
    # No schema changes allowed
    pass
```
## Data Types

dlt supports rich type inference:

```python
# Automatic type inference
data = [
    {
        "id": 1,                            # -> bigint
        "name": "Alice",                    # -> text
        "balance": 123.45,                  # -> double
        "is_active": True,                  # -> bool
        "created": "2024-01-01T10:00:00Z",  # -> timestamp
        "tags": ["python", "data"],         # -> json (or array)
        "metadata": {"key": "value"}        # -> json
    }
]
pipeline.run(data, table_name="users")
```
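The mapping above can be approximated with Python's own types; one subtlety worth showing is that `bool` must be checked before `int`, because `bool` is a subclass of `int`. A simplified sketch (dlt's real inference also handles timestamps, decimals, and binary data):

```python
def infer_type(value):
    """Map a Python value to a (simplified) dlt column type."""
    if isinstance(value, bool):   # must come before int: bool subclasses int
        return "bool"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, (list, dict)):
        return "json"
    return "text"

row = {"id": 1, "balance": 123.45, "is_active": True, "tags": ["python"]}
inferred = {key: infer_type(value) for key, value in row.items()}
```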
### Explicit Type Hints

```python
@dlt.resource(
    columns={
        "id": {"data_type": "bigint", "nullable": False},
        "created_at": {"data_type": "timestamp"},
        "amount": {"data_type": "decimal", "precision": 10, "scale": 2}
    }
)
def typed_data():
    yield {"id": 1, "created_at": "2024-01-01", "amount": "123.45"}
```
## Error Handling

dlt provides detailed error information:

```python
try:
    info = pipeline.run(source())
except Exception as e:
    print(f"Pipeline failed: {e}")
    # Access detailed trace
    trace = pipeline.last_trace
    print(trace.last_normalize_info)
    print(trace.last_load_info)
```
## Next Steps

Now that you understand the core concepts:

- **Incremental Loading**: master efficient incremental patterns
- **Schema Evolution**: control how schemas change over time
- **Destinations**: explore all supported destinations
- **Examples**: browse real-world code examples