Skip to main content

Introduction

DBOS makes it easy to build exactly-once event processors by combining idempotent workflows with SetWorkflowID. This ensures events are processed exactly once, even in the face of retries, crashes, or duplicate deliveries.

Exactly-Once Processing Pattern

The key to exactly-once processing is using a deterministic workflow ID based on the event:
from dbos import DBOS, SetWorkflowID
import sqlalchemy as sa

@DBOS.transaction()
def record_event(event_id: str, event_type: str, data: dict) -> None:
    """Persist a single event row inside a DBOS transaction."""
    insert_stmt = sa.text(
        "INSERT INTO events (event_id, event_type, data, processed_at) "
        "VALUES (:event_id, :event_type, :data, NOW())"
    )
    params = {"event_id": event_id, "event_type": event_type, "data": str(data)}
    DBOS.sql_session.execute(insert_stmt, params)

@DBOS.step()
def process_event_logic(event_type: str, data: dict) -> dict:
    """Dispatch an event to its type-specific business-logic handler."""
    handlers = {
        "order_placed": process_order,
        "payment_received": process_payment,
    }
    handler = handlers.get(event_type)
    if handler is None:
        return {"status": "skipped", "reason": "unknown_event_type"}
    return handler(data)

@DBOS.workflow()
def process_event(event_id: str, event_type: str, data: dict) -> dict:
    """Durable workflow: record an event, then run its business logic."""
    DBOS.logger.info(f"Processing event {event_id} of type {event_type}")

    # Persist first so every processed event leaves an audit row.
    record_event(event_id, event_type, data)

    outcome = process_event_logic(event_type, data)

    DBOS.logger.info(f"Event {event_id} processed successfully")
    return outcome

def handle_incoming_event(event_id: str, event_type: str, data: dict) -> dict:
    """Run the event workflow under a deterministic, event-derived ID."""
    workflow_id = f"event-{event_id}"
    # Reusing the same workflow ID makes duplicate deliveries return the
    # original result instead of re-running the workflow.
    with SetWorkflowID(workflow_id):
        return process_event(event_id, event_type, data)

# Usage: the first delivery actually executes the workflow.
result1 = handle_incoming_event("evt-123", "order_placed", {"order_id": "ord-456"})
print(result1)  # Processes the event

# Duplicate delivery - returns cached result without reprocessing
result2 = handle_incoming_event("evt-123", "order_placed", {"order_id": "ord-456"})
print(result2)  # Same result, event processed exactly once
By using the event ID as the workflow ID, DBOS automatically deduplicates repeated deliveries of the same event.

Webhook Processing

Process webhook events exactly once:
from fastapi import FastAPI, Request, HTTPException
from dbos import DBOS, SetWorkflowID
import hashlib
import hmac

app = FastAPI()  # module-level ASGI app the webhook routes below register on

