Skip to main content

Introduction

DBOS provides durable notification primitives (DBOS.send() and DBOS.recv()) that enable reliable communication between workflows. These operations are durable, meaning they survive process restarts and failures.

Basic Send and Receive

Send messages to workflows and receive them reliably:
from dbos import DBOS

@DBOS.workflow()
def receiver_workflow() -> str:
    """Receive one message on the default topic and echo it back.

    Returns:
        The string ``"Processed: <message>"``.

    NOTE(review): ``DBOS.recv()`` is called with its defaults. Per the DBOS
    reference the default wait is 60 seconds and a timeout yields ``None``
    (no exception), in which case this returns ``"Processed: None"``.
    """
    incoming = DBOS.recv()

    # Record what arrived before handing the summary back to the caller.
    DBOS.logger.info(f"Received message: {incoming}")

    return f"Processed: {incoming}"

@DBOS.workflow()
def sender_workflow() -> str:
    """Launch ``receiver_workflow``, message it, and return its result.

    Returns:
        Whatever the receiver workflow returns.
    """
    receiver = DBOS.start_workflow(receiver_workflow)
    target_id = receiver.get_workflow_id()

    # Stand-in for real work performed before the message is ready.
    DBOS.sleep(2)

    DBOS.send(target_id, "Hello from sender!")

    # Block until the receiver finishes and pass its value straight through.
    return receiver.get_result()

# Run the sender workflow and show what the receiver returned.
# NOTE(review): assumes DBOS has already been configured and launched
# before this call — confirm against the surrounding application setup.
result = sender_workflow()
print(result)  # "Processed: Hello from sender!"
recv() blocks the workflow until a message arrives or its timeout elapses (60 seconds by default); on timeout it returns None instead of raising. The workflow can be safely restarted while waiting.

Topics

Organize messages by topic:
@DBOS.workflow()
def multi_topic_receiver() -> dict:
    """Collect one message from each of the command, data and status topics.

    Returns:
        A dict mapping each topic name to the message received on it.
    """
    # Topics are read in this fixed order; each recv only consumes messages
    # addressed to its own topic.
    return {
        topic: DBOS.recv(topic=topic)
        for topic in ("command", "data", "status")
    }

@DBOS.workflow()
def multi_topic_sender() -> dict:
    """Start ``multi_topic_receiver`` and feed it one message per topic.

    Returns:
        The dict produced by the receiver workflow.
    """
    receiver = DBOS.start_workflow(multi_topic_receiver)
    receiver_id = receiver.get_workflow_id()

    # Deliberately sent in a different order than the receiver reads them:
    # per-topic queues make the send order irrelevant.
    outgoing = [
        ("status", "active"),
        ("command", "start"),
        ("data", {"value": 42}),
    ]
    for topic, payload in outgoing:
        DBOS.send(receiver_id, payload, topic=topic)

    return receiver.get_result()
Use topics to organize different types of messages and avoid ordering dependencies.

Timeouts

Handle missing messages gracefully:
from dbos import DBOS

@DBOS.workflow()
def timeout_workflow() -> str:
    """Wait briefly for a message and fall back to a default if none arrives.

    Returns:
        ``"Received: <message>"`` when a message arrives within 30 seconds,
        otherwise ``"Default value"``.
    """
    # DBOS.recv() reports a timeout by returning None; it does not raise
    # TimeoutError, so the fallback has to be an explicit None check.
    message = DBOS.recv(timeout_seconds=30)

    if message is None:
        # No message received within timeout
        DBOS.logger.warning("No message received, using default")
        return "Default value"

    return f"Received: {message}"

@DBOS.workflow()
def conditional_sender(should_send: bool) -> str:
    """Start ``timeout_workflow`` and optionally send it a message.

    Args:
        should_send: When true, deliver ``"Important data"`` to the receiver.

    Returns:
        The receiver workflow's result, whether or not a message was sent.
    """
    receiver = DBOS.start_workflow(timeout_workflow)

    if should_send:
        DBOS.send(receiver.get_workflow_id(), "Important data")

    # The receiver copes with both the message and no-message cases.
    outcome = receiver.get_result()
    return outcome

# Exercise both branches of the receiver.
result1 = conditional_sender(True)   # "Received: Important data"
# Nothing is sent here, so the receiver only returns once its 30-second
# recv timeout has elapsed.
result2 = conditional_sender(False)  # "Default value"

