Skip to main content

Overview

The Infrastructure layer implements interfaces defined in the Application layer. It contains persistence logic (Entity Framework Core), messaging (NATS), external API clients (Binance, Alpaca), and other technical concerns.

Database Context

AppDbContext

EF Core implementation of IAppDbContext.
/// <summary>
/// Entity Framework Core context for the application and the concrete
/// implementation of <see cref="IAppDbContext"/>.
/// </summary>
public class AppDbContext : DbContext, IAppDbContext
{
    /// <summary>Creates the context with the supplied EF Core options.</summary>
    public AppDbContext(DbContextOptions<AppDbContext> options)
        : base(options)
    {
    }

    // --- Users and notification channels ---
    public DbSet<User> Users => Set<User>();
    public DbSet<UserNotificationChannel> UserNotificationChannels => Set<UserNotificationChannel>();
    public DbSet<Waitlist> Waitlists => Set<Waitlist>();

    // --- Market reference data ---
    public DbSet<Exchange> Exchanges => Set<Exchange>();
    public DbSet<Instrument> Instruments => Set<Instrument>();
    public DbSet<InstrumentPrice> InstrumentPrices => Set<InstrumentPrice>();

    // --- Price rules ---
    public DbSet<PriceRule> PriceRules => Set<PriceRule>();
    public DbSet<PriceCondition> PriceConditions => Set<PriceCondition>();
    public DbSet<PriceRuleTriggerLog> PriceRuleTriggerLogs => Set<PriceRuleTriggerLog>();

    // --- Candles, one set per timeframe ---
    public DbSet<OneMinCandle> OneMinCandle => Set<OneMinCandle>();
    public DbSet<FiveMinCandle> FiveMinCandle => Set<FiveMinCandle>();
    public DbSet<TenMinCandle> TenMinCandle => Set<TenMinCandle>();
    public DbSet<FifteenMinCandle> FifteenMinCandle => Set<FifteenMinCandle>();
    public DbSet<OneHourCandle> OneHourCandle => Set<OneHourCandle>();

    /// <summary>
    /// Declares the PostgreSQL extension and enum used by the model, then applies
    /// every entity configuration found in this assembly.
    /// </summary>
    protected override void OnModelCreating(ModelBuilder builder)
    {
        // uuid-ossp provides the UUID generation functions used as column defaults.
        builder.HasPostgresExtension("uuid-ossp");

        // Map the CLR enum to the native PostgreSQL enum type.
        builder.HasPostgresEnum<NotificationChannelType>(
            schema: null,
            name: "notification_channel_type");

        // This context lives in the assembly that holds the IEntityTypeConfiguration classes.
        var configurationAssembly = typeof(AppDbContext).Assembly;
        builder.ApplyConfigurationsFromAssembly(configurationAssembly);

        base.OnModelCreating(builder);
    }
}
Source: src/Infrastructure/Data/AppDbContext.cs:15

Features:
  • PostgreSQL with enum support
  • UUID extension for GUID generation
  • Auto-applies entity configurations
  • Multiple candle timeframes for charts

Entity Configurations

PriceRuleConfigurations

