# Deployment Strategies

Deploy dlt pipelines to production using orchestrators such as Airflow, serverless cloud functions, GitHub Actions, Kubernetes CronJobs, or plain cron. dlt is designed to integrate with existing workflows rather than replace them.
## Airflow Integration

dlt provides native Airflow integration through the `PipelineTasksGroup` helper, which maps dlt sources and resources onto Airflow tasks.
### Deploy with Airflow

**1. Install CLI dependencies**

First, install the dlt CLI tools.
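The CLI extras can typically be installed with pip (the `cli` extra name is taken from dlt's documentation; verify it against your installed dlt version):

```shell
pip install "dlt[cli]"
```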
**2. Run the deploy command**

Deploy your pipeline to Airflow:

```sh
dlt deploy my_pipeline.py airflow-composer
```
This creates:

- `dags/dag_my_pipeline.py` - the Airflow DAG file
- `build/cloudbuild.yaml` - Cloud Build configuration (for GCP)
**3. Configure the DAG**

Edit the generated DAG file:

```python
import dlt
from airflow.decorators import dag
from dlt.common import pendulum
from dlt.helpers.airflow_helper import PipelineTasksGroup
from tenacity import Retrying, stop_after_attempt

default_task_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': '[email protected]',
    'email_on_failure': True,
    'retries': 3,
}

@dag(
    schedule='@daily',
    start_date=pendulum.datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    default_args=default_task_args
)
def load_pipeline_data():
    # Configure the task group
    tasks = PipelineTasksGroup(
        pipeline_name="my_pipeline",
        use_data_folder=False,
        wipe_local_data=True,
        use_task_logger=True,
        retry_policy=Retrying(
            stop=stop_after_attempt(3),
            reraise=True
        ),
    )

    # Import your source
    from my_pipeline import my_source

    # Create the pipeline
    pipeline = dlt.pipeline(
        pipeline_name='my_pipeline',
        destination='bigquery',
        dataset_name='production_data',
        dev_mode=False
    )

    # Add the pipeline run to the task group
    tasks.add_run(
        pipeline=pipeline,
        data=my_source(),
        decompose="serialize",
        trigger_rule="all_done",
        retries=0,
        provide_context=True
    )

load_pipeline_data()
```
### Task Decomposition Strategies

The `decompose` argument controls how dlt resources map to Airflow tasks:

- `"none"` - the whole source runs in a single Airflow task
- `"serialize"` - the source is decomposed into connected components that run as sequential tasks
- `"parallel"` - components run as parallel tasks sharing one pipeline
- `"parallel-isolated"` - components run as parallel tasks, each with its own isolated pipeline

For example, with sequential execution:

```python
# Resources execute sequentially
# Each connected component becomes one task
tasks.add_run(
    pipeline=pipeline,
    data=source(),
    decompose="serialize",
    trigger_rule="all_done"
)
```
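To build intuition for `decompose="serialize"`: resources that depend on each other form a connected component of the source's resource graph, and each component becomes one unit of work. A minimal sketch in plain Python (an illustrative toy graph, not dlt's actual implementation):

```python
from collections import defaultdict

def connected_components(edges, nodes):
    """Group nodes into connected components of an undirected graph."""
    adj = defaultdict(set)
    for a, b in edges:
        adj[a].add(b)
        adj[b].add(a)
    seen, components = set(), []
    for node in nodes:
        if node in seen:
            continue
        # Traverse from an unvisited node to collect its whole component
        component, stack = set(), [node]
        while stack:
            n = stack.pop()
            if n in component:
                continue
            component.add(n)
            stack.extend(adj[n])
        seen |= component
        components.append(component)
    return components

# 'orders' depends on 'customers'; 'events' stands alone ->
# two components, i.e. two units of sequential work under "serialize"
deps = [("customers", "orders")]
resources = ["customers", "orders", "events"]
print(connected_components(deps, resources))
```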
### PipelineTasksGroup Configuration

Configure the task group behavior:

```python
from tenacity import Retrying, stop_after_attempt, wait_exponential

from dlt.helpers.airflow_helper import PipelineTasksGroup

tasks = PipelineTasksGroup(
    pipeline_name="my_pipeline",
    # Storage
    use_data_folder=False,      # Use GCS data bucket (Composer only)
    local_data_folder="/tmp",   # Local temp folder
    wipe_local_data=True,       # Clean up after completion
    # Logging
    use_task_logger=True,       # Redirect dlt logs to the Airflow task logger
    log_progress_period=30.0,   # Log progress every 30s
    # Performance
    buffer_max_items=1000,      # Buffer size for data items
    # Error handling
    abort_task_if_any_job_failed=True,  # Fail the task if any load job fails
    retry_policy=Retrying(
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    ),
    retry_pipeline_steps=["load"],  # Only retry the load step
    # Observability
    save_load_info=False,   # Save load info to the destination
    save_trace_info=False,  # Save trace info to the destination
)
```
### Passing Credentials

There are two ways to pass credentials to Airflow: via an Airflow variable containing TOML, or via environment variables.

**Option 1: Airflow variable (TOML)**

Store secrets in the `dlt_secrets_toml` Airflow variable:

1. Generate the deployment with TOML format:

   ```sh
   dlt deploy my_pipeline.py airflow-composer --secrets-format toml
   ```

2. Copy the output TOML string to the Airflow UI:
   - Navigate to Admin → Variables
   - Create a variable named `dlt_secrets_toml`
   - Paste the TOML content:
```toml
[sources.my_api]
api_key = "your_api_key_here"

[destination.bigquery.credentials]
project_id = "my-project"
private_key = "-----BEGIN PRIVATE KEY-----\n..."
client_email = "[email protected]"
```
**Option 2: Environment variables**

Store secrets as environment variables:

1. Generate the deployment with env format:

   ```sh
   dlt deploy my_pipeline.py airflow-composer --secrets-format env
   ```

2. Set the environment variables in Composer/Airflow:

   ```sh
   SOURCES__MY_API__API_KEY=your_api_key_here
   DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID=my-project
   DESTINATION__BIGQUERY__CREDENTIALS__PRIVATE_KEY=-----BEGIN PRIVATE KEY-----...
   ```
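The environment variable names follow dlt's naming convention: config path segments are uppercased and joined with double underscores. A small helper to derive the name from a dotted `secrets.toml`-style path (illustrative only, not part of dlt's API):

```python
def to_env_key(path: str) -> str:
    """Map a dotted config path to dlt's env var naming convention:
    segments uppercased and joined with double underscores."""
    return "__".join(part.upper() for part in path.split("."))

print(to_env_key("sources.my_api.api_key"))
# SOURCES__MY_API__API_KEY
print(to_env_key("destination.bigquery.credentials.project_id"))
# DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID
```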
The `dlt_secrets_toml` variable acts as a drop-in replacement for your local `secrets.toml` file and can contain all of your configuration values.
## Cloud Function Deployment

Deploy dlt pipelines as serverless functions.

### Google Cloud Functions
```python
# main.py
import dlt
from my_source import my_source

def run_pipeline(request):
    """HTTP Cloud Function entry point."""
    pipeline = dlt.pipeline(
        pipeline_name='cloud_function_pipeline',
        destination='bigquery',
        dataset_name='production'
    )

    # Run the pipeline
    load_info = pipeline.run(my_source())

    return {
        'status': 'success',
        'packages': len(load_info.loads_ids),
        'dataset': pipeline.dataset_name
    }
```

```text
# requirements.txt
dlt[bigquery]>=0.4.0
```
Deploy:

```sh
gcloud functions deploy run_pipeline \
  --runtime python311 \
  --trigger-http \
  --entry-point run_pipeline \
  --set-env-vars DESTINATION__BIGQUERY__CREDENTIALS='{...}'
```
### AWS Lambda

```python
# lambda_function.py
import dlt
from my_source import my_source

def lambda_handler(event, context):
    pipeline = dlt.pipeline(
        pipeline_name='lambda_pipeline',
        destination='redshift',
        dataset_name='production'
    )

    load_info = pipeline.run(my_source())

    return {
        'statusCode': 200,
        'body': f'Loaded {len(load_info.loads_ids)} packages'
    }
```
## GitHub Actions

Run pipelines on a schedule using GitHub Actions:

```yaml
# .github/workflows/run_pipeline.yml
name: Run dlt Pipeline

on:
  schedule:
    - cron: '0 2 * * *'  # Daily at 2 AM UTC
  workflow_dispatch:      # Manual trigger

jobs:
  run_pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt

      - name: Run pipeline
        env:
          DESTINATION__BIGQUERY__CREDENTIALS: ${{ secrets.GCP_CREDENTIALS }}
          SOURCES__MY_API__API_KEY: ${{ secrets.API_KEY }}
        run: |
          python my_pipeline.py
```
## Kubernetes CronJob

Deploy as a Kubernetes CronJob:

```yaml
# cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: dlt-pipeline
spec:
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: pipeline
              image: my-registry/dlt-pipeline:latest
              env:
                - name: DESTINATION__POSTGRES__CREDENTIALS
                  valueFrom:
                    secretKeyRef:
                      name: dlt-secrets
                      key: postgres-credentials
              command: ["python", "my_pipeline.py"]
          restartPolicy: OnFailure
```
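The CronJob references a container image (`my-registry/dlt-pipeline:latest` above). A minimal Dockerfile sketch for building such an image (the file names and base image are assumptions; adjust them to your project):

```dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install pipeline dependencies first to leverage layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the pipeline code
COPY my_pipeline.py .

# The CronJob sets its own command, but provide a sane default
CMD ["python", "my_pipeline.py"]
```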
## Best Practices

### Use dev_mode=False in production

Always disable dev mode for production pipelines. With `dev_mode=True`, dlt starts from scratch on each run and appends a unique suffix to the dataset name, which is useful for experiments but wrong for production:

```python
pipeline = dlt.pipeline(
    pipeline_name='prod_pipeline',
    destination='snowflake',
    dataset_name='production',
    dev_mode=False  # Critical for production
)
```
### Implement retry logic

Use retry policies for resilient pipelines:

```python
from tenacity import Retrying, retry_if_exception, stop_after_attempt

from dlt.pipeline.helpers import retry_load

for attempt in Retrying(
    stop=stop_after_attempt(3),
    retry=retry_if_exception(retry_load(())),
    reraise=True
):
    with attempt:
        pipeline.run(source())
```
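The tenacity-based loop retries the whole `pipeline.run` call. If you prefer to avoid the extra dependency, the same pattern can be sketched as a plain retry loop with exponential backoff (illustrative only; `flaky_run` below is a stand-in for `pipeline.run(source())`):

```python
import time

def run_with_retries(fn, max_attempts=3, base_delay=1.0):
    """Call fn(), retrying on exception with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: re-raise, like reraise=True
            # Wait base_delay, 2*base_delay, 4*base_delay, ... between attempts
            time.sleep(base_delay * 2 ** (attempt - 1))

# Stand-in for pipeline.run(source()): fails twice, then succeeds
calls = {"n": 0}
def flaky_run():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient load failure")
    return "load_info"

print(run_with_retries(flaky_run, max_attempts=3, base_delay=0.01))
# load_info
```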
### Monitor pipeline health

Save load info and traces for observability:

```python
load_info = pipeline.run(source())

# Fail fast if any load job failed
load_info.raise_on_failed_jobs()

# Persist load info to the destination for auditing
pipeline.run([load_info], table_name="_load_info", write_disposition="append")

# Print a human-readable summary of the load
print(load_info)
```
### Use secrets management

Never hardcode credentials. Use environment variables or a secret manager:

```python
import os

import dlt

# Credentials are picked up from environment variables;
# non-secret settings can also come from the environment
pipeline = dlt.pipeline(
    pipeline_name='secure_pipeline',
    destination='bigquery',
    dataset_name=os.getenv('DATASET_NAME')
)
```
### Production checklist

- Set `dev_mode=False`
- Configure retry policies
- Use secret management (no hardcoded credentials)
- Enable monitoring and alerting
- Clean up completed load packages
- Set appropriate log levels
- Test with production-like data volumes