Overview
The Email Service consumes email notifications from a RabbitMQ queue and sends them via SMTP. Built with C# and .NET, it retries failed deliveries with exponential backoff and uses Redis-backed idempotency checks to avoid sending the same email twice.
Purpose and Responsibilities
- Queue consumption - Consumes messages from email.queue via RabbitMQ
- Template rendering - Replaces template variables with actual values
- Email delivery - Sends emails via SMTP using MailKit
- Retry logic - Implements exponential backoff for transient failures
- Idempotency - Prevents duplicate email sends using Redis
- Status tracking - Updates delivery status in Redis
- Failed message handling - Routes messages that exhaust their retries to the failed queue (failed.queue)
Tech Stack
- Language: C# / .NET 6+
- Email Library: MailKit (SMTP client)
- Message Broker: RabbitMQ (RabbitMQ.Client)
- Cache/Status Store: Redis (StackExchange.Redis)
- Retry Library: Polly (resilience and retry policies)
- Logging: Serilog
- JSON: Newtonsoft.Json
Configuration
Port: 8080 by default (health endpoint, set via PORT). The Docker example below publishes it on host port 8003.
Environment Variables:
PORT=8080
RABBITMQ_HOST=localhost
RABBITMQ_USER=guest
RABBITMQ_PASS=guest
RABBITMQ_EXCHANGE=notifications.direct
RABBITMQ_EMAIL_QUEUE=email.queue
RABBITMQ_FAILED_QUEUE=failed.queue
REDIS_HOST=localhost
SMTP_HOST=smtp.mailtrap.io
SMTP_PORT=2525
SMTP_USER=your-username
SMTP_PASS=your-password
SMTP_FROM=sender@example.com
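All of these values arrive as strings, and the sender shown later parses SMTP_PORT with int.Parse only when an email is sent. A minimal sketch of validating the SMTP settings at startup instead, assuming a hypothetical SmtpSettings record and SettingsLoader helper (neither is part of the service):

```csharp
#nullable enable
using System;

// Hypothetical settings type for illustration; not part of the service.
public sealed record SmtpSettings(string Host, int Port, string? User, string? Pass, string From);

public static class SettingsLoader
{
    // getEnv is injected so the loader can be exercised without touching
    // the real process environment.
    public static SmtpSettings LoadSmtp(Func<string, string?> getEnv)
    {
        string Required(string name)
        {
            var value = getEnv(name);
            if (string.IsNullOrWhiteSpace(value))
                throw new InvalidOperationException($"Missing required setting {name}");
            return value;
        }

        var portText = Required("SMTP_PORT");
        if (!int.TryParse(portText, out var port) || port < 1 || port > 65535)
            throw new InvalidOperationException($"SMTP_PORT is not a valid port: {portText}");

        // SMTP_USER and SMTP_PASS stay optional, matching the sender's
        // "authenticate only if a user is configured" behaviour.
        return new SmtpSettings(
            Required("SMTP_HOST"), port, getEnv("SMTP_USER"), getEnv("SMTP_PASS"), Required("SMTP_FROM"));
    }
}
```

In a real process the loader could be called as SettingsLoader.LoadSmtp(Environment.GetEnvironmentVariable), so a missing or malformed value stops the service before it starts consuming messages.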
RabbitMQ Consumption
The service consumes messages from the email.queue bound to the notifications.direct exchange.
Queue Configuration
Exchange: notifications.direct
Queue: email.queue
Routing Key: email
Durable: true
Auto-delete: false
Implementation (Services/RabbitMqSubscriber.cs:18-46):
public RabbitMqSubscriber(IConfiguration cfg, ILogger<RabbitMqSubscriber> log)
{
    _cfg = cfg;
    _log = log;
    var host = cfg["RABBITMQ_HOST"];
    log.LogInformation("Connecting to RabbitMQ host {Host}", host);
    var factory = new ConnectionFactory
    {
        ClientProvidedName = "Email Service",
        HostName = host,
        UserName = cfg["RABBITMQ_USER"],
        Password = cfg["RABBITMQ_PASS"],
        DispatchConsumersAsync = true,
        RequestedHeartbeat = TimeSpan.FromSeconds(30)
    };
    _conn = factory.CreateConnection();
    _channel = _conn.CreateModel();
    log.LogInformation("Successfully connected to RabbitMQ");
    _exchange = cfg["RABBITMQ_EXCHANGE"]!;
    _queue = cfg["RABBITMQ_EMAIL_QUEUE"]!;
    _failedQueue = cfg["RABBITMQ_FAILED_QUEUE"]!;
    log.LogInformation("Declaring exchange '{Exchange}' and queues '{Queue}', '{FailedQueue}'",
        _exchange, _queue, _failedQueue);
    _channel.ExchangeDeclare(_exchange, "direct", durable: true);
    _channel.QueueDeclare(_queue, durable: true, exclusive: false, autoDelete: false);
    _channel.QueueDeclare(_failedQueue, durable: true, exclusive: false, autoDelete: false);
    _channel.QueueBind(_queue, _exchange, "email");
}
Message Consumer
Implementation (Services/RabbitMqSubscriber.cs:48-68):
public void StartConsuming(Func<NotificationMessage, BasicDeliverEventArgs, Task> handler)
{
    var consumer = new AsyncEventingBasicConsumer(_channel);
    consumer.Received += async (s, ea) =>
    {
        _log.LogInformation("Received message with delivery tag {DeliveryTag}", ea.DeliveryTag);
        var json = Encoding.UTF8.GetString(ea.Body.ToArray());
        var msg = JsonConvert.DeserializeObject<NotificationMessage>(json, SnakeCaseJsonSettings.Settings);
        if (msg != null)
        {
            await handler(msg, ea);
        }
        else
        {
            _log.LogWarning("Could not deserialize message with delivery tag {DeliveryTag}. Acknowledging to discard.",
                ea.DeliveryTag);
            _channel.BasicAck(ea.DeliveryTag, false);
        }
    };
    _channel.BasicConsume(_queue, false, consumer);
    _log.LogInformation("Consumer started on queue '{Queue}'", _queue);
}
The service uses manual acknowledgment (autoAck: false in BasicConsume), so RabbitMQ removes a message from the queue only after the service explicitly acknowledges it.
SMTP Configuration
The service uses MailKit to send emails via SMTP.
Email Sender (Services/EmailSender.cs:8-37):
public async Task SendAsync(string to, string subject, string html, CancellationToken ct)
{
    var msg = new MimeMessage();
    msg.From.Add(MailboxAddress.Parse(cfg["SMTP_FROM"]));
    msg.To.Add(MailboxAddress.Parse(to));
    msg.Subject = subject;
    msg.Body = new TextPart("html") { Text = html };
    using var client = new SmtpClient();
    var host = cfg["SMTP_HOST"];
    var port = int.Parse(cfg["SMTP_PORT"]);
    log.LogInformation("Connecting to SMTP server {Host}:{Port}", host, port);
    await client.ConnectAsync(host, port, false, ct);
    log.LogInformation("Connection established");
    if (!string.IsNullOrEmpty(cfg["SMTP_USER"]))
    {
        log.LogInformation("Attempting to authenticate as {User}", cfg["SMTP_USER"]);
        await client.AuthenticateAsync(cfg["SMTP_USER"], cfg["SMTP_PASS"], ct);
        log.LogInformation("Authentication successful");
    }
    log.LogInformation("Sending email with subject '{Subject}' to {To}", subject, to);
    await client.SendAsync(msg, ct);
    log.LogInformation("Email sent successfully");
    await client.DisconnectAsync(true, ct);
    log.LogInformation("Disconnected from SMTP server");
}
Supported SMTP Providers:
- Gmail (smtp.gmail.com:587)
- SendGrid (smtp.sendgrid.net:587)
- Mailgun (smtp.mailgun.org:587)
- Mailtrap (smtp.mailtrap.io:2525) - for testing
- Amazon SES
- Custom SMTP servers
For Gmail, you must use an App Password instead of your regular password. Enable 2FA and generate an app-specific password.
Retry Logic
The service implements exponential backoff retry logic for transient failures.
Retry Strategy (Services/EmailProcessorBackgroundService.cs:46-72):
catch (Exception ex)
{
    log.LogError(ex, "Failed to process notification {NotificationId}", msg.notification_id);
    var attempt = (msg.metadata?.ContainsKey("attempt") == true)
        ? Convert.ToInt32(msg.metadata["attempt"]) + 1
        : 1;
    if (attempt > 5)
    {
        log.LogCritical("Attempt {Attempt} failed for notification {NotificationId}. Moving to failed queue.",
            attempt, msg.notification_id);
        mq.PublishToFailed(ea.Body.ToArray());
        await store.MarkProcessedAsync(msg.notification_id, msg.request_id, "failed", attempt, ex.Message);
    }
    else
    {
        var delay = 2000 * (int)Math.Pow(2, attempt - 1);
        log.LogWarning("Attempt {Attempt} for notification {NotificationId} failed. Retrying in {Delay}ms",
            attempt, msg.notification_id, delay);
        await Task.Delay(delay);
        msg.metadata ??= new();
        msg.metadata["attempt"] = attempt;
        var json = JsonConvert.SerializeObject(msg, SnakeCaseJsonSettings.Settings);
        mq.Publish("email", Encoding.UTF8.GetBytes(json));
    }
}
finally
{
    mq.Ack(ea.DeliveryTag, false);
}
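Retry Schedule:
| Delivery attempt | Delay before this delivery | Total delay elapsed |
|---|---|---|
| 1 | 0 ms | 0 s |
| 2 | 2 s | 2 s |
| 3 | 4 s | 6 s |
| 4 | 8 s | 14 s |
| 5 | 16 s | 30 s |
| 6 | 32 s | 62 s |
| After the 6th failure | n/a | Moved to failed.queue |
Formula: delay = 2000 ms × 2^(attempt - 1), where attempt is the number of failures so far (1 after the first failure).
Because the check is attempt > 5, a message is retried five times. If the sixth delivery also fails, the message is moved to failed.queue for manual investigation.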
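The backoff arithmetic from the catch block can be pulled into a small pure helper, which makes the schedule easy to check in isolation. A minimal sketch, assuming a hypothetical RetrySchedule class (not part of the service); attempt counts failures, as in the code above:

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; the constants mirror the snippet above.
public static class RetrySchedule
{
    public const int BaseDelayMs = 2000;
    public const int MaxAttempts = 5;

    // Reads the previous failure count from the message metadata and adds one.
    // A message that has never failed has no "attempt" entry and yields 1.
    public static int NextAttempt(IReadOnlyDictionary<string, object>? metadata) =>
        metadata != null && metadata.TryGetValue("attempt", out var raw)
            ? Convert.ToInt32(raw) + 1
            : 1;

    // Returns the delay to wait after the given failure count, or null when
    // the count exceeds MaxAttempts and the message should be dead-lettered.
    public static TimeSpan? DelayAfterFailure(int attempt)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
        if (attempt > MaxAttempts) return null;
        // 1 << (attempt - 1) is 2^(attempt - 1) in integer arithmetic.
        return TimeSpan.FromMilliseconds(BaseDelayMs * (1 << (attempt - 1)));
    }
}
```

With these constants, DelayAfterFailure(1) is 2 seconds, DelayAfterFailure(5) is 32 seconds, and DelayAfterFailure(6) returns null, signalling that the message should go to the failed queue.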
Idempotency
The service uses Redis to prevent duplicate email sends.
Idempotency Check (Services/EmailProcessorBackgroundService.cs:19-24):
if (await store.IsProcessedAsync(msg.notification_id))
{
    mq.Ack(ea.DeliveryTag, false);
    return;
}
Status Tracking:
await store.MarkProcessedAsync(msg.notification_id, msg.request_id); // Success
await store.MarkProcessedAsync(msg.notification_id, msg.request_id, "failed", attempt, ex.Message); // Failure
Redis Keys:
- notification:{notification_id} - Tracks processing status and expires automatically after a configurable TTL
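The check-then-mark pattern can be illustrated without a Redis server. The sketch below is a hypothetical in-memory stand-in (InMemoryProcessedStore is not part of the service, and the "sent" status value is an assumption) that uses the same notification:{notification_id} key shape and a TTL:

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory stand-in for the Redis-backed store.
public sealed class InMemoryProcessedStore
{
    private readonly ConcurrentDictionary<string, (string Status, DateTimeOffset ExpiresAt)> _entries = new();
    private readonly TimeSpan _ttl;
    private readonly Func<DateTimeOffset> _clock;

    // The clock is injectable so expiry can be tested without waiting.
    public InMemoryProcessedStore(TimeSpan ttl, Func<DateTimeOffset>? clock = null)
    {
        _ttl = ttl;
        _clock = clock ?? (() => DateTimeOffset.UtcNow);
    }

    private static string Key(string notificationId) => $"notification:{notificationId}";

    // True while an unexpired entry exists for this notification.
    public bool IsProcessed(string notificationId) =>
        _entries.TryGetValue(Key(notificationId), out var entry) && entry.ExpiresAt > _clock();

    // Records the outcome and (re)starts the TTL for this notification.
    public void MarkProcessed(string notificationId, string status = "sent") =>
        _entries[Key(notificationId)] = (status, _clock() + _ttl);
}
```

When several consumers share one store, the check and the mark would need to happen as a single atomic operation on that store; this sketch does not attempt that.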
Template Rendering
The service renders templates by replacing {{variable}} placeholders with actual values.
Rendering Logic (Services/EmailProcessorBackgroundService.cs:77-78):
private string Render(string template, Dictionary<string, string>? vars) =>
    vars?.Aggregate(template, (c, kv) => c.Replace($"{{{{{kv.Key}}}}}", kv.Value)) ?? template;
Example:
// Template
var subject = "Welcome, {{name}}!";
var body = "Hi {{name}}, thanks for signing up at {{link}}";
// Variables
var vars = new Dictionary<string, string> { ["name"] = "John", ["link"] = "https://example.com" };
// Rendered
Render(subject, vars); // "Welcome, John!"
Render(body, vars);    // "Hi John, thanks for signing up at https://example.com"
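Because rendering is plain string replacement, a placeholder with no matching variable is left in the output unchanged, and a key must match the placeholder text exactly (no whitespace inside the braces). A self-contained sketch of the same approach, using a hypothetical TemplateRenderer class, that makes this behaviour easy to try:

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical wrapper for illustration; the logic mirrors the Render method above.
public static class TemplateRenderer
{
    // Replaces every {{key}} occurrence with its value, one variable at a time.
    // A null variable set returns the template untouched.
    public static string Render(string template, IReadOnlyDictionary<string, string>? vars) =>
        vars?.Aggregate(template, (current, kv) => current.Replace("{{" + kv.Key + "}}", kv.Value))
        ?? template;
}
```

For example, rendering "Hi {{name}}, see {{link}}" with only a name of "John" produces "Hi John, see {{link}}", so a validation step for unresolved placeholders may be worth adding before sending.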
Message Processing Flow
The service follows this processing workflow:
- Receive Message - Consume from email.queue
- Idempotency Check - Check if notification_id already processed
- Validation - Ensure user and template are present, check email preference
- Template Rendering - Replace variables in subject and body
- Email Sending - Send via SMTP
- Status Update - Mark as processed in Redis
- Acknowledgment - ACK message to RabbitMQ
Full Implementation (Services/EmailProcessorBackgroundService.cs:14-72):
protected override Task ExecuteAsync(CancellationToken ct)
{
    mq.StartConsuming(async (msg, ea) =>
    {
        log.LogInformation("Starting to process notification {NotificationId} (Request ID: {RequestId})",
            msg.notification_id, msg.request_id);
        try
        {
            // Idempotency check
            if (await store.IsProcessedAsync(msg.notification_id))
            {
                mq.Ack(ea.DeliveryTag, false);
                return;
            }
            var user = msg.user;
            var template = msg.template;
            // Validation
            if (user == null || template == null || !user.preferences.GetValueOrDefault("email"))
            {
                log.LogWarning("Validation failed for notification {NotificationId}. User or template is null, or email preference is disabled.",
                    msg.notification_id);
                await store.MarkProcessedAsync(msg.notification_id, msg.request_id, "failed");
                mq.Ack(ea.DeliveryTag, false);
                return;
            }
            // Render template
            log.LogInformation("Rendering template '{TemplateCode}' for user {UserId}",
                template.template_code, user.user_id);
            var subject = Render(template.subject, msg.variables);
            var body = Render(template.body, msg.variables);
            // Send email
            await sender.SendAsync(user.email, subject, body, ct);
            await store.MarkProcessedAsync(msg.notification_id, msg.request_id);
            log.LogInformation("Successfully processed notification {NotificationId}", msg.notification_id);
        }
        catch (Exception ex)
        {
            // Retry logic (see above)
        }
        finally
        {
            mq.Ack(ea.DeliveryTag, false);
        }
    });
    return Task.CompletedTask;
}
Health Endpoint
The service exposes a lightweight health check endpoint without ASP.NET Core.
Implementation (Program.cs:33-53):
_ = Task.Run(async () =>
{
    var port = Environment.GetEnvironmentVariable("PORT") ?? "8080";
    var listener = new HttpListener();
    listener.Prefixes.Add($"http://+:{port}/health/");
    listener.Start();
    Log.Information($"Health endpoint listening on http://+:{port}/health");
    while (true)
    {
        var context = await listener.GetContextAsync();
        var response = context.Response;
        var buffer = Encoding.UTF8.GetBytes("{\"status\":\"healthy\"}");
        response.ContentType = "application/json";
        response.ContentLength64 = buffer.Length;
        await response.OutputStream.WriteAsync(buffer);
        response.Close();
    }
});
Response (200 OK):
{"status":"healthy"}
Logging
The service uses Serilog for structured logging.
Configuration (Program.cs:8-10):
Log.Logger = new LoggerConfiguration()
    .WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}")
    .CreateLogger();
Log Levels:
- Information - Normal operations (connection, message received, email sent)
- Warning - Validation failures, retry attempts
- Error - Email sending failures, processing errors
- Critical - Max retries exceeded, moving to failed queue (Serilog reports this level as Fatal)
Running the Service
Development
dotnet run --project EmailService
Production
dotnet publish -c Release
cd bin/Release/net6.0/publish
dotnet EmailService.dll
Docker
docker build -t email-service .
docker run -p 8003:8080 \
-e RABBITMQ_HOST=rabbitmq \
-e REDIS_HOST=redis \
-e SMTP_HOST=smtp.mailtrap.io \
-e SMTP_PORT=2525 \
-e SMTP_USER=your-username \
-e SMTP_PASS=your-password \
-e SMTP_FROM=sender@example.com \
email-service
Error Handling
Validation Failures:
- Missing user or template → Mark as failed, ACK message
- Email preference disabled → Mark as failed, ACK message
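Transient Failures (retry):
- SMTP connection timeout
- Network errors
- Rate limiting (for example a temporary 4xx SMTP reply such as 421 or 451)
Permanent Failures (move to failed queue):
- Invalid email address
- Max retries exceeded
- SMTP authentication failure
Note that the catch block shown under Retry Logic handles every exception the same way, so permanent failures are also retried until the retry limit is reached.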
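One way to act on the transient/permanent split is to classify the exception before deciding whether to retry. A minimal sketch, assuming hypothetical FailureKind and FailureClassifier types; the BCL exception types below are stand-ins, and which exceptions MailKit actually throws for each case is an assumption to verify:

```csharp
using System;
using System.IO;
using System.Net.Sockets;
using System.Security.Authentication;

// Hypothetical types for illustration; not part of the service.
public enum FailureKind { Transient, Permanent }

public static class FailureClassifier
{
    public static FailureKind Classify(Exception ex) => ex switch
    {
        // Timeouts and network-level errors are worth retrying.
        TimeoutException or SocketException or IOException => FailureKind.Transient,
        // Malformed input and rejected credentials will not succeed on retry.
        FormatException or AuthenticationException => FailureKind.Permanent,
        // Unknown errors default to retrying, like a catch-all handler.
        _ => FailureKind.Transient,
    };
}
```

A handler could then publish Permanent failures straight to the failed queue and apply the backoff schedule only to Transient ones.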
Performance Considerations
- Connection reuse: SMTP connections are created per email (consider connection pooling)
- Concurrency: Single-threaded processing (consider parallel processing)
- Rate limiting: No built-in rate limiting (add if needed)
- Batch processing: Processes one message at a time
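If parallel processing is added, the number of in-flight sends should stay bounded so the SMTP server is not flooded. A minimal sketch of bounded concurrency with SemaphoreSlim, assuming a hypothetical BoundedProcessor helper (not part of the service); on the RabbitMQ side a matching prefetch limit (BasicQos) would also be needed, which this sketch does not cover:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the service.
public static class BoundedProcessor
{
    // Runs handler for every item, with at most maxConcurrency handlers active at once.
    public static async Task RunAsync<T>(IEnumerable<T> items, int maxConcurrency, Func<T, Task> handler)
    {
        if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency));

        using var gate = new SemaphoreSlim(maxConcurrency);
        var tasks = items.Select(async item =>
        {
            await gate.WaitAsync();          // wait for a free slot
            try { await handler(item); }
            finally { gate.Release(); }      // free the slot even if the handler throws
        }).ToList();                         // ToList starts all tasks immediately

        await Task.WhenAll(tasks);
    }
}
```

The same gate also acts as a crude rate limit: with maxConcurrency set to 1 the behaviour matches the current one-message-at-a-time processing.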