Overview
The Kafka module provides patterns for using Apache Kafka, a distributed event streaming platform, with events modeled in Intent Architect’s designers. Kafka excels at building real-time streaming data pipelines and applications that process streams of records.
Apache Kafka is ideal for high-throughput, fault-tolerant, and scalable event streaming scenarios.
Installation
What is Apache Kafka?
Kafka is a distributed streaming platform that:
Publishes and subscribes to streams of records
Stores streams of records durably and fault-tolerantly
Processes streams of records as they occur
Use Cases
Event Streaming Real-time data pipelines
Log Aggregation Centralized logging from multiple sources
Metrics Collection High-volume telemetry and metrics
Stream Processing Real-time analytics and transformations
What’s Generated
This module generates:
Kafka Configuration - Connection and broker setup
Kafka Producers - Message publishing infrastructure
Kafka Consumers - Message consumption with background services
Event Dispatchers - Route messages to handlers
Message Bus - IEventBus implementation
Dependency Injection - Service registration
Core Concepts
Topics are categories or feed names to which records are published:
// Events published to topic "order-events"
/// <summary>Integration event raised when a new order has been created.</summary>
public class OrderCreatedEvent
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
}
Partitions
Topics are divided into partitions for parallel processing:
Topic: order-events
Partition 0: [msg1, msg2, msg3]
Partition 1: [msg4, msg5, msg6]
Partition 2: [msg7, msg8, msg9]
Consumer Groups
Multiple consumers work together to process messages:
// Consumer Group: "order-processing-service"
// - Consumer 1 reads Partition 0
// - Consumer 2 reads Partition 1
// - Consumer 3 reads Partition 2
Configuration
Basic Configuration
appsettings.json:
{
"Kafka" : {
"BootstrapServers" : "localhost:9092" ,
"ProducerConfig" : {
"Acks" : "all" ,
"Retries" : 3 ,
"CompressionType" : "snappy"
},
"ConsumerConfig" : {
"GroupId" : "my-service" ,
"AutoOffsetReset" : "earliest" ,
"EnableAutoCommit" : false
}
}
}
Production Configuration
appsettings.Production.json:
{
"Kafka" : {
"BootstrapServers" : "kafka1.prod.com:9092,kafka2.prod.com:9092,kafka3.prod.com:9092" ,
"SecurityProtocol" : "SaslSsl" ,
"SaslMechanism" : "Plain" ,
"SaslUsername" : "${KAFKA_USERNAME}" ,
"SaslPassword" : "${KAFKA_PASSWORD}" ,
"ProducerConfig" : {
"Acks" : "all" ,
"Retries" : 10 ,
"MaxInFlight" : 5 ,
"CompressionType" : "lz4" ,
"LingerMs" : 10 ,
"BatchSize" : 16384
},
"ConsumerConfig" : {
"GroupId" : "order-service-prod" ,
"AutoOffsetReset" : "earliest" ,
"EnableAutoCommit" : false ,
"MaxPollIntervalMs" : 300000 ,
"SessionTimeoutMs" : 10000
}
}
}
Modeling Events
In the Services Designer
Create Integration Events in the Eventing designer
Define properties for your events
Model Event Handlers that subscribe to Kafka topics
Configure topic mappings using stereotypes
Example Event
/// <summary>
/// Integration event raised when a new order has been created.
/// Carries the order header plus its line items.
/// </summary>
public class OrderCreatedEvent
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime CreatedAt { get; set; }
    public List<OrderItem> Items { get; set; }
}
/// <summary>A single line item within an order event.</summary>
public class OrderItem
{
    public string ProductId { get; set; }
    public int Quantity { get; set; }
    public decimal Price { get; set; }
}
Publishing Events
Using IEventBus
/// <summary>
/// Application service that persists a new order and publishes the
/// corresponding <see cref="OrderCreatedEvent"/> to Kafka via IEventBus.
/// </summary>
public class OrderService
{
    private readonly IEventBus _eventBus;
    private readonly IOrderRepository _repository;

    // FIX: the original example declared the readonly fields but never
    // initialized them — the dependencies must be injected.
    public OrderService(IEventBus eventBus, IOrderRepository repository)
    {
        _eventBus = eventBus;
        _repository = repository;
    }

