Skip to main content

Introduction

DBOS provides powerful error handling capabilities including automatic retries, workflow recovery, and configurable retry strategies. Understanding these mechanisms is essential for building resilient applications.

Workflow Recovery

Workflows automatically recover from failures by replaying from their last completed step:
from dbos import DBOS
import sqlalchemy as sa

@DBOS.transaction()
def save_order(order_id: str, total: float) -> int:
    """This transaction commits if successful."""
    result = DBOS.sql_session.execute(
        sa.text(
            "INSERT INTO orders (order_id, total) VALUES (:id, :total) RETURNING id"
        ),
        {"id": order_id, "total": total}
    )
    return result.fetchone()[0]

@DBOS.step()
def charge_payment(amount: float) -> str:
    """This step's result is cached on success."""
    # External payment API call that might fail
    transaction_id = payment_gateway.charge(amount)
    return transaction_id

@DBOS.step()
def send_confirmation(order_id: str) -> None:
    """Send confirmation email."""
    email_service.send(f"Order {order_id} confirmed")

@DBOS.workflow()
def process_order_workflow(order_id: str, total: float) -> dict:
    """Resilient order processing."""
    
    # Step 1: Save order (committed to database)
    db_id = save_order(order_id, total)
    
    # Step 2: Charge payment (cached on success)
    transaction_id = charge_payment(total)
    
    # Step 3: Send confirmation
    send_confirmation(order_id)
    
    return {"db_id": db_id, "transaction_id": transaction_id}

# If the process crashes after step 2:
# - Step 1 is NOT re-executed (transaction already committed)
# - Step 2 is NOT re-executed (result is cached)
# - Step 3 WILL execute on recovery
# - External systems are NOT called multiple times
Workflow recovery is automatic. DBOS tracks which steps have completed and skips them on replay.

Step Retries

Configure automatic retries for steps:
from dbos import DBOS, StepOptions

# Retry up to 5 times with exponential backoff
@DBOS.step(retries=5)
def unreliable_api_call(data: str) -> dict:
    """This step will retry on failure."""
    response = external_api.post("/endpoint", json={"data": data})
    return response.json()

# Custom retry configuration
@DBOS.step(
    retries=10,
    interval_seconds=2.0,  # Initial delay between retries
    backoff_rate=2.0       # Exponential backoff multiplier
)
def highly_unreliable_operation(value: int) -> str:
    """Retries with custom configuration."""
    # Retry intervals: 2s, 4s, 8s, 16s, 32s, ...
    result = flaky_service.process(value)
    return result

@DBOS.workflow()
def resilient_workflow(data: str) -> dict:
    """Workflow with retry logic."""
    
    # This step automatically retries on failure
    result1 = unreliable_api_call(data)
    
    # This step has aggressive retry policy
    result2 = highly_unreliable_operation(42)
    
    return {"result1": result1, "result2": result2}

Accessing Retry Information

@DBOS.step(retries=5)
def step_with_retry_info() -> dict:
    """Access current retry attempt information."""
    
    step_status = DBOS.step_status
    
    if step_status:
        print(f"Attempt {step_status.current_attempt + 1} of {step_status.max_attempts}")
        
        if step_status.current_attempt > 0:
            DBOS.logger.warning(f"Retrying after {step_status.current_attempt} failures")
    
    # Attempt risky operation
    return perform_operation()

Handling Step Retry Exhaustion

Handle when all retries are exhausted:
from dbos.error import DBOSMaxStepRetriesExceeded

@DBOS.step(retries=3)
def may_fail_step() -> str:
    """Step that might fail."""
    return risky_operation()

@DBOS.workflow()
def handle_retry_exhaustion_workflow() -> dict:
    """Handle retry failures gracefully."""
    
    try:
        result = may_fail_step()
        return {"status": "success", "result": result}
    
    except DBOSMaxStepRetriesExceeded as e:
        # All retries exhausted
        DBOS.logger.error(f"Step {e.step_name} failed after {e.max_retries} retries")
        
        # Log all errors
        for i, error in enumerate(e.errors):
            DBOS.logger.error(f"Attempt {i + 1}: {error}")
        
        # Fallback logic
        return {"status": "failed", "reason": "max_retries_exceeded"}

Max Recovery Attempts

Limit how many times a workflow can be recovered:
from dbos import DBOS
from dbos.error import MaxRecoveryAttemptsExceededError

