Asset Checks
Asset checks verify data quality when assets are materialized.

Single Asset Check
Define a check for one quality dimension:

import pandas as pd
import dagster as dg
@dg.asset
def orders():
    """Materialize a tiny orders table and persist it as orders.csv."""
    data = {"order_id": [1, 2], "item_id": [432, 878]}
    pd.DataFrame(data).to_csv("orders.csv")
@dg.asset_check(asset=orders)
def orders_id_has_no_nulls():
    """Fail when any order_id in the persisted orders CSV is null."""
    df = pd.read_csv("orders.csv")
    null_total = df["order_id"].isna().sum()
    # Cast numpy scalars to builtin bool/int so the result serializes cleanly.
    return dg.AssetCheckResult(
        passed=bool(null_total == 0),
        metadata={"null_count": int(null_total), "total_rows": len(df)},
    )
Multiple Asset Checks
Check multiple quality dimensions efficiently:

from collections.abc import Iterable
import pandas as pd
import dagster as dg
@dg.asset
def orders():
    """Materialize a tiny orders table and write it to orders.csv."""
    frame = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    frame.to_csv("orders.csv")
@dg.multi_asset_check(
    specs=[
        dg.AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders"),
        dg.AssetCheckSpec(name="items_id_has_no_nulls", asset="orders"),
    ]
)
def orders_check() -> Iterable[dg.AssetCheckResult]:
    """Run both null checks against a single read of orders.csv.

    Loading the file once and yielding one result per spec avoids
    re-reading the data for every check.
    """
    df = pd.read_csv("orders.csv")
    # (check name, column to inspect) — order matches the specs above.
    column_checks = [
        ("orders_id_has_no_nulls", "order_id"),
        ("items_id_has_no_nulls", "item_id"),
    ]
    for check_name, column in column_checks:
        null_total = df[column].isna().sum()
        yield dg.AssetCheckResult(
            check_name=check_name,
            passed=bool(null_total == 0),
            asset_key="orders",
            metadata={"null_count": int(null_total)},
        )
Use `multi_asset_check` when multiple checks share expensive data loading: load the data once and perform all validations against it.

Common Data Quality Checks
Null Checks
@dg.asset_check(asset=my_data)
def no_nulls_in_key_columns():
    """Verify that none of the key columns contain null values.

    Null counts are cast to builtin ints (pandas sums return
    numpy.int64) so both `passed` and the metadata use plain Python
    types, matching the other checks in this file.
    """
    df = pd.read_csv("my_data.csv")
    key_columns = ["user_id", "timestamp", "event_type"]
    # int(...) converts the numpy.int64 sum for each column.
    null_counts = {col: int(df[col].isna().sum()) for col in key_columns}
    total_nulls = sum(null_counts.values())
    return dg.AssetCheckResult(
        passed=total_nulls == 0,  # builtin int comparison -> builtin bool
        metadata={
            "null_counts_by_column": null_counts,
            "total_nulls": total_nulls,
        },
    )
Uniqueness Checks
@dg.asset_check(asset=users)
def user_ids_are_unique():
    """Verify that every user_id appears at most once.

    The duplicate count is cast to a builtin int up front so `passed`
    is a plain bool (pandas sums return numpy.int64, and comparing one
    yields numpy.bool_), consistent with the other checks here.
    """
    df = pd.read_csv("users.csv")
    duplicates = int(df["user_id"].duplicated().sum())
    return dg.AssetCheckResult(
        passed=duplicates == 0,
        metadata={
            "duplicate_count": duplicates,
            "unique_count": df["user_id"].nunique(),
            "total_rows": len(df),
        },
    )
Range Checks
@dg.asset_check(asset=metrics)
def values_in_expected_range():
    """Verify that all revenue values lie in [0, 1,000,000].

    `in_range` is wrapped in bool() because comparing pandas scalars
    yields numpy.bool_, and the rest of this file consistently passes
    builtin bools to AssetCheckResult.
    """
    df = pd.read_csv("metrics.csv")
    min_value = df["revenue"].min()
    max_value = df["revenue"].max()
    # Revenue should be positive and less than $1M
    in_range = (min_value >= 0) and (max_value <= 1_000_000)
    return dg.AssetCheckResult(
        passed=bool(in_range),
        metadata={
            "min_value": float(min_value),
            "max_value": float(max_value),
            "expected_min": 0,
            "expected_max": 1_000_000,
        },
    )
