DBOSClient API Reference
The DBOSClient class provides a lightweight client for remotely managing DBOS workflows, queues, and schedules without running a full DBOS application. It’s ideal for:
  • Enqueueing workflows from external services
  • Monitoring workflow status from scripts or dashboards
  • Managing schedules programmatically
  • Building custom workflow management tools

Constructor

from dbos import DBOSClient

client = DBOSClient(
    system_database_url=None,
    application_database_url=None,
    system_database_engine=None,
    dbos_system_schema="dbos",
    serializer=DefaultSerializer()
)
system_database_url
str
default:"None"
URL for the DBOS system database. If not provided, derived from application_database_url.
application_database_url
str
default:"None"
URL for the application database. Used to derive system database URL if system_database_url is not provided.
system_database_engine
sqlalchemy.Engine
default:"None"
Pre-configured SQLAlchemy engine for the system database. If provided, URLs are ignored.
dbos_system_schema
str
default:"dbos"
Schema name for DBOS system tables.
serializer
Serializer
default:"DefaultSerializer()"
Serializer for workflow arguments and return values.

Example: Basic Usage

from dbos import DBOSClient, EnqueueOptions

# Create a client
client = DBOSClient(
    application_database_url="postgresql://user:pass@localhost/myapp"
)

# Enqueue a workflow
options: EnqueueOptions = {
    "workflow_name": "process_order",
    "queue_name": "orders",
}

handle = client.enqueue(options, "order-123", customer_id="456")

# Wait for result
result = handle.get_result()
print(f"Result: {result}")

# Check status
status = handle.get_status()
print(f"Status: {status.status}")

# Clean up
client.destroy()

Workflow Management

enqueue

handle = client.enqueue(options, *args, **kwargs)
Enqueue a workflow for execution.
options
EnqueueOptions
required
Configuration for the workflow. Must include workflow_name and queue_name.
args
Any
Positional arguments for the workflow function
kwargs
Any
Keyword arguments for the workflow function
WorkflowHandle[R]
WorkflowHandle[R]
A handle to track the workflow execution
Example:
from dbos import EnqueueOptions

options: EnqueueOptions = {
    "workflow_name": "send_email",
    "queue_name": "emails",
    "workflow_id": "email-unique-123",
    "priority": 10,
    "authenticated_user": "[email protected]",
}

handle = client.enqueue(
    options,
    to="[email protected]",
    subject="Welcome",
    body="Thanks for signing up!"
)

enqueue_async

handle = await client.enqueue_async(options, *args, **kwargs)
Async version of enqueue().
WorkflowHandleAsync[R]
WorkflowHandleAsync[R]
An async handle to track the workflow

retrieve_workflow

handle = client.retrieve_workflow(workflow_id)
Retrieve a handle to an existing workflow.
workflow_id
str
required
The unique workflow ID
WorkflowHandle[R]
WorkflowHandle[R]
A handle to the workflow
Example:
# Retrieve a previously enqueued workflow
handle = client.retrieve_workflow("my-workflow-id")
status = handle.get_status()
print(f"Workflow status: {status.status}")

if status.status == "SUCCESS":
    result = handle.get_result()
    print(f"Result: {result}")

retrieve_workflow_async

handle = await client.retrieve_workflow_async(workflow_id)
Async version of retrieve_workflow().

cancel_workflow

client.cancel_workflow(workflow_id)
Cancel a running or pending workflow.
workflow_id
str
required
The workflow ID to cancel

resume_workflow

handle = client.resume_workflow(workflow_id)
Resume a cancelled workflow.
workflow_id
str
required
The workflow ID to resume
WorkflowHandle[Any]
WorkflowHandle[Any]
A handle to the resumed workflow

list_workflows

workflows = client.list_workflows(
    workflow_ids=None,
    status=None,
    start_time=None,
    end_time=None,
    name=None,
    app_version=None,
    queue_name=None,
    limit=None,
    offset=None,
    sort_desc=False
)
List workflows matching specified criteria.
workflow_ids
List[str]
default:"None"
Filter by specific workflow IDs
status
str | list[str]
default:"None"
Filter by status (e.g., “SUCCESS”, “PENDING”, “ERROR”)
start_time
str
default:"None"
Filter workflows created after this time (ISO 8601 format)
end_time
str
default:"None"
Filter workflows created before this time (ISO 8601 format)
name
str | list[str]
default:"None"
Filter by workflow name
queue_name
str | list[str]
default:"None"
Filter by queue name
limit
int
default:"None"
Maximum number of results
offset
int
default:"None"
Number of results to skip
sort_desc
bool
default:"False"
Sort in descending order by creation time
List[WorkflowStatus]
List[WorkflowStatus]
List of matching workflow status objects
Example:
# List all failed workflows from the last hour
import datetime

