Skip to main content
DBOS Transact provides decorators to define workflows, transactions, steps, and event-driven functions. All decorators are accessed via the DBOS class.

@DBOS.workflow

Mark a function as a DBOS workflow. Workflows are durable, reliable, and automatically recoverable.
@DBOS.workflow(
    name=None,
    max_recovery_attempts=None,
    serialization_type=None,
    validate_args=None
)
def my_workflow(arg1: str, arg2: int) -> str:
    ...

Parameters

name
str
default:"None"
Custom name for the workflow. Defaults to the function’s qualified name.
max_recovery_attempts
int
default:"50"
Maximum number of times to attempt recovery if the workflow fails.
serialization_type
WorkflowSerializationFormat
default:"None"
How to serialize workflow arguments and return values. Options: WorkflowSerializationFormat.JSON or WorkflowSerializationFormat.PICKLE.
validate_args
ValidateArgsCallable
default:"None"
Optional function to validate workflow arguments before execution.

Example

from dbos import DBOS

@DBOS.workflow()
def process_order(order_id: str, customer_id: str) -> str:
    # Workflow logic
    payment_id = charge_customer(customer_id)
    ship_order(order_id)
    return payment_id

@DBOS.transaction()
def charge_customer(customer_id: str) -> str:
    # Database transaction
    result = DBOS.sql_session.execute(
        text("INSERT INTO payments (customer_id) VALUES (:id) RETURNING id"),
        {"id": customer_id}
    )
    return str(result.scalar())

@DBOS.step()
def ship_order(order_id: str) -> None:
    # External API call
    shipping_api.create_shipment(order_id)

Workflow Properties

  • Durable: Workflow state is persisted to the database
  • Reliable: Automatically retries on failure
  • Exactly-once: Each step executes exactly once, even on recovery
  • Observable: Track status and results via workflow handles

@DBOS.transaction

Mark a function as a database transaction. Transactions execute in a serializable database transaction with automatic retry on conflicts.
@DBOS.transaction(
    isolation_level="SERIALIZABLE",
    name=None
)
def my_transaction(arg: str) -> int:
    ...

Parameters

isolation_level
IsolationLevel
default:"SERIALIZABLE"
The transaction isolation level. Options:
  • "SERIALIZABLE" - Strictest isolation
  • "REPEATABLE READ" - Prevents non-repeatable reads
  • "READ COMMITTED" - Prevents dirty reads
name
str
default:"None"
Custom name for the transaction. Defaults to the function’s qualified name.

Example

import sqlalchemy as sa
from dbos import DBOS

@DBOS.transaction()
def create_user(username: str, email: str) -> int:
    result = DBOS.sql_session.execute(
        sa.text(
            "INSERT INTO users (username, email) "
            "VALUES (:username, :email) RETURNING id"
        ),
        {"username": username, "email": email}
    )
    user_id = result.scalar()
    return user_id

@DBOS.transaction(isolation_level="READ COMMITTED")
def get_user_count() -> int:
    result = DBOS.sql_session.execute(
        sa.text("SELECT COUNT(*) FROM users")
    )
    return result.scalar()

Transaction Properties

  • ACID: Full ACID guarantees from the database
  • Automatic Retry: Serialization failures are automatically retried
  • Session Access: Access via DBOS.sql_session
  • Checkpointed: Results are recorded for workflow recovery

@DBOS.step

Mark a function as a workflow step. Steps are checkpointed, and their results are cached for workflow recovery.
@DBOS.step(
    name=None,
    retries_allowed=False,
    interval_seconds=1.0,
    max_attempts=3,
    backoff_rate=2.0
)
def my_step(arg: str) -> str:
    ...

Parameters

name
str
default:"None"
Custom name for the step. Defaults to the function’s qualified name.
retries_allowed
bool
default:"False"
If True, automatically retry the step on exceptions.
interval_seconds
float
default:"1.0"
Initial delay between retry attempts (in seconds).
max_attempts
int
default:"3"
Maximum number of retry attempts before raising an exception.
backoff_rate
float
default:"2.0"
Multiplier for exponential backoff between retries.

Example

from dbos import DBOS
import requests

@DBOS.step(retries_allowed=True, max_attempts=5, interval_seconds=2.0)
def call_external_api(user_id: str) -> dict:
    response = requests.post(
        "https://api.example.com/users",
        json={"user_id": user_id}
    )
    response.raise_for_status()
    return response.json()

@DBOS.workflow()
def process_user(user_id: str):
    # This step will retry up to 5 times on failure
    api_result = call_external_api(user_id)
    return api_result

Step Properties

  • Checkpointed: Results are persisted for recovery
  • Idempotent Recovery: On recovery, completed steps return cached results
  • Retry Support: Optional automatic retries with exponential backoff
  • Observable: Access retry status via DBOS.step_status

@DBOS.scheduled

Schedule a workflow to run on a cron schedule.
@DBOS.scheduled(cron)
def my_scheduled_workflow(scheduled_time: datetime, context: Any) -> None:
    ...

