Skip to main content

Event-Driven Architecture

SpecKit uses Apache Kafka as its event bus to enable asynchronous communication between microservices. This event-driven approach allows services to react to state changes without tight coupling, enabling scalable and resilient workflows.

Event Choreography Overview

Kafka Topics & Events

Producer: Inventory Service
Consumers: Ordering Service, Catalog Service
Event Schema:
public record ReservationCreatedEvent
{
    [JsonPropertyName("eventId")]
    public string EventId { get; init; } = string.Empty;
    
    [JsonPropertyName("reservationId")]
    public string ReservationId { get; init; } = string.Empty;
    
    [JsonPropertyName("customerId")]
    public string? CustomerId { get; init; }
    
    [JsonPropertyName("seatId")]
    public string SeatId { get; init; } = string.Empty;
    
    [JsonPropertyName("seatNumber")]
    public string SeatNumber { get; init; } = string.Empty;
    
    [JsonPropertyName("section")]
    public string Section { get; init; } = string.Empty;
    
    [JsonPropertyName("basePrice")]
    public decimal BasePrice { get; init; }
    
    [JsonPropertyName("createdAt")]
    public DateTime CreatedAt { get; init; }
    
    [JsonPropertyName("expiresAt")]
    public DateTime ExpiresAt { get; init; }
    
    [JsonPropertyName("status")]
    public string Status { get; init; } = "active";
}
Publishing Code:
// services/inventory/src/Application/UseCases/CreateReservation/CreateReservationCommandHandler.cs
private async Task PublishReservationCreatedEvent(
    Reservation reservation, 
    Seat seat, 
    CancellationToken cancellationToken)
{
    var @event = new ReservationCreatedEvent(
        EventId: Guid.NewGuid().ToString("D"),
        ReservationId: reservation.Id.ToString("D"),
        CustomerId: reservation.CustomerId,
        SeatId: reservation.SeatId.ToString("D"),
        SeatNumber: $"{seat.Section}-{seat.Row}-{seat.Number}",
        Section: seat.Section,
        BasePrice: 0m,
        CreatedAt: reservation.CreatedAt,
        ExpiresAt: reservation.ExpiresAt,
        Status: reservation.Status
    );

    var json = JsonSerializer.Serialize(@event, _jsonOptions);
    await _kafkaProducer.ProduceAsync(
        "reservation-created", 
        json, 
        reservation.SeatId.ToString("N")
    );
}
Use Case:
  • Notify other services that a seat has been reserved
  • Ordering service adds reservation to cart
  • Catalog service updates seat availability display

Producer Implementation

// services/inventory/src/Inventory.Domain/Ports/IKafkaProducer.cs
namespace Inventory.Domain.Ports;

public interface IKafkaProducer
{
    /// <summary>
    /// Publishes a message to the specified Kafka topic.
    /// </summary>
    /// <param name="topicName">The name of the Kafka topic</param>
    /// <param name="message">The message payload (typically JSON)</param>
    /// <param name="key">Optional message key for partitioning</param>
    Task ProduceAsync(string topicName, string message, string? key = null);
}
Design Benefits:
  • Domain layer depends on abstraction, not Kafka library
  • Easy to mock for unit testing
  • Can swap Kafka for alternative message brokers
// services/inventory/src/Inventory.Infrastructure/Messaging/KafkaProducer.cs
using Confluent.Kafka;
using Inventory.Domain.Ports;

namespace Inventory.Infrastructure.Messaging;

public class KafkaProducer : IKafkaProducer
{
    private readonly IProducer<string?, string> _producer;
    private readonly ILogger<KafkaProducer> _logger;

    public KafkaProducer(
        IProducer<string?, string> producer,
        ILogger<KafkaProducer> logger)
    {
        _producer = producer ?? throw new ArgumentNullException(nameof(producer));
        _logger = logger;
    }

