Performance Optimization
Optimize your dlt pipelines for speed, scalability, and memory efficiency. This guide covers extract, normalize, and load stage optimizations.
Yield Pages Instead of Rows
Yielding data in batches significantly improves performance by reducing function call overhead.
Inefficient - Yielding rows
Efficient - Yielding pages
```python
import dlt

@dlt.resource
def database_cursor():
    """Yields one row at a time - SLOW"""
    for row_id in range(10000):
        yield {"row": row_id, "data": f"row_{row_id}"}
```
Yielding pages of 1000-5000 items provides the best performance for most use cases. Adjust based on item size and memory constraints.
Parallelize Extraction
Extract data concurrently using thread or async pools.
Thread-based Parallelism
Use parallelized=True to execute resources in parallel threads:
```python
import dlt
import time
from threading import current_thread

@dlt.resource(parallelized=True)
def list_users(n_users):
    """Fetch user IDs in parallel"""
    for i in range(1, n_users + 1):
        if i % 10 == 0:
            time.sleep(0.1)  # Simulate API call
        yield i

@dlt.transformer(parallelized=True)
def get_user_details(user_id):
    """Fetch user details in parallel"""
    time.sleep(0.1)  # Simulate API call
    print(f"Processing user {user_id} in thread {current_thread().name}")
    return {"id": user_id, "name": f"user_{user_id}"}

@dlt.source
def user_data():
    return list_users(100) | get_user_details

# Resources execute in parallel
pipeline = dlt.pipeline(
    pipeline_name='parallel_users',
    destination='duckdb'
)
pipeline.run(user_data())
```
Async Parallelism
Use async/await for I/O-bound operations:
```python
import dlt
import asyncio

@dlt.resource
async def async_list_items(start, limit):
    """Async resource - automatically parallelized"""
    for i in range(start, start + limit):
        await asyncio.sleep(0.1)  # Simulate async API call
        yield i

@dlt.transformer
async def async_get_details(item_id):
    """Async transformer"""
    await asyncio.sleep(0.1)
    return {"id": item_id, "details": f"item_{item_id}"}

pipeline = dlt.pipeline(pipeline_name='async_items', destination='duckdb')

# Async resources run concurrently
pipeline.run(async_list_items(0, 50) | async_get_details)
```
Async generators are automatically wrapped and evaluated concurrently. Use parallelized=True only for sync generators.
Control the number of workers for parallel extraction:
Thread pool workers
Async concurrency
Environment variables
```toml
# config.toml
[extract]
workers = 10  # Default is 5

# Per-source configuration
[sources.my_api.extract]
workers = 15
```
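The same settings can be supplied as environment variables. dlt maps config sections to upper-cased names joined by double underscores; the values below mirror the config.toml example above:

```shell
export EXTRACT__WORKERS=10
export SOURCES__MY_API__EXTRACT__WORKERS=15
```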
Choose between round_robin (default) and fifo extraction:
```toml
# config.toml
[extract]
# round_robin: Extract resources in parallel (default)
# fifo: Extract resources sequentially
next_item_mode = "fifo"
```
Use fifo when debugging complex sources with many resources and transformers. It provides deterministic, sequential execution.
Use Built-in HTTP Clients
The built-in HTTP clients provide automatic retry logic and error handling:
```python
import dlt
from dlt.sources.helpers import requests

@dlt.resource
def api_data():
    # Automatic retry on network errors, with exponential backoff
    response = requests.get(
        "https://api.example.com/data",
        timeout=30,
    )
    yield response.json()
```
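The retry behavior of the built-in client can be tuned in the runtime section of config.toml; the values below are illustrative, not recommendations:

```toml
# config.toml
[runtime]
request_max_attempts = 10      # Retry attempts before failing
request_backoff_factor = 2     # Multiplier for exponential backoff
request_timeout = 120          # Per-request timeout in seconds
request_max_retry_delay = 600  # Cap on backoff delay in seconds
```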
Memory and Disk Management
Control Buffer Sizes
Adjust in-memory buffer sizes to optimize memory usage:
Global buffer size
Extract vs normalize buffers
Per-source configuration
```toml
# config.toml
[data_writer]
buffer_max_items = 10000  # Default is 5000
```
Increasing buffer sizes improves performance but uses more memory. Monitor your system’s memory usage when adjusting these values.
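Buffers can also be tuned separately for the extract and normalize stages, since their memory profiles differ; the values below are an illustrative sketch:

```toml
# config.toml
[extract.data_writer]
buffer_max_items = 100       # Small buffer while pulling from slow APIs

[normalize.data_writer]
buffer_max_items = 10000     # Larger buffer for CPU-bound normalization
```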
File Rotation
Enable file rotation to parallelize normalization and loading:
```toml
# config.toml
[normalize.data_writer]
# Rotate after 100K items
file_max_items = 100000
# Or rotate when file exceeds 10MB
file_max_bytes = 10485760
```
File rotation is critical for large datasets. Without rotation, dlt creates a single file per resource, which cannot be processed in parallel. Enable rotation to leverage parallel processing.
Disable Compression
Disable compression for faster processing (at the cost of disk space):
```toml
# config.toml
[normalize.data_writer]
disable_compression = true
```
Clean Up Load Packages
Free disk space by removing completed load packages:
```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name='cleanup_demo',
    destination='postgres',
    dataset_name='data'
)

# Run the pipeline
info = pipeline.run(
    my_source(),
    loader_file_format="parquet"
)

# Manually delete completed packages
pipeline.drop_pending_packages(delete_completed=True)
```
Or configure automatic deletion:
```toml
# config.toml
[load]
delete_completed_jobs = true
```
Normalize Stage Optimization
Parallel Normalization
Use process pools to normalize files concurrently:
```toml
# config.toml
[normalize]
workers = 4  # Default is 1 (no parallelization)
```
Normalization uses process pools (not threads) because it’s CPU-intensive. Each extracted file is sent to a worker process.
Choose efficient file formats for your destination:
Parquet - Best for analytics
JSONL - Universal compatibility
Insert values - Transactional
```python
pipeline.run(
    source(),
    loader_file_format="parquet"  # Columnar, compressed
)
```
Load Stage Optimization
Parallel Loading
Load files concurrently to your destination:
```toml
# config.toml
[load]
workers = 8  # Default is 20
parallelism_strategy = "table-sequential"  # or "parallel"
```
Monitoring and Profiling
Enable performance monitoring:
```toml
# config.toml
progress = "log"

[runtime]
log_level = "INFO"
```
Or in code:
```python
import dlt
from dlt.pipeline.progress import log

pipeline = dlt.pipeline(
    pipeline_name='monitored',
    destination='bigquery',
    dataset_name='analytics',
    progress=log  # Log progress with CPU/memory stats
)

info = pipeline.run(source())

# Access metrics
print(f"Extracted {info.extract.metrics.bytes_extracted} bytes")
print(f"Normalized {info.normalize.metrics.files_written} files")
print(f"Loaded {info.load.metrics.files_loaded} files")
```
Best Practices
- Profile your pipeline: identify bottlenecks using the progress="log" option and examine metrics.
- Optimize extract first: yield pages, enable parallelism, and use efficient HTTP clients.
- Enable file rotation: crucial for large datasets to enable parallel normalization and loading.
- Tune worker pools: adjust extract, normalize, and load workers based on your workload.
- Choose the right format: use Parquet for analytics, JSONL for flexibility, insert_values for transactions.
Common pitfalls:
- Yielding single rows instead of pages
- Not enabling file rotation for large datasets
- Using default buffer sizes for high-volume pipelines
- Running CPU-intensive operations in async code
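As a starting point, the knobs covered above can be combined in a single config.toml. The values below are illustrative and should be tuned against your own workload:

```toml
# config.toml - illustrative settings for a high-volume pipeline
[extract]
workers = 10

[extract.data_writer]
file_max_items = 100000

[normalize]
workers = 4

[normalize.data_writer]
buffer_max_items = 10000
file_max_items = 100000

[load]
workers = 20
```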