Skip to main content
DBOS Transact defines a hierarchy of exception classes to signal different error conditions during workflow execution. Understanding these exceptions helps you build robust error handling in your workflows.

Exception Hierarchy

DBOS exceptions fall into two categories:
  • DBOSException - Catchable exceptions that inherit from Exception. These can be caught with standard try/except blocks.
  • DBOSBaseException - Uncatchable exceptions that inherit from BaseException. These signal critical internal errors and should not be caught by user code.

Error Codes

DBOSErrorCode

Enumeration of error codes used by DBOS exceptions.
from dbos._error import DBOSErrorCode

class DBOSErrorCode(Enum):
    ConflictingIDError = 1
    RecoveryError = 2
    InitializationError = 3
    WorkflowFunctionNotFound = 4
    NonExistentWorkflowError = 5
    MaxRecoveryAttemptsExceeded = 6
    MaxStepRetriesExceeded = 7
    NotAuthorized = 8
    ConflictingWorkflowError = 9
    WorkflowCancelled = 10
    UnexpectedStep = 11
    QueueDeduplicated = 12
    AwaitedWorkflowCancelled = 13
    AwaitedWorkflowMaxRecoveryAttemptsExceeded = 14
    ConflictingRegistrationError = 25

Base Exception Classes

DBOSException

Base class for all catchable DBOS exceptions.
from dbos._error import DBOSException

class DBOSException(Exception):
    def __init__(self, message: str, dbos_error_code: Optional[int] = None):
        ...
Attributes:
message
str
The error message string
dbos_error_code
DBOSErrorCode
The error code from the DBOSErrorCode enum
status_code
Optional[int]
HTTP status code, if applicable (e.g., 403 for authorization errors)
Example:
try:
    DBOS.start_workflow(my_workflow, workflow_id="duplicate-id")
except DBOSException as e:
    print(f"Error code: {e.dbos_error_code}")
    print(f"Message: {e.message}")

DBOSBaseException

Base class for uncatchable DBOS exceptions. These inherit from BaseException instead of Exception, so they cannot be caught except by code specifically trying to catch them.
from dbos._error import DBOSBaseException

class DBOSBaseException(BaseException):
    def __init__(self, message: str, dbos_error_code: Optional[int] = None):
        ...
Attributes:
message
str
The error message string
dbos_error_code
DBOSErrorCode
The error code from the DBOSErrorCode enum
Do not catch DBOSBaseException in your application code. These exceptions signal critical internal errors that should terminate workflow execution.

Workflow Exceptions (Catchable)

DBOSConflictingWorkflowError

Raised when different workflows are started with the same workflow ID.
from dbos._error import DBOSConflictingWorkflowError

DBOSConflictingWorkflowError(workflow_id: str, message: Optional[str] = None)
When raised:
  • Attempting to start a new workflow with an ID that already exists but has different function name or arguments
Example:
from dbos import DBOS
from dbos._error import DBOSConflictingWorkflowError

@DBOS.workflow()
def process_order(order_id: str) -> str:
    return f"Processed {order_id}"

try:
    # Start workflow with specific ID
    DBOS.start_workflow(process_order, "order-123", workflow_id="wf-1")
    
    # Try to start different workflow with same ID
    DBOS.start_workflow(other_workflow, workflow_id="wf-1")
except DBOSConflictingWorkflowError as e:
    print(f"Workflow ID conflict: {e.message}")

DBOSRecoveryError

Raised when a workflow recovery fails.
from dbos._error import DBOSRecoveryError

DBOSRecoveryError(workflow_id: str, message: Optional[str] = None)
When raised:
  • Errors during workflow recovery process
  • Database inconsistencies preventing recovery

DBOSInitializationError

Raised when DBOS initialization does not complete successfully.
from dbos._error import DBOSInitializationError

DBOSInitializationError(message: str)
When raised:
  • Database connection failures during startup
  • Invalid configuration
  • System database schema issues
Example:
from dbos import DBOS, DBOSConfig
from dbos._error import DBOSInitializationError

try:
    config = DBOSConfig(
        database_url="postgresql://invalid:url@localhost/db"
    )
    dbos = DBOS(config=config)
    dbos.launch()
except DBOSInitializationError as e:
    print(f"Failed to initialize DBOS: {e.message}")

DBOSWorkflowFunctionNotFoundError

Raised when the database refers to a workflow function that is not registered in the codebase.
from dbos._error import DBOSWorkflowFunctionNotFoundError

DBOSWorkflowFunctionNotFoundError(workflow_id: str, message: Optional[str] = None)
When raised:
  • Attempting to recover a workflow whose function has been removed or renamed
  • Missing @DBOS.workflow() decorator on a function

DBOSNonExistentWorkflowError

Raised when a workflow database record does not exist for a given ID.
from dbos._error import DBOSNonExistentWorkflowError

