Skip to main content

Introduction

DBOS provides a comprehensive API for managing workflows programmatically. The DBOSClient class allows you to list, monitor, restart, cancel, and fork workflows from external systems or administrative tools.

DBOSClient Setup

Connect to your DBOS system database:
from dbos import DBOSClient

# Connect using database URL
client = DBOSClient(
    system_database_url="postgresql://user:password@localhost:5432/dbos"
)

# Or use separate system and application databases
client = DBOSClient(
    system_database_url="postgresql://user:password@localhost:5432/dbos_system",
    application_database_url="postgresql://user:password@localhost:5432/app_db"
)

# Don't forget to cleanup when done
try:
    # Use client...
    pass
finally:
    client.destroy()
DBOSClient only connects to the system database for reading workflow metadata. It doesn’t require the full application codebase.

Retrieving Workflows

Retrieve workflow handles by ID:
# Retrieve existing workflow
handle = client.retrieve_workflow("workflow-123")

# Get workflow status
status = handle.get_status()
print(f"Status: {status.status}")  # PENDING, SUCCESS, ERROR, etc.
print(f"Created: {status.created_at}")
print(f"Updated: {status.updated_at}")

# Wait for completion and get result
if status.status in ["PENDING", "ENQUEUED"]:
    result = handle.get_result()
    print(f"Result: {result}")

Async Retrieval

import asyncio

async def retrieve_and_wait():
    handle = await client.retrieve_workflow_async("workflow-123")
    result = await handle.get_result()
    return result

result = asyncio.run(retrieve_and_wait())

Listing Workflows

Query workflows with powerful filtering:
# List all workflows
all_workflows = client.list_workflows()

for wf in all_workflows:
    print(f"ID: {wf.workflow_id}")
    print(f"Name: {wf.name}")
    print(f"Status: {wf.status}")
    print(f"Created: {wf.created_at}")
    print()

# Filter by status
pending = client.list_workflows(status="PENDING")
success = client.list_workflows(status="SUCCESS")
errors = client.list_workflows(status="ERROR")

# Multiple statuses
active = client.list_workflows(status=["PENDING", "ENQUEUED"])

Filter by Workflow Name

# Single workflow name
orders = client.list_workflows(name="process_order")

# Multiple workflow names
specific = client.list_workflows(
    name=["process_order", "send_notification"]
)

Filter by Time Range

from datetime import datetime, timedelta

# Last 24 hours
start_time = (datetime.now() - timedelta(days=1)).isoformat()
recent = client.list_workflows(start_time=start_time)

# Specific time range
start = "2024-01-01T00:00:00"
end = "2024-01-31T23:59:59"
january = client.list_workflows(
    start_time=start,
    end_time=end
)

Filter by Workflow ID Prefix

# All workflows starting with "order-"
orders = client.list_workflows(workflow_id_prefix="order-")

# Multiple prefixes
specific = client.list_workflows(
    workflow_id_prefix=["order-", "payment-"]
)

Filter by User

# Workflows for specific user
user_workflows = client.list_workflows(user="[email protected]")

# Multiple users
team_workflows = client.list_workflows(
    user=["[email protected]", "[email protected]"]
)

Filter by Queue

# Workflows in specific queue
queued = client.list_workflows(queue_name="high-priority")

# Only queued workflows (ENQUEUED status)
all_queued = client.list_queued_workflows()

# Queued workflows in specific queue
specific_queue = client.list_queued_workflows(queue_name="email-queue")

Advanced Filtering

# Combine multiple filters
filtered = client.list_workflows(
    status=["ERROR", "CANCELLED"],
    name="process_payment",
    start_time=(datetime.now() - timedelta(hours=1)).isoformat(),
    user="[email protected]",
    limit=100
)

# Pagination
page1 = client.list_workflows(limit=50, offset=0)
page2 = client.list_workflows(limit=50, offset=50)

# Sort descending (newest first)
newest = client.list_workflows(sort_desc=True, limit=10)

Controlling Data Loading

# Load without input/output for faster queries
lightweight = client.list_workflows(
    load_input=False,
    load_output=False,
    limit=1000
)

# Only load outputs
results_only = client.list_workflows(
    load_input=False,
    load_output=True
)

Workflow Status Details

Inspect detailed workflow information:
status = client.retrieve_workflow("workflow-123").get_status()

# Basic info
print(f"Workflow ID: {status.workflow_id}")
print(f"Name: {status.name}")
print(f"Status: {status.status}")
print(f"Class: {status.class_name}")  # For class-based workflows