Parameters

cron
str
required
A cron expression defining the schedule. Supports 6 fields (second minute hour day month weekday).

Example

from datetime import datetime
from dbos import DBOS

@DBOS.scheduled("0 0 * * * *")  # Every hour at minute 0
@DBOS.workflow()
def hourly_cleanup(scheduled_time: datetime, context: Any) -> None:
    DBOS.logger.info(f"Running cleanup at {scheduled_time}")
    cleanup_old_records()

@DBOS.transaction()
def cleanup_old_records() -> int:
    result = DBOS.sql_session.execute(
        text("DELETE FROM logs WHERE created_at < NOW() - INTERVAL '7 days'")
    )
    return result.rowcount

# Run every weekday at 9:00 AM
@DBOS.scheduled("0 0 9 * * 1-5")
@DBOS.workflow()
def weekday_report(scheduled_time: datetime, context: Any) -> None:
    generate_daily_report()

Scheduled Workflow Properties

  • Reliable: Missed executions are caught up on restart
  • Deterministic: Workflow IDs are deterministic based on schedule time
  • Idempotent: Duplicate executions are prevented
  • Configurable: Managed via DBOS.create_schedule() or decorator

@DBOS.kafka_consumer

Consume messages from Kafka topics and process them as workflows.
@DBOS.kafka_consumer(
    config,
    topics,
    in_order=False
)
def my_kafka_handler(message: KafkaMessage) -> None:
    ...

Parameters

config
dict[str, Any]
required
Kafka consumer configuration (compatible with kafka-python).
topics
list[str]
required
List of Kafka topics to subscribe to.
in_order
bool
default:"False"
If True, process messages from each partition sequentially.

Example

from dbos import DBOS, KafkaMessage

kafka_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-consumer-group",
}

@DBOS.kafka_consumer(kafka_config, ["orders"])
@DBOS.workflow()
def process_order_event(message: KafkaMessage) -> None:
    order_data = message.value
    DBOS.logger.info(f"Processing order: {order_data}")
    
    # Process the order
    save_order(order_data)
    send_confirmation(order_data["customer_email"])

@DBOS.transaction()
def save_order(order_data: dict) -> None:
    DBOS.sql_session.execute(
        text("INSERT INTO orders (data) VALUES (:data)"),
        {"data": order_data}
    )

Kafka Consumer Properties

  • Reliable: Messages are processed exactly once
  • Ordered: Optional in-order processing per partition
  • Durable: Processing state survives crashes
  • Scalable: Multiple consumers can process in parallel

KafkaMessage

The KafkaMessage dataclass represents a message consumed from Kafka.
from dataclasses import dataclass
from typing import Optional, Union

@dataclass
class KafkaMessage:
    headers: Optional[list[tuple[str, Union[str, bytes]]]]
    key: Optional[Union[str, bytes]]
    latency: Optional[float]
    leader_epoch: Optional[int]
    offset: Optional[int]
    partition: Optional[int]
    timestamp: tuple[int, int]
    topic: Optional[str]
    value: Optional[Union[str, bytes]]
Fields:
headers
Optional[list[tuple[str, Union[str, bytes]]]]
Message headers as key-value pairs
key
Optional[Union[str, bytes]]
Message key
latency
Optional[float]
Consumer latency in seconds
leader_epoch
Optional[int]
Leader epoch of the partition
offset
Optional[int]
Message offset in the partition
partition
Optional[int]
Partition number
timestamp
tuple[int, int]
Timestamp tuple (timestamp_type, timestamp_ms)
topic
Optional[str]
Topic name
value
Optional[Union[str, bytes]]
Message payload
Example:
from dbos import DBOS, KafkaMessage
import json

@DBOS.kafka_consumer(kafka_config, ["events"])
@DBOS.workflow()
def process_event(message: KafkaMessage) -> None:
    # Access message metadata
    DBOS.logger.info(f"Topic: {message.topic}")
    DBOS.logger.info(f"Partition: {message.partition}")
    DBOS.logger.info(f"Offset: {message.offset}")
    
    # Parse message value
    if isinstance(message.value, bytes):
        data = json.loads(message.value.decode('utf-8'))
    else:
        data = json.loads(message.value)
    
    # Process the data
    process_data(data)

@DBOS.dbos_class

Register a class that contains DBOS member functions.
@DBOS.dbos_class(class_name=None)
class MyService:
    ...

Parameters

class_name
str
default:"None"
Custom name for the class. Defaults to the class’s __qualname__.

Example

from dbos import DBOS

@DBOS.dbos_class()
class OrderService:
    @DBOS.workflow()
    def process_order(self, order_id: str) -> str:
        payment_id = self.charge_payment(order_id)
        self.ship_order(order_id)
        return payment_id
    
    @DBOS.transaction()
    def charge_payment(self, order_id: str) -> str:
        result = DBOS.sql_session.execute(
            text("INSERT INTO payments (order_id) VALUES (:id) RETURNING id"),
            {"id": order_id}
        )
        return str(result.scalar())
    
    @DBOS.step()
    def ship_order(self, order_id: str) -> None:
        # Call shipping API
        pass

