The @schedule decorator allows you to run your Metaflow workflows on a recurring schedule when deployed to production orchestrators.

Basic Usage

Add the @schedule decorator to your flow to specify when it should run:
from metaflow import FlowSpec, schedule, step

@schedule(daily=True)
class DailyReportFlow(FlowSpec):
    
    @step
    def start(self):
        print("Running daily report...")
        self.next(self.end)
    
    @step
    def end(self):
        print("Report complete")

if __name__ == '__main__':
    DailyReportFlow()
Deploy the scheduled flow:
python daily_report.py step-functions create
The flow will now run automatically every day at midnight UTC.

Schedule Options

Predefined Intervals

The @schedule decorator supports common scheduling intervals:
hourly (bool, default: False)
  Run the workflow every hour
daily (bool, default: True)
  Run the workflow every day at midnight UTC
weekly (bool, default: False)
  Run the workflow every Sunday at midnight UTC
# Hourly execution
@schedule(hourly=True)
class HourlyFlow(FlowSpec):
    pass

# Weekly execution
@schedule(weekly=True)
class WeeklyFlow(FlowSpec):
    pass

Custom Cron Expressions

For more complex schedules, use cron expressions:
cron (str, default: None)
  Custom cron expression for scheduling
# Run every weekday at 9 AM UTC
@schedule(cron='0 9 ? * MON-FRI *')
class WeekdayMorningFlow(FlowSpec):
    pass

# Run on the first day of every month
@schedule(cron='0 0 1 * ? *')
class MonthlyFlow(FlowSpec):
    pass

# Run every 15 minutes
@schedule(cron='0/15 * * * ? *')
class FrequentFlow(FlowSpec):
    pass

Cron Expression Format

AWS Step Functions

For AWS Step Functions deployments, use AWS EventBridge cron expressions:
cron(Minutes Hours Day-of-month Month Day-of-week Year)
Format: Minutes Hours Day-of-month Month Day-of-week Year
  • Minutes: 0-59
  • Hours: 0-23 (UTC)
  • Day-of-month: 1-31
  • Month: 1-12 or JAN-DEC
  • Day-of-week: 1-7 or SUN-SAT (1 = Sunday)
  • Year: 1970-2199
Special characters:
  • * (wildcard): All values
  • , (comma): Multiple values
  • - (dash): Range of values
  • / (slash): Increments
  • ? (question mark): No specific value; EventBridge requires ? in either day-of-month or day-of-week
Examples:
# Every day at 2:30 PM UTC
@schedule(cron='30 14 * * ? *')

# Every Monday at 8 AM UTC
@schedule(cron='0 8 ? * MON *')

# Every 30 minutes
@schedule(cron='0/30 * * * ? *')

# Last day of every month at midnight
@schedule(cron='0 0 L * ? *')

# First Monday of every month
@schedule(cron='0 0 ? * MON#1 *')

Argo Workflows

For Argo Workflows, use standard Unix cron format:
Format: Minute Hour Day Month Weekday
  • Minute: 0-59
  • Hour: 0-23 (UTC by default, configurable with timezone)
  • Day: 1-31
  • Month: 1-12
  • Weekday: 0-6 (0 = Sunday)
Examples:
# Every day at 3 AM UTC
@schedule(cron='0 3 * * *')

# Every Monday at 9 AM
@schedule(cron='0 9 * * 1')

# Every 2 hours
@schedule(cron='0 */2 * * *')
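To sanity-check a five-field expression locally before deploying, a small matcher can test whether a given time satisfies it. This is a sketch covering *, lists, ranges, and steps only (no names, L, or #, and it ANDs the day fields where real cron ORs them when both are restricted); for production checks, prefer a dedicated library such as croniter.

```python
from datetime import datetime

# Valid ranges for the five standard cron fields:
# minute, hour, day-of-month, month, day-of-week (0 = Sunday)
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]

def field_matches(field, value, lo, hi):
    """Does `value` satisfy one cron field? Handles *, lists, ranges, steps."""
    for part in field.split(','):
        rng, _, step = part.partition('/')
        step = int(step) if step else 1
        if rng == '*':
            start, end = lo, hi
        elif '-' in rng:
            start, end = (int(x) for x in rng.split('-'))
        else:
            # A bare number with a step (e.g. '5/15') means "from 5 to max"
            start = int(rng)
            end = hi if '/' in part else start
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False

def cron_matches(expr, when):
    """True if datetime `when` satisfies the 5-field expression `expr`."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError('expected 5 space-separated fields')
    values = [when.minute, when.hour, when.day, when.month,
              (when.weekday() + 1) % 7]  # convert to cron's Sunday=0
    return all(field_matches(f, v, lo, hi)
               for f, v, (lo, hi) in zip(fields, values, FIELD_RANGES))
```

For example, `cron_matches('0 9 * * 1', datetime(2024, 1, 15, 9, 0))` is True (a Monday), while the same expression at 9 AM on the following Tuesday is not.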

Timezone Support

Argo Workflows supports timezone configuration:
timezone (str, default: None)
  Timezone in IANA format (e.g., 'America/Los_Angeles')
from metaflow import FlowSpec, schedule, step

# Run at 9 AM Pacific Time
@schedule(cron='0 9 * * *', timezone='America/Los_Angeles')
class PacificTimeFlow(FlowSpec):
    
    @step
    def start(self):
        from datetime import datetime
        print(f"Current time: {datetime.now()}")
        self.next(self.end)
    
    @step
    def end(self):
        pass
Timezone support is only available for Argo Workflows. AWS Step Functions schedules always run in UTC.
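Because a cron hour in a local timezone maps to different UTC hours across daylight saving transitions, it can help to check the mapping before deploying. A minimal sketch using the standard library's zoneinfo (Python 3.9+; assumes tz data is available on your platform):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

def nine_am_in_utc(year, month, day):
    """UTC hour at which 9 AM America/Los_Angeles falls on the given date."""
    local = datetime(year, month, day, 9, tzinfo=ZoneInfo('America/Los_Angeles'))
    return local.astimezone(ZoneInfo('UTC')).hour

print(nine_am_in_utc(2024, 1, 15))  # 17 -- winter, PST is UTC-8
print(nine_am_in_utc(2024, 7, 15))  # 16 -- summer, PDT is UTC-7
```

With the timezone parameter on Argo Workflows, this shift is handled for you; on Step Functions you would have to encode one fixed UTC hour and accept the drift.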

Managing Scheduled Flows

Deploying a Scheduled Flow

# Deploy with schedule
python myflow.py step-functions create

# Deploy with Argo Workflows
python myflow.py argo-workflows create
The scheduler is automatically configured based on the @schedule decorator.

Triggering Manually

You can still trigger scheduled flows manually:
# Manual trigger with Step Functions
python myflow.py step-functions trigger

# Manual trigger with Argo Workflows
python myflow.py argo-workflows trigger

Pausing Scheduled Flows

To pause a scheduled flow without deleting it:
  • AWS Step Functions: disable the flow's EventBridge rule in the AWS Console
  • Argo Workflows: suspend the CronWorkflow:
kubectl patch cronworkflow <workflow-name> -p '{"spec":{"suspend":true}}'
To resume, set suspend back to false:
kubectl patch cronworkflow <workflow-name> -p '{"spec":{"suspend":false}}'

Updating the Schedule

To change the schedule, modify the @schedule decorator and redeploy:
# Change from daily to hourly
@schedule(hourly=True)  # was: @schedule(daily=True)
class MyFlow(FlowSpec):
    pass
Then redeploy:
python myflow.py step-functions create

Examples

Data Pipeline with Hourly Updates

from metaflow import FlowSpec, schedule, step, Parameter, resources

@schedule(hourly=True)
class HourlyETLFlow(FlowSpec):
    """ETL pipeline that runs every hour (load_data, process, and
    save_results are user-defined helpers, not shown here)"""
    
    source = Parameter('source',
                      help='Data source to process',
                      default='s3://my-bucket/data/')
    
    @step
    def start(self):
        from datetime import datetime, timezone
        self.timestamp = datetime.now(timezone.utc).isoformat()
        print(f"Starting ETL at {self.timestamp}")
        self.next(self.extract)
    
    @resources(memory=4000)
    @step
    def extract(self):
        # Extract data from source
        print(f"Extracting data from {self.source}")
        self.data = self.load_data()
        self.next(self.transform)
    
    @resources(memory=8000, cpu=2)
    @step
    def transform(self):
        # Transform data
        self.processed_data = self.process(self.data)
        self.next(self.load)
    
    @step
    def load(self):
        # Load to destination
        self.save_results(self.processed_data)
        self.next(self.end)
    
    @step
    def end(self):
        print(f"ETL completed at {self.timestamp}")

Weekly Report Generation

from metaflow import FlowSpec, schedule, step, card

@schedule(cron='0 8 ? * MON *')  # Every Monday at 8 AM UTC
class WeeklyReportFlow(FlowSpec):
    """Generate weekly business reports (get_weekly_metrics and
    notify_stakeholders are user-defined helpers, not shown here)"""
    
    @step
    def start(self):
        from datetime import datetime, timedelta, timezone
        self.end_date = datetime.now(timezone.utc)
        self.start_date = self.end_date - timedelta(days=7)
        self.next(self.fetch_data)
    
    @step
    def fetch_data(self):
        # Fetch last week's data
        self.metrics = self.get_weekly_metrics(
            self.start_date, 
            self.end_date
        )
        self.next(self.generate_report)
    
    @card
    @step
    def generate_report(self):
        # Build visualizations and attach them to the step's card
        from metaflow import current
        from metaflow.cards import Markdown, Table
        
        current.card.append(Markdown(f"# Weekly Report\n\n"
                                     f"Period: {self.start_date} to {self.end_date}"))
        current.card.append(Table(self.metrics))
        
        self.next(self.send_report)
    
    @step
    def send_report(self):
        # Send report via email or Slack
        self.notify_stakeholders()
        self.next(self.end)
    
    @step
    def end(self):
        print("Weekly report sent successfully")

Complex Schedule with Multiple Time Zones

from metaflow import FlowSpec, schedule, step, project

# For global team: Run during business hours in multiple regions
@schedule(cron='0 9,14,22 * * *')  # 9 AM, 2 PM, 10 PM UTC
class GlobalSyncFlow(FlowSpec):
    """Sync data for teams across time zones"""
    
    @step
    def start(self):
        from datetime import datetime, timezone
        from zoneinfo import ZoneInfo
        
        # The cron above fires at 9, 14, and 22 UTC; route on the hour
        current_hour = datetime.now(timezone.utc).hour
        
        if current_hour == 9:
            self.region = 'APAC'
            self.tz = ZoneInfo('Asia/Tokyo')
        elif current_hour == 14:
            self.region = 'EMEA'
            self.tz = ZoneInfo('Europe/London')
        else:
            self.region = 'Americas'
            self.tz = ZoneInfo('America/New_York')
        
        self.next(self.sync_region)
    
    @step
    def sync_region(self):
        print(f"Syncing data for {self.region}")
        self.next(self.end)
    
    @step
    def end(self):
        pass

Best Practices

Always think in UTC when setting schedules to avoid confusion with daylight saving time changes. If you need local time, use Argo Workflows with the timezone parameter.
Ensure your flow can complete within the schedule interval. If a flow takes 2 hours to run, don’t schedule it to run every hour.
Use --workflow-timeout when deploying to prevent runaway executions:
python myflow.py step-functions create --workflow-timeout 3600
Scheduled flows should handle cases where expected data isn’t available yet. Use @retry or conditional logic.
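One common pattern is to fail fast when an expected input is not there yet and let @retry re-run the step after a delay. A sketch: `ensure_available` and the `exists` callable are illustrative names (pass os.path.exists, an S3 existence check, or similar), while @retry's `times` and `minutes_between_retries` are Metaflow's documented arguments.

```python
def ensure_available(paths, exists):
    """Raise if any expected input is missing, so a retry decorator
    can re-run the step later. `exists` is a callable such as
    os.path.exists or a custom S3 existence check (illustrative)."""
    missing = [p for p in paths if not exists(p)]
    if missing:
        raise RuntimeError(f"inputs not ready yet: {missing}")
    return True

# Inside a flow, pair the guard with Metaflow's @retry decorator:
# @retry(times=3, minutes_between_retries=10)
# @step
# def start(self):
#     ensure_available(['s3://my-bucket/data/latest.parquet'], s3_exists)
#     ...
```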
Use an online cron expression tester to verify your schedule before deploying.

Troubleshooting

Flow Not Running on Schedule

  1. Check the schedule syntax: Verify your cron expression is valid
  2. Check orchestrator logs: Look for scheduling errors in AWS EventBridge or Argo
  3. Verify deployment: Ensure the flow was deployed with create, not just triggered

Unexpected Run Times

  1. Verify timezone: Remember that Step Functions uses UTC only
  2. Check for delays: Orchestrators may have slight delays (typically < 1 minute)
  3. Consider execution time: The schedule is for start time, not completion time

Multiple Concurrent Runs

  1. Increase workflow timeout: Set appropriate limits
  2. Adjust schedule frequency: Ensure flows complete before next run
  3. Implement run locks: Use external coordination if needed
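A lightweight run lock can lean on the Metaflow Client API: before doing work, check whether the previous production run has finished and skip this cycle if not. `Flow('...').latest_run` and `Run.finished` are the Client API, but the early-exit policy itself is a sketch you should adapt (in particular, inside a running flow you would exclude the current run by comparing against metaflow.current.run_id).

```python
def previous_run_still_active(latest_run):
    """True if there is an earlier run that has not finished yet.
    `latest_run` is a metaflow.Run, or None when the flow never ran."""
    return latest_run is not None and not latest_run.finished

# Illustrative use inside a flow's start step:
# from metaflow import Flow, current
# latest = Flow('HourlyETLFlow').latest_run
# if latest.id != current.run_id and previous_run_still_active(latest):
#     print('previous run still in progress; skipping this cycle')
```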

Next Steps

Event Triggering

Learn about event-driven workflows

Monitoring

Monitor scheduled flows in production
