
Overview

MassTransit is a free, open-source distributed application framework for .NET that provides a comprehensive set of tools and abstractions for building message-based communication systems. It simplifies the implementation of event-driven architectures, enabling components to communicate seamlessly through messages. This module integrates MassTransit with Intent Architect’s Eventing designer, allowing you to model integration events and commands visually and generate production-ready message-based systems.

Installation

Intent.Eventing.MassTransit

Related features:
  • EntityFramework Outbox - Transactional outbox with EF Core
  • Request/Response - Synchronous request-response patterns
  • Scheduling - Delayed and scheduled messages

What’s Included

The module provides:
  • Integration Event Modeling - Visual designer for events and commands
  • Message Broker Implementation - Pre-configured MassTransit setup
  • Message Publishing - IEventBus abstraction
  • Message Consumption - Auto-generated consumers
  • Multi-tenancy Support - Finbuckle integration
  • Configuration - App settings and DI wiring
  • Telemetry - OpenTelemetry integration

Message Broker Support

MassTransit supports multiple message brokers. RabbitMQ, for example, is a production-ready message broker.
Configuration:
{
  "RabbitMq": {
    "Host": "localhost",
    "VirtualHost": "/",
    "Username": "guest",
    "Password": "guest"
  }
}

Modeling Events and Commands

In the Eventing Designer

  1. Open the Services Designer
  2. Right-click → New Package → “Integration Events”
  3. Add Integration Event or Integration Command
  4. Define properties and event handlers

Integration Events (Pub/Sub)

Events represent something that happened:
// Modeled in Intent Architect
public class OrderCreatedEvent
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime CreatedDate { get; set; }
}

Integration Commands (Point-to-Point)

Commands represent an instruction:
// Modeled in Intent Architect
public class ProcessPaymentCommand
{
    public Guid OrderId { get; set; }
    public Guid PaymentMethodId { get; set; }
    public decimal Amount { get; set; }
}
Important: MassTransit requires that publishers and subscribers have messages with identical names and namespaces.
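To see why the namespace matters: MassTransit identifies a message contract on the wire by a URN of the form urn:message:Namespace:TypeName. The sketch below mimics that format with plain reflection. MessageUrnSketch and both sample namespaces are hypothetical names used only for illustration, and the helper is a simplification (it ignores generic and nested types), not MassTransit's own implementation.

```csharp
using System;

namespace Ordering.Contracts
{
    public class OrderCreatedEvent { public Guid OrderId { get; set; } }
}

namespace Billing.Contracts
{
    // Same class name, different namespace: a different contract on the wire.
    public class OrderCreatedEvent { public Guid OrderId { get; set; } }
}

public static class MessageUrnSketch
{
    // Approximates the "urn:message:{Namespace}:{TypeName}" identifier.
    public static string For(Type messageType) =>
        $"urn:message:{messageType.Namespace}:{messageType.Name}";
}
```

Calling MessageUrnSketch.For on the two OrderCreatedEvent types gives two different identifiers, which is why a subscriber bound to one contract does not receive messages published as the other.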

Publishing Messages

Through IEventBus

public class OrderService
{
    private readonly IEventBus _eventBus;
    private readonly IOrderRepository _repository;

    // Dependencies are supplied by the DI container
    public OrderService(IEventBus eventBus, IOrderRepository repository)
    {
        _eventBus = eventBus;
        _repository = repository;
    }

    public async Task CreateOrderAsync(
        CreateOrderCommand command,
        CancellationToken cancellationToken)
    {
        // Create order
        var order = new Order
        {
            Id = Guid.NewGuid(),
            CustomerId = command.CustomerId,
            TotalAmount = command.Items.Sum(i => i.Price * i.Quantity)
        };
        
        await _repository.AddAsync(order, cancellationToken);
        await _repository.SaveChangesAsync(cancellationToken);
        
        // Publish event once the order has been saved
        await _eventBus.PublishAsync(
            new OrderCreatedEvent
            {
                OrderId = order.Id,
                CustomerId = order.CustomerId,
                TotalAmount = order.TotalAmount,
                CreatedDate = DateTime.UtcNow
            },
            cancellationToken);
    }
}

Sending Commands

public class CheckoutService
{
    private readonly IEventBus _eventBus;

    public CheckoutService(IEventBus eventBus)
    {
        _eventBus = eventBus;
    }

    public async Task ProcessCheckoutAsync(
        Guid orderId,
        Guid paymentMethodId,
        CancellationToken cancellationToken)
    {
        // Send command to payment service
        // (GetOrderTotalAsync is assumed to be defined elsewhere in this service)
        await _eventBus.SendAsync(
            new ProcessPaymentCommand
            {
                OrderId = orderId,
                PaymentMethodId = paymentMethodId,
                Amount = await GetOrderTotalAsync(orderId)
            },
            cancellationToken);
    }
}

Consuming Messages

Event Handlers

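A handler class is generated for each subscribed event; it implements IIntegrationEventHandler<T>, and you supply the body of HandleAsync: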
[IntentManaged(Mode.Merge, Signature = Mode.Fully)]
public class OrderCreatedEventHandler 
    : IIntegrationEventHandler<OrderCreatedEvent>
{
    private readonly INotificationService _notificationService;
    private readonly ILogger<OrderCreatedEventHandler> _logger;

    public OrderCreatedEventHandler(
        INotificationService notificationService,
        ILogger<OrderCreatedEventHandler> logger)
    {
        _notificationService = notificationService;
        _logger = logger;
    }

    [IntentManaged(Mode.Fully, Body = Mode.Ignore)]
    public async Task HandleAsync(
        OrderCreatedEvent message,
        CancellationToken cancellationToken = default)
    {
        _logger.LogInformation(
            "Order {OrderId} created for customer {CustomerId}",
            message.OrderId,
            message.CustomerId);

        // Send confirmation email
        await _notificationService.SendOrderConfirmationAsync(
            message.CustomerId,
            message.OrderId,
            cancellationToken);
    }
}

Generated Infrastructure Consumer

// Auto-generated by Intent Architect
public static class MassTransitConfiguration
{
    public static void AddMassTransitConfiguration(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        services.AddScoped<MassTransitEventBus>();
        services.AddScoped<IEventBus>(provider => 
            provider.GetRequiredService<MassTransitEventBus>());

        services.AddMassTransit(x =>
        {
            x.SetKebabCaseEndpointNameFormatter();
            x.AddConsumers();

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(configuration["RabbitMq:Host"], configuration["RabbitMq:VirtualHost"], h =>
                {
                    h.Username(configuration["RabbitMq:Username"]);
                    h.Password(configuration["RabbitMq:Password"]);
                });
                
                cfg.UseMessageRetry(r => r.Interval(
                    configuration.GetValue<int>("MassTransit:RetryInterval:RetryLimit"),
                    TimeSpan.Parse(configuration["MassTransit:RetryInterval:Interval"])));
                
                cfg.ConfigureEndpoints(context);
            });
        });
    }

    private static void AddConsumers(this IRegistrationConfigurator cfg)
    {
        cfg.AddConsumer<WrapperConsumer<
            IIntegrationEventHandler<OrderCreatedEvent>, 
            OrderCreatedEvent>>(
            typeof(WrapperConsumerDefinition<
                IIntegrationEventHandler<OrderCreatedEvent>, 
                OrderCreatedEvent>))
            .Endpoint(config => config.InstanceId = "MyApplication");
    }
}

Module Settings

Messaging Service Provider

Configure your message broker:
  • In Memory - Testing only, not durable
  • RabbitMQ - Production-ready, feature-rich
  • Azure Service Bus - Fully managed Azure service
  • Amazon SQS - Fully managed AWS service

Outbox Pattern

Ensure reliable message delivery:
  • None - Direct publish (risk of message loss)
  • In Memory - Non-durable outbox
  • Entity Framework - Transactional outbox with database
Using an Outbox pattern also introduces idempotency to ensure messages aren’t processed more than once.
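The idea behind the outbox options can be sketched without any broker: the message is stored together with the business data in one commit, and a separate dispatch step publishes it afterwards. Everything below (InMemoryDatabase, OutboxDispatcher, the string-based messages) is hypothetical and only illustrates the pattern; the generated code relies on MassTransit's own outbox instead.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for a database whose commit is all-or-nothing.
public class InMemoryDatabase
{
    public List<string> Orders { get; } = new();
    public Queue<string> Outbox { get; } = new();

    // Stores the order and its pending message as one unit of work.
    public void Commit(string order, string message)
    {
        Orders.Add(order);
        Outbox.Enqueue(message);
    }
}

public class OutboxDispatcher
{
    private readonly InMemoryDatabase _db;
    private readonly Action<string> _publish;

    public OutboxDispatcher(InMemoryDatabase db, Action<string> publish)
    {
        _db = db;
        _publish = publish;
    }

    // Publishes pending messages in order. A message leaves the outbox
    // only after the publish call has returned without throwing.
    public int DispatchPending()
    {
        var sent = 0;
        while (_db.Outbox.Count > 0)
        {
            _publish(_db.Outbox.Peek());
            _db.Outbox.Dequeue();
            sent++;
        }
        return sent;
    }
}
```

If the publish delegate throws because the broker is unavailable, the message stays in the outbox and is picked up by the next dispatch. A crash between publishing and dequeuing can still cause a resend, which is one reason consumers should be idempotent.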

Retry Policy

Configure retry behavior:
  • None - No retries (not recommended)
  • Immediate - Consecutive retries without delay
  • Interval - Constant interval between retries
  • Exponential - Exponentially increasing intervals
  • Incremental - Linearly increasing intervals
Configuration for the Interval policy:
{
  "MassTransit": {
    "RetryInterval": {
      "RetryLimit": 3,
      "Interval": "00:00:05"
    }
  }
}
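To make the difference between the delayed policies concrete, the sketch below computes the wait before each retry for the Interval, Incremental and Exponential options. RetryScheduleSketch is a hypothetical helper, and the formulas are assumptions chosen for illustration (in particular the doubling used for Exponential); MassTransit's own calculations may differ.

```csharp
using System;
using System.Collections.Generic;

public static class RetryScheduleSketch
{
    // Interval: the same delay before every retry.
    public static List<TimeSpan> Interval(int retryLimit, TimeSpan interval)
    {
        var delays = new List<TimeSpan>();
        for (var i = 0; i < retryLimit; i++) delays.Add(interval);
        return delays;
    }

    // Incremental: the delay grows by a fixed step on each retry.
    public static List<TimeSpan> Incremental(int retryLimit, TimeSpan initial, TimeSpan step)
    {
        var delays = new List<TimeSpan>();
        for (var i = 0; i < retryLimit; i++)
        {
            delays.Add(TimeSpan.FromTicks(initial.Ticks + step.Ticks * i));
        }
        return delays;
    }

    // Exponential: the delay doubles on each retry, capped at max.
    public static List<TimeSpan> Exponential(int retryLimit, TimeSpan min, TimeSpan max)
    {
        var delays = new List<TimeSpan>();
        for (var i = 0; i < retryLimit; i++)
        {
            var ticks = min.Ticks * Math.Pow(2, i);
            delays.Add(ticks >= max.Ticks ? max : TimeSpan.FromTicks((long)ticks));
        }
        return delays;
    }
}
```

With RetryLimit 3 and Interval 00:00:05, as in the configuration above, Interval produces three delays of five seconds each.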

Configuration

Complete Example

appsettings.json:
{
  "MassTransit": {
    "RetryInterval": {
      "RetryLimit": 3,
      "Interval": "00:00:05"
    },
    "RetryExponential": {
      "RetryLimit": 5,
      "MinInterval": "00:00:01",
      "MaxInterval": "00:01:00"
    }
  },
  "RabbitMq": {
    "Host": "rabbitmq.production.com",
    "VirtualHost": "/production",
    "Username": "app-user",
    "Password": "secure-password"
  }
}

Command Queue Naming

By default, commands use the command name as the queue name. Customize using the Command Distribution stereotype:

Sending Side

Click the association line between element and Integration Command:
Queue Name: "payment-processing"

Receiving Side

Click the “handles” method on the Integration Event Handler:
Queue Name: "payment-processing"
Ensure both sender and receiver use the same queue name for proper routing.

Multi-Tenancy Support

If you have Intent.Modules.AspNetCore.MultiTenancy installed:

Automatic Tenant Propagation

// Publishing - automatically adds tenant header
await _eventBus.PublishAsync(new OrderCreatedEvent
{
    OrderId = orderId,
    CustomerId = customerId
}, cancellationToken);

// Header added: "Tenant-Identifier: tenant-abc"

Tenant Context in Consumers

public class OrderCreatedEventHandler 
    : IIntegrationEventHandler<OrderCreatedEvent>
{
    private readonly ITenantAccessor _tenantAccessor;

    public async Task HandleAsync(
        OrderCreatedEvent message,
        CancellationToken cancellationToken)
    {
        // Tenant context automatically set from message header
        var tenantId = _tenantAccessor.CurrentTenant.Id;
        
        // Process message in correct tenant context
        await ProcessOrderAsync(message, tenantId, cancellationToken);
    }
}

Custom Tenant Header Name

appsettings.json:
{
  "MassTransit": {
    "TenantHeader": "X-Tenant-ID"
  }
}
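Conceptually, tenant propagation is a header written on publish and read on consume. The sketch below shows that round trip with a plain dictionary standing in for the message headers. TenantHeaderSketch and its methods are hypothetical; the default header name is the one shown in the publishing example above, and the generated code uses MassTransit and Finbuckle rather than this helper.

```csharp
using System.Collections.Generic;

public static class TenantHeaderSketch
{
    public const string DefaultHeader = "Tenant-Identifier";

    // Publisher side: stamp the current tenant onto the outgoing headers.
    public static void Stamp(
        IDictionary<string, string> headers,
        string? tenantId,
        string headerName = DefaultHeader)
    {
        if (!string.IsNullOrEmpty(tenantId))
        {
            headers[headerName] = tenantId;
        }
    }

    // Consumer side: read the tenant back, or null when the header is absent.
    public static string? Resolve(
        IReadOnlyDictionary<string, string> headers,
        string headerName = DefaultHeader) =>
        headers.TryGetValue(headerName, out var tenantId) ? tenantId : null;
}
```

Publisher and consumer must agree on the header name: a consumer looking for X-Tenant-ID finds nothing in a message stamped with the default name.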

Multiple Message Bus Providers

Use multiple message bus implementations in the same application.

Designating Providers

When multiple providers are installed:
  1. Right-click a Package/Folder in Services designer
  2. Add Stereotype → Message Bus
  3. Select MassTransit from the Providers list

Composite Message Bus

Intent Architect generates a composite bus that routes messages:
public class CompositeEventBus : IEventBus
{
    private readonly MassTransitEventBus _massTransit;
    private readonly AzureServiceBusEventBus _azureServiceBus;

    public async Task PublishAsync<T>(
        T message,
        CancellationToken cancellationToken) where T : class
    {
        // Route based on message type configuration
        if (IsMassTransitMessage<T>())
        {
            await _massTransit.PublishAsync(message, cancellationToken);
        }
        else
        {
            await _azureServiceBus.PublishAsync(message, cancellationToken);
        }
    }
}
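The composite bus above calls IsMassTransitMessage<T>() without showing how that decision is made. One way to picture it is a registry of message types per provider, sketched below. MessageRouteRegistry is a hypothetical name and this is not the generated implementation; it only illustrates type-based routing.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical lookup deciding which provider owns a message type.
public class MessageRouteRegistry
{
    private readonly HashSet<Type> _massTransitTypes = new();

    // Registers T as a message that should go through MassTransit.
    public MessageRouteRegistry RouteToMassTransit<T>() where T : class
    {
        _massTransitTypes.Add(typeof(T));
        return this;
    }

    // True when T was registered for MassTransit; other types fall
    // through to the alternative provider.
    public bool IsMassTransitMessage<T>() where T : class =>
        _massTransitTypes.Contains(typeof(T));
}
```

A registry built once at startup, for example new MessageRouteRegistry().RouteToMassTransit<OrderCreatedEvent>(), keeps the routing decision in one place instead of scattering type checks through the bus.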

OpenTelemetry Integration

If Intent.OpenTelemetry is installed, MassTransit telemetry is automatically configured:
services.AddOpenTelemetry()
    .WithTracing(builder => builder
        .AddSource("MassTransit")
        .AddAspNetCoreInstrumentation()
        .AddHttpClientInstrumentation());
Tracked metrics:
  • Message publish duration
  • Message consumption duration
  • Retry attempts
  • Failed deliveries
  • Queue depths

Advanced Patterns

Sagas (Orchestration)

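public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        // Assumes OrderState exposes a CurrentState property
        InstanceState(x => x.CurrentState);

        // Correlate each event to a saga instance by order id
        // (assumes PaymentProcessedEvent carries an OrderId)
        Event(() => OrderCreated,
            x => x.CorrelateById(ctx => ctx.Message.OrderId));
        Event(() => PaymentProcessed,
            x => x.CorrelateById(ctx => ctx.Message.OrderId));

        Initially(
            When(OrderCreated)
                .TransitionTo(AwaitingPayment)
                .Publish(context => new ProcessPaymentCommand
                {
                    OrderId = context.Instance.OrderId
                }));

        During(AwaitingPayment,
            When(PaymentProcessed)
                .TransitionTo(Completed));
    }

    public State AwaitingPayment { get; private set; }
    public State Completed { get; private set; }
    
    public Event<OrderCreatedEvent> OrderCreated { get; private set; }
    public Event<PaymentProcessedEvent> PaymentProcessed { get; private set; }
}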

Message Headers

public async Task PublishWithHeadersAsync(Guid orderId)
{
    await _eventBus.PublishAsync(
        new OrderCreatedEvent { OrderId = orderId },
        metadata: new Dictionary<string, object>
        {
            ["CorrelationId"] = Guid.NewGuid(),
            ["Source"] = "WebAPI",
            ["Priority"] = "High"
        });
}

Consumer Concurrency

public class OrderCreatedEventHandlerDefinition 
    : ConsumerDefinition<OrderCreatedEventHandler>
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<OrderCreatedEventHandler> consumerConfigurator)
    {
        endpointConfigurator.PrefetchCount = 20;
        endpointConfigurator.ConcurrentMessageLimit = 10;
    }
}

Local Development

RabbitMQ with Docker

docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=guest \
  -e RABBITMQ_DEFAULT_PASS=guest \
  rabbitmq:3.11-management
Access Management UI: http://localhost:15672
Default credentials: guest/guest

Configuration

appsettings.Development.json:
{
  "RabbitMq": {
    "Host": "localhost",
    "VirtualHost": "/",
    "Username": "guest",
    "Password": "guest"
  }
}

Best Practices

Message Design

  • Keep messages small and focused
  • Make messages immutable
  • Include only necessary data
  • Version your messages
  • Use meaningful names
// Good
public class OrderCreatedEvent
{
    public Guid OrderId { get; init; }
    public DateTime CreatedAt { get; init; }
}

// Avoid
public class OrderEvent  // Too generic
{
    public Order Order { get; set; }  // Too much data
}

Idempotency

Always design handlers to be idempotent:
public async Task HandleAsync(
    OrderCreatedEvent message,
    CancellationToken cancellationToken)
{
    var existing = await _cache.GetAsync(
        $"order:{message.OrderId}", 
        cancellationToken);
    
    if (existing != null)
    {
        _logger.LogInformation("Order already processed");
        return; // Safe to skip
    }
    
    await ProcessOrderAsync(message, cancellationToken);
    
    await _cache.SetAsync(
        $"order:{message.OrderId}",
        true,
        TimeSpan.FromDays(7),
        cancellationToken);
}
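The handler above checks the cache and writes to it in two separate steps, so two concurrent deliveries of the same message could both pass the check. A minimal sketch of an atomic claim is shown below. ProcessedMessageGuard is a hypothetical, in-process helper; across several service instances the same idea needs a shared store such as a unique database key.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-process guard against duplicate processing.
public class ProcessedMessageGuard
{
    private readonly ConcurrentDictionary<Guid, bool> _claimed = new();

    // Runs the handler only for the first caller that claims the id.
    // Returns false when the id was already claimed.
    public async Task<bool> RunOnceAsync(Guid messageId, Func<Task> handler)
    {
        if (!_claimed.TryAdd(messageId, true))
        {
            return false; // duplicate delivery, skip
        }

        try
        {
            await handler();
            return true;
        }
        catch
        {
            // Release the claim so a retry can process the message again.
            _claimed.TryRemove(messageId, out _);
            throw;
        }
    }
}
```

Because TryAdd is atomic, only one of several concurrent callers with the same id runs the handler, and a failed attempt releases the claim so the retry policy can still do its job.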

Error Handling

  • Use retry policies for transient errors
  • Implement circuit breakers for external dependencies
  • Move poison messages to error queues
  • Log all failures with context
public async Task HandleAsync(
    OrderCreatedEvent message,
    CancellationToken cancellationToken)
{
    try
    {
        await ProcessOrderAsync(message, cancellationToken);
    }
    catch (TransientException ex)
    {
        _logger.LogWarning(ex, "Transient error, will retry");
        throw; // Let MassTransit retry
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Fatal error processing order {OrderId}",
            message.OrderId);
        
        await _deadLetterService.SendAsync(message, ex);
        // Don't throw - message handled
    }
}

Performance

  • Use outbox pattern for consistency
  • Configure appropriate concurrency limits
  • Monitor queue depths
  • Use message batching when appropriate
  • Implement proper connection pooling

Monitoring

Health Checks

services.AddHealthChecks()
    .AddRabbitMQ(
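        // Assumes a RabbitMq:ConnectionString setting, which the appsettings
        // examples on this page do not define
        configuration["RabbitMq:ConnectionString"],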
        name: "rabbitmq");

Application Insights

_telemetry.TrackEvent("MessagePublished", new Dictionary<string, string>
{
    { "MessageType", typeof(OrderCreatedEvent).Name },
    { "OrderId", message.OrderId.ToString() },
    { "Timestamp", DateTime.UtcNow.ToString("O") }
});

Troubleshooting

Common Issues

Messages not being consumed:
  • Check queue bindings in RabbitMQ management UI
  • Verify message type names match exactly
  • Ensure consumer is registered
  • Check for exceptions in consumer
Duplicate message processing:
  • Implement idempotency in handlers
  • Use outbox pattern
  • Check for multiple consumer instances
Performance issues:
  • Adjust concurrency limits
  • Monitor queue depths
  • Check for slow consumers
  • Review retry policies

Resources

  • MassTransit Docs - Official documentation
  • Message Patterns - Common messaging patterns
  • Eventing Contracts - Integration events module
  • Troubleshooting - Common issues and solutions