Workflow Coordination

Coordinate complex workflows with notifications:
@DBOS.workflow()
def worker_workflow(worker_id: str) -> dict:
    """Process tasks sent to this workflow until a ``"STOP"`` message arrives.

    Args:
        worker_id: Label echoed back in the result so callers can tell
            workers apart.

    Returns:
        ``{"worker_id": worker_id, "results": [...]}`` with one
        ``process_task`` result per task, in arrival order.
    """
    results = []

    # Process tasks until told to stop
    while True:
        # Wait for next task or stop signal
        message = DBOS.recv(timeout_seconds=60)

        # recv() returns None when the wait times out. An idle minute is not
        # a task: keep waiting instead of handing None to process_task, which
        # would fail on task["id"].
        if message is None:
            continue

        if message == "STOP":
            break

        results.append(process_task(message))

    return {"worker_id": worker_id, "results": results}

@DBOS.workflow()
def coordinator_workflow(tasks: list[dict]) -> list[dict]:
    """Fan tasks out to three worker workflows and gather their results.

    Args:
        tasks: Task dicts to distribute round-robin across the workers.

    Returns:
        One result dict per worker, in worker start order.
    """
    worker_handles = [
        DBOS.start_workflow(worker_workflow, f"worker-{index}")
        for index in range(3)
    ]
    pool_size = len(worker_handles)

    # Round-robin: task k goes to worker k modulo the pool size.
    for position, task in enumerate(tasks):
        assignee = worker_handles[position % pool_size]
        DBOS.send(assignee.get_workflow_id(), task)

    # Every worker gets a stop signal queued behind its tasks.
    for worker in worker_handles:
        DBOS.send(worker.get_workflow_id(), "STOP")

    return [worker.get_result() for worker in worker_handles]

@DBOS.step()
def process_task(task: dict) -> dict:
    """Return a completion record for ``task``.

    Args:
        task: Task dict; its ``"id"`` entry is copied into the result.

    Returns:
        ``{"task_id": <task id>, "status": "completed"}``.
    """
    # Placeholder for real task processing.
    completion = {"task_id": task["id"], "status": "completed"}
    return completion

External Notifications

Send notifications from outside workflows using DBOSClient:
from typing import Optional

from dbos import DBOS, DBOSClient, SetWorkflowID

@DBOS.workflow()
def approval_workflow(request_id: str) -> dict:
    """Request approval for ``request_id`` and act on the decision.

    Args:
        request_id: Identifier of the request awaiting approval.

    Returns:
        A dict with ``"request_id"`` and a ``"status"`` of ``"approved"``,
        ``"rejected"`` (with the approver's ``"reason"``), or ``"timeout"``
        when no decision arrives within 24 hours.
    """
    # Send approval request
    send_approval_request(request_id)

    # Wait for approval (up to 24 hours). recv() returns None on timeout
    # rather than raising, so the timeout case is an explicit None check.
    approval = DBOS.recv(topic="approval", timeout_seconds=86400)

    if approval is None:
        return {"status": "timeout", "request_id": request_id}

    if approval["approved"]:
        # Process approved request
        process_request(request_id)
        return {"status": "approved", "request_id": request_id}

    return {
        "status": "rejected",
        "reason": approval.get("reason"),
        "request_id": request_id
    }

@DBOS.step()
def send_approval_request(request_id: str) -> None:
    """Email a request asking for ``request_id`` to be approved.

    Args:
        request_id: Identifier included in the email subject and body.
    """
    email_service.send(
        # Placeholder recipient: the published literal was mangled by e-mail
        # obfuscation ("[email protected]") and was not a usable address.
        # NOTE(review): replace with the real approver address.
        to="approver@example.com",
        subject=f"Approval Required: {request_id}",
        body=f"Please approve request {request_id}"
    )

@DBOS.step()
def process_request(request_id: str) -> None:
    """Placeholder step for handling an approved request; does nothing yet."""
    # Real processing for the approved request belongs here.
    return None

# Start approval workflow
# The workflow ID is built from the request ID so that external code can
# reconstruct it later using the same "approval-<request_id>" format.
request_id = "req-12345"
workflow_id = f"approval-{request_id}"

