Testing dlt Pipelines
Write comprehensive tests for your dlt pipelines using pytest and dlt's testing utilities. Ensure data quality and pipeline reliability.

Testing Philosophy
dlt pipelines are Python code and should be tested like any other software:
- Unit tests for individual sources and resources
- Integration tests for end-to-end pipeline execution
- Data quality tests for schema validation and data integrity
Unit Testing Sources and Resources
Test Resource Output
Verify that resources produce expected data:

import pytest
import dlt
from my_pipeline import users_resource
def test_users_resource_output():
    """Test that users resource yields correct data."""
    # Drain the resource into a list so every record can be inspected.
    rows = list(users_resource())

    assert len(rows) > 0, "Resource should yield data"
    assert all('user_id' in user for user in rows), "All users should have user_id"
    assert all('email' in user for user in rows), "All users should have email"

    # Spot-check field types on the first record.
    sample = rows[0]
    assert isinstance(sample['user_id'], int)
    assert isinstance(sample['email'], str)
Test Transformers
Test that transformers correctly process data:

import dlt
@dlt.resource
def raw_users():
    """Yield a fixed batch of users with mixed statuses."""
    yield [
        {'id': 1, 'status': 'active'},
        {'id': 2, 'status': 'inactive'},
    ]
@dlt.transformer(data_from=raw_users)
def active_users(user):
    """Pass through only users whose status is 'active'."""
    # Guard clause: drop anything that is not active.
    if user['status'] != 'active':
        return
    yield user
def test_active_users_transformer():
    """Test that transformer filters correctly."""
    filtered = list(active_users)

    assert len(filtered) == 1, "Should only yield active users"
    survivor = filtered[0]
    assert survivor['id'] == 1
    assert survivor['status'] == 'active'
Test Sources
Test complete source configurations:

import dlt
from my_pipeline import my_source
def test_source_configuration():
    """Test source returns correct resources."""
    source = my_source()

    # Both expected resources must exist in the source.
    for resource_name in ('users', 'orders'):
        assert resource_name in source.resources

    # And both must be selected for extraction by default.
    for resource_name in ('users', 'orders'):
        assert source.resources[resource_name].selected
def test_source_with_parameters():
    """Test source with different parameters."""
    # Build the source with a custom start date.
    source = my_source(start_date='2024-01-01')

    # Every extracted user must respect the date filter.
    fetched = list(source.resources['users'])
    assert all(
        row['created_at'] >= '2024-01-01' for row in fetched
    ), "All users should be from 2024 onwards"
Integration Testing
Test with DuckDB
Use DuckDB as a fast, in-memory testing destination:

import pytest
import dlt
from my_pipeline import my_source
@pytest.fixture
def test_pipeline():
    """Create a test pipeline with DuckDB."""
    duck_pipeline = dlt.pipeline(
        pipeline_name='test_pipeline',
        destination='duckdb',
        dataset_name='test_data',
        dev_mode=True,
    )
    yield duck_pipeline
    # Teardown: remove the pipeline's data and working state.
    duck_pipeline.drop()
def test_pipeline_run(test_pipeline):
    """Test complete pipeline execution."""
    info = test_pipeline.run(my_source())

    # One load package, zero failed jobs.
    assert info.has_failed_jobs is False
    assert len(info.loads_ids) == 1

    # Data must actually be queryable in the destination.
    with test_pipeline.sql_client() as client:
        with client.execute_query("SELECT COUNT(*) as cnt FROM users") as cursor:
            loaded_rows = cursor.fetchone()[0]
    assert loaded_rows > 0, "Should have loaded users"
Test Schema Evolution
Verify that schema changes are handled correctly:

