Skip to main content

Introduction

Workflows are the core building blocks of DBOS Transact applications. They orchestrate multiple steps and transactions into reliable, recoverable operations that automatically resume from the last completed step after interruptions. This tutorial will guide you through creating your first workflow from scratch.

Basic Workflow

1

Import DBOS

Start by importing the DBOS class:
from dbos import DBOS
2

Create a Workflow Function

Define your workflow using the @DBOS.workflow() decorator:
@DBOS.workflow()
def process_order(order_id: str, customer_email: str) -> str:
    # Workflow orchestration logic here
    result = f"Processed order {order_id}"
    DBOS.logger.info(f"Processing order for {customer_email}")
    return result
Workflow functions are automatically assigned a unique workflow_id that persists across restarts.
3

Run Your Workflow

Execute the workflow directly:
result = process_order("order-123", "[email protected]")
print(result)  # "Processed order order-123"

Adding Steps to Workflows

Steps are lightweight, idempotent operations that execute within workflows. They’re perfect for external API calls, computations, or any non-database operations.
@DBOS.step()
def calculate_total(items: list[dict]) -> float:
    """Calculate order total - this step is idempotent and cached."""
    total = sum(item["price"] * item["quantity"] for item in items)
    return total

@DBOS.step()
def send_confirmation_email(email: str, order_id: str) -> None:
    """Send email - only executes once even if workflow is restarted."""
    # Call to external email service
    print(f"Sending confirmation to {email} for order {order_id}")

@DBOS.workflow()
def complete_order_workflow(order_id: str, items: list[dict], email: str) -> dict:
    # Calculate total (step 1)
    total = calculate_total(items)
    
    # Send confirmation (step 2)
    send_confirmation_email(email, order_id)
    
    return {
        "order_id": order_id,
        "total": total,
        "status": "completed"
    }
Steps are automatically retried on transient failures and their results are cached, so they only execute once per workflow execution.

Adding Transactions

Transactions provide ACID guarantees for database operations:
import sqlalchemy as sa

@DBOS.transaction()
def save_order(order_id: str, customer_id: str, total: float) -> int:
    """Save order to database with ACID guarantees."""
    result = DBOS.sql_session.execute(
        sa.text(
            "INSERT INTO orders (order_id, customer_id, total, status) "
            "VALUES (:order_id, :customer_id, :total, 'pending') "
            "RETURNING id"
        ),
        {"order_id": order_id, "customer_id": customer_id, "total": total}
    )
    row = result.fetchone()
    return row[0]

@DBOS.transaction()
def update_inventory(items: list[dict]) -> None:
    """Update inventory levels atomically."""
    for item in items:
        DBOS.sql_session.execute(
            sa.text(
                "UPDATE inventory SET quantity = quantity - :qty "
                "WHERE product_id = :pid"
            ),
            {"qty": item["quantity"], "pid": item["product_id"]}
        )

@DBOS.workflow()
def order_with_database_workflow(order_id: str, customer_id: str, items: list[dict]) -> dict:
    # Step 1: Calculate total
    total = calculate_total(items)
    
    # Step 2: Save order to database
    db_order_id = save_order(order_id, customer_id, total)
    
    # Step 3: Update inventory
    update_inventory(items)
    
    # Step 4: Send confirmation
    send_confirmation_email("[email protected]", order_id)
    
    return {"order_id": order_id, "db_id": db_order_id, "total": total}

Workflow Context and Properties

Access workflow metadata and utilities using DBOS context properties:
@DBOS.workflow()
def contextual_workflow(user_input: str) -> dict:
    # Access the unique workflow ID
    wf_id = DBOS.workflow_id
    
    # Access the current step ID (increments with each step/transaction)
    current_step = DBOS.step_id
    
    # Use the logger (automatically includes workflow_id in logs)
    DBOS.logger.info(f"Processing input: {user_input}")
    
    return {
        "workflow_id": wf_id,
        "step_id": current_step,
        "input": user_input
    }

Child Workflows

Workflows can start other workflows as children. Child workflows track their parent relationship:
@DBOS.workflow()
def child_workflow(item_id: str) -> str:
    # Process a single item
    return f"Processed {item_id}"

@DBOS.workflow()
def parent_workflow(item_ids: list[str]) -> list[str]:
    results = []
    
    # Synchronous child workflow (waits for completion)
    result1 = child_workflow(item_ids[0])
    results.append(result1)
    
    # Asynchronous child workflow (returns immediately)
    handle = DBOS.start_workflow(child_workflow, item_ids[1])
    
    # Do other work...
    
    # Wait for async child to complete
    result2 = handle.get_result()
    results.append(result2)
    
    return results
Child workflow IDs follow the pattern {parent_id}-{function_id} for synchronous children, or use generated UUIDs for asynchronous children.

Setting Workflow IDs

For idempotency, you can assign specific IDs to workflows:
from dbos import SetWorkflowID

# Use a deterministic ID based on business logic
order_workflow_id = f"order-{order_id}"

with SetWorkflowID(order_workflow_id):
    result = process_order(order_id, customer_email)

# Running again with the same ID returns the cached result
with SetWorkflowID(order_workflow_id):
    result = process_order(order_id, customer_email)  # Returns immediately
