Skip to main content

Overview

Mage provides a powerful scheduling system through Pipeline Schedules (also called triggers) that allow you to automate pipeline execution based on time intervals, events, or API calls.

Schedule Types

Mage supports three types of pipeline schedules:

Time-Based Schedules

Execute pipelines at regular intervals or specific times.
from mage_ai.orchestration.db.models.schedules import PipelineSchedule
from mage_ai.data_preparation.models.triggers import (
    ScheduleInterval,
    ScheduleStatus,  # added: snippet below uses ScheduleStatus.ACTIVE but it was not imported
    ScheduleType,
)
from datetime import datetime
import pytz

# Create a time-based schedule that runs every day at 09:00 UTC.
schedule = PipelineSchedule.create(
    name='daily_etl_trigger',
    pipeline_uuid='my_pipeline',
    schedule_type=ScheduleType.TIME,
    schedule_interval=ScheduleInterval.DAILY,
    # Always pass a timezone-aware start_time; naive datetimes are ambiguous.
    start_time=datetime(2024, 1, 1, 9, 0, 0, tzinfo=pytz.UTC),
    status=ScheduleStatus.ACTIVE,
    variables={'env': 'production'}  # default variables passed to every run
)
Schedule Intervals:
from mage_ai.data_preparation.models.triggers import ScheduleInterval

# Supported values for the schedule_interval argument on time-based
# schedules. Each assignment below rebinds the same variable — pick one.

# Run once
schedule_interval = ScheduleInterval.ONCE

# Run continuously (always on)
schedule_interval = ScheduleInterval.ALWAYS_ON

# Run every hour
schedule_interval = ScheduleInterval.HOURLY

# Run every day
schedule_interval = ScheduleInterval.DAILY

# Run every week
schedule_interval = ScheduleInterval.WEEKLY

# Run every month
schedule_interval = ScheduleInterval.MONTHLY

Event-Based Schedules

Trigger pipelines based on external events.
# added: snippet uses PipelineSchedule and ScheduleStatus, which were not imported
from mage_ai.orchestration.db.models.schedules import PipelineSchedule
from mage_ai.data_preparation.models.triggers import ScheduleStatus, ScheduleType

# Create event-based schedule: the pipeline runs when a matching external
# event (here an S3 object-created notification) is received.
event_schedule = PipelineSchedule.create(
    name='s3_file_upload_trigger',
    pipeline_uuid='process_uploads',
    schedule_type=ScheduleType.EVENT,
    status=ScheduleStatus.ACTIVE,
    variables={
        'event_type': 's3:ObjectCreated:*',
        'bucket': 'my-data-bucket'
    }
)

API Schedules

Trigger pipelines through API calls.
# added: snippet referenced PipelineSchedule, ScheduleType and ScheduleStatus
# without importing any of them
from mage_ai.orchestration.db.models.schedules import PipelineSchedule
from mage_ai.data_preparation.models.triggers import ScheduleStatus, ScheduleType

# Create API-triggered schedule
api_schedule = PipelineSchedule.create(
    name='api_trigger',
    pipeline_uuid='on_demand_pipeline',
    schedule_type=ScheduleType.API,
    status=ScheduleStatus.ACTIVE,
    token='unique-api-token-123'  # Auto-generated if not provided
)

# Trigger via API: POST to .../pipeline_schedules/<schedule_id>/pipeline_runs/<token>
import requests

response = requests.post(
    'http://mage-instance/api/pipeline_schedules/123/pipeline_runs/unique-api-token-123',
    json={
        'pipeline_run': {
            # Runtime variables for this specific run; merged over the
            # schedule's default variables.
            'variables': {
                'date': '2024-01-15',
                'env': 'production'
            }
        }
    }
)

Creating Schedules

Using the PipelineSchedule Model

from mage_ai.orchestration.db.models.schedules import PipelineSchedule
from mage_ai.data_preparation.models.triggers import (
    ScheduleType,
    ScheduleInterval,
    ScheduleStatus
)
from datetime import datetime
import pytz

