metaflow-dagster automatically converts @schedule decorators on your Metaflow flows into Dagster ScheduleDefinition objects, so flows can run on a time-based schedule without any hand-written Dagster schedule code.

How it works

When you decorate a flow with @schedule, the compiler:
  1. Detects the schedule decorator at compile time
  2. Generates a ScheduleDefinition automatically
  3. Includes it in the Definitions object
  4. Makes it available in the Dagster UI

Using @schedule decorator

Add the @schedule decorator to your flow with a cron expression or shorthand:
from metaflow import FlowSpec, step, schedule

@schedule(cron="0 2 * * *")
class NightlyPipeline(FlowSpec):
    @step
    def start(self):
        print("Running nightly job...")
        self.next(self.end)
    
    @step
    def end(self):
        pass

Cron syntax

The cron parameter accepts standard cron expressions:
# Every day at 2 AM
@schedule(cron="0 2 * * *")

# Every Monday at 9 AM
@schedule(cron="0 9 * * 1")

# Every 6 hours
@schedule(cron="0 */6 * * *")

# Every weekday at 8:30 AM
@schedule(cron="30 8 * * 1-5")
Cron format: minute hour day month day_of_week
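
To make the five-field layout concrete, here is a minimal sketch that splits a cron expression into the named fields listed above. The helper split_cron is hypothetical and is not part of metaflow-dagster, Metaflow, or Dagster. It only checks the field count, not the validity of each field.

```python
from typing import Dict

# Field names in the order given by the cron format line above.
CRON_FIELDS = ("minute", "hour", "day", "month", "day_of_week")


def split_cron(expression: str) -> Dict[str, str]:
    """Split a five-field cron expression into a dict of named fields.

    Hypothetical helper for illustration; it validates only the number
    of whitespace-separated fields.
    """
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} fields, got {len(parts)}: {expression!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


# "Every weekday at 8:30 AM" from the examples above.
fields = split_cron("30 8 * * 1-5")
print(fields["minute"], fields["hour"], fields["day_of_week"])
```

Reading the fields by name is a quick way to check that an expression such as "30 8 * * 1-5" means minute 30, hour 8, Monday through Friday.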

Shorthand schedules

For common patterns, use shorthand keywords:
# Run daily at midnight
@schedule(daily=True)
class DailyFlow(FlowSpec):
    ...

# Run hourly
@schedule(hourly=True)
class HourlyFlow(FlowSpec):
    ...

# Run weekly on Sunday at midnight
@schedule(weekly=True)
class WeeklyFlow(FlowSpec):
    ...
Shorthand mappings:
  • daily=True → "0 0 * * *" (midnight)
  • hourly=True → "0 * * * *" (top of every hour)
  • weekly=True → "0 0 * * 0" (Sunday midnight)
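
The shorthand mappings can be sketched as a small lookup. The function resolve_cron below is hypothetical, and the precedence it applies when several arguments are set (explicit cron first, then weekly, hourly, daily) is an assumption, not something this page confirms about the compiler.

```python
from typing import Optional

# Shorthand-to-cron table taken from the mappings listed above.
# Dict order doubles as the assumed precedence among shorthands.
SHORTHAND_CRON = {
    "weekly": "0 0 * * 0",  # Sunday midnight
    "hourly": "0 * * * *",  # top of every hour
    "daily": "0 0 * * *",   # midnight
}


def resolve_cron(
    cron: Optional[str] = None,
    *,
    weekly: bool = False,
    hourly: bool = False,
    daily: bool = False,
) -> str:
    """Return the cron string for a set of @schedule-style arguments.

    Assumed precedence (not confirmed by the source): an explicit cron
    string wins, then weekly, then hourly, then daily.
    """
    if cron:
        return cron
    flags = {"weekly": weekly, "hourly": hourly, "daily": daily}
    for name, expression in SHORTHAND_CRON.items():
        if flags[name]:
            return expression
    raise ValueError("no schedule specified")


print(resolve_cron(daily=True))
print(resolve_cron(cron="0 2 * * *"))
```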

Generated ScheduleDefinition

The compiler generates a schedule definition automatically:
# Generated in dagster_defs.py
NightlyPipeline_schedule = ScheduleDefinition(
    job=NightlyPipeline,
    cron_schedule="0 2 * * *",
)

defs = Definitions(
    jobs=[NightlyPipeline],
    schedules=[NightlyPipeline_schedule],
    sensors=[],
)
The schedule is named <JobName>_schedule and automatically included in the Definitions object.
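
As a rough illustration of the naming rule, the sketch below renders source text in the same shape as the generated snippet above. The functions schedule_name and render_definitions are hypothetical. The real compiler's internals are not shown on this page and may differ.

```python
from typing import List, Tuple


def schedule_name(job_name: str) -> str:
    """Apply the <JobName>_schedule naming rule described above."""
    return f"{job_name}_schedule"


def render_definitions(jobs: List[Tuple[str, str]]) -> str:
    """Render ScheduleDefinition and Definitions source for (job, cron) pairs.

    Hypothetical renderer for illustration only; it produces text and
    does not import or call Dagster.
    """
    lines: List[str] = []
    for job_name, cron in jobs:
        lines += [
            f"{schedule_name(job_name)} = ScheduleDefinition(",
            f"    job={job_name},",
            f'    cron_schedule="{cron}",',
            ")",
            "",
        ]
    job_list = ", ".join(name for name, _ in jobs)
    schedule_list = ", ".join(schedule_name(name) for name, _ in jobs)
    lines += [
        "defs = Definitions(",
        f"    jobs=[{job_list}],",
        f"    schedules=[{schedule_list}],",
        "    sensors=[],",
        ")",
    ]
    return "\n".join(lines)


print(render_definitions([("NightlyPipeline", "0 2 * * *")]))
```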

Managing schedules in Dagster

1. Compile the flow

Generate the Dagster definitions file:
python my_flow.py dagster create dagster_defs.py

2. Launch Dagster

Start the Dagster UI:
dagster dev -f dagster_defs.py

3. Enable the schedule

In the Dagster UI:
  • Navigate to Overview → Schedules
  • Find your schedule (e.g., NightlyPipeline_schedule)
  • Toggle it to Running

4. View upcoming runs

Dagster shows the next scheduled execution time and all past runs.

Combining schedules with parameters

Schedules work seamlessly with parametrized flows:
from metaflow import FlowSpec, step, schedule, Parameter

@schedule(daily=True)
class DataRefreshFlow(FlowSpec):
    environment = Parameter("environment", default="production")
    batch_size = Parameter("batch_size", default=1000)
    
    @step
    def start(self):
        print(f"Refreshing {self.environment} with batch size {self.batch_size}")
        self.next(self.end)
    
    @step
    def end(self):
        pass
Scheduled runs use the default parameter values. You can override parameters for manual runs through the Dagster launchpad.

Disabling schedules

To temporarily disable a schedule without redeploying:
  1. In the Dagster UI, go to Overview → Schedules
  2. Toggle the schedule to Stopped
To permanently remove a schedule:
  1. Remove the @schedule decorator from your flow
  2. Recompile: python my_flow.py dagster create dagster_defs.py
  3. Restart Dagster
Schedules are evaluated by the Dagster daemon. Make sure the daemon is running for schedules to fire. When using dagster dev, the daemon is started automatically.

Next steps

  • Triggers and Sensors: Learn about event-driven execution with @trigger and @trigger_on_finish
  • Resume Failed Runs: Recover from failures by reusing completed steps