# NOTE(review): SetWorkflowID is expected to assign workflow_id to the
# workflow started inside this block — confirm against the DBOS reference.
with SetWorkflowID(workflow_id):
    handle = DBOS.start_workflow(approval_workflow, request_id)

# Later, from external system (e.g., web API)
def approve_request(request_id: str, approved: bool, reason: Optional[str] = None):
    """Deliver an approval decision to the waiting approval workflow.

    Args:
        request_id: Request whose workflow (``approval-<request_id>``) should
            receive the decision.
        approved: True to approve, False to reject.
        reason: Optional explanation, forwarded to the workflow as-is.
    """
    # Connect to DBOS
    client = DBOSClient(
        system_database_url="postgresql://user:pass@localhost/dbos"
    )

    try:
        # Send approval to workflow
        client.send(
            f"approval-{request_id}",
            {"approved": approved, "reason": reason},
            topic="approval",
            # A fixed key per request means a retried or double-submitted
            # call does not enqueue a second decision.
            idempotency_key=f"approval-decision-{request_id}"
        )
    finally:
        # Each call opens its own client; release its database connections
        # even when the send fails.
        client.destroy()
External systems can send messages to workflows using DBOSClient.send(), enabling integration with approval systems, admin panels, etc.

Event-Driven Workflows

Respond to multiple events:
@DBOS.workflow()
def event_driven_workflow() -> dict:
    """Consume events until a ``"shutdown"`` event arrives.

    Each event is a dict with a ``"type"`` key: ``"data"`` events carry a
    ``"payload"``, ``"config_update"`` events carry a ``"config"``, and
    ``"shutdown"`` ends the loop after cleanup. Idle periods trigger
    maintenance.

    Returns:
        The final state dict: ``{"running": False, "events_processed": <n>}``
        where ``n`` counts the ``"data"`` events handled.
    """
    state = {"running": True, "events_processed": 0}

    while state["running"]:
        # Wait for events with timeout
        event = DBOS.recv(timeout_seconds=60)

        # recv() returns None on timeout instead of raising TimeoutError, so
        # the idle case is detected with a None check before touching event.
        if event is None:
            # Periodic maintenance during idle time
            perform_maintenance()
            continue

        event_type = event["type"]

        if event_type == "data":
            # Process data event
            process_data(event["payload"])
            state["events_processed"] += 1

        elif event_type == "config_update":
            # Update configuration
            update_config(event["config"])

        elif event_type == "shutdown":
            # Graceful shutdown
            state["running"] = False
            cleanup()

        else:
            # Unknown types used to be dropped silently; leave a trace.
            DBOS.logger.warning(f"Ignoring unexpected event type: {event_type}")

    return state

@DBOS.step()
def process_data(payload: dict) -> None:
    """Placeholder step for handling a data payload; does nothing yet."""
    # Real data handling belongs here.
    return None

@DBOS.step()
def update_config(config: dict) -> None:
    """Placeholder step for applying a new configuration; does nothing yet."""
    # Real configuration handling belongs here.
    return None

@DBOS.step()
def cleanup() -> None:
    """Placeholder step for releasing resources; does nothing yet."""
    # Real cleanup belongs here.
    return None

@DBOS.step()
def perform_maintenance() -> None:
    """Placeholder step for periodic maintenance; does nothing yet."""
    # Real maintenance work belongs here.
    return None

Release Gate Pattern

Control workflow progression:
@DBOS.workflow()
def gated_deployment_workflow(version: str) -> dict:
    """Deploy ``version`` through staging and production behind two gates.

    Args:
        version: Version identifier passed to every deployment step.

    Returns:
        ``{"status": "success", "version": ...}`` after a verified rollout,
        ``{"status": "rolled_back", "reason": ...}`` when verification fails
        or times out, or ``{"status": "staging_approval_timeout",
        "version": ...}`` when staging is never approved.
    """
    # Deploy to staging
    deploy_to_staging(version)
    DBOS.logger.info("Deployed to staging, awaiting approval")

    # Gate 1: Wait for staging approval (up to 24 hours).
    # recv() returns None on timeout, and its default timeout is only 60
    # seconds, so a bare recv() would fall through to the production deploy
    # without any approval. The approval message must be a non-None value.
    staging_approval = DBOS.recv(
        topic="staging_approved",
        timeout_seconds=86400  # 24 hours
    )
    if staging_approval is None:
        DBOS.logger.warning("Staging approval not received, aborting deployment")
        return {"status": "staging_approval_timeout", "version": version}

    # Deploy to production
    deploy_to_production(version)
    DBOS.logger.info("Deployed to production, monitoring")

    # Gate 2: Wait for production verification
    verification = DBOS.recv(
        topic="production_verified",
        timeout_seconds=3600  # 1 hour
    )

    if verification is not None and verification["success"]:
        finalize_deployment(version)
        return {"status": "success", "version": version}

    # An unverified production deploy is treated like a failed one.
    rollback_deployment(version)
    if verification is None:
        reason = "verification_timeout"
    else:
        reason = verification["reason"]
    return {"status": "rolled_back", "reason": reason}

