Skip to main content

Overview

The Kafka module provides patterns for using Apache Kafka, a distributed event streaming platform, with events modeled in Intent Architect’s designers. Kafka excels at building real-time streaming data pipelines and applications that process streams of records. Apache Kafka is ideal for high-throughput, fault-tolerant, and scalable event streaming scenarios.

Installation

Intent.Eventing.Kafka

What is Apache Kafka?

Kafka is a distributed streaming platform that:
  • Publishes and subscribes to streams of records
  • Stores streams of records durably and fault-tolerantly
  • Processes streams of records as they occur

Use Cases

Event Streaming

Real-time data pipelines

Log Aggregation

Centralized logging from multiple sources

Metrics Collection

High-volume telemetry and metrics

Stream Processing

Real-time analytics and transformations

What’s Generated

This module generates:
  • Kafka Configuration - Connection and broker setup
  • Kafka Producers - Message publishing infrastructure
  • Kafka Consumers - Message consumption with background services
  • Event Dispatchers - Route messages to handlers
  • Message Bus - IEventBus implementation
  • Dependency Injection - Service registration

Core Concepts

Topics

Topics are categories or feed names to which records are published:
// Events published to topic "order-events"
public class OrderCreatedEvent
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
}

Partitions

Topics are divided into partitions for parallel processing:
Topic: order-events
Partition 0: [msg1, msg2, msg3]
Partition 1: [msg4, msg5, msg6]
Partition 2: [msg7, msg8, msg9]

Consumer Groups

Multiple consumers work together to process messages:
// Consumer Group: "order-processing-service"
// - Consumer 1 reads Partition 0
// - Consumer 2 reads Partition 1
// - Consumer 3 reads Partition 2

Configuration

Basic Configuration

appsettings.json:
{
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "ProducerConfig": {
      "Acks": "all",
      "Retries": 3,
      "CompressionType": "snappy"
    },
    "ConsumerConfig": {
      "GroupId": "my-service",
      "AutoOffsetReset": "earliest",
      "EnableAutoCommit": false
    }
  }
}

Production Configuration

appsettings.Production.json:
{
  "Kafka": {
    "BootstrapServers": "kafka1.prod.com:9092,kafka2.prod.com:9092,kafka3.prod.com:9092",
    "SecurityProtocol": "SaslSsl",
    "SaslMechanism": "Plain",
    "SaslUsername": "${KAFKA_USERNAME}",
    "SaslPassword": "${KAFKA_PASSWORD}",
    "ProducerConfig": {
      "Acks": "all",
      "Retries": 10,
      "MaxInFlight": 5,
      "CompressionType": "lz4",
      "LingerMs": 10,
      "BatchSize": 16384
    },
    "ConsumerConfig": {
      "GroupId": "order-service-prod",
      "AutoOffsetReset": "earliest",
      "EnableAutoCommit": false,
      "MaxPollIntervalMs": 300000,
      "SessionTimeoutMs": 10000
    }
  }
}

Modeling Events

In the Services Designer

  1. Create Integration Events in the Eventing designer
  2. Define properties for your events
  3. Model Event Handlers that subscribe to Kafka topics
  4. Configure topic mappings using stereotypes

Example Event

public class OrderCreatedEvent
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime CreatedAt { get; set; }
    public List<OrderItem> Items { get; set; }
}

public class OrderItem
{
    public string ProductId { get; set; }
    public int Quantity { get; set; }
    public decimal Price { get; set; }
}

Publishing Events

Using IEventBus

public class OrderService
{
    private readonly IEventBus _eventBus;
    private readonly IOrderRepository _repository;

    public async Task CreateOrderAsync(
        CreateOrderCommand command,
        CancellationToken cancellationToken)
    {
        // Create order
        var order = await _repository.CreateAsync(command, cancellationToken);
        
        // Publish to Kafka
        await _eventBus.PublishAsync(
            new OrderCreatedEvent
            {
                OrderId = order.Id,
                CustomerId = order.CustomerId,
                TotalAmount = order.TotalAmount,
                CreatedAt = DateTime.UtcNow,
                Items = order.Items.Select(i => new OrderItem
                {
                    ProductId = i.ProductId,
                    Quantity = i.Quantity,
                    Price = i.Price
                }).ToList()
            },
            cancellationToken);
    }
}

Direct Producer Usage

public class KafkaPublisher
{
    private readonly IKafkaProducer _producer;

    public async Task PublishAsync<T>(
        string topic,
        string key,
        T message,
        CancellationToken cancellationToken) where T : class
    {
        var result = await _producer.ProduceAsync(
            topic,
            key,
            message,
            cancellationToken);
        
        _logger.LogInformation(
            "Published to {Topic} partition {Partition} offset {Offset}",
            result.Topic,
            result.Partition,
            result.Offset);
    }
}

Partition Keys

