Skip to main content

Overview

The Azure Functions Service Bus module enables Azure Functions to receive and dispatch Azure Service Bus messages to event handlers. This integration provides a serverless way to process messages from Service Bus queues and topics.
This module automatically integrates with the Intent.Eventing.AzureServiceBus module for message contract generation and publishing.

Installation

Intent.AzureFunctions.AzureServiceBus

Dependencies

  • Intent.AzureFunctions - Core Azure Functions support
  • Intent.Eventing.AzureServiceBus - Service Bus eventing contracts
  • Intent.Eventing.Contracts - Base eventing infrastructure

How It Works

This module generates Azure Function consumers that:
  1. Subscribe to Service Bus queues or topics
  2. Receive messages automatically
  3. Dispatch to application-layer event handlers
  4. Handle acknowledgments and retries

Architecture

Modeling Service Bus Consumers

In the Services Designer

  1. Create Integration Events in the Eventing designer
  2. Model Event Handlers that subscribe to Service Bus messages
  3. Configure the Service Bus trigger settings

Generated Consumer

Intent Architect generates Azure Functions like this:
public class OrderCreatedConsumer
{
    private readonly IIntegrationEventHandler<OrderCreatedEvent> _handler;

    public OrderCreatedConsumer(IIntegrationEventHandler<OrderCreatedEvent> handler)
    {
        _handler = handler;
    }

    [Function("OrderCreatedConsumer")]
    public async Task Run(
        [ServiceBusTrigger("%OrderQueue%", Connection = "ServiceBusConnection")] 
        ServiceBusReceivedMessage message,
        ServiceBusMessageActions messageActions,
        CancellationToken cancellationToken)
    {
        var eventMessage = JsonSerializer.Deserialize<OrderCreatedEvent>(
            message.Body.ToString());
        
        await _handler.HandleAsync(eventMessage, cancellationToken);
        await messageActions.CompleteMessageAsync(message, cancellationToken);
    }
}

Configuration

Connection String

Configure your Service Bus connection in appsettings.json:
{
  "ServiceBusConnection": "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=yourkey"
}

Queue/Topic Configuration

Use configuration placeholders for queue/topic names:
{
  "OrderQueue": "orders-queue",
  "NotificationTopic": "notifications-topic"
}

Message Processing Patterns

Queue Consumer

Process messages from a Service Bus queue:
[Function("ProcessOrder")]
public async Task Run(
    [ServiceBusTrigger("orders-queue", Connection = "ServiceBusConnection")] 
    ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions,
    CancellationToken cancellationToken)
{
    // Process message
    await _handler.HandleAsync(eventData, cancellationToken);
    
    // Complete message
    await messageActions.CompleteMessageAsync(message, cancellationToken);
}

Topic Subscription Consumer

Process messages from a topic subscription:
[Function("ProcessNotification")]
public async Task Run(
    [ServiceBusTrigger(
        "notifications-topic", 
        "email-subscription",
        Connection = "ServiceBusConnection")] 
    ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions,
    CancellationToken cancellationToken)
{
    await _handler.HandleAsync(eventData, cancellationToken);
    await messageActions.CompleteMessageAsync(message, cancellationToken);
}

Batch Processing

Process multiple messages in a batch:
[Function("ProcessOrdersBatch")]
public async Task Run(
    [ServiceBusTrigger("orders-queue", Connection = "ServiceBusConnection")] 
    ServiceBusReceivedMessage[] messages,
    ServiceBusMessageActions messageActions,
    CancellationToken cancellationToken)
{
    foreach (var message in messages)
    {
        var order = JsonSerializer.Deserialize<OrderCreatedEvent>(
            message.Body.ToString());
        await _handler.HandleAsync(order, cancellationToken);
        await messageActions.CompleteMessageAsync(message, cancellationToken);
    }
}

Message Actions

Complete Message

Remove message from queue after successful processing:
await messageActions.CompleteMessageAsync(message, cancellationToken);

Abandon Message

Return message to queue for reprocessing:
await messageActions.AbandonMessageAsync(message, cancellationToken);

Dead Letter Message

Move message to dead-letter queue:
await messageActions.DeadLetterMessageAsync(
    message, 
    "ProcessingError",
    "Failed to process after multiple attempts",
    cancellationToken);

Defer Message

Defer message for later processing:
var sequenceNumber = message.SequenceNumber;
await messageActions.DeferMessageAsync(message, cancellationToken);

Error Handling

Retry Policies

Configure automatic retries in host.json:
{
  "extensions": {
    "serviceBus": {
      "prefetchCount": 100,
      "autoCompleteMessages": false,
      "maxConcurrentCalls": 16,
      "maxAutoLockRenewalDuration": "00:05:00",
      "maxDeliveryCount": 3
    }
  }
}

