Skip to main content

What is a step?

A step is a function decorated with @DBOS.step() that performs a single unit of work within a workflow. Steps are idempotent and automatically retried on failure. When a step completes successfully, its result is stored in Postgres so it won’t re-execute during workflow recovery.
from dbos import DBOS
import requests

@DBOS.step()
def call_external_api(endpoint: str, data: dict):
    """Steps are perfect for non-deterministic operations like API calls"""
    response = requests.post(endpoint, json=data)
    response.raise_for_status()
    return response.json()

@DBOS.workflow()
def process_with_api(data: dict):
    # This step will only execute once, even if the workflow restarts
    api_result = call_external_api("https://api.example.com/process", data)
    return api_result

Why use steps?

Steps serve two critical purposes:

Non-deterministic operations

Safely execute operations that may produce different results each time (API calls, random numbers, timestamps)

Automatic retry

Built-in retry logic with exponential backoff for transient failures

Idempotency

Results are cached after successful execution, preventing duplicate operations during recovery

Error isolation

Step failures don’t crash the entire workflow; they’re retried automatically

Step decorator

The @DBOS.step() decorator accepts optional parameters:
name
str
Custom name for the step. Defaults to the function name.
retries_allowed
bool
default:"True"
Whether to retry the step on failure.
interval_seconds
float
default:"1.0"
Initial retry interval in seconds (doubles with each retry).
max_attempts
int
default:"3"
Maximum number of retry attempts before raising an exception.
backoff_rate
float
default:"2.0"
Multiplier for retry interval after each failure.
@DBOS.step(
    name="robust_api_call",
    max_attempts=5,
    interval_seconds=2.0,
    backoff_rate=2.0
)
def call_flaky_api(url: str):
    """Retries up to 5 times with exponential backoff: 2s, 4s, 8s, 16s"""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()

Retry behavior

When a step fails (raises an exception), DBOS automatically retries it with exponential backoff:
  1. First attempt fails
  2. Wait interval_seconds (default: 1 second)
  3. Second attempt fails
  4. Wait interval_seconds * backoff_rate (default: 2 seconds)
  5. Third attempt fails
  6. Wait interval_seconds * backoff_rate² (default: 4 seconds)
  7. Continue until max_attempts is reached
import time

@DBOS.step(max_attempts=4, interval_seconds=1.0, backoff_rate=2.0)
def retryable_step():
    DBOS.logger.info(f"Attempt at {time.time()}")
    # Fails 3 times, succeeds on 4th attempt
    # Logs will show: 0s, 1s, 3s, 7s (exponential backoff)
    if not external_service_ready():
        raise Exception("Service not ready")
    return "success"
If a step exceeds max_attempts, DBOS raises DBOSMaxStepRetriesExceeded and the workflow fails. You can catch this in your workflow to implement compensating actions.

When to use steps

✅ Use steps for:

  • External API calls (REST, gRPC, third-party services)
  • File I/O operations (reading/writing files)
  • Getting current time or generating random numbers
  • Calling non-DBOS code that may be non-deterministic
  • Long-running computations that you want to checkpoint
@DBOS.step()
def get_current_timestamp():
    from datetime import datetime
    return datetime.now().isoformat()

@DBOS.step()
def generate_random_id():
    import uuid
    return str(uuid.uuid4())

@DBOS.step()
def send_email(recipient: str, subject: str, body: str):
    # Email service call
    email_client.send(to=recipient, subject=subject, body=body)
    return {"sent": True}

❌ Don’t use steps for:

  • Database queries (use @DBOS.transaction() instead)
  • Simple deterministic calculations (just write them in the workflow)
  • Calling other workflows (call workflows directly)

Steps vs transactions

FeatureStepTransaction
PurposeExternal operations, non-deterministic logicDatabase operations
ExecutionCan make any external callsMust only access the database
RetryAutomatic with exponential backoffAutomatic on deadlock/serialization errors
Database accessNot recommendedRequired
IdempotencyEnforced by DBOSEnforced by DBOS
# Use steps for external calls
@DBOS.step()
def call_payment_processor(amount: float):
    return payment_api.charge(amount)

# Use transactions for database operations
@DBOS.transaction()
def record_payment(payment_id: str, amount: float):
    # Access database via SQLAlchemy session
    with DBOS.sql_session() as session:
        payment = Payment(id=payment_id, amount=amount)
        session.add(payment)

Async steps

Steps can be async functions for improved concurrency:
import httpx

@DBOS.step()
async def async_api_call(url: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

@DBOS.workflow()
async def async_workflow():
    result = await async_api_call("https://api.example.com/data")
    return result

Step context

Access step metadata and utilities via the context:
@DBOS.step()
def contextual_step(item_id: str):
    ctx = DBOS.get_context()
    
    # Access step information
    step_name = ctx.name
    workflow_id = ctx.workflow_id
    
    # Log with automatic context
    DBOS.logger.info(f"Processing item {item_id} in step {step_name}")
    
    # Access authenticated user (if set)
    user = ctx.authenticated_user
    
    return {"item_id": item_id, "processed_by": user}

Disabling retries

For operations that should not be retried (like sending notifications), disable retries:
@DBOS.step(retries_allowed=False)
def send_notification(message: str):
    """Send once, never retry"""
    notification_service.send(message)
Even with retries_allowed=False, the step will still be checkpointed and won’t re-execute during workflow recovery.

Error handling

Handle step errors in your workflow:
from dbos.error import DBOSMaxStepRetriesExceeded

@DBOS.workflow()
def resilient_workflow(order_id: str):
    try:
        # Try to process payment
        payment_result = charge_payment(order_id)
    except DBOSMaxStepRetriesExceeded:
        # Payment failed after all retries
        DBOS.logger.error(f"Payment failed for order {order_id}")
        cancel_order(order_id)
        raise
    
    # Continue with fulfillment
    ship_order(order_id)
    return "completed"

Best practices

Each step should do one thing well. Break complex operations into multiple steps for better granularity and observability.
# ❌ One big step
@DBOS.step()
def process_everything(data):
    api_result = call_api(data)
    transformed = transform_data(api_result)
    file_path = save_to_file(transformed)
    return file_path

# ✅ Focused steps
@DBOS.step()
def call_api(data):
    return api.process(data)

@DBOS.step()
def transform_data(api_result):
    return transform(api_result)

@DBOS.step()
def save_to_file(data):
    return write_file(data)
Design steps so that running them multiple times with the same input produces the same result.
# ❌ Not idempotent
@DBOS.step()
def increment_counter():
    return counter.increment()  # Different result each time

# ✅ Idempotent
@DBOS.step()
def set_counter(value: int):
    return counter.set(value)  # Same result for same input
Set max_attempts and interval_seconds based on the operation’s characteristics.
# Short timeout for fast operations
@DBOS.step(max_attempts=3, interval_seconds=0.5)
def fast_api_call():
    pass

# Longer timeout for slow operations
@DBOS.step(max_attempts=10, interval_seconds=5.0)
def slow_processing_job():
    pass
Custom step names improve observability and debugging.
@DBOS.step(name="stripe_payment_charge")
def charge_payment(amount: float):
    return stripe.charge(amount)

Next steps

Transactions

Learn about database transactions

Workflows

Learn about durable workflows

Error handling guide

Advanced error handling patterns

Workflow tutorial

Build workflows with steps

Build docs developers (and LLMs) love