Testing is essential for building reliable data pipelines. Dagster provides tools and patterns for testing assets, ops, resources, and entire pipelines.
Unit Testing Assets
Assets are just Python functions, so you can test them directly:
import pandas as pd
import dagster as dg


@dg.asset
def orders():
    """Build a two-row orders table, persist it as orders.csv, and return it."""
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    orders_df.to_csv("orders.csv")
    return orders_df


# Test the asset: an asset is a plain Python function, so call it directly.
def test_orders():
    result = orders()
    assert len(result) == 2
    assert "order_id" in result.columns
    assert "item_id" in result.columns
Testing Assets with Dependencies
For assets with dependencies, pass the upstream values as function arguments:
import pandas as pd
import dagster as dg


@dg.asset
def upstream_data():
    """Produce a small frame for the downstream aggregation."""
    return pd.DataFrame({"value": [1, 2, 3]})


@dg.asset
def downstream_data(upstream_data: pd.DataFrame):
    """Sum the 'value' column of the upstream frame."""
    return upstream_data["value"].sum()


# Test with mock data: the upstream value is just a function argument,
# so any frame with the expected column works as a stand-in.
def test_downstream_data():
    mock_upstream = pd.DataFrame({"value": [10, 20, 30]})
    result = downstream_data(mock_upstream)
    assert result == 60
Testing Assets with Resources
Mock resources to avoid external dependencies during testing:
from unittest import mock

import dagster as dg
from dagster_aws.s3 import S3FileHandle, S3FileManager


@dg.asset
def loaded_file(file_manager: S3FileManager) -> str:
    """Read bucket/path.txt via the injected file manager resource."""
    return file_manager.read_data(S3FileHandle("bucket", "path.txt"))


# Test with mocked resource
def test_file() -> None:
    mocked_resource = mock.Mock(spec=S3FileManager)
    mocked_resource.read_data.return_value = "contents"
    assert loaded_file(mocked_resource) == "contents"
    # BUG FIX: `called_once_with` is not a Mock assertion method -- accessing
    # it just creates a truthy child mock, so `assert ...called_once_with(...)`
    # always passed regardless of the call arguments. Use
    # assert_called_once_with, which raises AssertionError on a mismatch.
    mocked_resource.read_data.assert_called_once_with(
        S3FileHandle("bucket", "path.txt")
    )
Use unittest.mock or pytest-mock to create mocked resources that match the interface of production resources without making actual external calls.
Testing Assets with Context
When your asset uses AssetExecutionContext, you need to provide a test context:
from dagster import AssetExecutionContext, asset, build_asset_context
import pandas as pd


@asset
def partitioned_asset(context: AssetExecutionContext) -> None:
    """Write a one-row CSV named after the current partition key."""
    partition_key = context.partition_key
    df = pd.DataFrame({"date": [partition_key], "value": [100]})
    context.log.info(f"Processing partition {partition_key}")
    df.to_csv(f"data_{partition_key}.csv")


# Test with build_asset_context, which fabricates an execution context
# (including the partition key) without launching a real run.
def test_partitioned_asset():
    context = build_asset_context(partition_key="2024-01-01")
    partitioned_asset(context)
    # Verify the file was created
    df = pd.read_csv("data_2024-01-01.csv")
    assert len(df) == 1
    assert df["date"][0] == "2024-01-01"
Testing with Definitions
Test that your definitions load correctly:
from src.quickstart_etl.definitions import defs


def test_def_can_load():
    """Smoke test: the project's definitions import and resolve a known job."""
    assert defs.get_job_def("all_assets_job")
This simple test ensures your code can be imported and all definitions are valid.
Testing Asset Materialization
Use materialize to test the full execution path:
from dagster import materialize, asset
import pandas as pd


@asset
def orders():
    """Two-row orders table used as the pipeline input."""
    return pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})


@asset
def order_count(orders: pd.DataFrame) -> int:
    """Number of rows in the orders table."""
    return len(orders)


def test_asset_materialization():
    """Materialize both assets in-process and inspect the downstream output."""
    result = materialize([orders, order_count])
    assert result.success
    # Access materialized values
    order_count_value = result.output_for_node("order_count")
    assert order_count_value == 2
Testing Asset Checks
Test your data quality checks independently:
import pandas as pd
import dagster as dg


@dg.asset
def orders():
    """Persist a small orders table to orders.csv."""
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    orders_df.to_csv("orders.csv")


@dg.asset_check(asset=orders)
def orders_id_has_no_nulls():
    """Pass only when orders.csv contains no null order_id values."""
    orders_df = pd.read_csv("orders.csv")
    num_null_order_ids = orders_df["order_id"].isna().sum()
    return dg.AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
    )


# Test the check
def test_orders_check_passes():
    # First materialize the asset
    orders()
    # Then run the check
    result = orders_id_has_no_nulls()
    assert result.passed


def test_orders_check_fails_with_nulls():
    # Create data with nulls to exercise the failure path
    orders_df = pd.DataFrame({"order_id": [1, None], "item_id": [432, 878]})
    orders_df.to_csv("orders.csv")
    result = orders_id_has_no_nulls()
    assert not result.passed
Integration Testing
Test multiple assets together to verify integration:
from dagster import materialize
import tempfile
import os


