Introduction
DBOS provides powerful error handling capabilities including automatic retries, workflow recovery, and configurable retry strategies. Understanding these mechanisms is essential for building resilient applications.

Workflow Recovery
Workflows automatically recover from failures by replaying from their last completed step:

from dbos import DBOS
import sqlalchemy as sa
@DBOS.transaction()
def save_order(order_id: str, total: float) -> int:
    """Insert one row into ``orders`` and return its generated ``id``.

    The statement is executed on ``DBOS.sql_session``; the first column of the
    ``RETURNING id`` row is handed back to the caller.
    """
    insert_stmt = sa.text(
        "INSERT INTO orders (order_id, total) VALUES (:id, :total) RETURNING id"
    )
    bind_params = {"id": order_id, "total": total}
    inserted_row = DBOS.sql_session.execute(insert_stmt, bind_params).fetchone()
    return inserted_row[0]
@DBOS.step()
def charge_payment(amount: float) -> str:
    """Charge ``amount`` via the payment gateway and return the transaction id."""
    # The gateway is an external API and the call may raise.
    return payment_gateway.charge(amount)
@DBOS.step()
def send_confirmation(order_id: str) -> None:
    """Send the confirmation message for ``order_id`` through the email service."""
    message = f"Order {order_id} confirmed"
    email_service.send(message)
@DBOS.workflow()
def process_order_workflow(order_id: str, total: float) -> dict:
    """Save the order, charge the payment, then send the confirmation.

    Returns:
        A dict with ``db_id`` (row id from ``save_order``) and
        ``transaction_id`` (id from ``charge_payment``).
    """
    outcome = {}
    outcome["db_id"] = save_order(order_id, total)        # Step 1: persist the order
    outcome["transaction_id"] = charge_payment(total)     # Step 2: charge the payment
    send_confirmation(order_id)                           # Step 3: notify the customer
    return outcome
# If the process crashes after step 2:
# - Step 1 is NOT re-executed (transaction already committed)
# - Step 2 is NOT re-executed (result is cached)
# - Step 3 WILL execute on recovery
# - External systems are NOT called multiple times
Workflow recovery is automatic. DBOS tracks which steps have completed and skips them on replay.
Step Retries
Configure automatic retries for steps:

from dbos import DBOS, StepOptions
# Allow up to 5 attempts in total, with exponential backoff between them
@DBOS.step(retries_allowed=True, max_attempts=5)
def unreliable_api_call(data: str) -> dict:
    """POST ``data`` to the external API and return the decoded JSON body.

    Retries are enabled with ``retries_allowed=True`` and capped with
    ``max_attempts``; ``DBOS.step`` does not accept a ``retries`` keyword.
    Any exception raised by the request triggers another attempt.
    """
    response = external_api.post("/endpoint", json={"data": data})
    return response.json()
# Custom retry configuration
@DBOS.step(
    retries_allowed=True,  # Retries are off unless explicitly enabled
    max_attempts=10,       # Upper bound on the total number of attempts
    interval_seconds=2.0,  # Initial delay between retries
    backoff_rate=2.0,      # Exponential backoff multiplier
)
def highly_unreliable_operation(value: int) -> str:
    """Process ``value`` with the flaky service under an aggressive retry policy.

    ``DBOS.step`` has no ``retries`` keyword; the attempt budget is expressed
    with ``retries_allowed`` and ``max_attempts``.
    """
    # Retry intervals: 2s, 4s, 8s, 16s, 32s, ...
    return flaky_service.process(value)
@DBOS.workflow()
def resilient_workflow(data: str) -> dict:
    """Run both retry-enabled steps and return their results in one dict."""
    api_response = unreliable_api_call(data)      # retried per its step configuration
    processed = highly_unreliable_operation(42)   # uses the more aggressive policy
    return {"result1": api_response, "result2": processed}
