Automation
Automation in Dagster lets you declaratively specify when assets should be materialized, without hand-writing schedules or sensors. Using automation conditions, you can define rules that determine when assets need updates based on:
Changes to upstream dependencies
Time-based schedules
Custom business logic
Missing data
Code changes
Automation provides a higher-level abstraction than schedules and sensors, making it easier to build self-updating data pipelines.
Why Use Automation?
Traditional approaches require explicit scheduling:
# ❌ Manual scheduling - must coordinate multiple schedules
@schedule(cron_schedule="0 1 * * *")
def schedule_asset_a():
    return RunRequest(asset_selection=[asset_a])

@schedule(cron_schedule="0 2 * * *")  # Must run after asset_a
def schedule_asset_b():
    return RunRequest(asset_selection=[asset_b])
Automation conditions handle coordination automatically:
# ✅ Declarative automation - automatic coordination
@asset(automation_condition=AutomationCondition.eager())
def asset_a():
    return load_data()

@asset(automation_condition=AutomationCondition.eager())
def asset_b(asset_a):  # Automatically runs after asset_a updates
    return process(asset_a)
Core Automation Conditions
Eager Automation
The eager condition materializes assets as soon as their dependencies update:
import dagster as dg

@dg.asset(
    deps=["upstream"],
    automation_condition=dg.AutomationCondition.eager(),
)
def eager_asset() -> None:
    # Materializes immediately when 'upstream' is updated
    ...
Eager assets are ideal for:
Real-time data pipelines
Assets that should always reflect the latest data
Critical downstream dependencies
On Cron Automation
Schedule assets using cron expressions:
import dagster as dg

@dg.asset(
    automation_condition=dg.AutomationCondition.on_cron("0 9 * * *"),
)
def daily_report():
    # Materializes every day at 9 AM
    return generate_report()
Cron-based automation is useful for:
Daily, weekly, or monthly reports
Time-sensitive computations
Rate-limited external APIs
On Missing Automation
Materialize only when the asset doesn’t exist:
import dagster as dg

@dg.asset(
    automation_condition=dg.AutomationCondition.on_missing(),
)
def one_time_asset():
    # Only materializes if not already materialized
    return expensive_computation()
Useful for:
One-time migrations
Expensive seed data
Historical backfills
Custom Conditions
Combine conditions using logical operators:
import dagster as dg

@dg.asset(
    automation_condition=(
        # Run daily at 9 AM OR when upstream changes
        dg.AutomationCondition.on_cron("0 9 * * *")
        | dg.AutomationCondition.eager()
    ),
)
def flexible_asset():
    return compute_data()
Automation Condition Operators
Combine conditions using logical operators:
AND Operator
import dagster as dg

@dg.asset(
    automation_condition=(
        # Must satisfy BOTH conditions
        dg.AutomationCondition.eager()
        & dg.AutomationCondition.on_cron("0 9 * * *")
    ),
)
def selective_asset():
    # Only runs if upstream updated AND it's 9 AM
    return compute()
OR Operator
import dagster as dg

@dg.asset(
    automation_condition=(
        # Satisfies EITHER condition
        dg.AutomationCondition.eager()
        | dg.AutomationCondition.on_cron("0 0 * * *")
    ),
)
def opportunistic_asset():
    # Runs when upstream updates OR at midnight
    return compute()
NOT Operator
import dagster as dg

@dg.asset(
    automation_condition=(
        # Inverts the condition
        ~dg.AutomationCondition.on_missing()
    ),
)
def always_refresh_asset():
    # Only runs if already materialized (inverse of on_missing)
    return compute()
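Conceptually, composed conditions behave like boolean predicates evaluated against each asset's current state. A toy, stdlib-only sketch of the operator semantics (an illustration, not the Dagster API; the context keys are made up):

```python
from dataclasses import dataclass
from typing import Callable

@dataclass(frozen=True)
class Condition:
    # A condition is just a predicate over an evaluation context
    predicate: Callable[[dict], bool]

    def evaluate(self, ctx: dict) -> bool:
        return self.predicate(ctx)

    def __and__(self, other):  # & : both must hold
        return Condition(lambda ctx: self.evaluate(ctx) and other.evaluate(ctx))

    def __or__(self, other):   # | : either may hold
        return Condition(lambda ctx: self.evaluate(ctx) or other.evaluate(ctx))

    def __invert__(self):      # ~ : negation
        return Condition(lambda ctx: not self.evaluate(ctx))

# Stand-ins for the built-in conditions
eager = Condition(lambda ctx: ctx["upstream_updated"])
on_cron = Condition(lambda ctx: ctx["cron_tick"])
on_missing = Condition(lambda ctx: ctx["missing"])

combined = (eager & on_cron) | on_missing
print(combined.evaluate({"upstream_updated": True, "cron_tick": False, "missing": False}))  # False
print(combined.evaluate({"upstream_updated": True, "cron_tick": True, "missing": False}))   # True
```

The real operators compose the same way, but each built-in condition inspects Dagster's asset graph and materialization history rather than a plain dictionary.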
Code Version Tracking
Track when asset code changes using code_version:
import dagster as dg

@dg.asset(
    automation_condition=dg.AutomationCondition.eager(),
    code_version="v2",  # Increment when logic changes
)
def versioned_asset():
    return compute_with_new_logic()
When code_version changes:
Dagster detects the asset is “stale”
Automation conditions can trigger rematerialization
Helps track which code produced each materialization
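The staleness check can be pictured as comparing the version recorded at the last materialization against the current definition. A stdlib-only sketch (an illustration of the idea, not Dagster internals):

```python
from typing import Optional

def is_stale(last_materialized_version: Optional[str], current_version: str) -> bool:
    # Stale if never materialized, or if produced by a different code version
    return last_materialized_version != current_version

print(is_stale("v1", "v2"))   # True: code changed since the last run
print(is_stale("v2", "v2"))   # False: up to date
print(is_stale(None, "v2"))   # True: never materialized
```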
Working with Partitioned Assets
Automation conditions work with partitioned assets:
import dagster as dg

@dg.asset(
    partitions_def=dg.DailyPartitionsDefinition(start_date="2024-01-01"),
    automation_condition=dg.AutomationCondition.eager(),
)
def partitioned_asset(context: dg.AssetExecutionContext):
    # Each partition materializes eagerly when upstream updates
    date = context.partition_key
    return fetch_data(date)

@dg.asset(
    partitions_def=dg.DailyPartitionsDefinition(start_date="2024-01-01"),
    automation_condition=dg.AutomationCondition.on_cron("0 2 * * *"),
)
def scheduled_partitioned_asset(context: dg.AssetExecutionContext):
    # Latest partition materializes daily at 2 AM
    date = context.partition_key
    return process_data(date)
Asset Groups with Automation
Apply automation to entire asset groups:
import dagster as dg

@dg.asset(group_name="raw_data")
def raw_users():
    return fetch_users()

@dg.asset(group_name="raw_data")
def raw_orders():
    return fetch_orders()

@dg.asset(
    group_name="processed",
    automation_condition=dg.AutomationCondition.eager(),
)
def combined_data(raw_users, raw_orders):
    # The function parameters create the upstream dependencies
    return merge(raw_users, raw_orders)

# All assets in 'processed' group update automatically
Automation with Dependencies
Allowing Missing Upstreams
Materialize even if some upstreams are missing:
import dagster as dg

@dg.asset(
    automation_condition=(
        dg.AutomationCondition.eager().allow_missing_upstream()
    ),
)
def tolerant_asset(optional_upstream=None):
    # Runs even if optional_upstream isn't materialized
    if optional_upstream:
        return process(optional_upstream)
    return default_value()
Allowing Updated Dependencies
Take updates to downstream dependents into account as well as upstream changes:
import dagster as dg

@dg.asset(
    automation_condition=(
        dg.AutomationCondition.eager().allow_dependencies()
    ),
)
def inclusive_asset():
    # Considers both upstream and downstream changes
    return compute()
Blocking Conditions
Prevent materialization based on asset checks:
import dagster as dg

@dg.asset
def validated_asset():
    return load_data()

@dg.asset_check(asset=validated_asset)
def quality_check():
    # Validate data quality
    data = load_data()
    passed = validate(data)
    return dg.AssetCheckResult(passed=passed)

@dg.asset(
    automation_condition=(
        dg.AutomationCondition.eager().without_blocking_checks()  # Only run if checks pass
    ),
)
def downstream_asset(validated_asset):
    # The parameter creates the upstream dependency
    return process(validated_asset)
Testing Automation
Test automation conditions in unit tests:
import dagster as dg

@dg.asset(automation_condition=dg.AutomationCondition.eager())
def my_asset():
    return [1, 2, 3]

def test_automation():
    # Materialize and verify automation
    result = dg.materialize([my_asset])
    assert result.success
    # Check that automation condition is set
    assert my_asset.automation_condition is not None
Observability
Automation Evaluation History
View why assets were or weren’t materialized:
In the UI : Navigate to the Asset Details page → Automation tab
View evaluation results : See which conditions were satisfied
Debug skipped runs : Understand why assets weren’t materialized
Automation Logs
Automation evaluations are logged:
import dagster as dg

@dg.asset(
    automation_condition=dg.AutomationCondition.eager(),
)
def logged_asset():
    # Automation evaluations appear in the run logs
    return compute_data()
Logs show:
When conditions were evaluated
Which conditions were true/false
Why runs were requested or skipped
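These log entries reflect a periodic evaluation loop: on each tick, every asset's condition is evaluated and the satisfied assets get run requests. A toy, stdlib-only sketch of one tick (not Dagster internals; the names and state keys are illustrative):

```python
from typing import Callable, Dict, List

def evaluation_tick(
    conditions: Dict[str, Callable[[dict], bool]], state: dict
) -> List[str]:
    """Evaluate each asset's condition against a state snapshot
    and return the assets whose runs would be requested."""
    return [name for name, condition in conditions.items() if condition(state)]

# Toy stand-ins for automation conditions
conditions = {
    "daily_report": lambda s: s["cron_tick"],        # on_cron-like
    "eager_asset": lambda s: s["upstream_updated"],  # eager-like
}
print(evaluation_tick(conditions, {"cron_tick": True, "upstream_updated": False}))
# ['daily_report']
```

In the real system this loop runs inside the daemon, and each decision (requested or skipped, and why) is what you see in the evaluation history and logs.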
Advanced Patterns
Time-Window Conditions
Materialize only during specific time windows:
import dagster as dg

@dg.asset(
    automation_condition=(
        dg.AutomationCondition.eager()
        & dg.AutomationCondition.on_cron("0 9-17 * * 1-5")  # Business hours
    ),
)
def business_hours_asset():
    # Only updates during 9 AM - 5 PM, Monday-Friday
    return compute()
Multi-Condition Assets
Complex automation logic:
import dagster as dg

@dg.asset(
    automation_condition=(
        # Run in any of these scenarios:
        dg.AutomationCondition.on_cron("0 0 * * *")  # Daily at midnight
        | (
            # OR when upstream updates during business hours
            dg.AutomationCondition.eager()
            & dg.AutomationCondition.on_cron("0 9-17 * * 1-5")
        )
        | dg.AutomationCondition.on_missing()  # OR when missing
    ),
)
def sophisticated_asset():
    return compute_complex_logic()
Asset Check Integration
Use asset checks to control automation:
import dagster as dg

@dg.asset
def source_data():
    return load_external_data()

@dg.asset_check(asset=source_data)
def freshness_check():
    data = load_external_data()
    is_fresh = check_freshness(data)
    return dg.AssetCheckResult(passed=is_fresh)

@dg.asset(
    automation_condition=(
        dg.AutomationCondition.eager().with_blocking_checks(["freshness_check"])
    ),
)
def processed_data(source_data):
    # Only runs if source_data passed freshness_check;
    # the parameter creates the upstream dependency
    return process(source_data)
Automation vs Schedules vs Sensors
Use automation conditions when:
You want declarative, condition-based execution
Assets should update based on dependency changes
You need automatic coordination between related assets
You want to express “when” logic without writing code
Use schedules when:
You need precise control over timing
Complex run configuration is required
You want explicit, imperative scheduling
Your schedule logic is very custom
Use sensors when:
You need to react to external events
You’re integrating with third-party systems
You need stateful polling logic
Timing is unpredictable
Enabling Automation
Automation requires the asset daemon to be running:
In Development
# The dagster dev command runs the daemon automatically
dagster dev
In Production
Ensure your deployment runs the daemon:
# Run the daemon separately
dagster-daemon run
For Dagster Cloud or Kubernetes deployments, the daemon runs automatically.
Monitoring Automation
Asset Health
The Dagster UI shows:
Automation status : Whether automation is enabled
Last evaluation : When conditions were last checked
Next expected run : When the asset will likely materialize
Evaluation history : Past decisions and reasons
Daemon Logs
View daemon logs to debug automation:
# View daemon logs
dagster-daemon logs
Logs show:
Automation evaluations
Requested runs
Skipped runs and reasons
Errors or warnings
Best Practices
Start with eager automation
For most assets, AutomationCondition.eager() provides the right behavior: update when dependencies change.
Use code versions to track changes
Increment code_version when you change asset logic. This helps track which code produced each materialization.
Combine cron with eager for flexibility
Use eager | on_cron to get both reactive updates and guaranteed periodic refreshes.
Test automation in development
Run dagster dev locally to see how automation behaves before deploying to production.
Monitor the daemon
Ensure the asset daemon is running and healthy. Check daemon logs if automation isn’t working as expected.
Use asset checks for quality gates
Block downstream automation until upstream data passes quality checks using without_blocking_checks().
Migration from Schedules/Sensors
Convert existing schedules to automation:
# Before: Using schedules
@asset
def my_asset():
    return load_data()

@schedule(target=[my_asset], cron_schedule="0 0 * * *")
def my_schedule():
    return RunRequest()
# After: Using automation
@asset(
    automation_condition=AutomationCondition.on_cron("0 0 * * *"),
)
def my_asset():
    return load_data()
Benefits of migration:
Simpler code (no separate schedule definition)
Better visibility in the UI
Automatic coordination with dependencies
Declarative condition composition
API Reference
@asset - Define assets with automation conditions
Partitions - Partition definitions for automation
Schedules - Alternative time-based automation