The Confluent Kafka consumer configuration must include `bootstrap.servers`. See the Confluent Kafka Python documentation for all options. Common configuration options:

- `bootstrap.servers`: Kafka broker addresses
- `group.id`: Consumer group ID (auto-generated if not provided)
- `auto.offset.reset`: Where to start consuming (`"earliest"` or `"latest"`; defaults to `"earliest"`)
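As a sketch, a minimal configuration dictionary might look like this (the broker address and group name are placeholders):

```python
# Minimal consumer configuration; all values are placeholders.
config = {
    "bootstrap.servers": "localhost:9092",  # required
    "group.id": "my-consumer-group",        # auto-generated if omitted
    "auto.offset.reset": "earliest",        # default; "latest" skips messages sent before startup
}
```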
DBOS guarantees exactly-once message processing through workflow deduplication:
```python
import json

import sqlalchemy as sa
from dbos import DBOS, KafkaMessage


@DBOS.kafka_consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "payment-processor",
    },
    ["payments"],
)
@DBOS.workflow()
def process_payment(msg: KafkaMessage) -> dict:
    """Process payment - guaranteed exactly once"""
    payment_data = json.loads(msg.value.decode("utf-8"))
    # This transaction runs exactly once per message,
    # even if Kafka redelivers the message
    transaction_id = record_payment_txn(
        payment_data["user_id"], payment_data["amount"]
    )
    # This notification is sent exactly once
    send_confirmation_step(payment_data["user_id"], transaction_id)
    return {"transaction_id": transaction_id}


@DBOS.transaction()
def record_payment_txn(user_id: str, amount: float) -> str:
    result = DBOS.sql_session.execute(
        sa.text(
            "INSERT INTO payments (user_id, amount) "
            "VALUES (:user, :amt) RETURNING id"
        ),
        {"user": user_id, "amt": amount},
    ).fetchone()
    return str(result[0])


@DBOS.step()
def send_confirmation_step(user_id: str, transaction_id: str) -> None:
    # External API call - automatically retried on failure,
    # but called exactly once (not retried after success)
    print(f"Sending confirmation to {user_id}")
```
Even if your application crashes during message processing, DBOS recovers the workflow on restart and completes it exactly once. The workflow ID, derived from the topic, partition, offset, and consumer group, ensures deduplication.
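To illustrate the idea (this is a sketch of the deduplication principle, not DBOS's exact internal ID format), a deterministic key built from those coordinates maps every redelivery of a message to the same workflow:

```python
def kafka_workflow_id(topic: str, partition: int, offset: int, group_id: str) -> str:
    """Build a deterministic ID so redeliveries map to the same workflow."""
    return f"kafka-{group_id}-{topic}-{partition}-{offset}"

# A redelivered message produces the same ID, so the workflow
# body executes at most once for that (topic, partition, offset).
first = kafka_workflow_id("payments", 0, 42, "payment-processor")
redelivery = kafka_workflow_id("payments", 0, 42, "payment-processor")
assert first == redelivery
```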
For use cases requiring strict message ordering, use in_order=True:
```python
from dbos import DBOS, KafkaMessage


@DBOS.kafka_consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "ordered-processor",
    },
    ["ordered-events"],
    in_order=True,  # Process messages sequentially
)
@DBOS.workflow()
def process_ordered_event(msg: KafkaMessage) -> None:
    """Messages are processed one at a time, in order"""
    event_id = int(msg.key.decode("utf-8"))
    # This processes event 1, then 2, then 3, and so on.
    # Even if event 3 arrives before event 2 is done,
    # it waits for event 2 to complete.
    process_event_in_sequence(event_id)


@DBOS.step()
def process_event_in_sequence(event_id: int) -> None:
    print(f"Processing event {event_id}")
    # Do work...
```
When using `in_order=True`:

- You cannot use regex topic selectors (e.g., `"^prefix-.*"`)
- Each topic gets its own sequential queue
- Messages are processed one at a time per topic, which may reduce throughput
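Conceptually, this behaves like an independent FIFO queue per topic. A toy model of that semantics (not DBOS internals):

```python
from collections import defaultdict, deque

# Toy model: one FIFO per topic; each topic drains strictly in arrival order,
# and topics do not block one another.
queues: "defaultdict[str, deque]" = defaultdict(deque)

def enqueue(topic: str, msg: str) -> None:
    queues[topic].append(msg)

def drain(topic: str) -> list:
    processed = []
    while queues[topic]:
        processed.append(queues[topic].popleft())  # one at a time, in order
    return processed

enqueue("ordered-events", "event-1")
enqueue("ordered-events", "event-2")
enqueue("other-topic", "event-A")  # independent queue; doesn't wait on "ordered-events"
```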
Test Kafka consumers by sending messages to test topics:
```python
import time

from confluent_kafka import Producer


def test_kafka_consumer():
    # Send test messages
    producer = Producer({"bootstrap.servers": "localhost:9092"})
    for i in range(3):
        producer.produce(
            "test-topic",
            key=f"key-{i}",
            value=f"value-{i}",
        )
    producer.flush()
    # Wait for processing
    time.sleep(5)
    # Verify results in database
    # ...
```
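A fixed sleep makes tests slow and flaky. One alternative is to poll for the expected result with a timeout; `wait_until` below is a hypothetical helper (not part of DBOS or confluent-kafka), and the database check it wraps is whatever verification your test already performs:

```python
import time


def wait_until(predicate, timeout: float = 10.0, interval: float = 0.25) -> bool:
    """Poll until predicate() is truthy or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False


# Usage in a test, instead of time.sleep(5):
#   assert wait_until(lambda: count_processed_rows() == 3)
```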