DBOS Kafka Integration
DBOS Transact provides a powerful Kafka consumer integration that guarantees exactly-once message processing through durable workflow execution.

Quick Start

Consume Kafka messages with automatic exactly-once semantics:
Step 1: Install Dependencies

pip install dbos confluent-kafka
Step 2: Create a Kafka Consumer

from dbos import DBOS, KafkaMessage

@DBOS.kafka_consumer(
    {"bootstrap.servers": "localhost:9092"},
    ["orders-topic"]
)
@DBOS.workflow()
def process_order(msg: KafkaMessage) -> None:
    """Durably process one message from ``orders-topic``.

    DBOS derives a deterministic workflow ID from the message's
    topic/partition/offset/group, so each message runs exactly once.
    """
    # NOTE(review): assumes msg.value is non-None bytes — KafkaMessage
    # declares value as bytes | None, so tombstone records would raise here.
    order_id = msg.value.decode("utf-8")
    save_order(order_id)
    print(f"Processed order: {order_id}")

@DBOS.transaction()
def save_order(order_id: str) -> None:
    """Insert the order ID into the ``orders`` table in one DB transaction."""
    # `sa` is SQLAlchemy — the complete example imports it as `sqlalchemy as sa`.
    DBOS.sql_session.execute(
        sa.text("INSERT INTO orders (id) VALUES (:id)"),
        {"id": order_id}
    )
Step 3: Launch DBOS

if __name__ == "__main__":
    # Starts DBOS, which also launches the registered Kafka consumer threads.
    DBOS.launch()

How It Works

The @DBOS.kafka_consumer() decorator creates a durable Kafka consumer that:
  1. Polls messages from specified topics in a background thread
  2. Generates unique workflow IDs for each message based on topic, partition, offset, and consumer group
  3. Enqueues workflows for message processing
  4. Guarantees exactly-once execution - even if your application crashes mid-processing

Unique Workflow IDs

DBOS generates deterministic workflow IDs for each message:
# Format: kafka-unique-id-{topic}-{partition}-{group_id}-{offset}
# e.g. topic "orders", partition 0, group "dbos-kafka-group", offset 12345:
workflow_id = f"kafka-unique-id-orders-0-dbos-kafka-group-12345"
This ensures that:
  • Each message is processed exactly once
  • Reprocessing the same message returns the cached result
  • No duplicate processing occurs during recovery

Decorator Parameters

@DBOS.kafka_consumer(config, topics, in_order=False)

config
dict[str, Any]
required
Confluent Kafka consumer configuration. Must include bootstrap.servers. See the Confluent Kafka Python documentation for all options. Common configuration options:
  • bootstrap.servers: Kafka broker addresses
  • group.id: Consumer group ID (auto-generated if not provided)
  • auto.offset.reset: Where to start consuming ("earliest" or "latest", defaults to "earliest")
topics
list[str]
required
List of Kafka topics to subscribe to. Can include regular expression patterns (e.g., ["^orders-.*"]) unless using in_order=True.
in_order
bool
default: False
If True, processes messages from each topic sequentially in order. Cannot be used with regex topic selectors.

Configuration Examples

@DBOS.kafka_consumer(
    {"bootstrap.servers": "localhost:9092"},
    ["my-topic"]
)
@DBOS.workflow()
def process_message(msg: KafkaMessage) -> None:
    """Minimal consumer: print the raw bytes of each message on ``my-topic``."""
    print(f"Value: {msg.value}")

KafkaMessage Object

The KafkaMessage object passed to your workflow contains:
class KafkaMessage:
    """Typed view of one consumed Kafka record, passed to the consumer workflow."""
    headers: list[tuple[str, bytes]] | None  # Message headers
    key: bytes | None                         # Message key
    value: bytes | None                       # Message value
    topic: str | None                         # Topic name
    partition: int                            # Partition number
    offset: int                               # Message offset
    timestamp: tuple[int, int]                # Timestamp type and value
    latency: float | None                     # Producer latency
    leader_epoch: int | None                  # Leader epoch

Accessing Message Data