@DBOS.step()
def deploy_to_staging(version: str) -> None:
    """Placeholder step for a staging deployment; does nothing yet."""
    # Real staging rollout belongs here.
    return None

@DBOS.step()
def deploy_to_production(version: str) -> None:
    """Placeholder step for a production deployment; does nothing yet."""
    # Real production rollout belongs here.
    return None

@DBOS.step()
def finalize_deployment(version: str) -> None:
    """Placeholder step for finalizing a deployment; does nothing yet."""
    # Real finalization belongs here.
    return None

@DBOS.step()
def rollback_deployment(version: str) -> None:
    """Placeholder step for rolling a deployment back; does nothing yet."""
    # Real rollback belongs here.
    return None

Async Send and Receive

Use notifications in async workflows:
import asyncio
from dbos import DBOS

@DBOS.workflow()
async def async_receiver() -> list:
    """Await one message on each of ``topic-0`` through ``topic-2``.

    Returns:
        The three received messages, in topic order.
    """
    # The awaits run one after another, so topics are read sequentially.
    return [
        await DBOS.recv_async(topic=f"topic-{index}")
        for index in range(3)
    ]

@DBOS.workflow()
async def async_sender() -> list:
    """Start ``async_receiver`` and send it one message per topic.

    Returns:
        The list of messages collected by the receiver.
    """
    receiver = await DBOS.start_workflow_async(async_receiver)
    receiver_id = receiver.get_workflow_id()

    # Sends "msg-0".."msg-2" to "topic-0".."topic-2", one at a time.
    for index in range(3):
        await DBOS.send_async(receiver_id, f"msg-{index}", topic=f"topic-{index}")

    return await receiver.get_result()

Idempotent Sends

Ensure send operations are idempotent:
from dbos import DBOSClient

# Send with idempotency key
# The client talks to the DBOS system database directly, so this snippet can
# run outside any workflow.
# NOTE(review): the client is never destroyed here; long-lived code should
# release it when done — confirm the cleanup call against the DBOSClient docs.
client = DBOSClient(
    system_database_url="postgresql://user:pass@localhost/dbos"
)

# First send
client.send(
    destination_id="workflow-123",
    message="important data",
    topic="data",
    idempotency_key="send-key-1"
)

# Duplicate send with same key - no duplicate message delivered
# (same destination, topic and idempotency key as the call above)
client.send(
    destination_id="workflow-123",
    message="important data",
    topic="data",
    idempotency_key="send-key-1"
)
Use idempotency keys when sending from external systems to prevent duplicate message delivery.

Complete Example: Order Fulfillment System

from dbos import DBOS, DBOSClient, SetWorkflowID
import sqlalchemy as sa
from datetime import datetime
from typing import Optional

@DBOS.transaction()
def create_order(order_id: str, customer_id: str, items: list) -> None:
    """Insert the order row and one ``order_items`` row per item.

    Args:
        order_id: Identifier stored on the order and on every item row.
        customer_id: Customer the order belongs to.
        items: Dicts providing ``"product_id"`` and ``"quantity"``.
    """
    session = DBOS.sql_session

    # Order header: always starts in the 'pending' status.
    insert_order = sa.text(
        "INSERT INTO orders (order_id, customer_id, status, created_at) "
        "VALUES (:oid, :cid, 'pending', :created)"
    )
    session.execute(
        insert_order,
        {"oid": order_id, "cid": customer_id, "created": datetime.now()},
    )

    # Line items: one parameterized insert per item.
    insert_item = sa.text(
        "INSERT INTO order_items (order_id, product_id, quantity) "
        "VALUES (:oid, :pid, :qty)"
    )
    for line in items:
        item_params = {
            "oid": order_id,
            "pid": line["product_id"],
            "qty": line["quantity"],
        }
        session.execute(insert_item, item_params)

