The DBOS class is the main entry point for DBOS Transact functionality. It provides decorators for defining workflows, transactions, and steps, as well as methods for starting workflows, managing their lifecycle, and interacting with them.
Overview
The DBOS class is a singleton that provides:
- Decorators for defining workflows, transactions, steps, and scheduled tasks
- Methods for starting and managing workflow executions
- Access to the current execution context (workflow ID, SQL session, logger, etc.)
- Workflow communication via events and messages
- Durable sleep and streaming capabilities
Initialization
from dbos import DBOS, DBOSConfig
config = DBOSConfig(...)
dbos = DBOS(config=config)
config
Database and application configuration
fastapi
Optional FastAPI application instance for middleware integration
flask
Optional Flask application instance for middleware integration
conductor_url
URL for DBOS Conductor connection
conductor_key
API key for DBOS Conductor authentication
Lifecycle Methods
launch
DBOS.launch()
Launch the DBOS runtime. This starts background threads for workflow recovery, queue processing, and scheduled tasks.
destroy
DBOS.destroy(destroy_registry=False, workflow_completion_timeout_sec=0)
Shut down the DBOS runtime gracefully.
destroy_registry
If True, also clear the global function registry
workflow_completion_timeout_sec
Number of seconds to wait for active workflows to complete before shutting down
reset_system_database
DBOS.reset_system_database()
Destroy and recreate the DBOS system database. Only use this in test environments.
This is a destructive operation that deletes all workflow history and status.
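In tests, the lifecycle methods above are often chained into one reset routine. The sketch below assumes the order destroy, construct, reset, launch. `reset_for_test` is a hypothetical helper, not part of DBOS, and it takes the DBOS class as a parameter only so that the call sequence can be exercised with a stand-in.

```python
from typing import Any


def reset_for_test(dbos_cls: Any, config: Any) -> None:
    """Bring DBOS to a clean state before a test (assumed call order)."""
    dbos_cls.destroy()                # shut down any previous instance
    dbos_cls(config=config)           # re-create the singleton
    dbos_cls.reset_system_database()  # wipe workflow history (tests only)
    dbos_cls.launch()                 # start the background threads
```

In a real test suite this would typically be called from a fixture as `reset_for_test(DBOS, config)`.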
Workflow Management
start_workflow
handle = DBOS.start_workflow(func, *args, **kwargs)
Start a workflow function in the background and return a handle to track its execution.
func
A workflow function decorated with @DBOS.workflow()
*args
Positional arguments to pass to the workflow function
**kwargs
Keyword arguments to pass to the workflow function
A handle to the started workflow that can be used to retrieve its status and result
Example:
@DBOS.workflow()
def process_order(order_id: str) -> str:
    return f"Processed {order_id}"

handle = DBOS.start_workflow(process_order, "order-123")
result = handle.get_result()  # Wait for completion
start_workflow_async
handle = await DBOS.start_workflow_async(func, *args, **kwargs)
Async version of start_workflow(). Must be called from an async context.
An async handle to the started workflow
retrieve_workflow
handle = DBOS.retrieve_workflow(workflow_id, existing_workflow=True)
Retrieve a handle to an existing workflow by its ID.
workflow_id
The unique identifier of the workflow
existing_workflow
If True, verify the workflow exists before returning the handle
get_workflow_status
status = DBOS.get_workflow_status(workflow_id)
Get the current status of a workflow.
workflow_id
The unique identifier of the workflow
The workflow status object, or None if the workflow doesn’t exist
get_result
result = DBOS.get_result(workflow_id)
Wait for a workflow to complete and return its result.
workflow_id
The unique identifier of the workflow
The workflow’s return value, or None if it hasn’t completed
cancel_workflow
DBOS.cancel_workflow(workflow_id)
Cancel a running workflow.
workflow_id
The unique identifier of the workflow to cancel
resume_workflow
handle = DBOS.resume_workflow(workflow_id)
Resume a previously cancelled workflow.
workflow_id
The unique identifier of the workflow to resume
A handle to the resumed workflow
delete_workflow
DBOS.delete_workflow(workflow_id, delete_children=False)
Delete a workflow and all its associated data.
workflow_id
The unique identifier of the workflow to delete
delete_children
If True, also delete all child workflows recursively
fork_workflow
handle = DBOS.fork_workflow(workflow_id, start_step, application_version=None)
Create a new workflow execution by forking from a specific step of an existing workflow.
workflow_id
The workflow ID to fork from
start_step
The step ID to start execution from
application_version
Optional application version for the forked workflow
A handle to the forked workflow
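A common reason to fork is to re-run a workflow from the step that failed. The sketch below picks a candidate `start_step` from step records shaped like the StepInfo dictionaries described under list_workflow_steps. `first_failed_step` is a hypothetical helper, and the records are stand-in data rather than real DBOS output.

```python
from typing import Any, Mapping, Optional, Sequence


def first_failed_step(steps: Sequence[Mapping[str, Any]]) -> Optional[int]:
    """Return the function_id of the earliest step that recorded an error."""
    failed = [s["function_id"] for s in steps if s.get("error") is not None]
    return min(failed) if failed else None


# Stand-in records; real ones would come from DBOS.list_workflow_steps(workflow_id).
steps = [
    {"function_id": 1, "function_name": "charge_card", "error": None},
    {"function_id": 2, "function_name": "send_email", "error": RuntimeError("smtp down")},
    {"function_id": 3, "function_name": "log_result", "error": None},
]
start_step = first_failed_step(steps)
print(start_step)
# A fork would then look like: DBOS.fork_workflow(workflow_id, start_step)
```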
list_workflows
workflows = DBOS.list_workflows(
workflow_ids=None,
status=None,
start_time=None,
end_time=None,
name=None,
app_version=None,
limit=None,
offset=None,
sort_desc=False
)
List workflows matching specified criteria.
workflow_ids
Filter by specific workflow IDs
status
str | list[str]
default:"None"
Filter by workflow status (e.g., “SUCCESS”, “PENDING”)
start_time
Filter workflows created after this time (ISO format)
end_time
Filter workflows created before this time (ISO format)
name
str | list[str]
default:"None"
Filter by workflow function name
limit
Maximum number of results to return
offset
Number of results to skip
sort_desc
Sort results in descending order by creation time
List of workflow status objects matching the criteria
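The `start_time`/`end_time` and `limit`/`offset` filters combine naturally into a paged query over a time window. The sketch below is one way to do that. `recent_window` and `iter_workflows` are hypothetical helpers, and `fetch` stands for any callable that accepts the keyword arguments listed above, such as `DBOS.list_workflows`.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, Iterator, List, Optional


def recent_window(hours: int, now: Optional[datetime] = None) -> Dict[str, str]:
    """Build ISO-format start_time/end_time filters covering the last `hours` hours."""
    end = now or datetime.now(timezone.utc)
    start = end - timedelta(hours=hours)
    return {"start_time": start.isoformat(), "end_time": end.isoformat()}


def iter_workflows(
    fetch: Callable[..., List[Any]], page_size: int = 100, **filters: Any
) -> Iterator[Any]:
    """Yield every match by advancing `offset` until a short page comes back."""
    offset = 0
    while True:
        page = fetch(limit=page_size, offset=offset, **filters)
        yield from page
        if len(page) < page_size:
            break
        offset += page_size
```

Usage might look like `for wf in iter_workflows(DBOS.list_workflows, status="PENDING", **recent_window(1)): ...`. Offset paging can skip or repeat entries if workflows are created while the loop runs, so this suits reporting better than exact accounting.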
list_workflow_steps
steps = DBOS.list_workflow_steps(workflow_id)
List all steps executed by a workflow.
List of step information objects
StepInfo Structure:
from typing import TypedDict, Optional, Any

class StepInfo(TypedDict):
    # The unique ID of the step in the workflow
    function_id: int
    # The (fully qualified) name of the step
    function_name: str
    # The step's output, if any
    output: Optional[Any]
    # The error the step threw, if any
    error: Optional[Exception]
    # If the step starts or retrieves the result of a workflow, its ID
    child_workflow_id: Optional[str]
    # The Unix epoch timestamp (in milliseconds) at which this step started
    started_at_epoch_ms: Optional[int]
    # The Unix epoch timestamp (in milliseconds) at which this step completed
    completed_at_epoch_ms: Optional[int]
Example:
from dbos import DBOS

@DBOS.step()
def step_one() -> str:
    return "result1"

@DBOS.step()
def step_two() -> str:
    return "result2"

@DBOS.workflow()
def my_workflow():
    step_one()
    step_two()

handle = DBOS.start_workflow(my_workflow)
handle.get_result()

# List all steps
steps = DBOS.list_workflow_steps(handle.workflow_id)
for step in steps:
    print(f"Step {step['function_id']}: {step['function_name']}")
    print(f"  Output: {step['output']}")
    if step['error']:
        print(f"  Error: {step['error']}")
    # Both timestamps are optional, so check each before subtracting
    if step['started_at_epoch_ms'] is not None and step['completed_at_epoch_ms'] is not None:
        duration = step['completed_at_epoch_ms'] - step['started_at_epoch_ms']
        print(f"  Duration: {duration}ms")
wait_first
completed_handle = DBOS.wait_first(handles, polling_interval_sec=1.0)
Wait for any one of the given workflow handles to complete.
handles
List[WorkflowHandle[Any]]
required
List of workflow handles to wait on
polling_interval_sec
How often to poll for completion (in seconds)
The first handle that completed
Workflow Execution Helpers
run_step
result = DBOS.run_step(dbos_step_options, func, *args, **kwargs)
Execute a function as a durable step within a workflow. The step’s result is checkpointed.
dbos_step_options
Optional[StepOptions]
required
Step configuration options (can be None for defaults)
func
The function to execute as a step
*args
Positional arguments for the function
**kwargs
Keyword arguments for the function
The return value of the function
Example:
@DBOS.workflow()
def my_workflow():
    # Execute a lambda as a step
    result = DBOS.run_step(None, lambda x: x * 2, 21)
    return result  # Returns 42
sleep
DBOS.sleep(seconds)
Durable sleep within a workflow. The sleep is checkpointed, so on recovery, completed sleeps are skipped.
seconds
Number of seconds to sleep
Example:
@DBOS.workflow()
def delayed_workflow():
    DBOS.sleep(60)  # Sleep for 60 seconds
    return "Done"
Workflow Communication
send
DBOS.send(destination_id, message, topic=None, idempotency_key=None)
Send a message to another workflow.
destination_id
The workflow ID to send the message to
message
The message to send (must be serializable)
topic
Optional topic for categorizing messages
idempotency_key
Optional key to prevent duplicate sends
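An idempotency key only prevents duplicates if a retried call produces the same key as the original. One way to get that, sketched below, is to derive the key deterministically from the identifiers of the message. `send_key` and the identifier scheme are assumptions for illustration, not part of DBOS.

```python
import uuid


def send_key(destination_id: str, topic: str, payload_id: str) -> str:
    """Derive a stable key: identical inputs always yield the same UUID string."""
    name = f"{destination_id}/{topic}/{payload_id}"
    return str(uuid.uuid5(uuid.NAMESPACE_URL, name))


key = send_key("order-123", "payment", "charge-42")
print(key)
# A send would then look like:
# DBOS.send("order-123", {"paid": True}, topic="payment", idempotency_key=key)
```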
recv
message = DBOS.recv(topic=None, timeout_seconds=60)
Receive a message within a workflow. Blocks until a message arrives or the timeout expires.
topic
Optional topic to filter messages
timeout_seconds
Maximum time to wait for a message (in seconds)
set_event
DBOS.set_event(key, value)
Set an event value for the current workflow. Other workflows can retrieve this value.
value
The event value (must be serializable)
get_event
value = DBOS.get_event(workflow_id, key, timeout_seconds=60)
Retrieve an event value from another workflow. Blocks until the event is set or the timeout expires.
workflow_id
The workflow ID that will set the event
key
The event key to retrieve
timeout_seconds
Maximum time to wait for the event (in seconds)
get_all_events
events = DBOS.get_all_events(workflow_id)
Retrieve all current events for a workflow.
A dictionary mapping event keys to their values
Streaming
write_stream
DBOS.write_stream(key, value)
Write a value to a stream within a workflow.
value
The value to write (must be serializable)
close_stream
DBOS.close_stream(key)
Close a stream to signal that no more values will be written.
key
The stream key/name to close
read_stream
for value in DBOS.read_stream(workflow_id, key):
    process(value)
Read values from a stream as a generator.
workflow_id
The workflow ID that owns the stream
key
The stream key/name to read from
Generator[Any, Any, None]
A generator yielding stream values in order
Context Properties
These class properties provide access to the current execution context:
workflow_id
The current workflow ID, or None if not in a workflow
step_id
The current step ID within the workflow, or None if not in a step
sql_session
session = DBOS.sql_session
The SQLAlchemy session for the current transaction. Only available within a @DBOS.transaction() function.
logger
DBOS.logger.info("Processing order")
authenticated_user
user = DBOS.authenticated_user
The currently authenticated user, if any
authenticated_roles
roles = DBOS.authenticated_roles
The roles of the authenticated user, if any
application_version
version = DBOS.application_version
The current application version
executor_id
exec_id = DBOS.executor_id
The unique ID of this DBOS executor instance
Type Definitions
VersionInfo
TypedDict containing version information for application versions.
from typing import TypedDict

class VersionInfo(TypedDict):
    # Unique version identifier
    version_id: str
    # Human-readable version name
    version_name: str
    # Unix epoch timestamp when version was deployed
    version_timestamp: int
    # Unix epoch timestamp when version record was created
    created_at: int
Example:
from dbos import DBOS
# Get version info (internal API)
# This is typically used internally by DBOS
version_info = {
    "version_id": "v1.2.3-abc123",
    "version_name": "1.2.3",
    "version_timestamp": 1699999999000,
    "created_at": 1699999999000,
}
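Given several records of this shape, a typical need is to find the most recently deployed one. The sketch below redefines `VersionInfo` locally so it runs on its own. `latest_version` is a hypothetical helper and the records are made-up sample data.

```python
from typing import List, Optional, TypedDict


class VersionInfo(TypedDict):
    version_id: str
    version_name: str
    version_timestamp: int
    created_at: int


def latest_version(versions: List[VersionInfo]) -> Optional[VersionInfo]:
    """Return the record with the greatest version_timestamp, or None if the list is empty."""
    return max(versions, key=lambda v: v["version_timestamp"], default=None)


# Made-up sample records
versions: List[VersionInfo] = [
    {"version_id": "v1.2.3-abc123", "version_name": "1.2.3",
     "version_timestamp": 1699999999000, "created_at": 1699999999000},
    {"version_id": "v1.3.0-def456", "version_name": "1.3.0",
     "version_timestamp": 1700500000000, "created_at": 1700500000000},
]
newest = latest_version(versions)
print(newest["version_name"] if newest else None)
```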