Skip to main content
DBOS Transact provides context managers to configure workflow execution and access the current execution context.

SetWorkflowID

Set the workflow ID for the next workflow invocation within the context block.
from dbos import SetWorkflowID

# Assign an explicit ID to the next workflow started inside this block.
with SetWorkflowID(workflow_id):
    result = my_workflow(...)
workflow_id
str
required
The workflow ID to assign to the next workflow started in this block
Only the first workflow started within a SetWorkflowID block receives the specified ID. Subsequent workflows in the same block use auto-generated IDs.

Example: Idempotent Workflow Execution

from dbos import DBOS, SetWorkflowID

@DBOS.transaction()
def charge_customer(amount: float) -> str:
    # Database write recording the charge; returns the charge ID.
    return "charge_id_123"

@DBOS.step()
def send_receipt(payment_id: str, charge_id: str) -> None:
    # Deliver the receipt email for the completed charge.
    pass

@DBOS.workflow()
def process_payment(payment_id: str, amount: float) -> str:
    """Charge the customer, send a receipt, and return the charge ID."""
    charge_id = charge_customer(amount)
    send_receipt(payment_id, charge_id)
    return charge_id

# A deterministic workflow ID makes the payment idempotent.
payment_id = "payment-12345"
with SetWorkflowID(f"process-{payment_id}"):
    result1 = process_payment(payment_id, 99.99)

# Re-running under the same workflow ID returns the recorded result
# instead of charging the customer a second time.
with SetWorkflowID(f"process-{payment_id}"):
    result2 = process_payment(payment_id, 99.99)

assert result1 == result2  # Same result, no duplicate charge

Example: Starting Background Workflows

from dbos import DBOS, SetWorkflowID

@DBOS.workflow()
def background_job(job_id: str) -> None:
    """Run the background job identified by job_id."""
    pass

# Launch the workflow asynchronously under a well-known ID so it can
# be found again later.
job_id = "job-123"
with SetWorkflowID(job_id):
    handle = DBOS.start_workflow(background_job, job_id)

# Any process that knows the ID can reattach to the workflow.
handle = DBOS.retrieve_workflow(job_id)
status = handle.get_status()

SetWorkflowTimeout

Set a timeout for workflows started within the context block.
from dbos import SetWorkflowTimeout

# Cancel any workflow started in this block if it exceeds the timeout.
with SetWorkflowTimeout(timeout_seconds):
    result = my_workflow(...)
timeout_seconds
float
required
Maximum execution time in seconds. Must be positive.

Example: Timeout Protection

from dbos import DBOS, SetWorkflowTimeout

@DBOS.workflow()
def long_running_task() -> str:
    """Sleep durably for two minutes, then report completion."""
    DBOS.sleep(120)  # Sleep for 2 minutes
    return "completed"

# A 30-second limit cancels the workflow long before it can finish.
try:
    with SetWorkflowTimeout(30.0):
        result = long_running_task()
except Exception as e:
    print(f"Workflow timed out: {e}")

Example: Different Timeouts for Different Operations

from dbos import DBOS, SetWorkflowTimeout

@DBOS.workflow()
def quick_operation() -> str:
    """A workflow that completes almost immediately."""
    return "fast"

@DBOS.workflow()
def slow_operation() -> str:
    """A workflow that spends ten seconds in a durable sleep."""
    DBOS.sleep(10)
    return "slow"

# Give the fast path a tight deadline...
with SetWorkflowTimeout(5.0):
    quick_result = quick_operation()

# ...and allow the slow path more headroom.
with SetWorkflowTimeout(30.0):
    slow_result = slow_operation()

SetEnqueueOptions

Configure options for workflow enqueueing operations.
from dbos import SetEnqueueOptions

# Every option defaults to None; set only the ones the enqueue needs.
with SetEnqueueOptions(
    deduplication_id=None,
    priority=None,
    app_version=None,
    queue_partition_key=None
):
    handle = queue.enqueue(workflow, ...)
deduplication_id
str
default:"None"
ID for preventing duplicate enqueues. While a workflow with a given deduplication ID is enqueued or executing in a queue, further enqueues with the same ID are rejected with an error.
priority
int
default:"None"
Priority for the enqueued workflow. Lower values execute first. Must be between 1 and 2,147,483,647.
app_version
str
default:"None"
Application version for the enqueued workflow.
queue_partition_key
str
default:"None"
Partition key for partitioned queues. Workflows with the same key execute in order.

