
Overview

dlt is built around four core concepts that work together to move data from source to destination:

Pipelines

Orchestrate the entire data loading process

Sources

Define where and how to extract data

Resources

Individual data endpoints that yield data

Destinations

Where your data gets loaded

Understanding these concepts is essential to building effective data pipelines with dlt.

Pipelines

A pipeline is the main object that orchestrates the data loading process. It manages state, configuration, and the flow of data from source to destination.

Creating a Pipeline

import dlt

pipeline = dlt.pipeline(
    pipeline_name='chess_pipeline',
    destination='duckdb',
    dataset_name='player_data'
)
Type Signature:
def pipeline(
    pipeline_name: str = None,
    pipelines_dir: str = None,
    pipeline_salt: TSecretStrValue = None,
    destination: TDestinationReferenceArg = None,
    staging: TDestinationReferenceArg = None,
    dataset_name: str = None,
    import_schema_path: str = None,
    export_schema_path: str = None,
    dev_mode: bool = False,
    refresh: TRefreshMode = None,
    progress: TCollectorArg = _NULL_COLLECTOR,
) -> Pipeline

Pipeline Parameters

pipeline_name
str
A unique name for your pipeline. Used to identify the pipeline in monitoring and to restore state on subsequent runs. If not provided, defaults to the script filename with a dlt_ prefix.
destination
str | Destination
The destination where data will be loaded. Can be a string name (e.g., 'duckdb', 'postgres') or a destination module imported from dlt.destinations.
dataset_name
str
The name of the dataset (schema in relational databases). Defaults to pipeline_name if not specified.
dev_mode
bool
default:"False"
When True, each run creates a fresh dataset with a datetime suffix. Useful during development to start clean on each run.
staging
str | Destination
Optional staging destination for two-stage loading (e.g., staging to S3 before loading to Redshift).
pipelines_dir
str
Working directory for pipeline state and temp files. Defaults to ~/.dlt/pipelines/.

Running a Pipeline

The run method executes the pipeline:
info = pipeline.run(
    data,
    table_name='player',
    write_disposition='append'
)
Returns: LoadInfo object with execution details:
print(info)
# Pipeline chess_pipeline load step completed in 0.89 seconds
# 1 load package(s) were loaded to destination duckdb
# Load package 1234567890 is LOADED and contains no failed jobs

Pipeline State

dlt automatically manages state across runs:
from datetime import datetime

# pipeline.state is a read-only snapshot of the stored state
state = pipeline.state

# To persist values between runs, write state inside a resource via dlt.current
@dlt.resource
def tracked():
    state = dlt.current.resource_state()
    state["last_run_time"] = datetime.now().isoformat()
    yield {"status": "ok"}
Pipeline state is stored locally in the .dlt directory by default. This enables incremental loading and resuming from failures.

Sources

A source is a logical grouping of resources that represents a complete data extraction from an API, database, or other data provider.

Defining a Source

Use the @dlt.source decorator to create a source:
import dlt
from dlt.sources.helpers import requests

@dlt.source
def chess(
    chess_url: str = dlt.config.value,
    title: str = "GM",
    max_players: int = 2,
    year: int = 2022,
    month: int = 10,
):
    """Chess.com data source"""
    
    @dlt.resource(write_disposition="replace")
    def players():
        """Fetch top players by title"""
        r = requests.get(f"{chess_url}/titled/{title}")
        r.raise_for_status()
        for player in r.json()["players"][:max_players]:
            yield player
    
    @dlt.transformer(data_from=players, write_disposition="replace")
    def players_profiles(username: str):
        """Fetch detailed profiles for each player"""
        r = requests.get(f"{chess_url}/player/{username}")
        r.raise_for_status()
        return r.json()
    
    return players(), players_profiles
Type Signature:
def source(
    func: Optional[Callable] = None,
    name: str = None,
    section: str = None,
    max_table_nesting: int = None,
    root_key: bool = None,
    schema: Schema = None,
    schema_contract: TSchemaContract = None,
    spec: Type[BaseConfiguration] = None,
) -> DltSource

Source Parameters