@DBOS.step()
def verify_webhook_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Check an HMAC-SHA256 webhook signature against the shared secret."""
    digest = hmac.new(secret.encode(), payload, hashlib.sha256)
    # compare_digest avoids leaking timing information about the match.
    return hmac.compare_digest(signature, digest.hexdigest())

@DBOS.transaction()
def store_webhook_event(webhook_id: str, source: str, event_type: str, payload: dict) -> None:
    """Persist a webhook delivery for the audit trail."""
    stmt = sa.text(
        "INSERT INTO webhook_events (webhook_id, source, event_type, payload, received_at) "
        "VALUES (:id, :source, :type, :payload, NOW())"
    )
    values = {"id": webhook_id, "source": source, "type": event_type, "payload": str(payload)}
    DBOS.sql_session.execute(stmt, values)

@DBOS.step()
def process_webhook_payload(source: str, event_type: str, payload: dict) -> dict:
    """Route a webhook payload to its (source, event_type) handler."""
    key = (source, event_type)
    if key == ("stripe", "payment.succeeded"):
        return handle_payment_success(payload)
    if key == ("github", "push"):
        return handle_code_push(payload)
    return {"status": "ignored"}

@DBOS.workflow()
def process_webhook(webhook_id: str, source: str, event_type: str, payload: dict) -> dict:
    """Durably process one webhook: audit-log it, then run its handler."""
    # Audit record first so every delivery is traceable.
    store_webhook_event(webhook_id, source, event_type, payload)
    outcome = process_webhook_payload(source, event_type, payload)
    return {"webhook_id": webhook_id, "result": outcome}

@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    """Handle Stripe webhook.

    Fix: ``Request.body()`` is a coroutine in FastAPI/Starlette, so the
    handler must be ``async`` and ``await`` it — the original sync version
    passed an un-awaited coroutine object to the parser.
    """
    payload = await request.body()
    signature = request.headers.get("Stripe-Signature")
    # NOTE(review): signature is read but never verified here — presumably
    # verify_webhook_signature should be called with the endpoint secret
    # before processing; confirm against the integration's requirements.

    # Parse webhook
    event = parse_stripe_webhook(payload)

    # Stripe event IDs are globally unique, so they make ideal workflow IDs.
    webhook_id = event["id"]

    # Process exactly once: duplicate deliveries reuse the same workflow.
    with SetWorkflowID(f"stripe-{webhook_id}"):
        result = process_webhook(
            webhook_id,
            "stripe",
            event["type"],
            event["data"]
        )

    return {"status": "success", "result": result}

@app.post("/webhooks/github")
async def github_webhook(request: Request):
    """Handle GitHub webhook.

    Fix: ``Request.json()`` is a coroutine in FastAPI/Starlette, so the
    handler must be ``async`` and ``await`` it — the original sync version
    passed an un-awaited coroutine as the payload.
    """
    payload = await request.json()
    delivery_id = request.headers.get("X-GitHub-Delivery")
    event_type = request.headers.get("X-GitHub-Event")

    # GitHub provides delivery ID for deduplication
    with SetWorkflowID(f"github-{delivery_id}"):
        result = process_webhook(
            delivery_id,
            "github",
            event_type,
            payload
        )

    return {"status": "success", "result": result}
Always use the webhook provider’s unique event/delivery ID for the workflow ID to ensure idempotency.

Kafka Consumer

Process Kafka messages with exactly-once semantics:
from dbos import DBOS, SetWorkflowID, KafkaMessage
import json

@DBOS.transaction()
def update_inventory(product_id: str, quantity_change: int) -> dict:
    """Atomically adjust a product's inventory and return the new quantity.

    Raises:
        ValueError: if no inventory row exists for ``product_id``. The
        original code crashed here with ``TypeError: 'NoneType' object is
        not subscriptable`` because ``fetchone()`` returns None when the
        UPDATE matches no rows.
    """
    result = DBOS.sql_session.execute(
        sa.text(
            "UPDATE inventory SET quantity = quantity + :change "
            "WHERE product_id = :pid RETURNING quantity"
        ),
        {"change": quantity_change, "pid": product_id}
    )
    row = result.fetchone()
    if row is None:
        raise ValueError(f"Unknown product_id: {product_id}")
    return {"product_id": product_id, "new_quantity": row[0]}

@DBOS.step()
def send_inventory_alert(product_id: str, quantity: int) -> None:
    """Notify operations when stock drops below the alert threshold (10)."""
    if quantity >= 10:
        return
    alert_service.send(
        f"Low inventory alert: {product_id} has only {quantity} units"
    )

@DBOS.workflow()
def process_inventory_update(message_id: str, product_id: str, quantity_change: int) -> dict:
    """Durably apply one inventory delta, alerting if stock runs low."""
    outcome = update_inventory(product_id, quantity_change)
    send_inventory_alert(product_id, outcome["new_quantity"])
    return outcome

@DBOS.kafka_consumer(
    topic="inventory-updates",
    config={"bootstrap.servers": "localhost:9092", "group.id": "inventory-processor"}
)
def kafka_inventory_handler(message: KafkaMessage) -> None:
    """Consume inventory updates from Kafka with exactly-once semantics."""
    payload = json.loads(message.value.decode())

    # topic-partition-offset uniquely identifies a Kafka message, so a
    # redelivered message maps to the same workflow and is deduplicated.
    dedup_id = f"{message.topic}-{message.partition}-{message.offset}"

    with SetWorkflowID(dedup_id):
        process_inventory_update(
            dedup_id,
            payload["product_id"],
            payload["quantity_change"]
        )
For Kafka, use topic-partition-offset as the unique message identifier to ensure exactly-once processing.

Message Queue Pattern

Process messages from a queue system:
import boto3
import hashlib
from dbos import DBOS, SetWorkflowID

@DBOS.transaction()
def process_order(order_id: str, customer_id: str, items: list) -> dict:
    """Insert an order header and its line items in one transaction."""
    order_insert = sa.text(
        "INSERT INTO orders (order_id, customer_id, status) "
        "VALUES (:oid, :cid, 'processing')"
    )
    DBOS.sql_session.execute(order_insert, {"oid": order_id, "cid": customer_id})

    # Statement is loop-invariant, so build it once.
    item_insert = sa.text(
        "INSERT INTO order_items (order_id, product_id, quantity) "
        "VALUES (:oid, :pid, :qty)"
    )
    for line in items:
        DBOS.sql_session.execute(
            item_insert,
            {"oid": order_id, "pid": line["product_id"], "qty": line["quantity"]}
        )

    return {"order_id": order_id, "status": "processed"}

@DBOS.step()
def notify_customer(customer_id: str, order_id: str) -> None:
    """Send the customer an order-received confirmation."""
    confirmation = f"Your order {order_id} has been received"
    notification_service.send(customer_id, confirmation)

@DBOS.workflow()
def handle_order_message(message_id: str, order_data: dict) -> dict:
    """Durably process one queued order message: persist, then notify."""
    outcome = process_order(
        order_data["order_id"],
        order_data["customer_id"],
        order_data["items"]
    )
    notify_customer(order_data["customer_id"], order_data["order_id"])
    return outcome

def poll_sqs_queue():
    """Long-poll SQS for order messages and process each exactly once."""
    sqs = boto3.client('sqs')
    queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/orders'

    while True:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20
        )

        for message in response.get('Messages', []):
            # The SQS message ID doubles as the idempotency key.
            message_id = message['MessageId']

            try:
                order_data = json.loads(message['Body'])

                with SetWorkflowID(f"sqs-{message_id}"):
                    handle_order_message(message_id, order_data)

                # Acknowledge only after the workflow completed.
                sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=message['ReceiptHandle']
                )

            except Exception as e:
                print(f"Error processing message {message_id}: {e}")
                # SQS redelivers after the visibility timeout expires.

Event Sourcing Pattern

Implement event sourcing with DBOS:
from dbos import DBOS, SetWorkflowID
from datetime import datetime
import json

@DBOS.transaction()
def append_event(aggregate_id: str, event_type: str, event_data: dict, event_id: str) -> int:
    """Append one event to the event store; returns its sequence number."""
    stmt = sa.text(
        "INSERT INTO event_store (aggregate_id, event_id, event_type, event_data, timestamp) "
        "VALUES (:agg_id, :evt_id, :evt_type, :data, :ts) "
        "RETURNING sequence_number"
    )
    values = {
        "agg_id": aggregate_id,
        "evt_id": event_id,
        "evt_type": event_type,
        "data": json.dumps(event_data),
        "ts": datetime.now()
    }
    row = DBOS.sql_session.execute(stmt, values).fetchone()
    return row[0]

@DBOS.transaction()
def update_projection(aggregate_id: str, event_type: str, event_data: dict) -> None:
    """Apply one domain event to the account_view read model.

    Event types other than the three handled below are silently ignored,
    which appears intentional: a projection only reacts to the events it
    renders.
    """
    # account.created seeds the row with the opening balance.
    if event_type == "account.created":
        DBOS.sql_session.execute(
            sa.text(
                "INSERT INTO account_view (account_id, balance, status) "
                "VALUES (:id, :balance, 'active')"
            ),
            {"id": aggregate_id, "balance": event_data["initial_balance"]}
        )
    
    # account.credited increases the running balance.
    elif event_type == "account.credited":
        DBOS.sql_session.execute(
            sa.text(
                "UPDATE account_view SET balance = balance + :amount "
                "WHERE account_id = :id"
            ),
            {"id": aggregate_id, "amount": event_data["amount"]}
        )
    
    # account.debited decreases it; no overdraft check here —
    # presumably enforced upstream, TODO confirm.
    elif event_type == "account.debited":
        DBOS.sql_session.execute(
            sa.text(
                "UPDATE account_view SET balance = balance - :amount "
                "WHERE account_id = :id"
            ),
            {"id": aggregate_id, "amount": event_data["amount"]}
        )

@DBOS.workflow()
def process_domain_event(event_id: str, aggregate_id: str, event_type: str, event_data: dict) -> dict:
    """Durably append a domain event, then refresh its read model."""
    seq = append_event(aggregate_id, event_type, event_data, event_id)
    update_projection(aggregate_id, event_type, event_data)
    return {
        "event_id": event_id,
        "sequence_number": seq,
        "aggregate_id": aggregate_id
    }

def publish_domain_event(
    aggregate_id: str,
    event_type: str,
    event_data: dict,
    event_id: Optional[str] = None,
) -> dict:
    """Publish a domain event, processing it exactly once.

    Args:
        event_id: a stable, unique ID for this event. Callers SHOULD
            supply one: the timestamp-based fallback mints a new ID on
            every call, so a retried publish creates a new workflow and
            defeats deduplication (the page's own best-practices section
            warns against timestamp-only IDs). The fallback is kept only
            for backward compatibility with existing callers.
    """
    if event_id is None:
        # Fallback only — NOT retry-safe; see docstring.
        event_id = f"{aggregate_id}-{event_type}-{datetime.now().isoformat()}"

    # Process event exactly once
    with SetWorkflowID(f"event-{event_id}"):
        result = process_domain_event(event_id, aggregate_id, event_type, event_data)

    return result

# Usage: each call appends one event and updates the account_view projection.
publish_domain_event(
    "account-123",
    "account.created",
    {"initial_balance": 1000.0}
)

# Credit 500 on top of the opening balance.
publish_domain_event(
    "account-123",
    "account.credited",
    {"amount": 500.0}
)

# Debit 200 from the running balance.
publish_domain_event(
    "account-123",
    "account.debited",
    {"amount": 200.0}
)

Idempotency Keys

Use client-provided idempotency keys:
from fastapi import FastAPI, Header, HTTPException
from dbos import DBOS, SetWorkflowID
from typing import Optional

app = FastAPI()  # module-level ASGI app the payments endpoint registers on

@DBOS.transaction()
def create_payment(payment_id: str, customer_id: str, amount: float) -> dict:
    """Insert a pending payment row and return its database ID."""
    stmt = sa.text(
        "INSERT INTO payments (payment_id, customer_id, amount, status, created_at) "
        "VALUES (:pid, :cid, :amount, 'pending', NOW()) "
        "RETURNING id"
    )
    row = DBOS.sql_session.execute(
        stmt, {"pid": payment_id, "cid": customer_id, "amount": amount}
    ).fetchone()
    return {"db_id": row[0]}

@DBOS.step()
def charge_payment_gateway(payment_id: str, amount: float) -> str:
    """Charge the given amount via the payment gateway.

    Returns the gateway's transaction ID.

    NOTE(review): `payment_id` is accepted but never passed to the
    gateway — presumably it was meant to be the gateway-side idempotency
    key; confirm. DBOS retries failed steps, so this also assumes
    `payment_gateway.charge` is safe to retry — verify with the gateway
    integration.
    """
    transaction_id = payment_gateway.charge(amount)
    return transaction_id

@DBOS.transaction()
def mark_payment_completed(payment_id: str, transaction_id: str) -> None:
    """Record the gateway transaction and flip the payment to completed."""
    stmt = sa.text(
        "UPDATE payments SET status = 'completed', transaction_id = :txn_id "
        "WHERE payment_id = :pid"
    )
    DBOS.sql_session.execute(stmt, {"pid": payment_id, "txn_id": transaction_id})

@DBOS.workflow()
def process_payment(payment_id: str, customer_id: str, amount: float) -> dict:
    """Durable payment workflow: record, charge the gateway, finalize."""
    create_payment(payment_id, customer_id, amount)
    txn_id = charge_payment_gateway(payment_id, amount)
    mark_payment_completed(payment_id, txn_id)
    return {
        "payment_id": payment_id,
        "transaction_id": txn_id,
        "status": "completed"
    }

@app.post("/api/payments")
def create_payment_endpoint(
    customer_id: str,
    amount: float,
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key")
):
    """Create a payment, deduplicated by the client's Idempotency-Key."""
    if not idempotency_key:
        raise HTTPException(status_code=400, detail="Idempotency-Key header required")

    # The client's key pins both the payment ID and the workflow ID, so a
    # retried request cannot create a second payment.
    with SetWorkflowID(f"payment-{idempotency_key}"):
        outcome = process_payment(f"pay-{idempotency_key}", customer_id, amount)

    return outcome