Configures the PriceRule entity mapping.
/// <summary>
/// EF Core mapping for <see cref="PriceRule"/>: column defaults, length limits,
/// the PostgreSQL enum column, relationships, and the soft-delete query filter.
/// </summary>
public class PriceRuleConfigurations : IEntityTypeConfiguration<PriceRule>
{
    /// <inheritdoc />
    public void Configure(EntityTypeBuilder<PriceRule> builder)
    {
        // ----- Identity -----
        // The identifier is produced by the database on insert.
        builder.Property(rule => rule.EntityId)
            .HasDefaultValueSql("uuid_generate_v4()")
            .ValueGeneratedOnAdd();

        // ----- Descriptive fields -----
        builder.Property(rule => rule.Name)
            .HasMaxLength(255)
            .IsRequired();

        builder.Property(rule => rule.Description)
            .HasMaxLength(2000);

        // ----- Rule state -----
        // New rules start disabled unless a value is supplied.
        builder.Property(rule => rule.IsEnabled)
            .HasDefaultValue(false)
            .IsRequired();

        // Stored using the native PostgreSQL enum type.
        builder.Property(rule => rule.NotificationChannel)
            .HasDefaultValue(NotificationChannelType.none)
            .HasColumnType("notification_channel_type")
            .IsRequired();

        builder.Property(rule => rule.LastTriggeredAt)
            .HasDefaultValue(null);

        builder.Property(rule => rule.LastTriggeredPrice)
            .HasColumnType("double precision")
            .HasDefaultValue(null);

        // ----- Audit timestamps -----
        builder.Property(rule => rule.CreatedAt)
            .HasDefaultValueSql("now()")
            .ValueGeneratedOnAdd();

        builder.Property(rule => rule.ModifiedAt)
            .HasDefaultValueSql("now()")
            .ValueGeneratedOnUpdate();

        builder.Property(rule => rule.DeletedAt)
            .HasDefaultValue(null);

        // ----- Relationships -----
        builder.HasMany(rule => rule.Conditions);
        builder.HasMany(rule => rule.ActivationLogs);

        // ----- Soft delete -----
        // Rows with a DeletedAt value are excluded from queries on this entity.
        builder.HasQueryFilter(rule => rule.DeletedAt == null);
    }
}
Source: src/Infrastructure/Data/Configurations/PriceRuleConfigurations.cs:10

Key features:
  • Auto-generated UUIDs at database level
  • PostgreSQL enum mapping
  • Query filter for soft deletes
  • Timestamp defaults with now()

Interceptors

AuditableEntityInterceptor

Automatically sets audit timestamps and dispatches domain events.
/// <summary>
/// Save interceptor that stamps audit timestamps, turns deletes of auditable
/// entities into soft deletes, and publishes pending domain events through
/// MediatR before the save is executed.
/// </summary>
public class AuditableEntityInterceptor(TimeProvider dateTime, IMediator mediator) 
    : SaveChangesInterceptor
{
    // True once this interceptor instance has published domain events.
    // Later saves through the same instance skip event dispatch only.
    private bool _dispatchedEvents = false;

    /// <summary>
    /// Runs before EF Core executes the save: always applies audit/soft-delete
    /// changes, and dispatches domain events the first time it is invoked.
    /// </summary>
    /// <param name="eventData">Event data carrying the context being saved.</param>
    /// <param name="result">The interception result passed through to the base implementation.</param>
    /// <param name="cancellationToken">Token forwarded to event dispatch and the base call.</param>
    public override async ValueTask<InterceptionResult<int>> SavingChangesAsync(
        DbContextEventData eventData, 
        InterceptionResult<int> result, 
        CancellationToken cancellationToken = default)
    {
        // Audit stamping and soft-delete conversion must run on EVERY save.
        // Gating them behind the dispatch flag would let a later save through the
        // same interceptor instance issue a real DELETE and skip ModifiedAt.
        UpdateEntities(eventData.Context);

        if (!_dispatchedEvents)
        {
            await DispatchEvents(eventData.Context, cancellationToken);
            _dispatchedEvents = true;
        }

        return await base.SavingChangesAsync(eventData, result, cancellationToken);
    }

    /// <summary>
    /// Applies CreatedAt/ModifiedAt/DeletedAt to tracked auditable entities and
    /// rewrites deleted entries as modified so they are soft-deleted.
    /// </summary>
    /// <param name="context">The context being saved; a null context is ignored.</param>
    public void UpdateEntities(DbContext? context)
    {
        if (context == null) return;

        // Read the clock once so every entity touched by this save shares one timestamp.
        var now = dateTime.GetUtcNow().UtcDateTime;

        foreach (var entry in context.ChangeTracker.Entries<BaseAuditableEntity>())
        {
            if (entry.State == EntityState.Added)
            {
                entry.Entity.CreatedAt = now;
            }

            if (entry.State == EntityState.Added ||
                entry.State == EntityState.Modified ||
                entry.HasChangedOwnedEntities())
            {
                entry.Entity.ModifiedAt = now;
            }

            if (entry.State == EntityState.Deleted)
            {
                entry.Entity.DeletedAt = now;
                entry.State = EntityState.Modified; // Soft delete
            }
        }
    }

    /// <summary>
    /// Collects the domain events queued on tracked entities, clears them from the
    /// entities, and publishes each one through the mediator.
    /// </summary>
    /// <param name="context">The context being saved; a null context is ignored.</param>
    /// <param name="cancellationToken">Token forwarded to each publish call.</param>
    public async Task DispatchEvents(DbContext? context, CancellationToken cancellationToken = default)
    {
        if (context == null) return;

        // Materialize once: the change tracker query is otherwise re-evaluated
        // on every enumeration.
        var entities = context.ChangeTracker
            .Entries<BaseEntity>()
            .Select(e => e.Entity)
            .Where(e => e.Events.Any())
            .ToList();

        var domainEvents = entities
            .SelectMany(e => e.Events)
            .ToList();

        // Clear before publishing so the same events are not collected again
        // if this method is entered again while handlers run.
        entities.ForEach(e => e.ClearEvents());

        foreach (var domainEvent in domainEvents)
            await mediator.Publish(domainEvent, cancellationToken);

        _dispatchedEvents = true;
    }
}
Source: src/Infrastructure/Data/Interceptors/AuditableEntityInterceptor.cs:9