name
str
Source name. Defaults to the decorated function name.
max_table_nesting
int
Maximum depth for nested data normalization. Controls how many levels deep dlt will create separate tables for nested structures.
@dlt.source(max_table_nesting=2)
def my_source():
    # Nested data will create tables up to 2 levels deep
    pass
schema_contract
TSchemaContract
Define how schema changes are handled:
  • "evolve" - Allow new tables and columns (default)
  • "freeze" - Reject any schema changes
  • "discard_row" - Discard rows with new columns
  • "discard_value" - Discard new column values only

Using Sources

# Create pipeline
pipeline = dlt.pipeline(
    pipeline_name='chess_games',
    destination='postgres',
    dataset_name='chess'
)

# Run with source
info = pipeline.run(chess(max_players=5, month=9))
print(info)

Resources

A resource is a function that yields data items. Resources are the fundamental building blocks that produce data to be loaded.

Defining a Resource

@dlt.resource(
    primary_key="id",
    write_disposition="append",
    columns={"created_at": {"data_type": "timestamp"}}
)
def events():
    """Yield event data"""
    for event in fetch_events():
        yield event
Type Signature:
def resource(
    data: Optional[Any] = None,
    name: str = None,
    primary_key: TColumnNames = None,
    write_disposition: TWriteDisposition = None,
    columns: TAnySchemaColumns = None,
    schema: Schema = None,
    max_table_nesting: int = None,
    schema_contract: TSchemaContract = None,
    table_name: str = None,
    file_format: TLoaderFileFormat = None,
    selected: bool = True,
) -> DltResource

Resource Parameters

primary_key
str | list[str]
Column(s) that uniquely identify each row. Used for deduplication and updates.
@dlt.resource(primary_key="user_id")
def users():
    yield {"user_id": 1, "name": "Alice"}

# Composite primary key
@dlt.resource(primary_key=["tenant_id", "user_id"])
def multi_tenant_users():
    yield {"tenant_id": "org1", "user_id": 1, "name": "Alice"}
write_disposition
'append' | 'replace' | 'merge'
How data should be written to the destination:
  • "append" - Add new rows to existing data (default)
  • "replace" - Replace all existing data
  • "merge" - Update existing rows and insert new ones (requires primary_key)
@dlt.resource(write_disposition="replace")
def full_snapshot():
    yield fetch_all_data()

@dlt.resource(write_disposition="merge", primary_key="id")
def updates():
    yield fetch_changed_records()
columns
dict
Explicit column schema definitions. Useful for specifying data types, nullability, or hints.
@dlt.resource(columns={
    "id": {"data_type": "bigint", "nullable": False},
    "created_at": {"data_type": "timestamp"},
    "email": {"data_type": "text", "unique": True}
})
def users():
    yield {"id": 1, "created_at": "2024-01-01", "email": "[email protected]"}

Resource Variants

Standalone Resource

@dlt.resource
def standalone_data():
    yield {"id": 1, "value": "data"}

pipeline.run(standalone_data())

Transformer Resource

Transformers take data from other resources:
@dlt.resource
def users():
    yield from fetch_users()

@dlt.transformer(data_from=users)
def user_emails(user):
    # Process each user
    return {"email": user["email"], "verified": check_email(user["email"])}

Incremental Loading

Load only new or changed data efficiently:
import dlt
from dlt.sources.helpers import requests

@dlt.resource(
    primary_key="id",
    write_disposition="append"
)
def ticket_events(
    timestamp: dlt.sources.incremental[int] = dlt.sources.incremental(
        "timestamp",
        initial_value=0
    )
):
    """Load events incrementally based on timestamp"""
    response = requests.get(
        "https://api.example.com/events",
        params={"start_time": timestamp.last_value}
    )
    yield response.json()["events"]
Type Signature:
class incremental(Generic[TCursorValue]):
    def __init__(
        self,
        cursor_path: str,
        initial_value: Optional[TCursorValue] = None,
        end_value: Optional[TCursorValue] = None,
        allow_external_schedulers: bool = False,
        row_order: TSortOrder = None,
    )

Incremental Parameters

