Skip to main content
DBOS Transact integrates seamlessly with FastAPI to provide durable execution, automatic recovery, and distributed tracing for your async web applications.

Quick Start

Integrating DBOS with FastAPI requires just a few lines of code:
1

Install Dependencies

pip install dbos fastapi uvicorn
2

Initialize DBOS with FastAPI

from fastapi import FastAPI
from dbos import DBOS, DBOSConfig

app = FastAPI()

# Initialize DBOS with your FastAPI app
config = DBOSConfig(name="my-fastapi-app")
DBOS(fastapi=app, config=config)
3

Use DBOS Decorators

@app.get("/user/{user_id}")
@DBOS.workflow()
async def get_user(user_id: str) -> dict:
    user_data = await fetch_user_step(user_id)
    return {"user": user_data}

@DBOS.step()
async def fetch_user_step(user_id: str) -> str:
    # This step is automatically retried on failure
    return f"User {user_id}"

How It Works

When you pass your FastAPI app to DBOS, it automatically:
  1. Installs middleware to track requests and enable recovery
  2. Manages lifecycle through FastAPI’s lifespan events
  3. Handles errors with custom exception handlers for DBOSException
  4. Traces requests with OpenTelemetry integration

Automatic Middleware Setup

DBOS adds HTTP middleware that:
  • Generates or uses request IDs from the dbos-request-id header
  • Captures request metadata (URL, method, IP address)
  • Supports idempotency via the dbos-idempotency-key header
  • Records response status codes in traces
# The middleware is automatically installed - here's what it does internally:
@app.middleware("http")
async def dbos_fastapi_middleware(request: Request, call_next: Callable) -> Any:
    attributes = {
        "name": str(request.url.path),
        "requestID": request.headers.get("dbos-request-id") or generate_uuid(),
        "requestIP": request.client.host if request.client else None,
        "requestURL": str(request.url),
        "requestMethod": request.method,
    }
    with EnterDBOSHandler(attributes):
        # Handle idempotency
        workflow_id = request.headers.get("dbos-idempotency-key")
        if workflow_id:
            with SetWorkflowID(workflow_id):
                response = await call_next(request)
        else:
            response = await call_next(request)
    return response

Usage Patterns

Stacking Decorators

You can combine FastAPI route decorators with DBOS decorators:
@app.get("/process/{item_id}")
@DBOS.workflow()
async def process_item(item_id: str) -> dict:
    # This entire endpoint is a durable workflow
    result = await process_step(item_id)
    await notify_step(item_id, result)
    return {"item_id": item_id, "result": result}

Starting Workflows from Endpoints

For long-running operations, start a workflow and return immediately:
@app.post("/jobs")
async def create_job(job_data: dict) -> dict:
    # Start workflow in the background
    handle = DBOS.start_workflow(process_job, job_data)
    
    return {
        "workflow_id": handle.workflow_id,
        "status": "started"
    }

@app.get("/jobs/{workflow_id}")
async def get_job_status(workflow_id: str) -> dict:
    # Retrieve workflow handle by ID
    handle = DBOS.retrieve_workflow(workflow_id)
    status = handle.get_status()
    
    return {
        "workflow_id": workflow_id,
        "status": status.status,
        "result": handle.get_result() if status.status == "SUCCESS" else None
    }

@DBOS.workflow()
async def process_job(job_data: dict) -> dict:
    # Long-running workflow logic
    await asyncio.sleep(10)
    return {"processed": job_data}

Idempotency

Use the dbos-idempotency-key header to ensure requests are processed exactly once:
import httpx

# Client code
async with httpx.AsyncClient() as client:
    response = await client.post(
        "http://localhost:8000/payment",
        json={"amount": 100, "user_id": "123"},
        headers={"dbos-idempotency-key": "payment-abc-123"}
    )
If the same request is sent multiple times (e.g., due to network issues), DBOS ensures it’s executed only once.

Custom Lifespan Management

DBOS works with FastAPI’s lifespan context managers:
from contextlib import asynccontextmanager

resource = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    global resource
    resource = await initialize_resource()
    yield
    # Shutdown
    await cleanup_resource(resource)
    resource = None