@DBOS.workflow(max_recovery_attempts=5)
def limited_recovery_workflow(data: str) -> str:
    """This workflow can be recovered at most 5 times."""
    
    # Process data
    process_data(data)
    
    # If this workflow fails and is recovered more than 5 times,
    # it will permanently fail
    return "completed"

@DBOS.step()
def process_data(data: str) -> None:
    """Processing logic."""
    # Might fail
    pass

# Workflows that exceed max recovery attempts will fail permanently
# and cannot be recovered or restarted
Once a workflow exceeds max recovery attempts, it cannot be executed again. Use this to prevent infinite retry loops.

Transaction Serialization Errors

Handle database serialization failures automatically:
@DBOS.transaction()
def concurrent_update(account_id: str, amount: float) -> float:
    """This transaction automatically retries on serialization errors."""
    
    # This might encounter serialization conflicts in high concurrency
    result = DBOS.sql_session.execute(
        sa.text(
            "UPDATE accounts SET balance = balance + :amount "
            "WHERE account_id = :id RETURNING balance"
        ),
        {"amount": amount, "id": account_id}
    )
    
    new_balance = result.fetchone()[0]
    return new_balance

# Serialization errors are automatically retried by the database driver
# No special handling needed
PostgreSQL serialization errors are automatically retried. DBOS handles this transparently.

Error Propagation

Errors propagate through workflow hierarchy:
from dbos import DBOS

@DBOS.step()
def failing_step() -> str:
    """This step will fail."""
    raise ValueError("Something went wrong")

@DBOS.workflow()
def child_workflow() -> str:
    """Child workflow that fails."""
    return failing_step()

@DBOS.workflow()
def parent_workflow() -> dict:
    """Parent workflow handles child failure."""
    
    try:
        result = child_workflow()
        return {"status": "success", "result": result}
    
    except ValueError as e:
        DBOS.logger.error(f"Child workflow failed: {e}")
        
        # Implement fallback logic
        fallback_result = handle_failure()
        
        return {"status": "fallback", "result": fallback_result}

@DBOS.step()
def handle_failure() -> str:
    """Fallback logic."""
    return "fallback_value"

Awaiting Child Workflows

Handle child workflow failures:
from dbos import DBOS
from dbos.error import (
    DBOSAwaitedWorkflowCancelledError,
    DBOSAwaitedWorkflowMaxRecoveryAttemptsExceeded
)

@DBOS.workflow()
def child_that_may_fail() -> str:
    """Child workflow that might fail."""
    # Risky operation
    return risky_operation()

@DBOS.workflow()
def parent_with_error_handling() -> dict:
    """Handle various child workflow failures."""
    
    # Start child workflow
    handle = DBOS.start_workflow(child_that_may_fail)
    
    try:
        result = handle.get_result()
        return {"status": "success", "result": result}
    
    except DBOSAwaitedWorkflowCancelledError as e:
        # Child workflow was cancelled
        DBOS.logger.warning(f"Child workflow {e.workflow_id} was cancelled")
        return {"status": "cancelled", "workflow_id": e.workflow_id}
    
    except DBOSAwaitedWorkflowMaxRecoveryAttemptsExceeded as e:
        # Child exceeded max recovery attempts
        DBOS.logger.error(f"Child workflow {e.workflow_id} exceeded max recovery attempts")
        return {"status": "max_retries", "workflow_id": e.workflow_id}
    
    except Exception as e:
        # Other errors
        DBOS.logger.error(f"Child workflow failed: {e}")
        return {"status": "error", "error": str(e)}

Timeout Handling

Set workflow timeouts:
from dbos import DBOS, SetWorkflowTimeout
import time

@DBOS.workflow()
def long_running_workflow() -> str:
    """Workflow that might take too long."""
    
    # Simulate long-running work
    DBOS.sleep(120)  # 2 minutes
    
    return "completed"

# Set timeout using context manager
try:
    with SetWorkflowTimeout(60):  # 60 second timeout
        result = long_running_workflow()
except Exception as e:
    print(f"Workflow timed out: {e}")

# Or set timeout when starting workflow
handle = DBOS.start_workflow(long_running_workflow)
# Timeout is enforced automatically

Validation and Input Errors

Validate inputs early:
from pydantic import BaseModel, ValidationError, field_validator
from dbos import pydantic_args_validator, DBOS

class OrderInput(BaseModel):
    order_id: str
    customer_email: str
    total: float
    
    @field_validator('total')
    @classmethod
    def validate_total(cls, v: float) -> float:
        if v <= 0:
            raise ValueError('total must be positive')
        return v
    
    @field_validator('customer_email')
    @classmethod
    def validate_email(cls, v: str) -> str:
        if '@' not in v:
            raise ValueError('invalid email address')
        return v

