
Function Signature

def attach(
    pipeline_name: str = None,
    pipelines_dir: str = None,
    pipeline_salt: TSecretStrValue = None,
    destination: TDestinationReferenceArg = None,
    staging: TDestinationReferenceArg = None,
    progress: TCollectorArg = _NULL_COLLECTOR,
    dataset_name: str = None,
) -> Pipeline

Description

Attaches to the working folder of pipeline_name in pipelines_dir or in the default directory. This function is used to reconnect to an existing pipeline that was previously created, allowing you to resume operations or inspect pipeline state. Pre-configured destination and staging factories may be provided. If not present, default factories are created from the pipeline state. If no local pipeline state is found, dlt will attempt to restore the pipeline from the provided destination and dataset.

Parameters

pipeline_name
str
default:"None"
The name of the pipeline to attach to. This should match the name of an existing pipeline. If not provided, dlt will look for configuration in environment variables or config.toml.
pipelines_dir
str
default:"None"
The directory where pipeline working folders are stored. If not provided, defaults to a folder in the user home directory: ~/.dlt/pipelines/.
pipeline_salt
TSecretStrValue
default:"None"
A random value used for deterministic hashing during data anonymization. Must match the salt used when the pipeline was originally created. Defaults to a value derived from the pipeline name.
destination
TDestinationReferenceArg
default:"None"
A name of the destination, or a destination factory imported from dlt.destinations. If not provided, the destination configuration is read from the pipeline state. Can be a string (destination name), a Destination instance, a callable returning a Destination, or None. This parameter is useful when restoring a pipeline from a remote destination where local state is not available.
staging
TDestinationReferenceArg
default:"None"
A name of the staging destination, or a destination factory imported from dlt.destinations. If not provided, the staging configuration is read from the pipeline state.
progress
TCollectorArg
default:"_NULL_COLLECTOR"
A progress monitor that shows progress bars, console messages, or log messages. Pass a string with a collector name or configure your own collector from the dlt.progress module. Supported names: tqdm, enlighten, alive_progress, and log.
dataset_name
str
default:"None"
The name of the dataset. If provided along with destination, this is used to restore the pipeline state from the remote destination when local state is not found.

Returns

pipeline
Pipeline
An instance of the Pipeline class that is attached to the existing working folder. The pipeline state, schemas, and configuration are loaded from the working directory or restored from the destination.

Raises

CannotRestorePipelineException
Exception
Raised when:
  • No local pipeline state is found at the specified location
  • The pipeline cannot be restored from the destination (no destination provided or pipeline not found in destination)
  • The destination and dataset do not contain state for this pipeline

Behavior Details

Local State Available

When local pipeline state exists in the working folder:
  1. The pipeline state and schemas are loaded from the local files
  2. The destination and staging configurations are taken from the state (unless overridden by parameters)
  3. The pipeline is activated and ready to use

Local State Not Available

When no local state is found, attach will attempt to restore from the destination:
  1. A new pipeline instance is created with the provided parameters
  2. If destination is provided, attach attempts to sync state from the remote destination
  3. The pipeline state and schemas are downloaded from the destination
  4. If the remote state is not found, the working folder is wiped and CannotRestorePipelineException is raised

Important Notes

  • The pipeline is always created with dev_mode=False to prevent wiping the working folder
  • When attaching, the dev_mode setting is restored from the pipeline state
  • The attached pipeline becomes the active pipeline instance

Examples

Attach to an existing pipeline

import dlt

# Attach to a pipeline that was previously created
pipeline = dlt.attach(pipeline_name="my_pipeline")

# Continue working with the pipeline
load_info = pipeline.run(new_data, table_name="updates")

Attach with custom pipelines directory

import dlt

pipeline = dlt.attach(
    pipeline_name="production_pipeline",
    pipelines_dir="/opt/dlt/pipelines"
)

Attach and restore from destination

import dlt

# If local state is missing, restore from BigQuery
pipeline = dlt.attach(
    pipeline_name="data_pipeline",
    destination="bigquery",
    dataset_name="production_data"
)

Attach with progress monitoring

import dlt

pipeline = dlt.attach(
    pipeline_name="monitored_pipeline",
    progress="tqdm"
)

Inspect attached pipeline state

import dlt

pipeline = dlt.attach(pipeline_name="my_pipeline")

# Check pipeline properties
print(f"Pipeline: {pipeline.pipeline_name}")
print(f"Destination: {pipeline.destination}")
print(f"Dataset: {pipeline.dataset_name}")
print(f"Schemas: {pipeline.schema_names}")

# Check for pending data
if pipeline.has_pending_data:
    print("Pipeline has pending data to load")
    # Complete pending loads
    pipeline.run(None)

Handle attachment errors

import dlt
from dlt.pipeline.exceptions import CannotRestorePipelineException

try:
    pipeline = dlt.attach(pipeline_name="my_pipeline")
except CannotRestorePipelineException as e:
    print(f"Cannot attach to pipeline: {e}")
    # Create a new pipeline instead
    pipeline = dlt.pipeline(
        pipeline_name="my_pipeline",
        destination="duckdb",
        dataset_name="my_data"
    )

Attach in production environment

import dlt
import os

# Use environment variables for configuration
pipeline = dlt.attach(
    pipeline_name=os.getenv("PIPELINE_NAME"),
    pipelines_dir=os.getenv("PIPELINES_DIR", "/var/lib/dlt/pipelines")
)

# Run incremental load
load_info = pipeline.run(my_source())

Use Cases

Resuming a Pipeline

Use attach to resume a pipeline that was interrupted or to continue loading data in a subsequent run:
import dlt

# First run (creates pipeline)
pipeline = dlt.pipeline(
    pipeline_name="incremental_load",
    destination="postgres",
    dataset_name="events"
)
pipeline.run(get_events())

# Later run (attaches to existing pipeline)
pipeline = dlt.attach(pipeline_name="incremental_load")
pipeline.run(get_events())  # Continues from last state

Multi-Process Pipelines

Use attach in worker processes to share pipeline state:
import dlt
from multiprocessing import Process

def worker_process(pipeline_name):
    # Each worker attaches to the same pipeline
    pipeline = dlt.attach(pipeline_name=pipeline_name)
    # Perform work...
    
if __name__ == "__main__":
    # Main process creates the pipeline
    pipeline = dlt.pipeline(
        pipeline_name="shared_pipeline",
        destination="snowflake",
        dataset_name="data"
    )
    
    # Workers attach to it
    processes = [
        Process(target=worker_process, args=("shared_pipeline",))
        for _ in range(4)
    ]
    for p in processes:
        p.start()

Pipeline Inspection and Debugging

import dlt

pipeline = dlt.attach(pipeline_name="my_pipeline")

# Inspect state
state = pipeline.state
print(f"State version: {state.get('_state_version')}")

# Check schemas
for schema_name in pipeline.schema_names:
    schema = pipeline.schemas[schema_name]
    print(f"Schema {schema_name} has {len(schema.tables)} tables")

# List normalized load packages; list_normalized_load_packages() returns load ids
for load_id in pipeline.list_normalized_load_packages():
    package_info = pipeline.get_load_package_info(load_id)
    print(f"Package {load_id}: {package_info.state}")

See Also

  • pipeline - Create a new pipeline instance
  • run - Load data using a pipeline
