The Pipeline class is the core component of dlt that orchestrates the data loading process. It manages the extract, normalize, and load steps, handles state synchronization, and provides methods to run complete data loading workflows.

Creating a Pipeline

Pipelines are typically created using the dlt.pipeline() function rather than instantiating the class directly.
import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="bigquery",
    dataset_name="my_dataset"
)

Properties

pipeline_name

The name of the pipeline.
name = pipeline.pipeline_name

dataset_name

The name of the dataset where data will be loaded.
dataset = pipeline.dataset_name

default_schema

Returns the default schema associated with the pipeline.
schema = pipeline.default_schema
Returns: Schema - The default schema object
Source: ~/workspace/source/dlt/pipeline/pipeline.py:961

state

Returns the current pipeline state as a dictionary.
state = pipeline.state
Returns: TPipelineState - Dictionary with the pipeline state
Source: ~/workspace/source/dlt/pipeline/pipeline.py:965

has_data

Tells if the pipeline contains any data: schemas, extracted files, load packages or loaded packages.
if pipeline.has_data:
    print("Pipeline contains data")
Returns: bool - True if pipeline has data
Source: ~/workspace/source/dlt/pipeline/pipeline.py:938

has_pending_data

Tells if the pipeline contains any pending packages to be normalized or loaded.
if pipeline.has_pending_data:
    print("Pipeline has pending data")
Returns: bool - True if pipeline has pending data
Source: ~/workspace/source/dlt/pipeline/pipeline.py:948

last_trace

Returns or loads last trace generated by pipeline.
trace = pipeline.last_trace
Returns: PipelineTrace - The last trace object
Source: ~/workspace/source/dlt/pipeline/pipeline.py:975

Methods

run()

Loads data from a source into the destination. This is the primary method for executing a complete ETL pipeline.
info = pipeline.run(
    data,
    table_name="users",
    write_disposition="append"
)
Parameters:
- data (Any, required): Data to be loaded to destination. Can be a list, iterator, generator, dlt.resource, or dlt.source.
- destination (TDestinationReferenceArg, default None): A name of the destination or a destination module. If not provided, uses the value from pipeline initialization.
- staging (TDestinationReferenceArg, default None): A name of the staging destination for intermediate data loading.
- dataset_name (str, default None): Dataset name where data will be loaded. Defaults to pipeline_name if not specified.
- credentials (Any, default None): Credentials for the destination. Usually inferred from secrets.toml or environment variables.
- table_name (str, default None): Name of the table to load data into. Required for lists and iterables without a name attribute.
- write_disposition (TWriteDispositionConfig, default None): Controls how to write data: "append", "replace", "skip", or "merge".
- columns (TAnySchemaColumns, default None): Column schemas with names, data types, and performance hints.
- primary_key (TColumnNames, default None): Column name(s) that comprise the primary key. Used with the merge write disposition.
- schema (Schema, default None): An explicit Schema object to group table schemas.
- loader_file_format (TLoaderFileFormat, default None): The file format for load packages (e.g., "jsonl", "parquet").
- table_format (TTableFormat, default None): The table format used by the destination ("delta" or "iceberg").
- schema_contract (TSchemaContract, default None): Schema contract settings override for all tables.
- refresh (TRefreshMode, default None): Refresh mode: "drop_sources", "drop_resources", or "drop_data".

Returns: LoadInfo - Information about the loaded data, including package ids and job statuses
Source: ~/workspace/source/dlt/pipeline/pipeline.py:637

extract()

Extracts data and prepares it for normalization. Does not require destination configuration.
info = pipeline.extract(
    data,
    table_name="users"
)
Accepts the same parameters as run() except for the destination-related parameters.
Returns: ExtractInfo - Information about the extraction step
Source: ~/workspace/source/dlt/pipeline/pipeline.py:424

normalize()

Normalizes extracted data, infers schema, and creates load packages.
info = pipeline.normalize(workers=4)
Parameters:
- workers (int, default 1): Number of worker processes for normalization.

Returns: NormalizeInfo - Information about the normalization step
Source: ~/workspace/source/dlt/pipeline/pipeline.py:527

load()

Loads prepared packages into the destination.
info = pipeline.load(
    destination="bigquery",
    dataset_name="my_dataset"
)
Parameters:
- destination (TDestinationReferenceArg, default None): Destination to load data into.
- dataset_name (str, default None): Dataset name at the destination.
- credentials (Any, default None): Credentials for the destination.
- workers (int, default 20): Number of parallel loading workers.
- raise_on_failed_jobs (bool, default ConfigValue): Whether to raise an exception on failed jobs.

Returns: LoadInfo - Information about the load step
Source: ~/workspace/source/dlt/pipeline/pipeline.py:578

activate()

Activates the pipeline, making it the active context for dlt components.
pipeline.activate()
Source: ~/workspace/source/dlt/pipeline/pipeline.py:909