    /// <summary>
    /// Creates the order and publishes an OrderCreatedEvent to Kafka.
    /// </summary>
    public async Task CreateOrderAsync(
        CreateOrderCommand command,
        CancellationToken cancellationToken)
    {
        // Create order
        var order = await _repository.CreateAsync(command, cancellationToken);

        // Publish to Kafka
        await _eventBus.PublishAsync(
            new OrderCreatedEvent
            {
                OrderId = order.Id,
                CustomerId = order.CustomerId,
                TotalAmount = order.TotalAmount,
                CreatedAt = DateTime.UtcNow,
                Items = order.Items.Select(i => new OrderItem
                {
                    ProductId = i.ProductId,
                    Quantity = i.Quantity,
                    Price = i.Price
                }).ToList()
            },
            cancellationToken);
    }
}
Direct Producer Usage
/// <summary>
/// Thin wrapper around the Kafka producer that publishes a keyed message
/// and logs the resulting topic/partition/offset.
/// </summary>
public class KafkaPublisher
{
    private readonly IKafkaProducer _producer;
    // FIX: _logger was used in PublishAsync but never declared in the
    // original example.
    private readonly ILogger<KafkaPublisher> _logger;

    public KafkaPublisher(IKafkaProducer producer, ILogger<KafkaPublisher> logger)
    {
        _producer = producer;
        _logger = logger;
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to <paramref name="topic"/> using
    /// <paramref name="key"/> as the partition key.
    /// </summary>
    public async Task PublishAsync<T>(
        string topic,
        string key,
        T message,
        CancellationToken cancellationToken) where T : class
    {
        var result = await _producer.ProduceAsync(
            topic,
            key,
            message,
            cancellationToken);

        _logger.LogInformation(
            "Published to {Topic} partition {Partition} offset {Offset}",
            result.Topic,
            result.Partition,
            result.Offset);
    }
}
Partition Keys
Control message partitioning:
/// <summary>
/// Publishes an order event keyed by customer so that every event for a
/// given customer lands on the same partition and stays ordered.
/// </summary>
public async Task PublishOrderEventAsync(
    OrderCreatedEvent orderEvent,
    CancellationToken cancellationToken)
{
    // CustomerId as the partition key => per-customer ordering.
    var partitionKey = orderEvent.CustomerId.ToString();

    await _producer.ProduceAsync(
        topic: "order-events",
        key: partitionKey,
        message: orderEvent,
        cancellationToken);
}
Consuming Events
Generated Consumer
/// <summary>
/// Generated consumer that logs each received OrderCreatedEvent and routes
/// it to the event dispatcher.
/// </summary>
public class OrderCreatedEventConsumer : IKafkaConsumer<OrderCreatedEvent>
{
    private readonly IKafkaEventDispatcher _dispatcher;
    private readonly ILogger<OrderCreatedEventConsumer> _logger;

    // FIX: the original example declared the fields but never initialized
    // them — they must be constructor-injected.
    public OrderCreatedEventConsumer(
        IKafkaEventDispatcher dispatcher,
        ILogger<OrderCreatedEventConsumer> logger)
    {
        _dispatcher = dispatcher;
        _logger = logger;
    }

    /// <summary>Logs message coordinates, then dispatches to the handler.</summary>
    public async Task HandleAsync(
        ConsumeResult<string, OrderCreatedEvent> result,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Received from {Topic} partition {Partition} offset {Offset}",
            result.Topic,
            result.Partition,
            result.Offset);

