Skip to main content
Schedules allow you to automate pipeline execution at specific times or intervals. This is essential for production ML workflows that need to run regularly, such as data refreshes, model retraining, or batch processing.

Understanding Pipeline Schedules

ZenML supports two types of schedules:
  • Cron-based schedules: Use cron expressions for complex timing patterns
  • Interval-based schedules: Run pipelines at regular intervals
  • One-time schedules: Run a pipeline once at a specific time

Creating Your First Schedule

1
Step 1: Import Schedule
2
Import the Schedule class:
3
from zenml import pipeline, step, Schedule
from datetime import datetime, timedelta
4
Step 2: Define Your Pipeline
5
Create a pipeline that you want to schedule:
6
@step
def load_data():
    """Load fresh data."""
    print(f"Loading data at {datetime.now()}")
    return {"timestamp": datetime.now()}

@step
def process_data(data: dict):
    """Process the loaded data."""
    print(f"Processing data from {data['timestamp']}")
    return data

@pipeline
def daily_processing_pipeline():
    """Pipeline that processes data daily."""
    data = load_data()
    process_data(data)
7
Step 3: Create a Schedule
8
Define when the pipeline should run:
9
# Run every day at 2 AM
schedule = Schedule(
    name="daily_2am",
    cron_expression="0 2 * * *"
)
10
Step 4: Run Pipeline with Schedule
11
Execute the pipeline with the schedule:
12
if __name__ == "__main__":
    daily_processing_pipeline.with_options(
        schedule=schedule
    )()

Schedule Types

Cron-Based Schedules

Use cron expressions for precise timing control:
from zenml import Schedule

# Every day at 2:30 AM
schedule = Schedule(
    name="daily_processing",
    cron_expression="30 2 * * *"
)

# Every Monday at 9 AM
schedule = Schedule(
    name="weekly_training",
    cron_expression="0 9 * * 1"
)

# Every hour
schedule = Schedule(
    name="hourly_sync",
    cron_expression="0 * * * *"
)

# First day of every month at midnight
schedule = Schedule(
    name="monthly_report",
    cron_expression="0 0 1 * *"
)

Cron Expression Format

┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
* * * * *

Interval-Based Schedules

Run pipelines at regular intervals:
from datetime import datetime, timedelta, timezone
from zenml import Schedule

# Every 6 hours, starting now
schedule = Schedule(
    name="every_6_hours",
    start_time=datetime.now(timezone.utc),
    interval_second=timedelta(hours=6)
)

# Every 30 minutes
schedule = Schedule(
    name="every_30_min",
    start_time=datetime.now(timezone.utc),
    interval_second=timedelta(minutes=30)
)

# Every 2 days
schedule = Schedule(
    name="every_2_days",
    start_time=datetime.now(timezone.utc),
    interval_second=timedelta(days=2)
)

One-Time Schedules

Schedule a pipeline to run once at a specific time:
from datetime import datetime, timezone

# Run once tomorrow at 3 PM
tomorrow_3pm = datetime.now(timezone.utc) + timedelta(days=1)
tomorrow_3pm = tomorrow_3pm.replace(hour=15, minute=0, second=0)

schedule = Schedule(
    name="one_time_batch",
    run_once_start_time=tomorrow_3pm
)

Schedule Configuration Options

Start and End Times

Control when a schedule becomes active and when it stops:
from datetime import datetime, timedelta, timezone

start = datetime.now(timezone.utc)
end = start + timedelta(days=30)  # Run for 30 days

schedule = Schedule(
    name="limited_schedule",
    cron_expression="0 2 * * *",  # Daily at 2 AM
    start_time=start,
    end_time=end
)

Catchup Behavior

Control whether missed runs should be executed:
# Catchup enabled: Run all missed executions
schedule = Schedule(
    name="with_catchup",
    cron_expression="0 * * * *",  # Hourly
    catchup=True  # Execute missed runs
)

# Catchup disabled: Only run latest scheduled execution
schedule = Schedule(
    name="no_catchup",
    cron_expression="0 * * * *",
    catchup=False  # Skip missed runs
)
When to use catchup:
  • Enable catchup if each run processes unique data and missing a run means missing data
  • Disable catchup if only the latest run matters, or if your pipeline handles backfilling internally

Common Scheduling Patterns

Daily Model Training

Retrain models daily with fresh data:
from zenml import pipeline, Schedule
from datetime import datetime, timezone

@pipeline
def training_pipeline(
    dataset_date: str,
    learning_rate: float = 0.001
):
    """Daily model training pipeline."""
    data = load_daily_data(dataset_date)
    train_data, test_data = split_data(data)
    model = train_model(train_data, learning_rate)
    metrics = evaluate_model(model, test_data)
    promote_if_better(model, metrics)