Responsibilities:
  1. Audit timestamps: Automatically set CreatedAt, ModifiedAt
  2. Soft deletes: Convert DELETE to UPDATE with DeletedAt
  3. Domain events: Publish events via MediatR before save
  4. Idempotency: Ensures events are dispatched only once per interceptor instance

Messaging Infrastructure

NatsService

Implements IPubSub using NATS JetStream for reliable messaging.
/// <summary>
/// <see cref="IPubSub"/> implementation backed by NATS JetStream. Messages are
/// serialized as camel-case JSON strings.
/// </summary>
public class NatsService : IPubSub, IAsyncDisposable
{
    private readonly NatsConnection _connection;
    private readonly INatsJSContext _jetStream;
    private readonly ILogger<NatsService> _logger;
    private readonly JsonSerializerOptions _jsonSerializerOptions;
    private readonly NatsSettings _natsSettings;

    /// <summary>
    /// Opens a connection to the configured NATS URL and creates the
    /// "notifications" stream.
    /// </summary>
    /// <param name="logger">Logger used for publish/subscribe diagnostics.</param>
    /// <param name="natsSettings">Settings providing the NATS server URL.</param>
    public NatsService(ILogger<NatsService> logger, IOptions<NatsSettings> natsSettings)
    {
        _jsonSerializerOptions = new JsonSerializerOptions()
        {
            PropertyNameCaseInsensitive = true,
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        };
        
        _logger = logger;
        _natsSettings = natsSettings.Value;
        _connection = new NatsConnection(new NatsOpts() { Url = _natsSettings.Url });
        _jetStream = new NatsJSContext(_connection);
        
        // Constructors cannot await, so stream creation is waited on synchronously here.
        _jetStream.CreateStreamAsync(new StreamConfig(
                name: "notifications",
                subjects: new[] { "notifications.>" }))
            .GetAwaiter()
            .GetResult();
    }
    
    /// <summary>
    /// Serializes <paramref name="message"/> to JSON and publishes it to
    /// <paramref name="subject"/>, throwing if JetStream does not acknowledge it.
    /// </summary>
    public async Task PublishAsync<T>(string subject, T message)
    {
        var content = JsonSerializer.Serialize(message, _jsonSerializerOptions);
        var ack = await _jetStream.PublishAsync(subject, content);
        ack.EnsureSuccess();

        _logger.LogInformation("Published message to {Subject}: {Message}", subject, content);
    }

    /// <summary>
    /// Starts consuming <paramref name="stream"/> in the background and invokes
    /// <paramref name="handler"/> for each deserialized message.
    /// </summary>
    /// <param name="stream">Name of the JetStream stream to consume.</param>
    /// <param name="handler">Callback invoked for each non-null deserialized message.</param>
    /// <param name="subject">Optional substring a message subject must contain to be handled.</param>
    public void Subscribe<T>(string stream, Func<T, Task> handler, string? subject = null)
    {
        // The method returns void, so the consume loop is fire-and-forget.
        // It runs as an observed Task (not async void) so that failures are
        // logged inside RunSubscriptionAsync instead of escaping unhandled.
        _ = RunSubscriptionAsync(stream, handler, subject);
    }