Idempotency keys allow clients to safely retry requests without creating duplicate resources.

Composite Event IDs

Create unique IDs from multiple event fields:
import hashlib
from dbos import DBOS, SetWorkflowID

def create_event_id(source: str, timestamp: str, data: dict) -> str:
    """Create a deterministic event ID from event components.

    Fix: the original embedded ``sorted(data.items())`` in the string,
    which raises TypeError when keys have mixed, un-orderable types and
    is not canonical for nested dicts (their repr follows insertion
    order). JSON serialization with sorted keys canonicalizes the dict
    recursively, so equal payloads always hash to the same ID.
    """
    import json  # local import keeps this documentation snippet self-contained

    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
    components = f"{source}:{timestamp}:{canonical}"

    # Hash to a stable, fixed-length ID.
    return hashlib.sha256(components.encode()).hexdigest()

@DBOS.workflow()
def process_composite_event(event_id: str, source: str, data: dict) -> dict:
    """Process an event identified by a composite hash (placeholder logic)."""
    outcome = {"event_id": event_id, "processed": True}
    return outcome

def handle_event(source: str, timestamp: str, data: dict) -> dict:
    """Deduplicate and process an event via its composite, content-derived ID."""
    composite_id = create_event_id(source, timestamp, data)
    # Identical (source, timestamp, data) triples hash to the same ID and
    # therefore the same workflow — duplicates are processed once.
    with SetWorkflowID(f"event-{composite_id}"):
        return process_composite_event(composite_id, source, data)

