
Deployment Strategies

Deploy dlt pipelines to production using orchestrators like Airflow, cloud functions, or cron jobs. dlt is designed to integrate seamlessly with existing workflows.

Airflow Integration

dlt provides native Airflow integration through the PipelineTasksGroup helper, which maps dlt resources to Airflow tasks.

Deploy with Airflow

1. Install CLI dependencies

First, install the dlt CLI tools:
pip install "dlt[cli]"
2. Run deploy command

Deploy your pipeline to Airflow:
dlt deploy my_pipeline.py airflow-composer
This creates:
  • dags/dag_my_pipeline.py - Airflow DAG file
  • build/cloudbuild.yaml - Cloud Build configuration (for GCP)
3. Configure the DAG

Edit the generated DAG file:
import dlt
from airflow.decorators import dag
from dlt.common import pendulum
from dlt.helpers.airflow_helper import PipelineTasksGroup
from tenacity import Retrying, stop_after_attempt

default_task_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': '[email protected]',
    'email_on_failure': True,
    'retries': 3,
}

@dag(
    schedule='@daily',
    start_date=pendulum.datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    default_args=default_task_args
)
def load_pipeline_data():
    # Configure task group
    tasks = PipelineTasksGroup(
        pipeline_name="my_pipeline",
        use_data_folder=False,
        wipe_local_data=True,
        use_task_logger=True,
        retry_policy=Retrying(
            stop=stop_after_attempt(3),
            reraise=True
        ),
    )

    # Import your source
    from my_pipeline import my_source

    # Create pipeline
    pipeline = dlt.pipeline(
        pipeline_name='my_pipeline',
        destination='bigquery',
        dataset_name='production_data',
        dev_mode=False
    )

    # Add pipeline run to task group
    tasks.add_run(
        pipeline=pipeline,
        data=my_source(),
        decompose="serialize",
        trigger_rule="all_done",
        retries=0,
        provide_context=True
    )

load_pipeline_data()

Task Decomposition Strategies

Control how dlt resources map to Airflow tasks with the decompose argument. With "none" the whole source runs in a single task; with "serialize" dependent resources are grouped into connected components and each component runs as one task, sequentially; the "parallel" variants run independent components concurrently:
# Each connected component of resources becomes one task;
# tasks execute sequentially
tasks.add_run(
    pipeline=pipeline,
    data=source(),
    decompose="serialize",
    trigger_rule="all_done"
)
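The "connected component" grouping means a transformer always lands in the same task as the resource that feeds it. A dependency-free sketch of the grouping idea (not dlt's implementation; resource names are made up):

```python
def connected_components(edges, nodes):
    """Group nodes joined by edges (e.g. resource -> transformer) using union-find."""
    parent = {n: n for n in nodes}

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path compression
            x = parent[x]
        return x

    for a, b in edges:
        parent[find(a)] = find(b)

    groups = {}
    for n in nodes:
        groups.setdefault(find(n), []).append(n)
    return sorted(sorted(g) for g in groups.values())

# "users" feeds "user_details"; "orders" stands alone -> two tasks
print(connected_components([("users", "user_details")],
                           ["users", "user_details", "orders"]))
# [['orders'], ['user_details', 'users']]
```

Here two components mean two Airflow tasks; with decompose="serialize" they would run one after another.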

PipelineTasksGroup Configuration

Configure the task group behavior:
from tenacity import Retrying, stop_after_attempt, wait_exponential

from dlt.helpers.airflow_helper import PipelineTasksGroup

tasks = PipelineTasksGroup(
    pipeline_name="my_pipeline",
    
    # Storage
    use_data_folder=False,  # True stores temp data in Composer's GCS-backed data folder
    local_data_folder="/tmp",  # Local temp folder used when use_data_folder=False
    wipe_local_data=True,  # Clean up after completion
    
    # Logging
    use_task_logger=True,  # Redirect dlt logs to Airflow
    log_progress_period=30.0,  # Log progress every 30s
    
    # Performance
    buffer_max_items=1000,  # Buffer size for data items
    
    # Error handling
    abort_task_if_any_job_failed=True,  # Fail task on job failure
    retry_policy=Retrying(
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    ),
    retry_pipeline_steps=["load"],  # Only retry load step
    
    # Observability
    save_load_info=False,  # Save load info to destination
    save_trace_info=False,  # Save trace info to destination
)

Passing Credentials

Credentials reach the DAG the same way dlt resolves any configuration: through environment variables or through Airflow variables. The simplest approach is to store all secrets in a single dlt_secrets_toml Airflow variable:
  1. Generate the deployment with TOML-formatted secrets:
dlt deploy my_pipeline.py airflow-composer --secrets-format toml
  2. Copy the output TOML string into the Airflow UI:
    • Navigate to Admin → Variables
    • Create a variable named dlt_secrets_toml
    • Paste the TOML content:
[sources.my_api]
api_key = "your_api_key_here"

[destination.bigquery.credentials]
project_id = "my-project"
private_key = "-----BEGIN PRIVATE KEY-----\n..."
client_email = "my-sa@my-project.iam.gserviceaccount.com"
The dlt_secrets_toml variable acts as a replacement for your local secrets.toml file. It can contain all your configuration values.
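If you use environment variables instead of the Airflow variable, dlt resolves missing values from the environment by upper-casing the TOML path and joining its segments with double underscores. A tiny illustration of that naming scheme (the helper is ours, not part of dlt):

```python
def toml_key_to_env_var(*parts):
    """Map a TOML section path to dlt's environment variable naming scheme."""
    return "__".join(p.upper() for p in parts)

# [sources.my_api] api_key  ->  SOURCES__MY_API__API_KEY
print(toml_key_to_env_var("sources", "my_api", "api_key"))
# SOURCES__MY_API__API_KEY
```

The same mapping explains the variable names used in the deployment examples below, such as DESTINATION__BIGQUERY__CREDENTIALS.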

Cloud Function Deployment

Deploy dlt pipelines as serverless functions.

Google Cloud Functions

# main.py
import dlt
from my_source import my_source

def run_pipeline(request):
    """HTTP Cloud Function entrypoint"""
    pipeline = dlt.pipeline(
        pipeline_name='cloud_function_pipeline',
        destination='bigquery',
        dataset_name='production'
    )
    
    # Run pipeline
    load_info = pipeline.run(my_source())
    
    return {
        'status': 'success',
        'packages': len(load_info.loads_ids),
        'dataset': pipeline.dataset_name
    }
# requirements.txt
dlt[bigquery]>=0.4.0
Deploy:
gcloud functions deploy run_pipeline \
  --runtime python311 \
  --trigger-http \
  --entry-point run_pipeline \
  --set-env-vars DESTINATION__BIGQUERY__CREDENTIALS='{...}'

AWS Lambda

# lambda_function.py
import dlt
from my_source import my_source

def lambda_handler(event, context):
    pipeline = dlt.pipeline(
        pipeline_name='lambda_pipeline',
        destination='redshift',
        dataset_name='production'
    )
    
    load_info = pipeline.run(my_source())
    
    return {
        'statusCode': 200,
        'body': f'Loaded {len(load_info.loads_ids)} packages'
    }
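To run the handler on a schedule, an EventBridge rule is the usual trigger. A minimal AWS SAM sketch (resource and file names are ours, not from the source):

```yaml
# template.yaml (AWS SAM)
Resources:
  DltPipelineFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: lambda_function.lambda_handler
      Runtime: python3.11
      Timeout: 900  # pipelines can run long; Lambda caps at 15 minutes
      Events:
        DailySchedule:
          Type: Schedule
          Properties:
            Schedule: cron(0 2 * * ? *)  # daily at 2 AM UTC
```

Note the 15-minute Lambda limit: for larger loads, prefer a long-running orchestrator such as Airflow.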

GitHub Actions

Run pipelines on a schedule using GitHub Actions:
# .github/workflows/run_pipeline.yml
name: Run dlt Pipeline

on:
  schedule:
    - cron: '0 2 * * *'  # Daily at 2 AM UTC
  workflow_dispatch:  # Manual trigger

jobs:
  run_pipeline:
    runs-on: ubuntu-latest
    
    steps:
      - uses: actions/checkout@v3
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      
      - name: Run pipeline
        env:
          DESTINATION__BIGQUERY__CREDENTIALS: ${{ secrets.GCP_CREDENTIALS }}
          SOURCES__MY_API__API_KEY: ${{ secrets.API_KEY }}
        run: |
          python my_pipeline.py
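In CI it helps to fail fast when a secret was not wired through to the environment. A small preflight sketch (the helper is ours; the variable names are the ones used in the workflow above):

```python
import os

def missing_secrets(required):
    """Return the names of required environment variables that are unset or empty."""
    return [name for name in required if not os.environ.get(name)]

# Check the variables the pipeline expects before running it
print(missing_secrets(["DESTINATION__BIGQUERY__CREDENTIALS",
                       "SOURCES__MY_API__API_KEY"]))
```

Call this at the top of my_pipeline.py and exit with a clear message if the list is non-empty, rather than letting dlt fail mid-run with a credentials error.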

Kubernetes CronJob

Deploy as a Kubernetes CronJob:
# cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: dlt-pipeline
spec:
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: pipeline
            image: my-registry/dlt-pipeline:latest
            env:
            - name: DESTINATION__POSTGRES__CREDENTIALS
              valueFrom:
                secretKeyRef:
                  name: dlt-secrets
                  key: postgres-credentials
            command: ["python", "my_pipeline.py"]
          restartPolicy: OnFailure
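The secretKeyRef above expects a Secret named dlt-secrets to exist in the cluster. A minimal manifest sketch (the connection string is a placeholder; dlt accepts credentials as a single connection string via the environment variable):

```yaml
# secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: dlt-secrets
type: Opaque
stringData:
  postgres-credentials: "postgresql://loader:change-me@db.example.com:5432/analytics"
```

Apply it with kubectl apply -f secret.yaml before creating the CronJob, or manage it through your secret store's Kubernetes integration.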

Best Practices

1. Use dev_mode=False in production

Always disable dev mode for production pipelines:
pipeline = dlt.pipeline(
    pipeline_name='prod_pipeline',
    destination='snowflake',
    dataset_name='production',
    dev_mode=False  # Critical for production
)
2. Implement retry logic

Use retry policies for resilient pipelines:
from tenacity import Retrying, stop_after_attempt, retry_if_exception
from dlt.pipeline.helpers import retry_load

for attempt in Retrying(
    stop=stop_after_attempt(3),
    retry=retry_if_exception(retry_load(())),
    reraise=True
):
    with attempt:
        pipeline.run(source())
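Under the hood this pattern is just a bounded retry loop around the run. A dependency-free sketch of the same idea (our helper, not a dlt or tenacity API):

```python
import time

def run_with_retries(fn, attempts=3, delay=1.0):
    """Call fn, retrying up to `attempts` times; re-raise the last error."""
    for i in range(attempts):
        try:
            return fn()
        except Exception:
            if i == attempts - 1:
                raise
            time.sleep(delay)  # back off before the next attempt

# Simulate a source that fails twice, then succeeds
calls = []
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient network error")
    return "ok"

print(run_with_retries(flaky, attempts=3, delay=0))  # succeeds on the third call
```

tenacity's retry_if_exception(retry_load(())) adds one refinement over this sketch: it retries only errors dlt classifies as transient, so schema or credential errors fail immediately.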
3. Monitor pipeline health

Save load info and traces for observability:
load_info = pipeline.run(source())

# Fail loudly if any load job failed
load_info.raise_on_failed_jobs()

# Persist the load info to the destination for auditing
pipeline.run([load_info], table_name="_load_info", write_disposition="append")

# Print a human-readable summary of the load
print(load_info)
4. Use secrets management

Never hardcode credentials. Use environment variables or secret managers:
import os
import dlt

# Credentials from environment
pipeline = dlt.pipeline(
    pipeline_name='secure_pipeline',
    destination='bigquery',
    dataset_name=os.getenv('DATASET_NAME')
)
Production checklist:
  • Set dev_mode=False
  • Configure retry policies
  • Use secret management (not hardcoded credentials)
  • Enable monitoring and alerting
  • Clean up completed load packages
  • Set appropriate log levels
  • Test with production-like data volumes
