Introduction
DBOS provides a comprehensive API for managing workflows programmatically. The DBOSClient class allows you to list, monitor, restart, cancel, and fork workflows from external systems or administrative tools.
DBOSClient Setup
Connect to your DBOS system database:
from dbos import DBOSClient
# Option 1: a single URL for the system database.
client = DBOSClient(
    system_database_url="postgresql://user:password@localhost:5432/dbos"
)

# Option 2: distinct system and application databases.
client = DBOSClient(
    system_database_url="postgresql://user:password@localhost:5432/dbos_system",
    application_database_url="postgresql://user:password@localhost:5432/app_db"
)

# Release the underlying connections once finished.
try:
    pass  # ... interact with the client here ...
finally:
    client.destroy()
DBOSClient only connects to the system database for reading workflow metadata. It doesn't require the full application codebase.
Retrieving Workflows
Retrieve workflow handles by ID:
# Retrieve an existing workflow
handle = client.retrieve_workflow("workflow-123")

# Inspect the current status record.
status = handle.get_status()
print(f"Status: {status.status}")  # PENDING, SUCCESS, ERROR, etc.
print(f"Created: {status.created_at}")
print(f"Updated: {status.updated_at}")

# Block until the workflow finishes, then fetch its result.
if status.status in ["PENDING", "ENQUEUED"]:
    result = handle.get_result()
    print(f"Result: {result}")
Async Retrieval
import asyncio


async def retrieve_and_wait():
    # The async API mirrors the sync API one-for-one.
    handle = await client.retrieve_workflow_async("workflow-123")
    result = await handle.get_result()
    return result


result = asyncio.run(retrieve_and_wait())
Listing Workflows
Query workflows with powerful filtering:
# List all workflows
all_workflows = client.list_workflows()
for wf in all_workflows:
    print(f"ID: {wf.workflow_id}")
    print(f"Name: {wf.name}")
    print(f"Status: {wf.status}")
    print(f"Created: {wf.created_at}")
    print()

# Narrow the listing to a single status...
pending = client.list_workflows(status="PENDING")
success = client.list_workflows(status="SUCCESS")
errors = client.list_workflows(status="ERROR")

# ...or to any of several statuses at once.
active = client.list_workflows(status=["PENDING", "ENQUEUED"])
Filter by Workflow Name
# Match one workflow function by name.
orders = client.list_workflows(name="process_order")

# Or match any of several names.
specific = client.list_workflows(
    name=["process_order", "send_notification"]
)
Filter by Time Range
from datetime import datetime, timedelta

# Everything started within the last 24 hours.
start_time = (datetime.now() - timedelta(days=1)).isoformat()
recent = client.list_workflows(start_time=start_time)

# An explicit window given as ISO-8601 timestamps.
start = "2024-01-01T00:00:00"
end = "2024-01-31T23:59:59"
january = client.list_workflows(
    start_time=start,
    end_time=end
)
Filter by Workflow ID Prefix
# Every workflow whose ID begins with "order-".
orders = client.list_workflows(workflow_id_prefix="order-")

# Several prefixes in one query.
specific = client.list_workflows(
    workflow_id_prefix=["order-", "payment-"]
)
Filter by User
# Workflows started by one authenticated user.
user_workflows = client.list_workflows(user="[email protected]")

# Or by any member of a group of users.
team_workflows = client.list_workflows(
    user=["[email protected]", "[email protected]"]
)
Filter by Queue
# Any workflow associated with a particular queue.
queued = client.list_workflows(queue_name="high-priority")

# Restrict to workflows that are still queued (ENQUEUED status).
all_queued = client.list_queued_workflows()

# Queued workflows on one specific queue.
specific_queue = client.list_queued_workflows(queue_name="email-queue")
Advanced Filtering
# Filters compose freely within a single call.
filtered = client.list_workflows(
    status=["ERROR", "CANCELLED"],
    name="process_payment",
    start_time=(datetime.now() - timedelta(hours=1)).isoformat(),
    user="[email protected]",
    limit=100
)

# Page through large result sets with limit/offset.
page1 = client.list_workflows(limit=50, offset=0)
page2 = client.list_workflows(limit=50, offset=50)

# Newest first.
newest = client.list_workflows(sort_desc=True, limit=10)
Controlling Data Loading
# Skip loading inputs/outputs for a faster, lighter query.
lightweight = client.list_workflows(
    load_input=False,
    load_output=False,
    limit=1000
)

# Load outputs but not inputs.
results_only = client.list_workflows(
    load_input=False,
    load_output=True
)
Workflow Status Details
Inspect detailed workflow information:
status = client.retrieve_workflow("workflow-123").get_status()
# Identity
print(f"Workflow ID: {status.workflow_id}")
print(f"Name: {status.name}")
print(f"Status: {status.status}")
print(f"Class: {status.class_name}")  # Set for class-based workflows

