Schedules & Sensors

Schedules and sensors enable automatic execution of your Dagster pipelines:
  • Schedules: Trigger runs on a fixed time-based cadence (e.g., daily at 9 AM)
  • Sensors: Trigger runs in response to events (e.g., new files, external system changes)
Both provide declarative ways to automate pipeline execution without manual intervention.

Schedules

Schedules trigger runs on a cadence defined by one or more cron expressions:

Basic Schedule

Define a schedule using ScheduleDefinition:
import dagster as dg

basic_schedule = dg.ScheduleDefinition(
    name="basic_schedule",
    cron_schedule="0 0 * * *",  # Every day at midnight
    target="*",  # Materialize all assets
)

Schedule Decorator

For dynamic run configuration, use the @schedule decorator:
import dagster as dg

class AssetConfig(dg.Config):
    scheduled_date: str

@dg.asset
def configurable_asset(context: dg.AssetExecutionContext, config: AssetConfig):
    context.log.info(f"Processing data for {config.scheduled_date}")
    return fetch_data(config.scheduled_date)  # fetch_data is a placeholder for your own logic

@dg.schedule(target=configurable_asset, cron_schedule="0 0 * * *")
def configurable_job_schedule(context: dg.ScheduleEvaluationContext):
    scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    
    return dg.RunRequest(
        run_key=None,  # None requests a run on every tick; a string run_key deduplicates
        run_config={
            "ops": {
                "configurable_asset": {
                    "config": {"scheduled_date": scheduled_date}
                }
            }
        },
        tags={"date": scheduled_date},
    )

Cron Syntax

Schedules use standard cron syntax:
import dagster as dg

# Every day at 9 AM
daily_schedule = dg.ScheduleDefinition(
    name="daily_9am",
    cron_schedule="0 9 * * *",
    target="*",
)

# Every Monday at 8 AM
weekly_schedule = dg.ScheduleDefinition(
    name="weekly_monday",
    cron_schedule="0 8 * * 1",
    target="*",
)

# Every hour
hourly_schedule = dg.ScheduleDefinition(
    name="hourly",
    cron_schedule="0 * * * *",
    target="*",
)

# Every 15 minutes
frequent_schedule = dg.ScheduleDefinition(
    name="every_15_min",
    cron_schedule="*/15 * * * *",
    target="*",
)
Cron format: minute hour day_of_month month day_of_week
  • * means “every”
  • */n means “every n”
  • 0 9 * * * = 9:00 AM every day
  • 0 0 * * 1 = midnight every Monday

Multiple Cron Schedules

Run on the union of multiple cron expressions:
import dagster as dg

# Run at 11:45 PM Saturday AND 9:30 AM Sunday
weekend_schedule = dg.ScheduleDefinition(
    name="weekend_schedule",
    cron_schedule=["45 23 * * 6", "30 9 * * 0"],
    target="*",
)

Timezone Configuration

Specify the timezone for schedule execution:
import dagster as dg

my_timezone_schedule = dg.ScheduleDefinition(
    name="my_timezone_schedule",
    target="*",
    cron_schedule="0 9 * * *",
    execution_timezone="America/Los_Angeles",
)

Targeting Assets

Schedule specific assets or asset selections:
import dagster as dg

# Schedule a specific asset group
basic_asset_schedule = dg.ScheduleDefinition(
    name="basic_asset_schedule",
    cron_schedule="0 0 * * *",
    target=dg.AssetSelection.groups("some_asset_group"),
)

# Schedule specific assets
specific_assets_schedule = dg.ScheduleDefinition(
    name="specific_assets",
    cron_schedule="0 6 * * *",
    target=dg.AssetSelection.assets("asset1", "asset2"),
)

Default Status

Schedules can start running immediately:
import dagster as dg

my_running_schedule = dg.ScheduleDefinition(
    name="my_running_schedule",
    target="*",
    cron_schedule="0 9 * * *",
    default_status=dg.DefaultScheduleStatus.RUNNING,
)
By default, schedules are created in a STOPPED state. Set default_status=DefaultScheduleStatus.RUNNING to activate them automatically when deployed.
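Note that a schedule or sensor is only evaluated once it is included in your code location's Definitions object. A minimal sketch, assuming the basic_schedule defined above and a hypothetical my_sensor:
import dagster as dg

defs = dg.Definitions(
    schedules=[basic_schedule],  # defined in the Basic Schedule example above
    sensors=[my_sensor],  # hypothetical sensor defined elsewhere
)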

Schedule Evaluation Context

Access runtime information in schedule functions:
import dagster as dg

@dg.schedule(target="*", cron_schedule="0 0 * * *")
def context_aware_schedule(context: dg.ScheduleEvaluationContext):
    context.log.info(f"Scheduled time: {context.scheduled_execution_time}")
    context.log.info(f"Cursor: {context.cursor}")
    
    # Skip execution conditionally
    if should_skip(context.scheduled_execution_time):
        return dg.SkipReason("Skipping due to business logic")
    
    return dg.RunRequest(run_key=None)

Logging in Schedules

import dagster as dg

@dg.schedule(target="*", cron_schedule="* * * * *")
def logs_then_skips(context):
    context.log.info("Logging from a dg.schedule!")
    return dg.SkipReason("Nothing to do")

Sensors

Sensors check for events and trigger runs reactively:

Basic Sensor

Define a sensor using the @sensor decorator:
import os
import dagster as dg

class FileConfig(dg.Config):
    filename: str

@dg.op
def process_file(context: dg.OpExecutionContext, config: FileConfig):
    context.log.info(f"Processing {config.filename}")

@dg.job
def log_file_job():
    process_file()

MY_DIRECTORY = "./data"

@dg.sensor(target=log_file_job)
def my_directory_sensor():
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield dg.RunRequest(
                run_key=filename,
                run_config=dg.RunConfig(
                    ops={"process_file": FileConfig(filename=filename)}
                ),
            )
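Because each RunRequest uses the filename as its run_key, Dagster will not launch a second run for a file it has already requested a run for, so repeated sensor evaluations stay idempotent.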

Asset Sensors

Trigger runs when assets are materialized:
import dagster as dg

@dg.asset
def upstream_asset():
    return [1, 2, 3]

@dg.asset
def downstream_asset():
    return [4, 5, 6]

@dg.asset_sensor(
    asset_key=dg.AssetKey("upstream_asset"),
    target=[downstream_asset],
)
def upstream_materialization_sensor(context, asset_event):
    # Triggered whenever upstream_asset is materialized
    yield dg.RunRequest(
        run_key=context.cursor,
        tags={"triggered_by": "upstream_asset"},
    )

Multi-Asset Sensors

React to multiple asset materializations:
import dagster as dg

@dg.asset
def asset_a():
    return 1

@dg.asset
def asset_b():
    return 2

@dg.asset
def combined_asset():
    return 3

@dg.multi_asset_sensor(
    monitored_assets=[dg.AssetKey("asset_a"), dg.AssetKey("asset_b")],
    target=[combined_asset],
)
def multi_asset_sensor(context: dg.MultiAssetSensorEvaluationContext):
    # Get the latest materialization record for each monitored asset
    asset_events = context.latest_materialization_records_by_key()

    if all(asset_events.values()):  # Both assets have been materialized
        yield dg.RunRequest(run_key=context.cursor)
        # Advance the cursor so the same materializations don't trigger again
        context.advance_all_cursors()

Sensor Evaluation Interval

Control how frequently sensors run:
import dagster as dg

@dg.sensor(
    target=my_job,
    minimum_interval_seconds=30,  # At least 30 seconds between evaluations
)
def frequent_sensor():
    if should_trigger():
        yield RunRequest(run_key=None)
By default, sensors evaluate at most once every 30 seconds. minimum_interval_seconds sets a lower bound on the time between evaluations, not an exact cadence; a slow evaluation delays the next one.

Sensor Context

Access state and metadata in sensors:
import dagster as dg

@dg.sensor(target=my_job)
def stateful_sensor(context: dg.SensorEvaluationContext):
    # Get last cursor (for tracking state)
    last_cursor = context.cursor or "0"
    
    # Fetch new events since last cursor
    new_events = fetch_events_since(last_cursor)
    
    for event in new_events:
        yield dg.RunRequest(
            run_key=str(event.id),
            run_config={"event_id": event.id},  # placeholder; must match the target's config schema
        )
    
    # Update cursor for next evaluation
    if new_events:
        context.update_cursor(str(new_events[-1].id))

Run Status Sensors

React to job completion or failure:
import dagster as dg

@dg.run_status_sensor(
    run_status=dg.DagsterRunStatus.SUCCESS,
    monitored_jobs=[my_job],
)
def success_sensor(context: dg.RunStatusSensorContext):
    context.log.info(f"Job {context.dagster_run.job_name} succeeded!")
    
    # Send notification
    send_slack_message(
        f"Job completed successfully: {context.dagster_run.run_id}"
    )

@dg.run_failure_sensor()
def failure_sensor(context: dg.RunFailureSensorContext):
    context.log.error(f"Job failed: {context.failure_event.message}")
    
    # Send alert
    send_pagerduty_alert(
        message=context.failure_event.message,
        run_id=context.dagster_run.run_id,
    )

Default Status

Sensors can start running immediately:
import dagster as dg

@dg.sensor(
    target=[my_asset],
    default_status=dg.DefaultSensorStatus.RUNNING,
)
def my_running_sensor():
    if condition_met():
        yield dg.RunRequest()

Skipping Sensor Runs

Provide reasons for skipping:
import dagster as dg

@dg.sensor(target=my_job)
def conditional_sensor():
    if not should_trigger():
        yield dg.SkipReason("Conditions not met")
        return

    yield dg.RunRequest(run_key=None)

Testing Schedules

Test schedule evaluation:
import dagster as dg
from datetime import datetime

@dg.schedule(target=my_job, cron_schedule="0 0 * * *")
def my_schedule(context):
    return dg.RunRequest(
        run_config={"date": context.scheduled_execution_time.isoformat()}
    )

def test_my_schedule():
    # Build evaluation context
    context = dg.build_schedule_context(
        scheduled_execution_time=datetime(2024, 1, 15, 0, 0),
    )
    
    # Evaluate schedule
    run_request = my_schedule(context)
    
    assert run_request is not None
    assert "date" in run_request.run_config

Testing Sensors

Test sensor evaluation:
import dagster as dg

@dg.sensor(target=log_file_job)
def sensor_to_test():
    yield dg.RunRequest(
        run_key="foo",
        run_config={"ops": {"process_file": {"config": {"filename": "foo"}}}},
    )

def test_sensor():
    # Evaluate sensor
    run_requests = list(sensor_to_test())
    
    assert len(run_requests) == 1
    assert run_requests[0].run_key == "foo"
Test with context:
import dagster as dg

@dg.sensor(target=my_job)
def stateful_sensor(context):
    # Use context.cursor to track state
    last_id = int(context.cursor or "0")
    new_items = fetch_items_after(last_id)
    
    for item in new_items:
        yield dg.RunRequest(run_key=str(item.id))
    
    if new_items:
        context.update_cursor(str(new_items[-1].id))

def test_stateful_sensor():
    # Build sensor context with initial cursor
    context = dg.build_sensor_context(cursor="100")
    
    # Evaluate sensor
    run_requests = list(stateful_sensor(context))
    
    assert len(run_requests) > 0
    assert context.cursor != "100"  # Cursor should update

Schedule vs Sensor: When to Use Each

Use a schedule when:
  • You need time-based execution (daily reports, nightly ETL)
  • Your pipeline runs on a fixed cadence
  • Timing is predictable and regular
  • You want simple cron-based scheduling

Use a sensor when:
  • You need event-based execution (new files, API updates)
  • Timing is unpredictable or depends on external systems
  • You want to react to asset materializations
  • You need to check external systems for changes

Schedule & Sensor Management

Starting/Stopping via UI

  1. Navigate to the Schedules or Sensors page
  2. Toggle the schedule/sensor on or off
  3. View execution history and logs

Starting/Stopping via CLI

# Start a schedule
dagster schedule start my_schedule

# Stop a schedule
dagster schedule stop my_schedule

# Start a sensor
dagster sensor start my_sensor

# Stop a sensor
dagster sensor stop my_sensor

# List all schedules
dagster schedule list

# List all sensors
dagster sensor list
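Schedules and sensors are evaluated by the dagster-daemon process (started automatically by dagster dev). If the daemon is not running, schedules and sensors that are toggled on will not launch any runs.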

Partitioned Schedules

Schedules can materialize specific partitions:
from datetime import timedelta

import dagster as dg

@dg.asset(
    partitions_def=dg.DailyPartitionsDefinition(start_date="2024-01-01")
)
def daily_asset(context):
    return fetch_data_for_date(context.partition_key)

@dg.schedule(
    target=[daily_asset],
    cron_schedule="0 1 * * *",  # 1 AM every day
)
def daily_partition_schedule(context: dg.ScheduleEvaluationContext):
    # A run at 1 AM materializes the previous day's (now complete) partition
    previous_day = context.scheduled_execution_time - timedelta(days=1)

    return dg.RunRequest(
        partition_key=previous_day.strftime("%Y-%m-%d"),
    )

Advanced Patterns

Conditional Schedule Execution

import dagster as dg

@dg.schedule(target=my_job, cron_schedule="0 0 * * *")
def conditional_schedule(context: dg.ScheduleEvaluationContext):
    # Skip weekends
    if context.scheduled_execution_time.weekday() >= 5:
        return dg.SkipReason("Skipping weekend execution")
    
    # Skip holidays
    if is_holiday(context.scheduled_execution_time):
        return dg.SkipReason("Skipping holiday")
    
    return dg.RunRequest()

Dynamic Run Requests

Generate multiple run requests from a single evaluation:
import dagster as dg

@dg.sensor(target=my_job)
def multi_run_sensor():
    # Check multiple sources
    sources = ["source_a", "source_b", "source_c"]
    
    for source in sources:
        if has_new_data(source):
            yield dg.RunRequest(
                run_key=f"{source}_{get_timestamp()}",
                run_config={"source": source},
                tags={"source": source},
            )

Sensor Cursor for State Management

import dagster as dg
import json

@dg.sensor(target=my_job)
def cursor_sensor(context: dg.SensorEvaluationContext):
    # Load state from cursor
    state = json.loads(context.cursor) if context.cursor else {"last_id": 0}
    
    # Fetch new items
    new_items = fetch_items_since(state["last_id"])
    
    for item in new_items:
        yield dg.RunRequest(
            run_key=str(item.id),
            run_config={"item_id": item.id},
        )
    
    # Update state
    if new_items:
        state["last_id"] = new_items[-1].id
        context.update_cursor(json.dumps(state))

Best Practices

  • Use run keys to prevent duplicate runs: identifiers like filenames, timestamps, or event IDs ensure idempotency.
  • Wrap sensor logic in try-except blocks and yield a SkipReason on errors to avoid marking the sensor tick as failed (see the sketch below).
  • Keep sensors fast: they should check for events quickly. Avoid expensive computations in sensor code; save those for the job execution.
  • Implement run status sensors to send notifications on success or failure, providing observability into pipeline health.
  • Use build_schedule_context() and build_sensor_context() to write unit tests for your automation logic.
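
A minimal sketch combining several of these practices, assuming a my_job target and a hypothetical fetch_pending_events helper:
import dagster as dg

@dg.sensor(target=my_job)
def defensive_sensor(context: dg.SensorEvaluationContext):
    # Keep the check fast; expensive work belongs in the triggered run
    try:
        events = fetch_pending_events()  # hypothetical external check
    except Exception as e:
        # Yield a SkipReason instead of raising so the tick isn't marked failed
        yield dg.SkipReason(f"Event source unavailable: {e}")
        return

    for event in events:
        # Stable run keys keep repeated evaluations idempotent
        yield dg.RunRequest(run_key=str(event.id))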
