Overview
The Azure Service Bus eventing module provides patterns for using Azure Service Bus, a fully managed enterprise message broker, with events modeled in Intent Architect’s designers. Service Bus enables reliable cloud messaging between applications and services, even when they’re offline.
Installation
Intent.Eventing.AzureServiceBus
Azure Functions Integration - Process Service Bus messages with Azure Functions
Eventing Contracts - Base eventing infrastructure
What is Azure Service Bus?
Azure Service Bus is a fully managed enterprise message broker that:
Provides reliable message delivery
Supports advanced messaging patterns
Scales automatically
Offers built-in disaster recovery
Integrates seamlessly with Azure services
Messaging Patterns
Queues - Point-to-point communication
Topics - Publish-subscribe communication
Subscriptions - Filter messages from topics
What’s Generated
This module generates:
Message Bus Implementation - IEventBus for Azure Service Bus
Configuration - Connection and dependency injection
Message Dispatcher - Routes messages to handlers
Hosted Service - Background message processing
Publisher Options - Topic and queue configuration
Subscription Options - Message filtering and routing
Configuration
Connection String
appsettings.json:
{
  "AzureServiceBus": {
    "ConnectionString": "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=yourkey"
  }
}
Managed Identity (Recommended)
appsettings.json:
{
  "AzureServiceBus": {
    "FullyQualifiedNamespace": "your-namespace.servicebus.windows.net"
  }
}
Code:
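using Azure.Identity;
using Azure.Messaging.ServiceBus;

services.AddSingleton(sp =>
{
    var config = sp.GetRequiredService<IConfiguration>();
    var fqn = config["AzureServiceBus:FullyQualifiedNamespace"];
    // DefaultAzureCredential uses the managed identity in Azure and developer credentials locally
    return new ServiceBusClient(fqn, new DefaultAzureCredential());
});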
Modeling Events
Integration Events
Model events in the Eventing designer:
public class OrderCreatedEvent
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime CreatedAt { get; set; }
}
Topic Configuration
Use stereotypes to configure topics:
Topic Name: order-events
Subscription Name: notification-service
Publishing Messages
Using IEventBus
public class OrderService
{
    private readonly IEventBus _eventBus;
    private readonly IOrderRepository _repository;

    // Dependencies are supplied through constructor injection
    public OrderService(IEventBus eventBus, IOrderRepository repository)
    {
        _eventBus = eventBus;
        _repository = repository;
    }

    public async Task CreateOrderAsync(
        CreateOrderCommand command,
        CancellationToken cancellationToken)
    {
        // Create order
        var order = await _repository.CreateAsync(command, cancellationToken);

        // Publish to Service Bus
        await _eventBus.PublishAsync(
            new OrderCreatedEvent
            {
                OrderId = order.Id,
                CustomerId = order.CustomerId,
                TotalAmount = order.TotalAmount,
                CreatedAt = DateTime.UtcNow
            },
            cancellationToken);
    }
}
Queue vs Topic
Queue (Point-to-Point):
// Send command to specific queue
await _eventBus.SendAsync(
    new ProcessPaymentCommand { OrderId = orderId },
    cancellationToken);
Topic (Publish-Subscribe):
// Publish event to topic (multiple subscribers)
await _eventBus.PublishAsync(
    new OrderCreatedEvent { OrderId = orderId },
    cancellationToken);
Message Properties
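public async Task PublishWithPropertiesAsync(
    ServiceBusSender sender,
    OrderCreatedEvent orderEvent,
    string correlationId,
    CancellationToken cancellationToken)
{
    var message = new ServiceBusMessage(
        JsonSerializer.SerializeToUtf8Bytes(orderEvent))
    {
        MessageId = Guid.NewGuid().ToString(),
        CorrelationId = correlationId,
        // Messages that share a SessionId are handled in order on session-enabled entities
        SessionId = orderEvent.CustomerId.ToString(),
        ContentType = "application/json",
        Subject = "OrderCreated"
    };

    // Application properties travel as message metadata and can be used by subscription filters
    message.ApplicationProperties.Add("Priority", "High");
    message.ApplicationProperties.Add("Source", "WebAPI");

    await sender.SendMessageAsync(message, cancellationToken);
}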
Consuming Messages
Generated Message Dispatcher
public class AzureServiceBusMessageDispatcher
{
    private readonly IServiceProvider _serviceProvider;

    public async Task DispatchAsync<T>(
        T message,
        CancellationToken cancellationToken) where T : class
    {
        // Resolve the handler in its own scope so scoped services live per message
        using var scope = _serviceProvider.CreateScope();
        var handler = scope.ServiceProvider
            .GetRequiredService<IIntegrationEventHandler<T>>();
        await handler.HandleAsync(message, cancellationToken);
    }
}
Event Handler
[IntentManaged(Mode.Merge, Signature = Mode.Fully)]
public class OrderCreatedEventHandler
    : IIntegrationEventHandler<OrderCreatedEvent>
{
    private readonly IEmailService _emailService;
    private readonly ILogger<OrderCreatedEventHandler> _logger;

    [IntentManaged(Mode.Fully, Body = Mode.Ignore)]
    public async Task HandleAsync(
        OrderCreatedEvent message,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Processing order created event for order {OrderId}",
            message.OrderId);

        await _emailService.SendOrderConfirmationAsync(
            message.CustomerId,
            message.OrderId,
            cancellationToken);
    }
}
Hosted Service
public class AzureServiceBusHostedService : BackgroundService
{
    private readonly ServiceBusProcessor _processor;
    private readonly IAzureServiceBusMessageDispatcher _dispatcher;
    private readonly ILogger<AzureServiceBusHostedService> _logger;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += async args =>
        {
            var message = JsonSerializer.Deserialize<OrderCreatedEvent>(
                args.Message.Body);
            await _dispatcher.DispatchAsync(message, args.CancellationToken);
            await args.CompleteMessageAsync(args.Message);
        };

        _processor.ProcessErrorAsync += args =>
        {
            _logger.LogError(args.Exception, "Error processing message");
            return Task.CompletedTask;
        };

        // Returns once the processor has started; messages are then handled in the background
        await _processor.StartProcessingAsync(stoppingToken);
    }

    // Stop the processor on shutdown so in-flight handlers can finish
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        await _processor.StopProcessingAsync(cancellationToken);
        await base.StopAsync(cancellationToken);
    }
}
Advanced Features
Message Sessions
Guarantee ordered processing:
// Enable sessions on queue/subscription
var createQueueOptions = new CreateQueueOptions("orders")
{
    RequiresSession = true
};

// Send with session ID
var message = new ServiceBusMessage(data)
{
    SessionId = customerId.ToString() // All customer messages in same session
};

// Process with session
var processor = client.CreateSessionProcessor("orders", new ServiceBusSessionProcessorOptions
{
    MaxConcurrentSessions = 5
});
Message Deferral
Defer messages for later processing:
public async Task ProcessMessageAsync(ProcessMessageEventArgs args)
{
    if (!CanProcessNow())
    {
        // Store the sequence number first: it is the only way to retrieve a deferred message
        var sequenceNumber = args.Message.SequenceNumber;
        await _cache.SetAsync($"deferred:{args.Message.MessageId}", sequenceNumber);

        // Defer for later
        await args.DeferMessageAsync(args.Message);
        return;
    }

    await ProcessAsync(args.Message);
    await args.CompleteMessageAsync(args.Message);
}

// Retrieve deferred message later
public async Task ProcessDeferredMessageAsync(string messageId)
{
    var sequenceNumber = await _cache.GetAsync<long>($"deferred:{messageId}");
    var message = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
    await ProcessAsync(message);

    // A deferred message stays in the entity until it is explicitly settled
    await receiver.CompleteMessageAsync(message);
}
Scheduled Messages
Schedule future delivery:
public async Task ScheduleReminderAsync(
    Guid orderId,
    TimeSpan delay)
{
    var message = new ServiceBusMessage(
        JsonSerializer.SerializeToUtf8Bytes(
            new OrderReminderEvent { OrderId = orderId }))
    {
        MessageId = Guid.NewGuid().ToString()
    };

    // The message becomes visible to receivers at the scheduled time
    var scheduleTime = DateTimeOffset.UtcNow.Add(delay);
    await sender.ScheduleMessageAsync(message, scheduleTime);
}
Dead Letter Queue
Handle poison messages:
public async Task ProcessMessageAsync(ProcessMessageEventArgs args)
{
    try
    {
        await ProcessAsync(args.Message);
        await args.CompleteMessageAsync(args.Message);
    }
    catch (ValidationException ex)
    {
        // Move to dead letter queue
        await args.DeadLetterMessageAsync(
            args.Message,
            "ValidationError",
            ex.Message);
    }
    catch (TransientException)
    {
        // Abandon for retry
        await args.AbandonMessageAsync(args.Message);
    }
}

// Process dead letter queue
var dlqReceiver = client.CreateReceiver(
    "orders",
    new ServiceBusReceiverOptions
    {
        SubQueue = SubQueue.DeadLetter
    });
Message Filters
Filter subscription messages:
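// SQL filter: evaluated against system and application properties, not the message body,
// so TotalAmount and Priority must be set as application properties by the publisher
var sqlRuleOptions = new CreateRuleOptions
{
    Name = "HighPriorityOrders",
    Filter = new SqlRuleFilter(
        "TotalAmount > 1000 OR Priority = 'High'")
};
await adminClient.CreateRuleAsync(
    "order-events",
    "high-priority-subscription",
    sqlRuleOptions);

// Correlation filter
var correlationFilter = new CorrelationRuleFilter
{
    Subject = "OrderCreated",
    CorrelationId = correlationId
};
// Named differently from the SQL rule options so both can live in the same scope
var correlationRuleOptions = new CreateRuleOptions
{
    Name = "OrderCreatedFilter",
    Filter = correlationFilter
};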
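Subscription filters match on message properties rather than on the serialized body, so the publisher has to copy any filterable field into the application properties. The following is a minimal sketch of that publisher side, assuming the Azure.Messaging.ServiceBus package is referenced; the order values are hypothetical and chosen only so that a rule such as `TotalAmount > 1000 OR Priority = 'High'` would match.

```csharp
using System;
using System.Text.Json;
using Azure.Messaging.ServiceBus;

// Hypothetical order values, for illustration only.
var orderId = Guid.NewGuid();
double totalAmount = 1500.0;

var message = new ServiceBusMessage(
    JsonSerializer.SerializeToUtf8Bytes(new { OrderId = orderId, TotalAmount = totalAmount }))
{
    ContentType = "application/json",
    Subject = "OrderCreated"
};

// Copy the fields that subscription rules filter on into application properties.
message.ApplicationProperties["TotalAmount"] = totalAmount;
message.ApplicationProperties["Priority"] = "High";

Console.WriteLine($"Filterable properties: {string.Join(", ", message.ApplicationProperties.Keys)}");
```

Keeping the filterable fields few and small helps, because application properties count toward the message size.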
Error Handling
Retry Policy
var processorOptions = new ServiceBusProcessorOptions
{
    MaxConcurrentCalls = 5,
    AutoCompleteMessages = false,
    MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5)
};
var processor = client.CreateProcessor("orders", processorOptions);
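Processor options govern concurrency and lock renewal. Retries of transient failures in the SDK's own operations are configured separately, on the client, through `ServiceBusClientOptions.RetryOptions`. The sketch below assumes the Azure.Messaging.ServiceBus package; the numeric values and the `CreateClient` helper are illustrative assumptions, not recommendations from this module.

```csharp
using System;
using Azure.Messaging.ServiceBus;

// Illustrative values; tune them for your workload.
var clientOptions = new ServiceBusClientOptions
{
    RetryOptions = new ServiceBusRetryOptions
    {
        Mode = ServiceBusRetryMode.Exponential, // back off between attempts
        MaxRetries = 5,
        Delay = TimeSpan.FromSeconds(1),        // base delay between attempts
        MaxDelay = TimeSpan.FromSeconds(30),    // upper bound on the delay
        TryTimeout = TimeSpan.FromSeconds(60)   // timeout for a single attempt
    }
};

// Hypothetical helper: pass the options when the client is created.
ServiceBusClient CreateClient(string connectionString) =>
    new ServiceBusClient(connectionString, clientOptions);

Console.WriteLine($"Max retries: {clientOptions.RetryOptions.MaxRetries}");
```

These settings apply to operations the client performs against the service, such as sending or settling; retrying a failed handler is still up to the processing code or the entity's delivery count.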
Exception Handling
public async Task ProcessMessageAsync(ProcessMessageEventArgs args)
{
    const int maxRetries = 3;
    var retryCount = 0;

    while (retryCount < maxRetries)
    {
        try
        {
            await ProcessAsync(args.Message);
            await args.CompleteMessageAsync(args.Message);
            return;
        }
        catch (TransientException ex)
        {
            retryCount++;
            if (retryCount >= maxRetries)
            {
                _logger.LogError(ex, "Max retries exceeded");
                await args.DeadLetterMessageAsync(
                    args.Message,
                    "MaxRetriesExceeded",
                    ex.Message);
            }
            else
            {
                _logger.LogWarning(ex, "Retry {Count}/{Max}",
                    retryCount, maxRetries);
                // Keep the message lock while retrying in-process. Abandoning here would
                // release the lock, so the later Complete call on this message would fail.
                await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, retryCount)));
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Fatal error");
            await args.DeadLetterMessageAsync(
                args.Message,
                "ProcessingError",
                ex.Message);
            return;
        }
    }
}
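The retry loop above waits 2^retryCount seconds between attempts. With more retries that delay grows quickly and can outlast the message lock, and identical delays make concurrent consumers retry at the same moment. A minimal sketch of a capped delay with an optional jitter factor follows; `BackoffDelay` and the 10 second cap are hypothetical names and values.

```csharp
using System;

// Hypothetical helper: exponential backoff capped at maxDelay.
// jitterFactor scales the result; pass a random value in [0, 1) for full jitter.
static TimeSpan BackoffDelay(int attempt, TimeSpan maxDelay, double jitterFactor = 1.0)
{
    // attempt is 1-based: 2s, 4s, 8s, ... before the cap is applied.
    var exponentialSeconds = Math.Pow(2, attempt);
    var cappedSeconds = Math.Min(exponentialSeconds, maxDelay.TotalSeconds);
    return TimeSpan.FromSeconds(cappedSeconds * jitterFactor);
}

for (var attempt = 1; attempt <= 5; attempt++)
{
    var delay = BackoffDelay(attempt, TimeSpan.FromSeconds(10));
    Console.WriteLine($"attempt {attempt}: wait {delay.TotalSeconds}s");
}
```

In a handler, the call could look like `BackoffDelay(retryCount, TimeSpan.FromSeconds(10), Random.Shared.NextDouble())` on .NET 6 or later, with the cap chosen to stay well inside the lock renewal window.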
Best Practices
Always implement idempotent handlers:
public async Task HandleAsync(
    OrderCreatedEvent message,
    CancellationToken cancellationToken)
{
    // OrderId doubles as the deduplication key because the event carries no separate message ID
    var messageId = message.OrderId.ToString();
    var processed = await _cache.ExistsAsync(
        $"processed:{messageId}",
        cancellationToken);
    if (processed)
    {
        _logger.LogInformation("Message already processed: {MessageId}",
            messageId);
        return;
    }

    await ProcessMessageAsync(message, cancellationToken);

    // Remember the key for longer than any expected redelivery window
    await _cache.SetAsync(
        $"processed:{messageId}",
        true,
        TimeSpan.FromDays(7),
        cancellationToken);
}
Respect message size limits:
Standard tier: 256 KB per message
Premium tier: 1 MB per message by default (larger sizes can be configured, up to 100 MB)
Use the claim check pattern for large payloads
// Store large data in blob storage
var blobUri = await _blobStorage.UploadAsync(largeData);
// Send only the reference in the message
await _eventBus.PublishAsync(new OrderCreatedEvent
{
    OrderId = orderId,
    DataReference = blobUri
});
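Deciding when to switch to the claim check pattern comes down to comparing the serialized payload with the tier limit while leaving room for headers and application properties, which count toward the same limit. The sketch below is one way to make that check; `NeedsClaimCheck` and the 64 KB headroom are assumptions for illustration.

```csharp
using System;
using System.Text.Json;

const int StandardTierLimitBytes = 256 * 1024;
// Assumed headroom for headers and application properties; tune for your messages.
const int HeaderHeadroomBytes = 64 * 1024;

// Hypothetical helper: true when the JSON payload leaves less than the headroom free.
static bool NeedsClaimCheck<T>(T payload, int limitBytes, int headroomBytes)
{
    var sizeBytes = JsonSerializer.SerializeToUtf8Bytes(payload).Length;
    return sizeBytes > limitBytes - headroomBytes;
}

var small = new { OrderId = Guid.NewGuid(), Notes = "ok" };
var large = new { OrderId = Guid.NewGuid(), Notes = new string('x', 300 * 1024) };

Console.WriteLine(NeedsClaimCheck(small, StandardTierLimitBytes, HeaderHeadroomBytes));
Console.WriteLine(NeedsClaimCheck(large, StandardTierLimitBytes, HeaderHeadroomBytes));
```

A publisher could call this before sending and upload the payload to blob storage only when it returns true, which keeps small messages on the direct path.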
Secure the namespace:
Use managed identities
Implement least privilege access
Enable diagnostic logging
Encrypt sensitive data
Use private endpoints
Track key metrics:
Message count
Dead letter count
Processing time
Error rate
Active message count
Local Development
Service Bus Emulator
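Microsoft provides a Docker-based emulator. The command below shows only the image and the AMQP port; the emulator also expects its EULA to be accepted and a companion SQL Server container, so follow the Docker Compose setup in Microsoft's emulator documentation for a working environment:
docker run -d -p 5672:5672 \
  --name servicebus-emulator \
  mcr.microsoft.com/azure-messaging/servicebus-emulator:latest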
Connection String:
{
  "AzureServiceBus": {
    "ConnectionString": "Endpoint=sb://localhost:5672;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
  }
}
Alternatively, use a development namespace in Azure with minimal cost.
Monitoring
Application Insights
_telemetry.TrackEvent("ServiceBusMessageProcessed", new Dictionary<string, string>
{
    { "MessageId", message.MessageId },
    { "Queue", "orders" },
    { "ProcessingTime", stopwatch.ElapsedMilliseconds.ToString() },
    { "DeliveryCount", message.DeliveryCount.ToString() }
});
Health Checks
services.AddHealthChecks()
    .AddAzureServiceBusQueue(
        configuration["AzureServiceBus:ConnectionString"],
        "orders",
        name: "servicebus-orders");
Troubleshooting
Common Issues
Messages not being received:
Check subscription filters
Verify connection string
Ensure queue/topic exists
Check message lock duration
Messages going to dead letter:
Review dead letter reason
Check message processing logic
Verify message format
Monitor exception logs
Performance issues:
Increase max concurrent calls
Use premium tier for high throughput
Implement prefetching
Optimize message handlers
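The first and third suggestions above map to two settings on `ServiceBusProcessorOptions`. The following is a rough sketch, assuming the Azure.Messaging.ServiceBus package; the numbers and the `CreateOrdersProcessor` helper are illustrative assumptions and should be tuned against measured handler latency.

```csharp
using System;
using Azure.Messaging.ServiceBus;

// Illustrative values, not recommendations.
var processorOptions = new ServiceBusProcessorOptions
{
    MaxConcurrentCalls = 16,   // handlers that may run in parallel
    PrefetchCount = 32,        // messages buffered locally ahead of processing
    AutoCompleteMessages = false,
    MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5)
};

// Hypothetical helper: apply the options when creating the processor for the "orders" queue.
ServiceBusProcessor CreateOrdersProcessor(ServiceBusClient client) =>
    client.CreateProcessor("orders", processorOptions);

Console.WriteLine($"Concurrency {processorOptions.MaxConcurrentCalls}, prefetch {processorOptions.PrefetchCount}");
```

Prefetched messages are already locked while they wait in the local buffer, so a prefetch count that is large relative to handler speed can let locks expire before processing starts.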
Resources
Service Bus Docs - Official documentation
Best Practices - Performance optimization
Azure Functions - Functions Service Bus trigger
Eventing Contracts - Base eventing infrastructure