The @schedule decorator allows you to run your Metaflow workflows on a recurring schedule when deployed to production orchestrators.
Basic Usage
Add the @schedule decorator to your flow to specify when it should run:
from metaflow import FlowSpec, schedule, step

@schedule(daily=True)
class DailyReportFlow(FlowSpec):
    @step
    def start(self):
        print("Running daily report...")
        self.next(self.end)

    @step
    def end(self):
        print("Report complete")

if __name__ == '__main__':
    DailyReportFlow()
Deploy the scheduled flow:
python daily_report.py step-functions create
The flow will now run automatically every day at midnight UTC.
Schedule Options
Predefined Intervals
The @schedule decorator supports common scheduling intervals:
hourly: Run the workflow every hour
daily: Run the workflow every day at midnight UTC
weekly: Run the workflow every Sunday at midnight UTC
# Hourly execution
@schedule(hourly=True)
class HourlyFlow(FlowSpec):
    pass

# Weekly execution
@schedule(weekly=True)
class WeeklyFlow(FlowSpec):
    pass
Custom Cron Expressions
For more complex schedules, use cron expressions. The expected syntax depends on the orchestrator, as described in the sections below.
cron: Custom cron expression for scheduling
# These expressions use the six-field AWS EventBridge syntax

# Run every weekday at 9 AM UTC
@schedule(cron='0 9 ? * MON-FRI *')
class WeekdayMorningFlow(FlowSpec):
    pass

# Run on the first day of every month
@schedule(cron='0 0 1 * ? *')
class MonthlyFlow(FlowSpec):
    pass

# Run every 15 minutes
@schedule(cron='0/15 * * * ? *')
class FrequentFlow(FlowSpec):
    pass
AWS Step Functions
For AWS Step Functions deployments, use AWS EventBridge cron expressions:
cron(Minutes Hours Day-of-month Month Day-of-week Year)
Format: Minutes Hours Day-of-month Month Day-of-week Year
Minutes: 0-59
Hours: 0-23 (UTC)
Day-of-month: 1-31
Month: 1-12 or JAN-DEC
Day-of-week: 1-7 or SUN-SAT (1 = Sunday)
Year: 1970-2199
Special characters:
* (wildcard): All values
, (comma): Multiple values
- (dash): Range of values
/ (slash): Increments
? (question mark): No specific value (use for day-of-month or day-of-week)
Examples:
# Every day at 2:30 PM UTC
@schedule(cron='30 14 * * ? *')

# Every Monday at 8 AM UTC
@schedule(cron='0 8 ? * MON *')

# Every 30 minutes
@schedule(cron='0/30 * * * ? *')

# Last day of every month at midnight
@schedule(cron='0 0 L * ? *')

# First Monday of every month
@schedule(cron='0 0 ? * MON#1 *')
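Two easy mistakes with this format are passing a five-field Unix expression and forgetting the ? in one of the day fields. The following is a minimal sketch of a pre-deployment check for those two rules only. The function name check_eventbridge_cron is hypothetical and not part of Metaflow or AWS, and the check does not validate value ranges or special characters such as L and #.

```python
def check_eventbridge_cron(expr: str) -> list:
    """Return a list of problems found in a six-field EventBridge-style cron string.

    Hypothetical helper: it only checks the field count and the '?' rule.
    """
    fields = expr.split()
    if len(fields) != 6:
        return [f"expected 6 fields, found {len(fields)}"]

    problems = []
    day_of_month, day_of_week = fields[2], fields[4]
    # One of the two day fields should carry '?' ("no specific value").
    if "?" not in (day_of_month, day_of_week):
        problems.append("use '?' for either day-of-month or day-of-week")
    return problems


print(check_eventbridge_cron("30 14 * * ? *"))  # no problems reported
print(check_eventbridge_cron("0 3 * * *"))      # five-field Unix expression
```

An empty list means neither of the two problems was detected; it does not guarantee that EventBridge will accept the expression.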
Argo Workflows
For Argo Workflows, use standard Unix cron format:
Minute Hour Day Month Weekday
Format: Minute Hour Day Month Weekday
Minute: 0-59
Hour: 0-23 (UTC by default, configurable with timezone)
Day: 1-31
Month: 1-12
Weekday: 0-6 (0 = Sunday)
Examples:
# Every day at 3 AM UTC
@schedule(cron='0 3 * * *')

# Every Monday at 9 AM
@schedule(cron='0 9 * * 1')

# Every 2 hours
@schedule(cron='0 */2 * * *')
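To see which values a single field such as */2 or 9,14,22 selects, it can help to expand it by hand. The sketch below does this for numeric fields only, supporting *, comma lists, ranges, and steps applied to * or a range. The function expand_field is a hypothetical illustration, not something Metaflow or Argo provides, and real cron implementations accept more syntax than this.

```python
def expand_field(field: str, low: int, high: int) -> list:
    """Expand one numeric Unix cron field into the sorted values it matches.

    Hypothetical helper; `low` and `high` are the field's allowed bounds,
    for example 0 and 23 for the hour field.
    """
    values = set()
    for part in field.split(","):
        base, _, step = part.partition("/")
        stride = int(step) if step else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            first, last = base.split("-")
            start, end = int(first), int(last)
        else:
            start = end = int(base)
        values.update(range(start, end + 1, stride))
    return sorted(values)


# Hours selected by the "every 2 hours" example above
print(expand_field("*/2", 0, 23))
# Weekdays selected by "1-5" (Monday through Friday when 0 = Sunday)
print(expand_field("1-5", 0, 6))
```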
Timezone Support
Argo Workflows supports timezone configuration:
timezone: Timezone in IANA format (e.g., 'America/Los_Angeles')
from metaflow import FlowSpec, schedule, step

# Run at 9 AM Pacific Time
@schedule(cron='0 9 * * *', timezone='America/Los_Angeles')
class PacificTimeFlow(FlowSpec):
    @step
    def start(self):
        from datetime import datetime
        print(f"Current time: {datetime.now()}")
        self.next(self.end)

    @step
    def end(self):
        pass

Timezone support is only available for Argo Workflows. AWS Step Functions schedules always run in UTC.
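The reason a timezone setting matters is that a fixed UTC hour cannot follow daylight saving time. The sketch below uses only the Python standard library (zoneinfo, Python 3.9+) to show which UTC hour corresponds to 9 AM Pacific on a winter date and a summer date. The helper name utc_hour_for_local and the chosen dates are illustrative assumptions.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def utc_hour_for_local(year, month, day, hour, tz_name):
    """Return the UTC hour that matches a local wall-clock hour on a given date."""
    local = datetime(year, month, day, hour, tzinfo=ZoneInfo(tz_name))
    return local.astimezone(timezone.utc).hour


# 9 AM Pacific in winter (PST, UTC-8) and in summer (PDT, UTC-7)
print(utc_hour_for_local(2024, 1, 15, 9, "America/Los_Angeles"))  # 17
print(utc_hour_for_local(2024, 7, 15, 9, "America/Los_Angeles"))  # 16
```

On Step Functions, which schedules in UTC only, a single cron expression would therefore fire at 9 AM local time for only part of the year.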
Managing Scheduled Flows
Deploying a Scheduled Flow
# Deploy with AWS Step Functions
python myflow.py step-functions create

# Deploy with Argo Workflows
python myflow.py argo-workflows create
The scheduler is automatically configured based on the @schedule decorator.
Triggering Manually
You can still trigger scheduled flows manually:
# Manual trigger with Step Functions
python myflow.py step-functions trigger
# Manual trigger with Argo Workflows
python myflow.py argo-workflows trigger
Pausing Scheduled Flows
To pause a scheduled flow without deleting it:
AWS Step Functions: Disable the EventBridge rule in the AWS Console
Argo Workflows: Suspend the CronWorkflow:
kubectl patch cronworkflow <workflow-name> --type merge -p '{"spec":{"suspend":true}}'
Updating the Schedule
To change the schedule, modify the @schedule decorator and redeploy:
# Change from daily to hourly
@schedule(hourly=True)  # was: @schedule(daily=True)
class MyFlow(FlowSpec):
    pass
Then redeploy:
python myflow.py step-functions create
Examples
Data Pipeline with Hourly Updates
from metaflow import FlowSpec, schedule, step, Parameter, resources

@schedule(hourly=True)
class HourlyETLFlow(FlowSpec):
    """ETL pipeline that runs every hour"""

    source = Parameter('source',
                       help='Data source to process',
                       default='s3://my-bucket/data/')

    # load_data, process, and save_results are placeholders for your own logic.

    @step
    def start(self):
        from datetime import datetime, timezone
        self.timestamp = datetime.now(timezone.utc).isoformat()
        print(f"Starting ETL at {self.timestamp}")
        self.next(self.extract)

    @resources(memory=4000)
    @step
    def extract(self):
        # Extract data from source
        print(f"Extracting data from {self.source}")
        self.data = self.load_data()
        self.next(self.transform)

    @resources(memory=8000, cpu=2)
    @step
    def transform(self):
        # Transform data
        self.processed_data = self.process(self.data)
        self.next(self.load)

    @step
    def load(self):
        # Load to destination
        self.save_results(self.processed_data)
        self.next(self.end)

    @step
    def end(self):
        # self.timestamp holds the start time recorded in the start step
        print(f"ETL run started at {self.timestamp} has completed")
Weekly Report Generation
from metaflow import FlowSpec, schedule, step, card, current

@schedule(cron='0 8 ? * MON *')  # Every Monday at 8 AM UTC
class WeeklyReportFlow(FlowSpec):
    """Generate weekly business reports"""

    # get_weekly_metrics and notify_stakeholders are placeholders for your own logic.

    @step
    def start(self):
        from datetime import datetime, timedelta, timezone
        self.end_date = datetime.now(timezone.utc)
        self.start_date = self.end_date - timedelta(days=7)
        self.next(self.fetch_data)

    @step
    def fetch_data(self):
        # Fetch last week's data
        self.metrics = self.get_weekly_metrics(
            self.start_date,
            self.end_date
        )
        self.next(self.generate_report)

    @card
    @step
    def generate_report(self):
        # Generate visualizations and report
        from metaflow.cards import Markdown, Table
        report = Markdown(f"# Weekly Report\n\n"
                          f"Period: {self.start_date} to {self.end_date}")
        # Table expects self.metrics to be a list of rows
        metrics_table = Table(self.metrics)
        # Components only show up on the card once they are appended
        current.card.append(report)
        current.card.append(metrics_table)
        self.next(self.send_report)

    @step
    def send_report(self):
        # Send report via email or Slack
        self.notify_stakeholders()
        self.next(self.end)

    @step
    def end(self):
        print("Weekly report sent successfully")
Complex Schedule with Multiple Time Zones
from metaflow import FlowSpec, schedule, step

# For global team: Run during business hours in multiple regions
# (five-field Unix cron syntax, as used by Argo Workflows)
@schedule(cron='0 9,14,22 * * *')  # 9 AM, 2 PM, 10 PM UTC
class GlobalSyncFlow(FlowSpec):
    """Sync data for teams across time zones"""

    @step
    def start(self):
        from datetime import datetime, timezone
        import pytz

        # Pick the region from the UTC hour at which this run started
        current_hour = datetime.now(timezone.utc).hour
        if current_hour == 9:
            self.region = 'APAC'
            self.tz = pytz.timezone('Asia/Tokyo')
        elif current_hour == 14:
            self.region = 'EMEA'
            self.tz = pytz.timezone('Europe/London')
        else:
            self.region = 'Americas'
            self.tz = pytz.timezone('America/New_York')
        self.next(self.sync_region)

    @step
    def sync_region(self):
        print(f"Syncing data for {self.region}")
        self.next(self.end)

    @step
    def end(self):
        pass
Best Practices
Use UTC for schedules
Always think in UTC when setting schedules to avoid confusion with daylight saving time changes. If you need local time, use Argo Workflows with the timezone parameter.
Consider execution time
Ensure your flow can complete within the schedule interval. If a flow takes 2 hours to run, don’t schedule it to run every hour.
Set workflow timeouts
Use --workflow-timeout when deploying to prevent runaway executions:
python myflow.py step-functions create --workflow-timeout 3600
Handle missing data gracefully
Scheduled flows should handle cases where expected data isn’t available yet. Use @retry or conditional logic.
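As one possible form of the conditional logic mentioned above, a step can poll for its input a few times and skip the run if the data never arrives. This is a minimal plain-Python sketch. The name wait_for_data and its parameters are hypothetical, and the is_available callable stands in for whatever check suits your data source.

```python
import time


def wait_for_data(is_available, attempts=3, delay_seconds=60.0):
    """Poll is_available() up to `attempts` times; return True once it succeeds.

    Hypothetical helper: `is_available` is any zero-argument callable that
    returns True when the expected data exists.
    """
    for attempt in range(attempts):
        if is_available():
            return True
        # Sleep between attempts, but not after the final one.
        if attempt < attempts - 1:
            time.sleep(delay_seconds)
    return False


# Data that is already present is detected on the first check.
print(wait_for_data(lambda: True))
# Data that never arrives: the caller can skip processing instead of failing.
print(wait_for_data(lambda: False, attempts=2, delay_seconds=0))
```

Inside a step, a False result could set a flag on self so that downstream steps skip processing rather than fail on missing input.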
Test schedule expressions
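Use an online cron expression tester to verify your schedule before deploying. Make sure the tester matches your orchestrator's syntax (six-field EventBridge or five-field Unix cron).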
Troubleshooting
Flow Not Running on Schedule
Check the schedule syntax: Verify your cron expression is valid
Check orchestrator logs: Look for scheduling errors in AWS EventBridge or Argo
Verify deployment: Ensure the flow was deployed with create, not just triggered
Unexpected Run Times
Verify timezone: Remember that Step Functions uses UTC only
Check for delays: Orchestrators may have slight delays (typically < 1 minute)
Consider execution time: The schedule is for start time, not completion time
Multiple Concurrent Runs
Increase workflow timeout: Set appropriate limits
Adjust schedule frequency: Ensure flows complete before next run
Implement run locks: Use external coordination if needed
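The run-lock idea can be illustrated with an atomic "create only if absent" operation. The sketch below uses a local lock file as a stand-in. That is an assumption made for illustration: tasks in production typically run on different machines, so a real lock would need a shared store that offers the same create-if-absent guarantee. The name run_lock is hypothetical.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def run_lock(path):
    """Yield True if this run acquired the lock file, False if another run holds it."""
    try:
        # O_CREAT | O_EXCL makes creation fail if the file already exists.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        yield False
        return
    try:
        os.close(fd)
        yield True
    finally:
        # Release the lock when the run finishes, even if it raised.
        os.remove(path)


lock_path = os.path.join(tempfile.mkdtemp(), "hourly_etl.lock")
with run_lock(lock_path) as acquired:
    if acquired:
        print("Lock acquired, running")
    else:
        print("Previous run still active, skipping")
```

A run that crashes hard without reaching the finally block leaves the lock behind, so a real implementation would also want an expiry or a way to clear stale locks.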
Next Steps
Event Triggering: Learn about event-driven workflows
Monitoring: Monitor scheduled flows in production