# Create a schedule with the full set of commonly used options.
schedule = PipelineSchedule.create(
    name='hourly_data_sync',
    description='Sync customer data every hour',
    pipeline_uuid='customer_sync_pipeline',
    schedule_type=ScheduleType.TIME,
    schedule_interval=ScheduleInterval.HOURLY,
    # Timezone-aware "now": the first run is eligible immediately.
    start_time=datetime.now(tz=pytz.UTC),
    status=ScheduleStatus.ACTIVE,
    # Default variables merged into every pipeline run this schedule creates.
    variables={
        'env': 'production',
        'region': 'us-west-2'
    },
    sla=3600,  # SLA in seconds (1 hour) — runs exceeding this are flagged
    settings={
        'skip_if_previous_running': True,  # don't stack runs if one is still going
        'timeout': 1800  # 30 minutes max execution time
    }
)

Using the Trigger Config

# added ScheduleInterval/ScheduleStatus/ScheduleType: the snippet used all
# three without importing them
from mage_ai.data_preparation.models.triggers import (
    ScheduleInterval,
    ScheduleStatus,
    ScheduleType,
    Trigger,
    add_or_update_trigger_for_pipeline_and_persist,
)
import pytz
from datetime import datetime

# Create trigger config (code-defined trigger, versionable with the pipeline)
trigger = Trigger(
    name='nightly_etl',
    pipeline_uuid='etl_pipeline',
    schedule_type=ScheduleType.TIME,
    schedule_interval=ScheduleInterval.DAILY,
    start_time=datetime(2024, 1, 1, 2, 0, 0, tzinfo=pytz.UTC),  # 02:00 UTC nightly
    status=ScheduleStatus.ACTIVE,
    variables={'env': 'production'},
    sla=7200  # alert if a run takes longer than 2 hours
)

# Persist trigger to the pipeline's trigger configuration
add_or_update_trigger_for_pipeline_and_persist(
    trigger,
    'etl_pipeline'
)

Schedule Configuration

Schedule Properties

name
string
required
Unique name for the schedule
pipeline_uuid
string
required
UUID of the pipeline to schedule
schedule_type
ScheduleType
required
Type of schedule: time, event, or api
schedule_interval
string
Interval for time-based schedules: @once, @hourly, @daily, @weekly, @monthly, or cron expression
start_time
datetime
When to start executing the schedule
status
ScheduleStatus
Schedule status: active or inactive
variables
Dict
Variables to pass to pipeline runs
sla
int
Service Level Agreement in seconds - alert if pipeline run exceeds this duration
settings
Dict
Additional schedule settings

Advanced Settings

schedule.settings = {
    # Skip execution if previous run is still running
    'skip_if_previous_running': True,

    # Maximum execution time (seconds)
    'timeout': 3600,

    # Status to set if timeout is reached
    'timeout_status': 'cancelled',

    # Enable landing time scheduling
    'landing_time_enabled': True,

    # Create initial pipeline run when activated
    'create_initial_pipeline_run': False
}

# Fixed: the original used `await schedule.update(...)`, which is a
# SyntaxError outside an async function; update() is called synchronously
# everywhere else in this document.
schedule.update(settings=schedule.settings)

Managing Schedules

Activate/Deactivate

# Activate schedule
schedule.update(
    status=ScheduleStatus.ACTIVE,
    last_enabled_at=datetime.now(tz=pytz.UTC)
)

# Deactivate schedule
schedule.update(status=ScheduleStatus.INACTIVE)

Update Schedule

# Update schedule interval
schedule.update(
    schedule_interval=ScheduleInterval.DAILY,
    start_time=datetime(2024, 1, 1, 9, 0, 0, tzinfo=pytz.UTC)
)

# Update variables
schedule.update(
    variables={
        'env': 'production',
        'region': 'eu-west-1',
        'batch_size': 1000
    }
)

Delete Schedule

schedule.delete()

Pipeline Runs

Each execution of a scheduled pipeline creates a PipelineRun:
from mage_ai.orchestration.db.models.schedules import PipelineRun

# Get pipeline runs for a schedule (each schedule execution creates one run)
pipeline_runs = schedule.pipeline_runs