Complete Example: Multi-Source Event Processor

from dbos import DBOS, SetWorkflowID, KafkaMessage
from fastapi import FastAPI, Request, Header
import sqlalchemy as sa
import json
from datetime import datetime
from typing import Optional

app = FastAPI()  # module-level ASGI app for the multi-source processor's endpoints

# Database schema
@DBOS.transaction()
def store_event(
    event_id: str,
    source: str,
    event_type: str,
    payload: dict,
    metadata: dict
) -> int:
    """Persist one event in the unified event log; returns the row ID."""
    stmt = sa.text(
        "INSERT INTO event_log "
        "(event_id, source, event_type, payload, metadata, received_at) "
        "VALUES (:id, :source, :type, :payload, :metadata, :received) "
        "RETURNING id"
    )
    values = {
        "id": event_id,
        "source": source,
        "type": event_type,
        "payload": json.dumps(payload),
        "metadata": json.dumps(metadata),
        "received": datetime.now()
    }
    return DBOS.sql_session.execute(stmt, values).fetchone()[0]

@DBOS.step()
def route_event(source: str, event_type: str, payload: dict) -> dict:
    """Dispatch an event to its per-source handler."""
    handlers = {
        "stripe": handle_stripe_event,
        "github": handle_github_event,
        "kafka": handle_kafka_event,
    }
    handler = handlers.get(source)
    if handler is None:
        return {"status": "unhandled", "source": source}
    return handler(event_type, payload)

