DBOS Transact provides decorators to define workflows, transactions, steps, and event-driven functions. All decorators are accessed via the DBOS class.
@DBOS.workflow
Mark a function as a DBOS workflow. Workflows are durable, reliable, and automatically recoverable.
@DBOS.workflow(
    name=None,
    max_recovery_attempts=None,
    serialization_type=None,
    validate_args=None
)
def my_workflow(arg1: str, arg2: int) -> str:
    ...
Parameters
name
str
default:"None"
Custom name for the workflow. Defaults to the function’s qualified name.
max_recovery_attempts
int
default:"None"
Maximum number of times to attempt recovery if the workflow fails.
serialization_type
WorkflowSerializationFormat
default:"None"
How to serialize workflow arguments and return values. Options: WorkflowSerializationFormat.JSON or WorkflowSerializationFormat.PICKLE.
validate_args
ValidateArgsCallable
default:"None"
Optional function to validate workflow arguments before execution.
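To illustrate the trade-off between the two serialization formats, here is a stdlib-only sketch (not DBOS's internal serializer): JSON is portable and human-readable but limited to basic types, while pickle round-trips arbitrary Python objects at the cost of being Python-specific.

```python
import json
import pickle
from datetime import datetime

def roundtrip_json(value):
    # JSON: portable and human-readable, but limited to basic types
    return json.loads(json.dumps(value))

def roundtrip_pickle(value):
    # Pickle: handles arbitrary Python objects, but is Python-specific
    return pickle.loads(pickle.dumps(value))

args = {"order_id": "o-1", "count": 3}
assert roundtrip_json(args) == args
assert roundtrip_pickle(args) == args

# A datetime survives pickle, but json.dumps rejects it by default
stamp = {"when": datetime(2024, 1, 1, 12, 0)}
assert roundtrip_pickle(stamp) == stamp
try:
    roundtrip_json(stamp)
except TypeError:
    pass  # json.dumps raises TypeError for datetime objects
```

Prefer JSON when arguments are simple and portability matters; fall back to pickle for complex Python objects.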
Example
from sqlalchemy import text

from dbos import DBOS

@DBOS.workflow()
def process_order(order_id: str, customer_id: str) -> str:
    # Workflow logic
    payment_id = charge_customer(customer_id)
    ship_order(order_id)
    return payment_id

@DBOS.transaction()
def charge_customer(customer_id: str) -> str:
    # Database transaction
    result = DBOS.sql_session.execute(
        text("INSERT INTO payments (customer_id) VALUES (:id) RETURNING id"),
        {"id": customer_id}
    )
    return str(result.scalar())

@DBOS.step()
def ship_order(order_id: str) -> None:
    # External API call
    shipping_api.create_shipment(order_id)
Workflow Properties
- Durable: Workflow state is persisted to the database
- Reliable: Automatically retries on failure
- Exactly-once: Each step executes exactly once, even on recovery
- Observable: Track status and results via workflow handles
@DBOS.transaction
Mark a function as a database transaction. Transactions execute inside a database transaction (serializable by default) and are automatically retried on serialization conflicts.
@DBOS.transaction(
    isolation_level="SERIALIZABLE",
    name=None
)
def my_transaction(arg: str) -> int:
    ...
Parameters
isolation_level
IsolationLevel
default:"SERIALIZABLE"
The transaction isolation level. Options:
"SERIALIZABLE" - Strictest isolation
"REPEATABLE READ" - Prevents non-repeatable reads
"READ COMMITTED" - Prevents dirty reads
name
str
default:"None"
Custom name for the transaction. Defaults to the function’s qualified name.
Example
import sqlalchemy as sa

from dbos import DBOS

@DBOS.transaction()
def create_user(username: str, email: str) -> int:
    result = DBOS.sql_session.execute(
        sa.text(
            "INSERT INTO users (username, email) "
            "VALUES (:username, :email) RETURNING id"
        ),
        {"username": username, "email": email}
    )
    user_id = result.scalar()
    return user_id

@DBOS.transaction(isolation_level="READ COMMITTED")
def get_user_count() -> int:
    result = DBOS.sql_session.execute(
        sa.text("SELECT COUNT(*) FROM users")
    )
    return result.scalar()
Transaction Properties
- ACID: Full ACID guarantees from the database
- Automatic Retry: Serialization failures are automatically retried
- Session Access: Access the database session via DBOS.sql_session
- Checkpointed: Results are recorded for workflow recovery
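The automatic retry behavior can be pictured with a small stdlib-only sketch. The exception class and loop below are hypothetical stand-ins, not DBOS's internal code: the idea is simply to re-run the transaction function until it commits or an attempt budget runs out.

```python
class SerializationFailure(Exception):
    """Stand-in for a database serialization failure (SQLSTATE 40001)."""

def run_with_retries(txn, max_attempts: int = 5):
    # Re-run the transaction function on serialization failures until it
    # commits or the attempt budget is exhausted; real database drivers
    # raise their own error types, which the framework would catch instead.
    last_exc = None
    for _ in range(max_attempts):
        try:
            return txn()
        except SerializationFailure as exc:
            last_exc = exc
    raise last_exc

# Simulate a transaction that conflicts twice before committing
calls = {"n": 0}
def flaky_txn():
    calls["n"] += 1
    if calls["n"] < 3:
        raise SerializationFailure()
    return "committed"

assert run_with_retries(flaky_txn) == "committed"
assert calls["n"] == 3  # two conflicts, then success
```

Because conflicts are retried transparently, transaction functions should be side-effect-free outside the database.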
@DBOS.step
Mark a function as a workflow step. Steps are checkpointed, and their results are cached for workflow recovery.
@DBOS.step(
    name=None,
    retries_allowed=False,
    interval_seconds=1.0,
    max_attempts=3,
    backoff_rate=2.0
)
def my_step(arg: str) -> str:
    ...
Parameters
name
str
default:"None"
Custom name for the step. Defaults to the function’s qualified name.
retries_allowed
bool
default:"False"
If True, automatically retry the step on exceptions.
interval_seconds
float
default:"1.0"
Initial delay between retry attempts (in seconds).
max_attempts
int
default:"3"
Maximum number of retry attempts before raising an exception.
backoff_rate
float
default:"2.0"
Multiplier for exponential backoff between retries.
Example
from dbos import DBOS
import requests

@DBOS.step(retries_allowed=True, max_attempts=5, interval_seconds=2.0)
def call_external_api(user_id: str) -> dict:
    response = requests.post(
        "https://api.example.com/users",
        json={"user_id": user_id}
    )
    response.raise_for_status()
    return response.json()

@DBOS.workflow()
def process_user(user_id: str):
    # This step will retry up to 5 times on failure
    api_result = call_external_api(user_id)
    return api_result
Step Properties
- Checkpointed: Results are persisted for recovery
- Idempotent Recovery: On recovery, completed steps return cached results
- Retry Support: Optional automatic retries with exponential backoff
- Observable: Access retry status via DBOS.step_status
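To see what the retry parameters imply, here is a sketch of a geometric backoff schedule. This assumes a plain `delay = interval_seconds * backoff_rate ** i` progression; DBOS's exact internal timing (e.g. jitter or caps) may differ.

```python
def retry_schedule(interval_seconds: float = 1.0,
                   max_attempts: int = 3,
                   backoff_rate: float = 2.0) -> list[float]:
    # Delay before retry i (0-indexed) grows geometrically:
    # interval_seconds * backoff_rate ** i.
    # max_attempts total attempts means max_attempts - 1 waits.
    return [interval_seconds * backoff_rate**i for i in range(max_attempts - 1)]

print(retry_schedule())         # defaults: [1.0, 2.0]
print(retry_schedule(2.0, 5))   # interval_seconds=2.0, max_attempts=5:
                                # [2.0, 4.0, 8.0, 16.0]
```

With the decorator defaults, a failing step waits 1 second before the second attempt and 2 seconds before the third.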
@DBOS.scheduled
Schedule a workflow to run on a cron schedule.
@DBOS.scheduled(cron)
def my_scheduled_workflow(scheduled_time: datetime, context: Any) -> None:
    ...
Parameters
cron
str
required
A cron expression defining the schedule. Supports 6 fields (second minute hour day month weekday).
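The 6-field layout can be made concrete with a small helper (illustrative only, not part of DBOS) that splits an expression into its named fields:

```python
CRON_FIELDS = ["second", "minute", "hour", "day", "month", "weekday"]

def parse_cron(expr: str) -> dict[str, str]:
    # Split a 6-field cron expression into named fields.
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))

parse_cron("0 0 9 * * 1-5")
# {'second': '0', 'minute': '0', 'hour': '9',
#  'day': '*', 'month': '*', 'weekday': '1-5'}
```

So "0 0 9 * * 1-5" fires at second 0, minute 0, hour 9, any day of month, any month, Monday through Friday.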
Example
from datetime import datetime
from typing import Any

from sqlalchemy import text

from dbos import DBOS

@DBOS.scheduled("0 0 * * * *")  # Every hour, at minute 0, second 0
@DBOS.workflow()
def hourly_cleanup(scheduled_time: datetime, context: Any) -> None:
    DBOS.logger.info(f"Running cleanup at {scheduled_time}")
    cleanup_old_records()

@DBOS.transaction()
def cleanup_old_records() -> int:
    result = DBOS.sql_session.execute(
        text("DELETE FROM logs WHERE created_at < NOW() - INTERVAL '7 days'")
    )
    return result.rowcount

# Run every weekday at 9:00 AM
@DBOS.scheduled("0 0 9 * * 1-5")
@DBOS.workflow()
def weekday_report(scheduled_time: datetime, context: Any) -> None:
    generate_daily_report()
Scheduled Workflow Properties
- Reliable: Missed executions are caught up on restart
- Deterministic: Workflow IDs are deterministic based on schedule time
- Idempotent: Duplicate executions are prevented
- Configurable: Managed via DBOS.create_schedule() or the decorator
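One way deterministic IDs can be derived (a hypothetical scheme for illustration; DBOS's actual ID format may differ) is to build the workflow ID purely from the workflow name and the schedule slot, so a re-run for the same slot always maps to the same ID and duplicates can be detected:

```python
from datetime import datetime, timezone

def scheduled_workflow_id(workflow_name: str, scheduled_time: datetime) -> str:
    # Hypothetical scheme: the ID depends only on the workflow name and the
    # schedule slot, so two executions for the same slot collide by design
    # and the duplicate can be skipped.
    return f"{workflow_name}-{scheduled_time.isoformat()}"

t = datetime(2024, 5, 1, 9, 0, tzinfo=timezone.utc)
print(scheduled_workflow_id("weekday_report", t))
# weekday_report-2024-05-01T09:00:00+00:00
```

Because the ID is a pure function of the slot, catching up missed executions after a restart cannot double-run a slot that already completed.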
@DBOS.kafka_consumer
Consume messages from Kafka topics and process them as workflows.
@DBOS.kafka_consumer(
    config,
    topics,
    in_order=False
)
def my_kafka_handler(message: KafkaMessage) -> None:
    ...
Parameters
config
dict
required
Kafka consumer configuration dictionary, using the librdkafka-style keys shown in the example (e.g. "bootstrap.servers").
topics
list[str]
required
List of Kafka topics to subscribe to.
in_order
bool
default:"False"
If True, process messages from each partition sequentially.
Example
from sqlalchemy import text

from dbos import DBOS, KafkaMessage

kafka_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-consumer-group",
}

@DBOS.kafka_consumer(kafka_config, ["orders"])
@DBOS.workflow()
def process_order_event(message: KafkaMessage) -> None:
    order_data = message.value
    DBOS.logger.info(f"Processing order: {order_data}")
    # Process the order
    save_order(order_data)
    send_confirmation(order_data["customer_email"])

@DBOS.transaction()
def save_order(order_data: dict) -> None:
    DBOS.sql_session.execute(
        text("INSERT INTO orders (data) VALUES (:data)"),
        {"data": order_data}
    )
Kafka Consumer Properties
- Reliable: Messages are processed exactly once
- Ordered: Optional in-order processing per partition
- Durable: Processing state survives crashes
- Scalable: Multiple consumers can process in parallel
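The exactly-once property rests on the fact that a (topic, partition, offset) triple uniquely identifies a Kafka message, so it can serve as an idempotency key. The sketch below is illustrative; DBOS derives workflow IDs for consumed messages internally rather than exposing this mechanism.

```python
def message_dedup_key(topic: str, partition: int, offset: int) -> str:
    # (topic, partition, offset) uniquely identifies a message in a cluster,
    # making it a natural idempotency key.
    return f"{topic}-{partition}-{offset}"

processed: set[str] = set()

def handle_once(topic: str, partition: int, offset: int, process) -> bool:
    # Skip redelivered messages whose key has already been recorded.
    key = message_dedup_key(topic, partition, offset)
    if key in processed:
        return False  # duplicate delivery: skip reprocessing
    processed.add(key)
    process()
    return True

seen = []
handle_once("orders", 0, 42, lambda: seen.append("processed"))  # runs
handle_once("orders", 0, 42, lambda: seen.append("processed"))  # skipped
print(seen)  # ['processed']
```

In a real system the `processed` set would live in durable storage (as DBOS's workflow tables do), so deduplication survives crashes.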
KafkaMessage
The KafkaMessage dataclass represents a message consumed from Kafka.
from dataclasses import dataclass
from typing import Optional, Union

@dataclass
class KafkaMessage:
    headers: Optional[list[tuple[str, Union[str, bytes]]]]
    key: Optional[Union[str, bytes]]
    latency: Optional[float]
    leader_epoch: Optional[int]
    offset: Optional[int]
    partition: Optional[int]
    timestamp: tuple[int, int]
    topic: Optional[str]
    value: Optional[Union[str, bytes]]
Fields:
headers
Optional[list[tuple[str, Union[str, bytes]]]]
Message headers as key-value pairs
key
Optional[Union[str, bytes]]
Message key
latency
Optional[float]
Consumer latency in seconds
leader_epoch
Optional[int]
Leader epoch of the partition
offset
Optional[int]
Message offset in the partition
partition
Optional[int]
Partition the message was read from
timestamp
tuple[int, int]
Timestamp tuple (timestamp_type, timestamp_ms)
topic
Optional[str]
Topic the message was read from
value
Optional[Union[str, bytes]]
Message payload
Example:
import json

from dbos import DBOS, KafkaMessage

# kafka_config as defined in the earlier example

@DBOS.kafka_consumer(kafka_config, ["events"])
@DBOS.workflow()
def process_event(message: KafkaMessage) -> None:
    # Access message metadata
    DBOS.logger.info(f"Topic: {message.topic}")
    DBOS.logger.info(f"Partition: {message.partition}")
    DBOS.logger.info(f"Offset: {message.offset}")

    # Parse the message value
    if isinstance(message.value, bytes):
        data = json.loads(message.value.decode("utf-8"))
    else:
        data = json.loads(message.value)

    # Process the data
    process_data(data)
@DBOS.dbos_class
Register a class that contains DBOS member functions.
@DBOS.dbos_class(class_name=None)
class MyService:
    ...
Parameters
class_name
str
default:"None"
Custom name for the class. Defaults to the class’s __qualname__.
Example
from sqlalchemy import text

from dbos import DBOS

@DBOS.dbos_class()
class OrderService:
    @DBOS.workflow()
    def process_order(self, order_id: str) -> str:
        payment_id = self.charge_payment(order_id)
        self.ship_order(order_id)
        return payment_id

    @DBOS.transaction()
    def charge_payment(self, order_id: str) -> str:
        result = DBOS.sql_session.execute(
            text("INSERT INTO payments (order_id) VALUES (:id) RETURNING id"),
            {"id": order_id}
        )
        return str(result.scalar())

    @DBOS.step()
    def ship_order(self, order_id: str) -> None:
        # Call the shipping API
        pass

# Use the service
service = OrderService()
service.process_order("order-123")
Class Decorator Properties
- Required: All classes with DBOS methods must be decorated
- Unique Names: Class names must be globally unique
- Recovery Support: Enables proper recovery of class methods
@DBOS.required_roles
Require specific roles for a function to execute.
@DBOS.required_roles(roles)
def my_function(arg: str) -> str:
    ...
Parameters
roles
list[str]
required
List of required role names. The authenticated user must have at least one of these roles.
Example
from dbos import DBOS

@DBOS.dbos_class()
@DBOS.default_required_roles(["user"])
class UserService:
    @DBOS.workflow()
    def view_profile(self, user_id: str) -> dict:
        # All users can view profiles
        return get_user_data(user_id)

    @DBOS.required_roles(["admin"])
    @DBOS.workflow()
    def delete_user(self, user_id: str) -> None:
        # Only admins can delete users
        remove_user(user_id)
Supporting Classes and Functions
DBOSConfiguredInstance
Base class for classes containing DBOS member functions that access instance state.
When a class contains DBOS functions that access instance state, the DBOS workflow executor needs a name for the instance. This name is recorded in the database and used to refer to the proper instance upon recovery.
from dbos import DBOSConfiguredInstance

class DBOSConfiguredInstance:
    def __init__(self, config_name: str) -> None:
        self.config_name = config_name
        DBOS.register_instance(self)
Attributes:
config_name
str
Instance name used for recovery
Example:
from dbos import DBOS, DBOSConfiguredInstance

@DBOS.dbos_class()
class OrderProcessor(DBOSConfiguredInstance):
    def __init__(self, config_name: str, api_key: str):
        super().__init__(config_name)
        self.api_key = api_key

    @DBOS.workflow()
    def process_order(self, order_id: str) -> str:
        # Uses instance state (api_key)
        result = self.call_external_api(order_id)
        return result

    @DBOS.step()
    def call_external_api(self, order_id: str) -> str:
        # API call using self.api_key
        return f"Processed {order_id} with key {self.api_key}"

# Create configured instances
prod_processor = OrderProcessor("prod", "prod-api-key-123")
test_processor = OrderProcessor("test", "test-api-key-456")

# When recovered, DBOS will use the config_name to find the right instance
prod_processor.process_order("order-789")
pydantic_args_validator
Ready-made argument validator that uses Pydantic to coerce and validate deserialized workflow arguments against the function’s type hints.
from dbos import DBOS
from dbos._validation import pydantic_args_validator

@DBOS.workflow(validate_args=pydantic_args_validator)
def my_workflow(name: str, count: int) -> str:
    return f"{name}: {count}"
Requirements:
- Requires Pydantic >= 2.0
- Install with:
pip install pydantic or pip install dbos[validation]
Example:
from dbos import DBOS
from dbos._validation import pydantic_args_validator
from pydantic import BaseModel, EmailStr

class UserInput(BaseModel):
    email: EmailStr
    age: int
    name: str

@DBOS.workflow(validate_args=pydantic_args_validator)
def create_user(user: UserInput) -> str:
    # user is validated and coerced by Pydantic
    return f"Created user {user.name} ({user.email})"

# Valid input
handle = DBOS.start_workflow(
    create_user,
    {"email": "[email protected]", "age": "25", "name": "Alice"}
)
# age="25" is coerced to int(25)

# Invalid input raises a validation error
try:
    DBOS.start_workflow(
        create_user,
        {"email": "invalid-email", "age": "not-a-number", "name": "Bob"}
    )
except ValueError as e:
    print(f"Validation failed: {e}")
make_pydantic_args_validator
Create a custom argument validator for a specific function.
from dbos._validation import make_pydantic_args_validator
from typing import Any, Callable, Dict, Tuple

def make_pydantic_args_validator(
    func: Callable[..., Any]
) -> Callable[[Tuple[Any, ...], Dict[str, Any]], Tuple[Tuple[Any, ...], Dict[str, Any]]]:
    ...
Parameters:
func
Callable[..., Any]
required
The function to create a validator for
Returns:
A callable (args, kwargs) -> (args, kwargs) suitable for the validate_args parameter
Example:
from dbos import DBOS
from dbos._validation import make_pydantic_args_validator

def my_workflow(name: str, count: int) -> str:
    return f"{name}: {count}"

# Create a custom validator
validator = make_pydantic_args_validator(my_workflow)

# Use it with the decorator
@DBOS.workflow(validate_args=validator)
def validated_workflow(name: str, count: int) -> str:
    return f"{name}: {count}"
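For comparison, here is a stdlib-only sketch of a validator with the same (args, kwargs) -> (args, kwargs) call shape that validate_args expects. It only does plain isinstance checks against type hints, where pydantic_args_validator performs full Pydantic coercion; the function and helper names below are hypothetical.

```python
import inspect
from typing import get_type_hints

def make_simple_args_validator(func):
    # Build a validator matching the validate_args signature:
    # it receives (args, kwargs) and returns them (possibly transformed),
    # raising on invalid input.
    hints = get_type_hints(func)
    sig = inspect.signature(func)

    def validate(args, kwargs):
        bound = sig.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            expected = hints.get(name)
            # Only check simple builtin types; Pydantic would also coerce.
            if expected in (str, int, float, bool) and not isinstance(value, expected):
                raise TypeError(
                    f"{name} should be {expected.__name__}, "
                    f"got {type(value).__name__}"
                )
        return args, kwargs

    return validate

def sample_workflow(name: str, count: int) -> str:
    return f"{name}: {count}"

check = make_simple_args_validator(sample_workflow)
check(("alice", 3), {})       # passes: types match the hints
# check(("alice", "3"), {})   # would raise TypeError: count should be int
```

A validator like this runs before the workflow body executes, so bad inputs fail fast instead of failing mid-workflow.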