DBOS Transact defines a hierarchy of exception classes to signal different error conditions during workflow execution. Understanding these exceptions helps you build robust error handling in your workflows.
Exception Hierarchy
DBOS exceptions fall into two categories:
- DBOSException - Catchable exceptions that inherit from Exception. These can be caught with standard try/except blocks.
- DBOSBaseException - "Uncatchable" exceptions that inherit from BaseException, so a generic except Exception handler does not intercept them. These signal critical internal errors and should not be caught by user code.
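The practical difference between the two bases is which handlers intercept them. The sketch below uses two hypothetical stand-in classes (`CatchableError`, `CriticalError`), not the real DBOS classes, to show that a generic `except Exception` handler sees only the first kind:

```python
class CatchableError(Exception):
    """Hypothetical stand-in for an exception derived from Exception."""


class CriticalError(BaseException):
    """Hypothetical stand-in for an exception derived from BaseException."""


def run_guarded(exc: BaseException) -> str:
    """Raise exc inside a generic handler and report what happened."""
    try:
        raise exc
    except Exception as e:
        # Only subclasses of Exception reach this handler.
        return f"handled {type(e).__name__}"


def probe(exc: BaseException) -> str:
    """Call run_guarded and report whether the exception escaped it."""
    try:
        return run_guarded(exc)
    except BaseException as e:
        # Reached only when the generic handler above did not match.
        return f"escaped {type(e).__name__}"
```

Here `probe` exists only so the sketch can observe the outcome; application code would normally let the second kind of exception propagate.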
Error Codes
DBOSErrorCode
Enumeration of error codes used by DBOS exceptions.
from dbos._error import DBOSErrorCode
class DBOSErrorCode(Enum):
    ConflictingIDError = 1
    RecoveryError = 2
    InitializationError = 3
    WorkflowFunctionNotFound = 4
    NonExistentWorkflowError = 5
    MaxRecoveryAttemptsExceeded = 6
    MaxStepRetriesExceeded = 7
    NotAuthorized = 8
    ConflictingWorkflowError = 9
    WorkflowCancelled = 10
    UnexpectedStep = 11
    QueueDeduplicated = 12
    AwaitedWorkflowCancelled = 13
    AwaitedWorkflowMaxRecoveryAttemptsExceeded = 14
    ConflictingRegistrationError = 25
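Because the exception constructors in this reference declare `dbos_error_code` as an optional integer, one way to turn it into a readable label is an enum lookup by value. The sketch below mirrors a few members locally under the hypothetical name `ErrorCodeMirror` so that it runs without DBOS installed; real code would presumably look the value up in `DBOSErrorCode` itself.

```python
from enum import Enum
from typing import Optional


class ErrorCodeMirror(Enum):
    """Hypothetical local copy of a few DBOSErrorCode members."""
    ConflictingIDError = 1
    NotAuthorized = 8
    QueueDeduplicated = 12
    ConflictingRegistrationError = 25


def describe_code(code: Optional[int]) -> str:
    """Map an integer error code to its enum name, tolerating unknown values."""
    if code is None:
        return "no error code"
    try:
        return ErrorCodeMirror(code).name
    except ValueError:
        # Enum lookup by value raises ValueError for values with no member.
        return f"unknown code {code}"
```

Handling `None` and unknown values explicitly keeps logging code from failing while it reports another failure.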
Base Exception Classes
DBOSException
Base class for all catchable DBOS exceptions.
from dbos._error import DBOSException
class DBOSException(Exception):
    def __init__(self, message: str, dbos_error_code: Optional[int] = None):
        ...
Attributes:
- dbos_error_code - The error code from the DBOSErrorCode enum
- status_code - HTTP status code, if applicable (e.g., 403 for authorization errors)
Example:
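from dbos import DBOS, SetWorkflowID
from dbos._error import DBOSException
try:
    # my_workflow is a placeholder for a registered workflow; the ID is set with SetWorkflowID
    with SetWorkflowID("duplicate-id"):
        DBOS.start_workflow(my_workflow)
except DBOSException as e:
    print(f"Error code: {e.dbos_error_code}")
    print(f"Message: {e.message}")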
DBOSBaseException
Base class for uncatchable DBOS exceptions. These inherit from BaseException instead of Exception, so a generic except Exception handler does not intercept them. Only a handler that names BaseException or the specific class will catch them.
from dbos._error import DBOSBaseException
class DBOSBaseException(BaseException):
    def __init__(self, message: str, dbos_error_code: Optional[int] = None):
        ...
Attributes:
- dbos_error_code - The error code from the DBOSErrorCode enum
Do not catch DBOSBaseException in your application code. These exceptions signal critical internal errors that should terminate workflow execution.
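When cleanup is needed around code that may raise one of these exceptions, `try`/`finally` runs the cleanup without swallowing the exception. A minimal sketch, using a hypothetical `CriticalError` stand-in rather than a real DBOS class:

```python
class CriticalError(BaseException):
    """Hypothetical stand-in for an exception derived from BaseException."""


def do_work(log: list) -> None:
    """Run cleanup on the way out without intercepting the exception."""
    log.append("acquired")
    try:
        raise CriticalError("terminate")
    finally:
        # Runs on every exit path, including BaseException subclasses,
        # and the exception keeps propagating afterwards.
        log.append("released")


def run(log: list) -> str:
    """Outermost caller, present only so the sketch can observe the outcome."""
    try:
        do_work(log)
    except CriticalError:
        return "propagated"
    return "completed"
```

The `except` clause in `run` is there purely to make the behavior observable; `do_work` is the pattern of interest.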
Workflow Exceptions (Catchable)
DBOSConflictingWorkflowError
Raised when different workflows are started with the same workflow ID.
from dbos._error import DBOSConflictingWorkflowError
DBOSConflictingWorkflowError(workflow_id: str, message: Optional[str] = None)
When raised:
- Attempting to start a new workflow with an ID that already exists but was recorded with a different function name or arguments
Example:
from dbos import DBOS, SetWorkflowID
from dbos._error import DBOSConflictingWorkflowError
@DBOS.workflow()
def process_order(order_id: str) -> str:
    return f"Processed {order_id}"
@DBOS.workflow()
def other_workflow() -> str:
    return "other"
try:
    # Start a workflow with a specific ID (set with SetWorkflowID)
    with SetWorkflowID("wf-1"):
        DBOS.start_workflow(process_order, "order-123")
    # Try to start a different workflow with the same ID
    with SetWorkflowID("wf-1"):
        DBOS.start_workflow(other_workflow)
except DBOSConflictingWorkflowError as e:
    print(f"Workflow ID conflict: {e.message}")
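The rule can be pictured as an ID registry: reusing an ID for the same workflow function is tolerated, while reusing it for a different function is a conflict. The sketch below is an in-memory illustration with hypothetical names (`WorkflowRegistry`, `ConflictError`). It compares function names only and is not how DBOS stores workflow records.

```python
from typing import Dict


class ConflictError(Exception):
    """Hypothetical stand-in for a workflow ID conflict."""

    def __init__(self, workflow_id: str, message: str):
        self.workflow_id = workflow_id
        self.message = message
        super().__init__(message)


class WorkflowRegistry:
    """Tracks which function name each workflow ID was first started with."""

    def __init__(self) -> None:
        self._names: Dict[str, str] = {}

    def start(self, workflow_id: str, function_name: str) -> str:
        # setdefault records the name on first use and returns the stored one after.
        recorded = self._names.setdefault(workflow_id, function_name)
        if recorded != function_name:
            raise ConflictError(
                workflow_id,
                f"{workflow_id} already belongs to {recorded}",
            )
        return recorded
```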
DBOSRecoveryError
Raised when a workflow recovery fails.
from dbos._error import DBOSRecoveryError
DBOSRecoveryError(workflow_id: str, message: Optional[str] = None)
When raised:
- Errors during workflow recovery process
- Database inconsistencies preventing recovery
DBOSInitializationError
Raised when DBOS initialization does not complete successfully.
from dbos._error import DBOSInitializationError
DBOSInitializationError(message: str)
When raised:
- Database connection failures during startup
- Invalid configuration
- System database schema issues
Example:
from dbos import DBOS, DBOSConfig
from dbos._error import DBOSInitializationError
try:
    config = DBOSConfig(
        database_url="postgresql://invalid:url@localhost/db"
    )
    dbos = DBOS(config=config)
    dbos.launch()
except DBOSInitializationError as e:
    print(f"Failed to initialize DBOS: {e.message}")
DBOSWorkflowFunctionNotFoundError
Raised when the database refers to a workflow function that is not registered in the codebase.
from dbos._error import DBOSWorkflowFunctionNotFoundError
DBOSWorkflowFunctionNotFoundError(workflow_id: str, message: Optional[str] = None)
When raised:
- Attempting to recover a workflow whose function has been removed or renamed
- Missing @DBOS.workflow() decorator on a function
DBOSNonExistentWorkflowError
Raised when a workflow database record does not exist for a given ID.
from dbos._error import DBOSNonExistentWorkflowError
DBOSNonExistentWorkflowError(destination: str, destination_id: str)
When raised:
- Calling retrieve_workflow() with an invalid workflow ID
- Sending messages to non-existent workflows
Example:
from dbos import DBOS
from dbos._error import DBOSNonExistentWorkflowError
try:
    handle = DBOS.retrieve_workflow("non-existent-id")
except DBOSNonExistentWorkflowError as e:
    print(f"Workflow not found: {e.message}")
MaxRecoveryAttemptsExceededError
Raised when a workflow exceeds its maximum recovery attempts.
from dbos._error import MaxRecoveryAttemptsExceededError
MaxRecoveryAttemptsExceededError(wf_id: str, max_retries: int)
When raised:
- A workflow has been attempted more times than max_recovery_attempts
- Further execution or recovery attempts will fail
Example:
from dbos import DBOS
from dbos._error import MaxRecoveryAttemptsExceededError
@DBOS.workflow(max_recovery_attempts=3)
def unreliable_workflow() -> str:
    # Workflow that keeps failing
    raise Exception("Always fails")
try:
    handle = DBOS.start_workflow(unreliable_workflow)
    result = handle.get_result()
except MaxRecoveryAttemptsExceededError as e:
    print(f"Workflow exhausted retries: {e.message}")
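The limit behaves like a per-workflow attempt counter. The sketch below models that idea in memory with hypothetical names (`AttemptTracker`, `AttemptsExceeded`). The exact comparison DBOS performs is an assumption taken from the wording above, namely that the error is raised once the number of attempts exceeds `max_recovery_attempts`.

```python
from collections import defaultdict


class AttemptsExceeded(Exception):
    """Hypothetical stand-in for the recovery-limit error."""

    def __init__(self, wf_id: str, max_retries: int):
        self.wf_id = wf_id
        self.max_retries = max_retries
        super().__init__(f"{wf_id} exceeded {max_retries} recovery attempts")


class AttemptTracker:
    """Counts execution attempts per workflow ID."""

    def __init__(self, max_recovery_attempts: int) -> None:
        self.max_recovery_attempts = max_recovery_attempts
        self._attempts = defaultdict(int)

    def record_attempt(self, wf_id: str) -> int:
        """Count one more attempt and fail once the limit is exceeded."""
        self._attempts[wf_id] += 1
        if self._attempts[wf_id] > self.max_recovery_attempts:
            raise AttemptsExceeded(wf_id, self.max_recovery_attempts)
        return self._attempts[wf_id]
```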
DBOSNotAuthorizedError
Raised by DBOS role-based security when the user is not authorized to access a function.
from dbos._error import DBOSNotAuthorizedError
DBOSNotAuthorizedError(msg: str)
Attributes:
- status_code - Always set to 403 (HTTP Forbidden)
When raised:
- User lacks required roles for a @DBOS.required_roles() decorated function
Example:
from dbos import DBOS
from dbos._error import DBOSNotAuthorizedError
@DBOS.required_roles(["admin"])
@DBOS.workflow()
def delete_all_data() -> None:
    # Admin-only operation
    pass
try:
    # User without admin role tries to call it
    DBOS.start_workflow(delete_all_data)
except DBOSNotAuthorizedError as e:
    print(f"Access denied: {e.message}")
    print(f"HTTP status: {e.status_code}")  # 403
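Conceptually, the role check compares the caller's roles with the roles a function requires and fails with status 403 when none match. The sketch below illustrates this with a hypothetical decorator `requires_roles` and a hypothetical `NotAuthorized` class. How DBOS obtains the caller's roles is not covered here, so they are passed in explicitly, and treating any single matching role as sufficient is an assumption.

```python
import functools
from typing import Callable, Iterable, List


class NotAuthorized(Exception):
    """Hypothetical stand-in carrying an HTTP-style status code."""

    def __init__(self, msg: str):
        self.message = msg
        self.status_code = 403
        super().__init__(msg)


def requires_roles(required: List[str]) -> Callable:
    """Reject calls whose caller_roles share no role with `required`."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(caller_roles: Iterable[str], *args, **kwargs):
            # An empty intersection means the caller holds none of the roles.
            if not set(required) & set(caller_roles):
                raise NotAuthorized(f"{func.__name__} requires one of {required}")
            return func(*args, **kwargs)

        return wrapper

    return decorator


@requires_roles(["admin"])
def delete_all_data() -> str:
    return "deleted"
```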
DBOSMaxStepRetriesExceeded
Raised when a step was retried the maximum number of times without success.
from dbos._error import DBOSMaxStepRetriesExceeded
DBOSMaxStepRetriesExceeded(
    step_name: str,
    max_retries: int,
    errors: list[Exception]
)
Attributes:
- step_name - Name of the step that failed
- max_retries - Maximum number of retry attempts configured
- errors - List of all exceptions encountered during retry attempts
When raised:
- A @DBOS.step() with retries_allowed=True exceeds max_attempts
Example:
from dbos import DBOS
from dbos._error import DBOSMaxStepRetriesExceeded
@DBOS.step(retries_allowed=True, max_attempts=3)
def flaky_api_call() -> str:
    # API that keeps failing
    raise Exception("API timeout")
@DBOS.workflow()
def process_with_retry():
    try:
        result = flaky_api_call()
    except DBOSMaxStepRetriesExceeded as e:
        print(f"Step {e.step_name} failed after {e.max_retries} attempts")
        print(f"Errors: {e.errors}")
        # Handle exhausted retries
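The three constructor arguments correspond to what a retry loop naturally accumulates. The following sketch is a simplified, hypothetical retry helper (`run_with_retries`, `StepRetriesExceeded`) with no backoff or durability, meant only to show where `step_name`, `max_retries`, and `errors` could come from:

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


class StepRetriesExceeded(Exception):
    """Hypothetical stand-in mirroring the attributes described above."""

    def __init__(self, step_name: str, max_retries: int, errors: List[Exception]):
        self.step_name = step_name
        self.max_retries = max_retries
        self.errors = errors
        super().__init__(f"{step_name} failed after {max_retries} attempts")


def run_with_retries(step: Callable[[], T], max_attempts: int) -> T:
    """Call step up to max_attempts times, collecting every failure."""
    errors: List[Exception] = []
    for _ in range(max_attempts):
        try:
            return step()
        except Exception as e:
            # Keep each failure so the caller can inspect all of them later.
            errors.append(e)
    raise StepRetriesExceeded(step.__name__, max_attempts, errors)
```

Keeping every intermediate error, rather than only the last one, lets a handler distinguish a consistent failure from a sequence of different ones.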
DBOSConflictingRegistrationError
Raised when conflicting decorators are applied to the same function.
from dbos._error import DBOSConflictingRegistrationError
DBOSConflictingRegistrationError(name: str)
When raised:
- Applying multiple DBOS decorators (e.g., both @DBOS.workflow() and @DBOS.transaction()) to the same function
- Registering the same function name with different types
Example:
from dbos import DBOS
from dbos._error import DBOSConflictingRegistrationError
# This will raise an error:
try:
    @DBOS.workflow()
    @DBOS.transaction()  # Can't be both!
    def conflicting_function():
        pass
except DBOSConflictingRegistrationError as e:
    print(f"Registration conflict: {e.message}")
DBOSUnexpectedStepError
Raised when a step has an unexpected recorded name, indicating a non-deterministic workflow.
from dbos._error import DBOSUnexpectedStepError
DBOSUnexpectedStepError(
    workflow_id: str,
    step_id: int,
    expected_name: str,
    recorded_name: str
)
When raised:
- During recovery, a different step function is executed than what was recorded
- Workflow code is non-deterministic (e.g., conditional step execution based on random values)
Example:
import random
from dbos import DBOS
@DBOS.step()
def step_a() -> None:
    pass
@DBOS.step()
def step_b() -> None:
    pass
@DBOS.workflow()
def non_deterministic_workflow():
    # BAD: Non-deterministic code in workflow
    if random.random() > 0.5:
        step_a()  # Sometimes called
    else:
        step_b()  # Other times called
    # On recovery, this may raise DBOSUnexpectedStepError
Workflows must be deterministic. Always execute steps in the same order with the same functions.
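One way to picture the check: each step's position and function name are recorded on the first execution, and on recovery the name about to run at a given position is compared with the recorded one. The sketch below is an in-memory illustration with hypothetical names (`StepLog`, `UnexpectedStep`), not DBOS's implementation:

```python
from typing import Dict


class UnexpectedStep(Exception):
    """Hypothetical stand-in for a step-name mismatch during replay."""

    def __init__(self, workflow_id: str, step_id: int,
                 expected_name: str, recorded_name: str):
        self.workflow_id = workflow_id
        self.step_id = step_id
        self.expected_name = expected_name
        self.recorded_name = recorded_name
        super().__init__(
            f"step {step_id} of {workflow_id}: about to run {expected_name}, "
            f"but {recorded_name} was recorded"
        )


class StepLog:
    """Remembers which function name ran at each step position."""

    def __init__(self, workflow_id: str) -> None:
        self.workflow_id = workflow_id
        self._recorded: Dict[int, str] = {}

    def check(self, step_id: int, name: str) -> None:
        # First execution records the name; later executions must match it.
        recorded = self._recorded.setdefault(step_id, name)
        if recorded != name:
            raise UnexpectedStep(self.workflow_id, step_id, name, recorded)
```

In the non-deterministic workflow above, a first run that took the `step_a` branch followed by a recovery that takes the `step_b` branch would fail this comparison at the first step position.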
DBOSQueueDeduplicatedError
Raised when a workflow is deduplicated in the queue.
from dbos._error import DBOSQueueDeduplicatedError
DBOSQueueDeduplicatedError(
    workflow_id: str,
    queue_name: str,
    deduplication_id: str
)
Attributes:
- workflow_id - ID of the deduplicated workflow
- deduplication_id - The deduplication ID that matched an existing workflow
When raised:
- Enqueueing a workflow with a deduplication_id that already exists in the queue
Example:
from dbos import DBOS, Queue, SetEnqueueOptions
from dbos._error import DBOSQueueDeduplicatedError
queue = Queue("tasks")
@DBOS.workflow()
def process_task(task_id: str) -> str:
    return f"Processed {task_id}"
try:
    # Enqueue with a deduplication ID (set with SetEnqueueOptions)
    with SetEnqueueOptions(deduplication_id="dedup-1"):
        queue.enqueue(process_task, "task-1")
    # Try to enqueue again with the same deduplication ID
    with SetEnqueueOptions(deduplication_id="dedup-1"):
        queue.enqueue(process_task, "task-1")
except DBOSQueueDeduplicatedError as e:
    print(f"Workflow deduplicated in queue {e.queue_name}")
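The deduplication rule amounts to a uniqueness constraint on the deduplication ID within a queue, for as long as the earlier workflow is still in that queue. A minimal in-memory sketch with hypothetical names (`DedupQueue`, `Deduplicated`); releasing the ID on completion is an assumption about the lifecycle:

```python
from typing import Dict


class Deduplicated(Exception):
    """Hypothetical stand-in for a rejected duplicate enqueue."""

    def __init__(self, workflow_id: str, queue_name: str, deduplication_id: str):
        self.workflow_id = workflow_id
        self.queue_name = queue_name
        self.deduplication_id = deduplication_id
        super().__init__(
            f"{workflow_id} deduplicated in {queue_name} by {deduplication_id}"
        )


class DedupQueue:
    """Holds at most one pending workflow per deduplication ID."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._pending: Dict[str, str] = {}  # deduplication_id -> workflow_id

    def enqueue(self, workflow_id: str, deduplication_id: str) -> None:
        if deduplication_id in self._pending:
            raise Deduplicated(workflow_id, self.name, deduplication_id)
        self._pending[deduplication_id] = workflow_id

    def complete(self, deduplication_id: str) -> None:
        # Assumed lifecycle: finishing the workflow frees its deduplication ID.
        self._pending.pop(deduplication_id, None)
```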
DBOSAwaitedWorkflowCancelledError
Raised when an awaited workflow was cancelled.
from dbos._error import DBOSAwaitedWorkflowCancelledError
DBOSAwaitedWorkflowCancelledError(workflow_id: str)
When raised:
- Calling get_result() on a workflow that has been cancelled
Example:
from dbos import DBOS
from dbos._error import DBOSAwaitedWorkflowCancelledError
@DBOS.workflow()
def long_task() -> str:
    DBOS.sleep(3600)
    return "done"
handle = DBOS.start_workflow(long_task)
DBOS.cancel_workflow(handle.workflow_id)
try:
    result = handle.get_result()
except DBOSAwaitedWorkflowCancelledError as e:
    print(f"Workflow {e.workflow_id} was cancelled")
DBOSAwaitedWorkflowMaxRecoveryAttemptsExceeded
Raised when an awaited workflow exceeded its maximum recovery attempts.
from dbos._error import DBOSAwaitedWorkflowMaxRecoveryAttemptsExceeded
DBOSAwaitedWorkflowMaxRecoveryAttemptsExceeded(workflow_id: str)
When raised:
- Calling get_result() on a workflow that exhausted its recovery attempts
Critical Exceptions (Uncatchable)
DBOSWorkflowCancelledError
Raised when a workflow has been cancelled. It inherits from BaseException, so generic except Exception handlers do not intercept it.
from dbos._error import DBOSWorkflowCancelledError
DBOSWorkflowCancelledError(msg: str)
When raised:
- During workflow execution when cancellation is detected
Do not catch this exception. It signals that the workflow should terminate immediately.
DBOSWorkflowConflictIDError
BaseException raised when a workflow database record already exists.
from dbos._error import DBOSWorkflowConflictIDError
DBOSWorkflowConflictIDError(workflow_id: str)
When raised:
- Internal conflict during workflow record creation
Do not catch this exception. It signals a critical internal error.
Best Practices
Error Handling in Workflows
from dbos import DBOS
from dbos._error import (
    DBOSMaxStepRetriesExceeded,
    DBOSNotAuthorizedError,
    MaxRecoveryAttemptsExceededError
)
@DBOS.workflow()
def robust_workflow(order_id: str) -> str:
    try:
        # Step with retries
        payment = charge_payment(order_id)
        # Another step
        shipment = create_shipment(order_id)
        return f"Success: {payment}, {shipment}"
    except DBOSMaxStepRetriesExceeded as e:
        # Handle step retry exhaustion
        DBOS.logger.error(f"Step failed after retries: {e.step_name}")
        # Implement fallback logic
        return "failed"
    except DBOSNotAuthorizedError:
        # Don't retry authorization errors
        DBOS.logger.error("User not authorized")
        raise
@DBOS.step(retries_allowed=True, max_attempts=5)
def charge_payment(order_id: str) -> str:
    # Payment logic
    return "payment-123"
@DBOS.step()
def create_shipment(order_id: str) -> str:
    # Shipment logic
    return "shipment-456"
Checking Workflow Status
from dbos import DBOS
from dbos._error import DBOSAwaitedWorkflowCancelledError
@DBOS.workflow()
def child_workflow(arg: str) -> str:
    # Placeholder child workflow
    return f"child got {arg}"
@DBOS.workflow()
def parent_workflow() -> str:
    # Start child workflow
    child_handle = DBOS.start_workflow(child_workflow, "arg")
    try:
        # Wait for child to complete
        result = child_handle.get_result()
        return f"Child succeeded: {result}"
    except DBOSAwaitedWorkflowCancelledError:
        # Child was cancelled
        DBOS.logger.warning("Child workflow was cancelled")
        return "cancelled"