@DBOS.workflow()
@pydantic_args_validator(OrderInput)
def validated_order_workflow(order_id: str, customer_email: str, total: float) -> dict:
    """Workflow with validated inputs."""
    # Inputs are guaranteed to be valid
    return process_order(order_id, customer_email, total)

# Invalid input raises ValidationError before workflow starts
try:
    validated_order_workflow("order-1", "invalid-email", -10.0)
except ValidationError as e:
    print(f"Validation failed: {e}")

Custom Error Types

Define domain-specific errors:
class InsufficientInventoryError(Exception):
    """Raised when inventory is insufficient."""
    def __init__(self, product_id: str, requested: int, available: int):
        self.product_id = product_id
        self.requested = requested
        self.available = available
        super().__init__(
            f"Insufficient inventory for {product_id}: "
            f"requested {requested}, available {available}"
        )

class PaymentDeclinedError(Exception):
    """Raised when payment is declined."""
    def __init__(self, reason: str):
        self.reason = reason
        super().__init__(f"Payment declined: {reason}")

@DBOS.step()
def check_inventory(product_id: str, quantity: int) -> None:
    """Check if inventory is sufficient."""
    available = get_inventory_level(product_id)
    if available < quantity:
        raise InsufficientInventoryError(product_id, quantity, available)

@DBOS.step()
def process_payment(amount: float) -> str:
    """Process payment."""
    result = payment_gateway.charge(amount)
    if not result.approved:
        raise PaymentDeclinedError(result.decline_reason)
    return result.transaction_id

@DBOS.workflow()
def order_workflow(product_id: str, quantity: int, amount: float) -> dict:
    """Handle domain-specific errors."""
    
    try:
        # Check inventory
        check_inventory(product_id, quantity)
        
        # Process payment
        transaction_id = process_payment(amount)
        
        return {"status": "success", "transaction_id": transaction_id}
    
    except InsufficientInventoryError as e:
        DBOS.logger.warning(f"Inventory insufficient: {e}")
        return {
            "status": "inventory_error",
            "product_id": e.product_id,
            "requested": e.requested,
            "available": e.available
        }
    
    except PaymentDeclinedError as e:
        DBOS.logger.warning(f"Payment declined: {e.reason}")
        return {
            "status": "payment_declined",
            "reason": e.reason
        }

Compensating Actions

Implement rollback logic:
@DBOS.step()
def reserve_hotel(hotel_id: str, dates: dict) -> str:
    """Reserve hotel."""
    reservation_id = hotel_api.reserve(hotel_id, dates)
    return reservation_id

@DBOS.step()
def cancel_hotel_reservation(reservation_id: str) -> None:
    """Cancel hotel reservation."""
    hotel_api.cancel(reservation_id)

@DBOS.step()
def book_flight(flight_id: str) -> str:
    """Book flight."""
    booking_id = flight_api.book(flight_id)
    return booking_id

@DBOS.step()
def cancel_flight(booking_id: str) -> None:
    """Cancel flight."""
    flight_api.cancel(booking_id)

@DBOS.workflow()
def travel_booking_workflow(hotel_id: str, flight_id: str, dates: dict) -> dict:
    """Book travel with compensating actions."""
    
    hotel_reservation = None
    flight_booking = None
    
    try:
        # Reserve hotel
        hotel_reservation = reserve_hotel(hotel_id, dates)
        
        # Book flight
        flight_booking = book_flight(flight_id)
        
        return {
            "status": "success",
            "hotel_reservation": hotel_reservation,
            "flight_booking": flight_booking
        }
    
    except Exception as e:
        DBOS.logger.error(f"Booking failed: {e}")
        
        # Compensate by cancelling successful bookings
        if flight_booking:
            try:
                cancel_flight(flight_booking)
                DBOS.logger.info("Flight cancelled")
            except Exception as ce:
                DBOS.logger.error(f"Failed to cancel flight: {ce}")
        
        if hotel_reservation:
            try:
                cancel_hotel_reservation(hotel_reservation)
                DBOS.logger.info("Hotel reservation cancelled")
            except Exception as ce:
                DBOS.logger.error(f"Failed to cancel hotel: {ce}")
        
        return {
            "status": "failed",
            "error": str(e),
            "compensated": True
        }

Circuit Breaker Pattern