import dlt
import pytest
def test_schema_evolution():
    """Test that new columns are detected."""
    pipeline = dlt.pipeline(
        pipeline_name='schema_test',
        destination='duckdb',
        dataset_name='test_schema',
        dev_mode=True,
    )

    # First load: rows with two columns only.
    @dlt.resource
    def data_v1():
        yield [{'id': 1, 'name': 'Alice'}]

    pipeline.run(data_v1())

    # Second load introduces an 'email' column.
    @dlt.resource
    def data_v2():
        yield [{'id': 2, 'name': 'Bob', 'email': '[email protected]'}]

    info = pipeline.run(data_v2())
    assert not info.has_failed_jobs

    # dlt should have evolved the inferred schema with the new column.
    columns = pipeline.default_schema.get_table('data_v2')['columns']
    assert 'email' in columns

    pipeline.drop()
Test Write Dispositions
Verify replace, append, and merge behaviors:

import dlt
import pytest
def test_replace_disposition():
    """Test that replace overwrites existing data.

    NOTE: the second load must target the SAME table as the first —
    otherwise dlt creates a fresh table named after the resource and the
    test passes even with 'append', proving nothing about replace.
    """
    pipeline = dlt.pipeline(
        pipeline_name='replace_test',
        destination='duckdb',
        dataset_name='test_replace',
        dev_mode=True,
    )

    @dlt.resource(write_disposition='replace')
    def items():
        yield [{'id': 1}, {'id': 2}]

    # Seed the 'items' table with two rows.
    pipeline.run(items())

    # Second load with different data, explicitly routed to 'items'.
    @dlt.resource(write_disposition='replace', table_name='items')
    def new_items():
        yield [{'id': 3}]

    pipeline.run(new_items())

    # Verify the first load's rows were replaced, not appended:
    # only the single row from the second load remains.
    with pipeline.sql_client() as client:
        with client.execute_query("SELECT COUNT(*) FROM items") as cursor:
            assert cursor.fetchone()[0] == 1

    pipeline.drop()
def test_merge_disposition():
    """Test that merge updates records correctly.

    NOTE: the update load must target the SAME table as the initial
    load — otherwise dlt creates a separate 'updated_users' table and
    the merge-on-primary-key behavior is never actually exercised.
    """
    pipeline = dlt.pipeline(
        pipeline_name='merge_test',
        destination='duckdb',
        dataset_name='test_merge',
        dev_mode=True,
    )

    @dlt.resource(
        write_disposition='merge',
        primary_key='id'
    )
    def users():
        yield [
            {'id': 1, 'name': 'Alice', 'version': 1},
            {'id': 2, 'name': 'Bob', 'version': 1}
        ]

    pipeline.run(users())

    # Update an existing record, routed to the same 'users' table so
    # the merge deduplicates on the primary key.
    @dlt.resource(
        write_disposition='merge',
        primary_key='id',
        table_name='users'
    )
    def updated_users():
        yield [{'id': 1, 'name': 'Alice Updated', 'version': 2}]

    pipeline.run(updated_users())

    with pipeline.sql_client() as client:
        # id=1 was updated in place...
        with client.execute_query(
            "SELECT name, version FROM users WHERE id = 1"
        ) as cursor:
            row = cursor.fetchone()
            assert row[0] == 'Alice Updated'
            assert row[1] == 2
        # ...and id=2 survived: merge, not replace.
        with client.execute_query("SELECT COUNT(*) FROM users") as cursor:
            assert cursor.fetchone()[0] == 2

    pipeline.drop()
Testing with Fixtures
Create reusable test fixtures:

import pytest
import dlt
from dlt.common.utils import uniq_id
@pytest.fixture(scope="function")
def test_pipeline():
    """Create isolated test pipeline."""
    # Unique names keep parallel/repeated test runs from colliding
    # on pipeline state or destination datasets.
    isolated = dlt.pipeline(
        pipeline_name=f'test_{uniq_id()}',
        destination='duckdb',
        dataset_name=f'test_dataset_{uniq_id()}',
        dev_mode=True,
    )
    yield isolated
    # Teardown: drop data and state created by this test.
    isolated.drop()
@pytest.fixture
def sample_data():
    """Provide sample test data."""
    # Three well-formed user records shared across tests.
    users = [
        {'id': 1, 'name': 'Alice', 'email': '[email protected]'},
        {'id': 2, 'name': 'Bob', 'email': '[email protected]'},
        {'id': 3, 'name': 'Charlie', 'email': '[email protected]'},
    ]
    return users
