Skip to main content

Introduction

DBOS provides a durable, distributed scheduler that executes workflows based on cron schedules. Scheduled workflows are resilient to failures and automatically deduplicated across multiple application instances.

Decorator-Based Scheduling

Use the @DBOS.scheduled() decorator to schedule workflows directly in code:
from dbos import DBOS
from datetime import datetime
from typing import Any

@DBOS.scheduled("0 0 0 * * *")  # Every day at midnight (6 fields: sec min hour dom mon dow)
@DBOS.workflow()
def daily_report_workflow(scheduled_at: datetime, context: Any) -> None:
    """Generate and send daily reports.

    Args:
        scheduled_at: Firing time supplied by the DBOS scheduler.
        context: Schedule context (unused by this workflow).
    """
    DBOS.logger.info(f"Generating report for {scheduled_at}")

    # Generate report
    report = generate_daily_report(scheduled_at)

    # Send to recipients
    send_report_email(report)

@DBOS.step()
def generate_daily_report(date: datetime) -> dict:
    """Build the daily report payload for *date*."""
    # ISO date string plus whatever calculate_metrics produces for that day.
    report = {"date": date.isoformat()}
    report["metrics"] = calculate_metrics(date)
    return report

@DBOS.step()
def send_report_email(report: dict) -> None:
    """Email the report payload to the ops mailing list."""
    recipient = "[email protected]"
    subject = "Daily Report"
    email_service.send(recipient, subject, report)
Scheduled workflows must accept two parameters: scheduled_at (datetime) and context (Any).

Cron Expression Format

DBOS supports extended cron expressions with seconds precision (6 fields):
┌────────── second (0 - 59)
│ ┌──────── minute (0 - 59)
│ │ ┌────── hour (0 - 23)
│ │ │ ┌──── day of month (1 - 31)
│ │ │ │ ┌── month (1 - 12)
│ │ │ │ │ ┌ day of week (0 - 6) (Sunday = 0)
│ │ │ │ │ │
* * * * * *

Common Patterns

# Every 30 seconds
@DBOS.scheduled("*/30 * * * * *")

# Every minute
@DBOS.scheduled("0 * * * * *")

# Every 5 minutes
@DBOS.scheduled("0 */5 * * * *")

# Every 15 minutes
@DBOS.scheduled("0 */15 * * * *")
Always include 6 fields (including seconds). Standard 5-field cron expressions are not supported.

Programmatic Scheduling

Create and manage schedules dynamically using the DBOS API:

Creating Schedules

from dbos import DBOS
from datetime import datetime
from typing import Any

@DBOS.workflow()
def backup_database(scheduled_at: datetime, context: Any) -> None:
    """Perform database backup.

    Args:
        scheduled_at: Firing time supplied by the scheduler.
        context: Schedule context; must contain a "backup_name" key.
    """
    backup_name = context["backup_name"]
    DBOS.logger.info(f"Starting backup: {backup_name}")

    # Perform backup
    create_backup(backup_name, scheduled_at)

# Create a schedule programmatically.
# Registers "nightly-backup" to fire backup_database at 02:00 daily; the
# context dict is passed through as the workflow's `context` argument.
DBOS.create_schedule(
    schedule_name="nightly-backup",
    workflow_fn=backup_database,
    schedule="0 0 2 * * *",  # 2 AM daily
    context={"backup_name": "production"}
)

Listing Schedules

# List all schedules
all_schedules = DBOS.list_schedules()
for schedule in all_schedules:
    # Each entry is a dict describing one registered schedule.
    print(f"Name: {schedule['schedule_name']}")
    print(f"Workflow: {schedule['workflow_name']}")
    print(f"Cron: {schedule['schedule']}")
    print(f"Status: {schedule['status']}")
    print(f"Context: {schedule['context']}")
    print()

# Filter by status ("ACTIVE" or "PAUSED")
active_schedules = DBOS.list_schedules(status="ACTIVE")
paused_schedules = DBOS.list_schedules(status="PAUSED")

# Filter by workflow name
backup_schedules = DBOS.list_schedules(
    workflow_name="backup_database"
)

