Skip to main content
Data quality is critical for reliable pipelines. Dagster provides asset checks, integration with testing frameworks, and patterns for validating your data.

Asset Checks

Asset checks verify data quality when assets are materialized.

Single Asset Check

Define a check for one quality dimension:
import pandas as pd
import dagster as dg

@dg.asset
def orders():
    """Write a two-row sample orders table to ``orders.csv``."""
    sample_rows = {"order_id": [1, 2], "item_id": [432, 878]}
    pd.DataFrame(sample_rows).to_csv("orders.csv")

@dg.asset_check(asset=orders)
def orders_id_has_no_nulls():
    """Pass when the ``order_id`` column of ``orders.csv`` has no missing values.

    The result metadata reports the number of nulls found and the total
    number of rows read.
    """
    frame = pd.read_csv("orders.csv")
    # Convert the NumPy count to a plain int once, then reuse it.
    missing = int(frame["order_id"].isna().sum())
    row_total = len(frame)

    return dg.AssetCheckResult(
        passed=missing == 0,
        metadata={"null_count": missing, "total_rows": row_total},
    )

Multiple Asset Checks

Check multiple quality dimensions efficiently:
from collections.abc import Iterable
import pandas as pd
import dagster as dg

@dg.asset
def orders():
    """Persist a small sample orders table as ``orders.csv``."""
    sample = pd.DataFrame(
        {
            "order_id": [1, 2],
            "item_id": [432, 878],
        }
    )
    sample.to_csv("orders.csv")

@dg.multi_asset_check(
    specs=[
        dg.AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders"),
        dg.AssetCheckSpec(name="items_id_has_no_nulls", asset="orders"),
    ]
)
def orders_check() -> Iterable[dg.AssetCheckResult]:
    """Yield one null-check result per ID column, reading ``orders.csv`` once.

    Results are yielded in the order the specs are declared: first the
    ``order_id`` check, then the ``item_id`` check.
    """
    frame = pd.read_csv("orders.csv")

    # (check name, column inspected) pairs, in spec order.
    column_for_check = (
        ("orders_id_has_no_nulls", "order_id"),
        ("items_id_has_no_nulls", "item_id"),
    )

    for check_name, column in column_for_check:
        missing = int(frame[column].isna().sum())
        yield dg.AssetCheckResult(
            check_name=check_name,
            passed=missing == 0,
            asset_key="orders",
            metadata={"null_count": missing},
        )
Use multi_asset_check when multiple checks share expensive data loading. Load the data once and perform all validations.

Common Data Quality Checks

Null Checks

@dg.asset_check(asset=my_data)
def no_nulls_in_key_columns():
    """Pass when none of the key columns in ``my_data.csv`` contain nulls.

    The result metadata reports the null count per key column and the
    overall total.
    """
    df = pd.read_csv("my_data.csv")
    key_columns = ["user_id", "timestamp", "event_type"]

    # ``Series.sum()`` returns a NumPy integer; convert to plain ints so the
    # metadata and the ``passed`` flag hold native Python values, matching
    # the explicit ``int()``/``bool()`` conversions used by the other checks.
    null_counts = {col: int(df[col].isna().sum()) for col in key_columns}
    total_nulls = sum(null_counts.values())

    return dg.AssetCheckResult(
        passed=total_nulls == 0,
        metadata={
            "null_counts_by_column": null_counts,
            "total_nulls": total_nulls,
        }
    )

Uniqueness Checks

@dg.asset_check(asset=users)
def user_ids_are_unique():
    """Pass when no ``user_id`` value in ``users.csv`` appears more than once.

    The result metadata reports the number of duplicate rows, the number of
    distinct IDs, and the total row count.
    """
    df = pd.read_csv("users.csv")
    # Convert the NumPy count to a plain int so that ``passed`` below is a
    # native ``bool`` rather than ``numpy.bool_``.
    duplicates = int(df["user_id"].duplicated().sum())

    return dg.AssetCheckResult(
        passed=duplicates == 0,
        metadata={
            "duplicate_count": duplicates,
            "unique_count": int(df["user_id"].nunique()),
            "total_rows": len(df),
        }
    )