def test_pipeline_integration():
    """Materialize the full pipeline inside a throwaway directory."""
    # Use a temporary directory for test outputs
    with tempfile.TemporaryDirectory() as tmpdir:
        # BUG FIX: remember and restore the original working directory.
        # Without this the process is left chdir'd into a directory that
        # is deleted when the TemporaryDirectory context exits, breaking
        # any test that runs afterwards.
        original_cwd = os.getcwd()
        os.chdir(tmpdir)
        try:
            # Materialize the full pipeline
            result = materialize(
                [topstory_ids, topstories, most_frequent_words],
            )
            assert result.success
            # Verify outputs exist
            assert os.path.exists("data/topstory_ids.json")
            assert os.path.exists("data/topstories.csv")
            assert os.path.exists("data/most_frequent_words.json")
        finally:
            os.chdir(original_cwd)
Testing with Custom Resources
Provide test-specific resource configurations:
import dagster as dg
from dagster import materialize


class DatabaseResource(dg.ConfigurableResource):
    """Connection configuration for the application database."""

    connection_string: str


@dg.asset
def users(database: DatabaseResource):
    # Use database connection
    return database.query("SELECT * FROM users")


def test_users_asset():
    # Use an in-memory SQLite database so the test needs no external service
    test_resources = {
        "database": DatabaseResource(
            connection_string="sqlite:///:memory:"
        )
    }
    result = materialize(
        [users],
        resources=test_resources,
    )
    assert result.success
Testing Ops
Ops can be tested similarly to assets:
from dagster import op, job, In, Out


@op(ins={"df": In()}, out=Out())
def process_payroll(df):
    """Return the number of payroll rows in the frame."""
    return len(df)


def test_process_payroll():
    import pandas as pd

    test_df = pd.DataFrame({"employee_id": [1, 2, 3]})
    result = process_payroll(test_df)
    assert result == 3
Testing Schedules and Sensors
Test that schedules and sensors produce the expected run requests:
import dagster as dg
from dagster import build_sensor_context
# BUG FIX: the tests below use `mock`, which was never imported in this example.
from unittest import mock


@dg.sensor(job=my_job, minimum_interval_seconds=5)
def new_file_sensor():
    """Request one run per new file; otherwise explain why nothing ran."""
    new_files = check_for_new_files()
    if new_files:
        for filename in new_files:
            yield dg.RunRequest(run_key=filename)
    else:
        yield dg.SkipReason("No new files found")


def test_sensor_yields_run_requests():
    # BUG FIX: mock.patch requires the full dotted path of the name *where it
    # is looked up* -- a bare 'check_for_new_files' raises TypeError. Patch it
    # in the module that defines the sensor (here, this module).
    with mock.patch(f"{__name__}.check_for_new_files", return_value=["file1.csv"]):
        context = build_sensor_context()
        run_requests = list(new_file_sensor(context))
        assert len(run_requests) == 1
        assert run_requests[0].run_key == "file1.csv"


def test_sensor_skips_when_no_files():
    with mock.patch(f"{__name__}.check_for_new_files", return_value=[]):
        context = build_sensor_context()
        results = list(new_file_sensor(context))
        assert len(results) == 1
        assert isinstance(results[0], dg.SkipReason)
Write unit tests for individual assets
Test assets as pure functions with mocked dependencies.
Add integration tests for pipelines
Test multiple assets together to verify end-to-end behavior.
Mock external dependencies: Use test doubles for databases, APIs, and file systems.
Test edge cases: Verify behavior with empty data, null values, and error conditions.
Use fixtures: Create reusable test fixtures for common setup and teardown.
Best Practices
Separate Test Data
Keep test data in dedicated fixtures or files:
import pytest
import pandas as pd


@pytest.fixture
def sample_orders():
    """Canonical three-row orders frame shared across tests."""
    return pd.DataFrame({
        "order_id": [1, 2, 3],
        "item_id": [100, 200, 300],
        "quantity": [1, 2, 1]
    })


def test_order_processing(sample_orders):
    result = process_orders(sample_orders)
    assert result["total_quantity"] == 4
Use Temporary Directories
Avoid polluting your working directory with test outputs:
import tempfile
import os


def test_file_output():
    """Run file-producing code inside a throwaway directory."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # BUG FIX: save and restore the working directory; otherwise the
        # process is left inside a directory that no longer exists after
        # cleanup. (With pytest, `tmp_path` + `monkeypatch.chdir` handles
        # this automatically.)
        original_cwd = os.getcwd()
        os.chdir(tmpdir)
        try:
            # Run your test
            create_output_file()
            assert os.path.exists("output.csv")
        finally:
            os.chdir(original_cwd)
        # tmpdir is automatically cleaned up
Test for Failures
Verify that your code fails appropriately:
import pytest


def test_invalid_input_raises_error():
    """Negative order IDs must be rejected with a descriptive ValueError."""
    with pytest.raises(ValueError, match="Invalid order ID"):
        process_order(order_id=-1)
Parametrize Tests
Test multiple scenarios efficiently:
import pytest


# One test function covers several input/expected pairs.
@pytest.mark.parametrize("input_val,expected", [
    (10, 20),
    (5, 10),
    (0, 0),
])
def test_double_value(input_val, expected):
    assert double(input_val) == expected
Avoid testing implementation details. Focus on the public interface and expected behavior. If you refactor the internal implementation, your tests should still pass.
Continuous Integration
Run tests in CI to catch issues early:
# .github/workflows/test.yml
name: Test

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - run: pip install -e ".[dev]"
      - run: pytest tests/ -v
Next Steps