    public async Task ProduceAsync(
        string topicName, 
        string message, 
        string? key = null)
    {
        try
        {
            var deliveryReport = await _producer.ProduceAsync(
                topicName,
                new Message<string?, string>
                {
                    Key = key,
                    Value = message
                }
            );

            _logger.LogInformation(
                "Delivered message to {Topic} [{Partition}] @ {Offset}",
                deliveryReport.Topic,
                deliveryReport.Partition,
                deliveryReport.Offset
            );
        }
        catch (ProduceException<string?, string> ex)
        {
            _logger.LogError(ex, 
                "Failed to deliver message to {Topic}: {Error}",
                topicName,
                ex.Error.Reason
            );
            throw;
        }
    }
}
Configuration:
// services/inventory/src/Infrastructure/ServiceCollectionExtensions.cs
var kafkaConfig = new ProducerConfig
{
    BootstrapServers = configuration.GetConnectionString("Kafka"),
    AllowAutoCreateTopics = true,
    Acks = Acks.All  // Wait for all replicas to acknowledge
};

var producer = new ProducerBuilder<string?, string>(kafkaConfig).Build();
services.AddSingleton(producer);
services.AddSingleton<IKafkaProducer, KafkaProducer>();

Consumer Implementation

// services/ordering/src/Infrastructure/Events/ReservationEventConsumer.cs
using System.Text.Json;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class ReservationEventConsumer : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<ReservationEventConsumer> _logger;
    private readonly KafkaOptions _kafkaOptions;
    private readonly JsonSerializerOptions _jsonOptions;

    public ReservationEventConsumer(
        IServiceProvider serviceProvider,
        ILogger<ReservationEventConsumer> logger,
        IOptions<KafkaOptions> kafkaOptions)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
        _kafkaOptions = kafkaOptions.Value;
        
        _jsonOptions = new JsonSerializerOptions
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase
        };
    }

    protected override async Task ExecuteAsync(
        CancellationToken stoppingToken)
    {
        if (!_kafkaOptions.EnableConsumer)
        {
            _logger.LogInformation("Kafka consumer is disabled");
            return;
        }

        await Task.Delay(2000, stoppingToken); // Wait for Kafka readiness

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ConsumeMessagesAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Kafka consumer error. Retrying...");
                await Task.Delay(3000, stoppingToken);
            }
        }
    }

    private async Task ConsumeMessagesAsync(
        CancellationToken stoppingToken)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = _kafkaOptions.BootstrapServers,
            GroupId = _kafkaOptions.ConsumerGroupId,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = true
        };

        using var consumer = new ConsumerBuilder<string, string>(config)
            .SetErrorHandler((_, e) => 
                _logger.LogError("Kafka error: {Error}", e.Reason))
            .Build();

        consumer.Subscribe(new[] { 
            "reservation-created", 
            "reservation-expired",
            "payment-succeeded" 
        });
        
        _logger.LogInformation(
            "Started consuming events from Kafka topics"
        );

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var consumeResult = consumer.Consume(stoppingToken);
                
                if (consumeResult?.Message?.Value != null)
                {
                    await ProcessMessage(
                        consumeResult.Topic, 
                        consumeResult.Message.Value, 
                        stoppingToken
                    );
                }
            }
            catch (ConsumeException ex)
            {
                _logger.LogError(ex, "Error consuming Kafka message");
            }
        }

        consumer.Close();
    }

    private async Task ProcessMessage(
        string topic, 
        string messageValue, 
        CancellationToken cancellationToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var reservationStore = scope.ServiceProvider
            .GetRequiredService<ReservationStore>();

        switch (topic)
        {
            case "reservation-created":
                var createdEvent = JsonSerializer
                    .Deserialize<ReservationCreatedEvent>(
                        messageValue, 
                        _jsonOptions
                    );
                
                if (createdEvent != null)
                {
                    reservationStore.AddReservation(createdEvent);
                    _logger.LogInformation(
                        "Processed reservation-created for {ReservationId}",
                        createdEvent.ReservationId
                    );
                }
                break;

            case "reservation-expired":
                var expiredEvent = JsonSerializer
                    .Deserialize<ReservationExpiredEvent>(
                        messageValue, 
                        _jsonOptions
                    );
                
                if (expiredEvent != null)
                {
                    reservationStore.RemoveReservation(expiredEvent);
                    _logger.LogInformation(
                        "Processed reservation-expired for {ReservationId}",
                        expiredEvent.ReservationId
                    );
                }
                break;

            case "payment-succeeded":
                var paymentEvent = JsonSerializer
                    .Deserialize<PaymentSucceededEvent>(
                        messageValue, 
                        _jsonOptions
                    );
                
                if (paymentEvent != null)
                {
                    await UpdateOrderToPaid(
                        paymentEvent.OrderId, 
                        scope, 
                        cancellationToken
                    );
                }
                break;
        }
    }
}
Registration:
// services/ordering/src/Infrastructure/ServiceCollectionExtensions.cs
services.AddHostedService<ReservationEventConsumer>();