# Schedule for daily training at 3 AM
schedule = Schedule(
    name="daily_training",
    cron_expression="0 3 * * *"
)

if __name__ == "__main__":
    training_pipeline.with_options(
        schedule=schedule
    )(
        dataset_date=datetime.now(timezone.utc).strftime("%Y-%m-%d")
    )

Hourly Data Processing

Process incoming data every hour:
@pipeline
def data_processing_pipeline(batch_id: str):
    """Process new data every hour."""
    new_data = fetch_new_data(batch_id)
    validated = validate_data(new_data)
    transformed = transform_data(validated)
    store_data(transformed)

schedule = Schedule(
    name="hourly_processing",
    cron_expression="0 * * * *",  # Every hour
    catchup=False  # Only process latest data
)

Weekly Model Evaluation

Evaluate model performance weekly:
@pipeline
def weekly_evaluation_pipeline():
    """Comprehensive weekly model evaluation."""
    production_model = load_production_model()
    week_data = load_past_week_data()
    
    metrics = evaluate_model(production_model, week_data)
    report = generate_report(metrics)
    send_alert_if_degraded(report)

schedule = Schedule(
    name="weekly_evaluation",
    cron_expression="0 9 * * 1",  # Monday at 9 AM
)

Monthly Batch Processing

Run large batch jobs monthly:
@pipeline
def monthly_batch_pipeline(month: str):
    """Process entire month of data."""
    monthly_data = load_monthly_data(month)
    aggregated = aggregate_data(monthly_data)
    reports = generate_monthly_reports(aggregated)
    archive_data(monthly_data)

schedule = Schedule(
    name="monthly_batch",
    cron_expression="0 0 1 * *",  # First of month at midnight
    catchup=True  # Process any missed months
)

Managing Scheduled Pipelines

Via CLI

# List all schedules
zenml schedule list

# Get schedule details
zenml schedule describe daily_training

# Pause a schedule
zenml schedule pause daily_training

# Resume a schedule
zenml schedule resume daily_training

# Delete a schedule
zenml schedule delete daily_training

Via Python Client

from zenml.client import Client

client = Client()

# List schedules
schedules = client.list_schedules()

# Get a specific schedule
schedule = client.get_schedule("daily_training")

# Pause a schedule
client.pause_schedule("daily_training")

# Resume a schedule
client.resume_schedule("daily_training")

# Delete a schedule
client.delete_schedule("daily_training")

Timezone Considerations

Always use timezone-aware datetimes:
from datetime import datetime, timezone

# Good: Timezone-aware datetime
schedule = Schedule(
    name="explicit_timezone",
    start_time=datetime.now(timezone.utc),
    cron_expression="0 9 * * *"
)

# Avoid: Naive datetime (will use local timezone)
schedule = Schedule(
    name="implicit_timezone",
    start_time=datetime.now(),  # May cause issues
    cron_expression="0 9 * * *"
)
If you provide a datetime without timezone information, ZenML will treat it as a datetime in your local timezone and show a warning.

Combining Schedules with Deployment

You can deploy a pipeline and add a schedule:
@pipeline
def scheduled_inference_pipeline(batch_size: int = 32):
    """Inference pipeline that runs on a schedule."""
    model = load_production_model()
    data = fetch_inference_data()
    predictions = predict(model, data, batch_size)
    store_predictions(predictions)

schedule = Schedule(
    name="hourly_inference",
    cron_expression="0 * * * *"
)

if __name__ == "__main__":
    scheduled_inference_pipeline.with_options(
        schedule=schedule
    )(batch_size=64)

Best Practices

Use UTC Times

Always use UTC timezone for schedules to avoid daylight saving time issues

Consider Catchup Carefully

Disable catchup for pipelines that handle backfilling internally

Set End Times

Use end times for temporary or experimental schedules

Monitor Schedule Health

Regularly check that scheduled pipelines are running successfully

Name Schedules Clearly

Use descriptive names that indicate frequency and purpose

Test Before Scheduling

Run your pipeline manually several times before putting it on a schedule

Troubleshooting

Schedule Not Running

Check schedule status:
zenml schedule describe my_schedule
Verify the schedule is not paused and the start time has passed.

Wrong Execution Time

Verify timezone settings:
from datetime import datetime, timezone

# Always use explicit UTC
start_time = datetime.now(timezone.utc)

Missed Runs

Check catchup settings:
# Enable catchup to execute missed runs
schedule = Schedule(
    name="my_schedule",
    cron_expression="0 * * * *",
    catchup=True  # Run missed executions
)

Cron Expression Examples

*/15 * * * *

Next Steps

Deploying Pipelines

Learn how to deploy pipelines for on-demand execution

Creating Pipelines

Understand pipeline fundamentals

Stack Configuration

Configure stacks for scheduled pipeline execution

Build docs developers (and LLMs) love