Introduction
DBOS provides a durable, distributed scheduler that executes workflows based on cron schedules. Scheduled workflows are resilient to failures and automatically deduplicated across multiple application instances.
Decorator-Based Scheduling
Use the @DBOS.scheduled() decorator to schedule workflows directly in code:
from dbos import DBOS
from datetime import datetime
from typing import Any
# Fixed: the original used the 5-field expression "0 0 * * *", but DBOS requires
# 6-field cron (with seconds); midnight daily is "0 0 0 * * *".
@DBOS.scheduled("0 0 0 * * *")  # Every day at midnight
@DBOS.workflow()
def daily_report_workflow(scheduled_at: datetime, context: Any) -> None:
    """Generate and send daily reports.

    Scheduled workflows receive the firing time (scheduled_at) and a
    schedule context object (context).
    """
    DBOS.logger.info(f"Generating report for {scheduled_at}")
    # Generate report
    report = generate_daily_report(scheduled_at)
    # Send to recipients
    send_report_email(report)
@DBOS.step()
def generate_daily_report(date: datetime) -> dict:
    """Generate report data for *date*: ISO date string plus computed metrics."""
    return {
        "date": date.isoformat(),
        # calculate_metrics is defined elsewhere in the application
        "metrics": calculate_metrics(date),
    }
@DBOS.step()
def send_report_email(report: dict) -> None:
    """Send the report dict via the application's email service."""
    email_service.send("[email protected]", "Daily Report", report)
Scheduled workflows must accept two parameters: scheduled_at (datetime) and context (Any).
DBOS supports extended cron expressions with seconds precision (6 fields):
┌────────── second (0 - 59)
│ ┌──────── minute (0 - 59)
│ │ ┌────── hour (0 - 23)
│ │ │ ┌──── day of month (1 - 31)
│ │ │ │ ┌── month (1 - 12)
│ │ │ │ │ ┌ day of week (0 - 6) (Sunday = 0)
│ │ │ │ │ │
* * * * * *
Common Patterns
Frequent
Hourly
Daily
Weekly
Monthly
# Every 30 seconds
@DBOS.scheduled ( "*/30 * * * * *" )
# Every minute
@DBOS.scheduled ( "0 * * * * *" )
# Every 5 minutes
@DBOS.scheduled ( "0 */5 * * * *" )
# Every 15 minutes
@DBOS.scheduled ( "0 */15 * * * *" )
# Every hour at minute 0
@DBOS.scheduled ( "0 0 * * * *" )
# Every hour at minute 30
@DBOS.scheduled ( "0 30 * * * *" )
# Every 3 hours
@DBOS.scheduled ( "0 0 */3 * * *" )
# Every day at midnight
@DBOS.scheduled ( "0 0 0 * * *" )
# Every day at 6:00 AM
@DBOS.scheduled ( "0 0 6 * * *" )
# Every day at 9:30 PM
@DBOS.scheduled ( "0 30 21 * * *" )
# Twice daily (8 AM and 8 PM)
@DBOS.scheduled ( "0 0 8,20 * * *" )
# Every Monday at 9 AM
@DBOS.scheduled ( "0 0 9 * * 1" )
# Every weekday at 8 AM
@DBOS.scheduled ( "0 0 8 * * 1-5" )
# Every Sunday at midnight
@DBOS.scheduled ( "0 0 0 * * 0" )
# First day of month at midnight
@DBOS.scheduled ( "0 0 0 1 * *" )
# 15th of every month at noon
@DBOS.scheduled ( "0 0 12 15 * *" )
# Last day of month (use 28-31 carefully)
@DBOS.scheduled ( "0 0 0 28-31 * *" )
Always include 6 fields (including seconds). Standard 5-field cron expressions are not supported.
Programmatic Scheduling
Create and manage schedules dynamically using the DBOS API:
Creating Schedules
from dbos import DBOS
from datetime import datetime
from typing import Any
@DBOS.workflow()
def backup_database(scheduled_at: datetime, context: Any) -> None:
    """Perform a database backup named by the schedule's context."""
    backup_name = context["backup_name"]
    DBOS.logger.info(f"Starting backup: {backup_name}")
    # Perform backup (create_backup is defined elsewhere in the application)
    create_backup(backup_name, scheduled_at)