Exception Handling

[Function("ProcessOrder")]
public async Task Run(
    [ServiceBusTrigger("orders-queue", Connection = "ServiceBusConnection")] 
    ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions,
    CancellationToken cancellationToken)
{
    try
    {
        var order = JsonSerializer.Deserialize<OrderCreatedEvent>(
            message.Body.ToString());
        await _handler.HandleAsync(order, cancellationToken);
        await messageActions.CompleteMessageAsync(message, cancellationToken);
    }
    catch (TransientException ex)
    {
        // Retry by abandoning
        _logger.LogWarning(ex, "Transient error, will retry");
        await messageActions.AbandonMessageAsync(message, cancellationToken);
    }
    catch (Exception ex)
    {
        // Dead letter for manual intervention
        _logger.LogError(ex, "Fatal error processing message");
        await messageActions.DeadLetterMessageAsync(
            message, 
            "ProcessingError",
            ex.Message,
            cancellationToken);
    }
}

Advanced Features

Message Properties

Access custom message properties:
var correlationId = message.CorrelationId;
var messageId = message.MessageId;
var customProperty = message.ApplicationProperties["CustomProperty"];

Session-Based Processing

For ordered message processing:
[Function("ProcessSessionMessage")]
public async Task Run(
    [ServiceBusTrigger(
        "session-queue", 
        Connection = "ServiceBusConnection",
        IsSessionsEnabled = true)] 
    ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions,
    ServiceBusSessionMessageActions sessionActions,
    CancellationToken cancellationToken)
{
    var sessionId = message.SessionId;
    var sessionState = await sessionActions.GetSessionStateAsync(cancellationToken);
    
    // Process message
    await _handler.HandleAsync(eventData, cancellationToken);
    
    // Update session state
    await sessionActions.SetSessionStateAsync(
        newState, 
        cancellationToken);
    await messageActions.CompleteMessageAsync(message, cancellationToken);
}

Local Development

Using Azurite

For local development, use Azure Storage emulator:
{
  "ServiceBusConnection": "UseDevelopmentStorage=true"
}
Azurite doesn’t support Service Bus. For local Service Bus testing, use the Service Bus Emulator or a development namespace in Azure.

Monitoring

Application Insights

Track message processing:
_telemetry.TrackEvent("MessageProcessed", new Dictionary<string, string>
{
    { "MessageId", message.MessageId },
    { "Queue", "orders-queue" },
    { "ProcessingTime", stopwatch.Elapsed.ToString() }
});

Metrics to Monitor

  • Message processing time
  • Failed message count
  • Dead-letter queue depth
  • Function execution count
  • Throttling events

Best Practices

Always design handlers to be idempotent - they should produce the same result when processing the same message multiple times.
public async Task HandleAsync(OrderCreatedEvent message, CancellationToken ct)
{
    var existing = await _repository.FindByOrderIdAsync(message.OrderId, ct);
    if (existing != null)
    {
        _logger.LogInformation("Order {OrderId} already processed", message.OrderId);
        return; // Idempotent - safe to skip
    }
    
    // Process new order
    await _repository.CreateAsync(message, ct);
}
  • Keep messages under 256 KB for Standard tier
  • Consider using claim check pattern for large payloads
  • Store large data in blob storage, send reference
Configure maxConcurrentCalls based on your workload:
  • Start conservative (1-5 for CPU-bound work)
  • Increase for I/O-bound operations
  • Monitor for throttling and adjust
  • Reuse Service Bus clients
  • Let Azure Functions manage connection pooling
  • Use managed identities instead of connection strings

Integration with Event Bus

This module works seamlessly with the eventing infrastructure:
// Publishing (from any service)
public class OrderService
{
    private readonly IEventBus _eventBus;

    public async Task CreateOrderAsync(CreateOrderCommand command)
    {
        // Create order
        var order = await _repository.CreateAsync(command);
        
        // Publish event to Service Bus
        await _eventBus.PublishAsync(
            new OrderCreatedEvent(order.Id, order.CustomerId),
            cancellationToken);
    }
}

// Consuming (Azure Function)
[IntentManaged(Mode.Merge)]
public class OrderCreatedEventHandler 
    : IIntegrationEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(
        OrderCreatedEvent message, 
        CancellationToken cancellationToken)
    {
        // Your business logic here
        await _notificationService.SendOrderConfirmationAsync(
            message.CustomerId, 
            message.OrderId,
            cancellationToken);
    }
}

Resources

Service Bus Triggers

Official trigger documentation

Service Bus Best Practices

Performance optimization guide

Azure Service Bus

Service Bus eventing module

Azure Functions

Core Azure Functions module

Build docs developers (and LLMs) love