# Timestamps
print(f"Created: {status.created_at}")
print(f"Updated: {status.updated_at}")
print(f"Dequeued: {status.dequeued_at}")  # Set for queued workflows

# Lineage
print(f"Parent: {status.parent_workflow_id}")
print(f"Forked From: {status.forked_from}")

# Queueing
print(f"Queue: {status.queue_name}")
print(f"Priority: {status.priority}")
print(f"Partition Key: {status.queue_partition_key}")

# Authentication context
print(f"User: {status.authenticated_user}")
print(f"Roles: {status.authenticated_roles}")

# Payloads
print(f"Input: {status.input}")  # Arguments the workflow was invoked with
print(f"Output: {status.output}")  # Result, if the workflow succeeded
print(f"Error: {status.error}")  # Error message, if it failed

# Recovery / deployment metadata
print(f"Recovery Attempts: {status.recovery_attempts}")
print(f"App Version: {status.app_version}")
print(f"Executor: {status.executor_id}")
Listing Workflow Steps
Inspect individual steps within a workflow:
# Get all steps for a workflow
steps = client.list_workflow_steps("workflow-123")
for step in steps:
    print(f"Function ID: {step.function_id}")
    print(f"Name: {step.name}")
    print(f"Started: {step.started_at}")
    print(f"Completed: {step.completed_at}")
    # NOTE(review): assumes completed_at is set — a step that has not
    # finished may leave it None, which would make this subtraction raise.
    print(f"Duration: {(step.completed_at - step.started_at).total_seconds()}s")
    print(f"Output: {step.output}")
    print(f"Error: {step.error}")
    print()
Use step inspection to debug workflow execution and identify slow operations.
Waiting for Workflows
Wait for Single Workflow
handle = client.retrieve_workflow("workflow-123")

# Poll every 0.5s instead of the 1-second default.
result = handle.get_result(polling_interval_sec=0.5)
Wait for First Completion
# Race several workflows and react to whichever finishes first.
handles = [
    client.retrieve_workflow("wf-1"),
    client.retrieve_workflow("wf-2"),
    client.retrieve_workflow("wf-3")
]

# Blocks until one handle completes and returns it.
completed_handle = client.wait_first(handles)
result = completed_handle.get_result()
print(f"First to complete: {completed_handle.get_workflow_id()}")


# The same pattern with the async API.
async def wait_for_first():
    handles = [
        await client.retrieve_workflow_async("wf-1"),
        await client.retrieve_workflow_async("wf-2"),
        await client.retrieve_workflow_async("wf-3")
    ]
    completed = await client.wait_first_async(handles)
    return await completed.get_result()
Enqueuing Workflows
Enqueue workflows programmatically:
from dbos import EnqueueOptions
# Submit a workflow to a queue by name — no application code required.
handle = client.enqueue(
    EnqueueOptions(
        workflow_name="process_order",
        queue_name="orders",
        workflow_id="order-12345",  # Optional explicit ID
        priority=100,  # Optional
        max_recovery_attempts=5  # Optional
    ),
    # Positional arguments passed through to the workflow:
    "order-12345",
    "customer-789",
    {"items": [{"id": 1, "qty": 2}]}
)

# Block until the enqueued workflow finishes.
result = handle.get_result()
Advanced Enqueue Options
from dbos import EnqueueOptions, WorkflowSerializationFormat

handle = client.enqueue(
    EnqueueOptions(
        workflow_name="process_data",
        queue_name="data-processing",
        workflow_id="data-proc-456",
        app_version="v1.2.3",
        workflow_timeout=3600.0,  # Seconds — here, one hour
        deduplication_id="dedup-123",
        priority=50,
        max_recovery_attempts=10,
        queue_partition_key="partition-a",
        authenticated_user="[email protected]",
        authenticated_roles=["admin", "operator"],
        serialization_type=WorkflowSerializationFormat.JSON,
        class_name="DataProcessor",  # For class-based workflows
        instance_name="prod-instance"  # For configured instances
    ),
    {"data": "value"}
)
Cancelling Workflows
Cancel running workflows:
# Cancel a workflow
client.cancel_workflow("workflow-123")
# Verify cancellation
status = client.retrieve_workflow("workflow-123").get_status()
print(status.status) # "CANCELLED"
# Async version
await client.cancel_workflow_async("workflow-123")
Cancelled workflows will raise DBOSAwaitedWorkflowCancelledError when awaited.
Resuming Workflows
Resume cancelled workflows:
# Resume a cancelled workflow
handle = client.resume_workflow("workflow-123")
# Wait for completion
result = handle.get_result()
# Async version
handle = await client.resume_workflow_async("workflow-123")
result = await handle.get_result()
Forking Workflows
Create a copy of a workflow starting from a specific step:
# Fork from step 5 (useful for debugging or replay)
# Replay from step 5.
forked_handle = client.fork_workflow(
    workflow_id="original-wf-123",
    start_step=5
)