Example: Priority Queue

from dbos import DBOS, Queue, SetEnqueueOptions

priority_queue = Queue("tasks", priority_enabled=True)

@DBOS.workflow()
def process_task(task_id: str) -> None:
    """Process one queued task."""
    pass

# High priority task — in DBOS, a LOWER priority number runs first.
with SetEnqueueOptions(priority=1):
    urgent_handle = priority_queue.enqueue(process_task, "urgent-task")

# Normal priority task
with SetEnqueueOptions(priority=10):
    normal_handle = priority_queue.enqueue(process_task, "normal-task")

# Low priority task
with SetEnqueueOptions(priority=100):
    low_handle = priority_queue.enqueue(process_task, "low-task")

# urgent-task executes before normal-task, which executes before low-task

Example: Deduplication

from dbos import DBOS, Queue, SetEnqueueOptions

queue = Queue("processing")

@DBOS.workflow()
def process_order(order_id: str) -> str:
    """Process one order and return a summary string."""
    return f"Processed {order_id}"

order_id = "order-123"
dedup_key = f"process-{order_id}"

# First enqueue succeeds.
with SetEnqueueOptions(deduplication_id=dedup_key):
    handle1 = queue.enqueue(process_order, order_id)

# While the first workflow is still enqueued or executing, a second
# enqueue with the same deduplication ID is rejected with a
# DBOSQueueDeduplicatedError instead of starting a duplicate.
try:
    with SetEnqueueOptions(deduplication_id=dedup_key):
        handle2 = queue.enqueue(process_order, order_id)
except Exception as e:  # DBOSQueueDeduplicatedError
    print(f"Duplicate enqueue rejected: {e}")

Example: Partitioned Queue

from dbos import DBOS, Queue, SetEnqueueOptions

partitioned_queue = Queue("user_updates", partition_queue=True)

@DBOS.workflow()
def update_user(user_id: str, data: dict) -> None:
    """Apply one update to a user's record."""
    pass

# Enqueues that share a partition key run sequentially, so all five
# updates for user-123 execute in submission order.
for update_num in range(5):
    with SetEnqueueOptions(queue_partition_key="user-123"):
        partitioned_queue.enqueue(update_user, "user-123", {"update": update_num})

# A different partition key is independent: user-456's updates may
# run concurrently with user-123's.
for update_num in range(5):
    with SetEnqueueOptions(queue_partition_key="user-456"):
        partitioned_queue.enqueue(update_user, "user-456", {"update": update_num})

DBOSContextSetAuth

Set authentication information for the current context.
from dbos import DBOSContextSetAuth

# Attach an authenticated user and their roles to the current context.
with DBOSContextSetAuth(user, roles):
    # Operations in this block have authentication set
    result = my_workflow(...)
user
str
required
The authenticated user name or ID
roles
List[str]
required
List of roles granted to the user

Example: Setting Authentication

from dbos import DBOS, DBOSContextSetAuth

@DBOS.workflow()
def admin_operation() -> str:
    """Perform an operation restricted to admins.

    Raises:
        PermissionError: If the authenticated user lacks the admin role.
    """
    user = DBOS.authenticated_user
    roles = DBOS.authenticated_roles

    if "admin" not in roles:
        raise PermissionError("Admin access required")

    return f"Admin operation by {user}"

# A user holding the admin role passes the check.
with DBOSContextSetAuth("admin@example.com", ["admin", "user"]):
    result = admin_operation()  # Succeeds

# A user without the admin role is rejected.
with DBOSContextSetAuth("user@example.com", ["user"]):
    try:
        result = admin_operation()  # Fails
    except PermissionError as e:
        print(f"Access denied: {e}")

Example: Role-Based Workflow Access

from dbos import DBOS, DBOSContextSetAuth

@DBOS.dbos_class()
@DBOS.default_required_roles(["user"])
class UserService:
    @DBOS.workflow()
    def view_profile(self) -> dict:
        """Return profile data for the authenticated user."""
        return {"user": DBOS.authenticated_user}

    @DBOS.required_roles(["admin"])
    @DBOS.workflow()
    def delete_account(self) -> None:
        """Delete the account; overrides the class default to require admin."""
        pass