    // Creates/updates the consumer for the stream and processes messages until the loop ends.
    private async Task RunSubscriptionAsync<T>(string stream, Func<T, Task> handler, string? subject)
    {
        try
        {
            var consumer = await _jetStream.CreateOrUpdateConsumerAsync(
                stream,
                new ConsumerConfig($"{stream}_processor_dotnet"));

            // Log once the consumer exists, not after the consume loop has finished.
            _logger.LogInformation("Subscribed to {Stream}", stream);

            await foreach (var jsMsg in consumer.ConsumeAsync<string>())
            {
                // Skipped messages are intentionally left unacknowledged, as before.
                // NOTE(review): whether and when they are redelivered depends on the
                // consumer's ack settings - confirm this is the intended behavior.
                if (jsMsg.Data == null) continue;
                if (subject != null && !jsMsg.Subject.Contains(subject)) continue;

                try
                {
                    var message = JsonSerializer.Deserialize<T>(jsMsg.Data, _jsonSerializerOptions);
                    if (message != null) await handler(message);
                    await jsMsg.AckAsync();
                }
                catch (Exception ex)
                {
                    // One bad message must not tear down the whole subscription.
                    // The message is not acknowledged when processing fails.
                    _logger.LogError(ex,
                        "Failed to process message on {Subject} from {Stream}",
                        jsMsg.Subject,
                        stream);
                }
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Subscription to {Stream} terminated unexpectedly", stream);
        }
    }

    /// <summary>Disposes the underlying NATS connection.</summary>
    public async ValueTask DisposeAsync()
    {
        await _connection.DisposeAsync();
        _logger.LogInformation("NatsService disposed");
    }
}
Source: src/Infrastructure/PubSub/NatsService.cs:12

Features:
  • JetStream: Persistent, reliable message delivery
  • Auto-ack: Messages acknowledged after processing
  • Stream creation: Auto-creates notification stream
  • JSON serialization: Camel case for consistency
  • Subject filtering: Optional filtering in subscriptions
Use cases:
  • Publishing price rule triggers
  • Distributing real-time price updates
  • Coordinating between microservices

Dependency Injection

Infrastructure Services Registration

/// <summary>
/// Registers the Infrastructure layer's services: time provider, NATS pub/sub,
/// the PostgreSQL-backed DbContext with its interceptors, and the Binance and
/// Alpaca clients.
/// </summary>
public static class DependencyInjection
{
    /// <summary>
    /// Adds all Infrastructure services to <paramref name="services"/>.
    /// </summary>
    /// <param name="services">The service collection to populate.</param>
    /// <param name="configuration">Application configuration holding connection strings and API settings.</param>
    /// <param name="isDevelopment">
    /// When true the connection string is read from configuration; otherwise it is
    /// built from the URI stored in <c>/app/secrets/uri</c>.
    /// </param>
    /// <returns>The same <paramref name="services"/> instance, for chaining.</returns>
    /// <exception cref="InvalidOperationException">
    /// A required connection string or setting is missing. Some settings are only
    /// checked when the corresponding HTTP client is first configured.
    /// </exception>
    public static IServiceCollection AddInfrastructure(
        this IServiceCollection services, 
        IConfiguration configuration,
        bool isDevelopment)
    {
        services.AddSingleton(TimeProvider.System);
        services.AddSingleton<IPubSub, NatsService>();

        string connectionString;
        if (isDevelopment)
        {
            connectionString = configuration.GetConnectionString("PriceSignalDB") ??
                throw new InvalidOperationException("Connection string not found");
        }
        else
        {
            connectionString = ConvertToNpgsqlConnectionString(File.ReadAllText("/app/secrets/uri"));
        }
        
        // The enum must be mapped on the data source so Npgsql can read/write it.
        var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString);
        dataSourceBuilder.MapEnum<NotificationChannelType>();
        var dataSource = dataSourceBuilder.Build();
        
        services.AddDbContext<IAppDbContext, AppDbContext>((sp, options) =>
        {
            options.AddInterceptors(sp.GetServices<ISaveChangesInterceptor>());
            options.UseNpgsql(dataSource).UseSnakeCaseNamingConvention();
        });
        