deactivate()

Deactivates the currently active pipeline.
pipeline.deactivate()
Source: ~/workspace/source/dlt/pipeline/pipeline.py:929

drop()

Deletes local pipeline state, schemas, and working files. Re-initializes the pipeline.
pipeline = pipeline.drop()
# Or create new pipeline with different name
pipeline = pipeline.drop(pipeline_name="new_name")
Parameters:
- pipeline_name (str, default None): Optional new pipeline name. Creates and activates a new instance if different from the current name.

Returns: Pipeline - The pipeline instance (self or a new instance)
Source: ~/workspace/source/dlt/pipeline/pipeline.py:375

sync_destination()

Synchronizes pipeline state with the destination’s state.
pipeline.sync_destination(
    destination="bigquery",
    dataset_name="my_dataset"
)
Parameters:
- destination (TDestinationReferenceArg, default None): Destination to sync with.
- staging (TDestinationReferenceArg, default None): Staging destination.
- dataset_name (str, default None): Dataset name.

Source: ~/workspace/source/dlt/pipeline/pipeline.py:774

sync_schema()

Synchronizes a schema with the destination.
tables = pipeline.sync_schema("my_schema")
Parameters:
- schema_name (str, default None): Name of the schema to sync. Uses the default schema if not provided.

Returns: TSchemaTables - Dictionary of synchronized tables
Source: ~/workspace/source/dlt/pipeline/pipeline.py:1062

list_extracted_load_packages()

Returns a list of all load package IDs that are or will be normalized.
packages = pipeline.list_extracted_load_packages()
Returns: Sequence[str] - List of load package IDs
Source: ~/workspace/source/dlt/pipeline/pipeline.py:1021

list_normalized_load_packages()

Returns a list of all load package IDs that are or will be loaded.
packages = pipeline.list_normalized_load_packages()
Returns: Sequence[str] - List of load package IDs
Source: ~/workspace/source/dlt/pipeline/pipeline.py:1025

list_completed_load_packages()

Returns a list of all load package IDs that are completely loaded.
packages = pipeline.list_completed_load_packages()
Returns: Sequence[str] - List of load package IDs
Source: ~/workspace/source/dlt/pipeline/pipeline.py:1029

get_load_package_info()

Returns information about a specific load package.
info = pipeline.get_load_package_info("1234567890")
Parameters:
- load_id (str, required): The load package ID.

Returns: LoadPackageInfo - Information about the package, including jobs and statuses
Source: ~/workspace/source/dlt/pipeline/pipeline.py:1033

drop_pending_packages()

Deletes all extracted and normalized packages, including partially loaded ones by default.
pipeline.drop_pending_packages(with_partial_loads=True)
Parameters:
- with_partial_loads (bool, default True): Whether to also delete partially loaded packages.

Source: ~/workspace/source/dlt/pipeline/pipeline.py:1048

set_local_state_val()

Sets a value in local state (not synchronized with destination).
pipeline.set_local_state_val("last_run", "2024-01-01")
Parameters:
- key (str, required): State key.
- value (Any, required): Value to store.

Source: ~/workspace/source/dlt/pipeline/pipeline.py:1079

get_local_state_val()

Gets a value from local state.
value = pipeline.get_local_state_val("last_run")
Parameters:
- key (str, required): State key to retrieve.

Returns: Any - The stored value
Source: ~/workspace/source/dlt/pipeline/pipeline.py:1090

Example Usage

Basic Pipeline

import dlt

# Create pipeline
pipeline = dlt.pipeline(
    pipeline_name="github_pipeline",
    destination="bigquery",
    dataset_name="github_data"
)

# Load data
info = pipeline.run(
    data=[{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}],
    table_name="users",
    write_disposition="append"
)

# LoadInfo pretty-prints load package ids and job statuses
print(info)

Incremental Loading

import dlt

@dlt.resource
def users(updated_at=dlt.sources.incremental("updated_at")):
    # Your data fetching logic
    yield from api.get_users(since=updated_at.last_value)

pipeline = dlt.pipeline(
    pipeline_name="incremental_pipeline",
    destination="postgres",
    dataset_name="production"
)

info = pipeline.run(users())

Step-by-Step Processing

import dlt

pipeline = dlt.pipeline(
    pipeline_name="step_pipeline",
    destination="duckdb",
    dataset_name="data"
)

# Extract (a plain list needs an explicit table_name)
source_data = [{"id": 1, "value": "a"}]
extract_info = pipeline.extract(source_data, table_name="items")
print(f"Extracted: {extract_info.loads_ids}")

# Normalize
normalize_info = pipeline.normalize()
print(f"Normalized: {normalize_info.loads_ids}")

# Load
load_info = pipeline.load()
print(f"Loaded: {load_info.loads_ids}")
