Overview

The Azure Service Bus eventing module provides patterns for using Azure Service Bus, a fully managed enterprise message broker, with events modeled in Intent Architect’s designers. Service Bus enables reliable cloud messaging between applications and services, even when they’re offline.

Installation

Intent.Eventing.AzureServiceBus

Related:
Azure Functions Integration - Process Service Bus messages with Azure Functions
Eventing Contracts - Base eventing infrastructure

What is Azure Service Bus?

Azure Service Bus is a fully managed enterprise message broker that:
  • Provides reliable message delivery
  • Supports advanced messaging patterns
  • Scales automatically
  • Offers built-in disaster recovery
  • Integrates seamlessly with Azure services

Messaging Patterns

Queues - Point-to-point communication
Topics - Publish-subscribe communication
Subscriptions - Filter messages from topics
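
To make the three terms concrete, here is a toy in-memory model. It is not the Service Bus SDK, and the ToyQueue and ToyTopic names are invented for illustration. It only shows the delivery semantics: a queue hands each message to one receiver, while a topic copies a message into every subscription whose filter matches.

```csharp
using System;
using System.Collections.Generic;

// Toy model of point-to-point delivery (hypothetical type, not part of any SDK).
public sealed class ToyQueue
{
    private readonly Queue<string> _messages = new Queue<string>();

    public void Send(string message) => _messages.Enqueue(message);

    // A received message is removed, so only one receiver ever sees it.
    public bool TryReceive(out string message) => _messages.TryDequeue(out message);
}

// Toy model of publish-subscribe delivery (hypothetical type, not part of any SDK).
public sealed class ToyTopic
{
    private readonly Dictionary<string, Func<string, bool>> _filters =
        new Dictionary<string, Func<string, bool>>();
    private readonly Dictionary<string, Queue<string>> _inboxes =
        new Dictionary<string, Queue<string>>();

    // Each subscription has its own filter and its own private inbox.
    public void Subscribe(string subscription, Func<string, bool> filter)
    {
        _filters[subscription] = filter;
        _inboxes[subscription] = new Queue<string>();
    }

    // Every subscription whose filter accepts the message gets its own copy.
    public void Publish(string message)
    {
        foreach (var pair in _filters)
        {
            if (pair.Value(message))
            {
                _inboxes[pair.Key].Enqueue(message);
            }
        }
    }

    public int PendingCount(string subscription) => _inboxes[subscription].Count;
}
```

Publishing "high-1" and "low-1" to a topic with an accept-all subscription and a subscription that only accepts messages starting with "high" leaves two pending messages in the first and one in the second.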

What’s Generated

This module generates:
  • Message Bus Implementation - IEventBus for Azure Service Bus
  • Configuration - Connection and dependency injection
  • Message Dispatcher - Routes messages to handlers
  • Hosted Service - Background message processing
  • Publisher Options - Topic and queue configuration
  • Subscription Options - Message filtering and routing
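
The examples on this page call IEventBus and IIntegrationEventHandler<T> without showing their definitions. The sketch below infers their shape purely from those calls (PublishAsync, SendAsync, HandleAsync), so treat the signatures as assumptions; the generated contracts may differ. RecordingEventBus is a hypothetical test double, not something the module generates.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Assumed shape, inferred from usage on this page; check the generated code.
public interface IEventBus
{
    Task PublishAsync<T>(T message, CancellationToken cancellationToken = default) where T : class;
    Task SendAsync<T>(T message, CancellationToken cancellationToken = default) where T : class;
}

// Assumed shape, inferred from the handler example on this page.
public interface IIntegrationEventHandler<T> where T : class
{
    Task HandleAsync(T message, CancellationToken cancellationToken = default);
}

// Hypothetical test double: records messages instead of contacting Service Bus.
public sealed class RecordingEventBus : IEventBus
{
    private readonly List<object> _published = new List<object>();
    private readonly List<object> _sent = new List<object>();

    public IReadOnlyList<object> Published => _published;
    public IReadOnlyList<object> Sent => _sent;

    public Task PublishAsync<T>(T message, CancellationToken cancellationToken = default) where T : class
    {
        _published.Add(message);
        return Task.CompletedTask;
    }

    public Task SendAsync<T>(T message, CancellationToken cancellationToken = default) where T : class
    {
        _sent.Add(message);
        return Task.CompletedTask;
    }
}
```

A double like this lets a unit test assert that a service published the expected event without needing a namespace or an emulator.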

Configuration

Connection String

appsettings.json:
{
  "AzureServiceBus": {
    "ConnectionString": "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=yourkey"
  }
}
Managed identity (appsettings.json):
{
  "AzureServiceBus": {
    "FullyQualifiedNamespace": "your-namespace.servicebus.windows.net"
  }
}
Code:
services.AddSingleton(sp =>
{
    var config = sp.GetRequiredService<IConfiguration>();
    var fqn = config["AzureServiceBus:FullyQualifiedNamespace"];
    return new ServiceBusClient(fqn, new DefaultAzureCredential());
});
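
Since the page shows two configuration shapes, an application that supports both needs to decide which one applies at startup. The helper below is one possible way to do that, assuming settings are available as a flat key/value map that uses the same keys as above. ServiceBusAuthMode and ServiceBusSettings are hypothetical names, and preferring the namespace when both keys are present is a design choice of this sketch, not module behavior.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical enum naming the two options shown above.
public enum ServiceBusAuthMode
{
    ConnectionString,
    ManagedIdentity
}

public static class ServiceBusSettings
{
    // Picks an auth mode from configuration keys; prefers identity-based access
    // when a fully qualified namespace is configured (an assumption of this sketch).
    public static ServiceBusAuthMode ResolveAuthMode(IReadOnlyDictionary<string, string> config)
    {
        if (config.TryGetValue("AzureServiceBus:FullyQualifiedNamespace", out var fqn)
            && !string.IsNullOrWhiteSpace(fqn))
        {
            return ServiceBusAuthMode.ManagedIdentity;
        }

        if (config.TryGetValue("AzureServiceBus:ConnectionString", out var connectionString)
            && !string.IsNullOrWhiteSpace(connectionString))
        {
            return ServiceBusAuthMode.ConnectionString;
        }

        // Failing fast here surfaces a missing setting at startup, not on first send.
        throw new InvalidOperationException(
            "Configure AzureServiceBus:FullyQualifiedNamespace or AzureServiceBus:ConnectionString.");
    }
}
```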

Modeling Events

Integration Events

Model events in the Eventing designer:
public class OrderCreatedEvent
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime CreatedAt { get; set; }
}

Topic Configuration

Use stereotypes to configure topics:
  • Topic Name: order-events
  • Subscription Name: notification-service

Publishing Messages

Using IEventBus

public class OrderService
{
    private readonly IEventBus _eventBus;
    private readonly IOrderRepository _repository;

    public async Task CreateOrderAsync(
        CreateOrderCommand command,
        CancellationToken cancellationToken)
    {
        // Create order
        var order = await _repository.CreateAsync(command, cancellationToken);
        
        // Publish to Service Bus
        await _eventBus.PublishAsync(
            new OrderCreatedEvent
            {
                OrderId = order.Id,
                CustomerId = order.CustomerId,
                TotalAmount = order.TotalAmount,
                CreatedAt = DateTime.UtcNow
            },
            cancellationToken);
    }
}

Queue vs Topic

Queue (Point-to-Point):
// Send command to specific queue
await _eventBus.SendAsync(
    new ProcessPaymentCommand { OrderId = orderId },
    cancellationToken);
Topic (Publish-Subscribe):
// Publish event to topic (multiple subscribers)
await _eventBus.PublishAsync(
    new OrderCreatedEvent { OrderId = orderId },
    cancellationToken);

Message Properties

public async Task PublishWithPropertiesAsync()
{
    var message = new ServiceBusMessage(
        JsonSerializer.SerializeToUtf8Bytes(orderEvent))
    {
        MessageId = Guid.NewGuid().ToString(),
        CorrelationId = correlationId,
        SessionId = customerId.ToString(),
        ContentType = "application/json",
        Subject = "OrderCreated"
    };
    
    message.ApplicationProperties.Add("Priority", "High");
    message.ApplicationProperties.Add("Source", "WebAPI");
    
    await sender.SendMessageAsync(message);
}

Consuming Messages

Generated Message Dispatcher

public class AzureServiceBusMessageDispatcher
{
    private readonly IServiceProvider _serviceProvider;

    public async Task DispatchAsync<T>(
        T message,
        CancellationToken cancellationToken) where T : class
    {
        using var scope = _serviceProvider.CreateScope();
        var handler = scope.ServiceProvider
            .GetRequiredService<IIntegrationEventHandler<T>>();
        
        await handler.HandleAsync(message, cancellationToken);
    }
}

Event Handler

[IntentManaged(Mode.Merge, Signature = Mode.Fully)]
public class OrderCreatedEventHandler 
    : IIntegrationEventHandler<OrderCreatedEvent>
{
    private readonly IEmailService _emailService;
    private readonly ILogger<OrderCreatedEventHandler> _logger;

    [IntentManaged(Mode.Fully, Body = Mode.Ignore)]
    public async Task HandleAsync(
        OrderCreatedEvent message,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Processing order created event for order {OrderId}",
            message.OrderId);

        await _emailService.SendOrderConfirmationAsync(
            message.CustomerId,
            message.OrderId,
            cancellationToken);
    }
}

Hosted Service

public class AzureServiceBusHostedService : BackgroundService
{
    private readonly ServiceBusProcessor _processor;
    private readonly IAzureServiceBusMessageDispatcher _dispatcher;
    // Used by the error handler below
    private readonly ILogger<AzureServiceBusHostedService> _logger;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += async args =>
        {
            var message = JsonSerializer.Deserialize<OrderCreatedEvent>(
                args.Message.Body);
            
            await _dispatcher.DispatchAsync(message, args.CancellationToken);
            await args.CompleteMessageAsync(args.Message);
        };
        
        _processor.ProcessErrorAsync += args =>
        {
            _logger.LogError(args.Exception, "Error processing message");
            return Task.CompletedTask;
        };
        
        await _processor.StartProcessingAsync(stoppingToken);
    }
}

Advanced Features

Message Sessions

Guarantee ordered processing:
// Enable sessions on queue/subscription
var createQueueOptions = new CreateQueueOptions("orders")
{
    RequiresSession = true
};

// Send with session ID
var message = new ServiceBusMessage(data)
{
    SessionId = customerId.ToString() // All customer messages in same session
};

// Process with session
var processor = client.CreateSessionProcessor("orders", new ServiceBusSessionProcessorOptions
{
    MaxConcurrentSessions = 5
});

Message Deferral

Defer messages for later processing:
public async Task ProcessMessageAsync(ProcessMessageEventArgs args)
{
    if (!CanProcessNow())
    {
        // Defer for later
        var sequenceNumber = args.Message.SequenceNumber;
        await args.DeferMessageAsync(args.Message);
        
        // Store sequence number for later retrieval
        await _cache.SetAsync($"deferred:{args.Message.MessageId}", sequenceNumber);
        return;
    }
    
    await ProcessAsync(args.Message);
    await args.CompleteMessageAsync(args.Message);
}

// Retrieve deferred message later
public async Task ProcessDeferredMessageAsync(string messageId)
{
    var sequenceNumber = await _cache.GetAsync<long>($"deferred:{messageId}");
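    var message = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
    await ProcessAsync(message);
    // Settle the deferred message; otherwise it stays in the entity
    await receiver.CompleteMessageAsync(message);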
}

Scheduled Messages

Schedule future delivery:
public async Task ScheduleReminderAsync(
    Guid orderId,
    TimeSpan delay)
{
    var message = new ServiceBusMessage(
        JsonSerializer.SerializeToUtf8Bytes(
            new OrderReminderEvent { OrderId = orderId }))
    {
        MessageId = Guid.NewGuid().ToString()
    };
    
    var scheduleTime = DateTimeOffset.UtcNow.Add(delay);
    await sender.ScheduleMessageAsync(message, scheduleTime);
}

Dead Letter Queue

Handle poison messages:
public async Task ProcessMessageAsync(ProcessMessageEventArgs args)
{
    try
    {
        await ProcessAsync(args.Message);
        await args.CompleteMessageAsync(args.Message);
    }
    catch (ValidationException ex)
    {
        // Move to dead letter queue
        await args.DeadLetterMessageAsync(
            args.Message,
            "ValidationError",
            ex.Message);
    }
    catch (TransientException ex)
    {
        // Abandon for retry
        await args.AbandonMessageAsync(args.Message);
    }
}

// Process dead letter queue
var dlqReceiver = client.CreateReceiver(
    "orders",
    new ServiceBusReceiverOptions
    {
        SubQueue = SubQueue.DeadLetter
    });

Message Filters

Filter subscription messages:
// SQL Filter
var ruleOptions = new CreateRuleOptions
{
    Name = "HighPriorityOrders",
    Filter = new SqlRuleFilter(
        "TotalAmount > 1000 OR Priority = 'High'")
};

await adminClient.CreateRuleAsync(
    "order-events",
    "high-priority-subscription",
    ruleOptions);

// Correlation Filter
var correlationFilter = new CorrelationRuleFilter
{
    Subject = "OrderCreated",
    CorrelationId = correlationId
};

var correlationRuleOptions = new CreateRuleOptions
{
    Name = "OrderCreatedFilter",
    Filter = correlationFilter
};
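
Service Bus evaluates SQL filters against message properties, not the message body, so the publisher has to copy TotalAmount and Priority into ApplicationProperties for the rule above to match. The sketch below restates that rule as a plain predicate over a property bag to show what the broker is effectively checking. FilterSketch is a hypothetical helper, not an SDK type, and it covers only a few numeric types.

```csharp
using System;
using System.Collections.Generic;

public static class FilterSketch
{
    // Mirrors "TotalAmount > 1000 OR Priority = 'High'" over application properties.
    public static bool MatchesHighPriorityOrders(
        IReadOnlyDictionary<string, object> applicationProperties)
    {
        // A property that is absent never satisfies its comparison.
        var highValue = applicationProperties.TryGetValue("TotalAmount", out var amount)
            && IsGreaterThan(amount, 1000);

        var highPriority = applicationProperties.TryGetValue("Priority", out var priority)
            && priority is string text
            && text == "High";

        return highValue || highPriority;
    }

    // Compares a handful of common numeric types; anything else does not match.
    private static bool IsGreaterThan(object value, double threshold)
    {
        switch (value)
        {
            case int i: return i > threshold;
            case long l: return l > threshold;
            case double d: return d > threshold;
            case decimal m: return (double)m > threshold;
            default: return false;
        }
    }
}
```

A message that carries TotalAmount only inside its JSON body, with no matching application property, would not be selected by the amount clause.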

Error Handling

Retry Policy

var processorOptions = new ServiceBusProcessorOptions
{
    MaxConcurrentCalls = 5,
    AutoCompleteMessages = false,
    MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5)
};

var processor = client.CreateProcessor("orders", processorOptions);

Exception Handling

public async Task ProcessMessageAsync(ProcessMessageEventArgs args)
{
    var retryCount = 0;
    var maxRetries = 3;
    
    while (retryCount < maxRetries)
    {
        try
        {
            await ProcessAsync(args.Message);
            await args.CompleteMessageAsync(args.Message);
            return;
        }
        catch (TransientException ex)
        {
            retryCount++;
            if (retryCount >= maxRetries)
            {
                _logger.LogError(ex, "Max retries exceeded");
                await args.DeadLetterMessageAsync(
                    args.Message,
                    "MaxRetriesExceeded",
                    ex.Message);
            }
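            else
            {
                _logger.LogWarning(ex, "Retry {Count}/{Max}",
                    retryCount, maxRetries);
                // Keep the lock and retry in-process. Abandoning here would
                // release the message, so the next Complete call would fail.
                await Task.Delay(
                    TimeSpan.FromSeconds(Math.Pow(2, retryCount)),
                    args.CancellationToken);
            }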
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Fatal error");
            await args.DeadLetterMessageAsync(
                args.Message,
                "ProcessingError",
                ex.Message);
            return;
        }
    }
}
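
The delay in the loop above doubles on every attempt without an upper bound. A small helper that caps the delay keeps a long retry sequence from outliving the message lock. The sketch below is one way to write it; Backoff is a hypothetical name, and it counts attempts from 1, so the first retry waits exactly baseDelay (the loop above starts at 2 seconds instead).

```csharp
using System;

public static class Backoff
{
    // Delay before retry number `attempt` (1-based):
    // baseDelay * 2^(attempt - 1), never more than maxDelay.
    public static TimeSpan DelayFor(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attempt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(attempt), "Attempts are 1-based.");
        }

        // Clamp the exponent so very large attempt numbers cannot overflow.
        var exponent = Math.Min(attempt - 1, 30);
        var ticks = baseDelay.Ticks * Math.Pow(2, exponent);

        return ticks >= maxDelay.Ticks
            ? maxDelay
            : TimeSpan.FromTicks((long)ticks);
    }
}
```

With a base of one second and a cap of thirty seconds, attempts 1, 2 and 3 wait 1, 2 and 4 seconds, and attempt 10 waits the 30-second cap.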

Best Practices

Always implement idempotent handlers:
public async Task HandleAsync(
    OrderCreatedEvent message,
    CancellationToken cancellationToken)
{
    var messageId = message.OrderId.ToString();
    var processed = await _cache.ExistsAsync(
        $"processed:{messageId}",
        cancellationToken);
    
    if (processed)
    {
        _logger.LogInformation("Message already processed: {MessageId}",
            messageId);
        return;
    }
    
    await ProcessMessageAsync(message, cancellationToken);
    
    await _cache.SetAsync(
        $"processed:{messageId}",
        true,
        TimeSpan.FromDays(7),
        cancellationToken);
}
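
The handler above checks the cache and writes to it in two separate steps, so two concurrent deliveries of the same message can both pass the check. One way to close that gap is to claim the key atomically before doing the work. The sketch below does this in memory with ConcurrentDictionary. InMemoryIdempotencyGuard is a hypothetical name, and it only protects a single process, so a multi-instance service would need the same claim-first step against a shared store.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class InMemoryIdempotencyGuard
{
    private readonly ConcurrentDictionary<string, byte> _claimed =
        new ConcurrentDictionary<string, byte>();

    // Runs `work` only for the first caller that claims `key`.
    // Returns false when the key was already claimed (a duplicate delivery).
    public async Task<bool> RunOnceAsync(string key, Func<Task> work)
    {
        // TryAdd is atomic: exactly one concurrent caller wins the claim.
        if (!_claimed.TryAdd(key, 0))
        {
            return false;
        }

        try
        {
            await work();
            return true;
        }
        catch
        {
            // Release the claim so a redelivery can try again after a failure.
            _claimed.TryRemove(key, out _);
            throw;
        }
    }
}
```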
Respect message size limits:
  • Standard tier: 256 KB per message
  • Premium tier: 1 MB per message by default, configurable up to 100 MB
  • Use the claim check pattern for larger payloads
// Store large data in blob storage
var blobUri = await _blobStorage.UploadAsync(largeData);

// Send reference in message
await _eventBus.PublishAsync(new OrderCreatedEvent
{
    OrderId = orderId,
    DataReference = blobUri
});
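
Deciding when to switch to a claim check can be automated by measuring the serialized payload before sending. The helper below is a rough sketch of that check: PayloadSizeGuard is a hypothetical name, the 256 KB default is the Standard tier limit, and the 16 KB headroom is an assumed safety margin for headers and application properties, not a documented figure.

```csharp
using System;
using System.Text.Json;

public static class PayloadSizeGuard
{
    // Standard tier limit per message.
    public const int StandardTierLimitBytes = 256 * 1024;

    // Returns true when the JSON-serialized payload is too close to the limit
    // to send inline and should go through blob storage instead.
    public static bool NeedsClaimCheck<T>(
        T payload,
        int limitBytes = StandardTierLimitBytes,
        int headroomBytes = 16 * 1024) // assumed margin for headers and properties
    {
        var sizeBytes = JsonSerializer.SerializeToUtf8Bytes(payload).Length;
        return sizeBytes > limitBytes - headroomBytes;
    }
}
```

A publisher could call NeedsClaimCheck first and only upload to blob storage when it returns true, which keeps small events on the direct path.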
Secure the namespace:
  • Use managed identities
  • Implement least privilege access
  • Enable diagnostic logging
  • Encrypt sensitive data
  • Use private endpoints
Track key metrics:
  • Message count
  • Dead letter count
  • Processing time
  • Error rate
  • Active message count

Local Development

Service Bus Emulator

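Microsoft provides a Docker-based emulator. The command below is abbreviated: the emulator also expects EULA acceptance and a companion SQL Server container, which Microsoft's setup guide typically wires together with Docker Compose.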
docker run -d -p 5672:5672 \
  --name servicebus-emulator \
  mcr.microsoft.com/azure-messaging/servicebus-emulator:latest
Connection String:
{
  "AzureServiceBus": {
    "ConnectionString": "Endpoint=sb://localhost:5672;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
  }
}
Alternatively, use a development namespace in Azure with minimal cost.

Monitoring

Application Insights

_telemetry.TrackEvent("ServiceBusMessageProcessed", new Dictionary<string, string>
{
    { "MessageId", message.MessageId },
    { "Queue", "orders" },
    { "ProcessingTime", stopwatch.ElapsedMilliseconds.ToString() },
    { "DeliveryCount", message.DeliveryCount.ToString() }
});

Health Checks

services.AddHealthChecks()
    .AddAzureServiceBusQueue(
        configuration["AzureServiceBus:ConnectionString"],
        "orders",
        name: "servicebus-orders");

Troubleshooting

Common Issues

Messages not being received:
  • Check subscription filters
  • Verify connection string
  • Ensure queue/topic exists
  • Check message lock duration
Messages going to dead letter:
  • Review dead letter reason
  • Check message processing logic
  • Verify message format
  • Monitor exception logs
Performance issues:
  • Increase max concurrent calls
  • Use premium tier for high throughput
  • Implement prefetching
  • Optimize message handlers

Resources

Service Bus Docs - Official documentation
Best Practices - Performance optimization
Azure Functions - Functions Service Bus trigger
Eventing Contracts - Base eventing infrastructure
