
Overview

MassTransit is a free, open-source distributed application framework for .NET. It provides an abstraction over message brokers like RabbitMQ, making it easier to implement event-driven architectures.

Architecture

BuildingBlocks.Messaging Project

The shared messaging library provides common infrastructure for all services.

Project Structure

BuildingBlocks.Messaging/
├── Events/
│   ├── IntegrationEvent.cs          # Base event class
│   └── BasketCheckoutEvent.cs       # Specific event
├── MassTransit/
│   └── Extentions.cs                # MassTransit configuration
└── BuildingBlocks.Messaging.csproj
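
The two files under Events/ are not reproduced on this page. As a rough sketch only, they might look like the following. The Id and OccurredOn properties are illustrative assumptions; UserName and TotalPrice are the two BasketCheckoutEvent properties that other examples in this guide actually use.

```csharp
using System;

namespace BuildingBlocks.Messaging.Events;

// Hypothetical base type: Id and OccurredOn are assumed, not taken from the project.
public record IntegrationEvent
{
    public Guid Id { get; init; } = Guid.NewGuid();
    public DateTime OccurredOn { get; init; } = DateTime.UtcNow;
}

// Only UserName and TotalPrice appear elsewhere in this guide.
// TotalPrice has a setter because the publishing handler assigns it after mapping.
public record BasketCheckoutEvent : IntegrationEvent
{
    public string UserName { get; set; } = default!;
    public decimal TotalPrice { get; set; }
}
```

Using records keeps the events immutable-friendly and gives value equality for free, which is convenient in tests.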

Extension Method Implementation

In BuildingBlocks.Messaging/MassTransit/Extentions.cs:
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System.Reflection;

namespace BuildingBlocks.Messaging.MassTransit;

public static class Extentions
{
    public static IServiceCollection AddMessageBroker
        (this IServiceCollection services, IConfiguration configuration, Assembly? assembly = null)
    {
        services.AddMassTransit(config =>
        {
            // Set kebab-case endpoint name formatter
            // Example: BasketCheckoutEvent -> basket-checkout-event
            config.SetKebabCaseEndpointNameFormatter();

            // Register all consumers from the provided assembly
            if (assembly != null)
                config.AddConsumers(assembly);

            // Configure RabbitMQ as the transport
            config.UsingRabbitMq((context, configurator) =>
            {
                configurator.Host(new Uri(configuration["MessageBroker:Host"]!), host =>
                {
                    host.Username(configuration["MessageBroker:UserName"]);
                    host.Password(configuration["MessageBroker:Password"]);
                });
                
                // Automatically configure endpoints for all consumers
                configurator.ConfigureEndpoints(context);
            });
        });

        return services;
    }
}
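
AddMessageBroker reads three configuration keys: MessageBroker:Host, MessageBroker:UserName, and MessageBroker:Password. A matching appsettings.json section might look like this. The host URI and the guest credentials are assumed values for a local RabbitMQ instance, not settings taken from the project.

```json
{
  "MessageBroker": {
    "Host": "amqp://localhost:5672",
    "UserName": "guest",
    "Password": "guest"
  }
}
```

Because the Host value is passed to new Uri(...), it must be a full URI including the scheme, not a bare host name.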

Service Configuration

Publisher Configuration (Basket.API)

No consumer assembly - only publishes events:
using BuildingBlocks.Messaging.MassTransit;

var builder = WebApplication.CreateBuilder(args);

// Register MassTransit without consumers
builder.Services.AddMessageBroker(builder.Configuration);
// assembly parameter is null - no consumers to register

Consumer Configuration (Ordering.API)

With consumer assembly - consumes events:
using BuildingBlocks.Messaging.MassTransit;
using System.Reflection;

namespace Ordering.Application;

public static class DependencyInjection
{
    public static IServiceCollection AddApplicationServices
        (this IServiceCollection services, IConfiguration configuration)
    {
        // Other service registrations...

        // Register MassTransit with consumer assembly
        services.AddMessageBroker(
            configuration, 
            Assembly.GetExecutingAssembly());
        // Automatically finds and registers all IConsumer<T> implementations

        return services;
    }
}

Naming Conventions

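AddMessageBroker calls SetKebabCaseEndpointNameFormatter(), so endpoint names are generated in kebab-case rather than in MassTransit's default format:

C# Type                  Exchange Name              Queue Name
BasketCheckoutEvent      basket-checkout-event      {service}-basket-checkout-event
OrderCreatedEvent        order-created-event        {service}-order-created-event
PaymentProcessedEvent    payment-processed-event    {service}-payment-processed-event

Treat these as the naming convention this guide follows. The formatter builds receive endpoint (queue) names from the consumer class name, so confirm the actual names in the RabbitMQ management UI.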

Queue Naming Pattern

{service-name}-{event-name}
Example:
  • Service: Ordering
  • Event: BasketCheckoutEvent
  • Queue: ordering-basket-checkout-event
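
To make the pattern above concrete, here is a small, self-contained sketch of the PascalCase to kebab-case transformation. NamingSketch, ToKebabCase, and QueueName are hypothetical helpers written for illustration; this is not MassTransit's formatter, only a model of the convention described here.

```csharp
using System;
using System.Text.RegularExpressions;

public static class NamingSketch
{
    // Inserts '-' before every uppercase letter except the first, then lowercases.
    public static string ToKebabCase(string typeName) =>
        Regex.Replace(typeName, "(?<!^)([A-Z])", "-$1").ToLowerInvariant();

    // Applies the {service-name}-{event-name} pattern.
    public static string QueueName(string service, string eventTypeName) =>
        $"{ToKebabCase(service)}-{ToKebabCase(eventTypeName)}";
}
```

With these helpers, NamingSketch.QueueName("Ordering", "BasketCheckoutEvent") yields ordering-basket-checkout-event, matching the example above.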

Message Publishing

Using IPublishEndpoint

Inject IPublishEndpoint into your handler:
public class CheckoutBasketCommandHandler
    (IBasketRepository repository, IPublishEndpoint publishEndpoint)
    : ICommandHandler<CheckoutBasketCommand, CheckoutBasketResult>
{
    public async Task<CheckoutBasketResult> Handle(
        CheckoutBasketCommand command, 
        CancellationToken cancellationToken)
    {
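        // Load the basket so its total can be copied onto the event.
        // GetBasket is an assumed repository method; use whatever IBasketRepository exposes.
        var basket = await repository.GetBasket(
            command.BasketCheckoutDto.UserName, cancellationToken);

        var eventMessage = command.BasketCheckoutDto.Adapt<BasketCheckoutEvent>();
        eventMessage.TotalPrice = basket.TotalPrice;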

        // Publish the event
        await publishEndpoint.Publish(eventMessage, cancellationToken);

        return new CheckoutBasketResult(true);
    }
}

Publishing with Headers

await publishEndpoint.Publish<BasketCheckoutEvent>(eventMessage, context =>
{
    context.CorrelationId = Guid.NewGuid();
    context.TimeToLive = TimeSpan.FromMinutes(5);
    context.Headers.Set("X-Custom-Header", "value");
}, cancellationToken);

Message Consumption

Implementing IConsumer

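using BuildingBlocks.Messaging.Events;
using MassTransit;
using MediatR;                       // ISender (assuming MediatR, as the Send call suggests)
using Microsoft.Extensions.Logging;  // ILogger<T>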

namespace Ordering.Application.Orders.EventHandlers.Integration;

public class BasketCheckoutEventHandler
    (ISender sender, ILogger<BasketCheckoutEventHandler> logger)
    : IConsumer<BasketCheckoutEvent>
{
    public async Task Consume(ConsumeContext<BasketCheckoutEvent> context)
    {
        logger.LogInformation(
            "Integration Event handled: {IntegrationEvent}", 
            context.Message.GetType().Name);

        // Access message properties
        var message = context.Message;
        var correlationId = context.CorrelationId;
        var messageId = context.MessageId;

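        // Process the event.
        // MapToCreateOrderCommand is a private helper (not shown) that builds the command from the event.
        var command = MapToCreateOrderCommand(message);
        await sender.Send(command, context.CancellationToken);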
    }
}

Accessing Context Information

public async Task Consume(ConsumeContext<BasketCheckoutEvent> context)
{
    // Message content
    var message = context.Message;
    
    // Metadata
    var messageId = context.MessageId;
    var correlationId = context.CorrelationId;
    var conversationId = context.ConversationId;
    var timestamp = context.SentTime;
    
    // Headers
    var customHeader = context.Headers.Get<string>("X-Custom-Header");
    
    // Response (for request/response pattern)
    await context.RespondAsync(new ResponseMessage());
}

Advanced Configuration

Retry Policy

config.UsingRabbitMq((context, configurator) =>
{
    configurator.Host(new Uri(configuration["MessageBroker:Host"]!), host =>
    {
        host.Username(configuration["MessageBroker:UserName"]);
        host.Password(configuration["MessageBroker:Password"]);
    });

    // Configure retry policy
    configurator.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));

    configurator.ConfigureEndpoints(context);
});
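
Interval(3, TimeSpan.FromSeconds(5)) allows up to three retries spaced five seconds apart, so a failing message is attempted at most four times before it is treated as faulted. The hypothetical helper below mimics that counting in plain C#. It is an illustration of fixed-interval retry, not how MassTransit implements it.

```csharp
using System;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Runs the action once, then retries up to retryCount more times with a fixed delay.
    // Returns the number of attempts it took; rethrows once the retries are used up.
    public static async Task<int> RunWithIntervalRetry(
        Func<Task> action, int retryCount, TimeSpan interval)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return attempt;
            }
            catch when (attempt <= retryCount)
            {
                // Still within the retry budget: wait, then loop for another attempt.
                await Task.Delay(interval);
            }
        }
    }
}
```

A fixed interval is easy to reason about, but it retries at the same pace no matter how long the downstream dependency stays unavailable.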

Circuit Breaker

configurator.UseCircuitBreaker(cb =>
{
    cb.TrackingPeriod = TimeSpan.FromMinutes(1);
    cb.TripThreshold = 15;
    cb.ActiveThreshold = 10;
    cb.ResetInterval = TimeSpan.FromMinutes(5);
});

Rate Limiting

configurator.UseRateLimit(1000, TimeSpan.FromSeconds(60));

Message Persistence

await publishEndpoint.Publish<BasketCheckoutEvent>(eventMessage, context =>
{
    context.Durable = true; // Persist to disk (true is already the default; set false for transient messages)
}, cancellationToken);

Consumer Configuration

Custom Queue Configuration

config.AddConsumer<BasketCheckoutEventHandler>()
    .Endpoint(e =>
    {
        e.Name = "custom-queue-name";
        e.PrefetchCount = 16;
        e.ConcurrentMessageLimit = 8;
    });

Consumer Definition

For more control, create a consumer definition:
public class BasketCheckoutEventHandlerDefinition 
    : ConsumerDefinition<BasketCheckoutEventHandler>
{
    public BasketCheckoutEventHandlerDefinition()
    {
        EndpointName = "custom-basket-checkout";
        ConcurrentMessageLimit = 4;
    }

    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<BasketCheckoutEventHandler> consumerConfigurator)
    {
        endpointConfigurator.UseMessageRetry(r => r.Interval(3, 1000));
        endpointConfigurator.UseInMemoryOutbox();
    }
}

Monitoring and Observability

Built-in Health Checks

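// AddRabbitMQ comes from the AspNetCore.HealthChecks.Rabbitmq package, not from MassTransit.
// The overload that takes a connection string depends on the package version.
builder.Services.AddHealthChecks()
    .AddRabbitMQ(builder.Configuration["MessageBroker:Host"]!);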

OpenTelemetry Integration

builder.Services.AddOpenTelemetry()
    .WithTracing(tracing =>
    {
        tracing.AddSource("MassTransit");
        tracing.AddMassTransitInstrumentation();
    });

Logging

MassTransit automatically logs:
  • Message publishing
  • Message consumption
  • Retries and failures
  • Performance metrics
info: MassTransit[0]
      Configured endpoint basket-checkout-event, Consumer: BasketCheckoutEventHandler
info: MassTransit[0]
      Bus started: rabbitmq://localhost/

Testing

In-Memory Test Harness

using MassTransit.Testing;

[Fact]
public async Task Should_Publish_And_Consume_BasketCheckoutEvent()
{
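    // BasketCheckoutEventHandler depends on ISender and ILogger<T>;
    // register test doubles for them here, or the consumer will fault when it is resolved.
    await using var provider = new ServiceCollection()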
        .AddMassTransitTestHarness(cfg =>
        {
            cfg.AddConsumer<BasketCheckoutEventHandler>();
        })
        .BuildServiceProvider(true);

    var harness = provider.GetRequiredService<ITestHarness>();
    await harness.Start();

    // Publish event
    await harness.Bus.Publish(new BasketCheckoutEvent
    {
        UserName = "user@example.com",
        TotalPrice = 100
    });

    // Assert published
    Assert.True(await harness.Published.Any<BasketCheckoutEvent>());

    // Assert consumed
    Assert.True(await harness.Consumed.Any<BasketCheckoutEvent>());

    await harness.Stop();
}

Best Practices

Keep event definitions in a shared project (BuildingBlocks.Messaging) to ensure consistency across services.
Always design consumers to be idempotent. A message may be delivered multiple times.
public async Task Consume(ConsumeContext<BasketCheckoutEvent> context)
{
    // Check if already processed
    if (await _repository.ExistsAsync(context.Message.OrderId))
        return; // Already processed
        
    // Process...
}
Track events across services using correlation IDs for distributed tracing.
Configure error queues and monitor them. Implement logic to handle messages that repeatedly fail.
Plan for event versioning from the start:
public record BasketCheckoutEvent : IntegrationEvent
{
    public int Version { get; init; } = 1;
    // ...
}
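
For the idempotency advice above, one common approach is to record each processed message ID and skip any ID seen before. The sketch below uses a hypothetical in-memory ProcessedMessageStore to show the idea. It is an assumption for illustration: a real service would persist the IDs, ideally in the same transaction as the work itself.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory store; contents are lost on restart, so use it only to illustrate the check.
public sealed class ProcessedMessageStore
{
    private readonly ConcurrentDictionary<Guid, bool> _seen = new();

    // Returns true the first time an ID is recorded, false for any redelivery.
    public bool TryMarkProcessed(Guid messageId) => _seen.TryAdd(messageId, true);
}
```

Inside Consume, a consumer could call TryMarkProcessed with context.MessageId (when it has a value) and return early if the result is false.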

Common Patterns

Saga Pattern

For complex workflows spanning multiple services:
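// OrderState and the three event types are assumed to exist elsewhere:
// OrderState implements SagaStateMachineInstance and has a string CurrentState,
// and each event exposes a Guid OrderId used for correlation.
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public State Submitted { get; private set; } = null!;
    public State Paid { get; private set; } = null!;
    public State Shipped { get; private set; } = null!;

    public Event<OrderSubmittedEvent> OrderSubmitted { get; private set; } = null!;
    public Event<PaymentProcessedEvent> PaymentProcessed { get; private set; } = null!;
    public Event<OrderShippedEvent> OrderShipped { get; private set; } = null!;

    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Event(() => OrderSubmitted, x => x.CorrelateById(ctx => ctx.Message.OrderId));
        Event(() => PaymentProcessed, x => x.CorrelateById(ctx => ctx.Message.OrderId));
        Event(() => OrderShipped, x => x.CorrelateById(ctx => ctx.Message.OrderId));

        Initially(
            When(OrderSubmitted)
                .TransitionTo(Submitted));

        During(Submitted,
            When(PaymentProcessed)
                .TransitionTo(Paid));

        During(Paid,
            When(OrderShipped)
                .TransitionTo(Shipped));
    }
}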

Request/Response Pattern

// Requester (client is an IRequestClient<CreateOrderRequest> resolved from DI)
var response = await client.GetResponse<OrderCreatedResponse>(
    new CreateOrderRequest { /* ... */ });

// Consumer
public async Task Consume(ConsumeContext<CreateOrderRequest> context)
{
    // Process...
    await context.RespondAsync(new OrderCreatedResponse { /* ... */ });
}

Troubleshooting

If consumers aren’t being discovered, ensure:
  1. Consumer class is public
  2. Implements IConsumer<TMessage>
  3. Assembly is passed to AddMessageBroker()
Enable diagnostic logging to troubleshoot connection issues:
{
  "Logging": {
    "LogLevel": {
      "MassTransit": "Debug"
    }
  }
}

Performance Tuning

Prefetch Count

Controls how many messages are fetched from RabbitMQ at once:
endpoint.PrefetchCount = 16; // Default is 16

Concurrent Message Limit

Controls how many messages are processed concurrently:
endpoint.ConcurrentMessageLimit = 8; // Default is CPU count * 4

Example Configuration

config.AddConsumer<BasketCheckoutEventHandler>()
    .Endpoint(e =>
    {
        e.PrefetchCount = 32;              // Fetch 32 messages at a time
        e.ConcurrentMessageLimit = 16;     // Process 16 concurrently
    });

Next Steps

RabbitMQ Deep Dive

Explore RabbitMQ event flow and patterns

gRPC Communication

Learn about synchronous service calls
