Skip to main content
DBOS Transact integrates with Flask to provide durable execution, automatic recovery, and workflow orchestration for your Flask applications.

Quick Start

Integrating DBOS with Flask is straightforward:
1

Install Dependencies

pip install dbos flask
2

Initialize DBOS with Flask

from flask import Flask
from dbos import DBOS, DBOSConfig

# Create the Flask application object that DBOS will be attached to
app = Flask(__name__)

# Initialize DBOS with your Flask app
config = DBOSConfig(name="my-flask-app")
# Passing flask=app hands the Flask app to DBOS together with the config
DBOS(flask=app, config=config)
3

Use DBOS Decorators

@app.route("/process/<user_id>")
@DBOS.workflow()
def process_user(user_id: str) -> dict:
    """Run ``process_transaction`` for a user inside a DBOS workflow.

    Returns:
        A dict carrying the ``user_id`` and the transaction's result.
    """
    return {
        "user_id": user_id,
        "result": process_transaction(user_id),
    }

@DBOS.transaction()
def process_transaction(user_id: str) -> str:
    """Placeholder transaction that reports ``user_id`` as processed."""
    # A real implementation would perform its database operation here.
    outcome = f"Processed {user_id}"
    return outcome
4

Run Your Application

flask run

How It Works

When you pass your Flask app to DBOS, it automatically installs WSGI middleware that:
  1. Tracks requests with unique request IDs
  2. Enables recovery by capturing request context
  3. Supports idempotency via the dbos-idempotency-key header
  4. Manages lifecycle with Flask’s startup and shutdown hooks

Middleware Implementation

The DBOS Flask middleware wraps your Flask application at the WSGI level:
# This is done automatically when you call DBOS(flask=app)
from dbos._flask import setup_flask_middleware

setup_flask_middleware(app)

# Internally, this wraps your app:
app.wsgi_app = FlaskMiddleware(app.wsgi_app)
The middleware captures:
  • Request URL and path
  • HTTP method
  • Client IP address
  • Request ID from headers or auto-generated
  • Workflow ID for idempotent requests

Usage Patterns

Basic Routes with DBOS

@app.route("/workflow/<var1>/<var2>")
@DBOS.workflow()
def test_workflow(var1: str, var2: str) -> dict:
    """Combine the results of ``test_transaction`` and ``test_step``.

    The route handler itself is decorated as a DBOS workflow, so the whole
    body runs as one workflow.
    """
    txn_part = test_transaction(var1)
    step_part = test_step(var2)
    combined = txn_part + step_part
    return {"result": combined}

Calling Workflows from Routes

You can call workflows from regular Flask routes without making the route itself a workflow:
from flask import jsonify

@app.route("/start-job/<job_id>")
def start_job(job_id: str) -> dict:
    """Start ``background_job`` for ``job_id`` from a plain Flask route.

    The route is not a workflow itself; it only starts one through
    ``DBOS.start_workflow`` and reports the resulting workflow ID as JSON.
    """
    workflow_handle = DBOS.start_workflow(background_job, job_id)
    payload = {
        "job_id": job_id,
        "workflow_id": workflow_handle.workflow_id,
        "status": "started",
    }
    return jsonify(payload)

@DBOS.workflow()
def background_job(job_id: str) -> dict:
    """Workflow that runs ``process_data`` for ``job_id``.

    Returns:
        A dict with the ``job_id`` and, under ``completed``, the step's result.
    """
    return {"job_id": job_id, "completed": process_data(job_id)}

@DBOS.step()
def process_data(job_id: str) -> str:
    """Placeholder step that reports ``job_id`` as processed."""
    # Real processing logic would go here.
    summary = f"Processed {job_id}"
    return summary

Idempotency with Flask

Use the dbos-idempotency-key header to ensure requests execute exactly once:
import requests

# Client code
response = requests.post(
    "http://localhost:5000/payment",
    json={"amount": 100, "user_id": "123"},
    headers={"dbos-idempotency-key": "payment-abc-123"}
)
On the server:
@app.route("/payment", methods=["POST"])
def handle_payment() -> dict:
    """Handle a payment request by running the payment workflow.

    Reads ``amount`` and ``user_id`` from the JSON request body.

    Returns:
        A dict with the processing status and the ID of the workflow that
        handled the payment.
    """
    # The idempotency key from headers determines the workflow ID
    # If this request is sent multiple times, the workflow runs only once
    data = request.get_json()
    # Calling the workflow function directly returns its result dict, which
    # has no ``workflow_id`` attribute. Start it through DBOS instead so we
    # get a workflow handle back.
    handle = DBOS.start_workflow(
        process_payment_workflow, data["amount"], data["user_id"]
    )
    # Wait for the workflow to finish so the "processed" status is accurate.
    handle.get_result()
    return {"status": "processed", "workflow_id": handle.workflow_id}