for run in pipeline_runs:
    print(f'Run {run.id}: {run.status}')
    # execution_date is the logical date the run processes, not wall-clock start
    print(f'  Execution date: {run.execution_date}')
    print(f'  Started: {run.started_at}')
    print(f'  Completed: {run.completed_at}')
    print(f'  Block runs: {run.block_runs_count}')

Pipeline Run Status

from mage_ai.orchestration.db.models.schedules import PipelineRun

class PipelineRunStatus:
    """String constants describing the lifecycle states of a pipeline run.

    A run starts in INITIAL, moves to RUNNING once the scheduler picks it
    up, and terminates in exactly one of COMPLETED, FAILED, or CANCELLED.
    """

    INITIAL = 'initial'        # created, awaiting execution
    RUNNING = 'running'        # execution in progress
    COMPLETED = 'completed'    # finished without error
    FAILED = 'failed'          # terminated with errors
    CANCELLED = 'cancelled'    # stopped by a user

Create Manual Run

# Create a manual pipeline run (outside the normal schedule cadence).
# Assumes `schedule` was created as in the earlier examples.
from mage_ai.orchestration.db.models.schedules import PipelineRun
from datetime import datetime
import pytz

pipeline_run = PipelineRun.create(
    pipeline_schedule_id=schedule.id,
    pipeline_uuid='my_pipeline',
    # Logical date the run processes; timezone-aware.
    execution_date=datetime.now(tz=pytz.UTC),
    # Run-level variables; override the schedule's defaults for this run.
    variables={
        'date': '2024-01-15',
        'env': 'production'
    },
    status=PipelineRun.PipelineRunStatus.INITIAL
)

Execution Dates and Partitions

Execution Date

The execution_date represents the logical date/time for which the pipeline is processing data:
# Access execution date in blocks
# Access execution date in blocks
@data_loader
def load_data(*args, **kwargs):
    # Get execution date from kwargs.
    # NOTE(review): .get() returns None if the key is absent, and
    # execution_date.date() below would then raise AttributeError; in a
    # scheduler-triggered run this key is populated — confirm for manual runs.
    execution_date = kwargs.get('execution_date')

    # Use it to load data for that time period.
    # NOTE(review): f-string interpolation into SQL is only acceptable here
    # because execution_date is a scheduler-supplied datetime, never user
    # input; prefer a parameterized query if run_query supports one.
    query = f"""
        SELECT * FROM events
        WHERE event_date = '{execution_date.date()}'
    """

    return run_query(query)

Execution Partition

Partitions organize outputs and logs for different runs:
# Execution partition format
# {pipeline_schedule_id}/{execution_date}
# Example: 123/20240115T093000

# Look up an existing run by primary key and inspect its partition string.
pipeline_run = PipelineRun.query.get(456)
print(pipeline_run.execution_partition)
# Output: 123/20240115T093000

# Variables stored with partition
# .variables/pipelines/my_pipeline/123/20240115T093000/block_uuid/output_0

Landing Time Scheduling

Landing time scheduling ensures pipelines complete by a specific time, accounting for expected runtime:
# Assumes PipelineSchedule, ScheduleType, ScheduleInterval, ScheduleStatus,
# datetime and pytz are imported as in the earlier examples.
schedule = PipelineSchedule.create(
    name='morning_report',
    pipeline_uuid='daily_report',
    schedule_type=ScheduleType.TIME,
    schedule_interval=ScheduleInterval.DAILY,
    start_time=datetime(2024, 1, 1, 9, 0, 0, tzinfo=pytz.UTC),  # Target completion time
    status=ScheduleStatus.ACTIVE,
    settings={
        'landing_time_enabled': True  # Enable landing time
    }
)

# With landing time enabled, start_time is interpreted as a deadline:
# the scheduler will start the pipeline early based on historical runtimes
# to ensure it completes by 9:00 AM

Service Level Agreements (SLA)

Set SLAs to monitor pipeline execution times:
# Set 1-hour SLA
schedule.update(sla=3600)

# Check if run passed SLA
pipeline_run = PipelineRun.query.get(123)
if pipeline_run.passed_sla:
    print('Pipeline completed within SLA')
else:
    print('SLA breach detected!')
    # Send alert