Accessing Retry Information
@DBOS.step(retries_allowed=True, max_attempts=5)
def step_with_retry_info() -> dict:
    """Report the current retry attempt, then run the risky operation.

    ``DBOS.step_status`` exposes ``current_attempt`` (zero-based, hence the
    ``+ 1`` when printing) and ``max_attempts``. The decorator uses
    ``retries_allowed``/``max_attempts`` because ``DBOS.step`` has no
    ``retries`` keyword.
    """
    step_status = DBOS.step_status
    if step_status:
        print(f"Attempt {step_status.current_attempt + 1} of {step_status.max_attempts}")
        if step_status.current_attempt > 0:
            DBOS.logger.warning(f"Retrying after {step_status.current_attempt} failures")
    # Attempt risky operation
    return perform_operation()
Handling Step Retry Exhaustion
Handle the case where all retries are exhausted:

from dbos.error import DBOSMaxStepRetriesExceeded
@DBOS.step(retries_allowed=True, max_attempts=3)
def may_fail_step() -> str:
    """Run the risky operation, allowing up to three attempts.

    ``DBOS.step`` takes ``retries_allowed``/``max_attempts`` rather than a
    ``retries`` keyword.
    """
    return risky_operation()
@DBOS.workflow()
def handle_retry_exhaustion_workflow() -> dict:
    """Run ``may_fail_step`` and degrade gracefully once its retries run out.

    Returns:
        ``{"status": "success", "result": ...}`` when the step succeeds, or
        ``{"status": "failed", "reason": "max_retries_exceeded"}`` after
        ``DBOSMaxStepRetriesExceeded`` is caught.
    """
    try:
        result = may_fail_step()
    except DBOSMaxStepRetriesExceeded as exhausted:
        # Every attempt failed: report the summary, then each recorded error.
        DBOS.logger.error(f"Step {exhausted.step_name} failed after {exhausted.max_retries} retries")
        attempt_no = 0
        for failure in exhausted.errors:
            attempt_no += 1
            DBOS.logger.error(f"Attempt {attempt_no}: {failure}")
        # Fallback result instead of propagating the failure.
        return {"status": "failed", "reason": "max_retries_exceeded"}
    else:
        return {"status": "success", "result": result}
Max Recovery Attempts
Limit how many times a workflow can be recovered:

from dbos import DBOS
from dbos.error import MaxRecoveryAttemptsExceededError
@DBOS.workflow(max_recovery_attempts=5)
def limited_recovery_workflow(data: str) -> str:
    """Process ``data`` in a workflow whose recovery attempts are capped at 5.

    The cap comes from ``max_recovery_attempts=5`` on the decorator; once it is
    exceeded the workflow fails permanently.
    """
    process_data(data)
    final_status = "completed"
    return final_status
@DBOS.step()
def process_data(data: str) -> None:
    """Placeholder for the processing logic; a real implementation might fail."""
    return None
# Workflows that exceed max recovery attempts will fail permanently
# and cannot be recovered or restarted
Once a workflow exceeds max recovery attempts, it cannot be executed again. Use this to prevent infinite retry loops.
Transaction Serialization Errors
Handle database serialization failures automatically:

@DBOS.transaction()
def concurrent_update(account_id: str, amount: float) -> float:
    """Add ``amount`` to an account's balance and return the new balance.

    The UPDATE may hit serialization conflicts under high concurrency.
    """
    update_stmt = sa.text(
        "UPDATE accounts SET balance = balance + :amount "
        "WHERE account_id = :id RETURNING balance"
    )
    bind_params = {"amount": amount, "id": account_id}
    updated_row = DBOS.sql_session.execute(update_stmt, bind_params).fetchone()
    return updated_row[0]
# Serialization errors are automatically retried by DBOS
# No special handling needed
PostgreSQL serialization errors are automatically retried. DBOS handles this transparently.
Error Propagation
Errors propagate through the workflow hierarchy:

from dbos import DBOS
@DBOS.step()
def failing_step() -> str:
    """Always fail by raising ``ValueError``."""
    failure = ValueError("Something went wrong")
    raise failure
@DBOS.workflow()
def child_workflow() -> str:
    """Call ``failing_step`` and let its exception propagate to the caller."""
    step_result = failing_step()
    return step_result
