Skip to main content

Introduction

This quickstart will guide you through creating your first durable workflow with DBOS Transact. You’ll build a simple workflow, run it, simulate a crash, and watch it automatically recover.

Prerequisites

Before you begin, ensure you have:
  • Python 3.10 or later installed
  • pip for package installation

Installation and Setup

1

Install DBOS

Install the DBOS library using pip:
pip install dbos
Verify the installation:
python -c "import dbos; print('DBOS installed successfully!')"
2

Create Your Project

Create a new directory for your project and a Python file:
mkdir dbos-quickstart
cd dbos-quickstart
touch quickstart.py
3

Configure DBOS

Create a dbos-config.yaml file in your project directory:
dbos-config.yaml
name: quickstart-app
system_database_url: sqlite:///dbos.db
log_level: INFO
We’re using SQLite for simplicity. For production use, switch to Postgres.

Build Your First Workflow

Open quickstart.py and add the following code:
quickstart.py
from dbos import DBOS
import time

# Initialize DBOS
dbos = DBOS()

@DBOS.step()
def step_one():
    """This step simulates some work."""
    print("Executing step 1...")
    time.sleep(1)
    return "Step 1 completed"

@DBOS.step()
def step_two(previous_result: str):
    """This step uses the result from step 1."""
    print(f"Executing step 2 with: {previous_result}")
    time.sleep(1)
    return "Step 2 completed"

@DBOS.step()
def step_three(previous_result: str):
    """This step demonstrates recovery."""
    print(f"Executing step 3 with: {previous_result}")
    time.sleep(1)
    return "Workflow completed successfully!"

@DBOS.workflow()
def my_first_workflow():
    """A simple three-step workflow."""
    DBOS.logger.info("Starting workflow...")
    
    # Step 1: First operation
    result1 = step_one()
    
    # Step 2: Second operation
    result2 = step_two(result1)
    
    # Step 3: Final operation
    result3 = step_three(result2)
    
    DBOS.logger.info("Workflow finished!")
    return result3

if __name__ == "__main__":
    # Launch DBOS (initializes database and starts recovery)
    DBOS.launch()
    
    # Run the workflow
    result = my_first_workflow()
    print(f"\nResult: {result}")
    
    # Clean up
    DBOS.destroy()

Run Your Workflow

Execute your workflow for the first time:
python quickstart.py
You should see output like:
INFO:dbos:Launching DBOS...
INFO:dbos:Starting workflow...
Executing step 1...
Executing step 2 with: Step 1 completed
Executing step 3 with: Step 2 completed
INFO:dbos:Workflow finished!

Result: Workflow completed successfully!
Each step is automatically checkpointed in the database. Let’s see recovery in action next!

Demonstrate Automatic Recovery

Now let’s simulate a crash and watch DBOS automatically recover your workflow.
1

Modify the Workflow to Crash

Update quickstart.py to crash after step 2:
quickstart.py
from dbos import DBOS
import time
import sys

dbos = DBOS()

# Track which run this is
run_count = 0

@DBOS.step()
def step_one():
    print("Executing step 1...")
    time.sleep(1)
    return "Step 1 completed"

@DBOS.step()
def step_two(previous_result: str):
    print(f"Executing step 2 with: {previous_result}")
    time.sleep(1)
    return "Step 2 completed"

@DBOS.step()
def step_three(previous_result: str):
    print(f"Executing step 3 with: {previous_result}")
    time.sleep(1)
    return "Workflow completed successfully!"

@DBOS.workflow()
def recoverable_workflow():
    global run_count
    run_count += 1
    
    DBOS.logger.info(f"Starting workflow (run #{run_count})...")
    
    # Step 1
    result1 = step_one()
    
    # Step 2
    result2 = step_two(result1)
    
    # Simulate a crash on the first run
    if run_count == 1:
        print("\n💥 CRASH! Simulating application failure...\n")
        sys.exit(1)
    
    # Step 3 - only reached after recovery
    result3 = step_three(result2)
    
    DBOS.logger.info("Workflow finished!")
    return result3

if __name__ == "__main__":
    DBOS.launch()
    
    try:
        result = recoverable_workflow()
        print(f"\nResult: {result}")
    except SystemExit:
        print("Application crashed before completion")
    finally:
        DBOS.destroy()
2

Run and Crash

Run the workflow - it will crash after step 2:
python quickstart.py
You’ll see:
INFO:dbos:Starting workflow (run #1)...
Executing step 1...
Executing step 2 with: Step 1 completed

💥 CRASH! Simulating application failure...

Application crashed before completion
3

Recover Automatically

Run it again - DBOS will automatically recover:
python quickstart.py
Notice that steps 1 and 2 are skipped (their results were saved) and execution resumes from step 3:
INFO:dbos:Launching DBOS...
INFO:dbos:Recovering workflows...
INFO:dbos:Starting workflow (run #2)...
Executing step 3 with: Step 2 completed
INFO:dbos:Workflow finished!

Result: Workflow completed successfully!
The workflow automatically resumed from where it left off! Steps 1 and 2 were not re-executed.

How Recovery Works

When your application restarts, DBOS:
  1. Detects incomplete workflows in the database
  2. Replays the workflow from the beginning
  3. Skips steps that already completed (using cached results)
  4. Resumes execution from the first incomplete step
This happens automatically - you don’t need to write any recovery code!
Recovery is triggered when you call DBOS.launch(). All incomplete workflows resume automatically.

Add Database Transactions

Let’s enhance our workflow with database operations. Update your code to include a transaction:
quickstart.py
from dbos import DBOS
import sqlalchemy as sa

dbos = DBOS()

@DBOS.transaction()
def save_progress(step_name: str, status: str):
    """Save workflow progress to the database."""
    DBOS.sql_session.execute(
        sa.text(
            "CREATE TABLE IF NOT EXISTS workflow_progress "
            "(step_name TEXT, status TEXT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)"
        )
    )
    DBOS.sql_session.execute(
        sa.text(
            "INSERT INTO workflow_progress (step_name, status) "
            "VALUES (:step, :status)"
        ),
        {"step": step_name, "status": status}
    )
    print(f"Saved: {step_name} - {status}")

@DBOS.step()
def process_data(data: str):
    """Process some data."""
    result = f"Processed: {data}"
    print(result)
    return result

@DBOS.workflow()
def database_workflow():
    """Workflow with database operations."""
    DBOS.logger.info("Starting database workflow...")
    
    # Save initial state
    save_progress("start", "initiated")
    
    # Do some work
    result = process_data("important data")
    
    # Save completion
    save_progress("complete", "finished")
    
    return result

if __name__ == "__main__":
    DBOS.launch()
    result = database_workflow()
    print(f"\nWorkflow result: {result}")
    DBOS.destroy()
Run it:
python quickstart.py
Transactions provide ACID guarantees. If the workflow crashes, completed transactions are preserved and never re-executed.

Next Steps

Congratulations! You’ve built your first durable workflow with DBOS Transact. Here’s what to explore next:

Core Concepts

Learn about workflows, steps, and transactions

Queue Tutorial

Control concurrency with durable queues

Database Transactions

Master ACID database operations

Workflow Tutorial

Build advanced workflows step-by-step

Build docs developers (and LLMs) love