app = FastAPI(lifespan=lifespan)
DBOS(fastapi=app, config=config)

@app.get("/resource")
@DBOS.workflow()
async def use_resource() -> dict:
    # Access the resource initialized in lifespan
    return {"resource": resource}
DBOS automatically hooks into FastAPI’s lifespan events to manage its internal state. Your custom lifespan managers work alongside DBOS’s lifecycle management.

Error Handling

DBOS automatically installs an exception handler for DBOSException:
from dbos import error

@app.get("/data/{id}")
@DBOS.workflow()
async def get_data(id: str) -> dict:
    if not id.isdigit():
        raise error.DBOSException(
            "Invalid ID format",
            dbos_error_code="INVALID_ID",
            status_code=400
        )
    return {"id": id}
The exception handler returns a JSON response:
{
  "message": "Invalid ID format",
  "dbos_error_code": "INVALID_ID",
  "dbos_error": "DBOSException"
}

Complete Example

Here’s a full example combining multiple DBOS features with FastAPI:
import sqlalchemy as sa
from fastapi import FastAPI
from pydantic import BaseModel
from dbos import DBOS, DBOSConfig

app = FastAPI()

config = DBOSConfig(
    name="order-service",
    application_database_url="postgresql://user:password@localhost/orders"
)
DBOS(fastapi=app, config=config)

class Order(BaseModel):
    user_id: str
    item_id: str
    quantity: int

@app.post("/orders")
@DBOS.workflow()
async def create_order(order: Order) -> dict:
    # Durable workflow - automatically recovers on failure
    order_id = await save_order_txn(order)
    await send_confirmation_step(order.user_id, order_id)
    return {"order_id": order_id, "status": "confirmed"}

@DBOS.transaction()
async def save_order_txn(order: Order) -> str:
    # Transactional database operation
    result = DBOS.sql_session.execute(
        sa.text(
            "INSERT INTO orders (user_id, item_id, quantity) "
            "VALUES (:user, :item, :qty) RETURNING id"
        ),
        {"user": order.user_id, "item": order.item_id, "qty": order.quantity}
    ).fetchone()
    return str(result[0])

@DBOS.step()
async def send_confirmation_step(user_id: str, order_id: str) -> None:
    # External API call - automatically retried on failure
    print(f"Sending confirmation to {user_id} for order {order_id}")

@app.get("/orders/{order_id}")
@DBOS.transaction()
async def get_order(order_id: str) -> dict:
    result = DBOS.sql_session.execute(
        sa.text("SELECT * FROM orders WHERE id = :id"),
        {"id": order_id}
    ).fetchone()
    if not result:
        from dbos import error
        raise error.DBOSException("Order not found", status_code=404)
    return {"order_id": result[0], "user_id": result[1]}

Running Your Application

Start your FastAPI application with uvicorn:
uvicorn main:app --reload
DBOS automatically:
  • Initializes on startup
  • Recovers any pending workflows from previous runs
  • Cleans up on shutdown
When using DBOS with FastAPI, always pass the FastAPI app instance to the DBOS constructor. Don’t call DBOS.launch() manually - the lifespan middleware handles initialization.

Advanced Configuration

Request ID Customization

By default, DBOS looks for request IDs in the dbos-request-id header. You can provide custom request IDs:
response = await client.get(
    "/endpoint",
    headers={"dbos-request-id": "custom-request-123"}
)

Telemetry and Tracing

DBOS automatically creates OpenTelemetry spans for each request. Configure OTLP endpoints in your config:
config = DBOSConfig(
    name="my-app",
    enable_otlp=True,
    otlp_traces_endpoints=["http://localhost:4318/v1/traces"],
    otlp_logs_endpoints=["http://localhost:4318/v1/logs"]
)

Testing

Use FastAPI’s TestClient for testing DBOS endpoints:
from fastapi.testclient import TestClient

def test_endpoint():
    client = TestClient(app)
    response = client.get("/workflow/test/data")
    assert response.status_code == 200
    assert response.json() == {"result": "test1data"}

Next Steps

Build docs developers (and LLMs) love