Row Count Checks
@dg.asset_check(asset=daily_events)
def has_minimum_rows():
    """Fail when daily_events.csv contains fewer than 1000 rows."""
    min_expected = 1000
    row_count = len(pd.read_csv("daily_events.csv"))
    return dg.AssetCheckResult(
        passed=row_count >= min_expected,
        metadata={"row_count": row_count, "minimum_expected": min_expected},
    )
Schema Checks
@dg.asset_check(asset=customer_data)
def schema_matches_expected():
    """Verify the CSV has exactly the expected columns, no more, no less.

    Column lists in the metadata are sorted: converting a set straight
    to a list yields an arbitrary order, which makes successive check
    results needlessly differ.
    """
    df = pd.read_csv("customer_data.csv")
    expected_columns = {"customer_id", "name", "email", "created_at"}
    actual_columns = set(df.columns)
    missing = expected_columns - actual_columns
    extra = actual_columns - expected_columns
    return dg.AssetCheckResult(
        passed=not missing and not extra,
        metadata={
            "expected_columns": sorted(expected_columns),
            "actual_columns": sorted(actual_columns),
            "missing_columns": sorted(missing),
            "extra_columns": sorted(extra),
        },
    )
Freshness Checks
Ensure data is updated within expected timeframes:

import datetime
# Import the module under the `dg` alias used everywhere else on this
# page; the original `from dagster import ...` form left `dg` undefined
# for the `dg.Definitions(...)` call below (NameError).
import dagster as dg

@dg.asset
def daily_metrics():
    """Recompute the daily metrics table."""
    return compute_daily_metrics()

@dg.asset
def hourly_metrics():
    """Recompute the hourly metrics table."""
    return compute_hourly_metrics()

# Automatically create one freshness check per asset; each fails when
# the asset's last update is older than lower_bound_delta.
freshness_checks = dg.build_last_update_freshness_checks(
    assets=[daily_metrics, hourly_metrics],
    lower_bound_delta=datetime.timedelta(days=2),
)

defs = dg.Definitions(
    assets=[daily_metrics, hourly_metrics],
    asset_checks=freshness_checks,
)
Custom Freshness Check
from datetime import datetime, timedelta

@dg.asset_check(asset=real_time_data)
def data_is_fresh():
    """Fail when the newest timestamp in the CSV is over an hour old."""
    max_age = timedelta(hours=1)
    df = pd.read_csv("real_time_data.csv")
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    newest = df["timestamp"].max()
    # NOTE(review): assumes the timestamps are naive, like
    # datetime.now(); subtracting a tz-aware value here would raise —
    # confirm against the upstream writer.
    staleness = datetime.now() - newest
    return dg.AssetCheckResult(
        passed=staleness <= max_age,
        metadata={
            "latest_timestamp": newest.isoformat(),
            "age_hours": staleness.total_seconds() / 3600,
            "max_age_hours": max_age.total_seconds() / 3600,
        },
    )
Great Expectations Integration
Integrate with Great Expectations for advanced data validation:

from dagster import Config, job, op
from dagster_ge.factory import ge_validation_op_factory
from pandas import read_csv
class GEOpConfig(Config):
    """Run configuration for the payroll job's CSV input."""

    # Path to the CSV read by read_in_datafile.
    csv_path: str = "./data/succeed.csv"
@op
def read_in_datafile(config: GEOpConfig):
    """Load the configured CSV into a DataFrame."""
    return read_csv(config.csv_path)
@op
def process_payroll(df):
    """Return the number of payroll rows in the DataFrame."""
    return len(df)
@op
def postprocess_payroll(numrows, expectation):
    """Pass the row count through only when GE validation succeeded.

    Raises ValueError when the expectation result reports failure.
    """
    if not expectation["success"]:
        raise ValueError("Validation failed")
    return numrows