Range Checks

@dg.asset_check(asset=metrics)
def values_in_expected_range():
    """Pass when every ``revenue`` value in ``metrics.csv`` lies in [0, 1,000,000].

    The result metadata reports the observed minimum and maximum alongside
    the expected bounds.
    """
    df = pd.read_csv("metrics.csv")

    min_value = df["revenue"].min()
    max_value = df["revenue"].max()

    # Revenue should be positive and less than $1M.
    # Comparisons on NumPy scalars yield ``numpy.bool_``; convert to a native
    # ``bool``, as the other checks do explicitly.
    in_range = bool((min_value >= 0) and (max_value <= 1_000_000))

    return dg.AssetCheckResult(
        passed=in_range,
        metadata={
            "min_value": float(min_value),
            "max_value": float(max_value),
            "expected_min": 0,
            "expected_max": 1_000_000,
        }
    )

Row Count Checks

@dg.asset_check(asset=daily_events)
def has_minimum_rows():
    """Pass when ``daily_events.csv`` holds at least 1000 rows.

    The result metadata reports the observed row count and the threshold.
    """
    min_expected = 1000
    row_count = len(pd.read_csv("daily_events.csv"))
    enough_rows = row_count >= min_expected

    return dg.AssetCheckResult(
        passed=enough_rows,
        metadata={"row_count": row_count, "minimum_expected": min_expected},
    )

Schema Checks

@dg.asset_check(asset=customer_data)
def schema_matches_expected():
    """Pass when ``customer_data.csv`` has exactly the expected columns.

    The result metadata lists the expected, actual, missing, and extra
    column names.
    """
    df = pd.read_csv("customer_data.csv")

    expected_columns = {"customer_id", "name", "email", "created_at"}
    actual_columns = set(df.columns)

    missing = expected_columns - actual_columns
    extra = actual_columns - expected_columns

    # Sets have no stable iteration order; sort so the reported metadata is
    # deterministic from one evaluation to the next.
    return dg.AssetCheckResult(
        passed=not missing and not extra,
        metadata={
            "expected_columns": sorted(expected_columns),
            "actual_columns": sorted(actual_columns),
            "missing_columns": sorted(missing),
            "extra_columns": sorted(extra),
        }
    )

Freshness Checks

Ensure data is updated within expected timeframes:
import datetime
from dagster import asset, build_last_update_freshness_checks

@asset
def daily_metrics():
    """Return whatever ``compute_daily_metrics()`` produces."""
    metrics = compute_daily_metrics()
    return metrics

@asset
def hourly_metrics():
    """Return whatever ``compute_hourly_metrics()`` produces."""
    metrics = compute_hourly_metrics()
    return metrics

# Build freshness checks for both metrics assets, using a two-day
# lower-bound delta on their last update.
freshness_checks = build_last_update_freshness_checks(
    lower_bound_delta=datetime.timedelta(days=2),
    assets=[
        daily_metrics,
        hourly_metrics,
    ],
)

# Register the metrics assets together with their generated freshness checks.
defs = dg.Definitions(
    asset_checks=freshness_checks,
    assets=[
        daily_metrics,
        hourly_metrics,
    ],
)

Custom Freshness Check

from datetime import datetime, timedelta