# Use the service
service = OrderService()
service.process_order("order-123")

Class Decorator Properties

  • Required: All classes with DBOS methods must be decorated
  • Unique Names: Class names must be globally unique
  • Recovery Support: Enables proper recovery of class methods

@DBOS.required_roles

Require specific roles for a function to execute.
@DBOS.required_roles(roles)
def my_function(arg: str) -> str:
    ...

Parameters

roles
List[str]
required
List of required role names. User must have at least one of these roles.

Example

from dbos import DBOS

@DBOS.dbos_class()
@DBOS.default_required_roles(["user"])
class UserService:
    @DBOS.workflow()
    def view_profile(self, user_id: str) -> dict:
        # All users can view profiles
        return get_user_data(user_id)
    
    @DBOS.required_roles(["admin"])
    @DBOS.workflow()
    def delete_user(self, user_id: str) -> None:
        # Only admins can delete users
        remove_user(user_id)

Supporting Classes and Functions

DBOSConfiguredInstance

Base class for classes containing DBOS member functions that access instance state. When a class contains DBOS functions that access instance state, the DBOS workflow executor needs a name for the instance. This name is recorded in the database and used to refer to the proper instance upon recovery.
from dbos import DBOSConfiguredInstance

class DBOSConfiguredInstance:
    def __init__(self, config_name: str) -> None:
        self.config_name = config_name
        DBOS.register_instance(self)
Attributes:
config_name
str
Instance name used for recovery
Example:
from dbos import DBOS, DBOSConfiguredInstance

@DBOS.dbos_class()
class OrderProcessor(DBOSConfiguredInstance):
    def __init__(self, config_name: str, api_key: str):
        super().__init__(config_name)
        self.api_key = api_key
    
    @DBOS.workflow()
    def process_order(self, order_id: str) -> str:
        # Uses instance state (api_key)
        result = self.call_external_api(order_id)
        return result
    
    @DBOS.step()
    def call_external_api(self, order_id: str) -> str:
        # API call using self.api_key
        return f"Processed {order_id} with key {self.api_key}"

# Create configured instances
prod_processor = OrderProcessor(\"prod\", \"prod-api-key-123\")
test_processor = OrderProcessor(\"test\", \"test-api-key-456\")

# When recovered, DBOS will use the config_name to find the right instance
prod_processor.process_order(\"order-789\")

pydantic_args_validator

Ready-made argument validator that uses Pydantic to coerce and validate deserialized workflow arguments against the function’s type hints.
from dbos import DBOS
from dbos._validation import pydantic_args_validator

@DBOS.workflow(validate_args=pydantic_args_validator)
def my_workflow(name: str, count: int) -> str:
    return f\"{name}: {count}\"
Requirements:
  • Requires Pydantic >= 2.0
  • Install with: pip install pydantic or pip install dbos[validation]
Example:
from dbos import DBOS
from dbos._validation import pydantic_args_validator
from pydantic import BaseModel, EmailStr

class UserInput(BaseModel):
    email: EmailStr
    age: int
    name: str

@DBOS.workflow(validate_args=pydantic_args_validator)
def create_user(user: UserInput) -> str:
    # user is validated and coerced by Pydantic
    return f\"Created user {user.name} ({user.email})\"

# Valid input
handle = DBOS.start_workflow(
    create_user,
    {\"email\": \"[email protected]\", \"age\": \"25\", \"name\": \"Alice\"}
)
# age=\"25\" is coerced to int(25)

# Invalid input raises validation error
try:
    DBOS.start_workflow(
        create_user,
        {\"email\": \"invalid-email\", \"age\": \"not-a-number\", \"name\": \"Bob\"}
    )
except ValueError as e:
    print(f\"Validation failed: {e}\")

make_pydantic_args_validator

Create a custom argument validator for a specific function.
from dbos._validation import make_pydantic_args_validator
from typing import Tuple, Dict, Any

def make_pydantic_args_validator(
    func: Callable[..., Any]
) -> Callable[[Tuple[Any, ...], Dict[str, Any]], Tuple[Tuple[Any, ...], Dict[str, Any]]]:
    ...
Parameters:
func
Callable[..., Any]
required
The function to create a validator for
Returns:
ValidateArgsCallable
Callable
A callable (args, kwargs) -> (args, kwargs) suitable for the validate_args parameter
Example:
from dbos import DBOS
from dbos._validation import make_pydantic_args_validator

def my_workflow(name: str, count: int) -> str:
    return f\"{name}: {count}\"

# Create a custom validator
validator = make_pydantic_args_validator(my_workflow)

# Use it with the decorator
@DBOS.workflow(validate_args=validator)
def validated_workflow(name: str, count: int) -> str:
    return f\"{name}: {count}\"

Build docs developers (and LLMs) love