A WorkflowHandle represents a workflow execution and provides methods to check its status and retrieve its result. Handles are returned when starting or enqueueing workflows.
Overview
Workflow handles allow you to:
- Track workflow execution status
- Wait for workflow completion and retrieve results
- Access the workflow ID for later retrieval
DBOS provides two handle types:
WorkflowHandle[R] - For synchronous code
WorkflowHandleAsync[R] - For async code
Both provide the same functionality but differ in their method signatures (sync vs async).
Getting a Handle
From start_workflow
from dbos import DBOS
@DBOS.workflow()
def process_order(order_id: str) -> str:
return f"Processed {order_id}"
# Start a workflow and get a handle
handle = DBOS.start_workflow(process_order, "order-123")
From Queue.enqueue
from dbos import DBOS, Queue
queue = Queue("orders")
@DBOS.workflow()
def process_order(order_id: str) -> str:
return f"Processed {order_id}"
# Enqueue a workflow and get a handle
handle = queue.enqueue(process_order, "order-123")
From retrieve_workflow
from dbos import DBOS
# Retrieve a handle to an existing workflow
handle = DBOS.retrieve_workflow("my-workflow-id")
From DBOSClient
from dbos import DBOSClient
client = DBOSClient(application_database_url=url)
handle = client.retrieve_workflow("my-workflow-id")
WorkflowHandle Methods
workflow_id
wf_id = handle.workflow_id
The unique ID of the workflow
Example:
handle = DBOS.start_workflow(process_order, "order-123")
print(f"Started workflow: {handle.workflow_id}")
# Store the ID for later retrieval
workflow_id = handle.workflow_id
save_to_database(workflow_id)
# Later, retrieve the workflow
handle = DBOS.retrieve_workflow(workflow_id)
get_workflow_id
wf_id = handle.get_workflow_id()
Alternative method to access the workflow ID (same as handle.workflow_id).
The unique ID of the workflow
get_result
result = handle.get_result(polling_interval_sec=1.0)
Wait for the workflow to complete and return its result. Blocks until completion.
How often to poll for completion (in seconds)
The workflow’s return value
Example:
@DBOS.workflow()
def calculate_total(items: list) -> float:
total = sum(item['price'] for item in items)
return total
handle = DBOS.start_workflow(calculate_total, [{"price": 10.0}, {"price": 20.0}])
# Wait for completion (polls every 1 second by default)
result = handle.get_result()
print(f"Total: ${result}") # Total: $30.0
# Custom polling interval (checks every 5 seconds)
result = handle.get_result(polling_interval_sec=5.0)
Error Handling:
from dbos import DBOS
from dbos._error import DBOSAwaitedWorkflowCancelledError
@DBOS.workflow()
def might_fail() -> str:
# Workflow logic
return "success"
handle = DBOS.start_workflow(might_fail)
try:
result = handle.get_result()
print(f"Success: {result}")
except DBOSAwaitedWorkflowCancelledError:
print("Workflow was cancelled")
except Exception as e:
print(f"Workflow failed: {e}")
get_status
status = handle.get_status()
Get the current status of the workflow without waiting for completion.
Object containing workflow status information
WorkflowStatus Fields:
workflow_id: str - The workflow ID
status: str - Current status (see WorkflowStatusString below)
name: str - Workflow function name
input: Optional[Any] - Workflow input arguments
output: Optional[Any] - Workflow return value (if completed)
error: Optional[str] - Error message (if failed)
created_at: datetime - When the workflow was created
updated_at: datetime - When the workflow was last updated
app_version: Optional[str] - Application version
executor_id: Optional[str] - Executor that ran the workflow
queue_name: Optional[str] - Queue name (if enqueued)
dequeued_at: Optional[datetime] - When dequeued (if applicable)
WorkflowStatusString
Enumeration of possible workflow status values.
from enum import Enum
class WorkflowStatusString(Enum):
PENDING = "PENDING"
SUCCESS = "SUCCESS"
ERROR = "ERROR"
MAX_RECOVERY_ATTEMPTS_EXCEEDED = "MAX_RECOVERY_ATTEMPTS_EXCEEDED"
CANCELLED = "CANCELLED"
ENQUEUED = "ENQUEUED"
Status Values:
Workflow is currently running
Workflow completed successfully
Workflow failed with an error
MAX_RECOVERY_ATTEMPTS_EXCEEDED
Workflow exhausted its maximum recovery attempts
Workflow is enqueued and waiting to be executed
Example:
@DBOS.workflow()
def long_running_task() -> str:
DBOS.sleep(60)
return "done"
handle = DBOS.start_workflow(long_running_task)
# Check status without blocking
status = handle.get_status()
print(f"Status: {status.status}") # "PENDING"
print(f"Created: {status.created_at}")
print(f"Workflow: {status.name}")
# Check if completed
if status.status == "SUCCESS":
print(f"Result: {status.output}")
elif status.status == "ERROR":
print(f"Error: {status.error}")
else:
print(f"Still running...")
Polling for Completion:
import time
from dbos import DBOS
@DBOS.workflow()
def background_job() -> str:
DBOS.sleep(30)
return "completed"
handle = DBOS.start_workflow(background_job)
# Poll manually with custom logic
while True:
status = handle.get_status()
if status.status == "SUCCESS":
print(f"Job completed: {status.output}")
break
elif status.status == "ERROR":
print(f"Job failed: {status.error}")
break
else:
print(f"Job status: {status.status}")
time.sleep(5) # Wait 5 seconds before checking again
WorkflowHandleAsync Methods
The async handle has the same methods but they’re async functions:
get_result (async)
result = await handle.get_result(polling_interval_sec=1.0)
Example:
import asyncio
from dbos import DBOS
@DBOS.workflow()
async def async_workflow(value: int) -> int:
await DBOS.sleep_async(5)
return value * 2
async def main():
handle = await DBOS.start_workflow_async(async_workflow, 21)
result = await handle.get_result()
print(f"Result: {result}") # Result: 42
asyncio.run(main())
get_status (async)
status = await handle.get_status()
Example:
import asyncio
from dbos import DBOS
@DBOS.workflow()
async def async_task() -> str:
await DBOS.sleep_async(10)
return "done"
async def monitor_workflow():
handle = await DBOS.start_workflow_async(async_task)
while True:
status = await handle.get_status()
if status.status == "SUCCESS":
print(f"Completed: {status.output}")
break
elif status.status == "ERROR":
print(f"Failed: {status.error}")
break
else:
print(f"Status: {status.status}")
await asyncio.sleep(2)
asyncio.run(monitor_workflow())
Pattern: Fire and Forget
from dbos import DBOS, Queue
queue = Queue("background_tasks")
@DBOS.workflow()
def send_email(to: str, subject: str, body: str) -> None:
# Send email logic
pass
# Enqueue and don't wait for result
queue.enqueue(send_email, "[email protected]", "Welcome", "Thanks for signing up!")
# Continue immediately without blocking
Pattern: Batch Processing
from dbos import DBOS, Queue
from typing import List
queue = Queue("batch_jobs", concurrency=5)
@DBOS.workflow()
def process_item(item_id: str) -> str:
# Process item logic
return f"Processed {item_id}"
# Start many workflows
handles: List[WorkflowHandle] = []
for i in range(100):
handle = queue.enqueue(process_item, f"item-{i}")
handles.append(handle)
# Wait for all to complete
results = [h.get_result() for h in handles]
print(f"Processed {len(results)} items")
Pattern: Conditional Waiting
from dbos import DBOS
import time
@DBOS.workflow()
def important_task() -> str:
DBOS.sleep(30)
return "critical result"
@DBOS.workflow()
def optional_task() -> str:
DBOS.sleep(30)
return "optional result"
important = DBOS.start_workflow(important_task)
optional = DBOS.start_workflow(optional_task)
# Wait for important task
important_result = important.get_result()
print(f"Important: {important_result}")
# Check optional task without blocking
optional_status = optional.get_status()
if optional_status.status == "SUCCESS":
print(f"Optional: {optional_status.output}")
else:
print("Optional task still running, continuing without it")
Pattern: Wait for First Completion
from dbos import DBOS
from typing import List
@DBOS.workflow()
def query_api(api_name: str) -> dict:
# Query external API
return {"api": api_name, "data": "..."}
# Start multiple redundant queries
handles = [
DBOS.start_workflow(query_api, "api1"),
DBOS.start_workflow(query_api, "api2"),
DBOS.start_workflow(query_api, "api3"),
]
# Use the first one that completes
completed = DBOS.wait_first(handles)
result = completed.get_result()
print(f"First result from: {result['api']}")