@dg.asset_check(asset=real_time_data)
def data_is_fresh():
    """Pass when the newest ``timestamp`` in ``real_time_data.csv`` is at most one hour old.

    The result metadata reports the latest timestamp, its age in hours, and
    the maximum allowed age in hours.
    """
    df = pd.read_csv("real_time_data.csv")
    df["timestamp"] = pd.to_datetime(df["timestamp"])

    latest_timestamp = df["timestamp"].max()
    # Take "now" in the same timezone as the data. Subtracting a tz-aware
    # timestamp from a naive ``datetime.now()`` raises ``TypeError``; for
    # naive data ``tzinfo`` is ``None`` and this is the naive local time.
    now = pd.Timestamp.now(tz=latest_timestamp.tzinfo)
    age = now - latest_timestamp
    max_age = timedelta(hours=1)

    return dg.AssetCheckResult(
        # Convert explicitly so ``passed`` is always a native ``bool``.
        passed=bool(age <= max_age),
        metadata={
            "latest_timestamp": latest_timestamp.isoformat(),
            "age_hours": age.total_seconds() / 3600,
            "max_age_hours": max_age.total_seconds() / 3600,
        }
    )

Great Expectations Integration

Integrate with Great Expectations for advanced data validation:
from dagster import Config, job, op
from dagster_ge.factory import ge_validation_op_factory
from pandas import read_csv

class GEOpConfig(Config):
    """Configuration for ``read_in_datafile``: where to find the input CSV."""

    # Path of the CSV file to load; defaults to the bundled sample file.
    csv_path: str = "./data/succeed.csv"

@op
def read_in_datafile(config: GEOpConfig):
    """Load the CSV file named by ``config.csv_path`` and return its contents."""
    path = config.csv_path
    return read_csv(path)

@op
def process_payroll(df):
    """Return the number of rows in ``df``."""
    row_count = len(df)
    return row_count

@op
def postprocess_payroll(numrows, expectation):
    """Return ``numrows`` when the validation result reports success.

    Raises:
        ValueError: if ``expectation["success"]`` is falsy.
    """
    # Guard clause: reject failed validations before returning anything.
    if not expectation["success"]:
        raise ValueError("Validation failed")
    return numrows

# Build the validation op that runs the "basic.warning" Great Expectations
# suite against the data handed to it.
payroll_expectations = ge_validation_op_factory(
    name="ge_validation_op",
    suite_name="basic.warning",
    datasource_name="getest",
    data_connector_name="my_runtime_data_connector",
    data_asset_name="test_asset",
    batch_identifiers={"foo": "bar"},
)

@job
def payroll_data():
    """Wire the payroll ops: load the file, count rows, validate, then post-process."""
    payroll_df = read_in_datafile()
    row_count = process_payroll(payroll_df)
    validation = payroll_expectations(payroll_df)
    postprocess_payroll(row_count, validation)

Blocking Downstream Assets

Prevent downstream assets from running when checks fail:
import dagster as dg
import pandas as pd

@dg.asset
def raw_data():
    """Build the sample raw table, save it as ``raw_data.csv``, and return it.

    The table has a single ``value`` column containing one missing entry.

    Returns:
        The sample DataFrame that was written to disk.
    """
    df = pd.DataFrame({"value": [1, 2, 3, None]})
    # ``processed_data`` reads "raw_data.csv", so the file must be written
    # here; ``index=False`` keeps the CSV to the single ``value`` column.
    df.to_csv("raw_data.csv", index=False)
    return df

@dg.asset_check(
    asset=raw_data,
    blocking=True,  # Block downstream if this fails
)
def raw_data_quality():
    """Pass when the ``value`` column of the raw data contains no nulls."""
    # NOTE(review): this invokes ``raw_data()`` directly rather than reading a
    # stored copy — confirm that is the intended way to obtain the data here.
    frame = raw_data()
    all_present = frame["value"].notna().all()

    return dg.AssetCheckResult(
        passed=bool(all_present),
        description="Raw data should not contain nulls",
    )

@dg.asset(deps=[raw_data])
def processed_data():
    """Read ``raw_data.csv`` and return it with missing values replaced by 0."""
    # Only runs if raw_data_quality passes
    raw_frame = pd.read_csv("raw_data.csv")
    filled = raw_frame.fillna(0)
    return filled