Only the first workflow started within a SetWorkflowID block receives the specified ID. Subsequent workflows use generated IDs.

Workflow Recovery

Workflows automatically recover from failures by replaying from their last completed step:
@DBOS.step()
def risky_external_call(data: str) -> str:
    # This might fail, but will be retried automatically
    # If it succeeds once, the result is cached
    response = external_api.send(data)
    return response

@DBOS.workflow()
def resilient_workflow(input_data: str) -> str:
    # Step 1: Save initial state
    save_order("order-1", "customer-1", 100.0)
    
    # Step 2: Make external call (cached on success)
    result = risky_external_call(input_data)
    
    # Step 3: Process result
    final_result = process_result(result)
    
    return final_result

# If this workflow crashes after step 2, recovery will:
# 1. Skip step 1 (transaction already committed)
# 2. Skip step 2 (result cached)
# 3. Execute step 3 and continue

Workflow Handles

Start workflows asynchronously and retrieve their results later:
# Start workflow without waiting
handle = DBOS.start_workflow(process_order, "order-456", "[email protected]")

# Get the workflow ID
workflow_id = handle.get_workflow_id()

# Check workflow status
status = handle.get_status()
print(status.status)  # "PENDING", "SUCCESS", "ERROR", etc.

# Wait for completion and get result
result = handle.get_result()
print(result)

Async Workflows

DBOS supports async/await syntax for workflows:
import asyncio

@DBOS.workflow()
async def async_workflow(data: str) -> str:
    # Call async steps
    result1 = await async_step(data)
    
    # Start child workflow asynchronously
    handle = await DBOS.start_workflow_async(child_workflow, result1)
    
    # Concurrent operations
    result2 = await handle.get_result()
    
    return result2

@DBOS.step()
async def async_step(input: str) -> str:
    await asyncio.sleep(1)  # Simulated async operation
    return f"Processed: {input}"

# Run async workflow
result = asyncio.run(async_workflow("test"))

Complete Example

Here’s a full e-commerce order processing workflow:
from dbos import DBOS
import sqlalchemy as sa
from datetime import datetime

@DBOS.step()
def validate_inventory(items: list[dict]) -> bool:
    """Check if items are in stock."""
    for item in items:
        # External inventory service call
        if not check_stock(item["product_id"], item["quantity"]):
            return False
    return True

@DBOS.step()
def calculate_order_total(items: list[dict]) -> float:
    """Calculate total with tax and shipping."""
    subtotal = sum(item["price"] * item["quantity"] for item in items)
    tax = subtotal * 0.08
    shipping = 10.0 if subtotal < 50 else 0.0
    return subtotal + tax + shipping

@DBOS.transaction()
def create_order_record(order_id: str, customer_id: str, total: float, items: list[dict]) -> int:
    """Create order in database."""
    # Insert order
    order_result = DBOS.sql_session.execute(
        sa.text(
            "INSERT INTO orders (order_id, customer_id, total, created_at) "
            "VALUES (:oid, :cid, :total, :created) RETURNING id"
        ),
        {"oid": order_id, "cid": customer_id, "total": total, "created": datetime.now()}
    )
    db_id = order_result.fetchone()[0]
    
    # Insert order items
    for item in items:
        DBOS.sql_session.execute(
            sa.text(
                "INSERT INTO order_items (order_id, product_id, quantity, price) "
                "VALUES (:oid, :pid, :qty, :price)"
            ),
            {"oid": db_id, "pid": item["product_id"], "qty": item["quantity"], "price": item["price"]}
        )
    
    return db_id

@DBOS.step()
def charge_payment(customer_id: str, amount: float) -> str:
    """Process payment via payment gateway."""
    # External payment API call
    transaction_id = payment_gateway.charge(customer_id, amount)
    return transaction_id

@DBOS.step()
def send_order_confirmation(email: str, order_id: str, total: float) -> None:
    """Send confirmation email to customer."""
    email_service.send(
        to=email,
        subject=f"Order Confirmation - {order_id}",
        body=f"Thank you for your order! Total: ${total:.2f}"
    )

@DBOS.workflow()
def process_customer_order(order_id: str, customer_id: str, customer_email: str, items: list[dict]) -> dict:
    """Complete order processing workflow."""
    DBOS.logger.info(f"Starting order processing for {order_id}")
    
    # Step 1: Validate inventory
    if not validate_inventory(items):
        raise Exception("Insufficient inventory")
    
    # Step 2: Calculate total
    total = calculate_order_total(items)
    
    # Step 3: Create order in database
    db_order_id = create_order_record(order_id, customer_id, total, items)
    
    # Step 4: Charge payment
    transaction_id = charge_payment(customer_id, total)
    
    # Step 5: Send confirmation
    send_order_confirmation(customer_email, order_id, total)
    
    DBOS.logger.info(f"Order {order_id} completed successfully")
    
    return {
        "order_id": order_id,
        "db_id": db_order_id,
        "transaction_id": transaction_id,
        "total": total,
        "status": "completed"
    }

Next Steps

Scheduled Workflows

Learn how to schedule workflows with cron expressions

Queue Tutorial

Control concurrency and rate limiting with queues

Error Handling

Handle failures and implement retry strategies

Notifications

Use durable notifications for workflow coordination

Build docs developers (and LLMs) love