@DBOS.workflow()
def parent_workflow() -> dict:
    """Run the child workflow and fall back when it raises ``ValueError``.

    Returns:
        ``{"status": "success", ...}`` with the child's result, or
        ``{"status": "fallback", ...}`` with the value from ``handle_failure``.
    """
    try:
        result = child_workflow()
    except ValueError as err:
        DBOS.logger.error(f"Child workflow failed: {err}")
        # The fallback value is produced by a separate step.
        return {"status": "fallback", "result": handle_failure()}
    return {"status": "success", "result": result}
@DBOS.step()
def handle_failure() -> str:
    """Return the fixed fallback value used when the child workflow fails."""
    fallback = "fallback_value"
    return fallback
Awaiting Child Workflows
Handle child workflow failures:

from dbos import DBOS
from dbos.error import (
DBOSAwaitedWorkflowCancelledError,
DBOSAwaitedWorkflowMaxRecoveryAttemptsExceeded
)
@DBOS.workflow()
def child_that_may_fail() -> str:
    """Run the risky operation; any exception it raises propagates."""
    outcome = risky_operation()
    return outcome
@DBOS.workflow()
def parent_with_error_handling() -> dict:
    """Start the child workflow and translate each failure mode into a status dict.

    Returns:
        A dict whose ``status`` is ``success``, ``cancelled``, ``max_retries``
        or ``error`` depending on how awaiting the child ended.
    """
    child_handle = DBOS.start_workflow(child_that_may_fail)
    try:
        result = child_handle.get_result()
    except DBOSAwaitedWorkflowCancelledError as cancelled:
        # The awaited child was cancelled.
        DBOS.logger.warning(f"Child workflow {cancelled.workflow_id} was cancelled")
        return {"status": "cancelled", "workflow_id": cancelled.workflow_id}
    except DBOSAwaitedWorkflowMaxRecoveryAttemptsExceeded as exceeded:
        # The awaited child ran out of recovery attempts.
        DBOS.logger.error(f"Child workflow {exceeded.workflow_id} exceeded max recovery attempts")
        return {"status": "max_retries", "workflow_id": exceeded.workflow_id}
    except Exception as err:
        # Anything else raised while awaiting the child.
        DBOS.logger.error(f"Child workflow failed: {err}")
        return {"status": "error", "error": str(err)}
    else:
        return {"status": "success", "result": result}
Timeout Handling
Set workflow timeouts:

from dbos import DBOS, SetWorkflowTimeout
import time
@DBOS.workflow()
def long_running_workflow() -> str:
    """Sleep for two minutes via ``DBOS.sleep`` to simulate slow work."""
    simulated_work_seconds = 120  # 2 minutes
    DBOS.sleep(simulated_work_seconds)
    return "completed"
# Set timeout using context manager
try:
    with SetWorkflowTimeout(60):  # 60 second timeout
        result = long_running_workflow()
except Exception as e:
    print(f"Workflow timed out: {e}")
# Or set timeout when starting workflow: the timeout only applies to
# workflows started inside the SetWorkflowTimeout block, so the
# start_workflow call must be wrapped as well
with SetWorkflowTimeout(60):
    handle = DBOS.start_workflow(long_running_workflow)
# Timeout is enforced automatically
Validation and Input Errors
Validate inputs early:

from pydantic import BaseModel, ValidationError, field_validator
from dbos import pydantic_args_validator, DBOS
class OrderInput(BaseModel):
    """Pydantic model describing the inputs of an order workflow."""

    order_id: str
    customer_email: str
    total: float

    @field_validator('total')
    @classmethod
    def validate_total(cls, value: float) -> float:
        """Reject totals that are zero or negative."""
        if value <= 0:
            problem = 'total must be positive'
            raise ValueError(problem)
        return value

    @field_validator('customer_email')
    @classmethod
    def validate_email(cls, value: str) -> str:
        """Reject addresses that contain no ``@`` character."""
        if '@' not in value:
            problem = 'invalid email address'
            raise ValueError(problem)
        return value