@DBOS.step()
def handle_stripe_event(event_type: str, payload: dict) -> dict:
    """Handle Stripe-specific events; only successful payment intents act."""
    if event_type != "payment_intent.succeeded":
        return {"status": "ignored", "event_type": event_type}
    return fulfill_order(payload["id"], payload["amount"])

@DBOS.step()
def handle_github_event(event_type: str, payload: dict) -> dict:
    """Handle GitHub-specific events; only push events trigger CI/CD."""
    if event_type != "push":
        return {"status": "ignored", "event_type": event_type}
    return trigger_ci_cd(payload["repository"]["name"], payload["ref"])

@DBOS.step()
def handle_kafka_event(event_type: str, payload: dict) -> dict:
    """Handle Kafka-specific events; only inventory updates touch the cache."""
    if event_type != "inventory.updated":
        return {"status": "ignored", "event_type": event_type}
    return update_inventory_cache(payload["product_id"], payload["quantity"])

@DBOS.workflow()
def unified_event_processor(
    event_id: str,
    source: str,
    event_type: str,
    payload: dict,
    metadata: Optional[dict] = None
) -> dict:
    """Durably store, route, and process one event from any source."""
    metadata = metadata if metadata is not None else {}

    DBOS.logger.info(f"Processing event {event_id} from {source}")

    # Persist first so every delivery leaves an audit row, then dispatch.
    db_id = store_event(event_id, source, event_type, payload, metadata)
    routed = route_event(source, event_type, payload)

    DBOS.logger.info(f"Event {event_id} processed: {routed['status']}")

    return {
        "event_id": event_id,
        "db_id": db_id,
        "result": routed
    }