# Build a validation op from the Great Expectations suite "basic.warning";
# its result dict's "success" key gates postprocess_payroll below.
payroll_expectations = ge_validation_op_factory(
    name="ge_validation_op",
    datasource_name="getest",
    data_connector_name="my_runtime_data_connector",
    data_asset_name="test_asset",
    suite_name="basic.warning",
    batch_identifiers={"foo": "bar"},
)
@job
def payroll_data():
    """Read the payroll CSV, validate it with GE, and count its rows."""
    df = read_in_datafile()
    validation = payroll_expectations(df)
    row_count = process_payroll(df)
    postprocess_payroll(row_count, validation)
Blocking Downstream Assets
Prevent downstream assets from running when checks fail:

import dagster as dg
import pandas as pd
@dg.asset
def raw_data():
    """Produce the raw values table and persist it for downstream assets.

    Persisting raw_data.csv is required because processed_data later in
    this example reads that file instead of taking the DataFrame as an
    input; the original snippet never wrote it. The DataFrame is still
    returned unchanged for callers that use the value directly.
    """
    df = pd.DataFrame({"value": [1, 2, 3, None]})
    df.to_csv("raw_data.csv", index=False)
    return df
@dg.asset_check(
    asset=raw_data,
    blocking=True,  # a failure here stops downstream materializations
)
def raw_data_quality():
    """Block downstream assets when raw_data contains any nulls."""
    df = raw_data()
    return dg.AssetCheckResult(
        passed=not df["value"].isna().any(),
        description="Raw data should not contain nulls"
    )
@dg.asset(deps=[raw_data])
def processed_data():
    """Fill nulls in the persisted raw data with zero.

    Runs only after raw_data materializes and its blocking quality
    check passes.
    """
    # NOTE(review): expects raw_data to have been persisted to
    # raw_data.csv by the upstream asset — confirm.
    return pd.read_csv("raw_data.csv").fillna(0)
Blocking checks prevent downstream assets from materializing when they fail. Use this carefully for critical quality gates.
Statistical Validation
Detect anomalies using statistical methods:

import numpy as np
import pandas as pd
@dg.asset_check(asset=daily_revenue)
def revenue_within_normal_range():
    """Flag the latest revenue value if outside mean ± 3 standard deviations.

    `passed` is wrapped in bool() because chained comparisons of pandas
    scalars yield numpy.bool_. The z-score is guarded against a zero or
    NaN standard deviation (constant series, or a single row) which
    would otherwise put inf/NaN into the metadata.
    """
    df = pd.read_csv("daily_revenue.csv")
    # Calculate distribution statistics over the full history.
    mean = df["revenue"].mean()
    std = df["revenue"].std()
    # Check the latest value against the 3-sigma rule.
    latest = df["revenue"].iloc[-1]
    lower_bound = mean - 3 * std
    upper_bound = mean + 3 * std
    z_score = float((latest - mean) / std) if std else 0.0
    return dg.AssetCheckResult(
        passed=bool(lower_bound <= latest <= upper_bound),
        metadata={
            "latest_revenue": float(latest),
            "mean_revenue": float(mean),
            "std_dev": float(std),
            "lower_bound": float(lower_bound),
            "upper_bound": float(upper_bound),
            "z_score": z_score,
        },
    )
Check Factories
Generate checks programmatically:

from typing import List
import dagster as dg
def create_null_checks(asset_name: str, columns: List[str]):
    """Factory to create one null-count asset check per column.

    Each check is built inside a helper function so `column` is bound
    at creation time. The original version closed over the loop
    variable directly, so every generated check body read the *last*
    column in the list (Python closures are late-binding); only the
    check names differed.
    """

    def _make_check(column: str):
        # Build the asset check for a single column.
        @dg.asset_check(asset=asset_name, name=f"{column}_no_nulls")
        def check_column_nulls():
            df = pd.read_csv(f"{asset_name}.csv")
            null_count = int(df[column].isna().sum())
            return dg.AssetCheckResult(
                passed=null_count == 0,
                metadata={"null_count": null_count}
            )
        return check_column_nulls

    return [_make_check(column) for column in columns]
