Workflows are the core building blocks of DBOS Transact applications. They orchestrate multiple steps and transactions into reliable, recoverable operations that automatically resume from the last completed step after interruptions. This tutorial will guide you through creating your first workflow from scratch.
Define your workflow using the @DBOS.workflow() decorator:
```python
from dbos import DBOS

@DBOS.workflow()
def process_order(order_id: str, customer_email: str) -> str:
    # Workflow orchestration logic here
    result = f"Processed order {order_id}"
    DBOS.logger.info(f"Processing order for {customer_email}")
    return result
```
Each workflow invocation is automatically assigned a unique workflow ID that persists across restarts.
Run Your Workflow
Execute the workflow directly:
```python
result = process_order("order-123", "customer@example.com")
print(result)  # "Processed order order-123"
```
Steps are lightweight, idempotent operations that execute within workflows. They’re perfect for external API calls, computations, or any non-database operations.
```python
@DBOS.step()
def calculate_total(items: list[dict]) -> float:
    """Calculate order total - this step is idempotent and cached."""
    total = sum(item["price"] * item["quantity"] for item in items)
    return total

@DBOS.step()
def send_confirmation_email(email: str, order_id: str) -> None:
    """Send email - only executes once even if workflow is restarted."""
    # Call to external email service
    print(f"Sending confirmation to {email} for order {order_id}")

@DBOS.workflow()
def complete_order_workflow(order_id: str, items: list[dict], email: str) -> dict:
    # Calculate total (step 1)
    total = calculate_total(items)
    # Send confirmation (step 2)
    send_confirmation_email(email, order_id)
    return {
        "order_id": order_id,
        "total": total,
        "status": "completed",
    }
```
Steps are automatically retried on transient failures, and their results are checkpointed, so each step executes at most once per workflow execution.
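To build intuition for what the runtime is doing, here is a plain-Python sketch of retry-with-backoff. This is not the DBOS API; the function and parameter names (`run_step_with_retries`, `max_attempts`, `backoff_rate`) are made up for illustration, and the real runtime also persists the successful result.

```python
import time

def run_step_with_retries(step_fn, *args, max_attempts=3, base_interval=0.01, backoff_rate=2.0):
    """Retry a step on failure with exponential backoff - a simplified
    stand-in for the automatic retries described above."""
    interval = base_interval
    for attempt in range(1, max_attempts + 1):
        try:
            return step_fn(*args)
        except Exception:
            if attempt == max_attempts:
                raise  # attempts exhausted: surface the failure
            time.sleep(interval)
            interval *= backoff_rate  # exponential backoff between attempts

# A flaky step that fails twice before succeeding
calls = {"count": 0}

def flaky_step(data: str) -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return f"sent: {data}"

result = run_step_with_retries(flaky_step, "hello")  # "sent: hello" on the 3rd attempt
```

The key property is that transient failures are invisible to the workflow: it only ever sees the final, successful result.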
Access workflow metadata and utilities using DBOS context properties:
```python
@DBOS.workflow()
def contextual_workflow(user_input: str) -> dict:
    # Access the unique workflow ID
    wf_id = DBOS.workflow_id
    # Access the current step ID (increments with each step/transaction)
    current_step = DBOS.step_id
    # Use the logger (automatically includes workflow_id in logs)
    DBOS.logger.info(f"Processing input: {user_input}")
    return {
        "workflow_id": wf_id,
        "step_id": current_step,
        "input": user_input,
    }
```
For idempotency, you can assign specific IDs to workflows:
```python
from dbos import SetWorkflowID

# Use a deterministic ID based on business logic
order_workflow_id = f"order-{order_id}"

with SetWorkflowID(order_workflow_id):
    result = process_order(order_id, customer_email)

# Running again with the same ID returns the cached result
with SetWorkflowID(order_workflow_id):
    result = process_order(order_id, customer_email)  # Returns immediately
```
Only the first workflow started within a SetWorkflowID block receives the specified ID. Subsequent workflows use generated IDs.
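The mechanism behind this idempotency can be sketched in plain Python. Here a dict stands in for the system database that DBOS uses to record completed workflows; the names (`completed_workflows`, `run_workflow_once`) are illustrative, not part of the DBOS API.

```python
# A dict standing in for the durable store of completed workflow results
completed_workflows: dict[str, str] = {}
run_count = {"n": 0}

def run_workflow_once(workflow_id: str, fn, *args):
    """If this workflow ID has already completed, return its recorded
    result; otherwise run the function and record the result."""
    if workflow_id in completed_workflows:
        return completed_workflows[workflow_id]
    result = fn(*args)
    completed_workflows[workflow_id] = result
    return result

def process_order(order_id: str) -> str:
    run_count["n"] += 1
    return f"Processed order {order_id}"

wf_id = "order-123"  # deterministic ID derived from business data
first = run_workflow_once(wf_id, process_order, "123")
second = run_workflow_once(wf_id, process_order, "123")  # cached; fn not re-run
```

Because the ID is derived from business data, retrying the same logical operation (for example, after a client timeout) cannot double-process the order.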
Workflows automatically recover from failures by replaying from their last completed step:
```python
@DBOS.step()
def risky_external_call(data: str) -> str:
    # This might fail, but will be retried automatically
    # If it succeeds once, the result is cached
    response = external_api.send(data)
    return response

@DBOS.workflow()
def resilient_workflow(input_data: str) -> str:
    # Step 1: Save initial state
    save_order("order-1", "customer-1", 100.0)
    # Step 2: Make external call (cached on success)
    result = risky_external_call(input_data)
    # Step 3: Process result
    final_result = process_result(result)
    return final_result

# If this workflow crashes after step 2, recovery will:
# 1. Skip step 1 (transaction already committed)
# 2. Skip step 2 (result cached)
# 3. Execute step 3 and continue
```