@DBOS.kafka_consumer(
    {"bootstrap.servers": "localhost:9092"},
    ["user-events"]
)
@DBOS.workflow()
def process_user_event(msg: KafkaMessage) -> None:
    """Decode a user event, log its Kafka metadata, and hand it to storage."""
    payload = msg.value.decode("utf-8")

    # The key carries the user ID when present; tolerate key-less records.
    owner = None
    if msg.key:
        owner = msg.key.decode("utf-8")

    # Log where the record came from.
    print(f"Topic: {msg.topic}")
    print(f"Partition: {msg.partition}")
    print(f"Offset: {msg.offset}")

    # Headers are an optional list of (name, bytes) pairs.
    for name, raw in (msg.headers or []):
        print(f"Header {name}: {raw.decode('utf-8')}")

    store_event(owner, payload)

Exactly-Once Processing

DBOS guarantees exactly-once message processing through workflow deduplication:
import json

@DBOS.kafka_consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "payment-processor"
    },
    ["payments"]
)
@DBOS.workflow()
def process_payment(msg: KafkaMessage) -> dict:
    """Process payment - guaranteed exactly once.

    The workflow ID is derived from topic/partition/offset/group, so a
    redelivered message returns the cached result instead of re-running.

    Returns:
        dict: ``{"transaction_id": <payment row id>}``.
    """
    payment_data = json.loads(msg.value.decode("utf-8"))

    # This transaction runs exactly once per message
    # Even if Kafka redelivers the message
    transaction_id = record_payment_txn(
        payment_data["user_id"],
        payment_data["amount"]
    )

    # This notification is sent exactly once
    send_confirmation_step(payment_data["user_id"], transaction_id)

    return {"transaction_id": transaction_id}

@DBOS.transaction()
def record_payment_txn(user_id: str, amount: float) -> str:
    """Insert a payment row and return its generated ID as a string."""
    insert_stmt = sa.text(
        "INSERT INTO payments (user_id, amount) "
        "VALUES (:user, :amt) RETURNING id"
    )
    row = DBOS.sql_session.execute(
        insert_stmt,
        {"user": user_id, "amt": amount}
    ).fetchone()
    return str(row[0])

@DBOS.step()
def send_confirmation_step(user_id: str, transaction_id: str) -> None:
    """Send a payment confirmation to the user (stubbed as a print)."""
    # External API call - automatically retried on failure
    # But called exactly once (not retried after success)
    print(f"Sending confirmation to {user_id}")
Even if your application crashes during message processing, DBOS will recover the workflow on restart and complete it exactly once. The workflow ID based on topic-partition-offset-group ensures deduplication.

In-Order Processing

For use cases requiring strict message ordering, use in_order=True:
@DBOS.kafka_consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "ordered-processor"
    },
    ["ordered-events"],
    in_order=True  # Process messages sequentially
)
@DBOS.workflow()
def process_ordered_event(msg: KafkaMessage) -> None:
    """Messages processed one at a time in order.

    With ``in_order=True`` each topic gets a sequential queue, trading
    throughput for strict offset ordering.
    """
    # NOTE(review): assumes every record carries an integer key —
    # msg.key is declared bytes | None, so key-less records would raise.
    event_id = int(msg.key.decode("utf-8"))

    # This will process event 1, then 2, then 3, etc.
    # Even if event 3 arrives before event 2 is done,
    # it waits for event 2 to complete
    process_event_in_sequence(event_id)

@DBOS.step()
def process_event_in_sequence(event_id: int) -> None:
    """Handle a single ordered event (stub)."""
    print(f"Processing event {event_id}")
    # Do work...
When using in_order=True:
  • You cannot use regex topic selectors (e.g., "^prefix-.*")
  • Each topic gets its own sequential queue
  • Messages are processed one at a time per topic, which may reduce throughput

Async Workflows

Kafka consumers work with both sync and async workflows:
import asyncio

@DBOS.kafka_consumer(
    {"bootstrap.servers": "localhost:9092"},
    ["async-events"]
)
@DBOS.workflow()
async def process_async_event(msg: KafkaMessage) -> None:
    """Async workflow for Kafka messages.

    Async steps and transactions inside the workflow are awaited
    like ordinary coroutines.
    """
    data = msg.value.decode("utf-8")

    # Async step
    result = await fetch_external_data_step(data)

    # Async transaction
    await save_result_txn(result)