Control message partitioning:
public async Task PublishOrderEventAsync(
    OrderCreatedEvent orderEvent,
    CancellationToken cancellationToken)
{
    // Use CustomerId as partition key to ensure
    // all orders for a customer go to same partition (ordered)
    await _producer.ProduceAsync(
        topic: "order-events",
        key: orderEvent.CustomerId.ToString(),
        message: orderEvent,
        cancellationToken);
}

Consuming Events

Generated Consumer

public class OrderCreatedEventConsumer : IKafkaConsumer<OrderCreatedEvent>
{
    private readonly IKafkaEventDispatcher _dispatcher;
    private readonly ILogger<OrderCreatedEventConsumer> _logger;

    public async Task HandleAsync(
        ConsumeResult<string, OrderCreatedEvent> result,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Received from {Topic} partition {Partition} offset {Offset}",
            result.Topic,
            result.Partition,
            result.Offset);

        await _dispatcher.DispatchAsync(result.Message.Value, cancellationToken);
    }
}

Event Handler

[IntentManaged(Mode.Merge, Signature = Mode.Fully)]
public class OrderCreatedEventHandler 
    : IIntegrationEventHandler<OrderCreatedEvent>
{
    private readonly IInventoryService _inventoryService;
    private readonly ILogger<OrderCreatedEventHandler> _logger;

    [IntentManaged(Mode.Fully, Body = Mode.Ignore)]
    public async Task HandleAsync(
        OrderCreatedEvent message,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Processing order {OrderId} for customer {CustomerId}",
            message.OrderId,
            message.CustomerId);

        // Reserve inventory for all items
        foreach (var item in message.Items)
        {
            await _inventoryService.ReserveStockAsync(
                item.ProductId,
                item.Quantity,
                cancellationToken);
        }
        
        _logger.LogInformation(
            "Order {OrderId} processed successfully",
            message.OrderId);
    }
}

Background Service

Generated Kafka consumer background service:
public class KafkaConsumerBackgroundService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<KafkaConsumerBackgroundService> _logger;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var consumer = scope.ServiceProvider
            .GetRequiredService<IKafkaConsumer<OrderCreatedEvent>>();
        
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var result = consumer.Consume(stoppingToken);
                if (result != null)
                {
                    await consumer.HandleAsync(result, stoppingToken);
                    consumer.Commit(result);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error consuming message");
                await Task.Delay(1000, stoppingToken); // Back off
            }
        }
    }
}

Offset Management

Manual Commit

Recommended for reliability:
public class ManualCommitConsumer : IKafkaConsumer<OrderCreatedEvent>
{
    public async Task HandleAsync(
        ConsumeResult<string, OrderCreatedEvent> result,
        CancellationToken cancellationToken)
    {
        try
        {
            // Process message
            await ProcessOrderAsync(result.Message.Value, cancellationToken);
            
            // Commit only after successful processing
            _consumer.Commit(result);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process message");
            // Don't commit - message will be reprocessed
            throw;
        }
    }
}
{
  "Kafka": {
    "ConsumerConfig": {
      "EnableAutoCommit": true,
      "AutoCommitIntervalMs": 5000
    }
  }
}
Auto-commit can lead to message loss if processing fails between commits. Always use manual commit for critical data.

Message Ordering

Guaranteed Ordering

Messages in the same partition are guaranteed to be ordered:
// Ensure all customer events are ordered
public async Task PublishCustomerEventAsync<T>(
    Guid customerId,
    T eventData,
    CancellationToken cancellationToken) where T : class
{
    await _producer.ProduceAsync(
        topic: "customer-events",
        key: customerId.ToString(), // Same key = same partition
        message: eventData,
        cancellationToken);
}

No Ordering Across Partitions

Messages across different partitions have no ordering guarantees:
Partition 0: [Event1, Event2, Event3]
Partition 1: [Event4, Event5, Event6]

# Processing order could be:
Event1, Event4, Event2, Event5, Event3, Event6

Error Handling

Retry Logic

public async Task HandleAsync(
    ConsumeResult<string, OrderCreatedEvent> result,
    CancellationToken cancellationToken)
{
    var retryCount = 0;
    var maxRetries = 3;
    
    while (retryCount < maxRetries)
    {
        try
        {
            await ProcessMessageAsync(result.Message.Value, cancellationToken);
            _consumer.Commit(result);
            return;
        }
        catch (TransientException ex)
        {
            retryCount++;
            if (retryCount >= maxRetries)
            {
                _logger.LogError(ex, "Max retries exceeded");
                await SendToDeadLetterAsync(result);
                _consumer.Commit(result); // Move past poison message
            }
            else
            {
                _logger.LogWarning(ex, "Retry {Count}/{Max}",
                    retryCount, maxRetries);
                await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, retryCount)));
            }
        }
    }
}

Dead Letter Topic