        // Resolved by the AddDbContext callback above when a context is created.
        services.AddScoped<ISaveChangesInterceptor, AuditableEntityInterceptor>();

        var binanceSettings = new BinanceSettings();
        configuration.GetSection(BinanceSettings.Binance).Bind(binanceSettings);
        var binanceWebsocketUrl = binanceSettings.WebsocketUrl ?? 
            throw MissingSetting(BinanceSettings.Binance, nameof(BinanceSettings.WebsocketUrl));
        
        services.AddSingleton<IWebsocketClientProvider>(sp =>
        {
            var logger = sp.GetRequiredService<ILogger<WebsocketClientProvider>>();
            return new WebsocketClientProvider(binanceWebsocketUrl, logger);
        });
        
        services.AddRefitClient<IBinanceApi>()
            .ConfigureHttpClient(c =>
                c.BaseAddress = new Uri(binanceSettings.ApiUrl ??
                    throw MissingSetting(BinanceSettings.Binance, nameof(BinanceSettings.ApiUrl))))
            .AddPolicyHandler(HttpPolicyExtensions.GetRateLimitPolicy());

        var alpacaSettings = new AlpacaSettings();
        configuration.GetSection(AlpacaSettings.Alpaca).Bind(alpacaSettings);
        services.AddRefitClient<IAlpacaApi>()
            .ConfigureHttpClient(c =>
            {
                c.BaseAddress = new Uri(alpacaSettings.ApiUrl ??
                    throw MissingSetting(AlpacaSettings.Alpaca, nameof(AlpacaSettings.ApiUrl)));
                c.DefaultRequestHeaders.Add("APCA-API-KEY-ID", 
                    alpacaSettings.ApiKey ??
                        throw MissingSetting(AlpacaSettings.Alpaca, nameof(AlpacaSettings.ApiKey)));
                c.DefaultRequestHeaders.Add("APCA-API-SECRET-KEY", 
                    alpacaSettings.ApiSecret ??
                        throw MissingSetting(AlpacaSettings.Alpaca, nameof(AlpacaSettings.ApiSecret)));
            });

        return services;
    }

    // Builds the exception thrown for an absent setting so the message names the exact key.
    private static InvalidOperationException MissingSetting(string section, string key) =>
        new($"Required configuration value '{section}:{key}' is missing.");
}
Source: src/Infrastructure/DependencyInjection.cs:20

Key configurations:
  • PostgreSQL: Snake case naming, enum mapping
  • NATS: Singleton pub/sub service
  • Refit: HTTP clients for Binance and Alpaca APIs
  • Rate limiting: Policy handlers for external APIs
  • Interceptors: Audit and domain event handling

Database Features

Snake Case Naming Convention

// Excerpt: chained after UseNpgsql(...) on the DbContext options in AddInfrastructure.
options.UseSnakeCaseNamingConvention();
Maps C# PascalCase to PostgreSQL snake_case:
  • PriceRule → price_rule
  • LastTriggeredAt → last_triggered_at

PostgreSQL Enums

// Excerpt from AppDbContext.OnModelCreating: declares the PostgreSQL enum type
// "notification_channel_type" for the NotificationChannelType CLR enum.
// NOTE(review): schema: null presumably selects the default schema - confirm.
builder.HasPostgresEnum<NotificationChannelType>(
    schema: null, 
    name: "notification_channel_type");
Native enum support for better type safety and performance.

Soft Deletes

Global query filter prevents deleted records from appearing in queries:
// Excerpt from PriceRuleConfigurations.Configure: only rows whose DeletedAt is null match.
builder.HasQueryFilter(pr => pr.DeletedAt == null);

External Service Clients

Binance WebSocket

Real-time price streaming from Binance.
// Registers a single shared WebsocketClientProvider built from the Binance
// websocket URL (binanceWebsocketUrl is read from configuration in AddInfrastructure).
services.AddSingleton<IWebsocketClientProvider>(sp =>
{
    // The logger is resolved from the container when the provider is first created.
    var logger = sp.GetRequiredService<ILogger<WebsocketClientProvider>>();
    return new WebsocketClientProvider(binanceWebsocketUrl, logger);
});
src/Infrastructure/DependencyInjection.cs:66