# Filter by schedule name prefix
report_schedules = DBOS.list_schedules(
    schedule_name_prefix="report-"
)

Getting a Specific Schedule

# Get schedule by name; returns None when no schedule has that name.
schedule = DBOS.get_schedule("nightly-backup")

if schedule:
    print(f"Schedule: {schedule['schedule']}")
    print(f"Context: {schedule['context']}")
    print(f"Status: {schedule['status']}")
else:
    print("Schedule not found")

Pausing and Resuming

# Pause a schedule (stops firing, but doesn't delete)
DBOS.pause_schedule("nightly-backup")

# Resume a paused schedule
DBOS.resume_schedule("nightly-backup")

# Check status via the schedule's "status" field ("ACTIVE" or "PAUSED").
schedule = DBOS.get_schedule("nightly-backup")
if schedule["status"] == "ACTIVE":
    print("Schedule is running")
else:
    print("Schedule is paused")

Deleting Schedules

# Delete a schedule permanently (unlike pause, this removes it entirely)
DBOS.delete_schedule("nightly-backup")

# Verify deletion: get_schedule returns None for a missing name
schedule = DBOS.get_schedule("nightly-backup")
print(schedule)  # None

Apply Schedules Atomically

Replace multiple schedules atomically:
@DBOS.workflow()
def hourly_sync(scheduled_at: datetime, context: Any) -> None:
    """Sync data from the source named in the schedule context ("source")."""
    sync_data(context["source"])

@DBOS.workflow()
def daily_cleanup(scheduled_at: datetime, context: Any) -> None:
    """Purge data older than the context's "retention_days" setting."""
    cleanup_old_data(context["retention_days"])

# Apply multiple schedules at once; the set is replaced atomically,
# so observers never see a partially-updated schedule list.
DBOS.apply_schedules([
    {
        "schedule_name": "hourly-sync",
        "workflow_fn": hourly_sync,
        "schedule": "0 0 * * * *",
        "context": {"source": "external-api"}
    },
    {
        "schedule_name": "daily-cleanup",
        "workflow_fn": daily_cleanup,
        "schedule": "0 0 3 * * *",
        "context": {"retention_days": 30}
    }
])

# Later, update schedules atomically
# (entries with the same schedule_name replace the earlier versions)
DBOS.apply_schedules([
    {
        "schedule_name": "hourly-sync",
        "workflow_fn": hourly_sync,
        "schedule": "0 */30 * * * *",  # Changed to every 30 minutes
        "context": {"source": "external-api"}
    },
    {
        "schedule_name": "daily-cleanup",
        "workflow_fn": daily_cleanup,
        "schedule": "0 0 3 * * *",
        "context": {"retention_days": 60}  # Changed retention
    }
])
apply_schedules() replaces existing schedules with the same name, making it ideal for declarative schedule management.

Backfilling Schedules

Enqueue executions for past time periods:
from datetime import datetime, timedelta

# Backfill last 7 days of daily reports.
# Capture "now" once so start/end bound exactly a 7-day window; calling
# datetime.now() twice would make the window a few microseconds short.
now = datetime.now()
start = now - timedelta(days=7)
end = now

handles = DBOS.backfill_schedule(
    schedule_name="daily-report",
    start=start,
    end=end
)

print(f"Enqueued {len(handles)} backfill workflows")

# Wait for all to complete
for handle in handles:
    result = handle.get_result()
    print(f"Completed: {handle.get_workflow_id()}")

Triggering Schedules Manually

Run a scheduled workflow immediately:
# Trigger the scheduled workflow now, outside its cron schedule.
# Returns a workflow handle for the started execution.
handle = DBOS.trigger_schedule("daily-report")

print(f"Triggered workflow: {handle.get_workflow_id()}")

# Wait for completion
result = handle.get_result()

Durable Sleep

For workflow-internal delays, use durable sleep instead of scheduling:
@DBOS.workflow()
def delayed_notification_workflow(user_id: str, message: str) -> None:
    """Send notification after a delay."""
    # Acknowledge right away, then deliver the real message an hour later.
    send_notification(user_id, "Processing your request...")

    # Durable sleep: the one-hour delay survives process restarts.
    DBOS.sleep(3600)

    send_notification(user_id, message)
