Skip to main content

Testing dlt Pipelines

Write comprehensive tests for your dlt pipelines using pytest and dlt’s testing utilities. Ensure data quality and pipeline reliability.

Testing Philosophy

dlt pipelines are Python code and should be tested like any other software:
  • Unit tests for individual sources and resources
  • Integration tests for end-to-end pipeline execution
  • Data quality tests for schema validation and data integrity

Unit Testing Sources and Resources

Test Resource Output

Verify that resources produce expected data:
import pytest
import dlt
from my_pipeline import users_resource

def test_users_resource_output():
    """Verify the users resource yields well-formed user records."""
    users = list(users_resource())

    # The resource must produce at least one record.
    assert users, "Resource should yield data"

    # Every record carries the required fields.
    for user in users:
        assert 'user_id' in user, "All users should have user_id"
        assert 'email' in user, "All users should have email"

    # Spot-check field types on the first record.
    first = users[0]
    assert isinstance(first['user_id'], int)
    assert isinstance(first['email'], str)

Test Transformers

Test that transformers correctly process data:
import dlt

@dlt.resource
def raw_users():
    """Static fixture resource: two users, one active and one inactive."""
    records = [
        {'id': 1, 'status': 'active'},
        {'id': 2, 'status': 'inactive'},
    ]
    yield records

@dlt.transformer(data_from=raw_users)
def active_users(user):
    """Pass through only users whose status is 'active'."""
    if user['status'] != 'active':
        return
    yield user

def test_active_users_transformer():
    """Ensure the transformer drops inactive users."""
    filtered = list(active_users)

    assert len(filtered) == 1, "Should only yield active users"
    only_user = filtered[0]
    assert only_user['id'] == 1
    assert only_user['status'] == 'active'

Test Sources

Test complete source configurations:
import dlt
from my_pipeline import my_source

def test_source_configuration():
    """The source exposes the expected resources, all selected by default."""
    source = my_source()

    for resource_name in ('users', 'orders'):
        # Resource is present in the source...
        assert resource_name in source.resources
        # ...and enabled for extraction.
        assert source.resources[resource_name].selected

def test_source_with_parameters():
    """Passing start_date restricts users to that date onwards."""
    source = my_source(start_date='2024-01-01')

    # Materialize the users resource and check each record's date.
    for user in source.resources['users']:
        assert user['created_at'] >= '2024-01-01', \
            "All users should be from 2024 onwards"

Integration Testing

Test with DuckDB

Use DuckDB as a fast, in-memory testing destination:
import pytest
import dlt
from my_pipeline import my_source

@pytest.fixture
def test_pipeline():
    """Provide a disposable DuckDB-backed pipeline for one test."""
    pipeline = dlt.pipeline(
        pipeline_name='test_pipeline',
        dataset_name='test_data',
        destination='duckdb',
        dev_mode=True,
    )
    yield pipeline
    # Teardown: remove the pipeline's state and data.
    pipeline.drop()

def test_pipeline_run(test_pipeline):
    """Run the source end to end and confirm rows landed in DuckDB."""
    load_info = test_pipeline.run(my_source())

    # One load package, no failed jobs.
    assert load_info.has_failed_jobs is False
    assert len(load_info.loads_ids) == 1

    # The users table must actually contain rows.
    count_query = "SELECT COUNT(*) as cnt FROM users"
    with test_pipeline.sql_client() as client:
        with client.execute_query(count_query) as cursor:
            row_count = cursor.fetchone()[0]
            assert row_count > 0, "Should have loaded users"

Test Schema Evolution

Verify that schema changes are handled correctly:
import dlt
import pytest

def test_schema_evolution():
    """Test that a new column appearing in the data is added to the schema.

    Loads a first batch without an ``email`` field, then a second batch
    that introduces it, and verifies dlt evolved the stored schema.
    """
    pipeline = dlt.pipeline(
        pipeline_name='schema_test',
        destination='duckdb',
        dataset_name='test_schema',
        dev_mode=True
    )

    try:
        # Initial load: table inferred with id + name columns only.
        @dlt.resource
        def data_v1():
            yield [{'id': 1, 'name': 'Alice'}]

        pipeline.run(data_v1())

        # Second load introduces the new 'email' column.
        @dlt.resource
        def data_v2():
            yield [{'id': 2, 'name': 'Bob', 'email': '[email protected]'}]

        load_info = pipeline.run(data_v2())
        assert not load_info.has_failed_jobs

        # The inferred schema should now include the new column.
        schema = pipeline.default_schema
        table = schema.get_table('data_v2')
        assert 'email' in table['columns']
    finally:
        # Drop even when an assertion fails so test state never leaks.
        pipeline.drop()