# Timing
print(f"Created: {status.created_at}")
print(f"Updated: {status.updated_at}")
print(f"Dequeued: {status.dequeued_at}")  # For queued workflows

# Relationships
print(f"Parent: {status.parent_workflow_id}")
print(f"Forked From: {status.forked_from}")

# Queue info
print(f"Queue: {status.queue_name}")
print(f"Priority: {status.priority}")
print(f"Partition Key: {status.queue_partition_key}")

# Authentication
print(f"User: {status.authenticated_user}")
print(f"Roles: {status.authenticated_roles}")

# Data
print(f"Input: {status.input}")  # Workflow arguments
print(f"Output: {status.output}")  # Workflow result
print(f"Error: {status.error}")  # Error message if failed

# Recovery
print(f"Recovery Attempts: {status.recovery_attempts}")
print(f"App Version: {status.app_version}")
print(f"Executor: {status.executor_id}")

Listing Workflow Steps

Inspect individual steps within a workflow:
# Get all steps for a workflow
steps = client.list_workflow_steps("workflow-123")

for step in steps:
    print(f"Function ID: {step.function_id}")
    print(f"Name: {step.name}")
    print(f"Started: {step.started_at}")
    print(f"Completed: {step.completed_at}")
    print(f"Duration: {(step.completed_at - step.started_at).total_seconds()}s")
    print(f"Output: {step.output}")
    print(f"Error: {step.error}")
    print()
Use step inspection to debug workflow execution and identify slow operations.

Waiting for Workflows

Wait for Single Workflow

handle = client.retrieve_workflow("workflow-123")

# Wait with custom polling interval (default 1 second)
result = handle.get_result(polling_interval_sec=0.5)

Wait for First Completion

# Wait for any one of multiple workflows to complete
handles = [
    client.retrieve_workflow("wf-1"),
    client.retrieve_workflow("wf-2"),
    client.retrieve_workflow("wf-3")
]

# Returns the first handle that completes
completed_handle = client.wait_first(handles)
result = completed_handle.get_result()
print(f"First to complete: {completed_handle.get_workflow_id()}")

# Async version
async def wait_for_first():
    handles = [
        await client.retrieve_workflow_async("wf-1"),
        await client.retrieve_workflow_async("wf-2"),
        await client.retrieve_workflow_async("wf-3")
    ]
    
    completed = await client.wait_first_async(handles)
    return await completed.get_result()

Enqueuing Workflows

Enqueue workflows programmatically:
from dbos import EnqueueOptions

# Enqueue a workflow
handle = client.enqueue(
    EnqueueOptions(
        workflow_name="process_order",
        queue_name="orders",
        workflow_id="order-12345",  # Optional
        priority=100,  # Optional
        max_recovery_attempts=5  # Optional
    ),
    "order-12345",  # Workflow args
    "customer-789",
    {"items": [{"id": 1, "qty": 2}]}
)

# Wait for result
result = handle.get_result()

Advanced Enqueue Options

from dbos import EnqueueOptions, WorkflowSerializationFormat

handle = client.enqueue(
    EnqueueOptions(
        workflow_name="process_data",
        queue_name="data-processing",
        workflow_id="data-proc-456",
        app_version="v1.2.3",
        workflow_timeout=3600.0,  # 1 hour timeout
        deduplication_id="dedup-123",
        priority=50,
        max_recovery_attempts=10,
        queue_partition_key="partition-a",
        authenticated_user="[email protected]",
        authenticated_roles=["admin", "operator"],
        serialization_type=WorkflowSerializationFormat.JSON,
        class_name="DataProcessor",  # For class-based workflows
        instance_name="prod-instance"  # For configured instances
    ),
    {"data": "value"}
)

Cancelling Workflows

Cancel running workflows:
# Cancel a workflow
client.cancel_workflow("workflow-123")

# Verify cancellation
status = client.retrieve_workflow("workflow-123").get_status()
print(status.status)  # "CANCELLED"

# Async version
await client.cancel_workflow_async("workflow-123")
Cancelled workflows will raise DBOSAwaitedWorkflowCancelledError when awaited.

Resuming Workflows

Resume cancelled workflows:
# Resume a cancelled workflow
handle = client.resume_workflow("workflow-123")

# Wait for completion
result = handle.get_result()

# Async version
handle = await client.resume_workflow_async("workflow-123")
result = await handle.get_result()

Forking Workflows

Create a copy of a workflow starting from a specific step:
# Fork from step 5 (useful for debugging or replay)
forked_handle = client.fork_workflow(
    workflow_id="original-wf-123",
    start_step=5
)