cursor_path
str
required
JSONPath to the field used as the incremental cursor (e.g., "timestamp", "updated_at").
initial_value
any
Starting value for the first run. On subsequent runs, dlt uses the last seen value.
end_value
any
Optional end value for backfilling. When set, loading stops when this value is reached.
allow_external_schedulers
bool
default:"False"
Allow external schedulers (like Airflow) to control incremental state.
dlt tracks incremental state automatically. The state is stored in the pipeline’s local storage and persists across runs.

Destinations

Destinations are where your data gets loaded. dlt supports 20+ destinations out of the box.

Configuring Destinations

In code:
pipeline = dlt.pipeline(
    destination='postgres',
    dataset_name='my_data'
)
Using destination factories:
from dlt.destinations import postgres, bigquery

# PostgreSQL with custom config
pipeline = dlt.pipeline(
    destination=postgres(
        credentials="postgresql://user:pass@localhost:5432/db"
    ),
    dataset_name='my_data'
)

# BigQuery
pipeline = dlt.pipeline(
    destination=bigquery(
        location="US"
    ),
    dataset_name='my_data'
)

Destination Configuration Files

Store credentials in .dlt/secrets.toml:
# PostgreSQL
[destination.postgres.credentials]
host = "localhost"
port = 5432
username = "myuser"
password = "mypassword"
database = "mydatabase"

# BigQuery
[destination.bigquery.credentials]
project_id = "my-project"
private_key = "-----BEGIN PRIVATE KEY-----\n..."
client_email = "[email protected]"

Available Destinations

DuckDB

Local analytics database

PostgreSQL

Popular relational database

BigQuery

Google Cloud data warehouse

Snowflake

Cloud data platform

Redshift

Amazon data warehouse

Databricks

Lakehouse platform

MotherDuck

Serverless DuckDB

Filesystem

S3, GCS, Azure Blob

Qdrant

Vector database

Schema Management

dlt automatically infers and manages schemas:
from dlt.common.schema import Schema

# Access pipeline schema
schema = pipeline.default_schema

# Get table schema
table_schema = schema.get_table("users")
print(table_schema["columns"])

# Export schema as YAML (to_pretty_yaml returns a string)
with open("schema.yaml", "w") as f:
    f.write(pipeline.default_schema.to_pretty_yaml())

Schema Evolution

By default, dlt evolves schemas automatically:
# First run: creates table with columns: id, name
pipeline.run([{"id": 1, "name": "Alice"}], table_name="users")

# Second run: automatically adds 'email' column
pipeline.run([{"id": 2, "name": "Bob", "email": "[email protected]"}], table_name="users")

Schema Contracts

Control schema evolution behavior:
@dlt.source(schema_contract={"tables": "freeze", "columns": "evolve"})
def strict_source():
    # New tables rejected, new columns allowed
    pass

@dlt.source(schema_contract="freeze")
def frozen_source():
    # No schema changes allowed
    pass

Data Types

dlt supports rich type inference:
# Automatic type inference
data = [
    {
        "id": 1,                           # -> bigint
        "name": "Alice",                   # -> text
        "balance": 123.45,                 # -> double
        "is_active": True,                 # -> bool
        "created": "2024-01-01T10:00:00Z", # -> timestamp
        "tags": ["python", "data"],        # -> nested child table (users__tags)
        "metadata": {"key": "value"}       # -> flattened columns (metadata__key)
    }
]

pipeline.run(data, table_name="users")

Explicit Type Hints

@dlt.resource(
    columns={
        "id": {"data_type": "bigint", "nullable": False},
        "created_at": {"data_type": "timestamp"},
        "amount": {"data_type": "decimal", "precision": 10, "scale": 2}
    }
)
def typed_data():
    yield {"id": 1, "created_at": "2024-01-01", "amount": "123.45"}

Error Handling

dlt provides detailed error information:
try:
    info = pipeline.run(source())
except Exception as e:
    print(f"Pipeline failed: {e}")
    # Access detailed trace
    trace = pipeline.last_trace
    print(trace.last_normalize_info)
    print(trace.last_load_info)

Next Steps

Now that you understand the core concepts:

Incremental Loading

Master efficient incremental patterns

Schema Evolution

Control how schemas change over time

Destinations

Explore all supported destinations

Examples

Browse real-world code examples