@DBOS.transaction()
def update_order_status(order_id: str, status: str) -> None:
    """Set an order's status and stamp ``updated_at`` with the current time.

    Args:
        order_id: Order to update.
        status: New status value to store.
    """
    statement = sa.text(
        "UPDATE orders SET status = :status, updated_at = :updated "
        "WHERE order_id = :oid"
    )
    bindings = {"oid": order_id, "status": status, "updated": datetime.now()}
    DBOS.sql_session.execute(statement, bindings)

@DBOS.step()
def reserve_inventory(items: list) -> dict:
    """Reserve ``items`` through the inventory service.

    Returns:
        ``{"reservation_id": <value returned by the inventory service>}``.
    """
    return {"reservation_id": inventory_service.reserve(items)}

@DBOS.step()
def charge_customer(customer_id: str, amount: float) -> dict:
    """Charge ``amount`` to ``customer_id`` through the payment service.

    Returns:
        ``{"transaction_id": <value returned by the payment service>}``.
    """
    return {"transaction_id": payment_service.charge(customer_id, amount)}

@DBOS.step()
def ship_order(order_id: str, items: list) -> dict:
    """Create a shipment for the order through the shipping service.

    Returns:
        ``{"tracking_id": <value returned by the shipping service>}``.
    """
    return {"tracking_id": shipping_service.create_shipment(order_id, items)}

@DBOS.step()
def send_notification(customer_id: str, message: str) -> None:
    """Forward ``message`` to ``customer_id`` via the notification service."""
    notification_service.send(customer_id, message)
    return None

@DBOS.workflow()
def order_fulfillment_workflow(
    order_id: str,
    customer_id: str,
    items: list,
    total_amount: float
) -> dict:
    """Orchestrate order fulfillment with approval gates.

    Steps: create the order, reserve inventory, wait for a fraud-check
    message, charge the customer, ship, notify, then wait for delivery
    confirmation.

    Args:
        order_id: Identifier of the order being fulfilled.
        customer_id: Customer to charge and notify.
        items: Order items, passed to the order, inventory and shipping steps.
        total_amount: Amount charged to the customer.

    Returns:
        A dict with ``"order_id"`` and a ``"status"`` of ``"cancelled"``
        (fraud check rejected or timed out), ``"failed"`` (payment error), or
        ``"completed"`` (with reservation, transaction and tracking IDs).
    """
    DBOS.logger.info(f"Starting fulfillment for order {order_id}")

    # Create order
    create_order(order_id, customer_id, items)
    update_order_status(order_id, "created")

    # Reserve inventory
    reservation = reserve_inventory(items)
    update_order_status(order_id, "inventory_reserved")

    # Wait for fraud check (automated or manual)
    fraud_check = DBOS.recv(
        topic="fraud_check",
        timeout_seconds=300  # 5 minutes
    )

    # recv() returns None on timeout. Without a verdict the order must not be
    # charged, so a missing fraud check cancels it like a rejection does.
    if fraud_check is None:
        update_order_status(order_id, "cancelled_fraud_check_timeout")
        return {
            "status": "cancelled",
            "reason": "fraud_check_timeout",
            "order_id": order_id
        }

    if not fraud_check["approved"]:
        update_order_status(order_id, "cancelled_fraud")
        return {
            "status": "cancelled",
            "reason": "fraud_check_failed",
            "order_id": order_id
        }

    # Charge customer
    try:
        payment = charge_customer(customer_id, total_amount)
        update_order_status(order_id, "paid")
    except Exception as e:
        update_order_status(order_id, "payment_failed")
        return {
            "status": "failed",
            "reason": "payment_error",
            "error": str(e),
            "order_id": order_id
        }

    # Ship order
    shipment = ship_order(order_id, items)
    update_order_status(order_id, "shipped")

    # Notify customer
    send_notification(
        customer_id,
        f"Your order {order_id} has been shipped. Tracking: {shipment['tracking_id']}"
    )

    # Wait for delivery confirmation (webhook from carrier)
    delivery = DBOS.recv(
        topic="delivery_confirmed",
        timeout_seconds=604800  # 7 days
    )

    if delivery is None:
        # No carrier callback within 7 days: the outcome is unknown, which is
        # recorded separately from an explicit delivery failure.
        update_order_status(order_id, "delivery_unconfirmed")
    elif delivery["delivered"]:
        update_order_status(order_id, "delivered")
        send_notification(
            customer_id,
            f"Your order {order_id} has been delivered!"
        )
    else:
        update_order_status(order_id, "delivery_failed")

    return {
        "status": "completed",
        "order_id": order_id,
        "reservation_id": reservation["reservation_id"],
        "transaction_id": payment["transaction_id"],
        "tracking_id": shipment["tracking_id"]
    }

