Performance Optimization

Optimize your dlt pipelines for speed, scalability, and memory efficiency. This guide covers extract, normalize, and load stage optimizations.

Extract Stage Optimization

Yield Pages Instead of Rows

Yielding data in batches significantly improves performance by reducing function call overhead.
import dlt

@dlt.resource
def database_cursor():
    """Yields one row at a time - SLOW"""
    for row_id in range(10000):
        yield {"row": row_id, "data": f"row_{row_id}"}
Yielding pages of 1000-5000 items provides the best performance for most use cases. Adjust based on item size and memory constraints.

Parallel Extraction

Extract data concurrently using thread or async pools.

Thread-based Parallelism

Use parallelized=True to execute resources in parallel threads:
import dlt
import time
from threading import current_thread

@dlt.resource(parallelized=True)
def list_users(n_users):
    """Fetch user IDs in parallel"""
    for i in range(1, n_users + 1):
        if i % 10 == 0:
            time.sleep(0.1)  # Simulate API call
        yield i

@dlt.transformer(parallelized=True)
def get_user_details(user_id):
    """Fetch user details in parallel"""
    time.sleep(0.1)  # Simulate API call
    print(f"Processing user {user_id} in thread {current_thread().name}")
    return {"id": user_id, "name": f"user_{user_id}"}

@dlt.source
def user_data():
    return list_users(100) | get_user_details

# Resources execute in parallel
pipeline = dlt.pipeline(
    pipeline_name='parallel_users',
    destination='duckdb'
)
pipeline.run(user_data())

Async Parallelism

Use async/await for I/O-bound operations:
import dlt
import asyncio

@dlt.resource
async def async_list_items(start, limit):
    """Async resource - automatically parallelized"""
    for i in range(start, start + limit):
        await asyncio.sleep(0.1)  # Simulate async API call
        yield i

@dlt.transformer
async def async_get_details(item_id):
    """Async transformer"""
    await asyncio.sleep(0.1)
    return {"id": item_id, "details": f"item_{item_id}"}

# Async resources run concurrently
pipeline.run(async_list_items(0, 50) | async_get_details)
Async generators are automatically wrapped and evaluated concurrently. Use parallelized=True only for sync generators.

Configure Worker Pools

Control the number of workers for parallel extraction:
# config.toml
[extract]
workers = 10  # Default is 5

# Per-source configuration
[sources.my_api.extract]
workers = 15
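The same settings can also be supplied as environment variables using dlt's section naming convention, where a double underscore separates config sections; this is convenient in containerized deployments. The values below mirror the TOML example:

```shell
# Equivalent environment variables for the TOML settings above
export EXTRACT__WORKERS=10
export SOURCES__MY_API__EXTRACT__WORKERS=15
```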

Resource Extraction Order

Choose between round_robin (default) and fifo extraction:
# config.toml
[extract]
# round_robin: interleave items from all resources (default)
# fifo: exhaust each resource fully before moving to the next
next_item_mode = "fifo"
Use fifo when debugging complex sources with many resources and transformers. It provides deterministic, sequential execution.

Use Built-in HTTP Clients

The built-in HTTP clients provide automatic retry logic and error handling:
import dlt
from dlt.sources.helpers import requests

@dlt.resource
def api_data():
    # Automatic retry on network errors
    response = requests.get(
        "https://api.example.com/data",
        timeout=30,
        # Retries with exponential backoff
    )
    yield response.json()

Memory and Disk Management

Control Buffer Sizes

Adjust in-memory buffer sizes to optimize memory usage:
# config.toml
[data_writer]
buffer_max_items = 10000  # Default is 5000
Increasing buffer sizes improves performance but uses more memory. Monitor your system’s memory usage when adjusting these values.

File Rotation

Enable file rotation to parallelize normalization and loading:
# config.toml
[normalize.data_writer]
# Rotate after 100K items
file_max_items = 100000

# Or rotate when file exceeds 10MB
file_max_bytes = 10485760
File rotation is critical for large datasets. Without rotation, dlt creates a single file per resource, which cannot be processed in parallel. Enable rotation to leverage parallel processing.

Disable Compression

Disable compression for faster processing (at the cost of disk space):
# config.toml
[normalize.data_writer]
disable_compression = true

Clean Up Load Packages

Free disk space by removing completed load packages:
import dlt

pipeline = dlt.pipeline(
    pipeline_name='cleanup_demo',
    destination='postgres',
    dataset_name='data'
)

# Run the pipeline
info = pipeline.run(
    my_source(),
    loader_file_format="parquet"
)

# Drop pending (not yet loaded) packages to free disk space
pipeline.drop_pending_packages()
Or configure automatic deletion:
# config.toml
[load]
delete_completed_jobs = true

Normalize Stage Optimization

Parallel Normalization

Use process pools to normalize files concurrently:
# config.toml
[normalize]
workers = 4  # Default is 1 (no parallelization)
Normalization uses process pools (not threads) because it’s CPU-intensive. Each extracted file is sent to a worker process.

Loader File Format

Choose efficient file formats for your destination:
pipeline.run(
    source(),
    loader_file_format="parquet"  # Columnar, compressed
)

Load Stage Optimization

Parallel Loading

Load files concurrently to your destination:
# config.toml
[load]
workers = 8  # Default is 20
parallelism_strategy = "table-sequential"  # or "parallel"

Monitoring and Profiling

Enable performance monitoring:
# config.toml
progress = "log"

[runtime]
log_level = "INFO"
Or in code:
import dlt

pipeline = dlt.pipeline(
    pipeline_name='monitored',
    destination='bigquery',
    dataset_name='analytics',
    progress="log"  # Log progress with CPU/memory stats
)

info = pipeline.run(source())

# Inspect run results and step metrics
print(info)  # load package and job summary
print(pipeline.last_trace)  # timing and metrics for extract, normalize, and load

Best Practices

1. Profile your pipeline: identify bottlenecks using the progress="log" option and examine metrics.
2. Optimize extract first: yield pages, enable parallelism, and use efficient HTTP clients.
3. Enable file rotation: crucial for large datasets to enable parallel normalization and loading.
4. Tune worker pools: adjust extract, normalize, and load workers based on your workload.
5. Choose the right format: use Parquet for analytics, JSONL for flexibility, insert_values for transactions.
Common pitfalls:
  • Yielding single rows instead of pages
  • Not enabling file rotation for large datasets
  • Using default buffer sizes for high-volume pipelines
  • Running CPU-intensive operations in async code
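To avoid the last pitfall, offload CPU-heavy work from async code to an executor so the event loop stays responsive; a minimal sketch using the standard library's run_in_executor (expensive_transform is a hypothetical stand-in for your own CPU-bound code, not a dlt API):

```python
import asyncio

def expensive_transform(item_id: int) -> dict:
    """CPU-bound work that would block the event loop if awaited inline."""
    total = sum(i * i for i in range(100_000))
    return {"id": item_id, "checksum": total}

async def process_items(n: int) -> list:
    loop = asyncio.get_running_loop()
    # None selects the default thread pool; each call runs off the event loop.
    # For heavy pure-Python work, a ProcessPoolExecutor also sidesteps the GIL.
    tasks = [loop.run_in_executor(None, expensive_transform, i) for i in range(n)]
    return await asyncio.gather(*tasks)

results = asyncio.run(process_items(4))
```

The same pattern applies inside async dlt resources and transformers: keep awaits for I/O, and push computation into an executor.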
