
Overview

PriceSignal uses NATS JetStream for messaging between its microservices. NatsService wraps JetStream in a small pub/sub interface: published messages are persisted in a stream, and consumers receive them at least once and acknowledge each one after processing.

Architecture

NatsService lives in src/Infrastructure/PubSub/NatsService.cs. It implements the IPubSub interface and provides both publishing and subscription capabilities.

Dependencies

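using Microsoft.Extensions.Logging;   // ILogger<T>
using Microsoft.Extensions.Options;   // IOptions<T>
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using System.Text.Json;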

Service Configuration

NatsSettings

public class NatsSettings
{
    public string Url { get; set; }
}

appsettings.json

{
  "NatsSettings": {
    "Url": "nats://localhost:4222"
  }
}

Initialization

public class NatsService : IPubSub, IAsyncDisposable
{
    private readonly NatsConnection _connection;
    private readonly INatsJSContext _jetStream;
    private readonly ILogger<NatsService> _logger;
    private readonly JsonSerializerOptions _jsonSerializerOptions;
    private readonly NatsSettings _natsSettings;

    public NatsService(ILogger<NatsService> logger, IOptions<NatsSettings> natsSettings)
    {
        _jsonSerializerOptions = new JsonSerializerOptions()
        {
            PropertyNameCaseInsensitive = true,
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        };
        
        _logger = logger;
        _natsSettings = natsSettings.Value;
        _connection = new NatsConnection(new NatsOpts(){Url = _natsSettings.Url});
        _jetStream = new NatsJSContext(_connection);
        
        // Create the notifications stream on startup.
        // Note: GetAwaiter().GetResult() blocks the constructor until the server responds.
        _jetStream.CreateStreamAsync(new StreamConfig(
            name: "notifications",
            subjects: new[] { "notifications.>" }
        ))
        .GetAwaiter()
        .GetResult();
    }
}
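
The constructor above waits synchronously for CreateStreamAsync. If blocking at construction time is a concern, one alternative is to defer the call until first use. The following is a minimal sketch of that idea, not the project's implementation: AsyncOnce is a hypothetical helper, and Task.Delay stands in for the stream-creation call.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var calls = 0;
var init = new AsyncOnce(async () =>
{
    Interlocked.Increment(ref calls);
    await Task.Delay(10); // stand-in for _jetStream.CreateStreamAsync(...)
});

// Three concurrent callers share a single initialization run.
await Task.WhenAll(init.EnsureAsync(), init.EnsureAsync(), init.EnsureAsync());
Console.WriteLine(calls); // 1

// Hypothetical helper: runs an async initializer at most once, on first use.
public sealed class AsyncOnce
{
    private readonly Lazy<Task> _task;

    public AsyncOnce(Func<Task> initialize) =>
        _task = new Lazy<Task>(initialize, LazyThreadSafetyMode.ExecutionAndPublication);

    // Every caller awaits the same task, whether it is pending, completed, or faulted.
    public Task EnsureAsync() => _task.Value;
}
```

With this shape, PublishAsync and Subscribe would each await EnsureAsync() before touching JetStream. A faulted task is cached as well, so a failed initialization would not be retried without extra handling.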

Publishing Messages

PublishAsync serializes the message to JSON, publishes it to JetStream, and throws if the server does not acknowledge the write:

public async Task PublishAsync<T>(string subject, T message)
{
    var content = JsonSerializer.Serialize(message, _jsonSerializerOptions);
    var ack = await _jetStream.PublishAsync(subject, content);
    ack.EnsureSuccess();

    _logger.LogInformation("Published message to {Subject}: {Message}", subject, content);
}

Usage Example

// Publish a notification
var notification = new Notification
{
    ChatID = 123456789,
    Message = "BTC has crossed $50,000!"
};

await natsService.PublishAsync("notifications.telegram", notification);

Subscribing to Messages

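Subscribe creates (or updates) a durable consumer on the given stream, then runs a consume loop that deserializes each message, passes it to the handler, and acknowledges it:
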
public async void Subscribe<T>(string stream, Func<T, Task> handler, string? subject = null)
{
    // One durable consumer per stream: the name depends only on the stream.
    var consumer = await _jetStream.CreateOrUpdateConsumerAsync(
        stream,
        new ConsumerConfig($"{stream}_processor_dotnet")
    );

    await foreach (var jsMsg in consumer.ConsumeAsync<string>())
    {
        // Messages skipped by either check below are not acknowledged.
        if (jsMsg.Data == null) continue;
        if (subject != null && !jsMsg.Subject.Contains(subject)) continue;

        // Deserialize throws JsonException on malformed JSON; it returns null only for the JSON literal null.
        var message = JsonSerializer.Deserialize<T>(jsMsg.Data, _jsonSerializerOptions);
        if (message != null) await handler(message);

        await jsMsg.AckAsync();
    }

    // Reached only after the consume loop ends.
    _logger.LogInformation("Subscribed to {Stream}", stream);
}

Usage Example

// Subscribe to telegram initialization messages
natsService.Subscribe<ChatIDMessage>(
    "notifications",
    async (chatIdMsg) => 
    {
        // Save chat ID to database
        await SaveChatIdToDatabase(chatIdMsg);
    },
    subject: "notifications.init.telegram"
);

Stream Configuration

Notifications Stream

Created automatically on service initialization:

new StreamConfig(
    name: "notifications",
    subjects: new[] { "notifications.>" }
)
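
This stream captures every subject under the notifications. prefix. The > wildcard matches one or more trailing tokens, so multi-level subjects are included (a single-token * wildcard would not match notifications.init.telegram):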
  • notifications.telegram - Telegram notifications
  • notifications.init.telegram - User registration events
  • notifications.email - Email notifications (future)
  • notifications.webhook - Webhook notifications (future)
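
To make the difference between the two NATS wildcards concrete, here is a small sketch of the matching rules: * matches exactly one token, and > matches one or more trailing tokens. SubjectMatcher is a hypothetical helper written for illustration; it is not part of NatsService or of the NATS client library, which performs this matching on the server.

```csharp
using System;

// The stream binding "notifications.>" versus a single-token wildcard.
Console.WriteLine(SubjectMatcher.Matches("notifications.>", "notifications.init.telegram")); // True
Console.WriteLine(SubjectMatcher.Matches("notifications.*", "notifications.init.telegram")); // False
Console.WriteLine(SubjectMatcher.Matches("notifications.*", "notifications.telegram"));      // True

// Hypothetical helper that mirrors NATS subject wildcard rules.
public static class SubjectMatcher
{
    public static bool Matches(string pattern, string subject)
    {
        var p = pattern.Split('.');
        var s = subject.Split('.');

        for (var i = 0; i < p.Length; i++)
        {
            // ">" must be the last pattern token and needs at least one remaining subject token.
            if (p[i] == ">") return i == p.Length - 1 && s.Length > i;
            if (i >= s.Length) return false;
            // "*" matches exactly one non-empty token; any other token must match literally.
            if (p[i] == "*" ? s[i].Length == 0 : p[i] != s[i]) return false;
        }

        // Without a trailing ">", pattern and subject must have the same number of tokens.
        return p.Length == s.Length;
    }
}
```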

Consumer Configuration

Each consumer is created with a durable name derived from the stream name, so its delivery state survives restarts:

var consumer = await _jetStream.CreateOrUpdateConsumerAsync(
    stream, 
    new ConsumerConfig($"{stream}_processor_dotnet")
);

Consumer Features

  • Durable: Consumer state persists across restarts
  • Explicit acknowledgment: Each message is acked with AckAsync() after the handler returns
  • Subject filtering: Optional client-side filter; a message is handled only if its subject contains the given string

Message Serialization

JSON Options

private readonly JsonSerializerOptions _jsonSerializerOptions = new()
{
    PropertyNameCaseInsensitive = true,
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
};
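
These options keep the JSON compatible with the Go-based Telegram bot service, which uses camelCase JSON: the naming policy controls how property names are written, and case-insensitive matching relaxes how they are read.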

Example Message Format

{
  "chatId": 123456789,
  "message": "BTC price: $50,000"
}
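
The round trip below is a small sketch of how these options behave. The Notification class is an assumed shape based on the publishing example (ChatID and Message); the real type may differ. One detail worth checking against the Go side: for a property named ChatID, the camelCase policy lowercases only the leading character and writes chatID, while case-insensitive matching lets the reader accept chatId as well.

```csharp
using System;
using System.Text.Json;

var options = new JsonSerializerOptions
{
    PropertyNameCaseInsensitive = true,
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
};

// Writing: property names go through the camelCase policy.
var json = JsonSerializer.Serialize(
    new Notification { ChatID = 123456789, Message = "BTC alert" }, options);
Console.WriteLine(json); // {"chatID":123456789,"message":"BTC alert"}

// Reading: "chatId", "chatID" and "ChatID" are all accepted.
var parsed = JsonSerializer.Deserialize<Notification>(
    "{\"chatId\": 42, \"message\": \"hello\"}", options);
if (parsed != null) Console.WriteLine($"{parsed.ChatID} {parsed.Message}"); // 42 hello

// Assumed shape of the Notification type used in the publishing example.
public class Notification
{
    public long ChatID { get; set; }
    public string Message { get; set; } = "";
}
```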

Cleanup and Disposal

public async ValueTask DisposeAsync()
{
    await _connection.DisposeAsync();
    _logger.LogInformation("NatsService disposed");
}

Dependency Injection Setup

Program.cs

builder.Services.Configure<NatsSettings>(builder.Configuration.GetSection("NatsSettings"));
builder.Services.AddSingleton<IPubSub, NatsService>();

Message Flow Patterns

Alert Notifications

  1. Rule engine triggers alert
  2. Alert handler publishes to notifications.telegram
  3. Telegram bot consumes message
  4. User receives notification

User Registration

  1. User messages Telegram bot
  2. Bot publishes to notifications.init.telegram
  3. .NET service consumes message
  4. User info saved to database
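
The registration flow above can be exercised without a NATS server by using an in-memory stand-in. This is a sketch for illustration and testing only: the IPubSub shape is inferred from the PublishAsync and Subscribe signatures shown earlier, ChatIDMessage is an assumed type with a single ChatID property, and InMemoryPubSub is hypothetical. It has no persistence, acknowledgment, or redelivery.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

var bus = new InMemoryPubSub();
var saved = new List<long>();

// Steps 3 and 4: the .NET side stores the chat ID it receives.
bus.Subscribe<ChatIDMessage>(
    "notifications",
    msg => { saved.Add(msg.ChatID); return Task.CompletedTask; },
    subject: "notifications.init.telegram");

// Step 2: stand-in for the bot publishing a registration event.
await bus.PublishAsync("notifications.init.telegram", new ChatIDMessage { ChatID = 123456789 });
// A different subject on the same stream is ignored by the filter.
await bus.PublishAsync("notifications.telegram", new ChatIDMessage { ChatID = 1 });

Console.WriteLine(string.Join(",", saved)); // 123456789

// Assumed message shape.
public class ChatIDMessage { public long ChatID { get; set; } }

// Interface shape inferred from the method signatures in this document.
public interface IPubSub
{
    Task PublishAsync<T>(string subject, T message);
    void Subscribe<T>(string stream, Func<T, Task> handler, string? subject = null);
}

// Hypothetical in-memory stand-in: delivers synchronously to matching subscribers.
public sealed class InMemoryPubSub : IPubSub
{
    private readonly List<(string Stream, string? Subject, Func<string, Task> Deliver)> _subs = new();

    public async Task PublishAsync<T>(string subject, T message)
    {
        var json = JsonSerializer.Serialize(message);
        foreach (var sub in _subs)
        {
            // A stream named "notifications" is assumed to bind "notifications.>".
            if (!subject.StartsWith(sub.Stream + ".", StringComparison.Ordinal)) continue;
            // Mirrors the substring filter used by NatsService.Subscribe.
            if (sub.Subject != null && !subject.Contains(sub.Subject)) continue;
            await sub.Deliver(json);
        }
    }

    public void Subscribe<T>(string stream, Func<T, Task> handler, string? subject = null)
    {
        Func<string, Task> deliver = json =>
        {
            var message = JsonSerializer.Deserialize<T>(json);
            return message is null ? Task.CompletedTask : handler(message);
        };
        _subs.Add((stream, subject, deliver));
    }
}
```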

Error Handling

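  • Publish failures: EnsureSuccess() throws if JetStream does not acknowledge the message
  • Deserialization errors: A payload that deserializes to null is acknowledged without calling the handler; malformed JSON makes JsonSerializer.Deserialize throw a JsonException
  • Consumer errors: Intended to be logged without stopping the subscription; the Subscribe excerpt shown earlier contains no try/catch, so that handling has to live in the handler or elsewhere in the service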
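
One way to keep a single bad message from ending the consume loop is to wrap the per-message work in a try/catch. The sketch below shows the pattern over a plain IAsyncEnumerable<string> so that it runs without a NATS server. MessageLoop and the onFailure callback are hypothetical; in the real service the callback would correspond to logging and, if desired, a negative acknowledgment.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

var failures = new List<string>();
var handled = await MessageLoop.RunAsync<Dictionary<string, int>>(
    Feed(),
    _ => Task.CompletedTask,
    (payload, ex) => failures.Add(payload));
Console.WriteLine($"{handled} handled, {failures.Count} failed"); // 2 handled, 1 failed

// Simulated payloads: the second one is not valid JSON.
static async IAsyncEnumerable<string> Feed()
{
    yield return "{\"chatId\": 1}";
    yield return "not json";
    yield return "{\"chatId\": 2}";
    await Task.CompletedTask;
}

public static class MessageLoop
{
    // Processes payloads one at a time and returns how many were handled successfully.
    public static async Task<int> RunAsync<T>(
        IAsyncEnumerable<string> payloads,
        Func<T, Task> handler,
        Action<string, Exception> onFailure)
    {
        var handled = 0;
        await foreach (var payload in payloads)
        {
            try
            {
                var message = JsonSerializer.Deserialize<T>(payload);
                if (message is null) continue; // the JSON literal null
                await handler(message);
                handled++;
            }
            catch (Exception ex) // JsonException from malformed JSON, or a handler failure
            {
                onFailure(payload, ex); // report and move on to the next message
            }
        }
        return handled;
    }
}
```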

Performance Features

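  • Single connection: One NatsConnection is created in the constructor and reused for all operations
  • Async/await: Publishing and message handling do not block threads while waiting on I/O
  • JetStream: Messages are persisted in the stream and can be redelivered if they are not acknowledged (at-least-once delivery)
  • Subject filtering: The handler is skipped for messages whose subject does not contain the requested string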

Monitoring

The service logs publishes, subscriptions, and disposal using structured logging (message templates with named placeholders):

_logger.LogInformation("Published message to {Subject}: {Message}", subject, content);
_logger.LogInformation("Subscribed to {Stream}", stream);
_logger.LogInformation("NatsService disposed");
