Overview
RabbitMQ provides asynchronous, event-driven communication between microservices. The key example is the BasketCheckoutEvent flow from Basket.API to Ordering.API.
Event Flow Architecture
Event Definitions
Base Integration Event
In BuildingBlocks.Messaging/Events/IntegrationEvent.cs:
namespace BuildingBlocks . Messaging . Events ;
public record IntegrationEvent
{
public Guid Id => Guid . NewGuid ();
public DateTime OccurredOn => DateTime . Now ;
public string EventType => GetType (). AssemblyQualifiedName ;
}
BasketCheckoutEvent
In BuildingBlocks.Messaging/Events/BasketCheckoutEvent.cs:
namespace BuildingBlocks . Messaging . Events ;
public record BasketCheckoutEvent : IntegrationEvent
{
public string UserName { get ; set ; } = default ! ;
public Guid CustomerId { get ; set ; } = default ! ;
public decimal TotalPrice { get ; set ; } = default ! ;
// Shipping and BillingAddress
public string FirstName { get ; set ; } = default ! ;
public string LastName { get ; set ; } = default ! ;
public string EmailAddress { get ; set ; } = default ! ;
public string AddressLine { get ; set ; } = default ! ;
public string Country { get ; set ; } = default ! ;
public string State { get ; set ; } = default ! ;
public string ZipCode { get ; set ; } = default ! ;
// Payment
public string CardName { get ; set ; } = default ! ;
public string CardNumber { get ; set ; } = default ! ;
public string Expiration { get ; set ; } = default ! ;
public string CVV { get ; set ; } = default ! ;
public int PaymentMethod { get ; set ; } = default ! ;
}
Publisher Implementation (Basket.API)
Publishing Events
In Basket.API/Basket/CheckoutBasket/CheckoutBasketHandler.cs:
using BuildingBlocks . Messaging . Events ;
using MassTransit ;
namespace Basket . API . Basket . CheckoutBasket ;
public record CheckoutBasketCommand ( BasketCheckoutDto BasketCheckoutDto )
: ICommand < CheckoutBasketResult >;
public record CheckoutBasketResult ( bool IsSuccess );
public class CheckoutBasketCommandHandler
( IBasketRepository repository , IPublishEndpoint publishEndpoint )
: ICommandHandler < CheckoutBasketCommand , CheckoutBasketResult >
{
public async Task < CheckoutBasketResult > Handle (
CheckoutBasketCommand command ,
CancellationToken cancellationToken )
{
// Get existing basket with total price
var basket = await repository . GetBasket (
command . BasketCheckoutDto . UserName ,
cancellationToken );
if ( basket == null )
{
return new CheckoutBasketResult ( false );
}
// Set totalprice on basketcheckout event message
var eventMessage = command . BasketCheckoutDto . Adapt < BasketCheckoutEvent >();
eventMessage . TotalPrice = basket . TotalPrice ;
// Send basket checkout event to rabbitmq using masstransit
await publishEndpoint . Publish ( eventMessage , cancellationToken );
// Delete the basket
await repository . DeleteBasket (
command . BasketCheckoutDto . UserName ,
cancellationToken );
return new CheckoutBasketResult ( true );
}
}
Publisher Configuration
In Basket.API/Program.cs:
using BuildingBlocks . Messaging . MassTransit ;
var builder = WebApplication . CreateBuilder ( args );
// Async Communication Services
builder . Services . AddMessageBroker ( builder . Configuration );
Consumer Implementation (Ordering.API)
Event Handler
In Ordering.Application/Orders/EventHandlers/Integration/BasketCheckoutEventHandler.cs:
using BuildingBlocks . Messaging . Events ;
using MassTransit ;
using Ordering . Application . Orders . Commands . CreateOrder ;
namespace Ordering . Application . Orders . EventHandlers . Integration ;
public class BasketCheckoutEventHandler
( ISender sender , ILogger < BasketCheckoutEventHandler > logger )
: IConsumer < BasketCheckoutEvent >
{
public async Task Consume ( ConsumeContext < BasketCheckoutEvent > context )
{
logger . LogInformation (
"Integration Event handled: {IntegrationEvent}" ,
context . Message . GetType (). Name );
var command = MapToCreateOrderCommand ( context . Message );
await sender . Send ( command );
}
private CreateOrderCommand MapToCreateOrderCommand ( BasketCheckoutEvent message )
{
// Create full order with incoming event data
var addressDto = new AddressDto (
message . FirstName ,
message . LastName ,
message . EmailAddress ,
message . AddressLine ,
message . Country ,
message . State ,
message . ZipCode );
var paymentDto = new PaymentDto (
message . CardName ,
message . CardNumber ,
message . Expiration ,
message . CVV ,
message . PaymentMethod );
var orderId = Guid . NewGuid ();
var orderDto = new OrderDto (
Id : orderId ,
CustomerId : message . CustomerId ,
OrderName : message . UserName ,
ShippingAddress : addressDto ,
BillingAddress : addressDto ,
Payment : paymentDto ,
Status : Ordering . Domain . Enums . OrderStatus . Pending ,
OrderItems :
[
new OrderItemDto ( orderId , new Guid ( "5334c996-8457-4cf0-815c-ed2b77c4ff61" ), 2 , 500 ),
new OrderItemDto ( orderId , new Guid ( "c67d6323-e8b1-4bdf-9a75-b0d0d2e7e914" ), 1 , 400 )
]);
return new CreateOrderCommand ( orderDto );
}
}
Consumer Configuration
In Ordering.Application/DependencyInjection.cs:
using BuildingBlocks . Messaging . MassTransit ;
using Microsoft . Extensions . Configuration ;
using Microsoft . Extensions . DependencyInjection ;
using System . Reflection ;
namespace Ordering . Application ;
public static class DependencyInjection
{
public static IServiceCollection AddApplicationServices
(this IServiceCollection services , IConfiguration configuration )
{
services . AddMediatR ( config =>
{
config . RegisterServicesFromAssembly ( Assembly . GetExecutingAssembly ());
config. AddOpenBehavior (typeof(ValidationBehavior<,>));
config. AddOpenBehavior (typeof(LoggingBehavior<,>));
});
// Register message broker with consumer assembly
services . AddMessageBroker ( configuration , Assembly . GetExecutingAssembly ());
return services ;
}
}
Configuration
In appsettings.json (both Basket.API and Ordering.API):
{
"MessageBroker" : {
"Host" : "amqp://localhost:5672" ,
"UserName" : "guest" ,
"Password" : "guest"
}
}
Message Flow Details
1. Event Publishing
// MassTransit automatically:
// 1. Serializes the event to JSON
// 2. Determines the exchange name from event type (basket-checkout-event)
// 3. Publishes to RabbitMQ exchange
// 4. Adds metadata (MessageId, CorrelationId, Timestamp)
await publishEndpoint . Publish ( eventMessage , cancellationToken );
2. Exchange and Queue Creation
MassTransit automatically creates:
Exchange : BasketCheckoutEvent (fanout)
Queue : ordering-basket-checkout-event (kebab-case naming)
Binding : Queue bound to exchange
3. Message Consumption
// MassTransit automatically:
// 1. Receives message from queue
// 2. Deserializes JSON to BasketCheckoutEvent
// 3. Calls Consume method
// 4. Acknowledges message on success
// 5. Moves to error queue on failure (after retries)
public async Task Consume ( ConsumeContext < BasketCheckoutEvent > context )
{
// Process the event
}
Key Benefits
Publisher (Basket) doesn’t know about consumers (Ordering). New consumers can be added without changing the publisher.
Multiple instances of the Ordering service can consume from the same queue, distributing the load.
RabbitMQ persists messages to disk. If the consumer is down, messages wait in the queue.
The Basket service returns immediately after publishing. Order creation happens asynchronously.
Error Handling
Automatic Retries
MassTransit handles transient failures with automatic retries:
// If Consume() throws an exception:
// 1. Message is redelivered (up to configured retry count)
// 2. Exponential backoff between retries
// 3. After max retries, message moves to error queue
Error Queue
Failed messages go to: ordering-basket-checkout-event_error
You can inspect and reprocess them:
# View messages in error queue
rabbitmqadmin get queue=ordering-basket-checkout-event_error
# Reprocess from error queue
# (requires custom tooling or MassTransit.MessageData)
Monitoring
RabbitMQ Management UI
Access at http://localhost:15672:
View exchanges and queues
Monitor message rates
Inspect message contents
Track consumer connections
Logging
Both publisher and consumer log events:
logger . LogInformation (
"Integration Event handled: {IntegrationEvent}" ,
context . Message . GetType (). Name );
Best Practices
Always delete sensitive data (like CVV) from events or encrypt them. Events are stored in RabbitMQ and logs.
Use correlation IDs to trace events across services: await publishEndpoint . Publish ( eventMessage , ctx =>
{
ctx . CorrelationId = orderId ;
});
Make events immutable using record types. This prevents accidental modifications during processing.
Testing
Publishing Test Events
[ Fact ]
public async Task Should_Publish_BasketCheckoutEvent ()
{
var harness = new InMemoryTestHarness ();
await harness . Start ();
await harness . Bus . Publish ( new BasketCheckoutEvent { /* ... */ });
Assert . True ( await harness . Published . Any < BasketCheckoutEvent >());
await harness . Stop ();
}
Testing Consumers
[ Fact ]
public async Task Should_Consume_BasketCheckoutEvent ()
{
var harness = new InMemoryTestHarness ();
var consumerHarness = harness . Consumer < BasketCheckoutEventHandler >();
await harness . Start ();
await harness . Bus . Publish ( new BasketCheckoutEvent { /* ... */ });
Assert . True ( await consumerHarness . Consumed . Any < BasketCheckoutEvent >());
await harness . Stop ();
}
Next Steps
MassTransit Configuration Deep dive into MassTransit setup and configuration
gRPC Communication Learn about synchronous inter-service communication