Durability in DBOS means your workflows automatically recover from failures and interruptions. When your application crashes, is killed, or restarts, DBOS detects all in-progress workflows and resumes them from the last completed step. You don't need to write any recovery code: it's built into the framework.
```python
from dbos import DBOS

# charge_payment, reserve_inventory, and ship_order are @DBOS.step() functions defined elsewhere
@DBOS.workflow()
def durable_order_processing(order_id: str):
    charge_payment(order_id)     # Step 1
    reserve_inventory(order_id)  # Step 2
    ship_order(order_id)         # Step 3
    return "order completed"

# If the app crashes after Step 1 completes, when it restarts:
# - Step 1 is skipped (already completed; its stored result is reused)
# - Step 2 executes
# - Step 3 executes
# - The workflow completes successfully
```
- Already-completed steps are skipped: their stored results are reused.
- In-progress steps are retried: the step that was executing when the crash occurred runs again.
- Not-yet-started steps execute normally: the workflow continues forward.
```python
@DBOS.workflow()
def multi_step_workflow():
    step_a()  # ✅ Completed before crash → skipped on recovery
    step_b()  # ❌ Was running during crash → retried on recovery
    step_c()  # ⏸️ Not started yet → runs on recovery
    step_d()  # ⏸️ Not started yet → runs on recovery
```
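To make the three cases concrete, here is a toy, in-memory model of checkpoint-and-replay. It is a sketch for illustration only: `ToyWorkflowRun` and its dictionary of checkpoints are hypothetical stand-ins for the durable records DBOS keeps in Postgres, and a "crash" is simulated by raising an exception mid-step. The key rule is that a step's result is recorded only after the step finishes, so a step interrupted mid-execution has no checkpoint and runs again.

```python
from typing import Any, Callable, Optional


class ToyWorkflowRun:
    """In-memory stand-in for a durable checkpoint table (hypothetical; not the DBOS API)."""

    def __init__(self) -> None:
        self.checkpoints: dict[int, Any] = {}  # step index -> stored result (survives "crashes")
        self.executed: list[str] = []          # steps that actually ran in the current process

    def run_step(self, index: int, name: str, fn: Callable[[], Any]) -> Any:
        if index in self.checkpoints:          # completed before the crash: skip, reuse result
            return self.checkpoints[index]
        self.executed.append(name)             # in progress or not started: execute it
        result = fn()
        self.checkpoints[index] = result       # checkpoint only after the step finishes
        return result


def multi_step_workflow(run: ToyWorkflowRun, crash_during: Optional[str] = None) -> list[Any]:
    results = []
    for index, name in enumerate(["step_a", "step_b", "step_c", "step_d"]):
        def body(name: str = name) -> str:
            if name == crash_during:           # simulate the process dying mid-step
                raise RuntimeError(f"crash while running {name}")
            return f"{name} done"
        results.append(run.run_step(index, name, body))
    return results


run = ToyWorkflowRun()
try:
    multi_step_workflow(run, crash_during="step_b")   # original execution crashes in step_b
except RuntimeError:
    pass
run.executed.clear()                                  # restart: only the checkpoints persist
print(multi_step_workflow(run))                       # recovery replays from the beginning
print(run.executed)                                   # ['step_b', 'step_c', 'step_d']
```

On recovery, `step_a` is served from its checkpoint, `step_b` runs again because it never recorded a result, and `step_c` and `step_d` run for the first time.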
Each step executes at least once, but it may execute multiple times if:
- the step was running during a crash (it is replayed on recovery)
- the step failed and is being retried
- a workflow restart occurs mid-step
Important: Design your steps to be idempotent (safe to run multiple times with the same inputs). This ensures correct behavior even if a step is retried.
```python
from dbos import DBOS

# ❌ Not idempotent - dangerous!
@DBOS.step()
def bad_step(account_id: str):
    balance = get_balance(account_id)
    new_balance = balance + 100  # Adds 100 every time the step runs
    set_balance(account_id, new_balance)

# ✅ Idempotent - safe!
@DBOS.step()
def good_step(account_id: str, amount: float):
    # Always sets the balance to a specific value, so re-running it has no extra effect
    set_balance(account_id, amount)
```
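The difference shows up as soon as a step runs twice. The sketch below is a plain-Python simulation, not DBOS code: a dictionary stands in for the account store, and the hypothetical helper `run_twice` models an original attempt plus one re-execution after a crash that happened before the step's checkpoint was saved.

```python
from typing import Callable


def bad_step(balances: dict[str, float], account_id: str) -> None:
    balances[account_id] = balances[account_id] + 100  # adds 100 on every execution


def good_step(balances: dict[str, float], account_id: str, amount: float) -> None:
    balances[account_id] = amount  # sets an absolute value; repeating it changes nothing


def run_twice(step: Callable[..., None], balances: dict[str, float], *args) -> dict[str, float]:
    step(balances, *args)  # original attempt; assume the process dies before its checkpoint is saved
    step(balances, *args)  # recovery re-executes the same step with the same inputs
    return balances


print(run_twice(bad_step, {"acct-1": 0.0}, "acct-1"))          # {'acct-1': 200.0}
print(run_twice(good_step, {"acct-1": 0.0}, "acct-1", 100.0))  # {'acct-1': 100.0}
```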
For recovery to work correctly, workflows must be deterministic: given the same inputs, they must execute the same sequence of steps in the same order.
```python
import random
from datetime import datetime

from dbos import DBOS

@DBOS.workflow()
def non_deterministic_workflow():
    # Different behavior on replay!
    if random.random() > 0.5:
        step_a()
    else:
        step_b()

    # Different timestamp on replay!
    if datetime.now().hour < 12:
        morning_task()
```
Problem: On recovery, the workflow might take a different path than during original execution, leading to incorrect state.
```python
from dbos import DBOS

@DBOS.workflow()
def deterministic_workflow():
    # Move non-deterministic logic into steps
    random_value = get_random_value()  # Step returns consistent result
    if random_value > 0.5:
        step_a()
    else:
        step_b()

    current_hour = get_current_hour()  # Step returns consistent result
    if current_hour < 12:
        morning_task()

@DBOS.step()
def get_random_value() -> float:
    import random
    return random.random()  # Result is checkpointed

@DBOS.step()
def get_current_hour() -> int:
    from datetime import datetime
    return datetime.now().hour  # Result is checkpointed
```
Solution: Put all non-deterministic operations (random numbers, timestamps, API calls) in steps. Their results are checkpointed and reused during recovery.
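Why this works can be shown with a small simulation. In this sketch (plain Python, not the DBOS API), the hypothetical `make_step_runner` records a step's result the first time it runs and returns the stored value on every later call, which is the role DBOS checkpoints play. Replaying with a differently seeded random generator still takes the same branch, because the workflow only sees the checkpointed value.

```python
import random
from typing import Any, Callable


def make_step_runner(checkpoints: dict[str, Any]) -> Callable[[str, Callable[[], Any]], Any]:
    """Return a toy 'step' wrapper that records each result once and reuses it (hypothetical helper)."""
    def step(name: str, fn: Callable[[], Any]) -> Any:
        if name not in checkpoints:
            checkpoints[name] = fn()  # first execution: run and checkpoint
        return checkpoints[name]      # replay: return the stored value
    return step


def deterministic_workflow(step: Callable[[str, Callable[[], Any]], Any], rng: random.Random) -> str:
    value = step("get_random_value", rng.random)  # non-determinism lives inside a step
    return "step_a" if value > 0.5 else "step_b"


checkpoints: dict[str, Any] = {}
first = deterministic_workflow(make_step_runner(checkpoints), random.Random(1))
# On replay the generator would produce a different number, but the checkpointed value wins:
replay = deterministic_workflow(make_step_runner(checkpoints), random.Random(2))
print(first == replay)  # True
```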
1. Connect to the database: DBOS connects to Postgres and checks the workflow execution table.
2. Find incomplete workflows: DBOS queries for all workflows with status PENDING (started but not completed).
3. Resume each workflow: for each incomplete workflow, DBOS:
   - loads the workflow function by name
   - deserializes the original arguments
   - replays the workflow from the beginning
   - skips completed steps (reusing their stored results)
   - executes incomplete steps
4. Handle errors: if recovery fails repeatedly, DBOS stops recovering the workflow once it exceeds its maximum number of recovery attempts and marks it as failed.
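Steps 2 and 3 can be sketched as a loop over a status table. This is a simplified model, not DBOS internals: `WorkflowRow` is a hypothetical row layout (DBOS's real schema is not shown here), arguments are assumed to be stored as JSON, and a plain dictionary plays the role of the registry that maps workflow names to functions. Step skipping inside the replayed workflow is omitted for brevity.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class WorkflowRow:
    """Hypothetical, simplified row of a workflow-status table (not DBOS's real schema)."""
    workflow_id: str
    function_name: str
    args_json: str        # original arguments, stored serialized
    status: str = "PENDING"
    output: Any = None


def recover_pending(table: list[WorkflowRow], registry: dict[str, Callable[..., Any]]) -> list[str]:
    recovered = []
    for row in table:
        if row.status != "PENDING":              # only started-but-unfinished workflows
            continue
        fn = registry[row.function_name]         # load the workflow function by name
        args = json.loads(row.args_json)         # deserialize the original arguments
        row.output = fn(*args)                   # replay from the beginning
        row.status = "SUCCESS"
        recovered.append(row.workflow_id)
    return recovered
```

Running `recover_pending` a second time finds nothing to do, since every recovered row has left the PENDING state.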
```python
from dbos import DBOS

# Start your application (config is your DBOSConfig)
DBOS(config=config)
DBOS.launch()  # Automatically recovers all incomplete workflows

# Your application is now running,
# and all interrupted workflows have been resumed
```
Limit how many times DBOS attempts to recover a workflow:
```python
from dbos import DBOS

# Set the limit per workflow via the decorator
@DBOS.workflow(max_recovery_attempts=10)
def my_workflow():
    # DBOS will attempt to recover this workflow up to 10 times
    pass
```
Once a workflow exceeds its maximum recovery attempts, DBOS marks it as failed and stops recovering it automatically. You can inspect such workflows and resume them manually using DBOSClient.
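The attempt limit behaves like a counter that stops automatic recovery once it is exceeded. The toy model below is an assumption-laden sketch, not DBOS's implementation: `PendingWorkflow` is a hypothetical record, `GAVE_UP` is a made-up label rather than a real DBOS status, and exactly when DBOS increments its counter is not modeled.

```python
from dataclasses import dataclass


@dataclass
class PendingWorkflow:
    """Hypothetical record used only for this sketch."""
    workflow_id: str
    recovery_attempts: int = 0
    status: str = "PENDING"


def attempt_recovery(wf: PendingWorkflow, max_recovery_attempts: int, crashes_again: bool) -> str:
    if wf.status != "PENDING":
        return wf.status                         # finished or given up: nothing to recover
    wf.recovery_attempts += 1                    # count every attempt, successful or not
    if wf.recovery_attempts > max_recovery_attempts:
        wf.status = "GAVE_UP"                    # hypothetical label; stop automatic recovery
    elif not crashes_again:
        wf.status = "SUCCESS"
    return wf.status                             # still "PENDING" if the attempt crashed
```

A workflow that crashes on every attempt with a limit of 3 stays PENDING for three attempts and is given up on the fourth; a workflow that recovers cleanly succeeds on its first attempt.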
DBOS automatically computes an application version hash based on your workflow code. When you deploy a new version, DBOS detects the change:
```python
from dbos import DBOSClient

# DBOS tracks the app version automatically
app_version = dbos.app_version  # e.g., "a3f7d8e9..."

# DBOS recovers a workflow only on an executor running the same app version,
# so query workflows by app version to find ones started by older deploys:
client = DBOSClient(system_database_url)
old_workflows = client.list_workflows(app_version="a3f7d8e9...")
```
Use DBOSClient.list_workflows() to find workflows from previous versions that are still running. You can cancel or migrate them before deploying breaking changes.
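The idea of a code-derived version can be illustrated with a content hash. This is a sketch under stated assumptions: `compute_app_version` is a hypothetical helper, it takes workflow source text as plain strings, and SHA-256 over names plus source is chosen for illustration; the exact inputs and hash function DBOS uses are not specified here.

```python
import hashlib


def compute_app_version(workflow_sources: dict[str, str]) -> str:
    """Hash workflow names and source text into one version string (illustrative only)."""
    hasher = hashlib.sha256()
    for name in sorted(workflow_sources):      # sort so registration order doesn't matter
        hasher.update(name.encode("utf-8") + b"\0")
        hasher.update(workflow_sources[name].encode("utf-8") + b"\0")
    return hasher.hexdigest()


v1 = compute_app_version({"submit_tasks": "def submit_tasks(): ...", "my_workflow": "def my_workflow(): pass"})
v2 = compute_app_version({"submit_tasks": "def submit_tasks(): ...", "my_workflow": "def my_workflow(): return 1"})
print(v1 != v2)  # True: any change to workflow code yields a new version
```

Because the version changes whenever workflow code changes, workflows recorded under an old version can be told apart from ones started by the current deploy.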
```python
from dbos import DBOS, Queue

processing_queue = Queue("tasks", concurrency=10)

# process_task is a DBOS workflow defined elsewhere
@DBOS.workflow()
def submit_tasks(tasks: list[dict]):
    # All enqueued tasks will be processed even if the app crashes
    handles = [processing_queue.enqueue(process_task, t) for t in tasks]
    return [h.get_result() for h in handles]
```
```python
# Get all steps in a workflow
steps = client.list_workflow_steps(workflow_id)
for step in steps:
    # Each entry is a dict-style record describing one step
    print(f"Step {step['function_id']}: {step['function_name']}")
    if step["error"] is not None:
        print(f"Error: {step['error']}")
```
DBOS integrates with OpenTelemetry for distributed tracing:
```python
from dbos import DBOSConfig

# Enable OTLP trace export in config
config: DBOSConfig = {
    "name": "my-app",
    "otlp_traces_endpoints": ["https://your-collector:4318"],
}
# Workflows, steps, and transactions are automatically traced;
# view traces in Jaeger, Honeycomb, or other OTLP-compatible tools
```
Ensure steps can be safely retried without repeating their side effects.
```python
# Use unique IDs for external operations
@DBOS.step()
def idempotent_api_call(request_id: str, data: dict):
    # The external API uses request_id to deduplicate repeated calls
    return api.process(idempotency_key=request_id, data=data)
```
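On the receiving side, deduplication by idempotency key usually means "perform the side effect once per key and replay the stored response for repeats". The sketch below models that with a hypothetical in-memory service, `HypotheticalPaymentAPI`, which is not a real client library; the caller is assumed to pass a `request_id` that stays stable across retries of the same step.

```python
from typing import Any


class HypotheticalPaymentAPI:
    """Toy server that deduplicates requests by idempotency key (not a real service)."""

    def __init__(self) -> None:
        self.responses: dict[str, dict[str, Any]] = {}
        self.charges = 0

    def process(self, idempotency_key: str, data: dict) -> dict[str, Any]:
        if idempotency_key in self.responses:   # repeat of a request already handled
            return self.responses[idempotency_key]
        self.charges += 1                       # the side effect happens once per key
        response = {"charged": data["amount"], "charge_number": self.charges}
        self.responses[idempotency_key] = response
        return response


def idempotent_api_call(api: HypotheticalPaymentAPI, request_id: str, data: dict) -> dict[str, Any]:
    return api.process(idempotency_key=request_id, data=data)


api = HypotheticalPaymentAPI()
idempotent_api_call(api, "req-1", {"amount": 10})
idempotent_api_call(api, "req-1", {"amount": 10})  # retried step: no second charge
print(api.charges)  # 1
```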
Wait for active workflows to complete before deploying breaking changes, or use versioning strategies.
```python
# Check for active workflows before deploying
active = client.list_workflows(status="PENDING")
if active:
    print(f"Warning: {len(active)} workflows still running")
```
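A pre-deploy check can be made more specific by looking only at PENDING workflows started under the version you are about to replace, since those are the ones that need the old code to recover. This sketch works on a hypothetical, simplified `WorkflowSummary` record rather than the objects returned by `list_workflows()`, whose field names are not assumed here; the version strings in the usage are placeholders.

```python
from typing import NamedTuple


class WorkflowSummary(NamedTuple):
    """Hypothetical, simplified view of one workflow listing entry."""
    workflow_id: str
    status: str
    app_version: str


def blocking_workflows(workflows: list[WorkflowSummary], outgoing_version: str) -> list[str]:
    # PENDING workflows started by the version being replaced still need that code to recover
    return [w.workflow_id for w in workflows if w.status == "PENDING" and w.app_version == outgoing_version]


def safe_to_deploy_breaking_change(workflows: list[WorkflowSummary], outgoing_version: str) -> bool:
    return not blocking_workflows(workflows, outgoing_version)


listing = [WorkflowSummary("wf-1", "PENDING", "v1"), WorkflowSummary("wf-2", "SUCCESS", "v1")]
print(blocking_workflows(listing, "v1"))  # ['wf-1']
```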