Event Patterns & Best Practices

Problem: Events may be delivered more than onceSolution: Make consumers idempotent
// Using in-memory dictionary with reservation ID as key
public void AddReservation(ReservationCreatedEvent @event)
{
    // Idempotent: Adding same reservation twice has same effect
    _activeReservations[@event.ReservationId] = @event;
}

// Alternative: Check database before processing
var existing = await _repository.GetByIdAsync(@event.ReservationId);
if (existing == null)
{
    // Process event only if not already processed
    await _repository.CreateAsync(newEntity);
}
Database-Level Idempotency:
// Track processed events
public class ProcessedEvent
{
    public string EventId { get; set; }
    public DateTime ProcessedAt { get; set; }
}

// Before processing
var alreadyProcessed = await _context.ProcessedEvents
    .AnyAsync(e => e.EventId == @event.EventId);

if (!alreadyProcessed)
{
    // Process event
    // ...
    
    // Mark as processed
    _context.ProcessedEvents.Add(new ProcessedEvent 
    { 
        EventId = @event.EventId,
        ProcessedAt = DateTime.UtcNow 
    });
    await _context.SaveChangesAsync();
}

Saga Pattern (Future Enhancement)

Current Approach: ChoreographyServices react to events independently:
Inventory → reservation-created → Ordering (adds to cart)

                              Catalog (updates availability)
Benefits:
  • Services remain decoupled
  • Easy to add new consumers
  • No central point of failure
Drawbacks:
  • Hard to track overall workflow state
  • No built-in rollback mechanism

Alternative: Orchestration (Future)Central orchestrator manages workflow:
// Hypothetical OrderSaga orchestrator
public class OrderSaga
{
    public async Task ExecuteAsync(CreateOrderCommand command)
    {
        try
        {
            // Step 1: Reserve seat
            var reservation = await _inventoryService.ReserveSeatAsync(seatId);
            
            // Step 2: Create order
            var order = await _orderingService.CreateOrderAsync(reservation);
            
            // Step 3: Process payment
            var payment = await _paymentService.ProcessPaymentAsync(order);
            
            // Step 4: Issue ticket
            var ticket = await _fulfillmentService.IssueTicketAsync(payment);
        }
        catch (Exception ex)
        {
            // Compensate: rollback all steps
            await CompensateAsync();
        }
    }
}
When to Consider:
  • Complex multi-step transactions
  • Need for guaranteed compensation
  • Require audit trail of workflow state

Monitoring & Observability

Event Tracing

Use eventId to trace events across services:
var @event = new ReservationCreatedEvent(
    EventId: Guid.NewGuid().ToString("D"),
    // ...
);

_logger.LogInformation(
    "Published event {EventId} to {Topic}",
    @event.EventId,
    "reservation-created"
);
Consumer logs:
_logger.LogInformation(
    "Processing event {EventId} from {Topic}",
    @event.EventId,
    topic
);

Kafka Metrics

Monitor:
  • Consumer lag (messages behind)
  • Throughput (messages/second)
  • Error rates
  • Partition distribution
Tools:
  • Kafka Manager
  • Confluent Control Center
  • Prometheus + Grafana

Microservices Design

See how services are structured and deployed

CQRS Pattern

Learn about command/query separation

Hexagonal Architecture

Understand ports and adapters for messaging

System Architecture

View complete architecture overview

Build docs developers (and LLMs) love