A transaction is a function decorated with @DBOS.transaction() that performs database operations within an ACID-compliant database transaction. Transactions in DBOS provide direct access to your application database through SQLAlchemy, with automatic retry on serialization errors and integration with workflow durability.
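```python
from dbos import DBOS
from sqlalchemy import select

from models import User, Order  # application-defined SQLAlchemy models


@DBOS.transaction()
def create_order(user_id: int, items: list[dict]):
    """Create an order in a database transaction"""
    # DBOS.sql_session is a property that returns the SQLAlchemy Session
    # bound to the current transaction; it is not called like a function
    session = DBOS.sql_session

    # Check user exists (scalar_one raises if there is no matching row)
    user = session.execute(
        select(User).where(User.id == user_id)
    ).scalar_one()

    # Create order
    order = Order(
        user_id=user.id,
        items=items,
        total=sum(item['price'] for item in items)
    )
    session.add(order)
    session.flush()  # send the INSERT so order.id is populated

    return {"order_id": order.id, "total": order.total}
```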
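DBOS performs the retry on serialization errors for you. To make the idea concrete, here is a minimal sketch of what such a retry loop looks like in general. It is not DBOS's implementation: `SerializationFailure` and `run_with_retry` are hypothetical names introduced only for illustration, and the backoff values are arbitrary.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class SerializationFailure(Exception):
    """Hypothetical stand-in for a database serialization error (SQLSTATE 40001)."""


def run_with_retry(body: Callable[[], T], max_attempts: int = 5,
                   base_delay: float = 0.001) -> T:
    """Run `body`, re-running it from the start when it raises SerializationFailure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return body()
        except SerializationFailure:
            if attempt == max_attempts:
                raise  # give up after the last attempt
            # Exponential backoff with jitter before the next attempt
            time.sleep(base_delay * (2 ** (attempt - 1)) * random.random())
    raise ValueError("max_attempts must be at least 1")


# Usage: a body that fails twice, then succeeds on the third attempt
attempts = {"count": 0}


def flaky_body() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise SerializationFailure("could not serialize access")
    return "committed"


result = run_with_retry(flaky_body)
```

Because the whole body is re-run on each attempt, the body should be safe to execute more than once.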
SERIALIZABLE provides the highest isolation. Transactions execute as if they ran one after another.
```python
@DBOS.transaction(isolation_level="SERIALIZABLE")
def atomic_transfer(from_id: int, to_id: int, amount: float):
    """Guaranteed no concurrent modifications can interfere"""
    session = DBOS.sql_session  # property: the current transaction's Session
    # Complete isolation from other transactions
```
REPEATABLE READ prevents dirty reads and non-repeatable reads. It is a good balance of consistency and performance.
```python
@DBOS.transaction(isolation_level="REPEATABLE READ")
def read_consistent_data(user_id: int):
    """Data read at the start remains consistent throughout"""
    session = DBOS.sql_session  # property: the current transaction's Session
    # Reads are repeatable within this transaction
```
READ COMMITTED prevents only dirty reads. It offers the best performance but the weakest isolation of the three levels.
```python
@DBOS.transaction(isolation_level="READ COMMITTED")
def read_latest_data(resource_id: int):
    """See committed changes from other transactions"""
    session = DBOS.sql_session  # property: the current transaction's Session
    # Faster but less isolated
```
System database: Stores workflow execution state (checkpoints, events, queues)
Application database: Your app’s data (users, orders, etc.)
```python
# By default, transactions use the application database
@DBOS.transaction()
def app_transaction():
    session = DBOS.sql_session
    # This accesses your application database

# The system database is managed by DBOS internally
# You typically don't access it directly
```
You can configure both databases to use the same Postgres instance or separate them for better isolation.
```python
import sqlalchemy as sa
from dbos import DBOS


@DBOS.transaction()
def execute_raw_sql(user_id: int):
    session = DBOS.sql_session

    # Execute raw SQL with a bound parameter (never interpolate values)
    result = session.execute(
        sa.text("""
            SELECT o.id, o.total, u.name
            FROM orders o
            JOIN users u ON o.user_id = u.id
            WHERE u.id = :user_id
            ORDER BY o.created_at DESC
            LIMIT 10
        """),
        {"user_id": user_id}
    )

    # Rows are tuple-like; map columns by position
    return [{
        "order_id": row[0],
        "total": row[1],
        "user_name": row[2]
    } for row in result]
```
Transactions can be async for improved concurrency:
```python
@DBOS.transaction()
async def async_database_operation(user_id: int):
    session = DBOS.sql_session

    # SQLAlchemy operations (synchronous session)
    user = session.execute(
        select(User).where(User.id == user_id)
    ).scalar_one()

    return {"id": user.id, "name": user.name}
```
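Even in async transactions, the database session is a synchronous SQLAlchemy Session, so each query blocks until the database responds. Declaring the function async helps with workflow concurrency, not with the database operations themselves. Some DBOS releases reject coroutine functions passed to @DBOS.transaction(), so check the behavior of the version you are using.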
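Since the session is synchronous, one general way to keep an event loop responsive is to run the blocking function in a worker thread with `asyncio.to_thread`. The sketch below shows only that pattern: `blocking_lookup` is a hypothetical stand-in for a synchronous transaction function, and `time.sleep` simulates the database round trip.

```python
import asyncio
import time


def blocking_lookup(user_id: int) -> dict:
    """Hypothetical stand-in for a synchronous transaction function."""
    time.sleep(0.05)  # simulates a blocking database round trip
    return {"id": user_id, "name": f"user-{user_id}"}


async def handler(user_ids: list[int]) -> list[dict]:
    # Each blocking call runs in a worker thread, so the event loop
    # stays free to service other coroutines in the meantime
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_lookup, uid) for uid in user_ids)
    )


results = asyncio.run(handler([1, 2, 3]))
```

`asyncio.gather` returns results in the order the awaitables were passed, regardless of which thread finishes first.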
Minimize the time spent holding database locks to improve concurrency.
```python
# ❌ Long transaction
@DBOS.transaction()
def bad_transaction():
    session = DBOS.sql_session
    data = session.execute(select(Data)).scalars().all()
    # Long computation while the transaction (and its locks) stays open
    processed = expensive_computation(data)
    update_results(session, processed)

# ✅ Short transaction
@DBOS.workflow()
def good_workflow():
    data = load_data()              # Transaction
    processed = process_data(data)  # Step (no lock)
    save_results(processed)         # Transaction
```
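The load, process, save split can be tried without a Postgres instance. The following sketch uses the standard library's `sqlite3` as a stand-in for the application database. The table names and the bodies of `load_data`, `process_data`, and `save_results` are assumptions made for illustration; in a DBOS application the first and last would be transactions and the middle one a step.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE readings (id INTEGER PRIMARY KEY, value REAL)")
conn.execute("CREATE TABLE results (id INTEGER PRIMARY KEY, total REAL)")
with conn:  # commits on success, rolls back on error
    conn.executemany(
        "INSERT INTO readings (value) VALUES (?)", [(1.5,), (2.5,), (4.0,)]
    )


def load_data(conn: sqlite3.Connection) -> list[float]:
    # Short read: fetch everything and return immediately
    rows = conn.execute("SELECT value FROM readings ORDER BY id").fetchall()
    return [row[0] for row in rows]


def process_data(values: list[float]) -> float:
    # Stands in for the expensive computation; no database work happens here
    return sum(v * 2 for v in values)


def save_results(conn: sqlite3.Connection, total: float) -> None:
    # Short write: a single INSERT inside its own transaction
    with conn:
        conn.execute("INSERT INTO results (total) VALUES (?)", (total,))


values = load_data(conn)
total = process_data(values)
save_results(conn, total)
saved = conn.execute("SELECT total FROM results").fetchone()[0]
```

The trade-off is that the data may change between the read and the write, so this split suits cases where the computation does not need to see a locked snapshot.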
Use appropriate isolation levels
Choose the weakest isolation level that meets your requirements for better performance.
```python
# Use SERIALIZABLE only when necessary
@DBOS.transaction(isolation_level="SERIALIZABLE")
def critical_financial_operation():
    pass

# READ COMMITTED is often sufficient
@DBOS.transaction(isolation_level="READ COMMITTED")
def read_latest_status():
    pass
```
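One way to reason about "the weakest level that meets your requirements" is to list the anomalies you must rule out and pick the first level that covers them. The sketch below encodes the SQL standard's minimum guarantees; `PREVENTS` and `weakest_level` are hypothetical helpers, not part of DBOS, and a specific database may give stronger guarantees at a given level than the standard requires.

```python
# Anomalies each level must prevent under the SQL standard's definitions
PREVENTS = {
    "READ COMMITTED": {"dirty read"},
    "REPEATABLE READ": {"dirty read", "non-repeatable read"},
    "SERIALIZABLE": {"dirty read", "non-repeatable read", "phantom read"},
}

# Ordered from weakest to strongest
LEVELS = ["READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE"]


def weakest_level(must_prevent: set[str]) -> str:
    """Return the weakest isolation level that prevents every listed anomaly."""
    for level in LEVELS:
        if must_prevent <= PREVENTS[level]:  # subset check
            return level
    raise ValueError(f"no level prevents all of: {sorted(must_prevent)}")


status_read = weakest_level({"dirty read"})
report = weakest_level({"dirty read", "non-repeatable read"})
transfer = weakest_level({"phantom read"})
```

The returned string can be passed as the `isolation_level` argument shown in the examples above.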
Make transactions idempotent
Design transactions so they can be safely retried.
```python
# ❌ Not idempotent
@DBOS.transaction()
def increment_counter():
    session = DBOS.sql_session
    counter = session.execute(select(Counter)).scalar_one()
    counter.value += 1  # Different result on retry

# ✅ Idempotent
@DBOS.transaction()
def set_counter(value: int):
    session = DBOS.sql_session
    counter = session.execute(select(Counter)).scalar_one()
    counter.value = value  # Same result on retry
```
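When an operation is inherently an increment, a common general pattern is to record an idempotency key in the same transaction and apply the change only the first time that key is seen. This is a sketch of that pattern using the standard library's `sqlite3`, not something the text above prescribes; the `applied` table, the `op_id` key, and `increment_once` are all hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE counter (id INTEGER PRIMARY KEY, value INTEGER)")
conn.execute("CREATE TABLE applied (op_id TEXT PRIMARY KEY)")
conn.execute("INSERT INTO counter (id, value) VALUES (1, 0)")
conn.commit()


def increment_once(conn: sqlite3.Connection, op_id: str) -> int:
    """Increment the counter at most once per op_id and return its value."""
    with conn:  # key insert and increment commit or roll back together
        cur = conn.execute(
            "INSERT OR IGNORE INTO applied (op_id) VALUES (?)", (op_id,)
        )
        if cur.rowcount == 1:  # the key was new, so apply the change
            conn.execute("UPDATE counter SET value = value + 1 WHERE id = 1")
        row = conn.execute("SELECT value FROM counter WHERE id = 1").fetchone()
        return row[0]


first = increment_once(conn, "op-42")
retry = increment_once(conn, "op-42")  # same key: no second increment
other = increment_once(conn, "op-43")  # new key: increments again
```

Retrying with the same key leaves the counter unchanged, which gives the increment the same retry safety as the `set_counter` example.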
Handle constraint violations gracefully
Catch and handle database errors appropriately.
```python
from sqlalchemy.exc import IntegrityError

@DBOS.transaction()
def upsert_user(email: str, name: str):
    session = DBOS.sql_session
    try:
        # Savepoint: a failed INSERT rolls back only this block,
        # not the whole transaction that DBOS manages
        with session.begin_nested():
            user = User(email=email, name=name)
            session.add(user)
            session.flush()
        return {"user_id": user.id, "action": "created"}
    except IntegrityError:
        # The email already exists, so update the existing row instead
        user = session.execute(
            select(User).where(User.email == email)
        ).scalar_one()
        user.name = name
        return {"user_id": user.id, "action": "updated"}
```
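An alternative to catching the constraint violation is to let the database resolve the conflict in a single statement with `INSERT ... ON CONFLICT`. The sketch below demonstrates the statement with the standard library's `sqlite3` (SQLite 3.24 or newer) so it runs without a server; PostgreSQL accepts the same `ON CONFLICT ... DO UPDATE` form. The `users` schema here is an assumption for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users ("
    "id INTEGER PRIMARY KEY, email TEXT UNIQUE NOT NULL, name TEXT NOT NULL)"
)


def upsert_user(conn: sqlite3.Connection, email: str, name: str) -> int:
    """Insert the user, or update the name if the email already exists."""
    with conn:
        conn.execute(
            "INSERT INTO users (email, name) VALUES (?, ?) "
            "ON CONFLICT(email) DO UPDATE SET name = excluded.name",
            (email, name),
        )
        row = conn.execute(
            "SELECT id FROM users WHERE email = ?", (email,)
        ).fetchone()
        return row[0]


created_id = upsert_user(conn, "ada@example.com", "Ada")
updated_id = upsert_user(conn, "ada@example.com", "Ada Lovelace")
stored_name = conn.execute(
    "SELECT name FROM users WHERE email = ?", ("ada@example.com",)
).fetchone()[0]
user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```

This form never raises on a duplicate email, so no error handling or savepoint is needed, but it no longer reports whether the row was created or updated.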