@DBOS.workflow()
def process_payment_workflow(amount: int, user_id: str) -> dict:
    """Record a payment, then send the user a receipt for it.

    Returns:
        A dict holding the ``transaction_id`` returned by ``record_payment``.
    """
    payment_ref = record_payment(amount, user_id)
    send_receipt(user_id, payment_ref)
    outcome = {"transaction_id": payment_ref}
    return outcome

Request Context Access

Access the DBOS context from within your Flask routes:
from dbos._context import assert_current_dbos_context

@app.route("/info")
def get_info() -> dict:
    """Report details of the current DBOS context.

    Returns:
        A dict saying whether the context is inside a workflow, plus the
        request ID when the context carries request information (else None).
    """
    dbos_ctx = assert_current_dbos_context()
    within_workflow = dbos_ctx.is_within_workflow()
    if dbos_ctx.request:
        request_id = dbos_ctx.request.request_id
    else:
        request_id = None
    return {"is_workflow": within_workflow, "request_id": request_id}

Complete Example

Here’s a full Flask application with DBOS:
import sqlalchemy as sa
from flask import Flask, request, jsonify
from dbos import DBOS, DBOSConfig

# Create the Flask application and make sure testing mode is off
app = Flask(__name__)
app.config["TESTING"] = False

# DBOS configuration: application name plus the application database URL.
# NOTE(review): the URL embeds placeholder credentials — replace before use.
config = DBOSConfig(
    name="flask-orders",
    application_database_url="postgresql://user:password@localhost/orders"
)
# Attach DBOS to the Flask app using the configuration above
DBOS(flask=app, config=config)

# Routes
@app.route("/orders", methods=["POST"])
def create_order() -> dict:
    """Create a new order by starting ``process_order_workflow``.

    Reads ``user_id`` and ``items`` from the JSON request body and responds
    with the started workflow's ID as the order ID.
    """
    body = request.get_json()
    user_id = body["user_id"]
    items = body["items"]
    workflow_handle = DBOS.start_workflow(process_order_workflow, user_id, items)
    reply = {
        "order_id": workflow_handle.workflow_id,
        "status": "processing",
    }
    return jsonify(reply)

@app.route("/orders/<order_id>")
@DBOS.transaction()
def get_order(order_id: str) -> dict:
    """Look up one order row by ID and return its details.

    Returns:
        A JSON response built from the first three columns of the row, or an
        error payload with status 404 when no row matches.
    """
    query = sa.text("SELECT * FROM orders WHERE id = :id")
    row = DBOS.sql_session.execute(query, {"id": order_id}).fetchone()

    if row:
        # Columns are read by position: order ID, user ID, status.
        return jsonify({
            "order_id": row[0],
            "user_id": row[1],
            "status": row[2],
        })

    return {"error": "Order not found"}, 404

# Workflows and Steps
@DBOS.workflow()
def process_order_workflow(user_id: str, items: list) -> dict:
    """Workflow that saves an order, takes payment, and notifies the user.

    Returns:
        A dict with the ``order_id`` and a ``status`` of ``"completed"`` or
        ``"payment_failed"``.
    """
    # Persist the order first so later steps can refer to its ID.
    order_id = save_order_txn(user_id, items)

    # External payment call.
    payment = process_payment_step(order_id, user_id)
    if not payment["success"]:
        return {"order_id": order_id, "status": "payment_failed"}

    # Only notify the user once payment has gone through.
    notify_user_step(user_id, order_id)
    return {"order_id": order_id, "status": "completed"}

@DBOS.transaction()
def save_order_txn(user_id: str, items: list) -> str:
    """Insert a pending order row and return its generated ID as a string.

    ``items`` is stored as its ``str()`` representation.
    """
    insert_stmt = sa.text(
        "INSERT INTO orders (user_id, items, status) "
        "VALUES (:user, :items, 'pending') RETURNING id"
    )
    params = {"user": user_id, "items": str(items)}
    inserted = DBOS.sql_session.execute(insert_stmt, params).fetchone()
    return str(inserted[0])

@DBOS.step(retries_allowed=True)
def process_payment_step(order_id: str, user_id: str) -> dict:
    """External payment processing - automatically retried on failure

    DBOS steps are only retried when ``retries_allowed=True`` is set, so the
    decorator opts in explicitly to match the behavior described here.

    Returns:
        A dict with a ``success`` flag and a ``transaction_id``.
    """
    print(f"Processing payment for order {order_id}")
    # Call payment API
    return {"success": True, "transaction_id": "txn_123"}

@DBOS.step(retries_allowed=True)
def notify_user_step(user_id: str, order_id: str) -> None:
    """Send notification - automatically retried on failure

    DBOS steps are only retried when ``retries_allowed=True`` is set, so the
    decorator opts in explicitly to match the behavior described here.
    """
    print(f"Notifying user {user_id} about order {order_id}")
    # Send email/SMS

if __name__ == "__main__":
    app.run(debug=True)

Workflow Recovery

DBOS automatically recovers interrupted workflows. Here’s how it works:
import uuid

