Skip to main content

What is a transaction?

A transaction is a function decorated with @DBOS.transaction() that performs database operations within an ACID-compliant database transaction. Transactions in DBOS provide direct access to your application database through SQLAlchemy, with automatic retry on serialization errors and integration with workflow durability.
from dbos import DBOS
from sqlalchemy import select
from models import User, Order

@DBOS.transaction()
def create_order(user_id: int, items: list[dict]):
    """Create an order in a database transaction"""
    with DBOS.sql_session() as session:
        # Check user exists
        user = session.execute(
            select(User).where(User.id == user_id)
        ).scalar_one()
        
        # Create order
        order = Order(
            user_id=user.id,
            items=items,
            total=sum(item['price'] for item in items)
        )
        session.add(order)
        session.flush()
        
        return {"order_id": order.id, "total": order.total}

Key characteristics

ACID guarantees

All-or-nothing execution with full database consistency

Automatic retry

Retries on deadlocks and serialization failures

Durable

Results are checkpointed for workflow recovery

Isolated

Configurable isolation levels (SERIALIZABLE, REPEATABLE READ, READ COMMITTED)

Transaction decorator

The @DBOS.transaction() decorator accepts optional parameters:
name
str
Custom name for the transaction. Defaults to the function name.
isolation_level
IsolationLevel
SQL isolation level: "SERIALIZABLE", "REPEATABLE READ", or "READ COMMITTED".
@DBOS.transaction(
    name="transfer_funds_v2",
    isolation_level="SERIALIZABLE"
)
def transfer_money(from_account: int, to_account: int, amount: float):
    with DBOS.sql_session() as session:
        # Debit from source
        source = session.execute(
            select(Account).where(Account.id == from_account)
        ).scalar_one()
        source.balance -= amount
        
        # Credit to destination
        dest = session.execute(
            select(Account).where(Account.id == to_account)
        ).scalar_one()
        dest.balance += amount
        
        return {"transferred": amount}

Database access

Access the database through SQLAlchemy within transactions:

Using DBOS.sql_session()

The recommended way to access the database:
@DBOS.transaction()
def get_user_orders(user_id: int):
    with DBOS.sql_session() as session:
        # Execute queries
        orders = session.execute(
            select(Order).where(Order.user_id == user_id)
        ).scalars().all()
        
        return [{
            "id": o.id,
            "total": o.total,
            "created_at": o.created_at
        } for o in orders]

Session lifecycle

The session is automatically committed when the transaction function returns successfully. If an exception is raised, the session is rolled back.
@DBOS.transaction()
def update_user_email(user_id: int, new_email: str):
    with DBOS.sql_session() as session:
        user = session.execute(
            select(User).where(User.id == user_id)
        ).scalar_one_or_none()
        
        if user is None:
            raise ValueError(f"User {user_id} not found")
        
        # Update email
        user.email = new_email
        # Automatic commit on successful return
        
        return {"user_id": user.id, "email": user.email}

Isolation levels

Choose an isolation level based on your consistency requirements:

SERIALIZABLE (strongest)

Provides the highest isolation. Transactions execute as if they run sequentially.
@DBOS.transaction(isolation_level="SERIALIZABLE")
def atomic_transfer(from_id: int, to_id: int, amount: float):
    """Guaranteed no concurrent modifications can interfere"""
    with DBOS.sql_session() as session:
        # Complete isolation from other transactions
        pass

REPEATABLE READ (default)

Prevents dirty reads and non-repeatable reads. Good balance of consistency and performance.
@DBOS.transaction(isolation_level="REPEATABLE READ")
def read_consistent_data(user_id: int):
    """Data read at the start remains consistent throughout"""
    with DBOS.sql_session() as session:
        # Reads are repeatable within this transaction
        pass

READ COMMITTED (weakest)

Only prevents dirty reads. Best performance but least isolation.
@DBOS.transaction(isolation_level="READ COMMITTED")
def read_latest_data(resource_id: int):
    """See committed changes from other transactions"""
    with DBOS.sql_session() as session:
        # Faster but less isolated
        pass

Retry behavior

Transactions are automatically retried on:
  • Deadlocks (when two transactions are waiting for each other)
  • Serialization failures (when concurrent transactions conflict)
  • Network errors (connection drops, timeouts)
@DBOS.transaction()
def high_contention_update(resource_id: int):
    """Automatically retried if serialization fails"""
    with DBOS.sql_session() as session:
        resource = session.execute(
            select(Resource).where(Resource.id == resource_id)
        ).scalar_one()
        
        # If another transaction modified this resource,
        # DBOS automatically retries this transaction
        resource.counter += 1
        
        return resource.counter
Transactions should be idempotent when possible, since they may be retried multiple times.

Using with workflows

Transactions integrate seamlessly with workflows:
@DBOS.workflow()
def process_payment_workflow(order_id: str, amount: float):
    # Charge payment (step - external API call)
    payment_id = charge_payment(amount)
    
    # Record in database (transaction)
    record_payment(order_id, payment_id, amount)
    
    # Update order status (transaction)
    update_order_status(order_id, "paid")
    
    return {"order_id": order_id, "status": "paid"}