end = datetime.datetime.now()
start = end - datetime.timedelta(hours=1)

failed = client.list_workflows(
    status="ERROR",
    start_time=start.isoformat(),
    end_time=end.isoformat(),
    sort_desc=True
)

for wf in failed:
    print(f"Failed workflow: {wf.workflow_id} - {wf.error}")

list_queued_workflows

queued = client.list_queued_workflows(
    queue_name=None,
    status=None,
    limit=None
)
List workflows currently in queues. Accepts the same parameters as list_workflows(), but returns only workflows that are currently enqueued.

list_workflow_steps

steps = client.list_workflow_steps(workflow_id)
List all steps executed by a workflow.
workflow_id
str
required
The workflow ID to query
List[StepInfo]
List[StepInfo]
List of step information

fork_workflow

handle = client.fork_workflow(workflow_id, start_step, application_version=None)
Create a new workflow by forking from a specific step.
workflow_id
str
required
The workflow to fork from
start_step
int
required
Step ID to start from
application_version
str
default:"None"
Application version for the forked workflow
WorkflowHandle[Any]
WorkflowHandle[Any]
Handle to the forked workflow

wait_first

completed = client.wait_first(handles, polling_interval_sec=1.0)
Wait for the first of multiple workflows to complete.
handles
List[WorkflowHandle[Any]]
required
List of workflow handles to wait on
polling_interval_sec
float
default:"1.0"
Polling interval in seconds
WorkflowHandle[Any]
WorkflowHandle[Any]
The first handle that completed

Workflow Communication

send

client.send(destination_id, message, topic=None, idempotency_key=None)
Send a message to a workflow.
destination_id
str
required
The workflow ID to send to
message
Any
required
The message to send (must be serializable)
topic
str
default:"None"
Optional topic for message filtering
idempotency_key
str
default:"None"
Optional key to prevent duplicate sends

get_event

value = client.get_event(workflow_id, key, timeout_seconds=60)
Retrieve an event from a workflow.
workflow_id
str
required
The workflow ID
key
str
required
The event key
timeout_seconds
float
default:"60"
Maximum time to wait for the event
Any
Any
The event value

read_stream

for value in client.read_stream(workflow_id, key):
    process(value)
Read values from a workflow stream.
workflow_id
str
required
The workflow ID
key
str
required
The stream key
Generator[Any, Any, None]
Generator
Generator yielding stream values

Schedule Management

create_schedule

client.create_schedule(
    schedule_name="hourly_job",
    workflow_name="cleanup_workflow",
    schedule="0 0 * * * *",
    context=None
)
Create a new workflow schedule.
schedule_name
str
required
Unique name for the schedule
workflow_name
str
required
Fully-qualified workflow function name
schedule
str
required
Cron expression (6 fields: second minute hour day month weekday)
context
Any
default:"None"
Context object passed to each invocation
workflow_class_name
str
default:"None"
Class name for static method workflows
ClientScheduleInput structure — used when creating schedules via the client API:
from typing import TypedDict, Optional, Any

class ClientScheduleInput(TypedDict, total=False):
    # Human-readable schedule name (required)
    schedule_name: str
    # Workflow function name to execute (required)
    workflow_name: str
    # Class name for static method workflows
    workflow_class_name: Optional[str]
    # Cron expression defining the schedule (required)
    schedule: str
    # Context object passed to each execution
    context: Any

list_schedules

schedules = client.list_schedules(
    status=None,
    workflow_name=None,
    schedule_name_prefix=None
)
List all schedules.
status
str | List[str]
default:"None"
Filter by status (e.g., “ACTIVE”, “PAUSED”)
workflow_name
str | List[str]
default:"None"
Filter by workflow name
schedule_name_prefix
str | List[str]
default:"None"
Filter by schedule name prefix
List[WorkflowSchedule]
List[WorkflowSchedule]
List of schedule objects
WorkflowSchedule Structure:
from typing import TypedDict, Optional, Any