Test Write Dispositions

Verify replace, append, and merge behaviors:
import dlt
import pytest

def test_replace_disposition():
    """Test that write_disposition='replace' overwrites existing data.

    Both loads must target the SAME table — the original example loaded
    a second resource under a different name, which creates a second
    table and never exercises replace semantics.
    """
    pipeline = dlt.pipeline(
        pipeline_name='replace_test',
        destination='duckdb',
        dataset_name='test_replace',
        dev_mode=True
    )

    try:
        @dlt.resource(write_disposition='replace')
        def items():
            yield [{'id': 1}, {'id': 2}]

        # First load seeds two rows into the 'items' table.
        pipeline.run(items())

        # Second load explicitly targets the same 'items' table.
        @dlt.resource(name='items', write_disposition='replace')
        def new_items():
            yield [{'id': 3}]

        pipeline.run(new_items())

        # Replace means only the single row from the second load remains.
        with pipeline.sql_client() as client:
            with client.execute_query("SELECT COUNT(*) FROM items") as cursor:
                assert cursor.fetchone()[0] == 1
    finally:
        # Cleanup runs even if an assertion above fails.
        pipeline.drop()

def test_merge_disposition():
    """Test that write_disposition='merge' upserts records by primary key.

    The second load must target the SAME table — the original example
    loaded `updated_users` into its own table, so the merge was never
    actually tested.
    """
    pipeline = dlt.pipeline(
        pipeline_name='merge_test',
        destination='duckdb',
        dataset_name='test_merge',
        dev_mode=True
    )

    try:
        @dlt.resource(
            write_disposition='merge',
            primary_key='id'
        )
        def users():
            yield [
                {'id': 1, 'name': 'Alice', 'version': 1},
                {'id': 2, 'name': 'Bob', 'version': 1}
            ]

        pipeline.run(users())

        # Update an existing record in the SAME 'users' table.
        @dlt.resource(
            name='users',
            write_disposition='merge',
            primary_key='id'
        )
        def updated_users():
            yield [{'id': 1, 'name': 'Alice Updated', 'version': 2}]

        pipeline.run(updated_users())

        with pipeline.sql_client() as client:
            # id=1 was upserted to the new name/version.
            with client.execute_query(
                "SELECT name, version FROM users WHERE id = 1"
            ) as cursor:
                row = cursor.fetchone()
                assert row[0] == 'Alice Updated'
                assert row[1] == 2
            # Merge must not duplicate rows: still exactly 2 users.
            with client.execute_query("SELECT COUNT(*) FROM users") as cursor:
                assert cursor.fetchone()[0] == 2
    finally:
        # Drop even on failure so test datasets never leak.
        pipeline.drop()

Testing with Fixtures

Create reusable test fixtures:
import pytest
import dlt
from dlt.common.utils import uniq_id

@pytest.fixture(scope="function")
def test_pipeline():
    """Build a uniquely-named pipeline per test for full isolation."""
    pipeline = dlt.pipeline(
        pipeline_name=f'test_{uniq_id()}',
        dataset_name=f'test_dataset_{uniq_id()}',
        destination='duckdb',
        dev_mode=True,
    )
    yield pipeline
    # Teardown: remove pipeline state and data.
    pipeline.drop()

@pytest.fixture
def sample_data():
    """Three sample user records sharing a placeholder email."""
    return [
        {'id': user_id, 'name': name, 'email': '[email protected]'}
        for user_id, name in enumerate(['Alice', 'Bob', 'Charlie'], start=1)
    ]

def test_with_fixtures(test_pipeline, sample_data):
    """Load the sample fixture data and verify the row count matches."""
    @dlt.resource
    def users():
        yield sample_data

    load_info = test_pipeline.run(users())
    assert not load_info.has_failed_jobs

    # Row count in DuckDB must equal the number of fixture records.
    expected = len(sample_data)
    with test_pipeline.sql_client() as client:
        with client.execute_query("SELECT COUNT(*) FROM users") as cursor:
            assert cursor.fetchone()[0] == expected

