Introduction
DBOS makes it easy to build exactly-once event processors by combining idempotent workflows with SetWorkflowID. This ensures events are processed exactly once, even in the face of retries, crashes, or duplicate deliveries.
Exactly-Once Processing Pattern
The key to exactly-once processing is using a deterministic workflow ID based on the event:

from dbos import DBOS, SetWorkflowID
import sqlalchemy as sa
@DBOS.transaction()
def record_event(event_id: str, event_type: str, data: dict) -> None:
    """Persist one event row (id, type, payload, server-side timestamp)."""
    insert_stmt = sa.text(
        "INSERT INTO events (event_id, event_type, data, processed_at) "
        "VALUES (:event_id, :event_type, :data, NOW())"
    )
    params = {"event_id": event_id, "event_type": event_type, "data": str(data)}
    DBOS.sql_session.execute(insert_stmt, params)
@DBOS.step()
def process_event_logic(event_type: str, data: dict) -> dict:
    """Dispatch business logic by event type; skip unknown types."""
    if event_type == "order_placed":
        return process_order(data)
    if event_type == "payment_received":
        return process_payment(data)
    return {"status": "skipped", "reason": "unknown_event_type"}
@DBOS.workflow()
def process_event(event_id: str, event_type: str, data: dict) -> dict:
    """Durable workflow: record the event for audit, then process it."""
    DBOS.logger.info(f"Processing event {event_id} of type {event_type}")
    # Audit trail first, so every delivery attempt is recorded.
    record_event(event_id, event_type, data)
    outcome = process_event_logic(event_type, data)
    DBOS.logger.info(f"Event {event_id} processed successfully")
    return outcome
def handle_incoming_event(event_id: str, event_type: str, data: dict) -> dict:
    """Entry point: derive the workflow ID from the event ID for dedup."""
    workflow_id = f"event-{event_id}"
    with SetWorkflowID(workflow_id):
        return process_event(event_id, event_type, data)
# Usage: the first delivery runs the workflow end to end.
result1 = handle_incoming_event("evt-123", "order_placed", {"order_id": "ord-456"})
print(result1)  # Processes the event

# Duplicate delivery - returns cached result without reprocessing
result2 = handle_incoming_event("evt-123", "order_placed", {"order_id": "ord-456"})
print(result2)  # Same result, event processed exactly once
By using the event ID as the workflow ID, DBOS automatically deduplicates duplicate event deliveries.
Webhook Processing
Process webhook events exactly once:

