Overview

The Email Service consumes email notifications from RabbitMQ queues and sends them via SMTP. Built with C# and .NET, it retries failed deliveries with exponential backoff, uses Redis-based idempotency checks to avoid duplicate sends, and routes messages that exhaust their retries to a failed queue.

Purpose and Responsibilities

  • Queue consumption - Consumes messages from email.queue via RabbitMQ
  • Template rendering - Replaces template variables with actual values
  • Email delivery - Sends emails via SMTP using MailKit
  • Retry logic - Implements exponential backoff for transient failures
  • Idempotency - Prevents duplicate email sends using Redis
  • Status tracking - Updates delivery status in Redis
  • Failed message handling - Routes permanently failed messages to the failed queue (failed.queue)

Tech Stack

  • Language: C# / .NET 6+
  • Email Library: MailKit (SMTP client)
  • Message Broker: RabbitMQ (RabbitMQ.Client)
  • Cache/Status Store: Redis (StackExchange.Redis)
  • Retry Library: Polly (resilience and retry policies)
  • Logging: Serilog
  • JSON: Newtonsoft.Json

Configuration

Port: 8003 on the host in the Docker example; the health endpoint listens on PORT inside the container (default 8080).
Environment Variables:
PORT=8080
RABBITMQ_HOST=localhost
RABBITMQ_USER=guest
RABBITMQ_PASS=guest
RABBITMQ_EXCHANGE=notifications.direct
RABBITMQ_EMAIL_QUEUE=email.queue
RABBITMQ_FAILED_QUEUE=failed.queue
REDIS_HOST=localhost
SMTP_HOST=smtp.mailtrap.io
SMTP_PORT=2525
SMTP_USER=your-username
SMTP_PASS=your-password
SMTP_FROM=[email protected]
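Because every setting above is read from configuration at runtime, a missing value only surfaces when the service first uses it. The following is a minimal sketch of a startup check, not code from the service: `ConfigCheck` and its `Missing` method are hypothetical names, and the choice of which variables count as required is an assumption.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of the service: reports which settings
// are missing or blank before any connection is attempted.
public static class ConfigCheck
{
    // Names taken from the environment variable list above.
    // Treating exactly these as required is an assumption of this sketch.
    public static readonly string[] Required =
    {
        "RABBITMQ_HOST", "RABBITMQ_EXCHANGE", "RABBITMQ_EMAIL_QUEUE",
        "RABBITMQ_FAILED_QUEUE", "REDIS_HOST", "SMTP_HOST", "SMTP_PORT", "SMTP_FROM"
    };

    // 'lookup' returns the value for a name, or null when it is unset.
    public static List<string> Missing(Func<string, string?> lookup) =>
        Required.Where(name => string.IsNullOrWhiteSpace(lookup(name))).ToList();
}
```

Calling `ConfigCheck.Missing(Environment.GetEnvironmentVariable)` at startup would return the names to report before the service tries to connect. Passing the lookup as a delegate keeps the check testable without touching the real environment.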

RabbitMQ Consumption

The service consumes messages from the email.queue bound to the notifications.direct exchange.

Queue Configuration

  • Exchange: notifications.direct
  • Queue: email.queue
  • Routing Key: email
  • Durable: true
  • Auto-delete: false
Implementation (Services/RabbitMqSubscriber.cs:18-46):
public RabbitMqSubscriber(IConfiguration cfg, ILogger<RabbitMqSubscriber> log)
{
    _cfg = cfg;
    _log = log;
    var host = cfg["RABBITMQ_HOST"];
    log.LogInformation("Connecting to RabbitMQ host {Host}", host);
    var factory = new ConnectionFactory
    {
        ClientProvidedName = "Email Service",
        HostName = host,
        UserName = cfg["RABBITMQ_USER"],
        Password = cfg["RABBITMQ_PASS"],
        DispatchConsumersAsync = true,
        RequestedHeartbeat = TimeSpan.FromSeconds(30)
    };
    _conn = factory.CreateConnection();
    _channel = _conn.CreateModel();
    log.LogInformation("Successfully connected to RabbitMQ");

    _exchange = cfg["RABBITMQ_EXCHANGE"]!;
    _queue = cfg["RABBITMQ_EMAIL_QUEUE"]!;
    _failedQueue = cfg["RABBITMQ_FAILED_QUEUE"]!;

    log.LogInformation("Declaring exchange '{Exchange}' and queues '{Queue}', '{FailedQueue}'", 
        _exchange, _queue, _failedQueue);
    _channel.ExchangeDeclare(_exchange, "direct", durable: true);
    _channel.QueueDeclare(_queue, durable: true, exclusive: false, autoDelete: false);
    _channel.QueueDeclare(_failedQueue, durable: true, exclusive: false, autoDelete: false);
    _channel.QueueBind(_queue, _exchange, "email");
}

Message Consumer

Implementation (Services/RabbitMqSubscriber.cs:48-68):
public void StartConsuming(Func<NotificationMessage, BasicDeliverEventArgs, Task> handler)
{
    var consumer = new AsyncEventingBasicConsumer(_channel);
    consumer.Received += async (s, ea) =>
    {
        _log.LogInformation("Received message with delivery tag {DeliveryTag}", ea.DeliveryTag);
        var json = Encoding.UTF8.GetString(ea.Body.ToArray());
        var msg = JsonConvert.DeserializeObject<NotificationMessage>(json, SnakeCaseJsonSettings.Settings);
        if (msg != null)
        {
            await handler(msg, ea);
        }
        else
        {
            _log.LogWarning("Could not deserialize message with delivery tag {DeliveryTag}. Acknowledging to discard.", 
                ea.DeliveryTag);
            _channel.BasicAck(ea.DeliveryTag, false);
        }
    };
    _channel.BasicConsume(_queue, false, consumer);
    _log.LogInformation("Consumer started on queue '{Queue}'", _queue);
}
The service uses manual acknowledgment (autoAck: false), so a message is removed from the queue only when the service explicitly acknowledges it after handling.

SMTP Configuration

The service uses MailKit to send emails via SMTP.
Email Sender (Services/EmailSender.cs:8-37):
public async Task SendAsync(string to, string subject, string html, CancellationToken ct)
{
    var msg = new MimeMessage();
    msg.From.Add(MailboxAddress.Parse(cfg["SMTP_FROM"]));
    msg.To.Add(MailboxAddress.Parse(to));
    msg.Subject = subject;
    msg.Body = new TextPart("html") { Text = html };

    using var client = new SmtpClient();
    var host = cfg["SMTP_HOST"];
    var port = int.Parse(cfg["SMTP_PORT"]);

    log.LogInformation("Connecting to SMTP server {Host}:{Port}", host, port);
    await client.ConnectAsync(host, port, false, ct);
    log.LogInformation("Connection established");

    if (!string.IsNullOrEmpty(cfg["SMTP_USER"]))
    {
        log.LogInformation("Attempting to authenticate as {User}", cfg["SMTP_USER"]);
        await client.AuthenticateAsync(cfg["SMTP_USER"], cfg["SMTP_PASS"], ct);
        log.LogInformation("Authentication successful");
    }

    log.LogInformation("Sending email with subject '{Subject}' to {To}", subject, to);
    await client.SendAsync(msg, ct);
    log.LogInformation("Email sent successfully");

    await client.DisconnectAsync(true, ct);
    log.LogInformation("Disconnected from SMTP server");
}
Supported SMTP Providers:
  • Gmail (smtp.gmail.com:587)
  • SendGrid (smtp.sendgrid.net:587)
  • Mailgun (smtp.mailgun.org:587)
  • Mailtrap (smtp.mailtrap.io:2525) - for testing
  • Amazon SES
  • Custom SMTP servers
For Gmail, you must use an App Password instead of your regular password. Enable 2FA and generate an app-specific password.

Retry Logic

The service implements exponential backoff retry logic for transient failures.
Retry Strategy (Services/EmailProcessorBackgroundService.cs:46-72):
catch (Exception ex)
{
    log.LogError(ex, "Failed to process notification {NotificationId}", msg.notification_id);
    var attempt = (msg.metadata?.ContainsKey("attempt") == true) 
        ? Convert.ToInt32(msg.metadata["attempt"]) + 1 
        : 1;
    
    if (attempt > 5)
    {
        log.LogCritical("Attempt {Attempt} failed for notification {NotificationId}. Moving to failed queue.", 
            attempt, msg.notification_id);
        mq.PublishToFailed(ea.Body.ToArray());
        await store.MarkProcessedAsync(msg.notification_id, msg.request_id, "failed", attempt, ex.Message);
    }
    else
    {
        var delay = 2000 * (int)Math.Pow(2, attempt - 1);
        log.LogWarning("Attempt {Attempt} for notification {NotificationId} failed. Retrying in {Delay}ms", 
            attempt, msg.notification_id, delay);
        await Task.Delay(delay);
        
        msg.metadata ??= new();
        msg.metadata["attempt"] = attempt;
        var json = JsonConvert.SerializeObject(msg, SnakeCaseJsonSettings.Settings);
        mq.Publish("email", Encoding.UTF8.GetBytes(json));
    }
}
finally
{
    mq.Ack(ea.DeliveryTag, false);
}
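Retry Schedule:
Failed attempt   Delay before retry   Cumulative delay
1                2s                   2s
2                4s                   6s
3                8s                   14s
4                16s                  30s
5                32s                  62s
6                none                 Moved to failed.queue
Formula: delay = 2000ms × 2^(attempt - 1), where attempt is the number of failed attempts so far.
The initial delivery has no delay. After the sixth failed attempt (the initial delivery plus five retries), the message is moved to failed.queue for manual investigation.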
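To check the schedule against the catch block above, the delay formula can be isolated into a small helper. This is a sketch for illustration only: `RetrySchedule`, `DelayMs` and `TotalDelayMs` are hypothetical names, while the constants mirror the `2000` ms base delay and the `attempt > 5` check in the source.

```csharp
using System;

// Sketch that mirrors the delay formula used in the catch block above.
public static class RetrySchedule
{
    public const int MaxRetries = 5;      // mirrors the 'attempt > 5' check
    public const int BaseDelayMs = 2000;  // mirrors the 2000 ms base delay

    // Delay applied after the given failed attempt (1-based).
    public static int DelayMs(int attempt) =>
        BaseDelayMs * (int)Math.Pow(2, attempt - 1);

    // Total time spent waiting if every retry up to MaxRetries is used.
    public static int TotalDelayMs()
    {
        var total = 0;
        for (var attempt = 1; attempt <= MaxRetries; attempt++)
            total += DelayMs(attempt);
        return total;
    }
}
```

With these constants, `DelayMs(1)` is 2000 ms, `DelayMs(5)` is 32000 ms, and `TotalDelayMs()` is 62000 ms, so a message that keeps failing spends about a minute in backoff before it is routed to the failed queue.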

Idempotency

The service uses Redis to prevent duplicate email sends.
Idempotency Check (Services/EmailProcessorBackgroundService.cs:19-24):
if (await store.IsProcessedAsync(msg.notification_id))
{
    mq.Ack(ea.DeliveryTag, false);
    return;
}
Status Tracking:
await store.MarkProcessedAsync(msg.notification_id, msg.request_id);  // Success
await store.MarkProcessedAsync(msg.notification_id, msg.request_id, "failed", attempt, ex.Message);  // Failure
Redis Keys:
  • notification:{notification_id} - Tracks processing status; the key expires automatically after a configurable TTL
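The check-then-mark pattern with expiring keys can be illustrated without a Redis server. The class below is an in-memory stand-in, not the service's store: `InMemoryIdempotencyStore`, the injected clock, and the default `"sent"` status are assumptions made for this sketch, and only the `notification:{notification_id}` key format comes from the text above.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// In-memory stand-in for the Redis-backed store, for illustration only.
// Not thread-safe; the real service keeps this state in Redis.
public sealed class InMemoryIdempotencyStore
{
    private readonly Dictionary<string, (string Status, DateTime ExpiresAt)> _entries = new();
    private readonly TimeSpan _ttl;
    private readonly Func<DateTime> _now;

    // 'now' lets callers supply a clock; it defaults to the UTC system clock.
    public InMemoryIdempotencyStore(TimeSpan ttl, Func<DateTime>? now = null)
    {
        _ttl = ttl;
        _now = now ?? (() => DateTime.UtcNow);
    }

    private static string Key(string notificationId) => $"notification:{notificationId}";

    // True while an unexpired entry exists for the notification.
    public bool IsProcessed(string notificationId)
    {
        var key = Key(notificationId);
        if (!_entries.TryGetValue(key, out var entry)) return false;
        if (entry.ExpiresAt > _now()) return true;
        _entries.Remove(key); // expired: behave as if the key was never set
        return false;
    }

    // Records the status and (re)starts the TTL. "sent" is an assumed default.
    public void MarkProcessed(string notificationId, string status = "sent")
    {
        _entries[Key(notificationId)] = (status, _now() + _ttl);
    }
}
```

The TTL bounds how long a duplicate can be detected: once the entry expires, a redelivered message with the same notification_id would be processed again, so the TTL should comfortably exceed the longest expected redelivery window.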

Template Rendering

The service renders templates by replacing {{variable}} placeholders with actual values.
Rendering Logic (Services/EmailProcessorBackgroundService.cs:77-78):
private string Render(string template, Dictionary<string, string>? vars) =>
    vars?.Aggregate(template, (c, kv) => c.Replace($"{{{{{kv.Key}}}}}", kv.Value)) ?? template;
Example:
// Template
subject = "Welcome, {{name}}!";
body = "Hi {{name}}, thanks for signing up at {{link}}";

// Variables
vars = { "name": "John", "link": "https://example.com" };

// Rendered
subject = "Welcome, John!";
body = "Hi John, thanks for signing up at https://example.com";
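Plain string replacement leaves a placeholder untouched when its variable is missing, so an email could go out containing a literal {{link}}. The sketch below repeats the same replacement approach in a standalone class and adds a check for leftovers. `TemplateSketch` and `Unresolved` are hypothetical names, and the assumption that placeholder names consist of word characters is mine, not the source's.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public static class TemplateSketch
{
    // Same replacement approach as the Render method shown above.
    public static string Render(string template, Dictionary<string, string>? vars) =>
        vars?.Aggregate(template, (current, kv) => current.Replace($"{{{{{kv.Key}}}}}", kv.Value))
        ?? template;

    // Hypothetical helper: names of placeholders still present after rendering.
    // Assumes placeholder names are made of letters, digits and underscores.
    public static List<string> Unresolved(string rendered) =>
        Regex.Matches(rendered, @"\{\{(\w+)\}\}")
             .Select(m => m.Groups[1].Value)
             .Distinct()
             .ToList();
}
```

Rendering "Hi {{name}}, visit {{link}}" with only a name variable leaves {{link}} in the output, and `Unresolved` would report "link". Whether such a message should be sent, retried, or marked as failed is a policy decision the source does not describe.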

Message Processing Flow

The service follows this processing workflow:
  1. Receive Message - Consume from email.queue
  2. Idempotency Check - Check if notification_id already processed
  3. Validation - Ensure user and template are present, check email preference
  4. Template Rendering - Replace variables in subject and body
  5. Email Sending - Send via SMTP
  6. Status Update - Mark as processed in Redis
  7. Acknowledgment - ACK message to RabbitMQ
Full Implementation (Services/EmailProcessorBackgroundService.cs:14-72):
protected override Task ExecuteAsync(CancellationToken ct)
{
    mq.StartConsuming(async (msg, ea) =>
    {
        log.LogInformation("Starting to process notification {NotificationId} (Request ID: {RequestId})", 
            msg.notification_id, msg.request_id);
        try
        {
            // Idempotency check
            if (await store.IsProcessedAsync(msg.notification_id))
            {
                // No explicit Ack here: the finally block acknowledges every message,
                // and acknowledging the same delivery tag twice is a channel-level error.
                return;
            }

            var user = msg.user;
            var template = msg.template;

            // Validation
            if (user == null || template == null || !user.preferences.GetValueOrDefault("email"))
            {
                log.LogWarning("Validation failed for notification {NotificationId}. User or template is null, or email preference is disabled.", 
                    msg.notification_id);
                await store.MarkProcessedAsync(msg.notification_id, msg.request_id, "failed");
                return; // acknowledged by the finally block
            }

            // Render template
            log.LogInformation("Rendering template '{TemplateCode}' for user {UserId}", 
                template.template_code, user.user_id);
            var subject = Render(template.subject, msg.variables);
            var body = Render(template.body, msg.variables);

            // Send email
            await sender.SendAsync(user.email, subject, body, ct);
            await store.MarkProcessedAsync(msg.notification_id, msg.request_id);
            log.LogInformation("Successfully processed notification {NotificationId}", msg.notification_id);
        }
        catch (Exception ex)
        {
            // Retry logic (see above)
        }
        finally
        {
            mq.Ack(ea.DeliveryTag, false);
        }
    });

    return Task.CompletedTask;
}

Health Endpoint

The service exposes a lightweight health check endpoint without ASP.NET Core.
Implementation (Program.cs:33-53):
_ = Task.Run(async () =>
{
    var port = Environment.GetEnvironmentVariable("PORT") ?? "8080";
    var listener = new HttpListener();
    listener.Prefixes.Add($"http://+:{port}/health/");
    listener.Start();

    Log.Information("Health endpoint listening on http://+:{Port}/health", port);

    while (true)
    {
        var context = await listener.GetContextAsync();
        var response = context.Response;
        var buffer = Encoding.UTF8.GetBytes("{\"status\":\"healthy\"}");
        response.ContentType = "application/json";
        response.ContentLength64 = buffer.Length;
        await response.OutputStream.WriteAsync(buffer);
        response.Close();
    }
});
Response (200 OK):
{
  "status": "healthy"
}

Logging

The service uses Serilog for structured logging.
Configuration (Program.cs:8-10):
Log.Logger = new LoggerConfiguration()
    .WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}")
    .CreateLogger();
Log Levels:
  • Information - Normal operations (connection, message received, email sent)
  • Warning - Validation failures, retry attempts
  • Error - Email sending failures, processing errors
  • Critical - Max retries exceeded, moving to failed queue

Running the Service

Development

dotnet run --project EmailService

Production

dotnet publish -c Release
cd bin/Release/net6.0/publish
dotnet EmailService.dll

Docker

docker build -t email-service .
docker run -p 8003:8080 \
  -e RABBITMQ_HOST=rabbitmq \
  -e REDIS_HOST=redis \
  -e SMTP_HOST=smtp.mailtrap.io \
  -e SMTP_PORT=2525 \
  -e SMTP_USER=your-username \
  -e SMTP_PASS=your-password \
  -e SMTP_FROM=[email protected] \
  email-service

Error Handling

Validation Failures:
  • Missing user or template → Mark as failed, ACK message
  • Email preference disabled → Mark as failed, ACK message
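Transient Failures (retry):
  • SMTP connection timeout
  • Network errors
  • Rate limiting (temporary SMTP 4xx responses such as 421 or 450)
Permanent Failures (move to failed queue):
  • Invalid email address
  • Max retries exceeded
  • SMTP authentication failure
The catch block shown under Retry Logic handles every exception the same way, so invalid addresses and authentication failures also go through the retry schedule before the message reaches failed.queue.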
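One way to act on the transient/permanent split is to classify the exception before deciding whether to retry. The following is only a sketch of that idea and is not taken from the service: `FailureKind` and `FailureClassifier` are hypothetical names, and the .NET base-library exception types stand in for the MailKit-specific exceptions a real implementation would need to map.

```csharp
using System;
using System.IO;
using System.Net.Sockets;
using System.Security.Authentication;

public enum FailureKind { Transient, Permanent }

// Hypothetical classifier: maps an exception to a retry decision.
public static class FailureClassifier
{
    public static FailureKind Classify(Exception ex) => ex switch
    {
        TimeoutException => FailureKind.Transient,         // connection or send timed out
        SocketException => FailureKind.Transient,          // network-level error
        IOException => FailureKind.Transient,              // dropped or reset connection
        FormatException => FailureKind.Permanent,          // e.g. malformed recipient address
        AuthenticationException => FailureKind.Permanent,  // e.g. rejected credentials
        _ => FailureKind.Transient                         // unknown errors default to retrying
    };
}
```

A handler could publish to the failed queue immediately when `Classify` returns `Permanent` and fall back to the backoff schedule otherwise. Defaulting unknown errors to `Transient` is a design choice of this sketch: it preserves the current retry behaviour for anything not explicitly recognised.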

Performance Considerations

  • Connection reuse: SMTP connections are created per email (consider connection pooling)
  • Concurrency: Single-threaded processing (consider parallel processing)
  • Rate limiting: No built-in rate limiting (add if needed)
  • Batch processing: Processes one message at a time
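If parallel processing is added, the number of in-flight sends should stay bounded so the SMTP server is not flooded. The sketch below shows one generic way to cap concurrency with `SemaphoreSlim`. `BoundedProcessor` is a hypothetical helper, not part of the service, and wiring it to the RabbitMQ consumer (for example together with a prefetch limit) is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs a handler over items with a concurrency cap.
public static class BoundedProcessor
{
    // Runs 'handler' for every item with at most 'maxParallel' running at once.
    public static async Task RunAsync<T>(IEnumerable<T> items, int maxParallel, Func<T, Task> handler)
    {
        using var gate = new SemaphoreSlim(maxParallel);
        var tasks = items.Select(async item =>
        {
            await gate.WaitAsync();      // wait for a free slot
            try { await handler(item); }
            finally { gate.Release(); }  // free the slot even if the handler throws
        }).ToList();
        await Task.WhenAll(tasks);
    }
}
```

With `maxParallel` set to 1 this behaves like the current one-message-at-a-time processing. Higher values trade ordering guarantees for throughput, so they only fit notifications that do not depend on delivery order.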