@DBOS.workflow()
@pydantic_args_validator(OrderInput)
def validated_order_workflow(order_id: str, customer_email: str, total: float) -> dict:
    """Process an order whose arguments pass through the ``OrderInput`` validator decorator."""
    # The validator decorator runs before this body, so the inputs are valid here.
    processed = process_order(order_id, customer_email, total)
    return processed
# Invalid input raises ValidationError before workflow starts
invalid_args = ("order-1", "invalid-email", -10.0)
try:
    validated_order_workflow(*invalid_args)
except ValidationError as exc:
    print(f"Validation failed: {exc}")
Custom Error Types
# Define domain-specific errors:
class InsufficientInventoryError(Exception):
    """Raised when inventory is insufficient.

    The instance keeps ``product_id``, ``requested`` and ``available`` as
    attributes and formats them into the exception message.
    """

    def __init__(self, product_id: str, requested: int, available: int):
        self.product_id = product_id
        self.requested = requested
        self.available = available
        super().__init__(
            f"Insufficient inventory for {product_id}: "
            f"requested {requested}, available {available}"
        )

    def __reduce__(self):
        # Default exception pickling rebuilds the object from ``self.args``,
        # which holds only the formatted message, so ``__init__`` would be
        # called with one argument instead of three and raise TypeError.
        # Rebuilding from the original arguments keeps pickle/copy round-trips
        # working. NOTE(review): relevant wherever errors are serialized,
        # e.g. persisted step failures - confirm the serializer in use.
        return (type(self), (self.product_id, self.requested, self.available))
class PaymentDeclinedError(Exception):
    """Raised when payment is declined.

    ``reason`` holds the raw decline reason; the exception message is
    ``"Payment declined: <reason>"``.
    """

    def __init__(self, reason: str):
        self.reason = reason
        super().__init__(f"Payment declined: {reason}")

    def __reduce__(self):
        # Default exception pickling re-calls ``__init__`` with the formatted
        # message, which would turn ``reason`` into "Payment declined: ..."
        # and double the prefix in the message. Rebuild from the raw reason.
        return (type(self), (self.reason,))
@DBOS.step()
def check_inventory(product_id: str, quantity: int) -> None:
    """Verify that stock for ``product_id`` covers ``quantity``.

    Raises:
        InsufficientInventoryError: when the available level is below ``quantity``.
    """
    in_stock = get_inventory_level(product_id)
    if not (in_stock < quantity):
        return
    raise InsufficientInventoryError(product_id, quantity, in_stock)
@DBOS.step()
def process_payment(amount: float) -> str:
    """Charge ``amount`` and return the gateway's transaction id.

    Raises:
        PaymentDeclinedError: when the gateway result is not approved.
    """
    charge = payment_gateway.charge(amount)
    if charge.approved:
        return charge.transaction_id
    raise PaymentDeclinedError(charge.decline_reason)
@DBOS.workflow()
def order_workflow(product_id: str, quantity: int, amount: float) -> dict:
    """Check inventory, take payment, and map domain errors to status dicts.

    Returns:
        A dict whose ``status`` is ``success``, ``inventory_error`` or
        ``payment_declined``, plus the details relevant to that outcome.
    """
    try:
        check_inventory(product_id, quantity)
        transaction_id = process_payment(amount)
    except InsufficientInventoryError as shortage:
        DBOS.logger.warning(f"Inventory insufficient: {shortage}")
        report = {"status": "inventory_error"}
        report["product_id"] = shortage.product_id
        report["requested"] = shortage.requested
        report["available"] = shortage.available
        return report
    except PaymentDeclinedError as declined:
        DBOS.logger.warning(f"Payment declined: {declined.reason}")
        return {"status": "payment_declined", "reason": declined.reason}
    return {"status": "success", "transaction_id": transaction_id}
Compensating Actions
Implement rollback logic:

@DBOS.step()
def reserve_hotel(hotel_id: str, dates: dict) -> str:
    """Reserve ``hotel_id`` for ``dates`` and return the reservation id."""
    return hotel_api.reserve(hotel_id, dates)
@DBOS.step()
def cancel_hotel_reservation(reservation_id: str) -> None:
    """Cancel the hotel reservation identified by ``reservation_id``."""
    cancel = hotel_api.cancel
    cancel(reservation_id)
