The DBOS Flask middleware wraps your Flask application at the WSGI level:
# This is done automatically when you call DBOS(flask=app)from dbos._flask import setup_flask_middlewaresetup_flask_middleware(app)# Internally, this wraps your app:app.wsgi_app = FlaskMiddleware(app.wsgi_app)
You can call workflows from regular Flask routes without making the route itself a workflow:
from flask import jsonify@app.route("/start-job/<job_id>")def start_job(job_id: str) -> dict: # Not a workflow - just a regular route # But it starts a durable workflow handle = DBOS.start_workflow(background_job, job_id) return jsonify({ "job_id": job_id, "workflow_id": handle.workflow_id, "status": "started" })@DBOS.workflow()def background_job(job_id: str) -> dict: # This is the actual durable workflow result = process_data(job_id) return {"job_id": job_id, "completed": result}@DBOS.step()def process_data(job_id: str) -> str: # Processing logic here return f"Processed {job_id}"
@app.route("/payment", methods=["POST"])def handle_payment() -> dict: # The idempotency key from headers determines the workflow ID # If this request is sent multiple times, the workflow runs only once data = request.get_json() handle = process_payment_workflow(data["amount"], data["user_id"]) return {"status": "processed", "workflow_id": handle.workflow_id}@DBOS.workflow()def process_payment_workflow(amount: int, user_id: str) -> dict: transaction_id = record_payment(amount, user_id) send_receipt(user_id, transaction_id) return {"transaction_id": transaction_id}
DBOS automatically recovers interrupted workflows. Here’s how it works:
import uuid@DBOS.workflow()def recoverable_workflow(data: str) -> tuple[str, str]: workflow_id = DBOS.workflow_id result = process_step(data) return result, workflow_id@app.route("/test/<data>")def test_recovery(data: str) -> dict: # First call with idempotency key result1, id1 = recoverable_workflow(data) # Second call returns the same workflow ID result2, id2 = recoverable_workflow(data) return { "result1": result1, "result2": result2, "same_workflow": id1 == id2 }
To test recovery:
# Send request with idempotency keyresponse = requests.get( "http://localhost:5000/test/data", headers={"dbos-idempotency-key": str(uuid.uuid4())})# Even if the server crashes mid-workflow,# DBOS will recover and complete it on restart
DBOS tracks workflow execution in its system database. On restart, it automatically detects and recovers any pending workflows, ensuring exactly-once execution.
DBOS integrates with Flask’s application lifecycle:
@app.before_first_requestdef before_first_request(): print("First request - DBOS is already initialized")@app.teardown_appcontextdef teardown(exception=None): # DBOS cleanup is handled automatically pass
When using DBOS with Flask, don’t call DBOS.launch() or DBOS.destroy() manually. The Flask middleware handles initialization and cleanup automatically.
DBOS automatically tracks requests with unique IDs:
# Client provides request IDresponse = requests.get( "http://localhost:5000/endpoint", headers={"dbos-request-id": "custom-request-123"})# Or DBOS generates one automaticallyresponse = requests.get("http://localhost:5000/endpoint")# Request ID is auto-generated and tracked