# Create a schedule programmatically
DBOS.create_schedule(
    schedule_name="nightly-backup",
    workflow_fn=backup_database,
    schedule="0 0 2 * * *",  # 2 AM daily
    context={"backup_name": "production"},
)
Listing Schedules
# List all schedules
all_schedules = DBOS.list_schedules()
for schedule in all_schedules:
    print(f"Name: {schedule['schedule_name']}")
    print(f"Workflow: {schedule['workflow_name']}")
    print(f"Cron: {schedule['schedule']}")
    print(f"Status: {schedule['status']}")
    print(f"Context: {schedule['context']}")
    print()

# Filter by status
active_schedules = DBOS.list_schedules(status="ACTIVE")
paused_schedules = DBOS.list_schedules(status="PAUSED")

# Filter by workflow name
backup_schedules = DBOS.list_schedules(workflow_name="backup_database")

# Filter by schedule name prefix
report_schedules = DBOS.list_schedules(schedule_name_prefix="report-")
Getting a Specific Schedule
# Get schedule by name (returns None if it does not exist)
schedule = DBOS.get_schedule("nightly-backup")
if schedule:
    print(f"Schedule: {schedule['schedule']}")
    print(f"Context: {schedule['context']}")
    print(f"Status: {schedule['status']}")
else:
    print("Schedule not found")
Pausing and Resuming
# Pause a schedule (stops firing, but doesn't delete)
DBOS.pause_schedule("nightly-backup")

# Resume a paused schedule
DBOS.resume_schedule("nightly-backup")

# Check status
schedule = DBOS.get_schedule("nightly-backup")
if schedule["status"] == "ACTIVE":
    print("Schedule is running")
else:
    print("Schedule is paused")
Deleting Schedules
# Delete a schedule
DBOS.delete_schedule("nightly-backup")

# Verify deletion
schedule = DBOS.get_schedule("nightly-backup")
print(schedule)  # None
Apply Schedules Atomically
Replace multiple schedules atomically:
@DBOS.workflow()
def hourly_sync(scheduled_at: datetime, context: Any) -> None:
    """Sync data from the source named in the schedule context."""
    sync_data(context["source"])


@DBOS.workflow()
def daily_cleanup(scheduled_at: datetime, context: Any) -> None:
    """Remove data older than the configured retention window."""
    cleanup_old_data(context["retention_days"])


# Apply multiple schedules at once
DBOS.apply_schedules([
    {
        "schedule_name": "hourly-sync",
        "workflow_fn": hourly_sync,
        "schedule": "0 0 * * * *",
        "context": {"source": "external-api"},
    },
    {
        "schedule_name": "daily-cleanup",
        "workflow_fn": daily_cleanup,
        "schedule": "0 0 3 * * *",
        "context": {"retention_days": 30},
    },
])

# Later, update schedules atomically
DBOS.apply_schedules([
    {
        "schedule_name": "hourly-sync",
        "workflow_fn": hourly_sync,
        "schedule": "0 */30 * * * *",  # Changed to every 30 minutes
        "context": {"source": "external-api"},
    },
    {
        "schedule_name": "daily-cleanup",
        "workflow_fn": daily_cleanup,
        "schedule": "0 0 3 * * *",
        "context": {"retention_days": 60},  # Changed retention
    },
])
apply_schedules() replaces existing schedules with the same name, making it ideal for declarative schedule management.
Backfilling Schedules
Enqueue executions for past time periods:
from datetime import datetime, timedelta
# Backfill last 7 days of daily reports
start = datetime.now() - timedelta(days=7)
end = datetime.now()
handles = DBOS.backfill_schedule(
    schedule_name="daily-report",
    start=start,
    end=end,
)
print(f"Enqueued {len(handles)} backfill workflows")

# Wait for all to complete
for handle in handles:
    result = handle.get_result()
    print(f"Completed: {handle.get_workflow_id()}")
Backfill uses the same deterministic workflow IDs as the scheduler, so already-executed times are automatically skipped.
Triggering Schedules Manually
Run a scheduled workflow immediately:
# Trigger the scheduled workflow now
handle = DBOS.trigger_schedule("daily-report")
print(f"Triggered workflow: {handle.get_workflow_id()}")