Use DBOS.sleep() for delays within workflows. It’s durable and the workflow will resume after the delay even if the process restarts.

Schedule Context

Pass configuration to scheduled workflows:
@DBOS.workflow()
def environment_specific_backup(scheduled_at: datetime, context: Any) -> None:
    """Backup with environment-specific configuration.

    Expects context keys: "environment", "retention_days",
    and "notification_email".
    """

    env = context["environment"]
    retention_days = context["retention_days"]
    notification_email = context["notification_email"]

    DBOS.logger.info(f"Backing up {env} environment")

    # Perform backup
    backup_id = create_backup(env, scheduled_at)

    # Clean old backups
    cleanup_old_backups(env, retention_days)

    # Notify
    send_email(
        notification_email,
        f"{env} Backup Complete",
        f"Backup ID: {backup_id}"
    )

# Production schedule.
# Same workflow, different context: environment-specific settings live in
# the schedule context rather than in the workflow code.
DBOS.create_schedule(
    schedule_name="production-backup",
    workflow_fn=environment_specific_backup,
    schedule="0 0 1 * * *",  # 1 AM daily
    context={
        "environment": "production",
        "retention_days": 30,
        "notification_email": "[email protected]"
    }
)

# Staging schedule
DBOS.create_schedule(
    schedule_name="staging-backup",
    workflow_fn=environment_specific_backup,
    schedule="0 0 3 * * *",  # 3 AM daily
    context={
        "environment": "staging",
        "retention_days": 7,
        "notification_email": "[email protected]"
    }
)

Distributed Scheduling

Schedules work correctly in distributed deployments:
# This schedule runs even with multiple app instances
@DBOS.scheduled("0 0 * * * *")  # Every hour
@DBOS.workflow()
def distributed_task(scheduled_at: datetime, context: Any) -> None:
    """This runs exactly once per hour across all instances."""

    # Deterministic workflow ID prevents duplicates: every instance derives
    # the same ID for a given firing time, so only one execution proceeds.
    workflow_id = DBOS.workflow_id
    print(f"Running scheduled task: {workflow_id}")

    # Do work
    process_hourly_data(scheduled_at)
DBOS automatically coordinates schedules across multiple instances using deterministic workflow IDs and database-backed deduplication.

Complete Example: Multi-Schedule Report System

from dbos import DBOS
from datetime import datetime, timedelta
from typing import Any
import sqlalchemy as sa

@DBOS.transaction()
def get_metrics(start_time: datetime, end_time: datetime, metric_type: str) -> list[dict]:
    """Fetch (timestamp, value) metric rows for the half-open time window."""
    # Parameterized query: bound values, never string interpolation.
    query = sa.text(
        "SELECT timestamp, value FROM metrics "
        "WHERE timestamp >= :start AND timestamp < :end "
        "AND metric_type = :type ORDER BY timestamp"
    )
    params = {"start": start_time, "end": end_time, "type": metric_type}
    rows = DBOS.sql_session.execute(query, params)
    return [{"timestamp": ts, "value": val} for ts, val in rows]

@DBOS.step()
def generate_report(metrics: list[dict], report_type: str) -> dict:
    """Summarize metric rows into a report payload."""
    # Guard: no data means there is nothing to average over.
    if not metrics:
        return {"error": "No data available"}

    values = [m["value"] for m in metrics]
    total = sum(values)
    return {
        "report_type": report_type,
        "count": len(metrics),
        "total": total,
        "average": total / len(metrics),
        "metrics": metrics,
    }

@DBOS.step()
def send_report(report: dict, recipients: list[str]) -> None:
    """Email the report to every address in *recipients*."""
    subject = f"{report['report_type']} Report"
    for addr in recipients:
        # format_report is called per recipient, matching the original flow.
        email_service.send(to=addr, subject=subject, body=format_report(report))

