Skip to main content

Overview

Schedules enable automatic pipeline execution on recurring intervals using cron expressions. They’re ideal for periodic data ingestion, model retraining, digital twin synchronization, and report generation.

Schedule Structure

pkg/models/schedule.go
type Schedule struct {
    ID           string
    ProjectID    string
    Name         string
    Description  string
    CronSchedule string     // Cron expression
    Pipelines    []string   // Pipeline IDs to execute
    Enabled      bool
    LastRun      *time.Time
    NextRun      *time.Time
    CreatedAt    time.Time
    UpdatedAt    time.Time
}

Creating a Schedule

curl -X POST http://localhost:8080/api/schedules \
  -H "Content-Type: application/json" \
  -d '{
    "project_id": "proj-uuid-1234",
    "name": "daily-customer-import",
    "description": "Import customer data every day at 2 AM",
    "cron_schedule": "0 2 * * *",
    "pipelines": ["pipe-uuid-5678", "pipe-uuid-9012"],
    "enabled": true
  }'

Cron Syntax

Mimir uses standard cron syntax with 5 fields:
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday = 0)
│ │ │ │ │
* * * * *

Common Patterns

# Every hour
0 * * * *

# Every day at 2 AM
0 2 * * *

# Every day at 9:30 AM
30 9 * * *

# Twice daily (6 AM and 6 PM)
0 6,18 * * *

Execution Process

When a schedule triggers:
  1. Scheduler checks cron expression against current time
  2. For each pipeline in the schedule, a WorkTask is created
  3. WorkTasks are enqueued with priority 1 (default for scheduled tasks)
  4. Workers execute pipelines asynchronously
  5. Schedule’s last_run is updated
  6. next_run is calculated from cron expression
pkg/scheduler/service.go:217
func (s *Service) executeJob(job *models.Schedule) {
    log.Printf("Executing scheduled job: %s", job.Name)
    
    // Update last run time
    now := time.Now()
    job.LastRun = &now
    
    // Calculate next run
    schedule, _ := cron.ParseStandard(job.CronSchedule)
    nextRun := schedule.Next(now)
    job.NextRun = &nextRun
    
    s.store.SaveSchedule(job)
    
    // Create WorkTasks for all pipelines
    for _, pipelineID := range job.Pipelines {
        pipeline, _ := s.pipelineService.Get(pipelineID)
        
        workTask := &models.WorkTask{
            ID:       uuid.New().String(),
            Type:     models.WorkTaskTypePipelineExecution,
            Status:   models.WorkTaskStatusQueued,
            Priority: 1,
            TaskSpec: models.TaskSpec{
                PipelineID: pipelineID,
                ProjectID:  pipeline.ProjectID,
                Parameters: map[string]interface{}{
                    "trigger_type":  "scheduled",
                    "triggered_by":  job.ID,
                    "schedule_name": job.Name,
                },
            },
            ResourceRequirements: models.ResourceRequirements{
                CPU:    "500m",
                Memory: "1Gi",
            },
        }
        
        s.queue.Enqueue(workTask)
    }
}

Managing Schedules

Listing Schedules

# All schedules for a project
curl http://localhost:8080/api/projects/proj-uuid-1234/schedules

# All schedules (admin)
curl http://localhost:8080/api/schedules

Getting a Schedule

curl http://localhost:8080/api/schedules/sched-uuid-5555

Updating a Schedule

curl -X PATCH http://localhost:8080/api/schedules/sched-uuid-5555 \
  -H "Content-Type: application/json" \
  -d '{
    "cron_schedule": "0 3 * * *",
    "description": "Import at 3 AM instead of 2 AM",
    "enabled": true
  }'
Updating a schedule recalculates next_run based on the new cron expression.

Disabling a Schedule

curl -X PATCH http://localhost:8080/api/schedules/sched-uuid-5555 \
  -H "Content-Type: application/json" \
  -d '{"enabled": false}'

Deleting a Schedule

curl -X DELETE http://localhost:8080/api/schedules/sched-uuid-5555

Use Cases

Daily customer import:
{
  "name": "daily-customer-import",
  "cron_schedule": "0 2 * * *",
  "pipelines": ["customer-import-pipeline"],
  "description": "Import customers from CRM every night"
}
Runs nightly at 2 AM to fetch fresh customer data.

Monitoring Schedule Execution

Check when schedules last ran and when they’ll run next:
curl http://localhost:8080/api/schedules/sched-uuid-5555
Response:
{
  "id": "sched-uuid-5555",
  "name": "daily-customer-import",
  "cron_schedule": "0 2 * * *",
  "enabled": true,
  "last_run": "2026-03-01T02:00:00Z",
  "next_run": "2026-03-02T02:00:00Z",
  "pipelines": ["pipe-uuid-5678"]
}
Use last_run and next_run to verify schedule health.

Best Practices

Schedule resource-intensive jobs during off-peak hours:
# 2 AM is typically low-traffic
0 2 * * *

# Weekends for batch processing
0 0 * * 0,6
Reduces impact on production systems.
Schedules run in the orchestrator’s timezone (default: UTC).Configure project timezone:
{
  "settings": {
    "timezone": "America/New_York"
  }
}
Or adjust cron expressions for local time:
  • 9 AM EST = 14 * * * * (UTC)
  • 2 PM PST = 22 * * * * (UTC)
Pipelines execute concurrently by default. For sequential execution:Option 1: Create separate schedules with staggered times
{"cron_schedule": "0 2 * * *", "pipelines": ["step-1"]}
{"cron_schedule": "30 2 * * *", "pipelines": ["step-2"]}
Option 2: Chain pipelines internally (one pipeline calls another)
Schedule continues even if pipeline fails. To implement retries:
  1. Check pipeline execution status via API
  2. Create a monitoring schedule that checks for failed executions
  3. Trigger retry pipeline for failures
Or build retry logic into the pipeline itself.
Test cron expressions before deployment:Online tools:Dry run: Create schedule as disabled, verify next_run, then enable.

Validation

Expression is validated using standard cron parser:
pkg/scheduler/service.go:308
if _, err := cron.ParseStandard(req.CronSchedule); err != nil {
    return fmt.Errorf("invalid cron expression: %w", err)
}
Invalid expressions are rejected at creation time.
All pipeline IDs are verified before schedule creation:
pkg/scheduler/service.go:301
for _, pipelineID := range req.Pipelines {
    if _, err := s.pipelineService.Get(pipelineID); err != nil {
        return fmt.Errorf("pipeline not found: %s", pipelineID)
    }
}
Schedule names must be non-empty but don’t need to be unique.Use descriptive names:
  • daily-customer-import
  • weekly-model-retrain
  • hourly-metrics-sync

Troubleshooting

Check status:
curl http://localhost:8080/api/schedules/sched-uuid-5555
Verify:
  • enabled: true
  • next_run is in the future
  • Orchestrator service is running
  • Scheduler was started (Start() called)
Check pipeline execution logs:
curl http://localhost:8080/api/pipelines/pipe-uuid-5678/executions
Look for:
  • status: "failed"
  • error field with failure reason
  • trigger_type: "scheduled"
If orchestrator is down during scheduled time, execution is skipped.Schedules do not queue missed runs. Next execution occurs at next_run.For critical jobs, implement gap detection:
SELECT ?lastRun WHERE {
  ?schedule :lastRun ?lastRun .
  FILTER (?lastRun < NOW() - DURATION("PT25H"))
}

Next Steps

Pipelines

Create pipelines to schedule.

Projects

Organize schedules within projects.

Digital Twins

Schedule digital twin synchronization.

Build docs developers (and LLMs) love