Skip to main content

Overview

RabbitMQ provides asynchronous, event-driven communication between microservices. The key example is the BasketCheckoutEvent flow from Basket.API to Ordering.API.

Event Flow Architecture

Event Definitions

Base Integration Event

In BuildingBlocks.Messaging/Events/IntegrationEvent.cs:
namespace BuildingBlocks.Messaging.Events;

/// <summary>
/// Base type for integration events exchanged between services through the message broker.
/// </summary>
/// <remarks>
/// <see cref="Id"/> and <see cref="OccurredOn"/> are captured once, when the event instance
/// is created, so every read (and every serialization) of the same event observes the same
/// values. They are init-only so a deserializer can restore the values that were published.
/// </remarks>
public record IntegrationEvent
{
    /// <summary>Unique identifier of this event instance, assigned at creation.</summary>
    public Guid Id { get; init; } = Guid.NewGuid();

    /// <summary>UTC timestamp taken when this event instance was created.</summary>
    public DateTime OccurredOn { get; init; } = DateTime.UtcNow;

    /// <summary>
    /// Assembly-qualified name of the concrete event type. Falls back to the simple type
    /// name in the rare case the runtime reports no assembly-qualified name.
    /// </summary>
    public string EventType => GetType().AssemblyQualifiedName ?? GetType().Name;
}

BasketCheckoutEvent

In BuildingBlocks.Messaging/Events/BasketCheckoutEvent.cs:
namespace BuildingBlocks.Messaging.Events;

/// <summary>
/// Integration event carrying the basket owner, total price, address and payment
/// details of a checked-out basket.
/// </summary>
public record BasketCheckoutEvent : IntegrationEvent
{
    // Basket owner and total
    public string UserName { get; set; } = null!;
    public Guid CustomerId { get; set; }
    public decimal TotalPrice { get; set; }

    // Address (used for both shipping and billing)
    public string FirstName { get; set; } = null!;
    public string LastName { get; set; } = null!;
    public string EmailAddress { get; set; } = null!;
    public string AddressLine { get; set; } = null!;
    public string Country { get; set; } = null!;
    public string State { get; set; } = null!;
    public string ZipCode { get; set; } = null!;

    // Payment details
    public string CardName { get; set; } = null!;
    public string CardNumber { get; set; } = null!;
    public string Expiration { get; set; } = null!;
    public string CVV { get; set; } = null!;
    public int PaymentMethod { get; set; }
}

Publisher Implementation (Basket.API)

Publishing Events

In Basket.API/Basket/CheckoutBasket/CheckoutBasketHandler.cs:
using BuildingBlocks.Messaging.Events;
using MassTransit;

namespace Basket.API.Basket.CheckoutBasket;

/// <summary>Command requesting checkout of the basket described by the supplied DTO.</summary>
/// <param name="BasketCheckoutDto">Checkout data; its UserName identifies the basket to check out.</param>
public record CheckoutBasketCommand(BasketCheckoutDto BasketCheckoutDto) : ICommand<CheckoutBasketResult>;

/// <summary>Outcome of a checkout attempt.</summary>
/// <param name="IsSuccess">False when no basket was found for the user; true once the checkout completed.</param>
public record CheckoutBasketResult(bool IsSuccess);

/// <summary>
/// Handles <see cref="CheckoutBasketCommand"/>: loads the user's basket, publishes a
/// <see cref="BasketCheckoutEvent"/> carrying the basket total, then deletes the basket.
/// </summary>
public class CheckoutBasketCommandHandler
    (IBasketRepository repository, IPublishEndpoint publishEndpoint)
    : ICommandHandler<CheckoutBasketCommand, CheckoutBasketResult>
{
    /// <summary>Runs the checkout for the basket named in the command.</summary>
    /// <param name="command">Command wrapping the checkout DTO.</param>
    /// <param name="cancellationToken">Token passed to the repository and publish calls.</param>
    /// <returns>A failed result when no basket exists; otherwise a successful result.</returns>
    public async Task<CheckoutBasketResult> Handle(
        CheckoutBasketCommand command,
        CancellationToken cancellationToken)
    {
        var checkoutDto = command.BasketCheckoutDto;
        var userName = checkoutDto.UserName;

        // Nothing to check out when the user has no basket.
        var basket = await repository.GetBasket(userName, cancellationToken);
        if (basket is null)
        {
            return new CheckoutBasketResult(false);
        }

        // The DTO does not supply the total; it is taken from the stored basket.
        var checkoutEvent = checkoutDto.Adapt<BasketCheckoutEvent>();
        checkoutEvent.TotalPrice = basket.TotalPrice;

        // Publish first, then remove the basket.
        await publishEndpoint.Publish(checkoutEvent, cancellationToken);
        await repository.DeleteBasket(userName, cancellationToken);

        return new CheckoutBasketResult(true);
    }
}

Publisher Configuration

In Basket.API/Program.cs:
using BuildingBlocks.Messaging.MassTransit;

var builder = WebApplication.CreateBuilder(args);

// Async communication: register the message broker from this application's configuration.
// Unlike the Ordering registration, no assembly argument is passed in this call.
builder.Services.AddMessageBroker(builder.Configuration);

Consumer Implementation (Ordering.API)

Event Handler

In Ordering.Application/Orders/EventHandlers/Integration/BasketCheckoutEventHandler.cs:
using BuildingBlocks.Messaging.Events;
using MassTransit;
using Ordering.Application.Orders.Commands.CreateOrder;

namespace Ordering.Application.Orders.EventHandlers.Integration;

/// <summary>
/// Consumer that turns an incoming <see cref="BasketCheckoutEvent"/> into a
/// <see cref="CreateOrderCommand"/> and dispatches it through the injected sender.
/// </summary>
public class BasketCheckoutEventHandler
    (ISender sender, ILogger<BasketCheckoutEventHandler> logger)
    : IConsumer<BasketCheckoutEvent>
{
    /// <summary>Logs the received event type, maps the event to a command and sends it.</summary>
    /// <param name="context">Consume context whose Message is the checkout event.</param>
    public async Task Consume(ConsumeContext<BasketCheckoutEvent> context)
    {
        logger.LogInformation(
            "Integration Event handled: {IntegrationEvent}",
            context.Message.GetType().Name);

        var command = MapToCreateOrderCommand(context.Message);

        // Flow the consume context's token so the command can observe cancellation.
        await sender.Send(command, context.CancellationToken);
    }

    /// <summary>Builds a <see cref="CreateOrderCommand"/> from the checkout event data.</summary>
    /// <param name="message">The checkout event to map.</param>
    /// <returns>A command wrapping a pending order with a newly generated order id.</returns>
    private static CreateOrderCommand MapToCreateOrderCommand(BasketCheckoutEvent message)
    {
        // The same address is used for both shipping and billing.
        var addressDto = new AddressDto(
            message.FirstName,
            message.LastName,
            message.EmailAddress,
            message.AddressLine,
            message.Country,
            message.State,
            message.ZipCode);

        var paymentDto = new PaymentDto(
            message.CardName,
            message.CardNumber,
            message.Expiration,
            message.CVV,
            message.PaymentMethod);

        var orderId = Guid.NewGuid();

        // NOTE(review): the order items below are hard-coded and message.TotalPrice is not
        // used; this looks like sample data — confirm before relying on it.
        var orderDto = new OrderDto(
            Id: orderId,
            CustomerId: message.CustomerId,
            OrderName: message.UserName,
            ShippingAddress: addressDto,
            BillingAddress: addressDto,
            Payment: paymentDto,
            Status: Ordering.Domain.Enums.OrderStatus.Pending,
            OrderItems:
            [
                new OrderItemDto(orderId, new Guid("5334c996-8457-4cf0-815c-ed2b77c4ff61"), 2, 500),
                new OrderItemDto(orderId, new Guid("c67d6323-e8b1-4bdf-9a75-b0d0d2e7e914"), 1, 400)
            ]);

        return new CreateOrderCommand(orderDto);
    }
}

Consumer Configuration

In Ordering.Application/DependencyInjection.cs:
using BuildingBlocks.Messaging.MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System.Reflection;

namespace Ordering.Application;

/// <summary>Service registration for the Ordering application layer.</summary>
public static class DependencyInjection
{
    /// <summary>
    /// Registers MediatR (handlers from this assembly plus the validation and logging
    /// behaviors) and the message broker, passing this assembly to the broker registration.
    /// </summary>
    /// <param name="services">Collection the registrations are added to.</param>
    /// <param name="configuration">Configuration handed to the message broker registration.</param>
    /// <returns>The same <paramref name="services"/> instance.</returns>
    public static IServiceCollection AddApplicationServices
        (this IServiceCollection services, IConfiguration configuration)
    {
        // One assembly feeds both the MediatR scan and the broker registration.
        var applicationAssembly = Assembly.GetExecutingAssembly();

        services.AddMediatR(mediatR =>
        {
            mediatR.RegisterServicesFromAssembly(applicationAssembly);
            mediatR.AddOpenBehavior(typeof(ValidationBehavior<,>));
            mediatR.AddOpenBehavior(typeof(LoggingBehavior<,>));
        });

        services.AddMessageBroker(configuration, applicationAssembly);

        return services;
    }
}

Configuration

In appsettings.json (both Basket.API and Ordering.API):
{
  "MessageBroker": {
    "Host": "amqp://localhost:5672",
    "UserName": "guest",
    "Password": "guest"
  }
}

Message Flow Details

1. Event Publishing

// MassTransit automatically:
// 1. Serializes the event to JSON
// 2. Determines the exchange name from the message type
//    (BuildingBlocks.Messaging.Events:BasketCheckoutEvent by default)
// 3. Publishes to RabbitMQ exchange
// 4. Adds metadata (MessageId, CorrelationId, Timestamp)
await publishEndpoint.Publish(eventMessage, cancellationToken);

2. Exchange and Queue Creation

MassTransit automatically creates:
  • Exchange: BuildingBlocks.Messaging.Events:BasketCheckoutEvent (fanout, named after the message type)
  • Queue: ordering-basket-checkout-event (kebab-case naming)
  • Binding: Queue bound to exchange

3. Message Consumption

// MassTransit automatically:
// 1. Receives message from queue
// 2. Deserializes JSON to BasketCheckoutEvent
// 3. Calls Consume method
// 4. Acknowledges message on success
// 5. Moves to error queue on failure (after any configured retries are exhausted)
public async Task Consume(ConsumeContext<BasketCheckoutEvent> context)
{
    // Process the event (the payload is available as context.Message)
}

Key Benefits

Publisher (Basket) doesn’t know about consumers (Ordering). New consumers can be added without changing the publisher.
Multiple instances of the Ordering service can consume from the same queue, distributing the load.
RabbitMQ persists messages to disk when the queue is durable and the messages are persistent (MassTransit's defaults). If the consumer is down, messages wait in the queue.
The Basket service returns immediately after publishing. Order creation happens asynchronously.

Error Handling

Automatic Retries

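MassTransit can handle transient failures with retries once a retry policy (for example, UseMessageRetry) is configured on the endpoint:
// If Consume() throws an exception:
// 1. Message is retried (up to the configured retry count)
// 2. The configured interval (e.g. exponential backoff) is applied between retries
// 3. After the retries are exhausted, the message moves to the error queue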

Error Queue

Failed messages go to the ordering-basket-checkout-event_error queue. You can inspect and reprocess them:
# View messages in error queue
rabbitmqadmin get queue=ordering-basket-checkout-event_error

# Reprocess from error queue
# (requires custom tooling, e.g. the RabbitMQ shovel plugin to move messages back to the original queue)

Monitoring

RabbitMQ Management UI

Access at http://localhost:15672:
  • View exchanges and queues
  • Monitor message rates
  • Inspect message contents
  • Track consumer connections

Logging

Both publisher and consumer log events:
// Structured log entry: the event's type name fills the {IntegrationEvent} placeholder.
logger.LogInformation(
    "Integration Event handled: {IntegrationEvent}", 
    context.Message.GetType().Name);

Best Practices

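Always remove sensitive data (such as the CVV and card number) from events, or encrypt it. Events are stored in RabbitMQ and may end up in logs. Note that the sample BasketCheckoutEvent above carries CardNumber and CVV in plain text for simplicity; do not do this in production.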
Use correlation IDs to trace events across services:
// Stamp the outgoing message with a correlation id inside the publish callback.
// NOTE(review): orderId is not defined in this snippet — substitute the identifier
// shared by the related messages.
await publishEndpoint.Publish(eventMessage, ctx => 
{
    ctx.CorrelationId = orderId;
});
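Make events immutable by using record types with init-only properties. This prevents accidental modifications during processing. A record with settable properties, like the sample BasketCheckoutEvent above, is still mutable.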

Testing

Publishing Test Events

/// <summary>Verifies that a published BasketCheckoutEvent is observed by the test harness.</summary>
[Fact]
public async Task Should_Publish_BasketCheckoutEvent()
{
    var harness = new InMemoryTestHarness();
    await harness.Start();

    try
    {
        await harness.Bus.Publish(new BasketCheckoutEvent { /* ... */ });

        Assert.True(await harness.Published.Any<BasketCheckoutEvent>());
    }
    finally
    {
        // Always stop the harness, even when the publish or the assertion throws.
        await harness.Stop();
    }
}

Testing Consumers

/// <summary>Verifies that a published BasketCheckoutEvent reaches the BasketCheckoutEventHandler consumer.</summary>
[Fact]
public async Task Should_Consume_BasketCheckoutEvent()
{
    var harness = new InMemoryTestHarness();

    // NOTE(review): BasketCheckoutEventHandler is declared with constructor dependencies
    // (ISender, ILogger); confirm this registration overload can construct it, or supply
    // a factory that passes test doubles.
    var consumerHarness = harness.Consumer<BasketCheckoutEventHandler>();

    await harness.Start();

    try
    {
        await harness.Bus.Publish(new BasketCheckoutEvent { /* ... */ });

        Assert.True(await consumerHarness.Consumed.Any<BasketCheckoutEvent>());
    }
    finally
    {
        // Always stop the harness, even when the publish or the assertion throws.
        await harness.Stop();
    }
}

Next Steps

MassTransit Configuration

Deep dive into MassTransit setup and configuration

gRPC Communication

Learn about synchronous inter-service communication

Build docs developers (and LLMs) love