A pipeline is the main object in dlt that orchestrates the entire data loading process. It connects your data source to a destination, manages schemas, handles state, and executes the extract-normalize-load workflow.
## What is a Pipeline?
A pipeline in dlt is responsible for:
- Extracting data from sources
- Normalizing data into a structured format
- Loading data into destinations
- Managing schemas that describe your data structure
- Persisting state across runs for incremental loading
- Tracking metadata about load operations
Think of a pipeline as the control center that coordinates all data movement from API to database.
## Creating a Pipeline

Create a pipeline using the `dlt.pipeline()` function:

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination="duckdb",
    dataset_name="player_data"
)
```
### Key Parameters

- `pipeline_name` (str): Identifies the pipeline in monitoring and state storage. Defaults to the script file name prefixed with `dlt_`.
- `destination` (str | Destination): Where data will be loaded (e.g., `"duckdb"`, `"bigquery"`, `"postgres"`).
- `dataset_name` (str): Logical grouping of tables (a schema in SQL databases). Defaults to `pipeline_name`.
- `pipelines_dir` (str): Working directory for pipeline state and temp files. Defaults to `~/.dlt/pipelines/`.
## Running a Pipeline

### Simple Run

The most common way to use a pipeline is the `run()` method:
```python
import dlt
from dlt.sources.helpers import requests

# Create pipeline
pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination="duckdb",
    dataset_name="player_data"
)

# Fetch data
data = []
for player in ["magnuscarlsen", "rpragchess"]:
    response = requests.get(f"https://api.chess.com/pub/player/{player}")
    data.append(response.json())

# Extract, normalize, and load
load_info = pipeline.run(data, table_name="player")
print(load_info)
```
### Using with Sources and Resources

```python
import dlt

@dlt.resource(write_disposition="replace")
def github_issues(api_secret_key: str = dlt.secrets.value):
    from dlt.sources.helpers.rest_client import paginate
    from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
    auth = BearerTokenAuth(api_secret_key) if api_secret_key else None
    for page in paginate(url, auth=auth, params={"state": "open"}):
        yield page

pipeline = dlt.pipeline(
    pipeline_name="github_pipeline",
    destination="duckdb",
    dataset_name="github_data"
)

load_info = pipeline.run(github_issues())
```
## Pipeline Methods

### run()

Executes the complete extract-normalize-load workflow:

```python
load_info = pipeline.run(
    data,                          # Source data, resource, or list of resources
    destination="duckdb",          # Override the pipeline destination
    dataset_name="my_dataset",     # Override the dataset name
    table_name="my_table",         # Table name for raw data
    write_disposition="append",    # How to write: append, replace, merge
    schema=my_schema,              # Custom schema
    loader_file_format="parquet"   # File format: jsonl, parquet, etc.
)
```
Returns a `LoadInfo` object with details about loaded packages and any failed jobs.
### extract()

Extracts data without normalizing or loading:

```python
extract_info = pipeline.extract(github_issues())
```
### normalize()

Normalizes previously extracted data:

```python
normalize_info = pipeline.normalize()
```
### load()

Loads previously normalized data:

```python
load_info = pipeline.load()
```
Running the extract, normalize, and load steps separately gives you granular control, but `run()` is recommended for most use cases.
## Advanced Features

### Development Mode

Start fresh on each run with timestamped datasets:

```python
pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination="duckdb",
    dataset_name="player_data",
    dev_mode=True  # Creates dataset_name_YYYYMMDD_HHMMSS
)
```
### Refresh Mode

Reset data or state during pipeline runs:

```python
# Drop all source data and state
load_info = pipeline.run(my_source(), refresh="drop_sources")

# Drop data and state for selected resources only
load_info = pipeline.run(my_source(), refresh="drop_resources")

# Wipe data but keep the schema
load_info = pipeline.run(my_source(), refresh="drop_data")
```
### Attach to an Existing Pipeline

Reconnect to a pipeline created in a previous run:

```python
pipeline = dlt.attach(
    pipeline_name="chess_pipeline",
    pipelines_dir="/path/to/pipelines"
)
```
### Get the Current Pipeline

Access the most recently created or attached pipeline:

```python
# Create a pipeline
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb"
)

# Later, get a reference to it
pipeline = dlt.pipeline()  # No arguments returns the current pipeline
```
### Shorthand: dlt.run()

For quick one-off loads, use `dlt.run()` without creating an explicit pipeline:

```python
import dlt

# Creates a pipeline automatically
load_info = dlt.run(
    [1, 2, 3],
    destination="duckdb",
    dataset_name="numbers",
    table_name="my_numbers"
)
```
## Pipeline Properties

Access pipeline properties:

```python
print(pipeline.pipeline_name)        # Pipeline name
print(pipeline.dataset_name)         # Current dataset
print(pipeline.default_schema_name)  # Default schema name
print(pipeline.working_dir)          # Pipeline working directory
print(pipeline.first_run)            # True if never run before
```
## Type Signature

From `dlt/pipeline/__init__.py`:

```python
def pipeline(
    pipeline_name: str = None,
    pipelines_dir: str = None,
    pipeline_salt: TSecretStrValue = None,
    destination: TDestinationReferenceArg = None,
    staging: TDestinationReferenceArg = None,
    dataset_name: str = None,
    import_schema_path: str = None,
    export_schema_path: str = None,
    full_refresh: bool = None,
    dev_mode: bool = False,
    refresh: TRefreshMode = None,
    progress: TCollectorArg = _NULL_COLLECTOR,
    _impl_cls: Type[TPipeline] = Pipeline,
) -> TPipeline
```
## Best Practices

- **Use descriptive pipeline names.** Choose names that reflect the data source and purpose: `github_issues_pipeline`, `stripe_payments_pipeline`.
- **Keep datasets organized.** Use consistent dataset naming conventions to group related tables.
- **Use `dev_mode` during development.** Enable `dev_mode` to avoid polluting production datasets while testing.
- **Monitor `load_info`.** Always check the returned `LoadInfo` object for errors and metrics.
> **State Persistence:** Pipeline state is stored locally in `pipelines_dir`. Don't delete this directory if you need incremental loads or want to preserve schemas.
## Related Concepts

- Source - Groups related resources
- Resource - Individual data streams
- Destination - Where data is loaded
- Schema - Describes data structure
- State - Tracks incremental loading progress