Mocking External APIs

Use mocks to test without hitting real APIs:
import dlt
import pytest
from unittest.mock import patch, MagicMock

@dlt.resource
def api_users():
    """Yield the user list fetched from the example REST endpoint.

    Raises:
        requests.HTTPError: on a non-2xx response, instead of silently
            yielding an error payload as if it were data.
    """
    import requests
    response = requests.get('https://api.example.com/users')
    # Fail fast on HTTP errors; call args unchanged so mocked tests
    # asserting on requests.get(url) still pass.
    response.raise_for_status()
    yield response.json()

@patch('requests.get')
def test_api_users_with_mock(mock_get):
    """The resource should surface exactly what the (mocked) API returns."""
    # Stub the HTTP layer: one fake user in the JSON body.
    fake_response = MagicMock()
    fake_response.json.return_value = [{'id': 1, 'name': 'Test User'}]
    mock_get.return_value = fake_response

    users = list(api_users())

    assert len(users) == 1
    assert users[0]['name'] == 'Test User'
    mock_get.assert_called_once_with('https://api.example.com/users')

Testing Error Handling

Verify that errors are handled gracefully:
import dlt
import pytest
from dlt.pipeline.exceptions import PipelineStepFailed

@dlt.resource
def failing_resource():
    """Yield one record, then raise to simulate a mid-extraction failure."""
    yield {'id': 1}
    raise ValueError("Simulated error")

def test_error_handling():
    """Test that a failing resource surfaces as PipelineStepFailed."""
    pipeline = dlt.pipeline(
        pipeline_name='error_test',
        destination='duckdb',
        dataset_name='test_errors',
        dev_mode=True
    )

    try:
        # dlt wraps the resource's ValueError in PipelineStepFailed.
        with pytest.raises(PipelineStepFailed):
            pipeline.run(failing_resource())
    finally:
        # Drop even if the expected exception was not raised
        # (pytest.raises would fail the test before reaching drop()).
        pipeline.drop()

Data Quality Tests

Validate data quality after loading:
import dlt
import pytest

def test_data_quality(test_pipeline, sample_data):
    """Run post-load data quality checks against the users table."""
    @dlt.resource
    def users():
        yield sample_data

    test_pipeline.run(users())

    with test_pipeline.sql_client() as client:
        # Constraint 1: no null emails allowed.
        with client.execute_query(
            "SELECT COUNT(*) FROM users WHERE email IS NULL"
        ) as cursor:
            null_emails = cursor.fetchone()[0]
            assert null_emails == 0, "No users should have null email"

        # Constraint 2: IDs are unique (distinct count == total count).
        with client.execute_query(
            "SELECT COUNT(DISTINCT id) as unique_ids, COUNT(*) as total FROM users"
        ) as cursor:
            unique_ids, total = cursor.fetchone()
            assert unique_ids == total, "All IDs should be unique"

        # Constraint 3: every email contains an '@'.
        with client.execute_query(
            "SELECT COUNT(*) FROM users WHERE email NOT LIKE '%@%'"
        ) as cursor:
            bad_emails = cursor.fetchone()[0]
            assert bad_emails == 0, "All emails should be valid"

Best Practices

Testing recommendations:
  • Use dev_mode=True for faster test execution
  • Test with DuckDB for speed (local, in-memory)
  • Mock external API calls to avoid rate limits
  • Create isolated test datasets with unique names
  • Clean up test pipelines in fixtures
  • Test both success and failure scenarios
  • Validate data quality, not just pipeline execution
Common testing pitfalls to avoid:
  • Testing against production destinations
  • Not cleaning up test data
  • Hardcoding test data instead of using fixtures
  • Skipping error handling tests
  • Not testing schema evolution

Running Tests

Run your tests with pytest:
# Run all tests
pytest

# Run specific test file
pytest tests/test_pipeline.py

# Run with verbose output
pytest -v

# Run with coverage
pytest --cov=my_pipeline

# Run specific test
pytest tests/test_pipeline.py::test_users_resource_output

Build docs developers (and LLMs) love