Implement circuit breaker for external services:
from datetime import datetime, timedelta
from typing import Optional

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout_seconds: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.failure_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.state = "closed"  # closed, open, half_open
    
    def call(self, func, *args, **kwargs):
        # Check if circuit should reset
        if self.state == "open":
            if self.last_failure_time:
                elapsed = (datetime.now() - self.last_failure_time).total_seconds()
                if elapsed > self.timeout_seconds:
                    self.state = "half_open"
                    self.failure_count = 0
                else:
                    raise Exception("Circuit breaker is open")
        
        try:
            result = func(*args, **kwargs)
            
            # Success - reset if in half_open
            if self.state == "half_open":
                self.state = "closed"
                self.failure_count = 0
            
            return result
        
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            
            raise e

# Global circuit breaker instance
external_service_breaker = CircuitBreaker(failure_threshold=5, timeout_seconds=60)

@DBOS.step(retries=0)  # No retries, circuit breaker handles it
def call_external_service(data: str) -> dict:
    """Call external service with circuit breaker."""
    return external_service_breaker.call(
        lambda: external_api.process(data)
    )

@DBOS.workflow()
def workflow_with_circuit_breaker(data: str) -> dict:
    """Workflow using circuit breaker pattern."""
    
    try:
        result = call_external_service(data)
        return {"status": "success", "result": result}
    
    except Exception as e:
        if "Circuit breaker is open" in str(e):
            # Use fallback when circuit is open
            DBOS.logger.warning("Circuit breaker open, using fallback")
            return {"status": "fallback", "result": get_cached_result()}
        else:
            raise

@DBOS.step()
def get_cached_result() -> dict:
    """Fallback cached result."""
    return {"cached": True}

Complete Example: Resilient Payment Processing

from dbos import DBOS, StepOptions, SetWorkflowID, pydantic_args_validator
from dbos.error import DBOSMaxStepRetriesExceeded
from pydantic import BaseModel, field_validator
import sqlalchemy as sa
from datetime import datetime
from typing import Optional

class PaymentError(Exception):
    """Base payment error."""
    pass

class InsufficientFundsError(PaymentError):
    """Insufficient funds."""
    pass

class FraudDetectedError(PaymentError):
    """Fraud detected."""
    pass

class PaymentRequest(BaseModel):
    payment_id: str
    customer_id: str
    amount: float
    currency: str = "USD"
    
    @field_validator('amount')
    @classmethod
    def validate_amount(cls, v: float) -> float:
        if v <= 0:
            raise ValueError('amount must be positive')
        if v > 100000:
            raise ValueError('amount exceeds maximum')
        return v

@DBOS.transaction()
def create_payment_record(payment_id: str, customer_id: str, amount: float, currency: str) -> int:
    """Create payment record in database."""
    result = DBOS.sql_session.execute(
        sa.text(
            "INSERT INTO payments (payment_id, customer_id, amount, currency, status, created_at) "
            "VALUES (:pid, :cid, :amount, :currency, 'pending', :created) RETURNING id"
        ),
        {
            "pid": payment_id,
            "cid": customer_id,
            "amount": amount,
            "currency": currency,
            "created": datetime.now()
        }
    )
    return result.fetchone()[0]

@DBOS.transaction()
def update_payment_status(payment_id: str, status: str, error_message: Optional[str] = None) -> None:
    """Update payment status."""
    DBOS.sql_session.execute(
        sa.text(
            "UPDATE payments SET status = :status, error_message = :error, updated_at = :updated "
            "WHERE payment_id = :pid"
        ),
        {
            "status": status,
            "error": error_message,
            "updated": datetime.now(),
            "pid": payment_id
        }
    )

@DBOS.step(retries=3, interval_seconds=1.0, backoff_rate=2.0)
def check_fraud(customer_id: str, amount: float) -> dict:
    """Check for fraud with retries."""
    result = fraud_service.check(customer_id, amount)
    
    if result.fraud_detected:
        raise FraudDetectedError(result.reason)
    
    return {"risk_score": result.risk_score, "approved": True}

@DBOS.step(retries=5, interval_seconds=2.0, backoff_rate=1.5)
def process_payment_gateway(payment_id: str, customer_id: str, amount: float, currency: str) -> str:
    """Process payment through gateway with retries."""
    
    try:
        transaction = payment_gateway.charge(
            customer_id=customer_id,
            amount=amount,
            currency=currency,
            idempotency_key=payment_id
        )
        
        if transaction.status == "insufficient_funds":
            raise InsufficientFundsError("Customer has insufficient funds")
        
        if transaction.status != "approved":
            raise PaymentError(f"Payment failed: {transaction.status}")
        
        return transaction.id
    
    except PaymentError:
        raise  # Re-raise payment errors
    except Exception as e:
        # Wrap other errors for retry
        raise PaymentError(f"Gateway error: {e}")

