Skip to main content

DBOS Transact: Lightweight Durable Workflows

DBOS provides lightweight durable workflows built on top of Postgres. Instead of managing your own workflow orchestrator or task queue system, you can use DBOS to add durable workflows and queues to your program in just a few lines of code.

What Makes DBOS Different?

DBOS is entirely contained in an open-source Python library. There’s no additional infrastructure for you to configure or manage—just install the library, connect it to Postgres, and annotate your functions to make them durable.
from dbos import DBOS

@DBOS.step()
def step_one():
    # This step is automatically checkpointed
    return "Step 1 complete"

@DBOS.step()
def step_two():
    # This step is also checkpointed
    return "Step 2 complete"

@DBOS.workflow()
def my_workflow():
    result1 = step_one()
    result2 = step_two()
    return f"{result1}, {result2}"

# If your program crashes, the workflow automatically resumes
# from the last completed step when it restarts
my_workflow()

When Should I Use DBOS?

You should consider using DBOS if your application needs to reliably handle failures. For example:
  • Payments services that must reliably process transactions even if servers crash mid-operation
  • Long-running data pipelines that need to resume seamlessly from checkpoints rather than restart from the beginning
  • AI agents that orchestrate multiple unreliable API calls and need to track progress
  • Background job processing that requires guaranteed execution and result delivery
Handling failures is costly and complicated, requiring complex state management and recovery logic as well as heavyweight tools like external orchestration services. DBOS makes it simpler: annotate your code to checkpoint it in Postgres and automatically recover from any failure.

Quickstart

Get up and running with DBOS in under 5 minutes

Installation

Detailed installation and setup instructions

Core Concepts

Learn about workflows, steps, and transactions

API Reference

Complete API documentation

Key Features

Durable Workflows

DBOS workflows make your program durable by checkpointing its state in Postgres. If your program ever fails, when it restarts all your workflows automatically resume from the last completed step. Workflows are particularly useful for:
  • Orchestrating business processes so they seamlessly recover from any failure
  • Building observable and fault-tolerant data pipelines
  • Operating an AI agent, or any application that relies on unreliable or non-deterministic APIs

Durable Queues

DBOS queues help you durably run tasks in the background. You can enqueue a task (which can be a single step or an entire workflow) from a durable workflow and one of your processes will pick it up for execution.
from dbos import DBOS, Queue

queue = Queue("example_queue")

@DBOS.step()
def process_task(task):
    # Process the task
    return f"Processed: {task}"

@DBOS.workflow()
def process_tasks(tasks):
    task_handles = []
    # Enqueue each task so all tasks are processed concurrently
    for task in tasks:
        handle = queue.enqueue(process_task, task)
        task_handles.append(handle)
    # Wait for each task to complete and retrieve its result
    return [handle.get_result() for handle in task_handles]
Queues provide flow control, so you can limit concurrency, set timeouts, rate limit execution, deduplicate tasks, or prioritize tasks. They don’t require a separate queueing service or message broker—just Postgres.

Programmatic Workflow Management

Your workflows are stored as rows in a Postgres table, so you have full programmatic control over them. Write scripts to query workflow executions, batch pause or resume workflows, or even restart failed workflows from a specific step.
from dbos import DBOSClient

# Create a DBOS client connected to your Postgres database
client = DBOSClient(database_url)

# Find all workflows that errored between 3:00 and 5:00 AM UTC
workflows = client.list_workflows(
    status="ERROR", 
    start_time="2025-04-22T03:00:00Z", 
    end_time="2025-04-22T05:00:00Z"
)

for workflow in workflows:
    # Check which workflows failed due to a service outage
    steps = client.list_workflow_steps(workflow)
    if len(steps) >= 3 and is_service_outage(steps[2]["error"]):
        # Restart those workflows from Step 2
        DBOS.fork_workflow(workflow.workflow_id, 2)

Exactly-Once Event Processing

Use DBOS to build reliable webhooks, event listeners, or Kafka consumers by starting a workflow exactly-once in response to an event. Acknowledge the event immediately while reliably processing it in the background.
from dbos import DBOS, SetWorkflowID

def handle_message(request):
    event_id = request.body["event_id"]
    # Use the event ID as an idempotency key
    with SetWorkflowID(event_id):
        # Start the workflow in the background
        DBOS.start_workflow(message_workflow, request.body["event"])

Durable Scheduling

Schedule workflows using cron syntax, or use durable sleep to pause workflows for as long as you like (even days or weeks) before executing.
from datetime import datetime

@DBOS.scheduled('* * * * *')  # Run once every minute
@DBOS.workflow()
def example_scheduled_workflow(scheduled_time: datetime, actual_time: datetime):
    DBOS.logger.info("I am a workflow scheduled to run once a minute.")

@DBOS.workflow()
def reminder_workflow(email: str, time_to_sleep: int):
    send_confirmation_email(email)
    DBOS.sleep(time_to_sleep)  # Sleep durably through interruptions
    send_reminder_email(email)

Durable Notifications

Pause your workflow executions until a notification is received, or emit events from your workflow to send progress updates to external clients. All notifications are stored in Postgres with exactly-once semantics.
@DBOS.workflow()
def billing_workflow():
    # Calculate the charge and submit to payments service
    calculate_and_submit_bill()
    
    # Wait durably for payment confirmation
    payment_status = DBOS.recv(PAYMENT_STATUS, timeout=payment_service_timeout)
    
    if payment_status is not None and payment_status == "paid":
        handle_successful_payment()
    else:
        handle_failed_payment_or_timeout()

Next Steps

Get Started with the Quickstart

Build your first durable workflow in minutes

Workflow Tutorial

Learn how to build with durable workflows and queues

Queue Tutorial

Learn how to use durable queues for background processing

Join Our Community

Get help and connect with other developers

Build docs developers (and LLMs) love