WorkflowHandle
A WorkflowHandle represents a workflow execution and provides methods to check its status and retrieve its result. Handles are returned when starting or enqueueing workflows.

Overview

Workflow handles allow you to:
  • Track workflow execution status
  • Wait for workflow completion and retrieve results
  • Access the workflow ID for later retrieval
DBOS provides two handle types:
  • WorkflowHandle[R] - For synchronous code
  • WorkflowHandleAsync[R] - For async code
Both provide the same functionality but differ in their method signatures (sync vs async).

Getting a Handle

From start_workflow

from dbos import DBOS

@DBOS.workflow()
def process_order(order_id: str) -> str:
    return f"Processed {order_id}"

# Start a workflow and get a handle
handle = DBOS.start_workflow(process_order, "order-123")

From Queue.enqueue

from dbos import DBOS, Queue

queue = Queue("orders")

@DBOS.workflow()
def process_order(order_id: str) -> str:
    return f"Processed {order_id}"

# Enqueue a workflow and get a handle
handle = queue.enqueue(process_order, "order-123")

From retrieve_workflow

from dbos import DBOS

# Retrieve a handle to an existing workflow
handle = DBOS.retrieve_workflow("my-workflow-id")

From DBOSClient

from dbos import DBOSClient

client = DBOSClient(application_database_url=url)
handle = client.retrieve_workflow("my-workflow-id")

WorkflowHandle Methods

workflow_id

wf_id = handle.workflow_id
Returns: str — The unique ID of the workflow.
Example:
handle = DBOS.start_workflow(process_order, "order-123")
print(f"Started workflow: {handle.workflow_id}")

# Store the ID for later retrieval
workflow_id = handle.workflow_id
save_to_database(workflow_id)

# Later, retrieve the workflow
handle = DBOS.retrieve_workflow(workflow_id)

get_workflow_id

wf_id = handle.get_workflow_id()
Alternative method to access the workflow ID (same as handle.workflow_id).
Returns: str — The unique ID of the workflow.

get_result

result = handle.get_result(polling_interval_sec=1.0)
Wait for the workflow to complete and return its result. Blocks until completion.
Parameters:
  • polling_interval_sec (float, default 1.0) — How often to poll for completion (in seconds)
Returns: R — The workflow's return value.
Example:
@DBOS.workflow()
def calculate_total(items: list) -> float:
    total = sum(item['price'] for item in items)
    return total

handle = DBOS.start_workflow(calculate_total, [{"price": 10.0}, {"price": 20.0}])

# Wait for completion (polls every 1 second by default)
result = handle.get_result()
print(f"Total: ${result}")  # Total: $30.0

# Custom polling interval (checks every 5 seconds)
result = handle.get_result(polling_interval_sec=5.0)
Error Handling:
from dbos import DBOS
from dbos._error import DBOSAwaitedWorkflowCancelledError

@DBOS.workflow()
def might_fail() -> str:
    # Workflow logic
    return "success"

handle = DBOS.start_workflow(might_fail)

try:
    result = handle.get_result()
    print(f"Success: {result}")
except DBOSAwaitedWorkflowCancelledError:
    print("Workflow was cancelled")
except Exception as e:
    print(f"Workflow failed: {e}")

get_status

status = handle.get_status()
Get the current status of the workflow without waiting for completion.
Returns: WorkflowStatus — Object containing workflow status information.
WorkflowStatus Fields:
  • workflow_id: str - The workflow ID
  • status: str - Current status (see WorkflowStatusString below)
  • name: str - Workflow function name
  • input: Optional[Any] - Workflow input arguments
  • output: Optional[Any] - Workflow return value (if completed)
  • error: Optional[str] - Error message (if failed)
  • created_at: datetime - When the workflow was created
  • updated_at: datetime - When the workflow was last updated
  • app_version: Optional[str] - Application version
  • executor_id: Optional[str] - Executor that ran the workflow
  • queue_name: Optional[str] - Queue name (if enqueued)
  • dequeued_at: Optional[datetime] - When dequeued (if applicable)

WorkflowStatusString

Enumeration of possible workflow status values.
from enum import Enum

class WorkflowStatusString(Enum):
    PENDING = "PENDING"
    SUCCESS = "SUCCESS"
    ERROR = "ERROR"
    MAX_RECOVERY_ATTEMPTS_EXCEEDED = "MAX_RECOVERY_ATTEMPTS_EXCEEDED"
    CANCELLED = "CANCELLED"
    ENQUEUED = "ENQUEUED"