def test_with_fixtures(test_pipeline, sample_data):
    """Test using fixtures."""
    @dlt.resource
    def users():
        yield sample_data

    outcome = test_pipeline.run(users())
    assert not outcome.has_failed_jobs

    # Row count in the destination must match the fixture size.
    with test_pipeline.sql_client() as client:
        with client.execute_query("SELECT COUNT(*) FROM users") as cursor:
            assert cursor.fetchone()[0] == len(sample_data)
Mocking External APIs
Use mocks to test without hitting real APIs:

import dlt
import pytest
from unittest.mock import patch, MagicMock
@dlt.resource
def api_users():
    """Resource that calls external API."""
    # Local import keeps the dependency scoped to this resource.
    import requests

    payload = requests.get('https://api.example.com/users').json()
    yield payload
@patch('requests.get')
def test_api_users_with_mock(mock_get):
    """Test resource with mocked API."""
    # Stub the HTTP layer so no real network request is made.
    fake_response = MagicMock()
    fake_response.json.return_value = [
        {'id': 1, 'name': 'Test User'}
    ]
    mock_get.return_value = fake_response

    records = list(api_users())

    assert len(records) == 1
    assert records[0]['name'] == 'Test User'
    mock_get.assert_called_once_with('https://api.example.com/users')
Testing Error Handling
Verify that errors are handled gracefully:

import dlt
import pytest
from dlt.pipeline.exceptions import PipelineStepFailed
@dlt.resource
def failing_resource():
    """Resource that raises an error."""
    # Emit one good row first, then fail mid-extraction.
    yield {'id': 1}
    raise ValueError("Simulated error")
def test_error_handling():
    """Test pipeline handles errors correctly."""
    pipeline = dlt.pipeline(
        pipeline_name='error_test',
        destination='duckdb',
        dataset_name='test_errors',
        dev_mode=True,
    )

    # dlt surfaces resource exceptions as PipelineStepFailed.
    with pytest.raises(PipelineStepFailed):
        pipeline.run(failing_resource())

    pipeline.drop()
Data Quality Tests
Validate data quality after loading:

import dlt
import pytest
def test_data_quality(test_pipeline, sample_data):
    """Test data quality constraints."""
    @dlt.resource
    def users():
        yield sample_data

    test_pipeline.run(users())

    with test_pipeline.sql_client() as client:
        # Constraint 1: no null emails.
        with client.execute_query(
            "SELECT COUNT(*) FROM users WHERE email IS NULL"
        ) as cursor:
            null_emails = cursor.fetchone()[0]
            assert null_emails == 0, "No users should have null email"

        # Constraint 2: primary keys are unique.
        with client.execute_query(
            "SELECT COUNT(DISTINCT id) as unique_ids, COUNT(*) as total FROM users"
        ) as cursor:
            unique_ids, total = cursor.fetchone()
            assert unique_ids == total, "All IDs should be unique"

        # Constraint 3: every email contains an '@'.
        with client.execute_query(
            "SELECT COUNT(*) FROM users WHERE email NOT LIKE '%@%'"
        ) as cursor:
            malformed = cursor.fetchone()[0]
            assert malformed == 0, "All emails should be valid"
Best Practices
Testing recommendations:
- Use dev_mode=True for faster test execution
- Test with DuckDB for speed (local, in-memory)
- Mock external API calls to avoid rate limits
- Create isolated test datasets with unique names
- Clean up test pipelines in fixtures
- Test both success and failure scenarios
- Validate data quality, not just pipeline execution
Common testing pitfalls:
- Testing against production destinations
- Not cleaning up test data
- Hardcoding test data instead of using fixtures
- Skipping error handling tests
- Not testing schema evolution
Running Tests
Run your tests with pytest:

# Run all tests
pytest
# Run a single test file
pytest tests/test_pipeline.py
# Verbose output: one line per test
pytest -v
# Measure coverage of the pipeline package (requires pytest-cov)
pytest --cov=my_pipeline
# Run one specific test by its node id
pytest tests/test_pipeline.py::test_users_resource_output