@DBOS.workflow()
def recoverable_workflow(data: str) -> tuple[str, str]:
    """Run ``process_step`` on ``data`` and report the workflow's own ID.

    Returns:
        A ``(result, workflow_id)`` pair.
    """
    # Read the ID before doing the work, then hand both back to the caller.
    current_id = DBOS.workflow_id
    step_output = process_step(data)
    return (step_output, current_id)

@app.route("/test/<data>")
def test_recovery(data: str) -> dict:
    """Invoke ``recoverable_workflow`` twice and compare the workflow IDs.

    Returns:
        A dict with both results and a flag saying whether the two
        invocations reported the same workflow ID.
    """
    # First invocation within this request.
    first_result, first_id = recoverable_workflow(data)

    # Second invocation with the same input.
    second_result, second_id = recoverable_workflow(data)

    ids_match = first_id == second_id
    return {
        "result1": first_result,
        "result2": second_result,
        "same_workflow": ids_match,
    }
To test recovery:
# Send request with idempotency key
response = requests.get(
    "http://localhost:5000/test/data",
    headers={"dbos-idempotency-key": str(uuid.uuid4())}
)

# Even if the server crashes mid-workflow,
# DBOS will recover and complete it on restart
DBOS tracks workflow execution in its system database. On restart, it automatically detects and recovers any pending workflows, ensuring exactly-once execution.

Testing

Test Flask applications with DBOS using Flask’s test client:
def test_flask_endpoint():
    """Exercise the application's routes through Flask's test client."""
    app.config["TESTING"] = True
    client = app.test_client()

    # Test regular endpoint
    # NOTE(review): assumes the app defines an /endpoint/<a>/<b> route that
    # returns this payload — confirm against the application under test.
    response = client.get("/endpoint/a/b")
    assert response.status_code == 200
    assert response.json == {"result": "a1b"}

    # Test with idempotency key
    # The /orders handler reads both "user_id" and "items" from the JSON
    # body, so the payload must carry both keys for the request to succeed.
    headers = {"dbos-idempotency-key": str(uuid.uuid4())}
    response = client.post(
        "/orders",
        json={"user_id": "123", "items": ["widget"]},
        headers=headers,
    )
    assert response.status_code == 200

Lifecycle Management

DBOS integrates with Flask’s application lifecycle:
# Flask 2.3 removed `app.before_first_request`, so emulate it with a
# `before_request` hook that only acts the first time it fires.
_first_request_seen = False


@app.before_request
def before_first_request():
    """Print a one-time message when the first request arrives."""
    global _first_request_seen
    if _first_request_seen:
        # Returning None lets Flask continue handling the request normally.
        return
    _first_request_seen = True
    print("First request - DBOS is already initialized")

@app.teardown_appcontext
def teardown(exception=None):
    """App-context teardown hook that intentionally does nothing."""
    # DBOS cleanup is handled automatically
    return None
When using DBOS with Flask, don’t call DBOS.launch() or DBOS.destroy() manually. The Flask middleware handles initialization and cleanup automatically.

Request ID Tracking

DBOS automatically tracks requests with unique IDs:
# Client provides request ID
response = requests.get(
    "http://localhost:5000/endpoint",
    headers={"dbos-request-id": "custom-request-123"}
)

# Or DBOS generates one automatically
response = requests.get("http://localhost:5000/endpoint")
# Request ID is auto-generated and tracked

Error Handling

Handle errors in Flask routes with DBOS:
from dbos import error

@app.route("/data/<id>")
@DBOS.workflow()
def get_data(id: str) -> dict:
    """Fetch data for a numeric ID, rejecting IDs that are not all digits.

    Raises:
        error.DBOSException: when ``id`` is not made up solely of digits.
    """
    if id.isdigit():
        return {"data": fetch_data_txn(id)}

    # NOTE(review): confirm that DBOSException accepts these keyword
    # arguments (a string error code and ``status_code``).
    raise error.DBOSException(
        "Invalid ID format",
        dbos_error_code="INVALID_ID",
        status_code=400,
    )

@app.errorhandler(error.DBOSException)
def handle_dbos_error(e):
    """Turn a ``DBOSException`` into a JSON error response.

    The HTTP status comes from the exception's ``status_code``, falling back
    to 500 when that value is falsy.
    """
    payload = jsonify({
        "error": str(e.message),
        "code": e.dbos_error_code,
    })
    http_status = e.status_code or 500
    return payload, http_status

Configuration

Configure DBOS with your Flask app:
# Build the DBOS configuration for this application.
# NOTE(review): option meanings below are inferred from their names —
# confirm each against the DBOSConfig reference for your DBOS version.
config = DBOSConfig(
    name="my-flask-app",
    application_database_url="postgresql://localhost/mydb",
    log_level="INFO",
    admin_port=8080,
    enable_otlp=True,
    otlp_traces_endpoints=["http://localhost:4318/v1/traces"]
)

# Attach DBOS to the Flask app using the configuration above
DBOS(flask=app, config=config)

Next Steps

Build docs developers (and LLMs) love