DBOSNonExistentWorkflowError(destination: str, destination_id: str)
When raised:
  • Calling retrieve_workflow() with an invalid workflow ID
  • Sending messages to non-existent workflows
Example:
from dbos import DBOS
from dbos._error import DBOSNonExistentWorkflowError

try:
    handle = DBOS.retrieve_workflow("non-existent-id")
except DBOSNonExistentWorkflowError as e:
    print(f"Workflow not found: {e.message}")

MaxRecoveryAttemptsExceededError

Raised when a workflow exceeds its maximum recovery attempts.
from dbos._error import MaxRecoveryAttemptsExceededError

MaxRecoveryAttemptsExceededError(wf_id: str, max_retries: int)
When raised:
  • A workflow has been attempted more times than max_recovery_attempts
  • Further execution or recovery attempts will fail
Example:
from dbos import DBOS
from dbos._error import MaxRecoveryAttemptsExceededError

@DBOS.workflow(max_recovery_attempts=3)
def unreliable_workflow() -> str:
    # Workflow that keeps failing
    raise Exception("Always fails")

try:
    handle = DBOS.start_workflow(unreliable_workflow)
    result = handle.get_result()
except MaxRecoveryAttemptsExceededError as e:
    print(f"Workflow exhausted retries: {e.message}")

DBOSNotAuthorizedError

Raised by DBOS role-based security when the user is not authorized to access a function.
from dbos._error import DBOSNotAuthorizedError

DBOSNotAuthorizedError(msg: str)
Attributes:
status_code
int
Always set to 403 (HTTP Forbidden)
When raised:
  • User lacks required roles for a @DBOS.required_roles() decorated function
Example:
from dbos import DBOS
from dbos._error import DBOSNotAuthorizedError

@DBOS.required_roles(["admin"])
@DBOS.workflow()
def delete_all_data() -> None:
    # Admin-only operation
    pass

try:
    # User without admin role tries to call it
    DBOS.start_workflow(delete_all_data)
except DBOSNotAuthorizedError as e:
    print(f"Access denied: {e.message}")
    print(f"HTTP status: {e.status_code}")  # 403

DBOSMaxStepRetriesExceeded

Raised when a step was retried the maximum number of times without success.
from dbos._error import DBOSMaxStepRetriesExceeded

DBOSMaxStepRetriesExceeded(
    step_name: str,
    max_retries: int,
    errors: list[Exception]
)
Attributes:
step_name
str
Name of the step that failed
max_retries
int
Maximum number of retry attempts configured
errors
list[Exception]
List of all exceptions encountered during retry attempts
When raised:
  • A @DBOS.step() with retries_allowed=True exceeds max_attempts
Example:
from dbos import DBOS
from dbos._error import DBOSMaxStepRetriesExceeded

@DBOS.step(retries_allowed=True, max_attempts=3)
def flaky_api_call() -> str:
    # API that keeps failing
    raise Exception("API timeout")

@DBOS.workflow()
def process_with_retry():
    try:
        result = flaky_api_call()
    except DBOSMaxStepRetriesExceeded as e:
        print(f"Step {e.step_name} failed after {e.max_retries} attempts")
        print(f"Errors: {e.errors}")
        # Handle exhausted retries

DBOSConflictingRegistrationError

Raised when conflicting decorators are applied to the same function.
from dbos._error import DBOSConflictingRegistrationError

DBOSConflictingRegistrationError(name: str)
When raised:
  • Applying multiple DBOS decorators (e.g., both @DBOS.workflow() and @DBOS.transaction()) to the same function
  • Registering the same function name with different types
Example:
from dbos import DBOS
from dbos._error import DBOSConflictingRegistrationError

# This will raise an error:
try:
    @DBOS.workflow()
    @DBOS.transaction()  # Can't be both!
    def conflicting_function():
        pass
except DBOSConflictingRegistrationError as e:
    print(f"Registration conflict: {e.message}")

DBOSUnexpectedStepError

Raised when a step has an unexpected recorded name, indicating a non-deterministic workflow.
from dbos._error import DBOSUnexpectedStepError

DBOSUnexpectedStepError(
    workflow_id: str,
    step_id: int,
    expected_name: str,
    recorded_name: str
)
When raised:
  • During recovery, a different step function is executed than what was recorded
  • Workflow code is non-deterministic (e.g., conditional step execution based on random values)
Example:
import random
from dbos import DBOS

@DBOS.workflow()
def non_deterministic_workflow():
    # BAD: Non-deterministic code in workflow
    if random.random() > 0.5:
        step_a()  # Sometimes called
    else:
        step_b()  # Other times called
    # On recovery, this may raise DBOSUnexpectedStepError
Workflows must be deterministic. Always execute steps in the same order with the same functions.