@DBOS.step()
async def fetch_external_data_step(data: str) -> dict:
    """Simulate an async external API call and echo the payload back."""
    await asyncio.sleep(0.1)  # stand-in for a real HTTP request
    return dict(data=data, processed=True)

@DBOS.transaction()
async def save_result_txn(result: dict) -> None:
    """Persist a processed result row inside a DBOS transaction."""
    DBOS.sql_session.execute(
        sa.text("INSERT INTO results (data) VALUES (:data)"),
        {"data": str(result)}
    )

Error Handling and Retries

DBOS automatically handles Kafka errors and retries:
# The consumer loop handles:
# 1. Connection errors - reconnects automatically
# 2. Fatal errors - creates new consumer instance
# 3. Message processing errors - workflow retry logic applies

@DBOS.kafka_consumer(
    {"bootstrap.servers": "localhost:9092"},
    ["events"]
)
@DBOS.workflow()
def resilient_processor(msg: KafkaMessage) -> None:
    """Consume from ``events``, logging and re-raising permanent failures."""
    try:
        # This step retries automatically on transient failures
        result = risky_external_call_step(msg.value)

        # This transaction retries on database errors
        save_result_txn(result)
    except Exception as e:
        # Log permanent failures
        # NOTE(review): log_error_step is not defined in this snippet —
        # a real application must supply its own @DBOS.step() for it.
        log_error_step(str(e), msg.offset)
        raise  # Workflow will be marked as failed

@DBOS.step()
def risky_external_call_step(data: bytes) -> dict:
    """POST the payload to an external service and return its JSON reply.

    Raises an HTTP error on non-2xx responses, which triggers step retry.
    """
    # Automatically retried on failure
    # NOTE(review): `requests` must be imported at module level elsewhere.
    response = requests.post("https://api.example.com/process", data=data)
    response.raise_for_status()
    return response.json()

Consumer Groups

DBOS automatically generates consumer group IDs if not provided:
# Auto-generated group ID format:
# dbos-kafka-group-{method_name}-{topic1}-{topic2}-...

@DBOS.kafka_consumer(
    {"bootstrap.servers": "localhost:9092"},
    ["orders", "payments"]
)
@DBOS.workflow()
def my_processor(msg: KafkaMessage) -> None:
    """Consumer relying on DBOS's auto-generated consumer group ID."""
    # Auto-generated group.id:
    # "dbos-kafka-group-my_processor-orders-payments"
    pass

# Or specify explicitly:
@DBOS.kafka_consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "custom-group-name"  # Use this instead
    },
    ["orders"]
)
@DBOS.workflow()
def custom_processor(msg: KafkaMessage) -> None:
    """Consumer with an explicitly configured Kafka consumer group."""
    pass
Group ID is limited to 255 characters. Auto-generated IDs are truncated if they exceed this limit.

Complete Example

Here’s a full example of a Kafka-powered order processing system:
import json
import sqlalchemy as sa
from dbos import DBOS, DBOSConfig, KafkaMessage

# Configure DBOS
# Configure DBOS
config = DBOSConfig(
    name="order-processor",
    application_database_url="postgresql://localhost/orders"
)
# Instantiating DBOS registers the app; consumers start on DBOS.launch().
DBOS(config=config)

# Kafka consumer for new orders
@DBOS.kafka_consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "order-processor-group",
        "auto.offset.reset": "earliest"
    },
    ["orders"]
)
@DBOS.workflow()
def process_order(msg: KafkaMessage) -> dict:
    """Process order with exactly-once guarantee.

    Returns a dict carrying the order ID and its final status:
    "confirmed", "payment_failed", or "out_of_stock".
    """
    order = json.loads(msg.value.decode("utf-8"))

    # Persist the incoming order first so it is never lost.
    order_id = save_order_txn(order)

    # Guard clause: bail out early when stock is unavailable.
    if not check_inventory_step(order["items"]):
        cancel_order_txn(order_id)
        return {"order_id": order_id, "status": "out_of_stock"}

    # Hold the stock, then attempt to charge the customer.
    reserve_inventory_txn(order_id, order["items"])
    payment_result = process_payment_step(order_id, order["total"])

    if not payment_result["success"]:
        # Payment failed: give the reserved stock back.
        release_inventory_txn(order_id)
        return {"order_id": order_id, "status": "payment_failed"}

    # Happy path: confirm and notify.
    confirm_order_txn(order_id)
    send_confirmation_step(order["user_id"], order_id)
    return {"order_id": order_id, "status": "confirmed"}

