The DBOSClient class provides a lightweight client for remotely managing DBOS workflows, queues, and schedules without running a full DBOS application. It’s ideal for:
- Enqueueing workflows from external services
- Monitoring workflow status from scripts or dashboards
- Managing schedules programmatically
- Building custom workflow management tools
Constructor
from dbos import DBOSClient
client = DBOSClient(
system_database_url=None,
application_database_url=None,
system_database_engine=None,
dbos_system_schema="dbos",
serializer=DefaultSerializer()
)
URL for the DBOS system database. If not provided, derived from application_database_url.
URL for the application database. Used to derive system database URL if system_database_url is not provided.
system_database_engine
sqlalchemy.Engine
default:"None"
Pre-configured SQLAlchemy engine for the system database. If provided, URLs are ignored.
Schema name for DBOS system tables.
serializer
Serializer
default:"DefaultSerializer()"
Serializer for workflow arguments and return values.
Example: Basic Usage
from dbos import DBOSClient, EnqueueOptions
# Create a client
client = DBOSClient(
application_database_url="postgresql://user:pass@localhost/myapp"
)
# Enqueue a workflow
options: EnqueueOptions = {
"workflow_name": "process_order",
"queue_name": "orders",
}
handle = client.enqueue(options, "order-123", customer_id="456")
# Wait for result
result = handle.get_result()
print(f"Result: {result}")
# Check status
status = handle.get_status()
print(f"Status: {status.status}")
# Clean up
client.destroy()
Workflow Management
enqueue
handle = client.enqueue(options, *args, **kwargs)
Enqueue a workflow for execution.
Configuration for the workflow. Must include workflow_name and queue_name.
Positional arguments for the workflow function
Keyword arguments for the workflow function
A handle to track the workflow execution
Example:
from dbos import EnqueueOptions
options: EnqueueOptions = {
"workflow_name": "send_email",
"queue_name": "emails",
"workflow_id": "email-unique-123",
"priority": 10,
"authenticated_user": "[email protected]",
}
handle = client.enqueue(
options,
to="[email protected]",
subject="Welcome",
body="Thanks for signing up!"
)
enqueue_async
handle = await client.enqueue_async(options, *args, **kwargs)
Async version of enqueue().
An async handle to track the workflow
retrieve_workflow
handle = client.retrieve_workflow(workflow_id)
Retrieve a handle to an existing workflow.
Example:
# Retrieve a previously enqueued workflow
handle = client.retrieve_workflow("my-workflow-id")
status = handle.get_status()
print(f"Workflow status: {status.status}")
if status.status == "SUCCESS":
result = handle.get_result()
print(f"Result: {result}")
retrieve_workflow_async
handle = await client.retrieve_workflow_async(workflow_id)
Async version of retrieve_workflow().
cancel_workflow
client.cancel_workflow(workflow_id)
Cancel a running or pending workflow.
The workflow ID to cancel
resume_workflow
handle = client.resume_workflow(workflow_id)
Resume a cancelled workflow.
The workflow ID to resume
A handle to the resumed workflow
list_workflows
workflows = client.list_workflows(
workflow_ids=None,
status=None,
start_time=None,
end_time=None,
name=None,
app_version=None,
queue_name=None,
limit=None,
offset=None,
sort_desc=False
)
List workflows matching specified criteria.
Filter by specific workflow IDs
status
str | list[str]
default:"None"
Filter by status (e.g., “SUCCESS”, “PENDING”, “ERROR”)
Filter workflows created after this time (ISO 8601 format)
Filter workflows created before this time (ISO 8601 format)
name
str | list[str]
default:"None"
Filter by workflow name
queue_name
str | list[str]
default:"None"
Filter by queue name
Maximum number of results
Number of results to skip
Sort in descending order by creation time
List of matching workflow status objects
Example:
# List all failed workflows from the last hour
import datetime
end = datetime.datetime.now()
start = end - datetime.timedelta(hours=1)
failed = client.list_workflows(
status="ERROR",
start_time=start.isoformat(),
end_time=end.isoformat(),
sort_desc=True
)
for wf in failed:
print(f"Failed workflow: {wf.workflow_id} - {wf.error}")
list_queued_workflows
queued = client.list_queued_workflows(
queue_name=None,
status=None,
limit=None
)
List workflows currently in queues. Accepts the same parameters as list_workflows(), but filters to only queued workflows.
list_workflow_steps
steps = client.list_workflow_steps(workflow_id)
List all steps executed by a workflow.
fork_workflow
handle = client.fork_workflow(workflow_id, start_step, application_version=None)
Create a new workflow by forking from a specific step.
The workflow to fork from
Application version for the forked workflow
Handle to the forked workflow
wait_first
completed = client.wait_first(handles, polling_interval_sec=1.0)
Wait for the first of multiple workflows to complete.
handles
List[WorkflowHandle[Any]]
required
List of workflow handles to wait on
Polling interval in seconds
The first handle that completed
Workflow Communication
send
client.send(destination_id, message, topic=None, idempotency_key=None)
Send a message to a workflow.
The workflow ID to send to
The message to send (must be serializable)
Optional topic for message filtering
Optional key to prevent duplicate sends
get_event
value = client.get_event(workflow_id, key, timeout_seconds=60)
Retrieve an event from a workflow.
Maximum time to wait for the event
read_stream
for value in client.read_stream(workflow_id, key):
process(value)
Read values from a workflow stream.
Generator[Any, Any, None]
Generator yielding stream values
Schedule Management
create_schedule
client.create_schedule(
schedule_name="hourly_job",
workflow_name="cleanup_workflow",
schedule="0 0 * * * *",
context=None
)
Create a new workflow schedule.
Unique name for the schedule
Fully-qualified workflow function name
Cron expression (6 fields: second minute hour day month weekday)
Context object passed to each invocation
Class name for static method workflows
ClientScheduleInput Structure:
Used when creating schedules via the client API.
from typing import TypedDict, Optional, Any
class ClientScheduleInput(TypedDict, total=False):
# Human-readable schedule name (required)
schedule_name: str
# Workflow function name to execute (required)
workflow_name: str
# Class name for static method workflows
workflow_class_name: Optional[str]
# Cron expression defining the schedule (required)
schedule: str
# Context object passed to each execution
context: Any
list_schedules
schedules = client.list_schedules(
status=None,
workflow_name=None,
schedule_name_prefix=None
)
List all schedules.
status
str | List[str]
default:"None"
Filter by status (e.g., “ACTIVE”, “PAUSED”)
workflow_name
str | List[str]
default:"None"
Filter by workflow name
schedule_name_prefix
str | List[str]
default:"None"
Filter by schedule name prefix
WorkflowSchedule Structure:
from typing import TypedDict, Optional, Any
class WorkflowSchedule(TypedDict):
# Unique schedule identifier
schedule_id: str
# Human-readable schedule name
schedule_name: str
# Workflow function name to execute
workflow_name: str
# Class name for static method workflows
workflow_class_name: Optional[str]
# Cron expression defining the schedule
schedule: str
# Current status (e.g., "ACTIVE", "PAUSED")
status: str
# Context object passed to each execution
context: Any
Example:
schedules = client.list_schedules()
for schedule in schedules:
print(f"Schedule: {schedule['schedule_name']}")
print(f" Workflow: {schedule['workflow_name']}")
print(f" Cron: {schedule['schedule']}")
print(f" Status: {schedule['status']}")
get_schedule
schedule = client.get_schedule(name)
Get a specific schedule by name.
Optional[WorkflowSchedule]
Optional[WorkflowSchedule]
The schedule object, or None if not found
delete_schedule
client.delete_schedule(name)
Delete a schedule.
The schedule name to delete
pause_schedule / resume_schedule
client.pause_schedule(name)
client.resume_schedule(name)
Pause or resume a schedule.
backfill_schedule
handles = client.backfill_schedule(schedule_name, start, end)
Enqueue all missed executions of a schedule between two times.
Start of backfill window (exclusive)
End of backfill window (exclusive)
List[WorkflowHandle[None]]
List[WorkflowHandle[None]]
Handles for each enqueued execution
trigger_schedule
handle = client.trigger_schedule(schedule_name)
Manually trigger a schedule to run immediately.
Handle to the triggered workflow
EnqueueOptions
from typing import TypedDict
class EnqueueOptions(TypedDict, total=False):
# Required fields
workflow_name: str
queue_name: str
# Optional fields
workflow_id: str
app_version: str
workflow_timeout: float
deduplication_id: str
priority: int
max_recovery_attempts: int
queue_partition_key: str
authenticated_user: str
authenticated_roles: list[str]
serialization_type: WorkflowSerializationFormat
class_name: str
instance_name: str
Example with all options:
options: EnqueueOptions = {
"workflow_name": "process_payment",
"queue_name": "payments",
"workflow_id": "payment-unique-123",
"app_version": "v1.2.3",
"workflow_timeout": 300.0, # 5 minutes
"deduplication_id": "payment-dedup-123",
"priority": 10,
"max_recovery_attempts": 5,
"queue_partition_key": "customer-456",
"authenticated_user": "[email protected]",
"authenticated_roles": ["admin", "finance"],
}
handle = client.enqueue(options, payment_amount=99.99)
Type Definitions
TypedDict used internally by DBOS for schedule management with workflow function references.
from typing import TypedDict, Callable, Any
from datetime import datetime
class ScheduleInput(TypedDict):
# Unique schedule name
schedule_name: str
# Workflow function to execute
workflow_fn: Callable[[datetime, Any], None]
# Cron expression
schedule: str
# Context passed to each execution
context: Any
Note: This is typically used internally by DBOS.create_schedule(). For client-side schedule creation, use ClientScheduleInput with client.create_schedule().
Example:
from dbos import DBOS
from datetime import datetime
from typing import Any
@DBOS.workflow()
def my_scheduled_task(scheduled_time: datetime, context: Any) -> None:
print(f"Running at {scheduled_time} with context {context}")
# Internal usage (via DBOS class)
schedule_input: ScheduleInput = {
"schedule_name": "hourly_task",
"workflow_fn": my_scheduled_task,
"schedule": "0 0 * * * *", # Every hour
"context": {"key": "value"}
}
Cleanup
destroy
Close database connections and clean up resources. Always call this when done with the client.
Example:
client = DBOSClient(application_database_url=url)
try:
# Use the client
handle = client.enqueue(options, ...)
result = handle.get_result()
finally:
client.destroy()