# Generate a <column>_no_nulls check for each critical orders column.
orders_null_checks = create_null_checks(
    "orders",
    ["order_id", "customer_id", "order_date", "total_amount"]
)
Data Quality Metrics
Track quality over time:

from dagster import MaterializeResult, MetadataValue
import pandas as pd
@dg.asset
def customer_data() -> MaterializeResult:
    """Clean the raw customer rows and report data-quality metrics.

    Counts are cast to builtin ints (pandas sums return numpy.int64)
    to match the metadata conventions used elsewhere in this file, and
    the quality score guards against an empty input file, which would
    otherwise divide by zero.
    """
    df = pd.read_csv("raw_customers.csv")
    # Calculate quality metrics on the raw data.
    total_rows = len(df)
    null_rows = int(df.isnull().any(axis=1).sum())
    duplicate_rows = int(df.duplicated().sum())
    # Clean the data and persist it for downstream consumers.
    cleaned_df = df.drop_duplicates().dropna()
    cleaned_df.to_csv("customer_data.csv", index=False)
    quality = (
        (total_rows - null_rows - duplicate_rows) / total_rows
        if total_rows
        else 0.0
    )
    return MaterializeResult(
        metadata={
            "total_rows": total_rows,
            "rows_with_nulls": null_rows,
            "duplicate_rows": duplicate_rows,
            "rows_after_cleaning": len(cleaned_df),
            "data_quality_score": MetadataValue.float(quality),
        }
    )
Best Practices
Separate Validation from Transformation
# Good: Separate concerns — the asset only materializes data.
@dg.asset
def raw_data():
    """Materialize the raw dataset; validation lives in a separate check."""
    return load_data()
@dg.asset_check(asset=raw_data)
def validate_raw_data():
    """Validate raw_data without mutating it."""
    return check_quality()
@dg.asset
def cleaned_data(raw_data):
    """Transform raw_data; cleaning is separate from validation."""
    return clean(raw_data)
Use Descriptive Check Names
# Good: the explicit name says exactly what is being checked.
@dg.asset_check(asset=orders, name="order_total_is_positive")
def check_order_total():
    ...
# Bad (intentional counter-example): "check1" says nothing about
# what quality dimension is verified.
@dg.asset_check(asset=orders, name="check1")
def check1():
    ...
Include Diagnostic Metadata
@dg.asset_check(asset=my_asset)
def comprehensive_check():
    """Validate my_asset and attach diagnostic metadata to the result."""
    df = load_data()
    diagnostics = {
        "rows_checked": len(df),
        "failures": get_failure_count(df),
        "sample_failures": get_sample_failures(df),
        "check_timestamp": datetime.now().isoformat(),
    }
    return dg.AssetCheckResult(passed=is_valid(df), metadata=diagnostics)
Test Your Checks
import pytest
def test_null_check_passes():
    """The check passes on data that contains no nulls."""
    frame = pd.DataFrame({"id": [1, 2, 3]})
    frame.to_csv("test_data.csv")
    assert check_no_nulls().passed
def test_null_check_fails():
    """The check fails when a null id is present."""
    frame = pd.DataFrame({"id": [1, None, 3]})
    frame.to_csv("test_data.csv")
    assert not check_no_nulls().passed
Integration with Schedules
Run checks on a schedule:

@dg.asset
def production_data():
    """Load the latest production dataset."""
    return load_production_data()
@dg.asset_check(asset=production_data)
def hourly_quality_check():
    """Validate production_data; runs whenever the asset materializes."""
    return validate_production_data()
# Materialize production_data (and therefore its attached checks)
# at the top of every hour.
quality_check_schedule = dg.ScheduleDefinition(
    name="hourly_quality_check",
    cron_schedule="0 * * * *",  # hourly, on the hour
    target=[production_data],
)
Next Steps
- Set up Observability to monitor check results
- Configure Deployment for production quality checks
- Add Integrations with testing frameworks
- Write Tests for your check logic