# Replay on a different application version.
forked_handle = client.fork_workflow(
    workflow_id="original-wf-123",
    start_step=3,
    application_version="v1.2.4"
)

# The fork receives its own workflow ID.
forked_id = forked_handle.get_workflow_id()

# Block until the fork finishes.
result = forked_handle.get_result()

# The fork records where it came from.
status = forked_handle.get_status()
print(f"Forked from: {status.forked_from}")  # "original-wf-123"
Forking is useful for debugging failed workflows or replaying from a specific point with updated code.
Sending Messages to Workflows
Send notifications to running workflows:
# Send a message
client.send(
destination_id="workflow-123",
message={"command": "process", "data": [1, 2, 3]}
)
# Send with topic
client.send(
destination_id="workflow-123",
message="approved",
topic="approval"
)
# Send with idempotency key (prevents duplicates)
client.send(
destination_id="workflow-123",
message="important data",
topic="data",
idempotency_key="msg-unique-id-123"
)
# Async version
await client.send_async(
destination_id="workflow-123",
message={"status": "ready"},
topic="status"
)
Reading Workflow Events
Retrieve events set by workflows:
# Get an event (blocks until available or timeout)
event_value = client.get_event(
workflow_id="workflow-123",
key="deployment_started",
timeout_seconds=60
)
print(f"Event value: {event_value}")
# Async version
event_value = await client.get_event_async(
workflow_id="workflow-123",
key="processing_complete",
timeout_seconds=30
)
Streaming Workflow Output
Read stream output from workflows:
# Read stream as generator
for value in client.read_stream("workflow-123", "progress"):
print(f"Progress update: {value}")
# Async version
async for value in client.read_stream_async("workflow-123", "results"):
print(f"Result: {value}")
Streams allow workflows to output intermediate results that can be consumed in real-time.
Managing Schedules
Create Schedules
# Register a cron schedule that drives a workflow.
client.create_schedule(
    schedule_name="daily-backup",
    workflow_name="backup_database",
    schedule="0 0 2 * * *",  # 2 AM daily
    context={"retention_days": 30}
)
List Schedules
# Every registered schedule.
schedules = client.list_schedules()

# Narrow by state.
active = client.list_schedules(status="ACTIVE")
paused = client.list_schedules(status="PAUSED")

# Narrow by the workflow a schedule drives.
backups = client.list_schedules(workflow_name="backup_database")

# Narrow by schedule-name prefix.
reports = client.list_schedules(schedule_name_prefix="report-")
Manage Schedules
# Fetch one schedule by name (falsy result if not found).
schedule = client.get_schedule("daily-backup")
if schedule:
    print(f"Cron: {schedule['schedule']}")
    print(f"Context: {schedule['context']}")

# Pause, resume, or remove it.
client.pause_schedule("daily-backup")
client.resume_schedule("daily-backup")
client.delete_schedule("daily-backup")
Trigger Schedules
# Kick off a scheduled workflow immediately.
handle = client.trigger_schedule("daily-backup")
result = handle.get_result()

# Re-run the schedule across a past window.
from datetime import datetime, timedelta

start = datetime.now() - timedelta(days=7)
end = datetime.now()
handles = client.backfill_schedule(
    schedule_name="daily-backup",
    start=start,
    end=end
)
print(f"Enqueued {len(handles)} backfill executions")
Application Version Management
# Every application version the system database knows about.
versions = client.list_application_versions()
for version in versions:
    print(f"Version: {version['version_name']}")
    print(f"Timestamp: {version['timestamp']}")
    print()

# The version currently marked as latest.
latest = client.get_latest_application_version()
print(f"Current version: {latest['version_name']}")

# Mark a different version as latest.
client.set_latest_application_version("v1.2.3")
Complete Example: Workflow Dashboard
from dbos import DBOSClient
from datetime import datetime, timedelta
import time