@DBOS.transaction()
def save_order_txn(order: dict) -> str:
    """Insert a pending order row and return its generated ID as a string."""
    # Assumes keys "user_id", "items", "total" exist in the order payload.
    result = DBOS.sql_session.execute(
        sa.text(
            "INSERT INTO orders (user_id, items, total, status) "
            "VALUES (:user, :items, :total, 'pending') RETURNING id"
        ),
        {"user": order["user_id"], "items": json.dumps(order["items"]), "total": order["total"]}
    ).fetchone()
    return str(result[0])

@DBOS.step()
def check_inventory_step(items: list) -> bool:
    """Return True when every requested item is in stock (stubbed)."""
    # Check external inventory service
    return True  # Simplified

@DBOS.transaction()
def reserve_inventory_txn(order_id: str, items: list) -> None:
    """Increment the reserved count for each line item in one transaction.

    Args:
        order_id: ID of the order holding the reservation (identifies the
            reserving workflow; not referenced by the statement itself).
        items: Dicts with ``product_id`` and ``quantity`` keys.
    """
    # Build the TextClause once instead of re-parsing it on every iteration.
    reserve_stmt = sa.text(
        "UPDATE inventory SET reserved = reserved + :qty "
        "WHERE product_id = :id"
    )
    for item in items:
        DBOS.sql_session.execute(
            reserve_stmt,
            {"qty": item["quantity"], "id": item["product_id"]}
        )

@DBOS.step()
def process_payment_step(order_id: str, amount: float) -> dict:
    """Charge the customer via the payment provider (stubbed success)."""
    # Call payment provider API
    return {"success": True, "transaction_id": "txn_123"}

@DBOS.transaction()
def confirm_order_txn(order_id: str) -> None:
    """Mark the order as confirmed."""
    DBOS.sql_session.execute(
        sa.text("UPDATE orders SET status = 'confirmed' WHERE id = :id"),
        {"id": order_id}
    )

@DBOS.step()
def send_confirmation_step(user_id: str, order_id: str) -> None:
    """Notify the user their order is confirmed (stubbed as a print)."""
    print(f"Sending confirmation to {user_id} for order {order_id}")

@DBOS.transaction()
def release_inventory_txn(order_id: str) -> None:
    """Return reserved stock after a failed payment (not yet implemented)."""
    # Release reserved inventory
    pass

@DBOS.transaction()
def cancel_order_txn(order_id: str) -> None:
    """Mark the order as cancelled."""
    cancel_stmt = sa.text("UPDATE orders SET status = 'cancelled' WHERE id = :id")
    DBOS.sql_session.execute(cancel_stmt, {"id": order_id})

if __name__ == "__main__":
    # Start DBOS; this also spins up the Kafka consumer threads.
    DBOS.launch()

Testing

Test Kafka consumers by sending messages to test topics:
from confluent_kafka import Producer

def test_kafka_consumer():
    """Publish three records to ``test-topic`` and wait for consumption."""
    import time

    producer = Producer({"bootstrap.servers": "localhost:9092"})

    # Send test messages
    for idx in range(3):
        producer.produce(
            "test-topic",
            key=f"key-{idx}",
            value=f"value-{idx}"
        )

    # Block until the broker has acknowledged every queued message.
    producer.flush()

    # Give the DBOS consumer time to pick the messages up.
    time.sleep(5)

    # Verify results in database
    # ...

Next Steps

  • Learn about Workflows for durable execution
  • Explore Queues for internal message processing
  • Set up Monitoring for your Kafka consumers

Build docs developers (and LLMs) love