@DBOS.step()
def book_flight(flight_id: str) -> str:
    """Book ``flight_id`` and return the booking id."""
    return flight_api.book(flight_id)
@DBOS.step()
def cancel_flight(booking_id: str) -> None:
    """Cancel the flight booking identified by ``booking_id``."""
    cancel = flight_api.cancel
    cancel(booking_id)
@DBOS.workflow()
def travel_booking_workflow(hotel_id: str, flight_id: str, dates: dict) -> dict:
    """Book a hotel and a flight, cancelling what was booked if a step fails.

    Returns:
        On success: ``status`` ``"success"`` plus both booking ids.
        On failure: ``status`` ``"failed"``, the error text, and
        ``compensated`` - ``True`` only when every attempted cancellation
        succeeded, ``False`` when at least one cancellation itself failed.
    """
    hotel_reservation = None
    flight_booking = None
    try:
        # Reserve hotel
        hotel_reservation = reserve_hotel(hotel_id, dates)
        # Book flight
        flight_booking = book_flight(flight_id)
        return {
            "status": "success",
            "hotel_reservation": hotel_reservation,
            "flight_booking": flight_booking
        }
    except Exception as e:
        DBOS.logger.error(f"Booking failed: {e}")
        # Compensate by cancelling successful bookings, newest first.
        # Track whether every cancellation worked so the caller is not told
        # the rollback succeeded when a booking is still live.
        compensated = True
        # The flight is the last booking, so this branch only matters once
        # further steps are added after it.
        if flight_booking is not None:
            try:
                cancel_flight(flight_booking)
                DBOS.logger.info("Flight cancelled")
            except Exception as ce:
                compensated = False
                DBOS.logger.error(f"Failed to cancel flight: {ce}")
        if hotel_reservation is not None:
            try:
                cancel_hotel_reservation(hotel_reservation)
                DBOS.logger.info("Hotel reservation cancelled")
            except Exception as ce:
                compensated = False
                DBOS.logger.error(f"Failed to cancel hotel: {ce}")
        return {
            "status": "failed",
            "error": str(e),
            "compensated": compensated
        }
Circuit Breaker Pattern
Implement a circuit breaker for external services:

from datetime import datetime, timedelta
from typing import Optional
class CircuitBreaker:
    """In-memory circuit breaker for calls to an unreliable dependency.

    States:
        closed:    calls pass through; consecutive failures are counted and
                   the circuit opens once ``failure_threshold`` is reached.
        open:      calls are rejected until more than ``timeout_seconds`` have
                   elapsed since the last failure.
        half_open: a trial call is let through; success closes the circuit,
                   failure re-opens it immediately.

    The state lives only on this object: it is not shared between processes
    and is not protected by a lock.
    """

    def __init__(self, failure_threshold: int = 5, timeout_seconds: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.failure_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.state = "closed"  # closed, open, half_open

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever ``func`` returns.

        Raises:
            Exception: with message "Circuit breaker is open" while the
                circuit is open and the cool-down has not elapsed.
            Exception: whatever ``func`` raises, re-raised unchanged after the
                failure has been recorded.
        """
        if self.state == "open":
            cooled_down = True  # no recorded failure time: allow a trial call
            if self.last_failure_time is not None:
                elapsed = (datetime.now() - self.last_failure_time).total_seconds()
                cooled_down = elapsed > self.timeout_seconds
            if not cooled_down:
                raise Exception("Circuit breaker is open")
            self.state = "half_open"
            self.failure_count = 0

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            # A failed half-open probe must re-open the circuit right away;
            # otherwise the breaker would keep passing calls to a dependency
            # that is still down until the full threshold is reached again.
            if self.state == "half_open" or self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise  # bare raise keeps the original traceback

        # Any success closes the circuit and clears the count, so the
        # threshold applies to consecutive failures rather than to every
        # failure ever seen by this instance.
        self.state = "closed"
        self.failure_count = 0
        return result
# Global circuit breaker instance
external_service_breaker = CircuitBreaker(failure_threshold=5, timeout_seconds=60)


@DBOS.step(retries_allowed=False)  # No retries, circuit breaker handles it
def call_external_service(data: str) -> dict:
    """Call the external service through the shared circuit breaker.

    Step retries are disabled with ``retries_allowed=False`` (``DBOS.step``
    has no ``retries`` keyword) so that the breaker alone decides when the
    service may be called again.
    """
    # The breaker forwards positional arguments to the callable it is given.
    return external_service_breaker.call(external_api.process, data)
@DBOS.workflow()
def workflow_with_circuit_breaker(data: str) -> dict:
    """Call the guarded service, serving the cached fallback while the circuit is open.

    Errors other than the breaker's "Circuit breaker is open" rejection are
    re-raised unchanged.
    """
    try:
        result = call_external_service(data)
    except Exception as err:
        if "Circuit breaker is open" not in str(err):
            raise
        # The breaker rejected the call: serve the fallback instead.
        DBOS.logger.warning("Circuit breaker open, using fallback")
        return {"status": "fallback", "result": get_cached_result()}
    return {"status": "success", "result": result}
@DBOS.step()
def get_cached_result() -> dict:
    """Return the fixed fallback payload used while the circuit is open."""
    cached_payload = {"cached": True}
    return cached_payload
Complete Example: Resilient Payment Processing
from dbos import DBOS, StepOptions, SetWorkflowID, pydantic_args_validator
from dbos.error import DBOSMaxStepRetriesExceeded
from pydantic import BaseModel, field_validator
import sqlalchemy as sa
from datetime import datetime
from typing import Optional
class PaymentError(Exception):
    """Root of the payment exception hierarchy."""


class InsufficientFundsError(PaymentError):
    """The customer does not have enough funds for the charge."""


class FraudDetectedError(PaymentError):
    """The fraud check flagged the payment."""
class PaymentRequest(BaseModel):
    """Pydantic model for a payment request; ``currency`` defaults to ``"USD"``."""

    payment_id: str
    customer_id: str
    amount: float
    currency: str = "USD"

    @field_validator('amount')
    @classmethod
    def validate_amount(cls, value: float) -> float:
        """Accept only amounts above zero and no greater than 100000."""
        if value <= 0:
            problem = 'amount must be positive'
            raise ValueError(problem)
        if value > 100000:
            problem = 'amount exceeds maximum'
            raise ValueError(problem)
        return value
@DBOS.transaction()
def create_payment_record(payment_id: str, customer_id: str, amount: float, currency: str) -> int:
    """Insert a ``pending`` row into ``payments`` and return its generated ``id``.

    ``created_at`` is set to ``datetime.now()`` at the time of the call.
    """
    insert_stmt = sa.text(
        "INSERT INTO payments (payment_id, customer_id, amount, currency, status, created_at) "
        "VALUES (:pid, :cid, :amount, :currency, 'pending', :created) RETURNING id"
    )
    bind_params = dict(
        pid=payment_id,
        cid=customer_id,
        amount=amount,
        currency=currency,
        created=datetime.now(),
    )
    inserted_row = DBOS.sql_session.execute(insert_stmt, bind_params).fetchone()
    return inserted_row[0]
@DBOS.transaction()
def update_payment_status(payment_id: str, status: str, error_message: Optional[str] = None) -> None:
    """Set ``status``, ``error_message`` and ``updated_at`` on a payment row.

    ``error_message`` defaults to ``None`` and is written as given;
    ``updated_at`` is set to ``datetime.now()``.
    """
    update_stmt = sa.text(
        "UPDATE payments SET status = :status, error_message = :error, updated_at = :updated "
        "WHERE payment_id = :pid"
    )
    bind_params = dict(
        status=status,
        error=error_message,
        updated=datetime.now(),
        pid=payment_id,
    )
    DBOS.sql_session.execute(update_stmt, bind_params)
@DBOS.step(retries_allowed=True, max_attempts=3, interval_seconds=1.0, backoff_rate=2.0)
def _run_fraud_check(customer_id: str, amount: float) -> dict:
    """Query the fraud service (retried on failure) and return its verdict as plain data."""
    result = fraud_service.check(customer_id, amount)
    detected = bool(result.fraud_detected)
    return {
        "fraud_detected": detected,
        # ``reason`` is only read when fraud was detected.
        "reason": result.reason if detected else None,
        "risk_score": result.risk_score,
    }


def check_fraud(customer_id: str, amount: float) -> dict:
    """Check a payment for fraud, retrying only the call to the fraud service.

    An exception raised inside a retry-enabled step triggers further attempts
    and surfaces as ``DBOSMaxStepRetriesExceeded`` once they are exhausted, so
    a fraud verdict raised there would be retried and never reach the caller
    as ``FraudDetectedError``. The service call therefore lives in the retried
    helper step, which returns the verdict as data, and the business error is
    raised here, outside the retried step. The decorator also uses
    ``retries_allowed``/``max_attempts`` because ``DBOS.step`` has no
    ``retries`` keyword.

    Returns:
        ``{"risk_score": ..., "approved": True}`` when no fraud is detected.

    Raises:
        FraudDetectedError: when the service flags the payment.
    """
    verdict = _run_fraud_check(customer_id, amount)
    if verdict["fraud_detected"]:
        raise FraudDetectedError(verdict["reason"])
    return {"risk_score": verdict["risk_score"], "approved": True}
@DBOS.step(retries_allowed=True, max_attempts=5, interval_seconds=2.0, backoff_rate=1.5)
def _charge_gateway(payment_id: str, customer_id: str, amount: float, currency: str) -> dict:
    """Charge through the gateway (retried on failure) and return the transaction id and status."""
    try:
        transaction = payment_gateway.charge(
            customer_id=customer_id,
            amount=amount,
            currency=currency,
            idempotency_key=payment_id,  # same key on every attempt
        )
    except Exception as e:
        # Wrap gateway failures for retry, keeping the original as the cause.
        raise PaymentError(f"Gateway error: {e}") from e
    return {"id": transaction.id, "status": transaction.status}


def process_payment_gateway(payment_id: str, customer_id: str, amount: float, currency: str) -> str:
    """Process a payment through the gateway, retrying only gateway failures.

    Declines are business outcomes: raising them inside the retry-enabled step
    would repeat the charge and surface ``DBOSMaxStepRetriesExceeded`` instead
    of the specific error. The retried helper step therefore returns the
    transaction status as data and the decline errors are raised here. The
    decorator uses ``retries_allowed``/``max_attempts`` because ``DBOS.step``
    has no ``retries`` keyword.

    Returns:
        The gateway transaction id when the status is ``"approved"``.

    Raises:
        InsufficientFundsError: when the status is ``"insufficient_funds"``.
        PaymentError: for any other non-approved status.
    """
    outcome = _charge_gateway(payment_id, customer_id, amount, currency)
    status = outcome["status"]
    if status == "insufficient_funds":
        raise InsufficientFundsError("Customer has insufficient funds")
    if status != "approved":
        raise PaymentError(f"Payment failed: {status}")
    return outcome["id"]
@DBOS.step()
def send_payment_notification(customer_id: str, payment_id: str, status: str, amount: float) -> None:
    """Notify ``customer_id`` of the payment's status, with the amount to two decimals."""
    message = f"Payment {payment_id} {status}: ${amount:.2f}"
    notification_service.send(customer_id, message)
@DBOS.workflow(max_recovery_attempts=10)
@pydantic_args_validator(PaymentRequest)
def payment_processing_workflow(
    payment_id: str,
    customer_id: str,
    amount: float,
    currency: str = "USD"
) -> dict:
    """Process payment with comprehensive error handling.

    Flow: create the payment record, run the fraud check, charge through the
    gateway, and record the status in the database at each stage. Every
    failure path returns a status dict instead of raising.

    Returns:
        A dict with ``payment_id`` and a ``status`` of ``success``,
        ``fraud_rejected``, ``fraud_check_failed``, ``insufficient_funds``,
        ``gateway_error`` or ``error``. On success it also carries
        ``transaction_id`` and ``db_id``; on ``error`` it carries the error
        text.
    """
    DBOS.logger.info(f"Processing payment {payment_id} for ${amount}")
    # Create payment record
    db_id = create_payment_record(payment_id, customer_id, amount, currency)
    update_payment_status(payment_id, "fraud_check")
    # Fraud check
    try:
        fraud_result = check_fraud(customer_id, amount)
        DBOS.logger.info(f"Fraud check passed (risk score: {fraud_result['risk_score']})")
    except FraudDetectedError as e:
        DBOS.logger.error(f"Fraud detected: {e}")
        update_payment_status(payment_id, "fraud_rejected", str(e))
        send_payment_notification(customer_id, payment_id, "rejected (fraud)", amount)
        return {"status": "fraud_rejected", "payment_id": payment_id}
    except DBOSMaxStepRetriesExceeded as e:
        DBOS.logger.error(f"Fraud check failed after retries: {e}")
        update_payment_status(payment_id, "fraud_check_failed", "Service unavailable")
        return {"status": "fraud_check_failed", "payment_id": payment_id}
    # Process payment
    update_payment_status(payment_id, "processing")
    try:
        transaction_id = process_payment_gateway(payment_id, customer_id, amount, currency)
        DBOS.logger.info(f"Payment processed: {transaction_id}")
        update_payment_status(payment_id, "completed")
        send_payment_notification(customer_id, payment_id, "completed", amount)
        return {
            "status": "success",
            "payment_id": payment_id,
            "transaction_id": transaction_id,
            "db_id": db_id
        }
    except InsufficientFundsError as e:
        # Must come before PaymentError, which is its base class.
        DBOS.logger.warning(f"Insufficient funds: {e}")
        update_payment_status(payment_id, "insufficient_funds", str(e))
        send_payment_notification(customer_id, payment_id, "failed (insufficient funds)", amount)
        return {"status": "insufficient_funds", "payment_id": payment_id}
    except DBOSMaxStepRetriesExceeded as e:
        # Include the error, matching the fraud-check handler above.
        DBOS.logger.error(f"Payment gateway failed after retries: {e}")
        update_payment_status(payment_id, "gateway_error", "Service unavailable")
        send_payment_notification(customer_id, payment_id, "failed (service error)", amount)
        return {"status": "gateway_error", "payment_id": payment_id}
    except PaymentError as e:
        DBOS.logger.error(f"Payment error: {e}")
        update_payment_status(payment_id, "error", str(e))
        send_payment_notification(customer_id, payment_id, "failed", amount)
        return {"status": "error", "payment_id": payment_id, "error": str(e)}
# API endpoint
def process_payment(payment_request: PaymentRequest) -> dict:
    """Run the payment workflow under a workflow ID derived from the payment id.

    The ID ``payment-<payment_id>`` is set with ``SetWorkflowID`` so repeated
    requests for the same payment share one workflow ID.
    """
    workflow_id = f"payment-{payment_request.payment_id}"
    with SetWorkflowID(workflow_id):
        return payment_processing_workflow(
            payment_request.payment_id,
            payment_request.customer_id,
            payment_request.amount,
            payment_request.currency,
        )
Best Practices
Design for Idempotency
- Make steps idempotent (safe to retry)
- Use SetWorkflowID for request deduplication
- Store step results to avoid re-execution
- Use external idempotency keys when calling APIs
Retry Strategy
- Use step retries for transient failures
- Implement exponential backoff
- Set reasonable max retries
- Don’t retry on business logic errors
Error Handling
- Catch specific exceptions, not generic Exception
- Log errors with context
- Implement graceful degradation
- Use compensating actions for cleanup
Recovery Limits
- Set max_recovery_attempts to prevent infinite loops
- Monitor workflows exceeding recovery limits
- Implement dead letter handling
- Alert on persistent failures
Next Steps
Workflow Tutorial
Learn workflow fundamentals
Configuration
Configure retry and recovery settings
Workflow Management
Monitor and manage failed workflows
Queue Tutorial
Control workflow execution with queues