# API endpoint to create order
def create_order_endpoint(order_data: dict) -> dict:
    """Start the fulfillment workflow for a newly submitted order.

    Args:
        order_data: Dict providing ``"order_id"``, ``"customer_id"``,
            ``"items"`` and ``"total_amount"``.

    Returns:
        ``{"order_id": ..., "workflow_id": ...}`` for the started workflow.
    """
    order_id = order_data["order_id"]
    workflow_args = (
        order_id,
        order_data["customer_id"],
        order_data["items"],
        order_data["total_amount"],
    )

    # The workflow ID is derived from the order ID so that callbacks can
    # address the workflow as "order-<order_id>".
    with SetWorkflowID(f"order-{order_id}"):
        started = DBOS.start_workflow(order_fulfillment_workflow, *workflow_args)

    return {"order_id": order_id, "workflow_id": started.get_workflow_id()}

# External fraud check service callback
def fraud_check_callback(order_id: str, approved: bool, risk_score: float):
    """Forward a fraud-check verdict to the order's fulfillment workflow.

    Args:
        order_id: Order whose workflow (``order-<order_id>``) is notified.
        approved: Whether the order passed the fraud check.
        risk_score: Score reported by the fraud service, forwarded as-is.
    """
    client = DBOSClient(
        system_database_url="postgresql://user:pass@localhost/dbos"
    )

    try:
        client.send(
            f"order-{order_id}",
            {"approved": approved, "risk_score": risk_score},
            topic="fraud_check",
            # Callbacks may be retried by the caller; a fixed key per order
            # keeps a retry from enqueuing a second verdict.
            idempotency_key=f"fraud-check-{order_id}"
        )
    finally:
        # Each call opens its own client; release its database connections
        # even when the send fails.
        client.destroy()

# Shipping carrier webhook
def delivery_webhook(order_id: str, delivered: bool, timestamp: str):
    """Forward a carrier delivery report to the order's fulfillment workflow.

    Args:
        order_id: Order whose workflow (``order-<order_id>``) is notified.
        delivered: Whether the carrier reports the order as delivered.
        timestamp: Carrier-supplied timestamp, forwarded as-is.
    """
    client = DBOSClient(
        system_database_url="postgresql://user:pass@localhost/dbos"
    )

    try:
        client.send(
            f"order-{order_id}",
            {"delivered": delivered, "timestamp": timestamp},
            topic="delivery_confirmed",
            # Webhooks may be delivered more than once; a fixed key per order
            # keeps a redelivery from enqueuing a second confirmation.
            idempotency_key=f"delivery-{order_id}"
        )
    finally:
        # Each call opens its own client; release its database connections
        # even when the send fails.
        client.destroy()

Best Practices

  • Use short timeouts for synchronous operations (seconds to minutes)
  • Use long timeouts for human approvals (hours to days)
  • Always handle recv() timeouts gracefully (recv() returns None when no message arrives in time)
  • Consider what happens if no message arrives
  • Use topics to organize message types
  • Avoid dependencies on message ordering
  • Document expected topics for each workflow
  • Use meaningful topic names
  • Use DBOSClient for external systems
  • Always include idempotency keys
  • Validate messages before sending
  • Log all external communications
  • Handle receive timeouts explicitly
  • Validate message structure and content
  • Log unexpected messages
  • Consider dead letter handling

Next Steps

Workflow Tutorial

Learn more about building workflows

Event Processing

Build event-driven applications

Workflow Management

Programmatically manage workflows

Error Handling

Handle failures and implement recovery

Build docs developers (and LLMs) love