DBOS Transact integrates seamlessly with FastAPI to provide durable execution, automatic recovery, and distributed tracing for your async web applications.
Quick Start
Integrating DBOS with FastAPI requires just a few lines of code:
Install Dependencies
pip install dbos fastapi uvicorn
Initialize DBOS with FastAPI
from fastapi import FastAPI
from dbos import DBOS , DBOSConfig
app = FastAPI()
# Initialize DBOS with your FastAPI app
config = DBOSConfig( name = "my-fastapi-app" )
DBOS( fastapi = app, config = config)
Use DBOS Decorators
@app.get ( "/user/ {user_id} " )
@DBOS.workflow ()
async def get_user ( user_id : str ) -> dict :
user_data = await fetch_user_step(user_id)
return { "user" : user_data}
@DBOS.step ()
async def fetch_user_step ( user_id : str ) -> str :
# This step is automatically retried on failure
return f "User { user_id } "
How It Works
When you pass your FastAPI app to DBOS, it automatically:
Installs middleware to track requests and enable recovery
Manages lifecycle through FastAPI’s lifespan events
Handles errors with custom exception handlers for DBOSException
Traces requests with OpenTelemetry integration
Automatic Middleware Setup
DBOS adds HTTP middleware that:
Generates or uses request IDs from the dbos-request-id header
Captures request metadata (URL, method, IP address)
Supports idempotency via the dbos-idempotency-key header
Records response status codes in traces
# The middleware is automatically installed - here's what it does internally:
@app.middleware ( "http" )
async def dbos_fastapi_middleware ( request : Request, call_next : Callable) -> Any:
attributes = {
"name" : str (request.url.path),
"requestID" : request.headers.get( "dbos-request-id" ) or generate_uuid(),
"requestIP" : request.client.host if request.client else None ,
"requestURL" : str (request.url),
"requestMethod" : request.method,
}
with EnterDBOSHandler(attributes):
# Handle idempotency
workflow_id = request.headers.get( "dbos-idempotency-key" )
if workflow_id:
with SetWorkflowID(workflow_id):
response = await call_next(request)
else :
response = await call_next(request)
return response
Usage Patterns
Stacking Decorators
You can combine FastAPI route decorators with DBOS decorators:
Workflow Endpoint
Step Endpoint
Transaction Endpoint
@app.get ( "/process/ {item_id} " )
@DBOS.workflow ()
async def process_item ( item_id : str ) -> dict :
# This entire endpoint is a durable workflow
result = await process_step(item_id)
await notify_step(item_id, result)
return { "item_id" : item_id, "result" : result}
Starting Workflows from Endpoints
For long-running operations, start a workflow and return immediately:
@app.post ( "/jobs" )
async def create_job ( job_data : dict ) -> dict :
# Start workflow in the background
handle = DBOS .start_workflow(process_job, job_data)
return {
"workflow_id" : handle.workflow_id,
"status" : "started"
}
@app.get ( "/jobs/ {workflow_id} " )
async def get_job_status ( workflow_id : str ) -> dict :
# Retrieve workflow handle by ID
handle = DBOS .retrieve_workflow(workflow_id)
status = handle.get_status()
return {
"workflow_id" : workflow_id,
"status" : status.status,
"result" : handle.get_result() if status.status == "SUCCESS" else None
}
@DBOS.workflow ()
async def process_job ( job_data : dict ) -> dict :
# Long-running workflow logic
await asyncio.sleep( 10 )
return { "processed" : job_data}
Idempotency
Use the dbos-idempotency-key header to ensure requests are processed exactly once:
import httpx
# Client code
async with httpx.AsyncClient() as client:
response = await client.post(
"http://localhost:8000/payment" ,
json = { "amount" : 100 , "user_id" : "123" },
headers = { "dbos-idempotency-key" : "payment-abc-123" }
)
If the same request is sent multiple times (e.g., due to network issues), DBOS ensures it’s executed only once.
Custom Lifespan Management
DBOS works with FastAPI’s lifespan context managers:
from contextlib import asynccontextmanager
resource = None
@asynccontextmanager
async def lifespan ( app : FastAPI):
# Startup
global resource
resource = await initialize_resource()
yield
# Shutdown
await cleanup_resource(resource)
resource = None
app = FastAPI( lifespan = lifespan)
DBOS( fastapi = app, config = config)
@app.get ( "/resource" )
@DBOS.workflow ()
async def use_resource () -> dict :
# Access the resource initialized in lifespan
return { "resource" : resource}
DBOS automatically hooks into FastAPI’s lifespan events to manage its internal state. Your custom lifespan managers work alongside DBOS’s lifecycle management.
Error Handling
DBOS automatically installs an exception handler for DBOSException:
from dbos import error
@app.get ( "/data/ {id} " )
@DBOS.workflow ()
async def get_data ( id : str ) -> dict :
if not id .isdigit():
raise error.DBOSException(
"Invalid ID format" ,
dbos_error_code = "INVALID_ID" ,
status_code = 400
)
return { "id" : id }
The exception handler returns a JSON response:
{
"message" : "Invalid ID format" ,
"dbos_error_code" : "INVALID_ID" ,
"dbos_error" : "DBOSException"
}
Complete Example
Here’s a full example combining multiple DBOS features with FastAPI:
import sqlalchemy as sa
from fastapi import FastAPI
from pydantic import BaseModel
from dbos import DBOS , DBOSConfig
app = FastAPI()
config = DBOSConfig(
name = "order-service" ,
application_database_url = "postgresql://user:password@localhost/orders"
)
DBOS( fastapi = app, config = config)
class Order ( BaseModel ):
user_id: str
item_id: str
quantity: int
@app.post ( "/orders" )
@DBOS.workflow ()
async def create_order ( order : Order) -> dict :
# Durable workflow - automatically recovers on failure
order_id = await save_order_txn(order)
await send_confirmation_step(order.user_id, order_id)
return { "order_id" : order_id, "status" : "confirmed" }
@DBOS.transaction ()
async def save_order_txn ( order : Order) -> str :
# Transactional database operation
result = DBOS .sql_session.execute(
sa.text(
"INSERT INTO orders (user_id, item_id, quantity) "
"VALUES (:user, :item, :qty) RETURNING id"
),
{ "user" : order.user_id, "item" : order.item_id, "qty" : order.quantity}
).fetchone()
return str (result[ 0 ])
@DBOS.step ()
async def send_confirmation_step ( user_id : str , order_id : str ) -> None :
# External API call - automatically retried on failure
print ( f "Sending confirmation to { user_id } for order { order_id } " )
@app.get ( "/orders/ {order_id} " )
@DBOS.transaction ()
async def get_order ( order_id : str ) -> dict :
result = DBOS .sql_session.execute(
sa.text( "SELECT * FROM orders WHERE id = :id" ),
{ "id" : order_id}
).fetchone()
if not result:
from dbos import error
raise error.DBOSException( "Order not found" , status_code = 404 )
return { "order_id" : result[ 0 ], "user_id" : result[ 1 ]}
Running Your Application
Start your FastAPI application with uvicorn:
uvicorn main:app --reload
DBOS automatically:
Initializes on startup
Recovers any pending workflows from previous runs
Cleans up on shutdown
When using DBOS with FastAPI, always pass the FastAPI app instance to the DBOS constructor. Don’t call DBOS.launch() manually - the lifespan middleware handles initialization.
Advanced Configuration
Request ID Customization
By default, DBOS looks for request IDs in the dbos-request-id header. You can provide custom request IDs:
response = await client.get(
"/endpoint" ,
headers = { "dbos-request-id" : "custom-request-123" }
)
Telemetry and Tracing
DBOS automatically creates OpenTelemetry spans for each request. Configure OTLP endpoints in your config:
config = DBOSConfig(
name = "my-app" ,
enable_otlp = True ,
otlp_traces_endpoints = [ "http://localhost:4318/v1/traces" ],
otlp_logs_endpoints = [ "http://localhost:4318/v1/logs" ]
)
Testing
Use FastAPI’s TestClient for testing DBOS endpoints:
from fastapi.testclient import TestClient
def test_endpoint ():
client = TestClient(app)
response = client.get( "/workflow/test/data" )
assert response.status_code == 200
assert response.json() == { "result" : "test1data" }
Next Steps