        await _dispatcher.DispatchAsync(result.Message.Value, cancellationToken);
    }
}
Event Handler
/// <summary>
/// Handles OrderCreatedEvent by reserving stock for each line item.
/// </summary>
[IntentManaged(Mode.Merge, Signature = Mode.Fully)]
public class OrderCreatedEventHandler
    : IIntegrationEventHandler<OrderCreatedEvent>
{
    private readonly IInventoryService _inventoryService;
    private readonly ILogger<OrderCreatedEventHandler> _logger;

    // FIX: the original example declared the fields but never initialized
    // them — they must be constructor-injected.
    public OrderCreatedEventHandler(
        IInventoryService inventoryService,
        ILogger<OrderCreatedEventHandler> logger)
    {
        _inventoryService = inventoryService;
        _logger = logger;
    }

    [IntentManaged(Mode.Fully, Body = Mode.Ignore)]
    public async Task HandleAsync(
        OrderCreatedEvent message,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Processing order {OrderId} for customer {CustomerId}",
            message.OrderId,
            message.CustomerId);

        // Reserve inventory for all items
        foreach (var item in message.Items)
        {
            await _inventoryService.ReserveStockAsync(
                item.ProductId,
                item.Quantity,
                cancellationToken);
        }

        _logger.LogInformation(
            "Order {OrderId} processed successfully",
            message.OrderId);
    }
}
Background Service
Generated Kafka consumer background service:
/// <summary>
/// Long-running background service that pulls messages from Kafka, hands
/// them to the typed consumer, and commits the offset after success.
/// </summary>
public class KafkaConsumerBackgroundService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<KafkaConsumerBackgroundService> _logger;

    // FIX: the original example declared the fields but never initialized them.
    public KafkaConsumerBackgroundService(
        IServiceProvider serviceProvider,
        ILogger<KafkaConsumerBackgroundService> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var consumer = scope.ServiceProvider
            .GetRequiredService<IKafkaConsumer<OrderCreatedEvent>>();

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var result = consumer.Consume(stoppingToken);
                if (result != null)
                {
                    await consumer.HandleAsync(result, stoppingToken);
                    consumer.Commit(result);
                }
            }
            // FIX: don't log graceful shutdown as an error — Consume throws
            // OperationCanceledException when the host stops.
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error consuming message");
                await Task.Delay(1000, stoppingToken); // Back off
            }
        }
    }
}
Offset Management
Manual Commit
Recommended for reliability:
/// <summary>
/// Consumer that commits the offset only after the message has been
/// processed successfully, guaranteeing at-least-once delivery.
/// </summary>
public class ManualCommitConsumer : IKafkaConsumer<OrderCreatedEvent>
{
    // FIX: the original example used _consumer and _logger without declaring
    // them — they must be injected.
    private readonly IConsumer<string, OrderCreatedEvent> _consumer;
    private readonly ILogger<ManualCommitConsumer> _logger;

    public ManualCommitConsumer(
        IConsumer<string, OrderCreatedEvent> consumer,
        ILogger<ManualCommitConsumer> logger)
    {
        _consumer = consumer;
        _logger = logger;
    }

    public async Task HandleAsync(
        ConsumeResult<string, OrderCreatedEvent> result,
        CancellationToken cancellationToken)
    {
        try
        {
            // Process message
            await ProcessOrderAsync(result.Message.Value, cancellationToken);

            // Commit only after successful processing
            _consumer.Commit(result);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process message");
            // Don't commit - message will be reprocessed
            throw;
        }
    }
}
Auto Commit (Not Recommended)
{
"Kafka" : {
"ConsumerConfig" : {
"EnableAutoCommit" : true ,
"AutoCommitIntervalMs" : 5000
}
}
}
Auto-commit can lead to message loss if processing fails between commits. Always use manual commit for critical data.
Message Ordering
Guaranteed Ordering
Messages in the same partition are guaranteed to be ordered:
// Ensure all customer events are ordered
/// <summary>
/// Publishes a customer-scoped event; keying by customer id pins every
/// event for that customer to one partition, preserving their order.
/// </summary>
public async Task PublishCustomerEventAsync<T>(
    Guid customerId,
    T eventData,
    CancellationToken cancellationToken) where T : class
{
    var partitionKey = customerId.ToString(); // Same key = same partition

    await _producer.ProduceAsync(
        topic: "customer-events",
        key: partitionKey,
        message: eventData,
        cancellationToken);
}
No Ordering Across Partitions
Messages across different partitions have no ordering guarantees:
Partition 0: [Event1, Event2, Event3]
Partition 1: [Event4, Event5, Event6]
# Processing order could be:
Event1, Event4, Event2, Event5, Event3, Event6
Error Handling
Retry Logic
/// <summary>
/// Processes a message with bounded retries and exponential backoff; after
/// the final failure the message is parked on a dead-letter topic and the
/// offset is committed so the partition is not blocked.
/// </summary>
public async Task HandleAsync(
    ConsumeResult<string, OrderCreatedEvent> result,
    CancellationToken cancellationToken)
{
    var retryCount = 0;
    var maxRetries = 3;

    while (retryCount < maxRetries)
    {
        try
        {
            await ProcessMessageAsync(result.Message.Value, cancellationToken);
            _consumer.Commit(result);
            return;
        }
        catch (TransientException ex)
        {
            retryCount++;
            if (retryCount >= maxRetries)
            {
                _logger.LogError(ex, "Max retries exceeded");
                await SendToDeadLetterAsync(result);
                _consumer.Commit(result); // Move past poison message
            }
            else
            {
                _logger.LogWarning(ex, "Retry {Count}/{Max}",
                    retryCount, maxRetries);
                // Exponential backoff: 2s, 4s, 8s...
                // FIX: honor cancellation during the backoff delay.
                await Task.Delay(
                    TimeSpan.FromSeconds(Math.Pow(2, retryCount)),
                    cancellationToken);
            }
        }
    }
}
Dead Letter Topic
/// <summary>
/// Routes a failed message to "&lt;original-topic&gt;.dead-letter", attaching
/// provenance headers so it can be traced and replayed later.
/// </summary>
public async Task SendToDeadLetterAsync<T>(
    ConsumeResult<string, T> failedMessage) where T : class
{
    // FIX: the original interpolation contained stray spaces inside the
    // braces/literal, which would have produced a topic name with embedded
    // spaces (" mytopic .dead-letter").
    var deadLetterTopic = $"{failedMessage.Topic}.dead-letter";

    // Preserve where the message came from and when it failed.
    var headers = new Headers
    {
        { "original-topic", Encoding.UTF8.GetBytes(failedMessage.Topic) },
        { "original-partition", BitConverter.GetBytes(failedMessage.Partition.Value) },
        { "original-offset", BitConverter.GetBytes(failedMessage.Offset.Value) },
        { "error-timestamp", Encoding.UTF8.GetBytes(DateTime.UtcNow.ToString("O")) }
    };

    await _producer.ProduceAsync(
        deadLetterTopic,
        failedMessage.Message.Key,
        failedMessage.Message.Value,
        headers);
}
Producer Batching
{
"Kafka" : {
"ProducerConfig" : {
"LingerMs" : 10 ,
"BatchSize" : 16384 ,
"CompressionType" : "lz4"
}
}
}
Consumer Parallelism
/// <summary>
/// Runs several consumer loops concurrently; Kafka assigns each consumer in
/// the group its own share of the topic's partitions.
/// </summary>
public class ParallelConsumerService : BackgroundService
{
    // FIX: _serviceProvider was used but never declared in the original example.
    private readonly IServiceProvider _serviceProvider;

