Skip to main content
The Schedule class allows you to configure when and how often a pipeline should run automatically.

Signature

Schedule(
    name: Optional[str] = None,
    cron_expression: Optional[str] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    interval_second: Optional[timedelta] = None,
    catchup: bool = False,
    run_once_start_time: Optional[datetime] = None,
)

Parameters

name
str
Optional name for the schedule. If not set, a default name will be generated based on the pipeline name and current date/time.
cron_expression
str
Cron expression defining when the pipeline should run. Takes precedence over start_time + interval_second if set.
start_time
datetime
When the schedule should start. If timezone-naive, treated as local timezone.
end_time
datetime
When the schedule should end. If timezone-naive, treated as local timezone.
interval_second
timedelta
Time interval between recurring runs for a periodic schedule.
catchup
bool
default:"False"
Whether to backfill missed runs if the schedule was paused. If True, catches up on all missed intervals. If False, only schedules the latest interval.
run_once_start_time
datetime
When to run the pipeline once (for one-time schedules). If timezone-naive, treated as local timezone.

Examples

Daily Schedule with Cron

from zenml import pipeline, Schedule

@pipeline
def daily_pipeline():
    # Pipeline steps
    pass

# Run every day at 2:00 AM
schedule = Schedule(
    name="daily_run",
    cron_expression="0 2 * * *"
)

daily_pipeline.run(schedule=schedule)

Hourly Schedule

from zenml import pipeline, Schedule
from datetime import timedelta

@pipeline
def hourly_pipeline():
    pass

# Run every hour
schedule = Schedule(
    name="hourly_run",
    interval_second=timedelta(hours=1)
)

hourly_pipeline.run(schedule=schedule)

Periodic Schedule with Start Time

from zenml import pipeline, Schedule
from datetime import datetime, timedelta, timezone

@pipeline
def training_pipeline():
    pass

# Run every 6 hours starting at a specific time
start = datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc)

schedule = Schedule(
    name="training_schedule",
    start_time=start,
    interval_second=timedelta(hours=6)
)

training_pipeline.run(schedule=schedule)

Schedule with End Time

from zenml import pipeline, Schedule
from datetime import datetime, timedelta, timezone

@pipeline
def experiment_pipeline():
    pass

# Run every hour for one week
start = datetime.now(timezone.utc)
end = start + timedelta(days=7)

schedule = Schedule(
    name="week_long_experiment",
    start_time=start,
    end_time=end,
    interval_second=timedelta(hours=1)
)

experiment_pipeline.run(schedule=schedule)

Complex Cron Schedule

from zenml import pipeline, Schedule

@pipeline
def business_hours_pipeline():
    pass

# Run every weekday at 9 AM, 12 PM, and 5 PM
schedule = Schedule(
    name="business_hours",
    cron_expression="0 9,12,17 * * 1-5"  # Mon-Fri at 9,12,17
)

business_hours_pipeline.run(schedule=schedule)

Common Cron Expressions

from zenml import Schedule

# Every minute
Schedule(cron_expression="* * * * *")

# Every 15 minutes
Schedule(cron_expression="*/15 * * * *")

# Every hour at minute 30
Schedule(cron_expression="30 * * * *")

# Every day at midnight
Schedule(cron_expression="0 0 * * *")

# Every day at 3:30 AM
Schedule(cron_expression="30 3 * * *")

# Every Monday at 9 AM
Schedule(cron_expression="0 9 * * 1")

# First day of every month at midnight
Schedule(cron_expression="0 0 1 * *")

# Every weekday at 6 PM
Schedule(cron_expression="0 18 * * 1-5")

Catchup Behavior

from zenml import pipeline, Schedule
from datetime import datetime, timedelta, timezone

@pipeline
def data_pipeline():
    pass

# With catchup=True: backfills all missed runs
schedule_with_catchup = Schedule(
    name="with_catchup",
    start_time=datetime.now(timezone.utc) - timedelta(days=7),
    interval_second=timedelta(hours=6),
    catchup=True  # Will run 28 times to catch up (7 days * 4 runs/day)
)

# With catchup=False: only runs the latest
schedule_no_catchup = Schedule(
    name="no_catchup",
    start_time=datetime.now(timezone.utc) - timedelta(days=7),
    interval_second=timedelta(hours=6),
    catchup=False  # Will only run once (latest interval)
)

One-Time Scheduled Run

from zenml import pipeline, Schedule
from datetime import datetime, timedelta, timezone

@pipeline
def one_time_pipeline():
    pass

# Run once tomorrow at 10 AM
run_time = datetime.now(timezone.utc) + timedelta(days=1)
run_time = run_time.replace(hour=10, minute=0, second=0, microsecond=0)

schedule = Schedule(
    name="one_time_run",
    run_once_start_time=run_time
)

one_time_pipeline.run(schedule=schedule)

Weekly Schedule

from zenml import pipeline, Schedule

@pipeline
def weekly_pipeline():
    pass

# Run every Sunday at 1 AM
schedule = Schedule(
    name="weekly_maintenance",
    cron_expression="0 1 * * 0"  # Sunday at 1 AM
)

weekly_pipeline.run(schedule=schedule)

Monthly Schedule

from zenml import pipeline, Schedule

@pipeline
def monthly_report():
    pass

# Run on the 1st and 15th of each month at midnight
schedule = Schedule(
    name="bi_monthly_report",
    cron_expression="0 0 1,15 * *"
)

monthly_report.run(schedule=schedule)

Timezone-Aware Schedule

from zenml import pipeline, Schedule
from datetime import datetime, timedelta, timezone, timedelta as td

@pipeline
def regional_pipeline():
    pass

# Schedule in a specific timezone (e.g., US Pacific)
pacific_tz = timezone(td(hours=-8))
start = datetime(2024, 1, 15, 9, 0, 0, tzinfo=pacific_tz)

schedule = Schedule(
    name="pacific_time_schedule",
    start_time=start,
    interval_second=timedelta(days=1)
)

regional_pipeline.run(schedule=schedule)

Complex Interval Schedule

from zenml import pipeline, Schedule
from datetime import timedelta

@pipeline
def frequent_pipeline():
    pass

# Run every 2 hours and 30 minutes
schedule = Schedule(
    name="custom_interval",
    interval_second=timedelta(hours=2, minutes=30)
)

frequent_pipeline.run(schedule=schedule)

Cron Expression Format

Cron expressions consist of 5 fields:
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 7) (0 and 7 are Sunday)
│ │ │ │ │
* * * * *
Special characters:
  • * - Any value
  • , - Value list separator (e.g., 1,3,5)
  • - - Range of values (e.g., 1-5)
  • / - Step values (e.g., */15)

Use Cases

  1. Daily batch processing - Run data pipelines at specific times
  2. Regular model retraining - Retrain models on a schedule
  3. Periodic data ingestion - Fetch data from external sources
  4. Scheduled reports - Generate reports at regular intervals
  5. Maintenance tasks - Run cleanup or optimization tasks
  6. Monitoring pipelines - Regular health checks and monitoring
  7. Time-based experiments - Run experiments at specific times

Important Notes

  • If a datetime has no timezone, it’s treated as local timezone (with a warning)
  • cron_expression takes precedence over start_time + interval_second
  • Not all orchestrators support all schedule types
  • The schedule is managed by the orchestrator
  • Use catchup=False if your pipeline handles backfilling internally

@pipeline

Learn about pipelines

Build docs developers (and LLMs) love