@DBOS.step()
def send_payment_notification(customer_id: str, payment_id: str, status: str, amount: float) -> None:
    """Send payment notification."""
    notification_service.send(
        customer_id,
        f"Payment {payment_id} {status}: ${amount:.2f}"
    )

@DBOS.workflow(max_recovery_attempts=10)
@pydantic_args_validator(PaymentRequest)
def payment_processing_workflow(
    payment_id: str,
    customer_id: str,
    amount: float,
    currency: str = "USD"
) -> dict:
    """Process payment with comprehensive error handling."""
    
    DBOS.logger.info(f"Processing payment {payment_id} for ${amount}")
    
    # Create payment record
    db_id = create_payment_record(payment_id, customer_id, amount, currency)
    update_payment_status(payment_id, "fraud_check")
    
    # Fraud check
    try:
        fraud_result = check_fraud(customer_id, amount)
        DBOS.logger.info(f"Fraud check passed (risk score: {fraud_result['risk_score']})")
    
    except FraudDetectedError as e:
        DBOS.logger.error(f"Fraud detected: {e}")
        update_payment_status(payment_id, "fraud_rejected", str(e))
        send_payment_notification(customer_id, payment_id, "rejected (fraud)", amount)
        return {"status": "fraud_rejected", "payment_id": payment_id}
    
    except DBOSMaxStepRetriesExceeded as e:
        DBOS.logger.error(f"Fraud check failed after retries: {e}")
        update_payment_status(payment_id, "fraud_check_failed", "Service unavailable")
        return {"status": "fraud_check_failed", "payment_id": payment_id}
    
    # Process payment
    update_payment_status(payment_id, "processing")
    
    try:
        transaction_id = process_payment_gateway(payment_id, customer_id, amount, currency)
        DBOS.logger.info(f"Payment processed: {transaction_id}")
        
        update_payment_status(payment_id, "completed")
        send_payment_notification(customer_id, payment_id, "completed", amount)
        
        return {
            "status": "success",
            "payment_id": payment_id,
            "transaction_id": transaction_id,
            "db_id": db_id
        }
    
    except InsufficientFundsError as e:
        DBOS.logger.warning(f"Insufficient funds: {e}")
        update_payment_status(payment_id, "insufficient_funds", str(e))
        send_payment_notification(customer_id, payment_id, "failed (insufficient funds)", amount)
        return {"status": "insufficient_funds", "payment_id": payment_id}
    
    except DBOSMaxStepRetriesExceeded as e:
        DBOS.logger.error(f"Payment gateway failed after retries")
        update_payment_status(payment_id, "gateway_error", "Service unavailable")
        send_payment_notification(customer_id, payment_id, "failed (service error)", amount)
        return {"status": "gateway_error", "payment_id": payment_id}
    
    except PaymentError as e:
        DBOS.logger.error(f"Payment error: {e}")
        update_payment_status(payment_id, "error", str(e))
        send_payment_notification(customer_id, payment_id, "failed", amount)
        return {"status": "error", "payment_id": payment_id, "error": str(e)}

# API endpoint
def process_payment(payment_request: PaymentRequest) -> dict:
    """Process payment with idempotency."""
    
    with SetWorkflowID(f"payment-{payment_request.payment_id}"):
        result = payment_processing_workflow(
            payment_request.payment_id,
            payment_request.customer_id,
            payment_request.amount,
            payment_request.currency
        )
    
    return result

Best Practices

  • Make steps idempotent (safe to retry)
  • Use SetWorkflowID for request deduplication
  • Store step results to avoid re-execution
  • Use external idempotency keys when calling APIs
  • Use step retries for transient failures
  • Implement exponential backoff
  • Set reasonable max retries
  • Don’t retry on business logic errors
  • Catch specific exceptions, not generic Exception
  • Log errors with context
  • Implement graceful degradation
  • Use compensating actions for cleanup
  • Set max_recovery_attempts to prevent infinite loops
  • Monitor workflows exceeding recovery limits
  • Implement dead letter handling
  • Alert on persistent failures

Next Steps

Workflow Tutorial

Learn workflow fundamentals

Configuration

Configure retry and recovery settings

Workflow Management

Monitor and manage failed workflows

Queue Tutorial

Control workflow execution with queues

Build docs developers (and LLMs) love