# Fork to a specific application version
forked_handle = client.fork_workflow(
    workflow_id="original-wf-123",
    start_step=3,
    application_version="v1.2.4"
)

# Get the new forked workflow ID
forked_id = forked_handle.get_workflow_id()

# Wait for forked workflow to complete
result = forked_handle.get_result()

# Check relationship
status = forked_handle.get_status()
print(f"Forked from: {status.forked_from}")  # "original-wf-123"
Forking is useful for debugging failed workflows or replaying from a specific point with updated code.

Sending Messages to Workflows

Send notifications to running workflows:
# Send a message
client.send(
    destination_id="workflow-123",
    message={"command": "process", "data": [1, 2, 3]}
)

# Send with topic
client.send(
    destination_id="workflow-123",
    message="approved",
    topic="approval"
)

# Send with idempotency key (prevents duplicates)
client.send(
    destination_id="workflow-123",
    message="important data",
    topic="data",
    idempotency_key="msg-unique-id-123"
)

# Async version
await client.send_async(
    destination_id="workflow-123",
    message={"status": "ready"},
    topic="status"
)

Reading Workflow Events

Retrieve events set by workflows:
# Get an event (blocks until available or timeout)
event_value = client.get_event(
    workflow_id="workflow-123",
    key="deployment_started",
    timeout_seconds=60
)

print(f"Event value: {event_value}")

# Async version
event_value = await client.get_event_async(
    workflow_id="workflow-123",
    key="processing_complete",
    timeout_seconds=30
)

Streaming Workflow Output

Read stream output from workflows:
# Read stream as generator
for value in client.read_stream("workflow-123", "progress"):
    print(f"Progress update: {value}")

# Async version
async for value in client.read_stream_async("workflow-123", "results"):
    print(f"Result: {value}")
Streams allow workflows to output intermediate results that can be consumed in real-time.

Managing Schedules

Create Schedules

client.create_schedule(
    schedule_name="daily-backup",
    workflow_name="backup_database",
    schedule="0 0 2 * * *",  # 2 AM daily
    context={"retention_days": 30}
)

List Schedules

# List all schedules
schedules = client.list_schedules()

# Filter by status
active = client.list_schedules(status="ACTIVE")
paused = client.list_schedules(status="PAUSED")

# Filter by workflow name
backups = client.list_schedules(workflow_name="backup_database")

# Filter by name prefix
reports = client.list_schedules(schedule_name_prefix="report-")

Manage Schedules

# Get specific schedule
schedule = client.get_schedule("daily-backup")
if schedule:
    print(f"Cron: {schedule['schedule']}")
    print(f"Context: {schedule['context']}")

# Pause schedule
client.pause_schedule("daily-backup")

# Resume schedule
client.resume_schedule("daily-backup")

# Delete schedule
client.delete_schedule("daily-backup")

Trigger Schedules

# Manually trigger a scheduled workflow
handle = client.trigger_schedule("daily-backup")
result = handle.get_result()

# Backfill schedule
from datetime import datetime, timedelta

start = datetime.now() - timedelta(days=7)
end = datetime.now()

handles = client.backfill_schedule(
    schedule_name="daily-backup",
    start=start,
    end=end
)

print(f"Enqueued {len(handles)} backfill executions")

Application Version Management

# List all application versions
versions = client.list_application_versions()

for version in versions:
    print(f"Version: {version['version_name']}")
    print(f"Timestamp: {version['timestamp']}")
    print()

# Get latest version
latest = client.get_latest_application_version()
print(f"Current version: {latest['version_name']}")

# Promote a version to latest
client.set_latest_application_version("v1.2.3")

Complete Example: Workflow Dashboard

from dbos import DBOSClient
from datetime import datetime, timedelta
import time