Concurrency Control

Skip if Previous Running

# Prevent overlapping executions of the same schedule.
schedule.settings = {
    'skip_if_previous_running': True
}
When enabled, Mage skips scheduling a new run if the previous run is still executing.

Multiple Runs

By default, pipelines can have multiple concurrent runs. Check in-progress runs:
# Check running pipeline runs (count of runs not yet in a terminal state)
in_progress_count = schedule.pipeline_in_progress_runs_count
print(f'{in_progress_count} runs currently in progress')

Backfills

Backfill missing pipeline runs for historical dates:
from mage_ai.orchestration.db.models.schedules import Backfill
from datetime import datetime  # removed unused `timedelta` import
import pytz

# Create a backfill: one run per interval between start and end datetimes.
backfill = Backfill.create(
    pipeline_schedule_id=schedule.id,
    name='january_backfill',
    start_datetime=datetime(2024, 1, 1, tzinfo=pytz.UTC),
    end_datetime=datetime(2024, 1, 31, tzinfo=pytz.UTC),
    interval_type='day',  # 'hour', 'day', 'week', 'month'
    interval_units=1,     # step size in interval_type units
    variables={'env': 'production'}  # applied to every backfilled run
)

# Start backfill (creates and schedules the historical pipeline runs)
backfill.start()

Runtime Variables

Schedule Variables

Set default variables for all runs:
# Default variables applied to every run created by this schedule.
schedule.variables = {
    'env': 'production',
    'region': 'us-west-2',
    'batch_size': 1000
}

Runtime Variables

Override variables for specific runs:
# Run-level variables take precedence over the schedule's defaults.
pipeline_run = PipelineRun.create(
    pipeline_schedule_id=schedule.id,
    pipeline_uuid='my_pipeline',
    variables={
        'env': 'staging',  # Override schedule variable
        'date': '2024-01-15'  # Add new variable
    }
)

Built-in Variables

Mage provides built-in variables:
@data_loader
def load_data(*args, **kwargs):
    # Execution date (logical date of this run; required key)
    execution_date = kwargs['execution_date']

    # Execution partition ({pipeline_schedule_id}/{execution_date})
    execution_partition = kwargs['execution_partition']

    # Pipeline run ID (.get(): may be absent, e.g. outside a scheduled run)
    pipeline_run_id = kwargs.get('pipeline_run_id')

    # Block run ID (same caveat as above)
    block_run_id = kwargs.get('block_run_id')

    # Environment from schedule/runtime variables, with a local fallback
    env = kwargs.get('env', 'development')

Monitoring Schedules

Check Schedule Health

# Get last run status
last_status = schedule.last_pipeline_run_status
print(f'Last run: {last_status}')

# Get recent runs (most recent completed runs, up to sample_size)
recent_runs = schedule.recently_completed_pipeline_runs(sample_size=10)
for run in recent_runs:
    print(f'{run.execution_date}: {run.status}')

# Calculate average runtime over the last sample_size runs (seconds)
avg_runtime = schedule.runtime_average(sample_size=20)
print(f'Average runtime: {avg_runtime} seconds')

Active Schedules

# Get all active schedules
active_schedules = PipelineSchedule.active_schedules(
    pipeline_uuids=['pipeline1', 'pipeline2']
)

for schedule in active_schedules:
    print(f'{schedule.name}: {schedule.schedule_interval}')

Best Practices

Use Descriptive Names — Name schedules clearly to indicate their purpose: hourly_customer_sync, daily_sales_report, monthly_aggregation
Set Appropriate SLAs — Configure SLAs based on business requirements to get alerted when pipelines take longer than expected.
Timezone Awareness — Always use timezone-aware datetime objects with pytz.UTC or the appropriate timezone for your use case.
Landing Time for Critical Pipelines — Enable landing time scheduling for pipelines that must complete by a specific deadline, like morning reports.

Next Steps

Dynamic Blocks

Parallel execution with dynamic blocks

Monitoring

Monitor pipeline runs and schedules

Notifications

Configure alerts for pipeline events

Backfills

Run historical pipeline executions

Build docs developers (and LLMs) love