public async Task SendToDeadLetterAsync<T>(
    ConsumeResult<string, T> failedMessage) where T : class
{
    var deadLetterTopic = $"{failedMessage.Topic}.dead-letter";
    
    var headers = new Headers
    {
        { "original-topic", Encoding.UTF8.GetBytes(failedMessage.Topic) },
        { "original-partition", BitConverter.GetBytes(failedMessage.Partition.Value) },
        { "original-offset", BitConverter.GetBytes(failedMessage.Offset.Value) },
        { "error-timestamp", Encoding.UTF8.GetBytes(DateTime.UtcNow.ToString("O")) }
    };
    
    await _producer.ProduceAsync(
        deadLetterTopic,
        failedMessage.Message.Key,
        failedMessage.Message.Value,
        headers);
}

Performance Optimization

Producer Batching

{
  "Kafka": {
    "ProducerConfig": {
      "LingerMs": 10,
      "BatchSize": 16384,
      "CompressionType": "lz4"
    }
  }
}

Consumer Parallelism

public class ParallelConsumerService : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var tasks = Enumerable.Range(0, 3).Select(i => 
            ConsumeLoopAsync(i, stoppingToken));
        
        await Task.WhenAll(tasks);
    }
    
    private async Task ConsumeLoopAsync(int instanceId, CancellationToken cancellationToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var consumer = scope.ServiceProvider.GetRequiredService<IKafkaConsumer>();
        
        while (!cancellationToken.IsCancellationRequested)
        {
            var result = consumer.Consume(cancellationToken);
            if (result != null)
            {
                await consumer.HandleAsync(result, cancellationToken);
                consumer.Commit(result);
            }
        }
    }
}

Partition Assignment

public class CustomPartitionAssigner
{
    public int GetPartition(string key, int partitionCount)
    {
        // Consistent hashing for even distribution
        return Math.Abs(key.GetHashCode()) % partitionCount;
    }
}

Local Development

Kafka with Docker

docker-compose.yml:
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
Start Kafka:
docker-compose up -d

Kafka UI Tools

AKHQ (Kafka HQ):
services:
  akhq:
    image: tchiotludo/akhq
    ports:
      - "8080:8080"
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka:
              properties:
                bootstrap.servers: "kafka:9092"
Access: http://localhost:8080

Best Practices

Design all consumers to be idempotent:
public async Task HandleAsync(
    OrderCreatedEvent message,
    CancellationToken cancellationToken)
{
    var processed = await _cache.GetAsync(
        $"order:processed:{message.OrderId}",
        cancellationToken);
    
    if (processed != null)
    {
        _logger.LogInformation("Order {OrderId} already processed",
            message.OrderId);
        return;
    }
    
    await ProcessOrderAsync(message, cancellationToken);
    
    await _cache.SetAsync(
        $"order:processed:{message.OrderId}",
        true,
        TimeSpan.FromDays(7),
        cancellationToken);
}
Choose partition keys wisely:
  • Use entity ID for ordered processing per entity
  • Avoid hot keys (uneven distribution)
  • Consider cardinality (number of unique keys)
// Good - even distribution, maintains order per customer
key: customerId.ToString()

// Bad - all messages to one partition
key: "orders"

// Bad - too many partitions underutilized
key: Guid.NewGuid().ToString()
Keep messages reasonably sized:
  • Default max: 1 MB
  • Prefer smaller messages for better throughput
  • Use references for large payloads
// Good
public class OrderCreatedEvent
{
    public Guid OrderId { get; set; }
    public string BlobReference { get; set; } // Reference to full order data
}

// Avoid
public class OrderCreatedEvent
{
    public byte[] FullOrderDocument { get; set; } // Large binary data
}
Track key metrics:
  • Consumer lag
  • Message processing time
  • Error rates
  • Partition distribution
_metrics.TrackMetric("kafka.consumer.lag", consumerLag);
_metrics.TrackMetric("kafka.message.processing.time", stopwatch.ElapsedMilliseconds);

Monitoring

Consumer Lag

public class ConsumerLagMonitor
{
    public async Task<long> GetConsumerLagAsync(
        string groupId,
        string topic)
    {
        using var adminClient = new AdminClientBuilder(config).Build();
        
        var committed = await adminClient.ListConsumerGroupOffsetsAsync(
            groupId,
            new[] { new TopicPartition(topic, 0) });
        
        var watermarks = _consumer.QueryWatermarkOffsets(
            new TopicPartition(topic, 0),
            TimeSpan.FromSeconds(10));
        
        return watermarks.High.Value - committed.Offsets.First().Offset.Value;
    }
}

Application Insights

_telemetry.TrackEvent("KafkaMessageConsumed", new Dictionary<string, string>
{
    { "Topic", result.Topic },
    { "Partition", result.Partition.ToString() },
    { "Offset", result.Offset.ToString() },
    { "MessageType", typeof(OrderCreatedEvent).Name },
    { "ProcessingTime", stopwatch.ElapsedMilliseconds.ToString() }
});

Resources

Apache Kafka Docs

Official documentation

Confluent .NET Client

.NET client documentation

Eventing Contracts

Integration events module

Best Practices

Kafka design principles

Build docs developers (and LLMs) love