DBOS Transact provides context managers to configure workflow execution and access the current execution context.
SetWorkflowID
Set the workflow ID for the next workflow invocation within the context block.
from dbos import SetWorkflowID
# The ID is given to the first workflow started inside the block.
with SetWorkflowID(workflow_id):
    result = my_workflow(...)
The workflow ID to assign to the next workflow started in this block
Only the first workflow started within a SetWorkflowID block receives the specified ID. Subsequent workflows in the same block use auto-generated IDs.
Example: Idempotent Workflow Execution
from dbos import DBOS, SetWorkflowID
import uuid
@DBOS.workflow()
def process_payment(payment_id: str, amount: float) -> str:
    """Charge the customer, send a receipt, and return the charge ID."""
    charge_id = charge_customer(amount)
    send_receipt(payment_id, charge_id)
    return charge_id


@DBOS.transaction()
def charge_customer(amount: float) -> str:
    """Record the charge (placeholder for the real database operations)."""
    return "charge_id_123"


@DBOS.step()
def send_receipt(payment_id: str, charge_id: str) -> None:
    """Send the receipt email (placeholder; does nothing in this example)."""


# Derive the workflow ID from the payment ID so that a repeated request
# maps onto the same workflow.
payment_id = "payment-12345"

with SetWorkflowID(f"process-{payment_id}"):
    result1 = process_payment(payment_id, 99.99)

# Calling again with the same workflow ID retrieves the cached result.
with SetWorkflowID(f"process-{payment_id}"):
    result2 = process_payment(payment_id, 99.99)

assert result1 == result2  # Same result, no duplicate charge
Example: Starting Background Workflows
from dbos import DBOS, SetWorkflowID
@DBOS.workflow()
def background_job(job_id: str) -> None:
    """Run the job (job logic goes here)."""


job_id = "job-123"

# Start the workflow in the background, using the job ID as its workflow ID.
with SetWorkflowID(job_id):
    handle = DBOS.start_workflow(background_job, job_id)

# Later, look the workflow up again by that same ID.
handle = DBOS.retrieve_workflow(job_id)
status = handle.get_status()
SetWorkflowTimeout
Set a timeout for workflows started within the context block.
from dbos import SetWorkflowTimeout
# Workflows started inside the block are subject to the timeout.
with SetWorkflowTimeout(timeout_seconds):
    result = my_workflow(...)
Maximum execution time in seconds. Must be positive.
Example: Timeout Protection
from dbos import DBOS, SetWorkflowTimeout
import time
@DBOS.workflow()
def long_running_task() -> str:
    """Sleep for two minutes, then report completion."""
    DBOS.sleep(120)
    return "completed"


# The 30-second timeout is shorter than the 120-second sleep, so this
# workflow will be cancelled before it completes.
try:
    with SetWorkflowTimeout(30.0):
        result = long_running_task()
except Exception as e:
    print(f"Workflow timed out: {e}")
Example: Different Timeouts for Different Operations
from dbos import DBOS, SetWorkflowTimeout
@DBOS.workflow()
def quick_operation() -> str:
    """Return immediately."""
    return "fast"


@DBOS.workflow()
def slow_operation() -> str:
    """Sleep for ten seconds before returning."""
    DBOS.sleep(10)
    return "slow"


# A short timeout is enough for the quick operation.
with SetWorkflowTimeout(5.0):
    quick_result = quick_operation()

# The slow operation sleeps for 10 seconds, so give it a longer timeout.
with SetWorkflowTimeout(30.0):
    slow_result = slow_operation()
SetEnqueueOptions
Configure options for workflow enqueueing operations.
from dbos import SetEnqueueOptions
# All four options are keyword arguments, shown here with None placeholders.
with SetEnqueueOptions(
    deduplication_id=None,
    priority=None,
    app_version=None,
    queue_partition_key=None,
):
    handle = queue.enqueue(workflow, ...)
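ID for preventing duplicate enqueues. While a workflow with a given deduplication ID is enqueued or executing on a queue, enqueueing another workflow with the same ID on that queue raises DBOSQueueDeduplicatedError.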
Priority for the enqueued workflow. Lower values execute first (1 is the highest priority). Must be between 1 and 2,147,483,647. The queue must be created with priority_enabled=True.
Application version for the enqueued workflow.
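Partition key for partitioned queues. Flow control limits such as concurrency are applied to each partition separately, so workflows with the same key execute one at a time, in order, only when the queue's concurrency is 1.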
Example: Priority Queue
from dbos import DBOS, Queue, SetEnqueueOptions
priority_queue = Queue("tasks", priority_enabled=True)


@DBOS.workflow()
def process_task(task_id: str) -> None:
    """Process a single task (task processing logic goes here)."""


# Lower numbers are dequeued first: 1 is the highest possible priority.

# High priority task
with SetEnqueueOptions(priority=1):
    urgent_handle = priority_queue.enqueue(process_task, "urgent-task")

# Normal priority task
with SetEnqueueOptions(priority=10):
    normal_handle = priority_queue.enqueue(process_task, "normal-task")

# Low priority task
with SetEnqueueOptions(priority=100):
    low_handle = priority_queue.enqueue(process_task, "low-task")

# urgent-task is dequeued before normal-task, which is dequeued before low-task
Example: Deduplication
from dbos import DBOS, Queue, SetEnqueueOptions
from dbos import error as dboserror

queue = Queue("processing")


@DBOS.workflow()
def process_order(order_id: str) -> str:
    """Return a confirmation string for the given order."""
    return f"Processed {order_id}"


order_id = "order-123"
dedup_key = f"process-{order_id}"

# First enqueue claims the deduplication ID.
with SetEnqueueOptions(deduplication_id=dedup_key):
    handle1 = queue.enqueue(process_order, order_id)

# A second enqueue with the same deduplication ID is rejected with
# DBOSQueueDeduplicatedError while the first workflow is still enqueued or
# executing. It does not return the existing workflow's handle.
try:
    with SetEnqueueOptions(deduplication_id=dedup_key):
        handle2 = queue.enqueue(process_order, order_id)
except dboserror.DBOSQueueDeduplicatedError:
    # Duplicate request: reuse the handle of the workflow already in flight.
    handle2 = handle1

# If the first workflow had already finished, the second enqueue succeeds and
# handle2 refers to a new workflow, so the two IDs are not guaranteed equal.
Example: Partitioned Queue
from dbos import DBOS, Queue, SetEnqueueOptions
# Flow control is applied per partition key. With concurrency=1, at most one
# update per key runs at a time, while different keys proceed in parallel.
partitioned_queue = Queue("user_updates", partition_queue=True, concurrency=1)


@DBOS.workflow()
def update_user(user_id: str, data: dict) -> None:
    """Apply one update to a user's data (update logic goes here)."""


# All updates for user-123 execute one at a time, in enqueue order
for i in range(5):
    with SetEnqueueOptions(queue_partition_key="user-123"):
        partitioned_queue.enqueue(update_user, "user-123", {"update": i})

# Updates for user-456 can run in parallel with user-123
for i in range(5):
    with SetEnqueueOptions(queue_partition_key="user-456"):
        partitioned_queue.enqueue(update_user, "user-456", {"update": i})
DBOSContextSetAuth
Set authentication information for the current context.
from dbos import DBOSContextSetAuth
with DBOSContextSetAuth(user, roles):
    # Everything run inside this block has authentication set.
    result = my_workflow(...)
The authenticated user name or ID
List of roles granted to the user
Example: Setting Authentication
from dbos import DBOS, DBOSContextSetAuth
@DBOS.workflow()
def admin_operation() -> str:
    """Return a confirmation string for admins.

    Raises:
        PermissionError: If the authenticated roles do not include "admin".
    """
    # Check current user
    user = DBOS.authenticated_user
    # authenticated_roles is Optional; treat a missing value as "no roles"
    # so the membership test below cannot fail on None.
    roles = DBOS.authenticated_roles or []
    if "admin" not in roles:
        raise PermissionError("Admin access required")
    return f"Admin operation by {user}"


# Set authentication context
with DBOSContextSetAuth("admin@example.com", ["admin", "user"]):
    result = admin_operation()  # Succeeds

with DBOSContextSetAuth("user@example.com", ["user"]):
    try:
        result = admin_operation()  # Fails
    except PermissionError as e:
        print(f"Access denied: {e}")
Example: Role-Based Workflow Access
from dbos import DBOS, DBOSConfiguredInstance, DBOSContextSetAuth


@DBOS.dbos_class()
@DBOS.default_required_roles(["user"])
class UserService(DBOSConfiguredInstance):
    """Example service whose workflows are gated by role."""

    def __init__(self) -> None:
        # Workflows on instance methods require the class to inherit from
        # DBOSConfiguredInstance and to register a config_name.
        super().__init__(config_name="user_service")

    @DBOS.workflow()
    def view_profile(self) -> dict:
        """Return the authenticated user; uses the class default "user" role."""
        return {"user": DBOS.authenticated_user}

    @DBOS.required_roles(["admin"])
    @DBOS.workflow()
    def delete_account(self) -> None:
        """Delete the account; only admins can delete."""


service = UserService()

# User can view profile
with DBOSContextSetAuth("user@example.com", ["user"]):
    profile = service.view_profile()  # Works

# User cannot delete
with DBOSContextSetAuth("user@example.com", ["user"]):
    try:
        service.delete_account()  # Fails - requires admin role
    except Exception as e:
        print(f"Permission denied: {e}")

# Admin can delete
with DBOSContextSetAuth("admin@example.com", ["admin", "user"]):
    service.delete_account()  # Works
DBOSContextEnsure
Ensure a DBOS context exists for the current block. Creates one if needed.
from dbos import DBOS, DBOSContextEnsure

with DBOSContextEnsure():
    # DBOS context is guaranteed to exist
    logger = DBOS.logger
    logger.info("Context available")
This is primarily used internally but can be useful for library code that needs to ensure context availability.
DBOSContext
The DBOSContext object provides access to the current execution context. It’s typically accessed via DBOS class properties.
Properties
workflow_id: Optional[str] - Current workflow ID
authenticated_user: Optional[str] - Current authenticated user
authenticated_roles: Optional[List[str]] - User’s roles
assumed_role: Optional[str] - Currently assumed role
logger: Logger - DBOS logger instance
sql_session: Optional[Session] - SQLAlchemy session (in transactions only)
step_status: Optional[StepStatus] - Current step execution status
Accessing Context
from dbos import DBOS
@DBOS.workflow()
def my_workflow() -> None:
    """Log who started the workflow, then reject callers without "admin".

    Raises:
        PermissionError: If the authenticated roles do not include "admin".
    """
    # The execution context is read through DBOS class properties.
    wf_id = DBOS.workflow_id
    user = DBOS.authenticated_user
    granted_roles = DBOS.authenticated_roles or []

    DBOS.logger.info(f"Workflow {wf_id} started by {user}")

    # Check authorization
    is_admin = "admin" in granted_roles
    if not is_admin:
        raise PermissionError("Admin access required")
StepStatus
Provides information about the current step execution, including retry status.
from dataclasses import dataclass
from typing import Optional


@dataclass
class StepStatus:
    """Information about the current step execution, including retry status."""

    # Step ID within the workflow
    step_id: int
    # Current retry attempt (0-indexed)
    current_attempt: Optional[int]
    # Maximum retry attempts
    max_attempts: Optional[int]
Example: Retry-Aware Step
from dbos import DBOS
@DBOS.step(retries_allowed=True, max_attempts=5, interval_seconds=2.0)
def flaky_operation() -> str:
    """Simulate an unreliable call that fails about 70% of the time.

    Logs the attempt number whenever the step status reports one.
    """
    import random

    status = DBOS.step_status
    if status and status.current_attempt is not None:
        DBOS.logger.warning(
            f"Retry attempt {status.current_attempt + 1} of {status.max_attempts}"
        )

    # Simulated flaky operation: 70% failure rate
    if random.random() < 0.7:
        raise Exception("Operation failed")
    return "success"


@DBOS.workflow()
def resilient_workflow() -> str:
    """Run the flaky step, which is attempted up to 5 times."""
    return flaky_operation()
Combining Context Managers
Context managers can be nested to configure multiple aspects:
from dbos import (
DBOS,
SetWorkflowID,
SetWorkflowTimeout,
DBOSContextSetAuth,
SetEnqueueOptions,
Queue
)
queue = Queue("tasks", priority_enabled=True, partition_queue=True)


@DBOS.workflow()
def complex_task(task_id: str) -> str:
    """Return a completion message for the given task."""
    return f"Completed {task_id}"


# Combine multiple context managers
# NOTE(review): confirm that deduplication_id may be combined with
# queue_partition_key on a partitioned queue before copying this pattern.
with DBOSContextSetAuth("admin@example.com", ["admin"]):
    with SetWorkflowTimeout(300.0):  # 5 minute timeout
        with SetWorkflowID("unique-task-123"):
            with SetEnqueueOptions(
                priority=100,
                queue_partition_key="admin-tasks",
                deduplication_id="task-dedup-123",
            ):
                handle = queue.enqueue(complex_task, "task-123")

result = handle.get_result()