# Wait for completion
result = handle.get_result()
Durable Sleep
For workflow-internal delays, use durable sleep instead of scheduling:
@DBOS.workflow()
def delayed_notification_workflow(user_id: str, message: str) -> None:
    """Send a confirmation immediately, then a follow-up after a durable delay."""
    # Send immediate confirmation
    send_notification(user_id, "Processing your request...")
    # Durable sleep for 1 hour (survives restarts)
    DBOS.sleep(3600)
    # Send follow-up notification
    send_notification(user_id, message)
Use DBOS.sleep() for delays within workflows. It’s durable and the workflow will resume after the delay even if the process restarts.
Schedule Context
Pass configuration to scheduled workflows:
@DBOS.workflow()
def environment_specific_backup(scheduled_at: datetime, context: Any) -> None:
    """Backup with environment-specific configuration.

    Expects context keys: environment, retention_days, notification_email.
    """
    env = context["environment"]
    retention_days = context["retention_days"]
    notification_email = context["notification_email"]
    DBOS.logger.info(f"Backing up {env} environment")
    # Perform backup
    backup_id = create_backup(env, scheduled_at)
    # Clean old backups
    cleanup_old_backups(env, retention_days)
    # Notify
    send_email(
        notification_email,
        f"{env} Backup Complete",
        f"Backup ID: {backup_id}",
    )


# Production schedule
DBOS.create_schedule(
    schedule_name="production-backup",
    workflow_fn=environment_specific_backup,
    schedule="0 0 1 * * *",  # 1 AM daily
    context={
        "environment": "production",
        "retention_days": 30,
        "notification_email": "[email protected]",
    },
)

# Staging schedule
DBOS.create_schedule(
    schedule_name="staging-backup",
    workflow_fn=environment_specific_backup,
    schedule="0 0 3 * * *",  # 3 AM daily
    context={
        "environment": "staging",
        "retention_days": 7,
        "notification_email": "[email protected]",
    },
)
Distributed Scheduling
Schedules work correctly in distributed deployments:
# This schedule runs even with multiple app instances
@DBOS.scheduled("0 0 * * * *")  # Every hour
@DBOS.workflow()
def distributed_task(scheduled_at: datetime, context: Any) -> None:
    """This runs exactly once per hour across all instances."""
    # Deterministic workflow ID prevents duplicates
    workflow_id = DBOS.workflow_id
    print(f"Running scheduled task: {workflow_id}")
    # Do work
    process_hourly_data(scheduled_at)
DBOS automatically coordinates schedules across multiple instances using deterministic workflow IDs and database-backed deduplication.
Complete Example: Multi-Schedule Report System
from dbos import DBOS
from datetime import datetime, timedelta
from typing import Any
import sqlalchemy as sa
@DBOS.transaction()
def get_metrics(start_time: datetime, end_time: datetime, metric_type: str) -> list[dict]:
    """Query metrics of *metric_type* in [start_time, end_time) from the database.

    Returns rows as dicts with "timestamp" and "value" keys, ordered by time.
    """
    result = DBOS.sql_session.execute(
        sa.text(
            "SELECT timestamp, value FROM metrics "
            "WHERE timestamp >= :start AND timestamp < :end "
            "AND metric_type = :type ORDER BY timestamp"
        ),
        {"start": start_time, "end": end_time, "type": metric_type},
    )
    return [{"timestamp": row[0], "value": row[1]} for row in result]
@DBOS.step()
def generate_report(metrics: list[dict], report_type: str) -> dict:
    """Summarize metric rows into a report (count, total, average).

    Returns {"error": ...} when there are no metrics, avoiding a
    division by zero in the average calculation.
    """
    if not metrics:
        return {"error": "No data available"}
    total = sum(m["value"] for m in metrics)
    avg = total / len(metrics)
    return {
        "report_type": report_type,
        "count": len(metrics),
        "total": total,
        "average": avg,
        "metrics": metrics,
    }
@DBOS.step()
def send_report(report: dict, recipients: list[str]) -> None:
    """Email the formatted report to each recipient."""
    for recipient in recipients:
        email_service.send(
            to=recipient,
            subject=f"{report['report_type']} Report",
            body=format_report(report),
        )