class WorkflowSchedule(TypedDict):
    # Unique schedule identifier
    schedule_id: str
    # Human-readable schedule name
    schedule_name: str
    # Workflow function name to execute
    workflow_name: str
    # Class name for static method workflows
    workflow_class_name: Optional[str]
    # Cron expression defining the schedule
    schedule: str
    # Current status (e.g., "ACTIVE", "PAUSED")
    status: str
    # Context object passed to each execution
    context: Any
Example:
schedules = client.list_schedules()
for schedule in schedules:
    print(f"Schedule: {schedule['schedule_name']}")
    print(f"  Workflow: {schedule['workflow_name']}")
    print(f"  Cron: {schedule['schedule']}")
    print(f"  Status: {schedule['status']}")

get_schedule

schedule = client.get_schedule(name)
Get a specific schedule by name.
name
str
required
The schedule name
Optional[WorkflowSchedule]
Optional[WorkflowSchedule]
The schedule object, or None if not found

delete_schedule

client.delete_schedule(name)
Delete a schedule.
name
str
required
The schedule name to delete

pause_schedule / resume_schedule

client.pause_schedule(name)
client.resume_schedule(name)
Pause or resume a schedule.

backfill_schedule

handles = client.backfill_schedule(schedule_name, start, end)
Enqueue all missed executions of a schedule between two times.
schedule_name
str
required
The schedule name
start
datetime
required
Start of backfill window (exclusive)
end
datetime
required
End of backfill window (exclusive)
List[WorkflowHandle[None]]
List[WorkflowHandle[None]]
Handles for each enqueued execution

trigger_schedule

handle = client.trigger_schedule(schedule_name)
Manually trigger a schedule to run immediately.
schedule_name
str
required
The schedule name
WorkflowHandle[None]
WorkflowHandle[None]
Handle to the triggered workflow

EnqueueOptions

from typing import TypedDict

class EnqueueOptions(TypedDict, total=False):
    # Required fields
    workflow_name: str
    queue_name: str
    
    # Optional fields
    workflow_id: str
    app_version: str
    workflow_timeout: float
    deduplication_id: str
    priority: int
    max_recovery_attempts: int
    queue_partition_key: str
    authenticated_user: str
    authenticated_roles: list[str]
    serialization_type: WorkflowSerializationFormat
    class_name: str
    instance_name: str
Example with all options:
options: EnqueueOptions = {
    "workflow_name": "process_payment",
    "queue_name": "payments",
    "workflow_id": "payment-unique-123",
    "app_version": "v1.2.3",
    "workflow_timeout": 300.0,  # 5 minutes
    "deduplication_id": "payment-dedup-123",
    "priority": 10,
    "max_recovery_attempts": 5,
    "queue_partition_key": "customer-456",
    "authenticated_user": "[email protected]",
    "authenticated_roles": ["admin", "finance"],
}

handle = client.enqueue(options, payment_amount=99.99)

Type Definitions

ScheduleInput

TypedDict used internally by DBOS for schedule management with workflow function references.
from typing import TypedDict, Callable, Any
from datetime import datetime

class ScheduleInput(TypedDict):
    # Unique schedule name
    schedule_name: str
    # Workflow function to execute
    workflow_fn: Callable[[datetime, Any], None]
    # Cron expression
    schedule: str
    # Context passed to each execution
    context: Any
Note: This is typically used internally by DBOS.create_schedule(). For client-side schedule creation, use ClientScheduleInput with client.create_schedule(). Example:
from dbos import DBOS
from datetime import datetime
from typing import Any

@DBOS.workflow()
def my_scheduled_task(scheduled_time: datetime, context: Any) -> None:
    print(f"Running at {scheduled_time} with context {context}")

# Internal usage (via DBOS class)
schedule_input: ScheduleInput = {
    "schedule_name": "hourly_task",
    "workflow_fn": my_scheduled_task,
    "schedule": "0 0 * * * *",  # Every hour
    "context": {"key": "value"}
}

Cleanup

destroy

client.destroy()
Close database connections and clean up resources. Always call this when done with the client. Example:
client = DBOSClient(application_database_url=url)
try:
    # Use the client
    handle = client.enqueue(options, ...)
    result = handle.get_result()
finally:
    client.destroy()
