Schedules allow you to automate pipeline execution at specific times or intervals. This is essential for production ML workflows that need to run regularly, such as data refreshes, model retraining, or batch processing.
Understanding Pipeline Schedules
ZenML supports three types of schedules:
Cron-based schedules: Use cron expressions for complex timing patterns
Interval-based schedules: Run pipelines at regular intervals
One-time schedules: Run a pipeline once at a specific time
Creating Your First Schedule
Step 1: Import Required Modules
Import the Schedule class together with the pipeline and step decorators:
from zenml import pipeline, step, Schedule
from datetime import datetime, timedelta
Step 2: Define Your Pipeline
Create a pipeline that you want to schedule:
@step
def load_data():
    """Load fresh data."""
    print(f"Loading data at {datetime.now()}")
    return {"timestamp": datetime.now()}

@step
def process_data(data: dict):
    """Process the loaded data."""
    print(f"Processing data from {data['timestamp']}")
    return data

@pipeline
def daily_processing_pipeline():
    """Pipeline that processes data daily."""
    data = load_data()
    process_data(data)
Step 3: Create a Schedule
Define when the pipeline should run:
# Run every day at 2 AM
schedule = Schedule(
    name="daily_2am",
    cron_expression="0 2 * * *"
)
Step 4: Run Pipeline with Schedule
Execute the pipeline with the schedule:
if __name__ == "__main__":
    daily_processing_pipeline.with_options(
        schedule=schedule
    )()
Schedule Types
Cron-Based Schedules
Use cron expressions for precise timing control:
from zenml import Schedule

# Every day at 2:30 AM
schedule = Schedule(
    name="daily_processing",
    cron_expression="30 2 * * *"
)

# Every Monday at 9 AM
schedule = Schedule(
    name="weekly_training",
    cron_expression="0 9 * * 1"
)

# Every hour
schedule = Schedule(
    name="hourly_sync",
    cron_expression="0 * * * *"
)

# First day of every month at midnight
schedule = Schedule(
    name="monthly_report",
    cron_expression="0 0 1 * *"
)
Cron expression format:
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
* * * * *
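To make the five-field layout above concrete, here is a minimal sketch that splits a cron expression into its named fields. The helper name `split_cron` is hypothetical and not part of ZenML, and it checks only the number of fields, not whether each value is in range.

```python
# Field names in the order they appear in a standard cron expression.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression: str) -> dict:
    """Map each of the five cron fields to its name (hypothetical helper)."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} fields, got {len(parts)}: {expression!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


fields = split_cron("30 2 * * *")
print(fields["minute"], fields["hour"])  # the minute and hour fields
```

Running a check like this before registering a schedule catches the common mistake of pasting a six-field (seconds-first) expression into a five-field slot.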
Interval-Based Schedules
Run pipelines at regular intervals:
from datetime import datetime, timedelta, timezone
from zenml import Schedule

# Every 6 hours, starting now
schedule = Schedule(
    name="every_6_hours",
    start_time=datetime.now(timezone.utc),
    interval_second=timedelta(hours=6)
)

# Every 30 minutes
schedule = Schedule(
    name="every_30_min",
    start_time=datetime.now(timezone.utc),
    interval_second=timedelta(minutes=30)
)

# Every 2 days
schedule = Schedule(
    name="every_2_days",
    start_time=datetime.now(timezone.utc),
    interval_second=timedelta(days=2)
)
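As a rough illustration of what an interval schedule implies, the sketch below lists the first few trigger times derived from a start time and an interval. It uses only the standard library; `upcoming_runs` is a hypothetical helper, and it assumes the first run fires at the start time itself, which may differ depending on the orchestrator.

```python
from datetime import datetime, timedelta, timezone


def upcoming_runs(start_time: datetime, interval: timedelta, count: int) -> list:
    """Return the first `count` trigger times of a fixed-interval schedule.

    Assumption: the first trigger coincides with `start_time`.
    """
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    return [start_time + i * interval for i in range(count)]


start = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
for run_time in upcoming_runs(start, timedelta(hours=6), 4):
    print(run_time.isoformat())
```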
One-Time Schedules
Schedule a pipeline to run once at a specific time:
from datetime import datetime, timedelta, timezone
from zenml import Schedule

# Run once tomorrow at 3 PM (UTC)
tomorrow_3pm = datetime.now(timezone.utc) + timedelta(days=1)
tomorrow_3pm = tomorrow_3pm.replace(hour=15, minute=0, second=0, microsecond=0)
schedule = Schedule(
    name="one_time_batch",
    run_once_start_time=tomorrow_3pm
)
Schedule Configuration Options
Start and End Times
Control when a schedule becomes active and when it stops:
from datetime import datetime, timedelta, timezone

start = datetime.now(timezone.utc)
end = start + timedelta(days=30)  # Run for 30 days
schedule = Schedule(
    name="limited_schedule",
    cron_expression="0 2 * * *",  # Daily at 2 AM
    start_time=start,
    end_time=end
)
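To estimate how many runs a bounded schedule produces, the following standard-library sketch enumerates daily trigger times inside a start/end window. `daily_runs_in_window` is a hypothetical helper; it assumes both bounds are timezone-aware UTC datetimes and treats the end time as inclusive, which may not match every orchestrator.

```python
from datetime import datetime, timedelta, timezone


def daily_runs_in_window(start: datetime, end: datetime, hour: int) -> list:
    """List daily trigger times at `hour`:00 with start <= t <= end.

    Assumption: `start` and `end` are timezone-aware and both in UTC.
    """
    first = start.replace(hour=hour, minute=0, second=0, microsecond=0)
    if first < start:
        # Today's slot has already passed, so begin with tomorrow's.
        first += timedelta(days=1)
    runs = []
    current = first
    while current <= end:
        runs.append(current)
        current += timedelta(days=1)
    return runs


start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
end = start + timedelta(days=30)
runs = daily_runs_in_window(start, end, hour=2)
print(len(runs), runs[0].isoformat(), runs[-1].isoformat())
```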
Catchup Behavior
Control whether missed runs should be executed:
# Catchup enabled: Run all missed executions
schedule = Schedule(
    name="with_catchup",
    cron_expression="0 * * * *",  # Hourly
    catchup=True  # Execute missed runs
)

# Catchup disabled: Only run latest scheduled execution
schedule = Schedule(
    name="no_catchup",
    cron_expression="0 * * * *",
    catchup=False  # Skip missed runs
)
When to use catchup:
Enable catchup if each run processes unique data and missing a run means missing data
Disable catchup if only the latest run matters, or if your pipeline handles backfilling internally
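The difference between the two settings can be pictured with a small simulation. This is a conceptual model only, not how ZenML or any particular orchestrator implements catchup; `runs_after_downtime` is a hypothetical helper that assumes a fixed interval between runs.

```python
from datetime import datetime, timedelta, timezone


def runs_after_downtime(last_run, now, interval, catchup):
    """Return the trigger times to execute once the scheduler is back up.

    Conceptual model: with catchup every missed slot is replayed in order;
    without it only the most recent missed slot is kept.
    """
    missed = []
    slot = last_run + interval
    while slot <= now:
        missed.append(slot)
        slot += interval
    if catchup or not missed:
        return missed
    return missed[-1:]


last_run = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
now = datetime(2024, 1, 1, 3, 30, tzinfo=timezone.utc)
hourly = timedelta(hours=1)

print(len(runs_after_downtime(last_run, now, hourly, catchup=True)))
print(len(runs_after_downtime(last_run, now, hourly, catchup=False)))
```

In this model a scheduler that was down from just after midnight until 03:30 replays the 01:00, 02:00, and 03:00 slots with catchup enabled, and only the 03:00 slot with it disabled.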
Common Scheduling Patterns
Daily Model Training
Retrain models daily with fresh data:
from zenml import pipeline, Schedule
from datetime import datetime, timezone

@pipeline
def training_pipeline(
    dataset_date: str,
    learning_rate: float = 0.001
):
    """Daily model training pipeline."""
    data = load_daily_data(dataset_date)
    train_data, test_data = split_data(data)
    model = train_model(train_data, learning_rate)
    metrics = evaluate_model(model, test_data)
    promote_if_better(model, metrics)

# Schedule for daily training at 3 AM
schedule = Schedule(
    name="daily_training",
    cron_expression="0 3 * * *"
)

if __name__ == "__main__":
    training_pipeline.with_options(
        schedule=schedule
    )(
        dataset_date=datetime.now(timezone.utc).strftime("%Y-%m-%d")
    )
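Note that the call above evaluates `datetime.now(...)` once, when the script registers the schedule. Whether later scheduled runs receive a fresh value depends on how the orchestrator handles pipeline parameters, so this is worth verifying in your setup. A hedged alternative is to compute the date at execution time. The function below is a hypothetical plain-Python sketch; in a ZenML project it would presumably be decorated with `@step` and called from inside the pipeline.

```python
from datetime import datetime, timezone
from typing import Optional


def resolve_dataset_date(now: Optional[datetime] = None) -> str:
    """Return the UTC date as YYYY-MM-DD, evaluated when the function is called.

    `now` exists only to make the function testable; it is expected to be a
    timezone-aware datetime.
    """
    current = now if now is not None else datetime.now(timezone.utc)
    return current.astimezone(timezone.utc).strftime("%Y-%m-%d")


print(resolve_dataset_date())
```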
Hourly Data Processing
Process incoming data every hour:
@pipeline
def data_processing_pipeline(batch_id: str):
    """Process new data every hour."""
    new_data = fetch_new_data(batch_id)
    validated = validate_data(new_data)
    transformed = transform_data(validated)
    store_data(transformed)

schedule = Schedule(
    name="hourly_processing",
    cron_expression="0 * * * *",  # Every hour
    catchup=False  # Only process latest data
)
Weekly Model Evaluation
Evaluate model performance weekly:
@pipeline
def weekly_evaluation_pipeline():
    """Comprehensive weekly model evaluation."""
    production_model = load_production_model()
    week_data = load_past_week_data()
    metrics = evaluate_model(production_model, week_data)
    report = generate_report(metrics)
    send_alert_if_degraded(report)

schedule = Schedule(
    name="weekly_evaluation",
    cron_expression="0 9 * * 1",  # Monday at 9 AM
)
Monthly Batch Processing
Run large batch jobs monthly:
@pipeline
def monthly_batch_pipeline(month: str):
    """Process entire month of data."""
    monthly_data = load_monthly_data(month)
    aggregated = aggregate_data(monthly_data)
    reports = generate_monthly_reports(aggregated)
    archive_data(monthly_data)

schedule = Schedule(
    name="monthly_batch",
    cron_expression="0 0 1 * *",  # First of month at midnight
    catchup=True  # Process any missed months
)
Managing Scheduled Pipelines
Via CLI
# List all schedules
zenml schedule list
# Get schedule details
zenml schedule describe daily_training
# Pause a schedule
zenml schedule pause daily_training
# Resume a schedule
zenml schedule resume daily_training
# Delete a schedule
zenml schedule delete daily_training
Via Python Client
from zenml.client import Client

client = Client()

# List schedules
schedules = client.list_schedules()

# Get a specific schedule
schedule = client.get_schedule("daily_training")

# Pause a schedule
client.pause_schedule("daily_training")

# Resume a schedule
client.resume_schedule("daily_training")

# Delete a schedule
client.delete_schedule("daily_training")
Timezone Considerations
Always use timezone-aware datetimes:
from datetime import datetime, timezone
from zenml import Schedule

# Good: Timezone-aware datetime
schedule = Schedule(
    name="explicit_timezone",
    start_time=datetime.now(timezone.utc),
    cron_expression="0 9 * * *"
)

# Avoid: Naive datetime (will use local timezone)
schedule = Schedule(
    name="implicit_timezone",
    start_time=datetime.now(),  # May cause issues
    cron_expression="0 9 * * *"
)
If you provide a datetime without timezone information, ZenML will treat it as a datetime in your local timezone and show a warning.
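One way to avoid relying on that warning is to validate datetimes before they reach the schedule. The sketch below is a minimal standard-library guard; `ensure_utc` is a hypothetical helper, not a ZenML function.

```python
from datetime import datetime, timedelta, timezone


def ensure_utc(value: datetime) -> datetime:
    """Reject naive datetimes and convert aware ones to UTC."""
    if value.tzinfo is None or value.utcoffset() is None:
        raise ValueError("naive datetime: attach a timezone before scheduling")
    return value.astimezone(timezone.utc)


# 09:00 at a fixed UTC-5 offset corresponds to 14:00 UTC.
local_nine = datetime(2024, 1, 1, 9, 0, tzinfo=timezone(timedelta(hours=-5)))
print(ensure_utc(local_nine).isoformat())
```

Raising on naive input instead of guessing a timezone keeps the conversion explicit, so a schedule never silently shifts when the code runs on a machine with a different local timezone.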
Combining Schedules with Deployment
You can deploy a pipeline and add a schedule:
@pipeline
def scheduled_inference_pipeline(batch_size: int = 32):
    """Inference pipeline that runs on a schedule."""
    model = load_production_model()
    data = fetch_inference_data()
    predictions = predict(model, data, batch_size)
    store_predictions(predictions)

schedule = Schedule(
    name="hourly_inference",
    cron_expression="0 * * * *"
)

if __name__ == "__main__":
    scheduled_inference_pipeline.with_options(
        schedule=schedule
    )(batch_size=64)
Best Practices
Use UTC Times: Always use the UTC timezone for schedules to avoid daylight saving time issues.
Consider Catchup Carefully: Disable catchup for pipelines that handle backfilling internally.
Set End Times: Use end times for temporary or experimental schedules.
Monitor Schedule Health: Regularly check that scheduled pipelines are running successfully.
Name Schedules Clearly: Use descriptive names that indicate frequency and purpose.
Test Before Scheduling: Run your pipeline manually several times before putting it on a schedule.
Troubleshooting
Schedule Not Running
Check schedule status:
zenml schedule describe my_schedule
Verify the schedule is not paused and the start time has passed.
Wrong Execution Time
Verify timezone settings:
from datetime import datetime, timezone
# Always use explicit UTC
start_time = datetime.now(timezone.utc)
Missed Runs
Check catchup settings:
# Enable catchup to execute missed runs
schedule = Schedule(
    name="my_schedule",
    cron_expression="0 * * * *",
    catchup=True  # Run missed executions
)
Cron Expression Examples
Every 15 Minutes
Every Day at Noon
Every Weekday at 9 AM
Every Quarter (Jan, Apr, Jul, Oct)
Twice Daily (6 AM and 6 PM)
Every Sunday at 3 AM
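The patterns listed above correspond to the following standard five-field cron expressions. The dictionary is an illustrative sketch, not taken from ZenML; the quarterly entry assumes the run should fire at midnight on the first day of the month, since the pattern name does not specify a time.

```python
# Standard five-field cron expressions for the patterns listed above.
CRON_EXAMPLES = {
    "Every 15 minutes": "*/15 * * * *",
    "Every day at noon": "0 12 * * *",
    "Every weekday at 9 AM": "0 9 * * 1-5",
    # Assumption: midnight on the first day of each quarter's first month.
    "Every quarter (Jan, Apr, Jul, Oct)": "0 0 1 1,4,7,10 *",
    "Twice daily (6 AM and 6 PM)": "0 6,18 * * *",
    "Every Sunday at 3 AM": "0 3 * * 0",
}

for label, expression in CRON_EXAMPLES.items():
    # Each expression must have exactly five whitespace-separated fields.
    assert len(expression.split()) == 5, label
    print(f"{label:<36} {expression}")
```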
Next Steps
Deploying Pipelines: Learn how to deploy pipelines for on-demand execution
Creating Pipelines: Understand pipeline fundamentals
Stack Configuration: Configure stacks for scheduled pipeline execution