When a blocking check fails, downstream assets are prevented from materializing. Use blocking checks carefully, reserving them for critical quality gates.

Statistical Validation

Detect anomalies using statistical methods:
import numpy as np
import pandas as pd

@dg.asset_check(asset=daily_revenue)
def revenue_within_normal_range():
    """Pass when the latest revenue lies within three standard deviations of the mean.

    Mean and standard deviation are computed over the whole ``revenue``
    column of ``daily_revenue.csv``, including the latest (last) row. The
    result metadata reports the statistics, the bounds, and the z-score.
    """
    df = pd.read_csv("daily_revenue.csv")
    revenue = df["revenue"]

    # Calculate statistics
    mean = revenue.mean()
    std = revenue.std()

    # Check latest value against 3-sigma rule
    latest = revenue.iloc[-1]
    lower_bound = mean - 3 * std
    upper_bound = mean + 3 * std

    # The chained comparison on NumPy scalars yields ``numpy.bool_``; convert
    # to a native ``bool``, as the other checks do explicitly.
    in_range = bool(lower_bound <= latest <= upper_bound)

    return dg.AssetCheckResult(
        passed=in_range,
        metadata={
            "latest_revenue": float(latest),
            "mean_revenue": float(mean),
            "std_dev": float(std),
            "lower_bound": float(lower_bound),
            "upper_bound": float(upper_bound),
            "z_score": float((latest - mean) / std),
        }
    )

Check Factories

Generate checks programmatically:
from typing import List
import dagster as dg

def create_null_checks(asset_name: str, columns: List[str]):
    """Factory to create null checks for multiple columns.

    Args:
        asset_name: Name of the asset to attach the checks to; the data is
            read from ``"{asset_name}.csv"``.
        columns: Column names to generate one null check each for.

    Returns:
        A list with one asset check per entry in ``columns``, each named
        ``"{column}_no_nulls"``.
    """

    def _build_check(column: str):
        # Each check is built in its own function scope so its closure keeps
        # its own ``column``. Defining the check directly in the loop would
        # make every check read the loop variable when it runs, i.e. the
        # last column only.
        @dg.asset_check(
            asset=asset_name,
            name=f"{column}_no_nulls"
        )
        def check_column_nulls():
            df = pd.read_csv(f"{asset_name}.csv")
            # Plain int so ``passed`` is a native bool, not ``numpy.bool_``.
            null_count = int(df[column].isna().sum())

            return dg.AssetCheckResult(
                passed=null_count == 0,
                metadata={"null_count": null_count}
            )

        return check_column_nulls

    return [_build_check(column) for column in columns]

# One null check per critical column of the "orders" asset.
orders_null_checks = create_null_checks(
    asset_name="orders",
    columns=[
        "order_id",
        "customer_id",
        "order_date",
        "total_amount",
    ],
)

Data Quality Metrics

Track quality over time:
from dagster import MaterializeResult, MetadataValue
import pandas as pd

@dg.asset
def customer_data() -> MaterializeResult:
    """Clean ``raw_customers.csv`` into ``customer_data.csv`` and report quality metrics.

    Duplicate rows and rows containing any null are dropped before the
    cleaned table is written.

    Returns:
        A ``MaterializeResult`` whose metadata holds the row counts before
        and after cleaning and a quality score: the fraction of input rows
        that survive cleaning.
    """
    df = pd.read_csv("raw_customers.csv")

    # Calculate quality metrics. pandas returns NumPy integers; convert to
    # plain ints so the metadata holds native Python values.
    total_rows = len(df)
    null_rows = int(df.isnull().any(axis=1).sum())
    duplicate_rows = int(df.duplicated().sum())

    # Clean the data
    cleaned_df = df.drop_duplicates().dropna()
    cleaned_df.to_csv("customer_data.csv", index=False)
    rows_after_cleaning = len(cleaned_df)

    # Score from the rows that actually survive cleaning. Subtracting
    # ``null_rows`` and ``duplicate_rows`` separately would count a row that
    # is both duplicated and contains nulls twice. With no input rows the
    # score is undefined and reported as NaN.
    if total_rows:
        quality_score = rows_after_cleaning / total_rows
    else:
        quality_score = float("nan")

    return MaterializeResult(
        metadata={
            "total_rows": total_rows,
            "rows_with_nulls": null_rows,
            "duplicate_rows": duplicate_rows,
            "rows_after_cleaning": rows_after_cleaning,
            "data_quality_score": MetadataValue.float(quality_score),
        }
    )