Binance REST API

Historical data and exchange info via Refit.
// Simplified excerpt: the full AddInfrastructure listing throws
// InvalidOperationException when binanceSettings.ApiUrl is null.
services.AddRefitClient<IBinanceApi>()
    .ConfigureHttpClient(c => c.BaseAddress = new Uri(binanceSettings.ApiUrl))
    // Attaches the rate-limit policy handler to every request made by this client.
    .AddPolicyHandler(HttpPolicyExtensions.GetRateLimitPolicy());
src/Infrastructure/DependencyInjection.cs:72

Alpaca News API

Crypto news data with authentication headers.
// Simplified excerpt: the full AddInfrastructure listing throws
// InvalidOperationException when ApiUrl, ApiKey or ApiSecret is null.
services.AddRefitClient<IAlpacaApi>()
    .ConfigureHttpClient(c =>
    {
        c.BaseAddress = new Uri(alpacaSettings.ApiUrl);
        // Credentials are sent as default headers on every request from this client.
        c.DefaultRequestHeaders.Add("APCA-API-KEY-ID", alpacaSettings.ApiKey);
        c.DefaultRequestHeaders.Add("APCA-API-SECRET-KEY", alpacaSettings.ApiSecret);
    });
src/Infrastructure/DependencyInjection.cs:80

Configuration Settings

Connection String Handling

Supports both development and production environments:
// Simplified excerpt of AddInfrastructure: selects the connection string source
// based on the isDevelopment flag passed by the caller.
string connectionString;
if (isDevelopment)
{
    // The full listing throws InvalidOperationException when this entry is missing.
    connectionString = configuration.GetConnectionString("PriceSignalDB");
}
else
{
    // Read from Kubernetes secret
    // The file holds a PostgreSQL URI, converted to Npgsql key/value form.
    connectionString = ConvertToNpgsqlConnectionString(File.ReadAllText("/app/secrets/uri"));
}

URI to Connection String Conversion

Parses PostgreSQL URIs for cloud deployments:
/// <summary>
/// Converts a PostgreSQL URI of the form
/// <c>scheme://user:password@host:port/database</c> into an Npgsql-style
/// key/value connection string.
/// </summary>
/// <param name="postgresUri">
/// The URI text. Leading and trailing whitespace (for example a trailing newline
/// from a file) is ignored.
/// </param>
/// <returns>
/// A string of the form <c>Host=…;Port=…;Database=…;Username=…;Password=…</c>.
/// </returns>
/// <exception cref="UriFormatException">The text is not a valid absolute URI.</exception>
private static string ConvertToNpgsqlConnectionString(string postgresUri)
{
    // PostgreSQL's default port, used when the URI does not specify one.
    const int DefaultPostgresPort = 5432;

    var uri = new Uri(postgresUri.Trim());

    // Split on the FIRST ':' only, so passwords that contain ':' stay intact,
    // and tolerate a URI that carries a user name but no password.
    string[] credentials = uri.UserInfo.Split(':', 2);
    string username = Uri.UnescapeDataString(credentials[0]);
    string password = credentials.Length > 1
        ? Uri.UnescapeDataString(credentials[1])
        : string.Empty;

    string host = uri.Host;

    // Uri.Port is -1 when the URI has no explicit port and the scheme has no known default.
    int port = uri.Port > 0 ? uri.Port : DefaultPostgresPort;

    // UserInfo and AbsolutePath are percent-encoded; decode them to the real values.
    string database = Uri.UnescapeDataString(uri.AbsolutePath.Trim('/'));

    return $"Host={host};Port={port};Database={database};Username={username};Password={password}";
}

Best Practices

Repository Pattern

PriceSignal uses DbContext directly instead of repositories because:
  • EF Core already implements Unit of Work
  • IQueryable provides flexible querying
  • Less abstraction overhead
  • Better integration with HotChocolate GraphQL

Transaction Management

Domain events are dispatched within the same transaction:
  1. SaveChanges begins transaction
  2. Interceptor updates audit fields
  3. Interceptor publishes domain events
  4. All changes committed or rolled back together

Next Steps

Build docs developers (and LLMs) love