service = UserService()

# Anyone with the "user" role can view a profile.
with DBOSContextSetAuth("user@example.com", ["user"]):
    profile = service.view_profile()  # Works

# The same user cannot delete accounts.
with DBOSContextSetAuth("user@example.com", ["user"]):
    try:
        service.delete_account()  # Fails - requires admin role
    except Exception as e:
        print(f"Permission denied: {e}")

# An admin can.
with DBOSContextSetAuth("admin@example.com", ["admin", "user"]):
    service.delete_account()  # Works

DBOSContextEnsure

Ensure a DBOS context exists for the current block. Creates one if needed.
from dbos import DBOSContextEnsure

# Create a DBOS context for this block if the caller does not have one.
with DBOSContextEnsure():
    # DBOS context is guaranteed to exist
    logger = DBOS.logger
    logger.info("Context available")
This is primarily used internally but can be useful for library code that needs to ensure context availability.

DBOSContext

The DBOSContext object provides access to the current execution context. It’s typically accessed via DBOS class properties.

Properties

  • workflow_id: Optional[str] - Current workflow ID
  • authenticated_user: Optional[str] - Current authenticated user
  • authenticated_roles: Optional[List[str]] - User’s roles
  • assumed_role: Optional[str] - Currently assumed role
  • logger: Logger - DBOS logger instance
  • sql_session: Optional[Session] - SQLAlchemy session (in transactions only)
  • step_status: Optional[StepStatus] - Current step execution status

Accessing Context

from dbos import DBOS

@DBOS.workflow()
def my_workflow() -> None:
    """Read execution context through the DBOS class properties."""
    workflow_id = DBOS.workflow_id
    current_user = DBOS.authenticated_user
    user_roles = DBOS.authenticated_roles

    DBOS.logger.info(f"Workflow {workflow_id} started by {current_user}")

    # Guard clause: only admins may continue past this point.
    if "admin" not in (user_roles or []):
        raise PermissionError("Admin access required")

StepStatus

Provides information about the current step execution, including retry status.
from dataclasses import dataclass
from typing import Optional

@dataclass
class StepStatus:
    """Execution status of the current step, including retry progress."""
    step_id: int                      # Step ID within the workflow
    current_attempt: Optional[int]    # Current retry attempt (0-indexed)
    max_attempts: Optional[int]       # Maximum retry attempts

Example: Retry-Aware Step

from dbos import DBOS

@DBOS.step(retries_allowed=True, max_attempts=5, interval_seconds=2.0)
def flaky_operation() -> str:
    """An unreliable operation; DBOS retries it up to five times."""
    status = DBOS.step_status

    # When this invocation is a retry, log which attempt it is.
    if status is not None and status.current_attempt is not None:
        DBOS.logger.warning(
            f"Retry attempt {status.current_attempt + 1} of {status.max_attempts}"
        )

    # Fail 70% of the time to simulate flakiness.
    import random
    if random.random() < 0.7:
        raise Exception("Operation failed")

    return "success"

@DBOS.workflow()
def resilient_workflow() -> str:
    """Invoke the flaky step; the step decorator handles the retries."""
    return flaky_operation()

Combining Context Managers

Context managers can be nested to configure multiple aspects:
from dbos import (
    DBOS,
    SetWorkflowID,
    SetWorkflowTimeout,
    DBOSContextSetAuth,
    SetEnqueueOptions,
    Queue
)

queue = Queue("tasks", priority_enabled=True, partition_queue=True)

@DBOS.workflow()
def complex_task(task_id: str) -> str:
    """Complete the given task and return a summary string."""
    return f"Completed {task_id}"

# Context managers nest freely, so one enqueue can carry authentication,
# a timeout, an explicit workflow ID, and queue options all at once.
with DBOSContextSetAuth("admin@example.com", ["admin"]):
    with SetWorkflowTimeout(300.0):  # 5 minute timeout
        with SetWorkflowID("unique-task-123"):
            with SetEnqueueOptions(
                priority=100,
                queue_partition_key="admin-tasks",
                deduplication_id="task-dedup-123"
            ):
                handle = queue.enqueue(complex_task, "task-123")

result = handle.get_result()

Build docs developers (and LLMs) love