class WorkflowDashboard:
    def __init__(self, system_database_url: str):
        self.client = DBOSClient(system_database_url=system_database_url)
    
    def get_stats(self) -> dict:
        """Get overall workflow statistics."""
        
        # Get workflows from last hour
        start_time = (datetime.now() - timedelta(hours=1)).isoformat()
        recent = self.client.list_workflows(start_time=start_time)
        
        # Count by status
        stats = {
            "total": len(recent),
            "success": 0,
            "error": 0,
            "pending": 0,
            "cancelled": 0,
            "enqueued": 0
        }
        
        for wf in recent:
            status = wf.status.lower()
            if status in stats:
                stats[status] += 1
        
        return stats
    
    def get_slow_workflows(self, threshold_seconds: int = 60) -> list:
        """Find workflows exceeding time threshold."""
        
        slow = []
        recent = self.client.list_workflows(
            start_time=(datetime.now() - timedelta(hours=24)).isoformat(),
            status="SUCCESS"
        )
        
        for wf in recent:
            if wf.created_at and wf.updated_at:
                duration = (wf.updated_at - wf.created_at).total_seconds()
                if duration > threshold_seconds:
                    slow.append({
                        "workflow_id": wf.workflow_id,
                        "name": wf.name,
                        "duration": duration
                    })
        
        return sorted(slow, key=lambda x: x["duration"], reverse=True)
    
    def get_failed_workflows(self, limit: int = 10) -> list:
        """Get recent failed workflows."""
        
        failed = self.client.list_workflows(
            status="ERROR",
            sort_desc=True,
            limit=limit
        )
        
        return [{
            "workflow_id": wf.workflow_id,
            "name": wf.name,
            "error": wf.error,
            "created_at": wf.created_at
        } for wf in failed]
    
    def retry_failed_workflow(self, workflow_id: str) -> str:
        """Retry a failed workflow by forking from step 1."""
        
        handle = self.client.fork_workflow(
            workflow_id=workflow_id,
            start_step=1
        )
        
        return handle.get_workflow_id()
    
    def get_queue_stats(self) -> dict:
        """Get statistics for queued workflows."""
        
        queued = self.client.list_queued_workflows()
        
        stats = {}
        for wf in queued:
            queue = wf.queue_name or "default"
            if queue not in stats:
                stats[queue] = {"total": 0, "by_status": {}}
            
            stats[queue]["total"] += 1
            status = wf.status
            stats[queue]["by_status"][status] = stats[queue]["by_status"].get(status, 0) + 1
        
        return stats
    
    def monitor_workflow(self, workflow_id: str, poll_interval: float = 1.0):
        """Monitor a workflow until completion."""
        
        handle = self.client.retrieve_workflow(workflow_id)
        
        print(f"Monitoring workflow {workflow_id}...")
        
        while True:
            status = handle.get_status()
            
            print(f"Status: {status.status}")
            
            if status.status in ["SUCCESS", "ERROR", "CANCELLED"]:
                if status.status == "SUCCESS":
                    print(f"Result: {status.output}")
                elif status.status == "ERROR":
                    print(f"Error: {status.error}")
                break
            
            time.sleep(poll_interval)
    
    def cleanup(self):
        """Cleanup client resources."""
        self.client.destroy()

# Usage
dashboard = WorkflowDashboard(
    system_database_url="postgresql://user:pass@localhost/dbos"
)

try:
    # Get stats
    stats = dashboard.get_stats()
    print(f"Workflow Stats (last hour): {stats}")
    
    # Find slow workflows
    slow = dashboard.get_slow_workflows(threshold_seconds=120)
    print(f"\nSlow workflows (>2 minutes): {len(slow)}")
    for wf in slow[:5]:
        print(f"  {wf['name']}: {wf['duration']:.1f}s")
    
    # Get failed workflows
    failed = dashboard.get_failed_workflows(limit=5)
    print(f"\nRecent failures: {len(failed)}")
    for wf in failed:
        print(f"  {wf['workflow_id']}: {wf['error']}")
    
    # Queue stats
    queue_stats = dashboard.get_queue_stats()
    print(f"\nQueue Statistics:")
    for queue, stats in queue_stats.items():
        print(f"  {queue}: {stats['total']} workflows")
    
    # Monitor specific workflow
    # dashboard.monitor_workflow("workflow-123")

finally:
    dashboard.cleanup()

Best Practices

  • Always call client.destroy() when done
  • Use try/finally blocks to ensure cleanup
  • Reuse client instances when possible
  • Consider connection pooling for high-traffic applications
  • Use filters to reduce result set size
  • Set appropriate limits for large queries
  • Disable load_input/load_output when not needed
  • Use pagination for very large result sets
  • Handle DBOSNonExistentWorkflowError when retrieving workflows
  • Check workflow status before waiting for results
  • Implement timeouts for long-running operations
  • Log all API errors for debugging
  • Regularly check for ERROR status workflows
  • Monitor queue depths
  • Track workflow duration trends
  • Set up alerts for critical failures

Next Steps

Queue Tutorial

Learn about workflow queuing and management

Scheduled Workflows

Manage scheduled workflows programmatically

Error Handling

Implement robust error handling strategies

Configuration

Configure DBOS client connections

Build docs developers (and LLMs) love