@DBOS.workflow()
def report_workflow(scheduled_at: datetime, context: Any) -> dict:
    """Generate and send a scheduled report; returns the report dict.

    Context keys: report_type, metric_type, recipients, and optional
    lookback_hours (defaults to 24).
    """
    report_type = context["report_type"]
    metric_type = context["metric_type"]
    recipients = context["recipients"]
    lookback_hours = context.get("lookback_hours", 24)
    DBOS.logger.info(f"Generating {report_type} report for {scheduled_at}")
    # Calculate time range ending at the scheduled firing time
    end_time = scheduled_at
    start_time = end_time - timedelta(hours=lookback_hours)
    # Get metrics
    metrics = get_metrics(start_time, end_time, metric_type)
    # Generate report
    report = generate_report(metrics, report_type)
    # Send to recipients
    send_report(report, recipients)
    DBOS.logger.info(f"{report_type} report sent successfully")
    return report
# Initialize schedules on application startup
def setup_schedules():
    """Declaratively (re)apply all report schedules in one atomic call."""
    DBOS.apply_schedules([
        # Hourly operational metrics
        {
            "schedule_name": "hourly-ops-report",
            "workflow_fn": report_workflow,
            "schedule": "0 0 * * * *",  # Every hour
            "context": {
                "report_type": "Hourly Operations",
                "metric_type": "operations",
                "recipients": ["[email protected]"],
                "lookback_hours": 1,
            },
        },
        # Daily summary
        {
            "schedule_name": "daily-summary",
            "workflow_fn": report_workflow,
            "schedule": "0 0 6 * * *",  # 6 AM daily
            "context": {
                "report_type": "Daily Summary",
                "metric_type": "all",
                "recipients": ["[email protected]", "[email protected]"],
                "lookback_hours": 24,
            },
        },
        # Weekly executive report
        {
            "schedule_name": "weekly-executive",
            "workflow_fn": report_workflow,
            "schedule": "0 0 9 * * 1",  # Monday 9 AM
            "context": {
                "report_type": "Weekly Executive Summary",
                "metric_type": "business",
                "recipients": ["[email protected]"],
                "lookback_hours": 168,  # 7 days
            },
        },
        # Monthly financial report
        {
            "schedule_name": "monthly-financial",
            "workflow_fn": report_workflow,
            "schedule": "0 0 8 1 * *",  # 1st of month, 8 AM
            "context": {
                "report_type": "Monthly Financial",
                "metric_type": "financial",
                "recipients": ["[email protected]", "[email protected]"],
                "lookback_hours": 720,  # 30 days
            },
        },
    ])
    print("All report schedules configured")
# List active schedules
def list_active_schedules():
    """Display all active report schedules."""
    schedules = DBOS.list_schedules(status="ACTIVE")
    for schedule in schedules:
        print(f"\nSchedule: {schedule['schedule_name']}")
        print(f"  Cron: {schedule['schedule']}")
        print(f"  Type: {schedule['context']['report_type']}")
        print(f"  Recipients: {', '.join(schedule['context']['recipients'])}")
# Manually trigger a report
def trigger_report_now(schedule_name: str):
    """Manually trigger a scheduled report and return its workflow handle."""
    handle = DBOS.trigger_schedule(schedule_name)
    print(f"Triggered {schedule_name}: {handle.get_workflow_id()}")
    return handle
Best Practices
Choose Appropriate Frequencies
Use seconds/minutes for real-time monitoring
Use hours for operational tasks
Use daily/weekly for reports and summaries
Avoid excessive frequency that could overwhelm systems
Handle Missing Data
Scheduled workflows should gracefully handle missing or incomplete data
Use context to configure retry behavior
Log warnings for data quality issues
Monitor Schedule Health
Regularly check schedule status
Alert on failed scheduled workflows
Track execution duration trends
Manage Context Carefully
Store environment-specific configuration in context
Avoid storing sensitive data directly (use references)
Keep context small and serializable
Next Steps
Queue Tutorial Learn about workflow queuing and concurrency control
Workflow Management Monitor and manage scheduled workflows programmatically
Error Handling Handle failures in scheduled workflows
Configuration Configure scheduler polling intervals