    public ParallelConsumerService(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Three parallel loops; adjust to match (at most) the partition count.
        var tasks = Enumerable.Range(0, 3).Select(i =>
            ConsumeLoopAsync(i, stoppingToken));
        await Task.WhenAll(tasks);
    }

    // One independent consume/dispatch/commit loop per instance.
    private async Task ConsumeLoopAsync(int instanceId, CancellationToken cancellationToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var consumer = scope.ServiceProvider.GetRequiredService<IKafkaConsumer>();

        while (!cancellationToken.IsCancellationRequested)
        {
            var result = consumer.Consume(cancellationToken);
            if (result != null)
            {
                await consumer.HandleAsync(result, cancellationToken);
                consumer.Commit(result);
            }
        }
    }
}
Partition Assignment
/// <summary>
/// Maps a message key to a partition index via consistent hashing.
/// </summary>
public class CustomPartitionAssigner
{
    /// <summary>
    /// Returns a partition index in the range [0, partitionCount) for the key.
    /// </summary>
    /// <exception cref="ArgumentNullException">key is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">partitionCount is not positive.</exception>
    public int GetPartition(string key, int partitionCount)
    {
        if (key is null)
        {
            throw new ArgumentNullException(nameof(key));
        }
        if (partitionCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(partitionCount));
        }