class WorkflowDashboard:
    """Reporting and administration helpers layered over a DBOSClient."""

    def __init__(self, system_database_url: str):
        self.client = DBOSClient(system_database_url=system_database_url)

    def get_stats(self) -> dict:
        """Get overall workflow statistics for the last hour."""
        start_time = (datetime.now() - timedelta(hours=1)).isoformat()
        recent = self.client.list_workflows(start_time=start_time)
        # Per-status counters; statuses outside this set are ignored.
        stats = {
            "total": len(recent),
            "success": 0,
            "error": 0,
            "pending": 0,
            "cancelled": 0,
            "enqueued": 0
        }
        for wf in recent:
            status = wf.status.lower()
            if status in stats:
                stats[status] += 1
        return stats

    def get_slow_workflows(self, threshold_seconds: int = 60) -> list:
        """Find successful workflows from the last 24h exceeding the threshold."""
        slow = []
        recent = self.client.list_workflows(
            start_time=(datetime.now() - timedelta(hours=24)).isoformat(),
            status="SUCCESS"
        )
        for wf in recent:
            # Both timestamps must be present to compute a duration.
            if wf.created_at and wf.updated_at:
                duration = (wf.updated_at - wf.created_at).total_seconds()
                if duration > threshold_seconds:
                    slow.append({
                        "workflow_id": wf.workflow_id,
                        "name": wf.name,
                        "duration": duration
                    })
        # Slowest first.
        return sorted(slow, key=lambda entry: entry["duration"], reverse=True)

    def get_failed_workflows(self, limit: int = 10) -> list:
        """Get the most recent failed workflows as plain dicts."""
        failed = self.client.list_workflows(
            status="ERROR",
            sort_desc=True,
            limit=limit
        )
        return [{
            "workflow_id": wf.workflow_id,
            "name": wf.name,
            "error": wf.error,
            "created_at": wf.created_at
        } for wf in failed]

    def retry_failed_workflow(self, workflow_id: str) -> str:
        """Retry a failed workflow by forking it from step 1; returns the new ID."""
        handle = self.client.fork_workflow(
            workflow_id=workflow_id,
            start_step=1
        )
        return handle.get_workflow_id()

    def get_queue_stats(self) -> dict:
        """Get per-queue totals and per-status counts for queued workflows."""
        queued = self.client.list_queued_workflows()
        stats = {}
        for wf in queued:
            queue = wf.queue_name or "default"
            if queue not in stats:
                stats[queue] = {"total": 0, "by_status": {}}
            stats[queue]["total"] += 1
            status = wf.status
            stats[queue]["by_status"][status] = stats[queue]["by_status"].get(status, 0) + 1
        return stats

    def monitor_workflow(self, workflow_id: str, poll_interval: float = 1.0):
        """Poll a workflow's status until it reaches a terminal state."""
        handle = self.client.retrieve_workflow(workflow_id)
        print(f"Monitoring workflow {workflow_id}...")
        while True:
            status = handle.get_status()
            print(f"Status: {status.status}")
            if status.status in ["SUCCESS", "ERROR", "CANCELLED"]:
                if status.status == "SUCCESS":
                    print(f"Result: {status.output}")
                elif status.status == "ERROR":
                    print(f"Error: {status.error}")
                break
            time.sleep(poll_interval)

    def cleanup(self):
        """Release the underlying client's database connections."""
        self.client.destroy()
# Usage
dashboard = WorkflowDashboard(
    system_database_url="postgresql://user:pass@localhost/dbos"
)
try:
    # Hourly summary counts.
    stats = dashboard.get_stats()
    print(f"Workflow Stats (last hour): {stats}")

    # Workflows that ran longer than two minutes.
    slow = dashboard.get_slow_workflows(threshold_seconds=120)
    print(f"\nSlow workflows (>2 minutes): {len(slow)}")
    for wf in slow[:5]:
        print(f" {wf['name']}: {wf['duration']:.1f}s")

    # Most recent failures.
    failed = dashboard.get_failed_workflows(limit=5)
    print(f"\nRecent failures: {len(failed)}")
    for wf in failed:
        print(f" {wf['workflow_id']}: {wf['error']}")

    # Per-queue totals (distinct loop name avoids shadowing `stats` above).
    queue_stats = dashboard.get_queue_stats()
    print(f"\nQueue Statistics:")
    for queue, per_queue in queue_stats.items():
        print(f" {queue}: {per_queue['total']} workflows")

    # Follow one workflow live:
    # dashboard.monitor_workflow("workflow-123")
finally:
    dashboard.cleanup()
Best Practices
Connection Management
Connection Management
- Always call client.destroy() when done
- Use try/finally blocks to ensure cleanup
- Reuse client instances when possible
- Consider connection pooling for high-traffic applications
Querying Efficiently
Querying Efficiently
- Use filters to reduce result set size
- Set appropriate limits for large queries
- Disable load_input/load_output when not needed
- Use pagination for very large result sets
Error Handling
Error Handling
- Handle DBOSNonExistentWorkflowError when retrieving workflows
- Check workflow status before waiting for results
- Implement timeouts for long-running operations
- Log all API errors for debugging
Monitoring
Monitoring
- Regularly check for ERROR status workflows
- Monitor queue depths
- Track workflow duration trends
- Set up alerts for critical failures
Next Steps
Queue Tutorial
Learn about workflow queuing and management
Scheduled Workflows
Manage scheduled workflows programmatically
Error Handling
Implement robust error handling strategies
Configuration
Configure DBOS client connections