A pipeline is the main object in dlt that orchestrates the entire data loading process. It connects your data source to a destination, manages schemas, handles state, and executes the extract-normalize-load workflow.

What is a Pipeline?

A pipeline in dlt is responsible for:
  • Extracting data from sources
  • Normalizing data into a structured format
  • Loading data into destinations
  • Managing schemas that describe your data structure
  • Persisting state across runs for incremental loading
  • Tracking metadata about load operations
Think of a pipeline as the control center that coordinates all data movement from API to database.

Creating a Pipeline

Create a pipeline using the dlt.pipeline() function:
import dlt

pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination="duckdb",
    dataset_name="player_data"
)

Key Parameters

  • pipeline_name (str): Identifies the pipeline in monitoring and state storage. Defaults to the script filename with a dlt_ prefix.
  • destination (str | Destination): Where data will be loaded (e.g., “duckdb”, “bigquery”, “postgres”).
  • dataset_name (str): Logical grouping of tables (a schema in SQL databases). Defaults to pipeline_name.
  • pipelines_dir (str): Working directory for pipeline state and temp files. Defaults to ~/.dlt/pipelines/.

Running a Pipeline

Simple Run

The most common way to use a pipeline is with the run() method:
import dlt
from dlt.sources.helpers import requests

# Create pipeline
pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination="duckdb",
    dataset_name="player_data"
)

# Fetch data
data = []
for player in ["magnuscarlsen", "rpragchess"]:
    response = requests.get(f"https://api.chess.com/pub/player/{player}")
    data.append(response.json())

# Extract, normalize, and load
load_info = pipeline.run(data, table_name="player")
print(load_info)

Using with Sources and Resources

import dlt

@dlt.resource(write_disposition="replace")
def github_issues(api_secret_key: str = dlt.secrets.value):
    from dlt.sources.helpers.rest_client import paginate
    from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
    
    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
    auth = BearerTokenAuth(api_secret_key) if api_secret_key else None
    
    for page in paginate(url, auth=auth, params={"state": "open"}):
        yield page

pipeline = dlt.pipeline(
    pipeline_name="github_pipeline",
    destination="duckdb",
    dataset_name="github_data"
)

load_info = pipeline.run(github_issues())

Pipeline Methods

run()

Executes the complete extract-normalize-load workflow:
load_info = pipeline.run(
    data,                          # Source data, resource, or list of resources
    destination="duckdb",          # Override pipeline destination
    dataset_name="my_dataset",     # Override dataset name
    table_name="my_table",         # Table name for raw data
    write_disposition="append",    # How to write: append, replace, merge
    schema=my_schema,              # Custom schema
    loader_file_format="parquet"   # File format: jsonl, parquet, etc.
)
Returns: LoadInfo object with details about loaded packages and any failed jobs.

extract()

Extract data without normalizing or loading:
extract_info = pipeline.extract(github_issues())

normalize()

Normalize previously extracted data:
normalize_info = pipeline.normalize()

load()

Load previously normalized data:
load_info = pipeline.load()
Separate extract-normalize-load steps give you granular control, but run() is recommended for most use cases.

Advanced Features

Development Mode

Start fresh on each run with timestamped datasets:
pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination="duckdb",
    dataset_name="player_data",
    dev_mode=True  # Creates dataset_name_YYYYMMDD_HHMMSS
)

Refresh Mode

Reset data or state during pipeline runs:
# Drop all source data and state
load_info = pipeline.run(my_source(), refresh="drop_sources")

# Drop specific resource data and state
load_info = pipeline.run(my_source(), refresh="drop_resources")

# Wipe data but keep schema
load_info = pipeline.run(my_source(), refresh="drop_data")

Attach to Existing Pipeline

Reconnect to a pipeline created in a previous run:
pipeline = dlt.attach(
    pipeline_name="chess_pipeline",
    pipelines_dir="/path/to/pipelines"
)

Get Current Pipeline

Access the most recently created or attached pipeline:
# Create a pipeline
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb"
)

# Later, get reference to it
pipeline = dlt.pipeline()  # No arguments returns current pipeline

Shorthand: dlt.run()

For quick one-off loads, use dlt.run() without creating an explicit pipeline:
import dlt

# Creates a pipeline automatically
load_info = dlt.run(
    [1, 2, 3],
    destination="duckdb",
    dataset_name="numbers",
    table_name="my_numbers"
)

Pipeline State and Metadata

Access pipeline properties:
print(pipeline.pipeline_name)      # Pipeline name
print(pipeline.dataset_name)       # Current dataset
print(pipeline.default_schema_name) # Default schema name
print(pipeline.working_dir)        # Pipeline working directory
print(pipeline.first_run)          # True if never run before

Type Signature

From dlt/pipeline/__init__.py:32-46:
def pipeline(
    pipeline_name: str = None,
    pipelines_dir: str = None,
    pipeline_salt: TSecretStrValue = None,
    destination: TDestinationReferenceArg = None,
    staging: TDestinationReferenceArg = None,
    dataset_name: str = None,
    import_schema_path: str = None,
    export_schema_path: str = None,
    full_refresh: bool = None,
    dev_mode: bool = False,
    refresh: TRefreshMode = None,
    progress: TCollectorArg = _NULL_COLLECTOR,
    _impl_cls: Type[TPipeline] = Pipeline,
) -> TPipeline

Best Practices

1. Use descriptive pipeline names: choose names that reflect the data source and purpose, e.g. github_issues_pipeline, stripe_payments_pipeline.
2. Keep datasets organized: use consistent dataset naming conventions to group related tables.
3. Use dev_mode during development: enable dev_mode to avoid polluting production datasets while testing.
4. Monitor load_info: always check the returned LoadInfo object for errors and metrics.
State Persistence: Pipeline state is stored locally in pipelines_dir. Don’t delete this directory if you need incremental loads or want to preserve schemas.

Related Concepts
  • Source - Groups related resources
  • Resource - Individual data streams
  • Destination - Where data is loaded
  • Schema - Describes data structure
  • State - Tracks incremental loading progress