Status Values:
  • PENDING (str) — Workflow is currently running
  • SUCCESS (str) — Workflow completed successfully
  • ERROR (str) — Workflow failed with an error
  • MAX_RECOVERY_ATTEMPTS_EXCEEDED (str) — Workflow exhausted its maximum recovery attempts
  • CANCELLED (str) — Workflow was cancelled
  • ENQUEUED (str) — Workflow is enqueued and waiting to be executed
Example:
@DBOS.workflow()
def long_running_task() -> str:
    DBOS.sleep(60)
    return "done"

handle = DBOS.start_workflow(long_running_task)

# Check status without blocking
status = handle.get_status()
print(f"Status: {status.status}")  # "PENDING"
print(f"Created: {status.created_at}")
print(f"Workflow: {status.name}")

# Check if completed
if status.status == "SUCCESS":
    print(f"Result: {status.output}")
elif status.status == "ERROR":
    print(f"Error: {status.error}")
else:
    print(f"Still running...")
Polling for Completion:
import time
from dbos import DBOS

@DBOS.workflow()
def background_job() -> str:
    DBOS.sleep(30)
    return "completed"

handle = DBOS.start_workflow(background_job)

# Poll manually with custom logic
while True:
    status = handle.get_status()
    
    if status.status == "SUCCESS":
        print(f"Job completed: {status.output}")
        break
    elif status.status == "ERROR":
        print(f"Job failed: {status.error}")
        break
    else:
        print(f"Job status: {status.status}")
        time.sleep(5)  # Wait 5 seconds before checking again

WorkflowHandleAsync Methods

The async handle has the same methods but they’re async functions:

get_result (async)

result = await handle.get_result(polling_interval_sec=1.0)
Example:
import asyncio
from dbos import DBOS

@DBOS.workflow()
async def async_workflow(value: int) -> int:
    await DBOS.sleep_async(5)
    return value * 2

async def main():
    handle = await DBOS.start_workflow_async(async_workflow, 21)
    result = await handle.get_result()
    print(f"Result: {result}")  # Result: 42

asyncio.run(main())

get_status (async)

status = await handle.get_status()
Example:
import asyncio
from dbos import DBOS

@DBOS.workflow()
async def async_task() -> str:
    await DBOS.sleep_async(10)
    return "done"

async def monitor_workflow():
    handle = await DBOS.start_workflow_async(async_task)
    
    while True:
        status = await handle.get_status()
        
        if status.status == "SUCCESS":
            print(f"Completed: {status.output}")
            break
        elif status.status == "ERROR":
            print(f"Failed: {status.error}")
            break
        else:
            print(f"Status: {status.status}")
            await asyncio.sleep(2)

asyncio.run(monitor_workflow())

Pattern: Fire and Forget

from dbos import DBOS, Queue

queue = Queue("background_tasks")

@DBOS.workflow()
def send_email(to: str, subject: str, body: str) -> None:
    # Send email logic
    pass

# Enqueue and don't wait for result
queue.enqueue(send_email, "user@example.com", "Welcome", "Thanks for signing up!")
# Continue immediately without blocking

Pattern: Batch Processing

from dbos import DBOS, Queue, WorkflowHandle
from typing import List

queue = Queue("batch_jobs", concurrency=5)

@DBOS.workflow()
def process_item(item_id: str) -> str:
    # Process item logic
    return f"Processed {item_id}"

# Start many workflows
handles: List[WorkflowHandle] = []
for i in range(100):
    handle = queue.enqueue(process_item, f"item-{i}")
    handles.append(handle)

# Wait for all to complete
results = [h.get_result() for h in handles]
print(f"Processed {len(results)} items")

Pattern: Conditional Waiting

from dbos import DBOS
import time

@DBOS.workflow()
def important_task() -> str:
    DBOS.sleep(30)
    return "critical result"

@DBOS.workflow()
def optional_task() -> str:
    DBOS.sleep(30)
    return "optional result"

important = DBOS.start_workflow(important_task)
optional = DBOS.start_workflow(optional_task)

# Wait for important task
important_result = important.get_result()
print(f"Important: {important_result}")

# Check optional task without blocking
optional_status = optional.get_status()
if optional_status.status == "SUCCESS":
    print(f"Optional: {optional_status.output}")
else:
    print("Optional task still running, continuing without it")

Pattern: Wait for First Completion

from dbos import DBOS
from typing import List

@DBOS.workflow()
def query_api(api_name: str) -> dict:
    # Query external API
    return {"api": api_name, "data": "..."}

# Start multiple redundant queries
handles = [
    DBOS.start_workflow(query_api, "api1"),
    DBOS.start_workflow(query_api, "api2"),
    DBOS.start_workflow(query_api, "api3"),
]

# Use the first one that completes
completed = DBOS.wait_first(handles)
result = completed.get_result()
print(f"First result from: {result['api']}")

Build docs developers (and LLMs) love