DBOSQueueDeduplicatedError

Raised when a workflow is deduplicated in the queue.
from dbos._error import DBOSQueueDeduplicatedError

DBOSQueueDeduplicatedError(
    workflow_id: str,
    queue_name: str,
    deduplication_id: str
)
Attributes:
workflow_id
str
ID of the deduplicated workflow
queue_name
str
Name of the queue
deduplication_id
str
The deduplication ID that matched an existing workflow
When raised:
  • Enqueueing a workflow with a deduplication_id that already exists in the queue
Example:
from dbos import DBOS, Queue
from dbos._error import DBOSQueueDeduplicatedError

queue = Queue("tasks")

@DBOS.workflow()
def process_task(task_id: str) -> str:
    return f"Processed {task_id}"

try:
    # Enqueue with deduplication ID
    queue.enqueue(process_task, "task-1", deduplication_id="dedup-1")
    
    # Try to enqueue again with same deduplication ID
    queue.enqueue(process_task, "task-1", deduplication_id="dedup-1")
except DBOSQueueDeduplicatedError as e:
    print(f"Workflow deduplicated in queue {e.queue_name}")

DBOSAwaitedWorkflowCancelledError

Raised when an awaited workflow was cancelled.
from dbos._error import DBOSAwaitedWorkflowCancelledError

DBOSAwaitedWorkflowCancelledError(workflow_id: str)
When raised:
  • Calling get_result() on a workflow that has been cancelled
Example:
from dbos import DBOS
from dbos._error import DBOSAwaitedWorkflowCancelledError

@DBOS.workflow()
def long_task() -> str:
    DBOS.sleep(3600)
    return "done"

handle = DBOS.start_workflow(long_task)
DBOS.cancel_workflow(handle.workflow_id)

try:
    result = handle.get_result()
except DBOSAwaitedWorkflowCancelledError as e:
    print(f"Workflow {e.workflow_id} was cancelled")

DBOSAwaitedWorkflowMaxRecoveryAttemptsExceeded

Raised when an awaited workflow exceeded its maximum recovery attempts.
from dbos._error import DBOSAwaitedWorkflowMaxRecoveryAttemptsExceeded

DBOSAwaitedWorkflowMaxRecoveryAttemptsExceeded(workflow_id: str)
When raised:
  • Calling get_result() on a workflow that exhausted its recovery attempts

Critical Exceptions (Uncatchable)

DBOSWorkflowCancelledError

BaseException raised when a workflow has been cancelled. Inherits from BaseException so it cannot be caught by normal exception handling.
from dbos._error import DBOSWorkflowCancelledError

DBOSWorkflowCancelledError(msg: str)
When raised:
  • During workflow execution when cancellation is detected
Do not catch this exception. It signals that the workflow should terminate immediately.

DBOSWorkflowConflictIDError

BaseException raised when a workflow database record already exists.
from dbos._error import DBOSWorkflowConflictIDError

DBOSWorkflowConflictIDError(workflow_id: str)
When raised:
  • Internal conflict during workflow record creation
Do not catch this exception. It signals a critical internal error.

Best Practices

Error Handling in Workflows

from dbos import DBOS
from dbos._error import (
    DBOSMaxStepRetriesExceeded,
    DBOSNotAuthorizedError,
    MaxRecoveryAttemptsExceededError
)

@DBOS.workflow()
def robust_workflow(order_id: str) -> str:
    try:
        # Step with retries
        payment = charge_payment(order_id)
        
        # Another step
        shipment = create_shipment(order_id)
        
        return f"Success: {payment}, {shipment}"
        
    except DBOSMaxStepRetriesExceeded as e:
        # Handle step retry exhaustion
        DBOS.logger.error(f"Step failed after retries: {e.step_name}")
        # Implement fallback logic
        return "failed"
        
    except DBOSNotAuthorizedError:
        # Don't retry authorization errors
        DBOS.logger.error("User not authorized")
        raise

@DBOS.step(retries_allowed=True, max_attempts=5)
def charge_payment(order_id: str) -> str:
    # Payment logic
    return "payment-123"

@DBOS.step()
def create_shipment(order_id: str) -> str:
    # Shipment logic
    return "shipment-456"

Checking Workflow Status

from dbos import DBOS
from dbos._error import DBOSAwaitedWorkflowCancelledError

@DBOS.workflow()
def parent_workflow() -> str:
    # Start child workflow
    child_handle = DBOS.start_workflow(child_workflow, "arg")
    
    try:
        # Wait for child to complete
        result = child_handle.get_result()
        return f"Child succeeded: {result}"
        
    except DBOSAwaitedWorkflowCancelledError:
        # Child was cancelled
        DBOS.logger.warning("Child workflow was cancelled")
        return "cancelled"

Build docs developers (and LLMs) love