1. Identify critical data quality dimensions: Determine what makes your data valid (nulls, ranges, schema, etc.).
2. Define asset checks: Create checks for each quality dimension.
3. Add metadata: Include diagnostic information in check results.
4. Set up blocking checks: Use blocking checks for critical quality gates.
5. Monitor check results: Track check failures over time to identify trends.
6. Alert on failures: Use run status sensors to alert when checks fail.

Best Practices

Separate Validation from Transformation

# Good: loading, validation, and cleaning each live in their own definition.
@dg.asset
def raw_data():
    """Return whatever ``load_data()`` produces, untouched."""
    loaded = load_data()
    return loaded

@dg.asset_check(asset=raw_data)
def validate_raw_data():
    """Return the result of ``check_quality()`` as this check's outcome."""
    outcome = check_quality()
    return outcome

@dg.asset
def cleaned_data(raw_data):
    """Return ``clean(raw_data)``."""
    cleaned = clean(raw_data)
    return cleaned

Use Descriptive Check Names

# Good: Clear what is being checked
@dg.asset_check(asset=orders, name="order_total_is_positive")
def check_order_total():
    """Placeholder body; the registered check name states what is verified."""
    ...

# Bad: Unclear what is checked
@dg.asset_check(asset=orders, name="check1")
def check1():
    """Placeholder body; the name "check1" says nothing about what is verified."""
    ...

Include Diagnostic Metadata

@dg.asset_check(asset=my_asset)
def comprehensive_check():
    """Validate the loaded data and attach diagnostic metadata to the result.

    The metadata records how many rows were checked, the failure count, a
    sample of failures, and the time the check ran.
    """
    frame = load_data()

    # Evaluate the verdict first, then gather diagnostics in a fixed order.
    verdict = is_valid(frame)
    diagnostics = {
        "rows_checked": len(frame),
        "failures": get_failure_count(frame),
        "sample_failures": get_sample_failures(frame),
        "check_timestamp": datetime.now().isoformat(),
    }

    return dg.AssetCheckResult(passed=verdict, metadata=diagnostics)

Test Your Checks

import pytest

def test_null_check_passes():
    """The check passes when ``test_data.csv`` has no missing ids."""
    # Create valid test data
    valid_rows = pd.DataFrame({"id": [1, 2, 3]})
    valid_rows.to_csv("test_data.csv")

    outcome = check_no_nulls()
    assert outcome.passed

def test_null_check_fails():
    """The check fails when ``test_data.csv`` has a missing id."""
    # Create invalid test data
    rows_with_gap = pd.DataFrame({"id": [1, None, 3]})
    rows_with_gap.to_csv("test_data.csv")

    outcome = check_no_nulls()
    assert not outcome.passed

Integration with Schedules

Run checks on a schedule:
@dg.asset
def production_data():
    """Return whatever ``load_production_data()`` produces."""
    loaded = load_production_data()
    return loaded

@dg.asset_check(asset=production_data)
def hourly_quality_check():
    """Return the result of ``validate_production_data()`` as this check's outcome."""
    outcome = validate_production_data()
    return outcome

# Target ``production_data`` at minute 0 of every hour ("0 * * * *").
quality_check_schedule = dg.ScheduleDefinition(
    target=[production_data],
    cron_schedule="0 * * * *",
    name="hourly_quality_check",
)

Next Steps

Build docs developers (and LLMs) love