Schedules & Sensors
Schedules and sensors enable automatic execution of your Dagster pipelines:
Schedules: Trigger runs on a fixed time-based cadence (e.g., daily at 9 AM)
Sensors: Trigger runs in response to events (e.g., new files, external system changes)
Both provide declarative ways to automate pipeline execution without manual intervention.
Schedules
Schedules trigger runs based on cron expressions or simple time intervals:
Basic Schedule
Define a schedule using ScheduleDefinition:
import dagster as dg

basic_schedule = dg.ScheduleDefinition(
    name="basic_schedule",
    cron_schedule="0 0 * * *",  # every day at midnight
    target="*",  # materialize all assets
)
Schedule Decorator
For dynamic run configuration, use the @schedule decorator:
import dagster as dg

class AssetConfig(dg.Config):
    scheduled_date: str

@dg.asset
def configurable_asset(context: dg.AssetExecutionContext, config: AssetConfig):
    context.log.info(f"Processing data for {config.scheduled_date}")
    return fetch_data(config.scheduled_date)

@dg.schedule(target=configurable_asset, cron_schedule="0 0 * * *")
def configurable_job_schedule(context: dg.ScheduleEvaluationContext):
    scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return dg.RunRequest(
        run_key=None,
        run_config={
            "ops": {
                "configurable_asset": {
                    "config": {"scheduled_date": scheduled_date}
                }
            }
        },
        tags={"date": scheduled_date},
    )
Cron Syntax
Schedules use standard cron syntax:
import dagster as dg

# Every day at 9 AM
daily_schedule = dg.ScheduleDefinition(
    name="daily_9am",
    cron_schedule="0 9 * * *",
    target="*",
)

# Every Monday at 8 AM
weekly_schedule = dg.ScheduleDefinition(
    name="weekly_monday",
    cron_schedule="0 8 * * 1",
    target="*",
)

# Every hour
hourly_schedule = dg.ScheduleDefinition(
    name="hourly",
    cron_schedule="0 * * * *",
    target="*",
)

# Every 15 minutes
frequent_schedule = dg.ScheduleDefinition(
    name="every_15_min",
    cron_schedule="*/15 * * * *",
    target="*",
)
Cron format: minute hour day_of_month month day_of_week
* means “every”
*/n means “every n”
0 9 * * * = 9:00 AM every day
0 0 * * 1 = midnight every Monday
Multiple Cron Schedules
Run on the union of multiple cron expressions:
import dagster as dg

# Run at 11:45 PM Saturday AND 9:30 AM Sunday
weekend_schedule = dg.ScheduleDefinition(
    name="weekend_schedule",
    cron_schedule=["45 23 * * 6", "30 9 * * 0"],
    target="*",
)
Timezone Configuration
Specify the timezone for schedule execution:
import dagster as dg

my_timezone_schedule = dg.ScheduleDefinition(
    name="my_timezone_schedule",
    target="*",
    cron_schedule="0 9 * * *",
    execution_timezone="America/Los_Angeles",
)
Targeting Assets
Schedule specific assets or asset selections:
import dagster as dg

# Schedule a specific asset group
basic_asset_schedule = dg.ScheduleDefinition(
    name="basic_asset_schedule",
    cron_schedule="0 0 * * *",
    target=dg.AssetSelection.groups("some_asset_group"),
)

# Schedule specific assets
specific_assets_schedule = dg.ScheduleDefinition(
    name="specific_assets",
    cron_schedule="0 6 * * *",
    target=dg.AssetSelection.assets("asset1", "asset2"),
)
Default Status
Schedules can start running immediately:
import dagster as dg

my_running_schedule = dg.ScheduleDefinition(
    name="my_running_schedule",
    target="*",
    cron_schedule="0 9 * * *",
    default_status=dg.DefaultScheduleStatus.RUNNING,
)
By default, schedules are created in a STOPPED state. Set default_status=DefaultScheduleStatus.RUNNING to activate them automatically when deployed.
Schedule Evaluation Context
Access runtime information in schedule functions:
import dagster as dg

@dg.schedule(target="*", cron_schedule="0 0 * * *")
def context_aware_schedule(context: dg.ScheduleEvaluationContext):
    context.log.info(f"Scheduled time: {context.scheduled_execution_time}")
    context.log.info(f"Cursor: {context.cursor}")
    # Skip execution conditionally
    if should_skip(context.scheduled_execution_time):
        return dg.SkipReason("Skipping due to business logic")
    return dg.RunRequest(run_key=None)
Logging in Schedules
import dagster as dg

@dg.schedule(target="*", cron_schedule="* * * * *")
def logs_then_skips(context: dg.ScheduleEvaluationContext):
    context.log.info("Logging from a schedule!")
    return dg.SkipReason("Nothing to do")
Sensors
Sensors check for events and trigger runs reactively:
Basic Sensor
Define a sensor using the @sensor decorator:
import os

import dagster as dg

class FileConfig(dg.Config):
    filename: str

@dg.op
def process_file(context: dg.OpExecutionContext, config: FileConfig):
    context.log.info(f"Processing {config.filename}")

@dg.job
def log_file_job():
    process_file()

MY_DIRECTORY = "./data"

@dg.sensor(target=log_file_job)
def my_directory_sensor():
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield dg.RunRequest(
                run_key=filename,
                run_config=dg.RunConfig(
                    ops={"process_file": FileConfig(filename=filename)}
                ),
            )
Asset Sensors
Trigger runs when assets are materialized:
import dagster as dg

@dg.asset
def upstream_asset():
    return [1, 2, 3]

@dg.asset
def downstream_asset():
    return [4, 5, 6]

@dg.asset_sensor(
    asset_key=dg.AssetKey("upstream_asset"),
    target=[downstream_asset],
)
def upstream_materialization_sensor(
    context: dg.SensorEvaluationContext, asset_event: dg.EventLogEntry
):
    # Triggered whenever upstream_asset is materialized
    yield dg.RunRequest(
        run_key=context.cursor,
        tags={"triggered_by": "upstream_asset"},
    )
Multi-Asset Sensors
React to multiple asset materializations:
import dagster as dg

@dg.asset
def asset_a():
    return 1

@dg.asset
def asset_b():
    return 2

@dg.asset
def combined_asset():
    return 3

@dg.multi_asset_sensor(
    monitored_assets=[dg.AssetKey("asset_a"), dg.AssetKey("asset_b")],
    target=[combined_asset],
)
def combined_asset_sensor(context):
    # Get the latest unconsumed materialization for each monitored asset
    asset_events = context.latest_materialization_records_by_key()
    if all(asset_events.values()):  # Both assets have new materializations
        context.advance_all_cursors()
        yield dg.RunRequest(run_key=context.cursor)
Sensor Evaluation Interval
Control how frequently sensors run:
import dagster as dg

@dg.sensor(
    target=my_job,
    minimum_interval_seconds=30,  # evaluate at most once every 30 seconds
)
def frequent_sensor():
    if should_trigger():
        yield dg.RunRequest(run_key=None)
By default, sensors evaluate every 30 seconds. Set minimum_interval_seconds to control the minimum time between evaluations.
Sensor Context
Access state and metadata in sensors:
import dagster as dg

@dg.sensor(target=my_job)
def stateful_sensor(context: dg.SensorEvaluationContext):
    # Get the last cursor (for tracking state)
    last_cursor = context.cursor or "0"
    # Fetch new events since the last cursor
    new_events = fetch_events_since(last_cursor)
    for event in new_events:
        yield dg.RunRequest(
            run_key=event.id,
            run_config={"event_id": event.id},
        )
    # Update the cursor for the next evaluation
    if new_events:
        context.update_cursor(str(new_events[-1].id))
Run Status Sensors
React to job completion or failure:
import dagster as dg

@dg.run_status_sensor(
    run_status=dg.DagsterRunStatus.SUCCESS,
    monitored_jobs=[my_job],
)
def success_sensor(context: dg.RunStatusSensorContext):
    context.log.info(f"Job {context.dagster_run.job_name} succeeded!")
    # Send a notification
    send_slack_message(
        f"Job completed successfully: {context.dagster_run.run_id}"
    )

@dg.run_failure_sensor
def failure_sensor(context: dg.RunFailureSensorContext):
    context.log.error(f"Job failed: {context.failure_event.message}")
    # Send an alert
    send_pagerduty_alert(
        message=context.failure_event.message,
        run_id=context.dagster_run.run_id,
    )
Default Status
Sensors can start running immediately:
import dagster as dg

@dg.sensor(
    target=[my_asset],
    default_status=dg.DefaultSensorStatus.RUNNING,
)
def my_running_sensor():
    if condition_met():
        yield dg.RunRequest()
Skipping Sensor Runs
Provide reasons for skipping:
import dagster as dg

@dg.sensor(target=my_job)
def conditional_sensor():
    if not should_trigger():
        yield dg.SkipReason("Conditions not met")
        return
    yield dg.RunRequest(run_key=None)
Testing Schedules
Test schedule evaluation:
import dagster as dg
from datetime import datetime

@dg.schedule(target=my_job, cron_schedule="0 0 * * *")
def my_schedule(context):
    return dg.RunRequest(
        run_config={"date": context.scheduled_execution_time.isoformat()}
    )

def test_my_schedule():
    # Build an evaluation context with a fixed execution time
    context = dg.build_schedule_context(
        scheduled_execution_time=datetime(2024, 1, 15, 0, 0),
    )
    # Evaluate the schedule function directly
    run_request = my_schedule(context)
    assert run_request is not None
    assert "date" in run_request.run_config
Testing Sensors
Test sensor evaluation:
import dagster as dg

@dg.sensor(target=log_file_job)
def sensor_to_test():
    yield dg.RunRequest(
        run_key="foo",
        run_config={"ops": {"process_file": {"config": {"filename": "foo"}}}},
    )

def test_sensor():
    # Evaluate the sensor function directly
    run_requests = list(sensor_to_test())
    assert len(run_requests) == 1
    assert run_requests[0].run_key == "foo"
Test with context:
import dagster as dg

@dg.sensor(target=my_job)
def stateful_sensor(context):
    # Use context.cursor to track state between evaluations
    last_id = int(context.cursor or "0")
    new_items = fetch_items_after(last_id)
    for item in new_items:
        yield dg.RunRequest(run_key=str(item.id))
    if new_items:
        context.update_cursor(str(new_items[-1].id))

def test_stateful_sensor():
    # Build a sensor context with an initial cursor
    context = dg.build_sensor_context(cursor="100")
    # Evaluate the sensor
    run_requests = list(stateful_sensor(context))
    assert len(run_requests) > 0
    assert context.cursor != "100"  # Cursor should have advanced
Schedule vs Sensor: When to Use Each
Use a schedule when:
You need time-based execution (daily reports, nightly ETL)
Your pipeline runs on a fixed cadence
Timing is predictable and regular
You want simple cron-based scheduling
Use a sensor when:
You need event-based execution (new files, API updates)
Timing is unpredictable or depends on external systems
You want to react to asset materializations
You need to check external systems for changes
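For contrast, a minimal sketch wiring the same job to both mechanisms, reusing the my_job and has_new_data placeholders from the surrounding examples:
import dagster as dg

# Schedule: fires on a fixed cadence, whether or not anything changed
@dg.schedule(target=my_job, cron_schedule="0 0 * * *")
def nightly_schedule(context: dg.ScheduleEvaluationContext):
    return dg.RunRequest()

# Sensor: fires only when the event condition is actually observed
@dg.sensor(target=my_job)
def new_data_sensor():
    if has_new_data("source_a"):  # placeholder helper, as in later examples
        yield dg.RunRequest(run_key=None)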
Schedule & Sensor Management
Starting/Stopping via UI
Navigate to the Schedules or Sensors page
Toggle the schedule/sensor on or off
View execution history and logs
Starting/Stopping via CLI
# Start a schedule
dagster schedule start my_schedule
# Stop a schedule
dagster schedule stop my_schedule
# Start a sensor
dagster sensor start my_sensor
# Stop a sensor
dagster sensor stop my_sensor
# List all schedules
dagster schedule list
# List all sensors
dagster sensor list
Partitioned Schedules
Schedules can materialize specific partitions:
import dagster as dg
from datetime import timedelta

@dg.asset(
    partitions_def=dg.DailyPartitionsDefinition(start_date="2024-01-01")
)
def daily_asset(context):
    return fetch_data_for_date(context.partition_key)

@dg.schedule(
    target=[daily_asset],
    cron_schedule="0 1 * * *",  # 1 AM every day
)
def daily_partition_schedule(context: dg.ScheduleEvaluationContext):
    # Materialize yesterday's partition, which is now complete
    partition_date = context.scheduled_execution_time - timedelta(days=1)
    return dg.RunRequest(
        partition_key=partition_date.strftime("%Y-%m-%d"),
    )
Advanced Patterns
Conditional Schedule Execution
import dagster as dg

@dg.schedule(target=my_job, cron_schedule="0 0 * * *")
def conditional_schedule(context: dg.ScheduleEvaluationContext):
    # Skip weekends
    if context.scheduled_execution_time.weekday() >= 5:
        return dg.SkipReason("Skipping weekend execution")
    # Skip holidays
    if is_holiday(context.scheduled_execution_time):
        return dg.SkipReason("Skipping holiday")
    return dg.RunRequest()
Dynamic Run Requests
Generate multiple run requests from a single evaluation:
import dagster as dg

@dg.sensor(target=my_job)
def multi_run_sensor():
    # Check multiple sources in a single evaluation
    sources = ["source_a", "source_b", "source_c"]
    for source in sources:
        if has_new_data(source):
            yield dg.RunRequest(
                run_key=f"{source}_{get_timestamp()}",
                run_config={"source": source},
                tags={"source": source},
            )
Sensor Cursor for State Management
import json

import dagster as dg

@dg.sensor(target=my_job)
def cursor_sensor(context: dg.SensorEvaluationContext):
    # Load state from the cursor
    state = json.loads(context.cursor) if context.cursor else {"last_id": 0}
    # Fetch new items
    new_items = fetch_items_since(state["last_id"])
    for item in new_items:
        yield dg.RunRequest(
            run_key=str(item.id),
            run_config={"item_id": item.id},
        )
    # Persist updated state
    if new_items:
        state["last_id"] = new_items[-1].id
        context.update_cursor(json.dumps(state))
Best Practices
Use run keys for idempotency
Run keys prevent duplicate runs. Use identifiers like filenames, timestamps, or event IDs to ensure idempotency.
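A minimal sketch of the pattern, assuming a hypothetical poll_events() helper that may return the same event more than once:
import dagster as dg

@dg.sensor(target=my_job)
def idempotent_sensor():
    # poll_events() is a hypothetical helper; it may re-deliver events
    for event in poll_events():
        # Dagster skips any RunRequest whose run_key has already been
        # used, so re-delivered events do not launch duplicate runs
        yield dg.RunRequest(run_key=f"event_{event.id}")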
Handle sensor errors gracefully
Wrap sensor logic in try-except blocks and return a SkipReason on errors to avoid marking the sensor evaluation as failed.
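A minimal sketch, assuming a hypothetical check_external_system() helper that may raise:
import dagster as dg

@dg.sensor(target=my_job)
def resilient_sensor():
    try:
        events = check_external_system()  # hypothetical helper; may raise
    except Exception as e:
        # Report the problem as a skip instead of a failed evaluation
        yield dg.SkipReason(f"External system unavailable: {e}")
        return
    for event in events:
        yield dg.RunRequest(run_key=str(event.id))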
Keep sensor evaluation fast
Sensors should check for events quickly. Avoid expensive computations in sensor code; save those for the job execution, as in the sketch below.
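For instance, a sketch that does only a cheap modification-time check in the sensor and leaves the heavy file processing to the run it requests (the directory path is illustrative):
import os

import dagster as dg

@dg.sensor(target=my_job)
def cheap_check_sensor(context: dg.SensorEvaluationContext):
    last_mtime = float(context.cursor or "0")
    # Cheap check: compare the directory's modification time instead of
    # reading file contents; expensive parsing happens inside the run
    mtime = os.path.getmtime("./data")
    if mtime > last_mtime:
        context.update_cursor(str(mtime))
        yield dg.RunRequest(run_key=str(mtime))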
Use run status sensors for monitoring
Implement run status sensors to send notifications on success or failure, providing observability into pipeline health.
Test schedules and sensors
Use build_schedule_context() and build_sensor_context() to write unit tests for your automation logic.