# HTTP webhook endpoint
@app.post("/webhooks/{source}")
async def webhook_endpoint(
    source: str,
    request: Request,
    x_event_id: Optional[str] = Header(None),
    x_event_type: Optional[str] = Header(None)
):
    """Generic webhook endpoint for any registered source.

    Fix: ``Request.json()`` is a coroutine in FastAPI/Starlette, so the
    handler must be ``async`` and ``await`` it — the original sync version
    passed an un-awaited coroutine around as the payload.
    """
    payload = await request.json()

    # Prefer the provider's own unique event ID; fall back to the
    # generic X-Event-* headers for unknown sources.
    if source == "stripe":
        event_id = payload.get("id", x_event_id)
        event_type = payload.get("type", x_event_type)
    elif source == "github":
        event_id = request.headers.get("X-GitHub-Delivery")
        event_type = request.headers.get("X-GitHub-Event")
    else:
        event_id = x_event_id
        event_type = x_event_type

    if not event_id:
        raise HTTPException(status_code=400, detail="Missing event ID")

    # Process with idempotency: duplicate deliveries reuse the workflow.
    with SetWorkflowID(f"{source}-{event_id}"):
        result = unified_event_processor(
            event_id,
            source,
            event_type,
            payload,
            {"headers": dict(request.headers)}
        )

    return {"status": "success", "result": result}

# Kafka consumer
@DBOS.kafka_consumer(
    topic="events",
    config={"bootstrap.servers": "localhost:9092", "group.id": "event-processor"}
)
def kafka_event_handler(message: KafkaMessage) -> None:
    """Consume events from Kafka and feed the unified processor."""
    body = json.loads(message.value.decode())

    # Kafka coordinates are unique per message, so a redelivery maps to
    # the same workflow and is deduplicated.
    dedup_id = f"{message.topic}-{message.partition}-{message.offset}"

    with SetWorkflowID(dedup_id):
        unified_event_processor(
            dedup_id,
            "kafka",
            body.get("event_type", "unknown"),
            body,
            {
                "topic": message.topic,
                "partition": message.partition,
                "offset": message.offset
            }
        )

Best Practices

  • Use provider-supplied IDs when available (e.g., Stripe event ID, SQS message ID)
  • For Kafka, use topic-partition-offset
  • For composite events, hash the event components deterministically
  • Never use timestamps alone (not unique)
  • Always use SetWorkflowID with event ID
  • Design workflows to be idempotent
  • Log when duplicate events are detected
  • Monitor deduplication metrics
  • Distinguish between retriable and non-retriable errors
  • Use dead letter queues for failed events
  • Store failed events for manual review
  • Alert on repeated failures
  • Track event processing latency
  • Monitor duplicate event rate
  • Alert on processing failures
  • Measure end-to-end event delivery time

Next Steps

Workflow Tutorial

Learn more about building durable workflows

Error Handling

Handle failures in event processing

Notifications

Coordinate workflows with durable notifications

Workflow Management

Monitor and manage event processing workflows

Build docs developers (and LLMs) love