Overview
The Azure Functions Service Bus module enables Azure Functions to receive and dispatch Azure Service Bus messages to event handlers. This integration provides a serverless way to process messages from Service Bus queues and topics.
This module automatically integrates with the Intent.Eventing.AzureServiceBus module for message contract generation and publishing.
Installation
Intent.AzureFunctions.AzureServiceBus
Dependencies
Intent.AzureFunctions - Core Azure Functions support
Intent.Eventing.AzureServiceBus - Service Bus eventing contracts
Intent.Eventing.Contracts - Base eventing infrastructure
How It Works
This module generates Azure Function consumers that:
Subscribe to Service Bus queues or topics
Receive messages automatically
Dispatch to application-layer event handlers
Handle acknowledgments and retries
Architecture
Modeling Service Bus Consumers
In the Services Designer
Create Integration Events in the Eventing designer
Model Event Handlers that subscribe to Service Bus messages
Configure the Service Bus trigger settings
Generated Consumer
Intent Architect generates Azure Functions like this:
public class OrderCreatedConsumer
{
    private readonly IIntegrationEventHandler<OrderCreatedEvent> _handler;

    public OrderCreatedConsumer(IIntegrationEventHandler<OrderCreatedEvent> handler)
    {
        _handler = handler;
    }

    [Function("OrderCreatedConsumer")]
    public async Task Run(
        [ServiceBusTrigger("%OrderQueue%", Connection = "ServiceBusConnection")]
        ServiceBusReceivedMessage message,
        ServiceBusMessageActions messageActions,
        CancellationToken cancellationToken)
    {
        var eventMessage = JsonSerializer.Deserialize<OrderCreatedEvent>(
            message.Body.ToString());
        await _handler.HandleAsync(eventMessage, cancellationToken);
        await messageActions.CompleteMessageAsync(message, cancellationToken);
    }
}
Configuration
Connection String
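Configure your Service Bus connection as an application setting named ServiceBusConnection, matching the Connection property on the trigger. When running locally, the Functions host reads such settings from the Values section of local.settings.json; when deployed, it reads them from the Function App's application settings:
{
    "Values": {
        "ServiceBusConnection": "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=yourkey"
    }
}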
Queue/Topic Configuration
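Use %Name% placeholders in the trigger for queue and topic names. Each placeholder resolves to the application setting of the same name, so %OrderQueue% picks up the OrderQueue value:
{
    "OrderQueue": "orders-queue",
    "NotificationTopic": "notifications-topic"
}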
Message Processing Patterns
Queue Consumer
Process messages from a Service Bus queue:
[Function("ProcessOrder")]
public async Task Run(
    [ServiceBusTrigger("orders-queue", Connection = "ServiceBusConnection")]
    ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions,
    CancellationToken cancellationToken)
{
    // Deserialize the message body into the event contract
    var eventData = JsonSerializer.Deserialize<OrderCreatedEvent>(
        message.Body.ToString());
    // Process message
    await _handler.HandleAsync(eventData, cancellationToken);
    // Complete message
    await messageActions.CompleteMessageAsync(message, cancellationToken);
}
Topic Subscription Consumer
Process messages from a topic subscription:
[Function("ProcessNotification")]
public async Task Run(
    [ServiceBusTrigger(
        "notifications-topic",
        "email-subscription",
        Connection = "ServiceBusConnection")]
    ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions,
    CancellationToken cancellationToken)
{
    // NotificationEvent is a placeholder for the event contract carried by this topic
    var eventData = JsonSerializer.Deserialize<NotificationEvent>(
        message.Body.ToString());
    await _handler.HandleAsync(eventData, cancellationToken);
    await messageActions.CompleteMessageAsync(message, cancellationToken);
}
Batch Processing
Process multiple messages in a batch:
[Function("ProcessOrdersBatch")]
public async Task Run(
    // IsBatched = true is required for the trigger to bind an array of messages
    [ServiceBusTrigger("orders-queue", Connection = "ServiceBusConnection", IsBatched = true)]
    ServiceBusReceivedMessage[] messages,
    ServiceBusMessageActions messageActions,
    CancellationToken cancellationToken)
{
    foreach (var message in messages)
    {
        var order = JsonSerializer.Deserialize<OrderCreatedEvent>(
            message.Body.ToString());
        await _handler.HandleAsync(order, cancellationToken);
        // Complete each message individually once its handler has succeeded
        await messageActions.CompleteMessageAsync(message, cancellationToken);
    }
}
Message Actions
Complete Message
Remove message from queue after successful processing:
await messageActions.CompleteMessageAsync(message, cancellationToken);
Abandon Message
Return message to queue for reprocessing:
// The second positional parameter is propertiesToModify, so pass the token by name
await messageActions.AbandonMessageAsync(message, cancellationToken: cancellationToken);
Dead Letter Message
Move message to dead-letter queue:
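// Named arguments keep the call unambiguous across overloads
await messageActions.DeadLetterMessageAsync(
    message,
    deadLetterReason: "ProcessingError",
    deadLetterErrorDescription: "Failed to process after multiple attempts",
    cancellationToken: cancellationToken);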
Defer Message
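Defer message for later processing. A deferred message is no longer delivered to the trigger; it can only be retrieved explicitly by its sequence number, so persist that number before deferring:
// Store this value somewhere durable; it is the only way to fetch the message later
var sequenceNumber = message.SequenceNumber;
await messageActions.DeferMessageAsync(message, cancellationToken: cancellationToken);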
Error Handling
Retry Policies
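Configure trigger behaviour in host.json. The maximum delivery count is not a host.json setting: it is a property of the Service Bus queue or subscription itself. Once a message has been delivered that many times without being completed, Service Bus moves it to the dead-letter queue.
{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "prefetchCount": 100,
            "autoCompleteMessages": false,
            "maxConcurrentCalls": 16,
            "maxAutoLockRenewalDuration": "00:05:00"
        }
    }
}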
Exception Handling
[Function("ProcessOrder")]
public async Task Run(
    [ServiceBusTrigger("orders-queue", Connection = "ServiceBusConnection")]
    ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions,
    CancellationToken cancellationToken)
{
    try
    {
        var order = JsonSerializer.Deserialize<OrderCreatedEvent>(
            message.Body.ToString());
        await _handler.HandleAsync(order, cancellationToken);
        await messageActions.CompleteMessageAsync(message, cancellationToken);
    }
    catch (TransientException ex) // TransientException is an application-defined type
    {
        // Retry by abandoning; Service Bus redelivers and increments the delivery count
        _logger.LogWarning(ex, "Transient error, will retry");
        await messageActions.AbandonMessageAsync(message, cancellationToken: cancellationToken);
    }
    catch (Exception ex)
    {
        // Dead letter for manual intervention
        _logger.LogError(ex, "Fatal error processing message");
        await messageActions.DeadLetterMessageAsync(
            message,
            deadLetterReason: "ProcessingError",
            deadLetterErrorDescription: ex.Message,
            cancellationToken: cancellationToken);
    }
}
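The choice made by the catch blocks above (complete, abandon, or dead-letter) can be isolated into a small pure function, which makes the policy easy to unit test without a Service Bus connection. The following is a minimal sketch, not part of the generated code: MessageOutcome, FailurePolicy and the rule that only TimeoutException counts as transient are all assumptions made for illustration.

```csharp
using System;

// Hypothetical outcome type: one value per message action the function can take.
public enum MessageOutcome { Complete, Abandon, DeadLetter }

public static class FailurePolicy
{
    // Decides what to do with a message after one handler attempt.
    // error: the exception thrown by the handler, or null on success.
    // deliveryCount: how many times the message has been delivered so far.
    // maxAttempts: how many deliveries we are willing to spend on transient errors.
    public static MessageOutcome Decide(Exception? error, int deliveryCount, int maxAttempts)
    {
        if (error is null)
        {
            return MessageOutcome.Complete;
        }

        // Assumption: only timeouts are treated as transient in this sketch.
        bool isTransient = error is TimeoutException;

        if (isTransient && deliveryCount < maxAttempts)
        {
            return MessageOutcome.Abandon; // let Service Bus redeliver
        }

        return MessageOutcome.DeadLetter; // non-transient, or retries exhausted
    }
}
```

Inside a function, the deliveryCount argument would come from message.DeliveryCount, and the returned value would select which messageActions method to call.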
Advanced Features
Message Properties
Access custom message properties:
var correlationId = message.CorrelationId;
var messageId = message.MessageId;
var customProperty = message.ApplicationProperties["CustomProperty"];
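ApplicationProperties is a read-only dictionary, so indexing a key the sender did not set throws KeyNotFoundException. A minimal sketch of a safer lookup follows; ApplicationPropertyReader is a hypothetical helper name, not something the module generates.

```csharp
using System.Collections.Generic;

public static class ApplicationPropertyReader
{
    // Returns the property value as a string, or null when the key is absent
    // or the stored value is null. Never throws for a missing key.
    public static string? GetString(
        IReadOnlyDictionary<string, object> properties,
        string key)
    {
        return properties.TryGetValue(key, out var value)
            ? value?.ToString()
            : null;
    }
}
```

A call such as ApplicationPropertyReader.GetString(message.ApplicationProperties, "CustomProperty") then yields null instead of failing when the property is missing.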
Session-Based Processing
For ordered message processing:
[Function("ProcessSessionMessage")]
public async Task Run(
    [ServiceBusTrigger(
        "session-queue",
        Connection = "ServiceBusConnection",
        IsSessionsEnabled = true)]
    ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions,
    ServiceBusSessionMessageActions sessionActions,
    CancellationToken cancellationToken)
{
    var sessionId = message.SessionId;
    var sessionState = await sessionActions.GetSessionStateAsync(cancellationToken);

    // Process message
    var eventData = JsonSerializer.Deserialize<OrderCreatedEvent>(
        message.Body.ToString());
    await _handler.HandleAsync(eventData, cancellationToken);

    // Update session state (illustrative value: the ID of the last processed message)
    var newState = BinaryData.FromString(message.MessageId);
    await sessionActions.SetSessionStateAsync(
        newState,
        cancellationToken);
    await messageActions.CompleteMessageAsync(message, cancellationToken);
}
Local Development
Using Azurite
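Azurite emulates Azure Storage, which the Functions host uses for its own state. Point the AzureWebJobsStorage setting at it:
{
    "AzureWebJobsStorage": "UseDevelopmentStorage=true"
}
Azurite doesn’t support Service Bus, so ServiceBusConnection must still point at a real broker. For local Service Bus testing, use the Service Bus Emulator or a development namespace in Azure.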
Monitoring
Application Insights
Track message processing:
_telemetry.TrackEvent("MessageProcessed", new Dictionary<string, string>
{
    { "MessageId", message.MessageId },
    { "Queue", "orders-queue" },
    { "ProcessingTime", stopwatch.Elapsed.ToString() }
});
Metrics to Monitor
Message processing time
Failed message count
Dead-letter queue depth
Function execution count
Throttling events
Best Practices
Always design handlers to be idempotent - they should produce the same result when processing the same message multiple times.
public async Task HandleAsync(OrderCreatedEvent message, CancellationToken ct)
{
    var existing = await _repository.FindByOrderIdAsync(message.OrderId, ct);
    if (existing != null)
    {
        _logger.LogInformation("Order {OrderId} already processed", message.OrderId);
        return; // Idempotent - safe to skip
    }
    // Process new order
    await _repository.CreateAsync(message, ct);
}
Keep messages under 256 KB for Standard tier
Consider using claim check pattern for large payloads
Store large data in blob storage, send reference
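The claim check idea in the three points above can be sketched as follows. This is an illustration under stated assumptions, not the module's implementation: IPayloadStore, InMemoryPayloadStore, ClaimCheckMessage and the 200 KB inline threshold are all hypothetical, and a real store would write to blob storage rather than a dictionary.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical abstraction over blob storage.
public interface IPayloadStore
{
    string Save(byte[] payload);      // returns a reference to the stored payload
    byte[] Load(string reference);
}

// In-memory stand-in so the sketch runs without any Azure dependency.
public sealed class InMemoryPayloadStore : IPayloadStore
{
    private readonly Dictionary<string, byte[]> _blobs = new();

    public string Save(byte[] payload)
    {
        var reference = Guid.NewGuid().ToString("N");
        _blobs[reference] = payload;
        return reference;
    }

    public byte[] Load(string reference) => _blobs[reference];
}

// Exactly one of the two properties is set.
public sealed record ClaimCheckMessage(string? InlineBody, string? BlobReference);

public static class ClaimCheck
{
    // Assumed threshold: leaves headroom under the 256 KB Standard tier limit
    // for message headers and application properties.
    public const int MaxInlineBytes = 200 * 1024;

    // Sender side: small bodies travel inline, large ones are stored and referenced.
    public static ClaimCheckMessage Wrap(string body, IPayloadStore store)
    {
        var bytes = Encoding.UTF8.GetBytes(body);
        return bytes.Length <= MaxInlineBytes
            ? new ClaimCheckMessage(body, null)
            : new ClaimCheckMessage(null, store.Save(bytes));
    }

    // Consumer side: resolve the reference back into the original body.
    public static string Unwrap(ClaimCheckMessage message, IPayloadStore store) =>
        message.InlineBody
        ?? Encoding.UTF8.GetString(store.Load(message.BlobReference!));
}
```

The publisher would serialize the ClaimCheckMessage as the Service Bus message body, and the handler would call Unwrap before deserializing the actual event.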
Configure maxConcurrentCalls based on your workload:
Start conservative (1-5 for CPU-bound work)
Increase for I/O-bound operations
Monitor for throttling and adjust
Reuse Service Bus clients
Let Azure Functions manage connection pooling
Use managed identities instead of connection strings
Integration with Event Bus
This module works seamlessly with the eventing infrastructure:
// Publishing (from any service)
// Constructors and the _repository field are omitted for brevity
public class OrderService
{
    private readonly IEventBus _eventBus;

    public async Task CreateOrderAsync(
        CreateOrderCommand command,
        CancellationToken cancellationToken)
    {
        // Create order
        var order = await _repository.CreateAsync(command);
        // Publish event to Service Bus
        await _eventBus.PublishAsync(
            new OrderCreatedEvent(order.Id, order.CustomerId),
            cancellationToken);
    }
}

// Consuming (Azure Function)
// The _notificationService field and constructor are omitted for brevity
[IntentManaged(Mode.Merge)]
public class OrderCreatedEventHandler
    : IIntegrationEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(
        OrderCreatedEvent message,
        CancellationToken cancellationToken)
    {
        // Your business logic here
        await _notificationService.SendOrderConfirmationAsync(
            message.CustomerId,
            message.OrderId,
            cancellationToken);
    }
}
Resources
Service Bus Triggers - Official trigger documentation
Service Bus Best Practices - Performance optimization guide
Azure Service Bus - Service Bus eventing module
Azure Functions - Core Azure Functions module