@DBOS.workflow()
def report_workflow(scheduled_at: datetime, context: Any) -> dict:
    """Generate and send one scheduled report; returns the report payload.

    Context keys: "report_type", "metric_type", "recipients",
    and optional "lookback_hours" (default 24).
    """
    report_type = context["report_type"]
    metric_type = context["metric_type"]
    recipients = context["recipients"]
    lookback_hours = context.get("lookback_hours", 24)

    DBOS.logger.info(f"Generating {report_type} report for {scheduled_at}")

    # Window ends at the scheduled firing time and looks back lookback_hours.
    end_time = scheduled_at
    start_time = end_time - timedelta(hours=lookback_hours)

    report = generate_report(
        get_metrics(start_time, end_time, metric_type),
        report_type,
    )
    send_report(report, recipients)

    DBOS.logger.info(f"{report_type} report sent successfully")

    return report

# Initialize schedules on application startup
def setup_schedules():
    """Set up all report schedules.

    Uses apply_schedules() so the full set is installed atomically and
    re-running is idempotent: same-name schedules are replaced in place.
    """

    DBOS.apply_schedules([
        # Hourly operational metrics
        {
            "schedule_name": "hourly-ops-report",
            "workflow_fn": report_workflow,
            "schedule": "0 0 * * * *",  # Every hour
            "context": {
                "report_type": "Hourly Operations",
                "metric_type": "operations",
                "recipients": ["[email protected]"],
                "lookback_hours": 1
            }
        },

        # Daily summary
        {
            "schedule_name": "daily-summary",
            "workflow_fn": report_workflow,
            "schedule": "0 0 6 * * *",  # 6 AM daily
            "context": {
                "report_type": "Daily Summary",
                "metric_type": "all",
                "recipients": ["[email protected]", "[email protected]"],
                "lookback_hours": 24
            }
        },

        # Weekly executive report
        {
            "schedule_name": "weekly-executive",
            "workflow_fn": report_workflow,
            "schedule": "0 0 9 * * 1",  # Monday 9 AM (day-of-week 1, Sunday = 0)
            "context": {
                "report_type": "Weekly Executive Summary",
                "metric_type": "business",
                "recipients": ["[email protected]"],
                "lookback_hours": 168  # 7 days
            }
        },

        # Monthly financial report
        {
            "schedule_name": "monthly-financial",
            "workflow_fn": report_workflow,
            "schedule": "0 0 8 1 * *",  # 1st of month, 8 AM
            "context": {
                "report_type": "Monthly Financial",
                "metric_type": "financial",
                "recipients": ["[email protected]", "[email protected]"],
                "lookback_hours": 720  # 30 days
            }
        }
    ])

    print("All report schedules configured")

# List active schedules
def list_active_schedules():
    """Display all active report schedules."""
    for sched in DBOS.list_schedules(status="ACTIVE"):
        ctx = sched['context']
        print(f"\nSchedule: {sched['schedule_name']}")
        print(f"  Cron: {sched['schedule']}")
        print(f"  Type: {ctx['report_type']}")
        print(f"  Recipients: {', '.join(ctx['recipients'])}")

# Manually trigger a report
def trigger_report_now(schedule_name: str):
    """Manually trigger a scheduled report."""
    wf_handle = DBOS.trigger_schedule(schedule_name)
    print(f"Triggered {schedule_name}: {wf_handle.get_workflow_id()}")
    return wf_handle

Best Practices

  • Use seconds/minutes for real-time monitoring
  • Use hours for operational tasks
  • Use daily/weekly for reports and summaries
  • Avoid excessive frequency that could overwhelm systems
  • Scheduled workflows should gracefully handle missing or incomplete data
  • Use context to configure retry behavior
  • Log warnings for data quality issues
  • Regularly check schedule status
  • Alert on failed scheduled workflows
  • Track execution duration trends
  • Store environment-specific configuration in context
  • Avoid storing sensitive data directly (use references)
  • Keep context small and serializable

Next Steps

Queue Tutorial

Learn about workflow queuing and concurrency control

Workflow Management

Monitor and manage scheduled workflows programmatically

Error Handling

Handle failures in scheduled workflows

Configuration

Configure scheduler polling intervals

Build docs developers (and LLMs) love