Mage provides a powerful scheduling system through Pipeline Schedules (also called triggers) that allow you to automate pipeline execution based on time intervals, events, or API calls.
from mage_ai.data_preparation.models.triggers import ScheduleInterval

# Preset intervals available on ScheduleInterval. Each assignment below is an
# independent example; keep only the one that matches the cadence you need.

# Run once
schedule_interval = ScheduleInterval.ONCE

# Run continuously (always on)
schedule_interval = ScheduleInterval.ALWAYS_ON

# Run every hour
schedule_interval = ScheduleInterval.HOURLY

# Run every day
schedule_interval = ScheduleInterval.DAILY

# Run every week
schedule_interval = ScheduleInterval.WEEKLY

# Run every month
schedule_interval = ScheduleInterval.MONTHLY
# Custom intervals written as five-field cron expressions:
#   minute  hour  day-of-month  month  day-of-week
# Each assignment below is an independent example; keep only the one you need.

# Run every 15 minutes
schedule_interval = '*/15 * * * *'

# Run at 2:30 AM every day
schedule_interval = '30 2 * * *'

# Run every Monday at 9 AM
schedule_interval = '0 9 * * 1'

# Run on the 1st of every month at midnight
schedule_interval = '0 0 1 * *'

# Run every weekday at 6 PM
schedule_interval = '0 18 * * 1-5'
schedule.settings = { # Skip execution if previous run is still running 'skip_if_previous_running': True, # Maximum execution time (seconds) 'timeout': 3600, # Status to set if timeout is reached 'timeout_status': 'cancelled', # Enable landing time scheduling 'landing_time_enabled': True, # Create initial pipeline run when activated 'create_initial_pipeline_run': False}await schedule.update(settings=schedule.settings)
The execution_date represents the logical date/time for which the pipeline is processing data:
# Access execution date in blocks
@data_loader
def load_data(*args, **kwargs):
    """Load the events whose ``event_date`` matches this run's execution date.

    Reads ``execution_date`` from ``kwargs``, interpolates its calendar date
    into a SQL query, and returns whatever ``run_query`` returns for it.
    """
    # Get execution date from kwargs
    # NOTE(review): `.get` yields None when the key is absent, and the
    # `.date()` call below would then raise AttributeError - assumes a
    # datetime is always supplied under this key; confirm.
    execution_date = kwargs.get('execution_date')

    # Use it to load data for that time period
    query = f"""
        SELECT *
        FROM events
        WHERE event_date = '{execution_date.date()}'
    """
    return run_query(query)
Landing time scheduling ensures pipelines complete by a specific time, accounting for expected runtime:
# Create an active daily schedule with landing time enabled.
schedule = PipelineSchedule.create(
    name='morning_report',
    pipeline_uuid='daily_report',
    schedule_type=ScheduleType.TIME,
    schedule_interval=ScheduleInterval.DAILY,
    # Target completion time
    start_time=datetime(2024, 1, 1, 9, 0, 0, tzinfo=pytz.UTC),
    status=ScheduleStatus.ACTIVE,
    settings={
        # Enable landing time
        'landing_time_enabled': True,
    },
)

# The scheduler will start the pipeline early based on historical runtimes
# to ensure it completes by 9:00 AM
# Inspect the run history of an existing schedule object.

# Get last run status
last_status = schedule.last_pipeline_run_status
print(f'Last run: {last_status}')

# Get recent runs
recent_runs = schedule.recently_completed_pipeline_runs(sample_size=10)
for run in recent_runs:
    print(f'{run.execution_date}: {run.status}')

# Calculate average runtime
avg_runtime = schedule.runtime_average(sample_size=20)
print(f'Average runtime: {avg_runtime} seconds')
# Get all active schedules
active_schedules = PipelineSchedule.active_schedules(
    pipeline_uuids=['pipeline1', 'pipeline2'],
)

# Print each schedule's name alongside its configured interval.
for schedule in active_schedules:
    print(f'{schedule.name}: {schedule.schedule_interval}')