Introduction
DBOS provides durable notification primitives (DBOS.send() and DBOS.recv()) that enable reliable communication between workflows. These operations are durable, meaning they survive process restarts and failures.
Basic Send and Receive
Send messages to workflows and receive them reliably:from dbos import DBOS
@DBOS.workflow()
def receiver_workflow() -> str:
"""Wait for and process a message."""
# Wait for a message (blocks until received)
message = DBOS.recv()
# Process the message
DBOS.logger.info(f"Received message: {message}")
return f"Processed: {message}"
@DBOS.workflow()
def sender_workflow() -> str:
"""Start a workflow and send it a message."""
# Start receiver workflow
handle = DBOS.start_workflow(receiver_workflow)
receiver_id = handle.get_workflow_id()
# Do some work
DBOS.sleep(2)
# Send message to receiver
DBOS.send(receiver_id, "Hello from sender!")
# Wait for receiver to complete
result = handle.get_result()
return result
result = sender_workflow()
print(result) # "Processed: Hello from sender!"
recv() blocks the workflow until a message is received. The workflow can be safely restarted while waiting.Topics
Organize messages by topic:@DBOS.workflow()
def multi_topic_receiver() -> dict:
"""Receive messages on different topics."""
# Receive on specific topics
command = DBOS.recv(topic="command")
data = DBOS.recv(topic="data")
status = DBOS.recv(topic="status")
return {
"command": command,
"data": data,
"status": status
}
@DBOS.workflow()
def multi_topic_sender() -> dict:
"""Send messages on different topics."""
# Start receiver
handle = DBOS.start_workflow(multi_topic_receiver)
workflow_id = handle.get_workflow_id()
# Send on different topics (order doesn't matter)
DBOS.send(workflow_id, "active", topic="status")
DBOS.send(workflow_id, "start", topic="command")
DBOS.send(workflow_id, {"value": 42}, topic="data")
result = handle.get_result()
return result
Use topics to organize different types of messages and avoid ordering dependencies.
Timeouts
Handle missing messages gracefully:from dbos import DBOS
@DBOS.workflow()
def timeout_workflow() -> str:
"""Handle receive timeouts."""
try:
# Wait up to 30 seconds for a message
message = DBOS.recv(timeout_seconds=30)
return f"Received: {message}"
except TimeoutError:
# No message received within timeout
DBOS.logger.warning("No message received, using default")
return "Default value"
@DBOS.workflow()
def conditional_sender(should_send: bool) -> str:
"""Conditionally send a message."""
handle = DBOS.start_workflow(timeout_workflow)
if should_send:
DBOS.send(handle.get_workflow_id(), "Important data")
# Receiver handles both cases
return handle.get_result()
result1 = conditional_sender(True) # "Received: Important data"
result2 = conditional_sender(False) # "Default value"
Workflow Coordination
Coordinate complex workflows with notifications:@DBOS.workflow()
def worker_workflow(worker_id: str) -> dict:
"""Worker that waits for tasks."""
results = []
# Process tasks until told to stop
while True:
# Wait for next task or stop signal
message = DBOS.recv(timeout_seconds=60)
if message == "STOP":
break
# Process task
result = process_task(message)
results.append(result)
return {"worker_id": worker_id, "results": results}
@DBOS.workflow()
def coordinator_workflow(tasks: list[dict]) -> list[dict]:
"""Coordinate multiple workers."""
# Start workers
workers = []
for i in range(3):
handle = DBOS.start_workflow(worker_workflow, f"worker-{i}")
workers.append(handle)
# Distribute tasks to workers
for i, task in enumerate(tasks):
worker_handle = workers[i % len(workers)]
DBOS.send(worker_handle.get_workflow_id(), task)
# Signal workers to stop
for handle in workers:
DBOS.send(handle.get_workflow_id(), "STOP")
# Collect results
results = [handle.get_result() for handle in workers]
return results
@DBOS.step()
def process_task(task: dict) -> dict:
"""Process a single task."""
# Task processing logic
return {"task_id": task["id"], "status": "completed"}
External Notifications
Send notifications from outside workflows using DBOSClient:from dbos import DBOS, DBOSClient, SetWorkflowID
@DBOS.workflow()
def approval_workflow(request_id: str) -> dict:
"""Wait for approval before proceeding."""
# Send approval request
send_approval_request(request_id)
# Wait for approval (up to 24 hours)
try:
approval = DBOS.recv(topic="approval", timeout_seconds=86400)
if approval["approved"]:
# Process approved request
process_request(request_id)
return {"status": "approved", "request_id": request_id}
else:
return {
"status": "rejected",
"reason": approval.get("reason"),
"request_id": request_id
}
except TimeoutError:
return {"status": "timeout", "request_id": request_id}
@DBOS.step()
def send_approval_request(request_id: str) -> None:
"""Send approval request email."""
email_service.send(
to="[email protected]",
subject=f"Approval Required: {request_id}",
body=f"Please approve request {request_id}"
)
@DBOS.step()
def process_request(request_id: str) -> None:
"""Process approved request."""
# Processing logic
pass
# Start approval workflow
request_id = "req-12345"
workflow_id = f"approval-{request_id}"
with SetWorkflowID(workflow_id):
handle = DBOS.start_workflow(approval_workflow, request_id)
# Later, from external system (e.g., web API)
def approve_request(request_id: str, approved: bool, reason: str = None):
"""Approve or reject a request from external system."""
# Connect to DBOS
client = DBOSClient(
system_database_url="postgresql://user:pass@localhost/dbos"
)
# Send approval to workflow
workflow_id = f"approval-{request_id}"
client.send(
workflow_id,
{"approved": approved, "reason": reason},
topic="approval"
)
External systems can send messages to workflows using
DBOSClient.send(), enabling integration with approval systems, admin panels, etc.Event-Driven Workflows
Respond to multiple events:@DBOS.workflow()
def event_driven_workflow() -> dict:
"""React to different events."""
state = {"running": True, "events_processed": 0}
while state["running"]:
try:
# Wait for events with timeout
event = DBOS.recv(timeout_seconds=60)
if event["type"] == "data":
# Process data event
process_data(event["payload"])
state["events_processed"] += 1
elif event["type"] == "config_update":
# Update configuration
update_config(event["config"])
elif event["type"] == "shutdown":
# Graceful shutdown
state["running"] = False
cleanup()
except TimeoutError:
# Periodic maintenance during idle time
perform_maintenance()
return state
@DBOS.step()
def process_data(payload: dict) -> None:
"""Process data event."""
# Data processing logic
pass
@DBOS.step()
def update_config(config: dict) -> None:
"""Update workflow configuration."""
# Configuration update logic
pass
@DBOS.step()
def cleanup() -> None:
"""Clean up resources."""
# Cleanup logic
pass
@DBOS.step()
def perform_maintenance() -> None:
"""Perform periodic maintenance."""
# Maintenance logic
pass
Release Gate Pattern
Control workflow progression:@DBOS.workflow()
def gated_deployment_workflow(version: str) -> dict:
"""Deploy with manual release gates."""
# Deploy to staging
deploy_to_staging(version)
DBOS.logger.info("Deployed to staging, awaiting approval")
# Gate 1: Wait for staging approval
DBOS.recv(topic="staging_approved")
# Deploy to production
deploy_to_production(version)
DBOS.logger.info("Deployed to production, monitoring")
# Gate 2: Wait for production verification
verification = DBOS.recv(
topic="production_verified",
timeout_seconds=3600 # 1 hour
)
if verification["success"]:
finalize_deployment(version)
return {"status": "success", "version": version}
else:
rollback_deployment(version)
return {"status": "rolled_back", "reason": verification["reason"]}
@DBOS.step()
def deploy_to_staging(version: str) -> None:
"""Deploy to staging environment."""
# Staging deployment logic
pass
@DBOS.step()
def deploy_to_production(version: str) -> None:
"""Deploy to production environment."""
# Production deployment logic
pass
@DBOS.step()
def finalize_deployment(version: str) -> None:
"""Finalize successful deployment."""
# Finalization logic
pass
@DBOS.step()
def rollback_deployment(version: str) -> None:
"""Rollback failed deployment."""
# Rollback logic
pass
Async Send and Receive
Use notifications in async workflows:import asyncio
from dbos import DBOS
@DBOS.workflow()
async def async_receiver() -> list:
"""Async workflow that receives messages."""
messages = []
for i in range(3):
# Async receive
message = await DBOS.recv_async(topic=f"topic-{i}")
messages.append(message)
return messages
@DBOS.workflow()
async def async_sender() -> list:
"""Async workflow that sends messages."""
# Start receiver
handle = await DBOS.start_workflow_async(async_receiver)
workflow_id = handle.get_workflow_id()
# Send messages asynchronously
await DBOS.send_async(workflow_id, "msg-0", topic="topic-0")
await DBOS.send_async(workflow_id, "msg-1", topic="topic-1")
await DBOS.send_async(workflow_id, "msg-2", topic="topic-2")
# Wait for result
result = await handle.get_result()
return result
Idempotent Sends
Ensure send operations are idempotent:from dbos import DBOSClient
# Send with idempotency key
client = DBOSClient(
system_database_url="postgresql://user:pass@localhost/dbos"
)
# First send
client.send(
destination_id="workflow-123",
message="important data",
topic="data",
idempotency_key="send-key-1"
)
# Duplicate send with same key - no duplicate message delivered
client.send(
destination_id="workflow-123",
message="important data",
topic="data",
idempotency_key="send-key-1"
)
Use idempotency keys when sending from external systems to prevent duplicate message delivery.
Complete Example: Order Fulfillment System
from dbos import DBOS, DBOSClient, SetWorkflowID
import sqlalchemy as sa
from datetime import datetime
from typing import Optional
@DBOS.transaction()
def create_order(order_id: str, customer_id: str, items: list) -> None:
"""Create order in database."""
DBOS.sql_session.execute(
sa.text(
"INSERT INTO orders (order_id, customer_id, status, created_at) "
"VALUES (:oid, :cid, 'pending', :created)"
),
{"oid": order_id, "cid": customer_id, "created": datetime.now()}
)
for item in items:
DBOS.sql_session.execute(
sa.text(
"INSERT INTO order_items (order_id, product_id, quantity) "
"VALUES (:oid, :pid, :qty)"
),
{"oid": order_id, "pid": item["product_id"], "qty": item["quantity"]}
)
@DBOS.transaction()
def update_order_status(order_id: str, status: str) -> None:
"""Update order status."""
DBOS.sql_session.execute(
sa.text(
"UPDATE orders SET status = :status, updated_at = :updated "
"WHERE order_id = :oid"
),
{"oid": order_id, "status": status, "updated": datetime.now()}
)
@DBOS.step()
def reserve_inventory(items: list) -> dict:
"""Reserve inventory for order."""
# Call inventory service
reservation_id = inventory_service.reserve(items)
return {"reservation_id": reservation_id}
@DBOS.step()
def charge_customer(customer_id: str, amount: float) -> dict:
"""Charge customer payment."""
# Call payment service
transaction_id = payment_service.charge(customer_id, amount)
return {"transaction_id": transaction_id}
@DBOS.step()
def ship_order(order_id: str, items: list) -> dict:
"""Ship order to customer."""
# Call shipping service
tracking_id = shipping_service.create_shipment(order_id, items)
return {"tracking_id": tracking_id}
@DBOS.step()
def send_notification(customer_id: str, message: str) -> None:
"""Send notification to customer."""
notification_service.send(customer_id, message)
@DBOS.workflow()
def order_fulfillment_workflow(
order_id: str,
customer_id: str,
items: list,
total_amount: float
) -> dict:
"""Orchestrate order fulfillment with approval gates."""
DBOS.logger.info(f"Starting fulfillment for order {order_id}")
# Create order
create_order(order_id, customer_id, items)
update_order_status(order_id, "created")
# Reserve inventory
reservation = reserve_inventory(items)
update_order_status(order_id, "inventory_reserved")
# Wait for fraud check (automated or manual)
fraud_check = DBOS.recv(
topic="fraud_check",
timeout_seconds=300 # 5 minutes
)
if not fraud_check["approved"]:
update_order_status(order_id, "cancelled_fraud")
return {
"status": "cancelled",
"reason": "fraud_check_failed",
"order_id": order_id
}
# Charge customer
try:
payment = charge_customer(customer_id, total_amount)
update_order_status(order_id, "paid")
except Exception as e:
update_order_status(order_id, "payment_failed")
return {
"status": "failed",
"reason": "payment_error",
"error": str(e),
"order_id": order_id
}
# Ship order
shipment = ship_order(order_id, items)
update_order_status(order_id, "shipped")
# Notify customer
send_notification(
customer_id,
f"Your order {order_id} has been shipped. Tracking: {shipment['tracking_id']}"
)
# Wait for delivery confirmation (webhook from carrier)
delivery = DBOS.recv(
topic="delivery_confirmed",
timeout_seconds=604800 # 7 days
)
if delivery["delivered"]:
update_order_status(order_id, "delivered")
send_notification(
customer_id,
f"Your order {order_id} has been delivered!"
)
else:
update_order_status(order_id, "delivery_failed")
return {
"status": "completed",
"order_id": order_id,
"reservation_id": reservation["reservation_id"],
"transaction_id": payment["transaction_id"],
"tracking_id": shipment["tracking_id"]
}
# API endpoint to create order
def create_order_endpoint(order_data: dict) -> dict:
"""Create new order."""
order_id = order_data["order_id"]
with SetWorkflowID(f"order-{order_id}"):
handle = DBOS.start_workflow(
order_fulfillment_workflow,
order_id,
order_data["customer_id"],
order_data["items"],
order_data["total_amount"]
)
return {
"order_id": order_id,
"workflow_id": handle.get_workflow_id()
}
# External fraud check service callback
def fraud_check_callback(order_id: str, approved: bool, risk_score: float):
"""Handle fraud check result from external service."""
client = DBOSClient(
system_database_url="postgresql://user:pass@localhost/dbos"
)
workflow_id = f"order-{order_id}"
client.send(
workflow_id,
{"approved": approved, "risk_score": risk_score},
topic="fraud_check"
)
# Shipping carrier webhook
def delivery_webhook(order_id: str, delivered: bool, timestamp: str):
"""Handle delivery confirmation from carrier."""
client = DBOSClient(
system_database_url="postgresql://user:pass@localhost/dbos"
)
workflow_id = f"order-{order_id}"
client.send(
workflow_id,
{"delivered": delivered, "timestamp": timestamp},
topic="delivery_confirmed"
)
Best Practices
Choose Appropriate Timeouts
Choose Appropriate Timeouts
- Use short timeouts for synchronous operations (seconds to minutes)
- Use long timeouts for human approvals (hours to days)
- Always handle TimeoutError gracefully
- Consider what happens if no message arrives
Use Topics Wisely
Use Topics Wisely
- Use topics to organize message types
- Avoid dependencies on message ordering
- Document expected topics for each workflow
- Use meaningful topic names
External Integration
External Integration
- Use DBOSClient for external systems
- Always include idempotency keys
- Validate messages before sending
- Log all external communications
Error Handling
Error Handling
- Handle receive timeouts explicitly
- Validate message structure and content
- Log unexpected messages
- Consider dead letter handling
Next Steps
Workflow Tutorial
Learn more about building workflows
Event Processing
Build event-driven applications
Workflow Management
Programmatically manage workflows
Error Handling
Handle failures and implement recovery