Overview
MassTransit is a free, open-source distributed application framework for .NET that provides a comprehensive set of tools and abstractions for building message-based communication systems. It simplifies the implementation of event-driven architectures, enabling components to communicate seamlessly through messages.
This module integrates MassTransit with Intent Architect’s Eventing designer, allowing you to model integration events and commands visually and generate production-ready message-based systems.
Installation
Intent.Eventing.MassTransit
Related capabilities:
EntityFramework Outbox - Transactional outbox with EF Core
Request/Response - Synchronous request-response patterns
Scheduling - Delayed and scheduled messages
What’s Included
The module provides:
Integration Event Modeling - Visual designer for events and commands
Message Broker Implementation - Pre-configured MassTransit setup
Message Publishing - IEventBus abstraction
Message Consumption - Auto-generated consumers
Multi-tenancy Support - Finbuckle integration
Configuration - App settings and DI wiring
Telemetry - OpenTelemetry integration
Message Broker Support
MassTransit supports multiple message brokers:
RabbitMQ
Azure Service Bus
Amazon SQS
In-Memory
RabbitMQ - Production-ready message broker.
Configuration:
{
"RabbitMq" : {
"Host" : "localhost" ,
"VirtualHost" : "/" ,
"Username" : "guest" ,
"Password" : "guest"
}
}
Azure Service Bus - Fully managed enterprise messaging.
Configuration:
{
"AzureServiceBus" : {
"ConnectionString" : "Endpoint=sb://..."
}
}
Amazon SQS - Fully managed message queuing.
Configuration:
{
"AmazonSqs" : {
"Region" : "us-east-1" ,
"AccessKey" : "..." ,
"SecretKey" : "..."
}
}
In-Memory - Testing and development.
Configuration:
{
"MassTransit" : {
"Transport" : "InMemory"
}
}
In-memory transport is not durable. Messages are lost when the application stops. Use only for testing.
Modeling Events and Commands
In the Eventing Designer
Open the Services Designer
Right-click → New Package → “Integration Events”
Add Integration Event or Integration Command
Define properties and event handlers
Integration Events (Pub/Sub)
Events represent something that happened:
// Modeled in Intent Architect
public class OrderCreatedEvent
{
public Guid OrderId { get ; set ; }
public Guid CustomerId { get ; set ; }
public decimal TotalAmount { get ; set ; }
public DateTime CreatedDate { get ; set ; }
}
Integration Commands (Point-to-Point)
Commands represent an instruction:
// Modeled in Intent Architect
public class ProcessPaymentCommand
{
public Guid OrderId { get ; set ; }
public Guid PaymentMethodId { get ; set ; }
public decimal Amount { get ; set ; }
}
Important: MassTransit requires that publishers and subscribers use message types with identical names and namespaces.
Publishing Messages
Through IEventBus
public class OrderService
{
private readonly IEventBus _eventBus ;
private readonly IOrderRepository _repository ;
public async Task CreateOrderAsync (
CreateOrderCommand command ,
CancellationToken cancellationToken )
{
// Create order
var order = new Order
{
Id = Guid . NewGuid (),
CustomerId = command . CustomerId ,
TotalAmount = command . Items . Sum ( i => i . Price * i . Quantity )
};
await _repository . AddAsync ( order , cancellationToken );
await _repository . SaveChangesAsync ( cancellationToken );
// Publish event
await _eventBus . PublishAsync (
new OrderCreatedEvent
{
OrderId = order . Id ,
CustomerId = order . CustomerId ,
TotalAmount = order . TotalAmount ,
CreatedDate = DateTime . UtcNow
},
cancellationToken );
}
}
Sending Commands
public class CheckoutService
{
private readonly IEventBus _eventBus ;
public async Task ProcessCheckoutAsync (
Guid orderId ,
Guid paymentMethodId ,
CancellationToken cancellationToken )
{
// Send command to payment service
await _eventBus . SendAsync (
new ProcessPaymentCommand
{
OrderId = orderId ,
PaymentMethodId = paymentMethodId ,
Amount = await GetOrderTotalAsync ( orderId )
},
cancellationToken );
}
}
Consuming Messages
Event Handlers
Generated event handler (implements the generated handler interface):
[ IntentManaged ( Mode . Merge , Signature = Mode . Fully )]
public class OrderCreatedEventHandler
: IIntegrationEventHandler < OrderCreatedEvent >
{
private readonly INotificationService _notificationService ;
private readonly ILogger < OrderCreatedEventHandler > _logger ;
public OrderCreatedEventHandler (
INotificationService notificationService ,
ILogger < OrderCreatedEventHandler > logger )
{
_notificationService = notificationService ;
_logger = logger ;
}
[ IntentManaged ( Mode . Fully , Body = Mode . Ignore )]
public async Task HandleAsync (
OrderCreatedEvent message ,
CancellationToken cancellationToken = default )
{
_logger . LogInformation (
"Order {OrderId} created for customer {CustomerId}" ,
message . OrderId ,
message . CustomerId );
// Send confirmation email
await _notificationService . SendOrderConfirmationAsync (
message . CustomerId ,
message . OrderId ,
cancellationToken );
}
}
Generated Infrastructure Consumer
// Auto-generated by Intent Architect
public static class MassTransitConfiguration
{
public static void AddMassTransitConfiguration (
this IServiceCollection services ,
IConfiguration configuration )
{
services . AddScoped < MassTransitEventBus >();
services . AddScoped < IEventBus >( provider =>
provider . GetRequiredService < MassTransitEventBus >());
services . AddMassTransit ( x =>
{
x . SetKebabCaseEndpointNameFormatter ();
x . AddConsumers ();
x . UsingRabbitMq (( context , cfg ) =>
{
cfg . Host ( configuration [ "RabbitMq:Host" ], h =>
{
h . Username ( configuration [ "RabbitMq:Username" ]);
h . Password ( configuration [ "RabbitMq:Password" ]);
});
cfg . UseMessageRetry ( r => r . Interval (
configuration . GetValue < int >( "MassTransit:RetryInterval:RetryLimit" ),
TimeSpan . Parse ( configuration [ "MassTransit:RetryInterval:Interval" ])));
cfg . ConfigureEndpoints ( context );
});
});
}
private static void AddConsumers ( this IRegistrationConfigurator cfg )
{
cfg . AddConsumer < WrapperConsumer <
IIntegrationEventHandler < OrderCreatedEvent > ,
OrderCreatedEvent >> (
typeof ( WrapperConsumerDefinition <
IIntegrationEventHandler < OrderCreatedEvent >,
OrderCreatedEvent >))
. Endpoint ( config => config . InstanceId = "MyApplication" );
}
}
Module Settings
Messaging Service Provider
Configure your message broker:
In Memory - Testing only, not durable
RabbitMQ - Production-ready, feature-rich
Azure Service Bus - Fully managed Azure service
Amazon SQS - Fully managed AWS service
Outbox Pattern
Ensure reliable message delivery:
None - Direct publish (risk of message loss)
In Memory - Non-durable outbox
Entity Framework - Transactional outbox with database
Using an Outbox pattern also introduces idempotency to ensure messages aren’t processed more than once.
Retry Policy
Configure retry behavior:
None - No retries (not recommended)
Immediate - Consecutive retries without delay
Interval - Constant interval between retries
Exponential - Exponentially increasing intervals
Incremental - Linearly increasing intervals
Configuration:
{
"MassTransit" : {
"RetryInterval" : {
"RetryLimit" : 3 ,
"Interval" : "00:00:05"
}
}
}
Configuration
Complete Example
appsettings.json:
{
"MassTransit" : {
"RetryInterval" : {
"RetryLimit" : 3 ,
"Interval" : "00:00:05"
},
"RetryExponential" : {
"RetryLimit" : 5 ,
"MinInterval" : "00:00:01" ,
"MaxInterval" : "00:01:00"
}
},
"RabbitMq" : {
"Host" : "rabbitmq.production.com" ,
"VirtualHost" : "/production" ,
"Username" : "app-user" ,
"Password" : "secure-password"
}
}
Command Queue Naming
By default, commands use the command name as the queue name. Customize using the Command Distribution stereotype:
Sending Side
Click the association line between element and Integration Command:
Queue Name: "payment-processing"
Receiving Side
Click the “handles” method on the Integration Event Handler:
Queue Name: "payment-processing"
Ensure both sender and receiver use the same queue name for proper routing.
Multi-Tenancy Support
If you have Intent.Modules.AspNetCore.MultiTenancy installed:
Automatic Tenant Propagation
// Publishing - automatically adds tenant header
await _eventBus . PublishAsync ( new OrderCreatedEvent
{
OrderId = orderId ,
CustomerId = customerId
}, cancellationToken );
// Header added: "Tenant-Identifier: tenant-abc"
Tenant Context in Consumers
public class OrderCreatedEventHandler
: IIntegrationEventHandler < OrderCreatedEvent >
{
private readonly ITenantAccessor _tenantAccessor ;
public async Task HandleAsync (
OrderCreatedEvent message ,
CancellationToken cancellationToken )
{
// Tenant context automatically set from message header
var tenantId = _tenantAccessor . CurrentTenant . Id ;
// Process message in correct tenant context
await ProcessOrderAsync ( message , tenantId , cancellationToken );
}
}
To use a different tenant header name, set it in appsettings.json:
{
"MassTransit" : {
"TenantHeader" : "X-Tenant-ID"
}
}
Multiple Message Bus Providers
Use multiple message bus implementations in the same application.
Designating Providers
When multiple providers are installed:
Right-click a Package/Folder in Services designer
Add Stereotype → Message Bus
Select MassTransit from the Providers list
Composite Message Bus
Intent Architect generates a composite bus that routes messages:
public class CompositeEventBus : IEventBus
{
private readonly MassTransitEventBus _massTransit ;
private readonly AzureServiceBusEventBus _azureServiceBus ;
public async Task PublishAsync < T >(
T message ,
CancellationToken cancellationToken ) where T : class
{
// Route based on message type configuration
if ( IsMassTransitMessage < T >())
{
await _massTransit . PublishAsync ( message , cancellationToken );
}
else
{
await _azureServiceBus . PublishAsync ( message , cancellationToken );
}
}
}
OpenTelemetry Integration
If Intent.OpenTelemetry is installed, MassTransit telemetry is automatically configured:
services . AddOpenTelemetry ()
. WithTracing ( builder => builder
. AddSource ( "MassTransit" )
. AddAspNetCoreInstrumentation ()
. AddHttpClientInstrumentation ());
Tracked metrics:
Message publish duration
Message consumption duration
Retry attempts
Failed deliveries
Queue depths
Advanced Patterns
Sagas (Orchestration)
public class OrderStateMachine : MassTransitStateMachine < OrderState >
{
public OrderStateMachine ()
{
Initially (
When ( OrderCreated )
. TransitionTo ( AwaitingPayment )
. Publish ( context => new ProcessPaymentCommand
{
OrderId = context . Instance . OrderId
}));
During ( AwaitingPayment ,
When ( PaymentProcessed )
. TransitionTo ( Completed ));
}
public State AwaitingPayment { get ; private set ; }
public State Completed { get ; private set ; }
public Event < OrderCreatedEvent > OrderCreated { get ; private set ; }
public Event < PaymentProcessedEvent > PaymentProcessed { get ; private set ; }
}
public async Task PublishWithHeadersAsync ()
{
await _eventBus . PublishAsync (
new OrderCreatedEvent { OrderId = orderId },
metadata : new Dictionary < string , object >
{
[ "CorrelationId" ] = Guid . NewGuid (),
[ "Source" ] = "WebAPI" ,
[ "Priority" ] = "High"
});
}
Consumer Concurrency
public class OrderCreatedEventHandlerDefinition
: ConsumerDefinition < OrderCreatedEventHandler >
{
protected override void ConfigureConsumer (
IReceiveEndpointConfigurator endpointConfigurator ,
IConsumerConfigurator < OrderCreatedEventHandler > consumerConfigurator )
{
endpointConfigurator . PrefetchCount = 20 ;
endpointConfigurator . ConcurrentMessageLimit = 10 ;
}
}
Local Development
RabbitMQ with Docker
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=guest \
rabbitmq:3.11-management
Access Management UI: http://localhost:15672
Default credentials: guest/guest
Configuration
appsettings.Development.json:
{
"RabbitMq" : {
"Host" : "localhost" ,
"VirtualHost" : "/" ,
"Username" : "guest" ,
"Password" : "guest"
}
}
Best Practices
Keep messages small and focused
Make messages immutable
Include only necessary data
Version your messages
Use meaningful names
// Good
public class OrderCreatedEvent
{
public Guid OrderId { get ; init ; }
public DateTime CreatedAt { get ; init ; }
}
// Avoid
public class OrderEvent // Too generic
{
public Order Order { get ; set ; } // Too much data
}
Always design handlers to be idempotent: public async Task HandleAsync (
OrderCreatedEvent message ,
CancellationToken cancellationToken )
{
var existing = await _cache . GetAsync (
$"order: { message . OrderId } " ,
cancellationToken );
if ( existing != null )
{
_logger . LogInformation ( "Order already processed" );
return ; // Safe to skip
}
await ProcessOrderAsync ( message , cancellationToken );
await _cache . SetAsync (
$"order: { message . OrderId } " ,
true ,
TimeSpan . FromDays ( 7 ),
cancellationToken );
}
Use retry policies for transient errors
Implement circuit breakers for external dependencies
Move poison messages to error queues
Log all failures with context
public async Task HandleAsync (
OrderCreatedEvent message ,
CancellationToken cancellationToken )
{
try
{
await ProcessOrderAsync ( message , cancellationToken );
}
catch ( TransientException ex )
{
_logger . LogWarning ( ex , "Transient error, will retry" );
throw ; // Let MassTransit retry
}
catch ( Exception ex )
{
_logger . LogError ( ex , "Fatal error processing order {OrderId}" ,
message . OrderId );
await _deadLetterService . SendAsync ( message , ex );
// Don't throw - message handled
}
}
Monitoring
Health Checks
services . AddHealthChecks ()
. AddRabbitMQ (
configuration [ "RabbitMq:ConnectionString" ],
name : "rabbitmq" );
Application Insights
_telemetry . TrackEvent ( "MessagePublished" , new Dictionary < string , string >
{
{ "MessageType" , typeof ( OrderCreatedEvent ). Name },
{ "OrderId" , message . OrderId . ToString () },
{ "Timestamp" , DateTime . UtcNow . ToString ( "O" ) }
});
Troubleshooting
Common Issues
Messages not being consumed:
Check queue bindings in RabbitMQ management UI
Verify message type names match exactly
Ensure consumer is registered
Check for exceptions in consumer
Duplicate message processing:
Implement idempotency in handlers
Use outbox pattern
Check for multiple consumer instances
Performance issues:
Adjust concurrency limits
Monitor queue depths
Check for slow consumers
Review retry policies
Resources
MassTransit Docs - Official documentation
Message Patterns - Common messaging patterns
Eventing Contracts - Integration events module
Troubleshooting - Common issues and solutions