The DBOS class is the main entry point for DBOS Transact functionality. It provides decorators for defining workflows, transactions, and steps, as well as methods for starting workflows, managing their lifecycle, and interacting with them.

Overview

The DBOS class is a singleton that provides:
  • Decorators for defining workflows, transactions, steps, and scheduled tasks
  • Methods for starting and managing workflow executions
  • Access to the current execution context (workflow ID, SQL session, logger, etc.)
  • Workflow communication via events and messages
  • Durable sleep and streaming capabilities

Initialization

from dbos import DBOS, DBOSConfig

config = DBOSConfig(...)
dbos = DBOS(config=config)
config
DBOSConfig
required
Database and application configuration
fastapi
FastAPI
default:"None"
Optional FastAPI application instance for middleware integration
flask
Flask
default:"None"
Optional Flask application instance for middleware integration
conductor_url
str
default:"None"
URL for DBOS Conductor connection
conductor_key
str
default:"None"
API key for DBOS Conductor authentication

Lifecycle Methods

launch

DBOS.launch()
Launch the DBOS runtime. This starts background threads for workflow recovery, queue processing, and scheduled tasks.

destroy

DBOS.destroy(destroy_registry=False, workflow_completion_timeout_sec=0)
Shut down the DBOS runtime gracefully.
destroy_registry
bool
default:"False"
If True, also clear the global function registry
workflow_completion_timeout_sec
int
default:"0"
Number of seconds to wait for active workflows to complete before shutting down

reset_system_database

DBOS.reset_system_database()
Destroy and recreate the DBOS system database. Only use this in test environments.
This is a destructive operation that deletes all workflow history and status.

Workflow Management

start_workflow

handle = DBOS.start_workflow(func, *args, **kwargs)
Start a workflow function in the background and return a handle to track its execution.
func
Callable[P, R]
required
A workflow function decorated with @DBOS.workflow()
args
P.args
Positional arguments to pass to the workflow function
kwargs
P.kwargs
Keyword arguments to pass to the workflow function
WorkflowHandle[R]
WorkflowHandle[R]
A handle to the started workflow that can be used to retrieve its status and result
Example:
from dbos import DBOS

@DBOS.workflow()
def process_order(order_id: str) -> str:
    return f"Processed {order_id}"

handle = DBOS.start_workflow(process_order, "order-123")
result = handle.get_result()  # Wait for completion

start_workflow_async

handle = await DBOS.start_workflow_async(func, *args, **kwargs)
Async version of start_workflow(). Must be called from an async context.
WorkflowHandleAsync[R]
WorkflowHandleAsync[R]
An async handle to the started workflow

retrieve_workflow

handle = DBOS.retrieve_workflow(workflow_id, existing_workflow=True)
Retrieve a handle to an existing workflow by its ID.
workflow_id
str
required
The unique identifier of the workflow
existing_workflow
bool
default:"True"
If True, verify the workflow exists before returning the handle
WorkflowHandle[R]
WorkflowHandle[R]
A handle to the workflow

get_workflow_status

status = DBOS.get_workflow_status(workflow_id)
Get the current status of a workflow.
workflow_id
str
required
The unique identifier of the workflow
WorkflowStatus
Optional[WorkflowStatus]
The workflow status object, or None if the workflow doesn’t exist

get_result

result = DBOS.get_result(workflow_id)
Wait for a workflow to complete and return its result.
workflow_id
str
required
The unique identifier of the workflow
Any
Optional[Any]
The workflow’s return value, or None if it hasn’t completed

cancel_workflow

DBOS.cancel_workflow(workflow_id)
Cancel a running workflow.
workflow_id
str
required
The unique identifier of the workflow to cancel

resume_workflow

handle = DBOS.resume_workflow(workflow_id)
Resume a previously cancelled workflow.
workflow_id
str
required
The unique identifier of the workflow to resume
WorkflowHandle[Any]
WorkflowHandle[Any]
A handle to the resumed workflow

delete_workflow

DBOS.delete_workflow(workflow_id, delete_children=False)
Delete a workflow and all its associated data.
workflow_id
str
required
The unique identifier of the workflow to delete
delete_children
bool
default:"False"
If True, also delete all child workflows recursively

fork_workflow

handle = DBOS.fork_workflow(workflow_id, start_step, application_version=None)
Create a new workflow execution by forking from a specific step of an existing workflow.
workflow_id
str
required
The workflow ID to fork from
start_step
int
required
The step ID to start execution from
application_version
str
default:"None"
Optional application version for the forked workflow
WorkflowHandle[Any]
WorkflowHandle[Any]
A handle to the forked workflow

list_workflows

workflows = DBOS.list_workflows(
    workflow_ids=None,
    status=None,
    start_time=None,
    end_time=None,
    name=None,
    app_version=None,
    limit=None,
    offset=None,
    sort_desc=False
)
List workflows matching specified criteria.
workflow_ids
List[str]
default:"None"
Filter by specific workflow IDs
status
str | list[str]
default:"None"
Filter by workflow status (e.g., "SUCCESS", "PENDING")
start_time
str
default:"None"
Filter workflows created after this time (ISO format)
end_time
str
default:"None"
Filter workflows created before this time (ISO format)
name
str | list[str]
default:"None"
Filter by workflow function name
limit
int
default:"None"
Maximum number of results to return
offset
int
default:"None"
Number of results to skip
sort_desc
bool
default:"False"
Sort results in descending order by creation time
List[WorkflowStatus]
List[WorkflowStatus]
List of workflow status objects matching the criteria
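
The start_time/end_time filters take ISO-format strings, and limit/offset allow paging through large result sets. The sketch below shows one way to build such a time window and walk pages. The helpers iso_window and paginate, and the stand-in fake_fetch, are hypothetical names introduced here for illustration and are not part of DBOS; in a real application, fetch_page would presumably wrap a call to DBOS.list_workflows with the same limit and offset values.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Iterator, List, Tuple


def iso_window(hours: float, now: datetime) -> Tuple[str, str]:
    """Return (start, end) ISO 8601 strings covering the last `hours` hours."""
    start = now - timedelta(hours=hours)
    return start.isoformat(), now.isoformat()


def paginate(fetch_page: Callable[[int, int], List[Any]], page_size: int) -> Iterator[Any]:
    """Yield every result by calling fetch_page(limit, offset) until a short page."""
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        yield from page
        if len(page) < page_size:
            break  # A short (or empty) page means there is nothing left.
        offset += len(page)


# Stand-in data so the sketch runs without a database (hypothetical).
fake_rows = [f"wf-{i}" for i in range(7)]


def fake_fetch(limit: int, offset: int) -> List[str]:
    return fake_rows[offset:offset + limit]


start_time, end_time = iso_window(1, datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc))
all_rows = list(paginate(fake_fetch, page_size=3))
```

When the total number of results is an exact multiple of page_size, this loop issues one extra request that returns an empty page before stopping.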

list_workflow_steps

steps = DBOS.list_workflow_steps(workflow_id)
List all steps executed by a workflow.
workflow_id
str
required
The workflow ID to query
List[StepInfo]
List[StepInfo]
List of step information objects
StepInfo Structure:
from typing import TypedDict, Optional, Any

class StepInfo(TypedDict):
    # The unique ID of the step in the workflow
    function_id: int
    # The (fully qualified) name of the step
    function_name: str
    # The step's output, if any
    output: Optional[Any]
    # The error the step threw, if any
    error: Optional[Exception]
    # If the step starts or retrieves the result of a workflow, its ID
    child_workflow_id: Optional[str]
    # The Unix epoch timestamp (in milliseconds) at which this step started
    started_at_epoch_ms: Optional[int]
    # The Unix epoch timestamp (in milliseconds) at which this step completed
    completed_at_epoch_ms: Optional[int]
Example:
from dbos import DBOS

@DBOS.step()
def step_one() -> str:
    return "result1"

@DBOS.step()
def step_two() -> str:
    return "result2"

@DBOS.workflow()
def my_workflow():
    step_one()
    step_two()

handle = DBOS.start_workflow(my_workflow)
handle.get_result()

# List all steps
steps = DBOS.list_workflow_steps(handle.workflow_id)
for step in steps:
    print(f"Step {step['function_id']}: {step['function_name']}")
    print(f"  Output: {step['output']}")
    if step['error']:
        print(f"  Error: {step['error']}")
    if step['started_at_epoch_ms'] is not None and step['completed_at_epoch_ms'] is not None:
        duration = step['completed_at_epoch_ms'] - step['started_at_epoch_ms']
        print(f"  Duration: {duration}ms")
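
Because both timestamp fields are Optional, any code that computes durations needs to tolerate missing values. The following is a minimal sketch that works on plain dictionaries in the StepInfo shape; the helper names and the sample data are made up for illustration and do not come from DBOS.

```python
from typing import Any, Dict, List, Optional, Tuple


def step_duration_ms(step: Dict[str, Any]) -> Optional[int]:
    """Return the step's duration in milliseconds, or None if a timestamp is missing."""
    started = step.get("started_at_epoch_ms")
    completed = step.get("completed_at_epoch_ms")
    if started is None or completed is None:
        return None
    return completed - started


def slowest_step(steps: List[Dict[str, Any]]) -> Optional[Tuple[str, int]]:
    """Return (function_name, duration_ms) for the slowest timed step, if any."""
    best: Optional[Tuple[str, int]] = None
    for step in steps:
        duration = step_duration_ms(step)
        if duration is not None and (best is None or duration > best[1]):
            best = (step["function_name"], duration)
    return best


# Hand-written sample data in the StepInfo shape (illustrative values only).
sample_steps = [
    {"function_id": 0, "function_name": "step_one",
     "started_at_epoch_ms": 1_000, "completed_at_epoch_ms": 1_100},
    {"function_id": 1, "function_name": "step_two",
     "started_at_epoch_ms": 1_100, "completed_at_epoch_ms": 1_350},
    {"function_id": 2, "function_name": "step_three",
     "started_at_epoch_ms": 1_350, "completed_at_epoch_ms": None},
]

slowest = slowest_step(sample_steps)
```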

wait_first

completed_handle = DBOS.wait_first(handles, polling_interval_sec=1.0)
Wait for any one of the given workflow handles to complete.
handles
List[WorkflowHandle[Any]]
required
List of workflow handles to wait on
polling_interval_sec
float
default:"1.0"
How often to poll for completion (in seconds)
WorkflowHandle[Any]
WorkflowHandle[Any]
The first handle that completed
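
The polling_interval_sec parameter suggests a poll-and-sleep loop. The sketch below illustrates that general technique only; it is not the DBOS implementation. FakeHandle and its is_done method are hypothetical stand-ins, since this page does not describe how a real handle reports completion.

```python
import time
from typing import Callable, Sequence


class FakeHandle:
    """Hypothetical stand-in for a workflow handle; not a DBOS class."""

    def __init__(self, name: str, done_after_polls: int) -> None:
        self.name = name
        self._remaining = done_after_polls

    def is_done(self) -> bool:
        # Each poll brings the fake workflow one tick closer to completion.
        if self._remaining > 0:
            self._remaining -= 1
            return False
        return True


def wait_first_sketch(
    handles: Sequence[FakeHandle],
    polling_interval_sec: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> FakeHandle:
    """Poll every handle in order; return the first one found complete."""
    if not handles:
        raise ValueError("handles must not be empty")
    while True:
        for handle in handles:
            if handle.is_done():
                return handle
        sleep(polling_interval_sec)  # Nothing finished yet; wait and poll again.


# Record sleeps instead of actually waiting so the demo finishes instantly.
sleeps = []
first = wait_first_sketch(
    [FakeHandle("a", 3), FakeHandle("b", 1)],
    polling_interval_sec=0.5,
    sleep=sleeps.append,
)
```

A shorter interval detects completion sooner at the cost of more frequent status checks.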

Workflow Execution Helpers

run_step

result = DBOS.run_step(dbos_step_options, func, *args, **kwargs)
Execute a function as a durable step within a workflow. The step’s result is checkpointed.
dbos_step_options
Optional[StepOptions]
required
Step configuration options (can be None for defaults)
func
Callable
required
The function to execute as a step
args
P.args
Positional arguments for the function
kwargs
P.kwargs
Keyword arguments for the function
R
R
The return value of the function
Example:
from dbos import DBOS

@DBOS.workflow()
def my_workflow():
    # Execute a lambda as a step
    result = DBOS.run_step(None, lambda x: x * 2, 21)
    return result  # Returns 42
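
To make "the step's result is checkpointed" concrete, here is a minimal in-memory sketch of the idea: a recorded result is returned on re-execution instead of running the function again. CheckpointStore is a hypothetical class written for this illustration; DBOS persists checkpoints durably, and its actual mechanism is not described on this page.

```python
from typing import Any, Callable, Dict, List, Tuple


class CheckpointStore:
    """In-memory stand-in for durable storage; hypothetical, not a DBOS class."""

    def __init__(self) -> None:
        self._results: Dict[Tuple[str, int], Any] = {}

    def run_step(self, workflow_id: str, step_id: int,
                 func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        key = (workflow_id, step_id)
        if key in self._results:
            # The step already ran for this workflow: reuse the recorded result.
            return self._results[key]
        result = func(*args, **kwargs)
        self._results[key] = result  # Checkpoint before returning.
        return result


calls: List[int] = []


def double(x: int) -> int:
    calls.append(x)  # Track how many times the function body really executes.
    return x * 2


store = CheckpointStore()
first = store.run_step("wf-1", 0, double, 21)
replayed = store.run_step("wf-1", 0, double, 21)  # Simulates a recovery replay.
```

In this sketch the function body runs once even though the step is requested twice, which is why steps with side effects can be replayed safely.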

sleep

DBOS.sleep(seconds)
Durable sleep within a workflow. The sleep is checkpointed, so on recovery, completed sleeps are skipped.
seconds
float
required
Number of seconds to sleep
Example:
from dbos import DBOS

@DBOS.workflow()
def delayed_workflow():
    DBOS.sleep(60)  # Sleep for 60 seconds
    return "Done"
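
One plausible way to make a sleep survive restarts is to checkpoint the wake-up time rather than the duration, so a recovered workflow waits only for whatever time remains. The sketch below illustrates that idea under this assumption; it is not taken from the DBOS source. The durable_sleep function, the dictionary used as a checkpoint store, and the fake clock are all hypothetical.

```python
import time
from typing import Callable, Dict, List


def durable_sleep(
    checkpoints: Dict[str, float],
    key: str,
    seconds: float,
    now: Callable[[], float] = time.time,
    sleep: Callable[[float], None] = time.sleep,
) -> float:
    """Sleep until a recorded wake-up time; return the seconds actually slept."""
    # Record the wake-up time only the first time this sleep is reached.
    wake_at = checkpoints.setdefault(key, now() + seconds)
    remaining = max(0.0, wake_at - now())
    if remaining > 0:
        sleep(remaining)
    return remaining


# A fake clock keeps the demo instant and deterministic.
clock = {"t": 1000.0}
slept: List[float] = []


def fake_now() -> float:
    return clock["t"]


def fake_sleep(seconds: float) -> None:
    slept.append(seconds)
    clock["t"] += seconds


def crashing_sleep(seconds: float) -> None:
    clock["t"] += 20.0  # 20 seconds pass, then the process "crashes".
    raise RuntimeError("simulated crash")


checkpoints: Dict[str, float] = {}
try:
    durable_sleep(checkpoints, "sleep-0", 60, now=fake_now, sleep=crashing_sleep)
except RuntimeError:
    pass

# Recovery: the same sleep is reached again with the checkpoint still present.
remaining_after_recovery = durable_sleep(checkpoints, "sleep-0", 60, now=fake_now, sleep=fake_sleep)
# A later replay finds the wake-up time already in the past and skips the sleep.
skipped = durable_sleep(checkpoints, "sleep-0", 60, now=fake_now, sleep=fake_sleep)
```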

Workflow Communication

send

DBOS.send(destination_id, message, topic=None, idempotency_key=None)
Send a message to another workflow.
destination_id
str
required
The workflow ID to send the message to
message
Any
required
The message to send (must be serializable)
topic
str
default:"None"
Optional topic for categorizing messages
idempotency_key
str
default:"None"
Optional key to prevent duplicate sends

recv

message = DBOS.recv(topic=None, timeout_seconds=60)
Receive a message within a workflow. Blocks until a message arrives or the timeout elapses.
topic
str
default:"None"
Optional topic to filter messages
timeout_seconds
float
default:"60"
Maximum time to wait for a message
Any
Any
The received message

set_event

DBOS.set_event(key, value)
Set an event value for the current workflow. Other workflows can retrieve this value.
key
str
required
The event key
value
Any
required
The event value (must be serializable)

get_event

value = DBOS.get_event(workflow_id, key, timeout_seconds=60)
Retrieve an event value from another workflow. Blocks until the event is set or the timeout elapses.
workflow_id
str
required
The workflow ID that will set the event
key
str
required
The event key to retrieve
timeout_seconds
float
default:"60"
Maximum time to wait for the event
Any
Any
The event value

get_all_events

events = DBOS.get_all_events(workflow_id)
Retrieve all current events for a workflow.
workflow_id
str
required
The workflow ID to query
Dict[str, Any]
Dict[str, Any]
A dictionary mapping event keys to their values

Streaming

write_stream

DBOS.write_stream(key, value)
Write a value to a stream within a workflow.
key
str
required
The stream key/name
value
Any
required
The value to write (must be serializable)

close_stream

DBOS.close_stream(key)
Close a stream to signal no more values will be written.
key
str
required
The stream key/name to close

read_stream

for value in DBOS.read_stream(workflow_id, key):
    process(value)
Read values from a stream as a generator.
workflow_id
str
required
The workflow ID that owns the stream
key
str
required
The stream key/name to read from
Generator[Any, Any, None]
Generator
A generator yielding stream values in order
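
The write/close/read contract can be pictured with an in-memory analogy: a writer appends values, a close marker signals the end, and the reader is a generator that stops when it sees that marker. InMemoryStream below is a hypothetical single-reader class built on the standard library; it says nothing about how DBOS stores or delivers stream values.

```python
import queue
import threading
from typing import Any, Iterator, List

_CLOSED = object()  # Sentinel marking the end of the stream.


class InMemoryStream:
    """Single-reader stream analogy; hypothetical, not a DBOS class."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[Any]" = queue.Queue()

    def write(self, value: Any) -> None:
        self._queue.put(value)

    def close(self) -> None:
        self._queue.put(_CLOSED)

    def read(self) -> Iterator[Any]:
        # Block for each value in order and stop once the stream is closed.
        while True:
            item = self._queue.get()
            if item is _CLOSED:
                return
            yield item


def producer(stream: InMemoryStream) -> None:
    for i in range(3):
        stream.write(i)
    stream.close()  # Without this, the reader below would wait forever.


stream = InMemoryStream()
writer = threading.Thread(target=producer, args=(stream,))
writer.start()
received: List[int] = list(stream.read())
writer.join()
```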

Context Properties

These class properties provide access to the current execution context:

workflow_id

wf_id = DBOS.workflow_id
Optional[str]
Optional[str]
The current workflow ID, or None if not in a workflow

step_id

step_id = DBOS.step_id
Optional[int]
Optional[int]
The current step ID within the workflow, or None if not in a step

sql_session

session = DBOS.sql_session
Session
sqlalchemy.orm.Session
The SQLAlchemy session for the current transaction. Only available within a @DBOS.transaction() function.

logger

DBOS.logger.info("Processing order")
Logger
logging.Logger
The DBOS logger instance

authenticated_user

user = DBOS.authenticated_user
Optional[str]
Optional[str]
The currently authenticated user, if any

authenticated_roles

roles = DBOS.authenticated_roles
Optional[List[str]]
Optional[List[str]]
The roles of the authenticated user, if any

application_version

version = DBOS.application_version
str
str
The current application version

executor_id

exec_id = DBOS.executor_id
str
str
The unique ID of this DBOS executor instance

Type Definitions

VersionInfo

A TypedDict describing a single application version.
from typing import TypedDict

class VersionInfo(TypedDict):
    # Unique version identifier
    version_id: str
    # Human-readable version name
    version_name: str
    # Unix epoch timestamp when version was deployed
    version_timestamp: int
    # Unix epoch timestamp when version record was created
    created_at: int
Example:
from dbos import DBOS

# A dictionary in the VersionInfo shape, shown for illustration only.
# DBOS typically creates and consumes these values internally.
version_info = {
    "version_id": "v1.2.3-abc123",
    "version_name": "1.2.3",
    "version_timestamp": 1699999999000,
    "created_at": 1699999999000
}
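
Since VersionInfo is a TypedDict, values of this type behave like ordinary dictionaries at runtime. As a small sketch, the snippet below redefines the type locally (so it runs without DBOS installed) and picks the most recently deployed version from a list. The latest_version helper and the sample records are hypothetical, and how such records are obtained from DBOS is not covered on this page.

```python
from typing import List, TypedDict


class VersionInfo(TypedDict):
    # Local copy of the structure documented above, for a self-contained demo.
    version_id: str
    version_name: str
    version_timestamp: int
    created_at: int


def latest_version(versions: List[VersionInfo]) -> VersionInfo:
    """Return the entry with the greatest version_timestamp."""
    return max(versions, key=lambda v: v["version_timestamp"])


# Made-up sample records for illustration.
versions: List[VersionInfo] = [
    {"version_id": "v1.2.3-abc123", "version_name": "1.2.3",
     "version_timestamp": 1699999999000, "created_at": 1699999999000},
    {"version_id": "v1.3.0-def456", "version_name": "1.3.0",
     "version_timestamp": 1700999999000, "created_at": 1700999999000},
]

latest = latest_version(versions)
```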