from fastapi import FastAPI, Request, HTTPException
from dbos import DBOS, SetWorkflowID
import hashlib
import hmac
app = FastAPI()
@DBOS.step()
def verify_webhook_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Check an HMAC-SHA256 webhook signature using a constant-time compare."""
    mac = hmac.new(secret.encode(), payload, hashlib.sha256)
    return hmac.compare_digest(signature, mac.hexdigest())
@DBOS.transaction()
def store_webhook_event(webhook_id: str, source: str, event_type: str, payload: dict) -> None:
    """Persist a webhook delivery for the audit trail."""
    insert_stmt = sa.text(
        "INSERT INTO webhook_events (webhook_id, source, event_type, payload, received_at) "
        "VALUES (:id, :source, :type, :payload, NOW())"
    )
    row = {"id": webhook_id, "source": source, "type": event_type, "payload": str(payload)}
    DBOS.sql_session.execute(insert_stmt, row)
@DBOS.step()
def process_webhook_payload(source: str, event_type: str, payload: dict) -> dict:
    """Route webhook business logic by (source, event_type)."""
    key = (source, event_type)
    if key == ("stripe", "payment.succeeded"):
        return handle_payment_success(payload)
    if key == ("github", "push"):
        return handle_code_push(payload)
    return {"status": "ignored"}
@DBOS.workflow()
def process_webhook(webhook_id: str, source: str, event_type: str, payload: dict) -> dict:
    """Audit, then process, a single webhook delivery exactly once."""
    store_webhook_event(webhook_id, source, event_type, payload)
    outcome = process_webhook_payload(source, event_type, payload)
    return {"webhook_id": webhook_id, "result": outcome}
@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    """Handle a Stripe webhook delivery exactly once.

    Fix: starlette's ``Request.body()`` is a coroutine — the original sync
    endpoint called it without awaiting and got a coroutine object instead
    of bytes. The endpoint must be ``async`` and ``await`` the body.
    """
    payload = await request.body()
    signature = request.headers.get("Stripe-Signature")
    # NOTE(review): `signature` is captured but never checked in this
    # example; in production, pass it through verify_webhook_signature
    # with your endpoint secret before processing.
    event = parse_stripe_webhook(payload)
    # Stripe event IDs are unique, making them ideal workflow IDs.
    webhook_id = event["id"]
    # Process exactly once: same event ID -> same workflow -> deduped.
    with SetWorkflowID(f"stripe-{webhook_id}"):
        result = process_webhook(
            webhook_id,
            "stripe",
            event["type"],
            event["data"]
        )
    return {"status": "success", "result": result}
@app.post("/webhooks/github")
async def github_webhook(request: Request):
    """Handle a GitHub webhook delivery exactly once.

    Fix: starlette's ``Request.json()`` is a coroutine — the original sync
    endpoint called it without awaiting, so `payload` was a coroutine
    object, not the parsed body. The endpoint must be ``async``.
    """
    payload = await request.json()
    # GitHub's X-GitHub-Delivery header is a unique per-delivery GUID,
    # which makes it a safe deduplication key.
    delivery_id = request.headers.get("X-GitHub-Delivery")
    event_type = request.headers.get("X-GitHub-Event")
    with SetWorkflowID(f"github-{delivery_id}"):
        result = process_webhook(
            delivery_id,
            "github",
            event_type,
            payload
        )
    return {"status": "success", "result": result}
Always use the webhook provider’s unique event/delivery ID for the workflow ID to ensure idempotency.
Kafka Consumer
Process Kafka messages with exactly-once semantics:

from dbos import DBOS, SetWorkflowID, KafkaMessage
import json
@DBOS.transaction()
def update_inventory(product_id: str, quantity_change: int) -> dict:
    """Apply a quantity delta to a product and return its new quantity.

    Raises:
        ValueError: if the product does not exist. (The original code
        crashed with ``TypeError: 'NoneType' object is not subscriptable``
        on ``row[0]`` because RETURNING yields no row when the UPDATE
        matches nothing.)
    """
    result = DBOS.sql_session.execute(
        sa.text(
            "UPDATE inventory SET quantity = quantity + :change "
            "WHERE product_id = :pid RETURNING quantity"
        ),
        {"change": quantity_change, "pid": product_id}
    )
    row = result.fetchone()
    if row is None:
        raise ValueError(f"Unknown product_id: {product_id}")
    return {"product_id": product_id, "new_quantity": row[0]}
@DBOS.step()
def send_inventory_alert(product_id: str, quantity: int) -> None:
    """Alert when inventory falls below 10 units; otherwise do nothing."""
    if quantity >= 10:
        return
    alert_service.send(
        f"Low inventory alert: {product_id} has only {quantity} units"
    )
@DBOS.workflow()
def process_inventory_update(message_id: str, product_id: str, quantity_change: int) -> dict:
    """Apply an inventory delta from Kafka, alerting when stock runs low."""
    outcome = update_inventory(product_id, quantity_change)
    send_inventory_alert(product_id, outcome["new_quantity"])
    return outcome
@DBOS.kafka_consumer(
    topic="inventory-updates",
    config={"bootstrap.servers": "localhost:9092", "group.id": "inventory-processor"}
)
def kafka_inventory_handler(message: KafkaMessage) -> None:
    """Consume inventory updates with exactly-once processing."""
    payload = json.loads(message.value.decode())
    # topic-partition-offset uniquely identifies a Kafka message, so it
    # doubles as the idempotency key.
    message_id = f"{message.topic}-{message.partition}-{message.offset}"
    with SetWorkflowID(message_id):
        process_inventory_update(
            message_id,
            payload["product_id"],
            payload["quantity_change"]
        )
For Kafka, use topic-partition-offset as the unique message identifier to ensure exactly-once processing.

Message Queue Pattern
Process messages from a queue system:

import boto3
import hashlib
from dbos import DBOS, SetWorkflowID
@DBOS.transaction()
def process_order(order_id: str, customer_id: str, items: list) -> dict:
    """Insert an order and all of its line items in one transaction."""
    order_stmt = sa.text(
        "INSERT INTO orders (order_id, customer_id, status) "
        "VALUES (:oid, :cid, 'processing')"
    )
    DBOS.sql_session.execute(order_stmt, {"oid": order_id, "cid": customer_id})
    # Reuse one prepared statement for every line item.
    item_stmt = sa.text(
        "INSERT INTO order_items (order_id, product_id, quantity) "
        "VALUES (:oid, :pid, :qty)"
    )
    for line in items:
        DBOS.sql_session.execute(
            item_stmt,
            {"oid": order_id, "pid": line["product_id"], "qty": line["quantity"]},
        )
    return {"order_id": order_id, "status": "processed"}
@DBOS.step()
def notify_customer(customer_id: str, order_id: str) -> None:
    """Send an order-received confirmation to the customer."""
    confirmation = f"Your order {order_id} has been received"
    notification_service.send(customer_id, confirmation)
@DBOS.workflow()
def handle_order_message(message_id: str, order_data: dict) -> dict:
    """Process one order message exactly once: persist, then notify."""
    order_id = order_data["order_id"]
    customer_id = order_data["customer_id"]
    outcome = process_order(order_id, customer_id, order_data["items"])
    notify_customer(customer_id, order_id)
    return outcome
def poll_sqs_queue():
    """Poll SQS and process each message exactly once.

    Fix: this standalone example calls ``json.loads`` but its import list
    (boto3, hashlib, dbos) never imported ``json`` — import it locally so
    the snippet runs as written.
    """
    import json

    sqs = boto3.client('sqs')
    queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/orders'
    while True:
        # Long-poll for up to 10 messages at a time.
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20
        )
        for message in response.get('Messages', []):
            # SQS MessageId is unique per message - ideal idempotency key.
            message_id = message['MessageId']
            try:
                order_data = json.loads(message['Body'])
                # Same MessageId on redelivery -> same workflow -> deduped.
                with SetWorkflowID(f"sqs-{message_id}"):
                    handle_order_message(message_id, order_data)
                # Delete only after successful processing; on failure SQS
                # redelivers and DBOS dedupes via the workflow ID.
                sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=message['ReceiptHandle']
                )
            except Exception as e:
                print(f"Error processing message {message_id}: {e}")
                # Message will be retried by SQS
Event Sourcing Pattern
Implement event sourcing with DBOS:

from dbos import DBOS, SetWorkflowID
from datetime import datetime
import json
@DBOS.transaction()
def append_event(aggregate_id: str, event_type: str, event_data: dict, event_id: str) -> int:
    """Append an event to the event store; return its sequence number."""
    insert_stmt = sa.text(
        "INSERT INTO event_store (aggregate_id, event_id, event_type, event_data, timestamp) "
        "VALUES (:agg_id, :evt_id, :evt_type, :data, :ts) "
        "RETURNING sequence_number"
    )
    params = {
        "agg_id": aggregate_id,
        "evt_id": event_id,
        "evt_type": event_type,
        "data": json.dumps(event_data),
        "ts": datetime.now(),
    }
    result = DBOS.sql_session.execute(insert_stmt, params)
    return result.fetchone()[0]
@DBOS.transaction()
def update_projection(aggregate_id: str, event_type: str, event_data: dict) -> None:
    """Apply one domain event to the account_view read model."""
    if event_type == "account.created":
        DBOS.sql_session.execute(
            sa.text(
                "INSERT INTO account_view (account_id, balance, status) "
                "VALUES (:id, :balance, 'active')"
            ),
            {"id": aggregate_id, "balance": event_data["initial_balance"]},
        )
        return
    # Credit and debit differ only in the sign applied to the amount.
    if event_type == "account.credited":
        sql = (
            "UPDATE account_view SET balance = balance + :amount "
            "WHERE account_id = :id"
        )
    elif event_type == "account.debited":
        sql = (
            "UPDATE account_view SET balance = balance - :amount "
            "WHERE account_id = :id"
        )
    else:
        # Unknown event types leave the projection untouched.
        return
    DBOS.sql_session.execute(
        sa.text(sql),
        {"id": aggregate_id, "amount": event_data["amount"]},
    )
@DBOS.workflow()
def process_domain_event(event_id: str, aggregate_id: str, event_type: str, event_data: dict) -> dict:
    """Append a domain event to the store, then refresh its projection."""
    seq = append_event(aggregate_id, event_type, event_data, event_id)
    update_projection(aggregate_id, event_type, event_data)
    return {
        "event_id": event_id,
        "sequence_number": seq,
        "aggregate_id": aggregate_id,
    }
def publish_domain_event(aggregate_id: str, event_type: str, event_data: dict) -> dict:
    """Publish a domain event exactly once.

    Fix: the original built the event ID from ``datetime.now()``, which
    differs on every call — a retried publish got a fresh workflow ID and
    was processed again, defeating exactly-once (and contradicting this
    page's own "never use timestamps alone" guidance). Hash the event
    content instead so the same logical event always maps to the same
    workflow ID.

    NOTE(review): with content-based IDs, two intentionally identical
    events dedupe to one; callers that need to distinguish them should
    supply a client idempotency key instead.
    """
    import hashlib
    import json

    digest = hashlib.sha256(
        json.dumps(
            {"aggregate": aggregate_id, "type": event_type, "data": event_data},
            sort_keys=True,
        ).encode()
    ).hexdigest()
    event_id = f"{aggregate_id}-{event_type}-{digest[:16]}"
    # Process event exactly once
    with SetWorkflowID(f"event-{event_id}"):
        result = process_domain_event(event_id, aggregate_id, event_type, event_data)
    return result
# Usage: apply these in order to build the account's event history.
publish_domain_event(
    "account-123",
    "account.created",
    {"initial_balance": 1000.0}
)
# Credit 500 -> projection balance becomes 1500.
publish_domain_event(
    "account-123",
    "account.credited",
    {"amount": 500.0}
)
# Debit 200 -> projection balance becomes 1300.
publish_domain_event(
    "account-123",
    "account.debited",
    {"amount": 200.0}
)
Idempotency Keys
Use client-provided idempotency keys:

from fastapi import FastAPI, Header, HTTPException
from dbos import DBOS, SetWorkflowID
from typing import Optional
app = FastAPI()
@DBOS.transaction()
def create_payment(payment_id: str, customer_id: str, amount: float) -> dict:
    """Create a pending payment record; return its database row id."""
    insert_stmt = sa.text(
        "INSERT INTO payments (payment_id, customer_id, amount, status, created_at) "
        "VALUES (:pid, :cid, :amount, 'pending', NOW()) "
        "RETURNING id"
    )
    row = DBOS.sql_session.execute(
        insert_stmt, {"pid": payment_id, "cid": customer_id, "amount": amount}
    ).fetchone()
    return {"db_id": row[0]}
@DBOS.step()
def charge_payment_gateway(payment_id: str, amount: float) -> str:
    """Charge via the payment gateway; return the gateway transaction id."""
    return payment_gateway.charge(amount)
@DBOS.transaction()
def mark_payment_completed(payment_id: str, transaction_id: str) -> None:
    """Record completion and the gateway transaction id on the payment."""
    update_stmt = sa.text(
        "UPDATE payments SET status = 'completed', transaction_id = :txn_id "
        "WHERE payment_id = :pid"
    )
    DBOS.sql_session.execute(update_stmt, {"pid": payment_id, "txn_id": transaction_id})
@DBOS.workflow()
def process_payment(payment_id: str, customer_id: str, amount: float) -> dict:
    """Payment workflow: record, charge the gateway, then mark completed."""
    # Durable payment record is created before charging.
    create_payment(payment_id, customer_id, amount)
    transaction_id = charge_payment_gateway(payment_id, amount)
    mark_payment_completed(payment_id, transaction_id)
    return {
        "payment_id": payment_id,
        "transaction_id": transaction_id,
        "status": "completed",
    }
@app.post("/api/payments")
def create_payment_endpoint(
    customer_id: str,
    amount: float,
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key")
):
    """Create a payment; the Idempotency-Key header makes retries safe."""
    if not idempotency_key:
        raise HTTPException(status_code=400, detail="Idempotency-Key header required")
    payment_id = f"pay-{idempotency_key}"
    # Same key -> same workflow ID -> a retried request returns the
    # already-computed result instead of charging twice.
    with SetWorkflowID(f"payment-{idempotency_key}"):
        return process_payment(payment_id, customer_id, amount)
Idempotency keys allow clients to safely retry requests without creating duplicate resources.
Composite Event IDs
Create unique IDs from multiple event fields:

import hashlib
from dbos import DBOS, SetWorkflowID
def create_event_id(source: str, timestamp: str, data: dict) -> str:
    """Create a deterministic event ID from event components.

    Fix: the original serialized the payload via ``sorted(data.items())``,
    which raises TypeError when keys have mixed types and depends on
    ``repr()`` of nested values. ``json.dumps(..., sort_keys=True)``
    produces a stable, key-order-independent canonical form.

    Args:
        source: Originating system name.
        timestamp: Event timestamp string supplied by the source.
        data: Event payload; keys are sorted so dict ordering is irrelevant.

    Returns:
        A 64-character hex SHA-256 digest, stable for identical inputs.
    """
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"), default=str)
    components = f"{source}:{timestamp}:{canonical}"
    return hashlib.sha256(components.encode()).hexdigest()
@DBOS.workflow()
def process_composite_event(event_id: str, source: str, data: dict) -> dict:
    """Workflow body for an event identified by a composite content hash."""
    # Processing logic goes here; this example just acknowledges the event.
    return {"event_id": event_id, "processed": True}
def handle_event(source: str, timestamp: str, data: dict) -> dict:
    """Handle an event, deduplicated by its composite content hash."""
    event_id = create_event_id(source, timestamp, data)
    # Identical (source, timestamp, data) always yields the same workflow ID.
    with SetWorkflowID(f"event-{event_id}"):
        return process_composite_event(event_id, source, data)
Complete Example: Multi-Source Event Processor
from dbos import DBOS, SetWorkflowID, KafkaMessage
from fastapi import FastAPI, Request, Header
import sqlalchemy as sa
import json
from datetime import datetime
from typing import Optional
app = FastAPI()
# Database schema
@DBOS.transaction()
def store_event(
    event_id: str,
    source: str,
    event_type: str,
    payload: dict,
    metadata: dict
) -> int:
    """Store an event in the event log; return the inserted row id."""
    insert_stmt = sa.text(
        "INSERT INTO event_log "
        "(event_id, source, event_type, payload, metadata, received_at) "
        "VALUES (:id, :source, :type, :payload, :metadata, :received) "
        "RETURNING id"
    )
    params = {
        "id": event_id,
        "source": source,
        "type": event_type,
        "payload": json.dumps(payload),
        "metadata": json.dumps(metadata),
        "received": datetime.now(),
    }
    return DBOS.sql_session.execute(insert_stmt, params).fetchone()[0]
@DBOS.step()
def route_event(source: str, event_type: str, payload: dict) -> dict:
    """Dispatch an event to its source-specific handler."""
    handlers = {
        "stripe": handle_stripe_event,
        "github": handle_github_event,
        "kafka": handle_kafka_event,
    }
    handler = handlers.get(source)
    if handler is None:
        return {"status": "unhandled", "source": source}
    return handler(event_type, payload)
@DBOS.step()
def handle_stripe_event(event_type: str, payload: dict) -> dict:
    """Handle Stripe events; only payment_intent.succeeded is acted on."""
    if event_type != "payment_intent.succeeded":
        return {"status": "ignored", "event_type": event_type}
    return fulfill_order(payload["id"], payload["amount"])
@DBOS.step()
def handle_github_event(event_type: str, payload: dict) -> dict:
    """Handle GitHub events; only push events trigger CI/CD."""
    if event_type != "push":
        return {"status": "ignored", "event_type": event_type}
    return trigger_ci_cd(payload["repository"]["name"], payload["ref"])
@DBOS.step()
def handle_kafka_event(event_type: str, payload: dict) -> dict:
    """Handle Kafka events; only inventory.updated is acted on."""
    if event_type != "inventory.updated":
        return {"status": "ignored", "event_type": event_type}
    return update_inventory_cache(payload["product_id"], payload["quantity"])
@DBOS.workflow()
def unified_event_processor(
    event_id: str,
    source: str,
    event_type: str,
    payload: dict,
    metadata: Optional[dict] = None
) -> dict:
    """Unified workflow: persist the event, then route it by source."""
    # None default instead of a mutable {} default; normalize here.
    meta = metadata if metadata is not None else {}
    DBOS.logger.info(f"Processing event {event_id} from {source}")
    db_id = store_event(event_id, source, event_type, payload, meta)
    result = route_event(source, event_type, payload)
    DBOS.logger.info(f"Event {event_id} processed: {result['status']}")
    return {
        "event_id": event_id,
        "db_id": db_id,
        "result": result
    }
# HTTP webhook endpoint
@app.post("/webhooks/{source}")
async def webhook_endpoint(
    source: str,
    request: Request,
    x_event_id: Optional[str] = Header(None),
    x_event_type: Optional[str] = Header(None)
):
    """Generic webhook endpoint with per-source event-ID extraction.

    Fixes: starlette's ``Request.json()`` is a coroutine, so the endpoint
    must be ``async`` and await it; and ``HTTPException`` was raised but
    never imported in this example, so import it locally.
    """
    from fastapi import HTTPException

    payload = await request.json()
    # Prefer the provider's own unique ID; fall back to generic headers.
    if source == "stripe":
        event_id = payload.get("id", x_event_id)
        event_type = payload.get("type", x_event_type)
    elif source == "github":
        event_id = request.headers.get("X-GitHub-Delivery")
        event_type = request.headers.get("X-GitHub-Event")
    else:
        event_id = x_event_id
        event_type = x_event_type
    if not event_id:
        raise HTTPException(status_code=400, detail="Missing event ID")
    # Process with idempotency: same source+event ID -> same workflow.
    with SetWorkflowID(f"{source}-{event_id}"):
        result = unified_event_processor(
            event_id,
            source,
            event_type,
            payload,
            {"headers": dict(request.headers)}
        )
    return {"status": "success", "result": result}
# Kafka consumer
@DBOS.kafka_consumer(
topic="events",
config={"bootstrap.servers": "localhost:9092", "group.id": "event-processor"}
)
def kafka_event_handler(message: KafkaMessage) -> None:
"""Handle Kafka events."""
# Parse message
data = json.loads(message.value.decode())
# Create unique ID
event_id = f"{message.topic}-{message.partition}-{message.offset}"
# Process with idempotency
with SetWorkflowID(event_id):
unified_event_processor(
event_id,
"kafka",
data.get("event_type", "unknown"),
data,
{
"topic": message.topic,
"partition": message.partition,
"offset": message.offset
}
)
Best Practices
Choose Good Event IDs
Choose Good Event IDs
- Use provider-supplied IDs when available (e.g., Stripe event ID, SQS message ID)
- For Kafka, use topic-partition-offset
- For composite events, hash the event components deterministically
- Never use timestamps alone (not unique)
Handle Duplicate Deliveries
Handle Duplicate Deliveries
- Always use SetWorkflowID with event ID
- Design workflows to be idempotent
- Log when duplicate events are detected
- Monitor deduplication metrics
Error Handling
Error Handling
- Distinguish between retriable and non-retriable errors
- Use dead letter queues for failed events
- Store failed events for manual review
- Alert on repeated failures
Monitoring
Monitoring
- Track event processing latency
- Monitor duplicate event rate
- Alert on processing failures
- Measure end-to-end event delivery time
Next Steps
Workflow Tutorial
Learn more about building durable workflows
Error Handling
Handle failures in event processing
Notifications
Coordinate workflows with durable notifications
Workflow Management
Monitor and manage event processing workflows