@DBOS.step()
def charge_payment(amount: float):
    return payment_api.charge(amount)

@DBOS.transaction()
def record_payment(order_id: str, payment_id: str, amount: float):
    with DBOS.sql_session() as session:
        payment = Payment(
            order_id=order_id,
            payment_id=payment_id,
            amount=amount
        )
        session.add(payment)

@DBOS.transaction()
def update_order_status(order_id: str, status: str):
    with DBOS.sql_session() as session:
        order = session.execute(
            select(Order).where(Order.id == order_id)
        ).scalar_one()
        order.status = status

System vs application database

DBOS uses two databases:
  • System database: Stores workflow execution state (checkpoints, events, queues)
  • Application database: Your app’s data (users, orders, etc.)
# By default, transactions use the application database
@DBOS.transaction()
def app_transaction():
    with DBOS.sql_session() as session:
        # This accesses your application database
        pass

# The system database is managed by DBOS internally
# You typically don't access it directly
You can configure both databases to use the same Postgres instance or separate them for better isolation.

Raw SQL queries

Execute raw SQL when needed:
@DBOS.transaction()
def execute_raw_sql(user_id: int):
    with DBOS.sql_session() as session:
        # Execute raw SQL
        result = session.execute(
            sa.text("""
                SELECT o.id, o.total, u.name
                FROM orders o
                JOIN users u ON o.user_id = u.id
                WHERE u.id = :user_id
                ORDER BY o.created_at DESC
                LIMIT 10
            """),
            {"user_id": user_id}
        )
        
        return [{
            "order_id": row[0],
            "total": row[1],
            "user_name": row[2]
        } for row in result]

Async transactions

Transactions can be async for improved concurrency:
@DBOS.transaction()
async def async_database_operation(user_id: int):
    with DBOS.sql_session() as session:
        # SQLAlchemy operations (synchronous session)
        user = session.execute(
            select(User).where(User.id == user_id)
        ).scalar_one()
        
        return {"id": user.id, "name": user.name}
Even in async transactions, the database session itself is synchronous (SQLAlchemy limitation). The async decorator helps with workflow concurrency, not database operations.

Error handling

Handle database errors in your workflows:
from sqlalchemy.exc import IntegrityError, NoResultFound

@DBOS.workflow()
def safe_user_creation(email: str, name: str):
    try:
        user_id = create_user(email, name)
        return {"user_id": user_id, "created": True}
    except IntegrityError:
        # User already exists (unique constraint violation)
        DBOS.logger.warning(f"User {email} already exists")
        user_id = find_user_by_email(email)
        return {"user_id": user_id, "created": False}

@DBOS.transaction()
def create_user(email: str, name: str):
    with DBOS.sql_session() as session:
        user = User(email=email, name=name)
        session.add(user)
        session.flush()
        return user.id

@DBOS.transaction()
def find_user_by_email(email: str):
    with DBOS.sql_session() as session:
        user = session.execute(
            select(User).where(User.email == email)
        ).scalar_one()
        return user.id

Best practices

Minimize the time spent holding database locks to improve concurrency.
# ❌ Long transaction
@DBOS.transaction()
def bad_transaction():
    with DBOS.sql_session() as session:
        data = session.execute(select(Data)).scalars().all()
        # Long computation while holding lock
        processed = expensive_computation(data)
        update_results(session, processed)

# ✅ Short transaction
@DBOS.workflow()
def good_workflow():
    data = load_data()  # Transaction
    processed = process_data(data)  # Step (no lock)
    save_results(processed)  # Transaction
Choose the weakest isolation level that meets your requirements for better performance.
# Use SERIALIZABLE only when necessary
@DBOS.transaction(isolation_level="SERIALIZABLE")
def critical_financial_operation():
    pass

# READ COMMITTED is often sufficient
@DBOS.transaction(isolation_level="READ COMMITTED")
def read_latest_status():
    pass
Design transactions so they can be safely retried.
# ❌ Not idempotent
@DBOS.transaction()
def increment_counter():
    with DBOS.sql_session() as session:
        counter = session.execute(select(Counter)).scalar_one()
        counter.value += 1  # Different result on retry

# ✅ Idempotent
@DBOS.transaction()
def set_counter(value: int):
    with DBOS.sql_session() as session:
        counter = session.execute(select(Counter)).scalar_one()
        counter.value = value  # Same result on retry
Catch and handle database errors appropriately.
from sqlalchemy.exc import IntegrityError

@DBOS.transaction()
def upsert_user(email: str, name: str):
    with DBOS.sql_session() as session:
        try:
            user = User(email=email, name=name)
            session.add(user)
            session.flush()
            return {"user_id": user.id, "action": "created"}
        except IntegrityError:
            session.rollback()
            user = session.execute(
                select(User).where(User.email == email)
            ).scalar_one()
            user.name = name
            return {"user_id": user.id, "action": "updated"}

Next steps

Workflows

Learn about durable workflows

Steps

Learn about workflow steps

Database integration

Configure database connections

Workflow tutorial

Build workflows with transactions

Build docs developers (and LLMs) love