Overview

The BinancePriceFetcherService is a background service that connects to Binance WebSocket streams to fetch real-time cryptocurrency prices. It manages dynamic subscriptions based on active price rules and maintains price history for technical analysis.

Architecture

Implemented in src/PriceSignal/BackgroundServices/BinancePriceFetcherService.cs, the service operates as a hosted background service that runs continuously throughout the application lifecycle.

Key Components

  • IWebsocketClientProvider: Manages WebSocket connections to Binance
  • PriceHistoryCache: In-memory cache for historical price data
  • ConcurrentBag<string>: Thread-safe collection of active trading symbols
  • AppDbContext: Database context for loading historical data

Service Initialization

public class BinancePriceFetcherService(
    IWebsocketClientProvider websocketClientProvider,
    IServiceProvider serviceProvider,
    PriceHistoryCache priceHistoryCache,
    ConcurrentBag<string> symbols)
    : BackgroundService

Execution Flow

Startup Process

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var webSocketUrl = websocketClientProvider.GetUri();

    using var scope = serviceProvider.CreateScope();
    var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
    
    // Get all enabled price rules from database
    var dBsymbols = await dbContext.PriceRules
        .Where(x => x.IsEnabled)
        .Select(r => r.Instrument.Symbol)
        .Distinct()
        .ToListAsync(cancellationToken: stoppingToken);

    if (dBsymbols.Any())
    {
        // Build WebSocket stream URLs
        var symbolStreams = dBsymbols
            .Select(s => $"{s.ToLower()}@aggTrade")
            .Aggregate((s1, s2) => $"{s1}/{s2}");
            
        // Update WebSocket URL with new streams
        string newUrl = new StringBuilder(webSocketUrl)
            .Append('/')
            .Append(symbolStreams)
            .ToString();
        websocketClientProvider.SetUri(newUrl);
    }
}
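
The URL construction above can be tried in isolation. The following is a minimal sketch, not the service's actual code: `BuildStreamPath` is a hypothetical helper, and the base URL is an assumption (in the service it comes from `IWebsocketClientProvider.GetUri()`).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: joins symbols into a combined-stream path segment.
static string BuildStreamPath(IEnumerable<string> symbols) =>
    string.Join("/", symbols
        .Select(s => $"{s.ToLowerInvariant()}@aggTrade")
        .Distinct());

// Assumed base URL; the real value comes from the WebSocket client provider.
const string baseUrl = "wss://stream.binance.com:9443/stream?streams=";

var url = baseUrl + BuildStreamPath(new[] { "BTCUSDT", "ETHUSDT" });
Console.WriteLine(url);
// wss://stream.binance.com:9443/stream?streams=btcusdt@aggTrade/ethusdt@aggTrade
```

Unlike `Aggregate`, which throws on an empty sequence, `string.Join` returns an empty string, so this variant would not depend on the `Any()` guard.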

Price History Loading

On startup, the service loads the 500 most recent candles for each symbol from the OneMinCandle table into the PriceHistoryCache:

foreach (var symbol in dBsymbols)
{
    symbols.Add(symbol);
    var history = new List<IPrice>(dbContext.OneMinCandle
        .Where(o => o.Symbol == symbol)
        .OrderByDescending(o => o.Bucket.DateTime)
        .Take(500)
        .Select(o => new PriceQuote(new Price
        {
            Symbol = o.Symbol,
            Open = o.Open,
            High = o.High,
            Low = o.Low,
            Close = o.Close,
            Volume = o.Volume,
            Bucket = o.Bucket,
        }))
        .ToList());
    priceHistoryCache.LoadPriceHistory(symbol, history);
}
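
The query above returns candles newest-first, because it sorts descending before `Take(500)`. Whether `PriceHistoryCache` expects that order is not shown here. If a consumer needs chronological order, one option is to re-sort after taking the latest rows. The sketch below uses in-memory tuples in place of the real entities, and `LatestChronological` is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: keeps the `count` most recent candles, returned oldest-first.
static List<(DateTime Bucket, decimal Close)> LatestChronological(
    IEnumerable<(DateTime Bucket, decimal Close)> candles, int count) =>
    candles
        .OrderByDescending(c => c.Bucket) // newest first, so Take keeps the latest
        .Take(count)
        .OrderBy(c => c.Bucket)           // back to chronological order
        .ToList();

// Five one-minute candles with closes 100..104 (made-up sample data).
var start = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
var candles = Enumerable.Range(0, 5)
    .Select(i => (Bucket: start.AddMinutes(i), Close: 100m + i))
    .ToList();

var latest = LatestChronological(candles, 3);
foreach (var c in latest)
    Console.WriteLine($"{c.Bucket:O} {c.Close}");
```

With this sample data the helper keeps the three most recent candles (closes 102, 103, 104) in ascending time order.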

WebSocket Stream Management

Starting the WebSocket Connection

websocketClientProvider.Start(async message =>
{
    try
    {
        await WebsocketChannel.SocketChannel.Writer.WriteAsync(message, stoppingToken);
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
        throw;
    }
});
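
The callback above hands each message to a channel instead of processing it inline. How `WebsocketChannel.SocketChannel` is configured is not shown here. As a minimal sketch, assuming a bounded channel with a capacity of 100 (both are assumptions), the producer and consumer sides could look like this:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded channel: WriteAsync waits while 100 messages are already queued.
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = true,
});

// Consumer: stands in for whatever reads from the service's channel.
var received = 0;
var consumer = Task.Run(async () =>
{
    await foreach (var message in channel.Reader.ReadAllAsync())
    {
        received++; // parse and process the message here
    }
});

// Producer: stands in for the callback passed to websocketClientProvider.Start.
for (var i = 0; i < 1_000; i++)
{
    await channel.Writer.WriteAsync($"message {i}");
}

channel.Writer.Complete(); // signal that no more messages will arrive
await consumer;            // wait until the queue is drained

Console.WriteLine(received); // 1000
```

With an unbounded channel, `WriteAsync` never waits, so a slow consumer would let the queue grow instead of slowing the producer.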

Dynamic Subscription Updates

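The service can add or remove stream subscriptions at runtime. UpdateSubscriptionsAsync compares the tracked symbols with the streams recorded in _activeStreams (a field not shown in these excerpts; its use of Keys and TryRemove suggests a ConcurrentDictionary<string, bool>), subscribes to the missing streams, and unsubscribes from those no longer needed:
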
public void UpdateSubscriptionsAsync()
{
    using var scope = serviceProvider.CreateScope();
    var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
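    // Symbols referenced by price rules but not yet tracked.
    // Note: this result is not used by the rest of the method as shown.
    var dBsymbols = dbContext.PriceRules
        .Select(r => r.Instrument.Symbol)
        .Except(symbols)
        .Distinct()
        .ToList();

    // Map symbols to stream names first so both sides of Except use the same format
    var desiredStreams = symbols
        .Select(s => $"{s.ToLower()}@aggTrade")
        .Distinct()
        .ToList();
    var currentStreams = _activeStreams.Keys.ToList();
    var newStreams = desiredStreams.Except(currentStreams).ToList();
    var obsoleteStreams = currentStreams.Except(desiredStreams).ToList();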

    if (newStreams.Count != 0)
    {
        websocketClientProvider.Subscribe(newStreams);
        foreach (var stream in newStreams)
        {
            _activeStreams[stream] = true;
        }
    }

    if (obsoleteStreams.Count == 0) return;

    websocketClientProvider.Unsubscribe(obsoleteStreams);
    foreach (var stream in obsoleteStreams)
    {
        _activeStreams.TryRemove(stream, out _);
    }
}
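
The core of the method above is a set difference between the streams that should be active and the streams that are. The sketch below isolates that step; `DiffStreams` is a hypothetical helper, and the symbols are sample values.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: computes which streams to subscribe to and which to drop.
static (List<string> ToAdd, List<string> ToRemove) DiffStreams(
    IEnumerable<string> symbols, IEnumerable<string> activeStreams)
{
    // Convert symbols to stream names so both sets use the same format.
    var desired = symbols
        .Select(s => $"{s.ToLowerInvariant()}@aggTrade")
        .ToHashSet();
    var active = activeStreams.ToHashSet();

    return (desired.Except(active).ToList(), active.Except(desired).ToList());
}

var (toAdd, toRemove) = DiffStreams(
    symbols: new[] { "BTCUSDT", "SOLUSDT" },
    activeStreams: new[] { "btcusdt@aggTrade", "ethusdt@aggTrade" });

Console.WriteLine(string.Join(", ", toAdd));    // solusdt@aggTrade
Console.WriteLine(string.Join(", ", toRemove)); // ethusdt@aggTrade
```

Comparing raw symbols such as BTCUSDT against stream names such as btcusdt@aggTrade would never match, so every symbol would be reported as new on each call.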

Symbol Management

Add Symbol

public void AddSymbol(string symbol)
{
    symbols.Add(symbol);
}

Remove Symbol

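public void RemoveSymbol(string symbol)
{
    // ConcurrentBag has no Remove method, so the bag is rebuilt without the symbol.
    // This replaces only this service's reference: other holders of the injected
    // bag still see the old contents, and a concurrent AddSymbol call can be lost.
    symbols = new ConcurrentBag<string>(symbols.Except(new[] {symbol}));
}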
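
RemoveSymbol rebuilds the bag because `ConcurrentBag<T>` cannot remove a specific item. An alternative, shown here only as a sketch and not as what the service does, is to use a `ConcurrentDictionary<string, byte>` as a concurrent set, which supports removal in place. The case-insensitive comparer is an assumption.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

// ConcurrentDictionary used as a concurrent set; the byte value is a placeholder.
var symbols = new ConcurrentDictionary<string, byte>(StringComparer.OrdinalIgnoreCase);

bool AddSymbol(string symbol) => symbols.TryAdd(symbol, 0);
bool RemoveSymbol(string symbol) => symbols.TryRemove(symbol, out _);

AddSymbol("BTCUSDT");
AddSymbol("ETHUSDT");
AddSymbol("btcusdt");    // ignored: already present under the case-insensitive comparer
RemoveSymbol("ETHUSDT"); // removed in place, no new collection needed

Console.WriteLine(string.Join(", ", symbols.Keys.OrderBy(s => s))); // BTCUSDT
```

Because the collection instance never changes, every component that shares it observes additions and removals.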

Shutdown Handling

public override Task StopAsync(CancellationToken stoppingToken)
{
    websocketClientProvider.Stop();
    return base.StopAsync(stoppingToken);
}

Binance Stream Format

The service subscribes to Binance aggregate trade (aggTrade) streams through a combined-stream URL, with lowercase symbols separated by slashes:

wss://stream.binance.com:9443/stream?streams=btcusdt@aggTrade/ethusdt@aggTrade
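
Messages on a combined stream arrive wrapped in an envelope with `stream` and `data` fields. In the aggTrade payload, `s` is the symbol, `p` the price, `q` the quantity, and `T` the trade time in milliseconds. The sketch below parses a hand-written sample message (the values are made up) with `System.Text.Json`. It illustrates the wire format only and is not the service's own parsing code.

```csharp
using System;
using System.Globalization;
using System.Text.Json;

// Sample combined-stream message, trimmed to the fields used below.
const string json = """
{"stream":"btcusdt@aggTrade","data":{"e":"aggTrade","E":1700000000000,"s":"BTCUSDT","p":"37000.10","q":"0.5","T":1700000000000}}
""";

using var doc = JsonDocument.Parse(json);
var data = doc.RootElement.GetProperty("data");

var symbol = data.GetProperty("s").GetString();
// Prices and quantities are sent as strings; parse them with the invariant culture.
var price = decimal.Parse(data.GetProperty("p").GetString()!, CultureInfo.InvariantCulture);
var quantity = decimal.Parse(data.GetProperty("q").GetString()!, CultureInfo.InvariantCulture);
var tradeTime = DateTimeOffset.FromUnixTimeMilliseconds(data.GetProperty("T").GetInt64());

Console.WriteLine($"{symbol} {price} x {quantity} at {tradeTime:O}");
```

Parsing into `decimal` rather than `double` avoids binary rounding of the quoted prices.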

Performance Features

  • ConcurrentBag for thread-safe symbol additions
  • Channel-based message hand-off that decouples socket reads from processing (backpressure applies only if the channel is bounded)
  • Price history preloaded at startup (up to 500 candles per symbol)
  • Dynamic subscriptions, so only symbols with price rules are streamed