        // FIX: Math.Abs(int.MinValue) throws OverflowException, so a key
        // hashing to int.MinValue would crash. Masking the sign bit yields
        // the same non-negative distribution without that edge case.
        return (key.GetHashCode() & int.MaxValue) % partitionCount;
    }
}
Local Development
Kafka with Docker
docker-compose.yml:
version : '3.8'
services :
zookeeper :
image : confluentinc/cp-zookeeper:latest
environment :
ZOOKEEPER_CLIENT_PORT : 2181
ZOOKEEPER_TICK_TIME : 2000
ports :
- "2181:2181"
kafka :
image : confluentinc/cp-kafka:latest
depends_on :
- zookeeper
ports :
- "9092:9092"
environment :
KAFKA_BROKER_ID : 1
KAFKA_ZOOKEEPER_CONNECT : zookeeper:2181
KAFKA_ADVERTISED_LISTENERS : PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR : 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE : "true"
Start Kafka by running `docker compose up -d` from the directory containing docker-compose.yml.
AKHQ (Kafka HQ):
services :
akhq :
image : tchiotludo/akhq
ports :
- "8080:8080"
environment :
AKHQ_CONFIGURATION : |
akhq:
connections:
docker-kafka:
properties:
bootstrap.servers: "kafka:9092"
Access: http://localhost:8080
Best Practices
Design all consumers to be idempotent:
/// <summary>
/// Idempotent handler: a cache entry per order id deduplicates redelivered
/// messages (at-least-once delivery means duplicates WILL happen).
/// </summary>
public async Task HandleAsync(
    OrderCreatedEvent message,
    CancellationToken cancellationToken)
{
    // FIX: the original cache keys had stray spaces inside the interpolated
    // string ("order:processed: <id> "), so lookups would never match writes.
    var processed = await _cache.GetAsync(
        $"order:processed:{message.OrderId}",
        cancellationToken);
    if (processed != null)
    {
        _logger.LogInformation("Order {OrderId} already processed",
            message.OrderId);
        return;
    }

    await ProcessOrderAsync(message, cancellationToken);

    // Mark as done; expire after 7 days to bound cache growth.
    await _cache.SetAsync(
        $"order:processed:{message.OrderId}",
        true,
        TimeSpan.FromDays(7),
        cancellationToken);
}
Choose partition keys wisely:
Use entity ID for ordered processing per entity
Avoid hot keys (uneven distribution)
Consider cardinality (number of unique keys)
// Good - even distribution, maintains order per customer
key : customerId . ToString ()
// Bad - all messages to one partition
key : "orders"
// Bad - too many partitions underutilized
key : Guid . NewGuid (). ToString ()
Keep messages reasonably sized:
Default max: 1 MB
Prefer smaller messages for better throughput
Use references for large payloads
// Good
public class OrderCreatedEvent
{
public Guid OrderId { get ; set ; }
public string BlobReference { get ; set ; } // Reference to full order data
}
// Avoid
public class OrderCreatedEvent
{
public byte [] FullOrderDocument { get ; set ; } // Large binary data
}
Track key metrics:
Consumer lag
Message processing time
Error rates
Partition distribution
_metrics . TrackMetric ( "kafka.consumer.lag" , consumerLag );
_metrics . TrackMetric ( "kafka.message.processing.time" , stopwatch . ElapsedMilliseconds );
Monitoring
Consumer Lag
/// <summary>
/// Computes consumer lag (high watermark minus committed offset) for a
/// consumer group on a topic.
/// NOTE(review): only partition 0 is inspected — sum over all partitions for
/// a complete lag figure.
/// </summary>
public class ConsumerLagMonitor
{
    // FIX: the original example referenced `config` and `_consumer` without
    // declaring them; both are now injected.
    private readonly AdminClientConfig _config;
    private readonly IConsumer<string, byte[]> _consumer;

    public ConsumerLagMonitor(AdminClientConfig config, IConsumer<string, byte[]> consumer)
    {
        _config = config;
        _consumer = consumer;
    }

    public async Task<long> GetConsumerLagAsync(
        string groupId,
        string topic)
    {
        using var adminClient = new AdminClientBuilder(_config).Build();

        // Last committed offset for the group on partition 0.
        var committed = await adminClient.ListConsumerGroupOffsetsAsync(
            groupId,
            new[] { new TopicPartition(topic, 0) });

        // Latest offset currently available on the broker.
        var watermarks = _consumer.QueryWatermarkOffsets(
            new TopicPartition(topic, 0),
            TimeSpan.FromSeconds(10));

        return watermarks.High.Value - committed.Offsets.First().Offset.Value;
    }
}
Application Insights
_telemetry . TrackEvent ( "KafkaMessageConsumed" , new Dictionary < string , string >
{
{ "Topic" , result . Topic },
{ "Partition" , result . Partition . ToString () },
{ "Offset" , result . Offset . ToString () },
{ "MessageType" , typeof ( OrderCreatedEvent ). Name },
{ "ProcessingTime" , stopwatch . ElapsedMilliseconds . ToString () }
});
Resources
Apache Kafka Docs Official documentation
Confluent .NET Client .NET client documentation
Eventing Contracts Integration events module
Best Practices Kafka design principles