Skip to main content

System Architecture

PriceSignal is built on a modern, event-driven architecture designed for real-time processing of cryptocurrency price data and intelligent alert triggering.

Architecture Overview

All components are containerized and can be deployed to Kubernetes for horizontal scaling.

Core Components

GraphQL API Layer

The GraphQL API is built with Hot Chocolate on .NET 8, providing a unified interface for queries, mutations, and subscriptions.

Key Features

  • Queries: Fetch instruments, prices, user rules, and historical data
  • Mutations: Create, update, delete, and toggle price rules
  • Subscriptions: Real-time price updates streamed to clients
[Subscribe(With = nameof(SubscribeToUpdates))]
public Price? OnPriceUpdated(string symbol, [EventMessage] Price price)
{
    return price;
}

public async IAsyncEnumerable<Price> SubscribeToUpdates(
    [Service] ITopicEventReceiver eventReceiver,
    string symbol,
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    var stream = await eventReceiver.SubscribeAsync<Price>(
        nameof(OnPriceUpdated), cancellationToken);
    
    await foreach (var price in stream.ReadEventsAsync()
        .WithCancellation(cancellationToken))
    {
        if (price.Symbol != symbol) continue;
        yield return price;
    }
}
The GraphQL schema is type-safe and auto-generated, ensuring consistency between backend and frontend.

TimescaleDB - Time-Series Database

PriceSignal uses TimescaleDB, a PostgreSQL extension optimized for time-series data, to store price candles and historical data.

Why TimescaleDB?

  • Time-Series Optimized: Hypertables automatically partition data by time
  • SQL Compatible: Use standard PostgreSQL queries with time-series functions
  • Compression: Automatic compression of older data saves storage
  • Fast Aggregations: Built-in functions for OHLCV calculations

Schema Design

Key Tables
-- Instruments (Trading Pairs)
CREATE TABLE instruments (
    id BIGSERIAL PRIMARY KEY,
    entity_id UUID UNIQUE,
    symbol VARCHAR(20) NOT NULL,
    exchange_id BIGINT REFERENCES exchanges(id)
);

-- Price Rules
CREATE TABLE price_rules (
    id BIGSERIAL PRIMARY KEY,
    entity_id UUID UNIQUE,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    is_enabled BOOLEAN DEFAULT true,
    instrument_id BIGINT REFERENCES instruments(id),
    user_id VARCHAR(255),
    last_triggered_at TIMESTAMPTZ,
    last_triggered_price DECIMAL(18,8)
);

-- Price Conditions
CREATE TABLE price_conditions (
    id BIGSERIAL PRIMARY KEY,
    entity_id UUID UNIQUE,
    condition_type VARCHAR(50) NOT NULL,
    value DECIMAL(18,8),
    additional_values JSONB,
    rule_id BIGINT REFERENCES price_rules(id)
);

-- One-Minute Candles (Hypertable)
CREATE TABLE one_min_candle (
    bucket TIMESTAMPTZ NOT NULL,
    symbol VARCHAR(20),
    open DECIMAL(18,8),
    high DECIMAL(18,8),
    low DECIMAL(18,8),
    close DECIMAL(18,8),
    volume DECIMAL(18,8)
);

SELECT create_hypertable('one_min_candle', 'bucket');
Hypertables require careful index design. Always include bucket (time) in your indexes for optimal query performance.

Docker Configuration

compose.yaml:33
db:
  image: timescale/timescaledb-ha:pg15
  restart: always
  user: postgres
  volumes:
    - db-data:/var/lib/postgresql/data
  environment:
    - POSTGRES_DB=price_signal
    - POSTGRES_USER=postgres
    - POSTGRES_PASSWORD=example
  ports:
    - 5432:5432
  healthcheck:
    test: ["CMD", "pg_isready"]
    interval: 10s
    timeout: 5s
    retries: 5

NATS JetStream - Event Messaging

NATS provides the event-driven backbone of PriceSignal, enabling decoupled communication between services.

Message Streams

Stream: notifications
Subjects: notifications.>

- notifications.telegram: Telegram messages
- notifications.init.telegram: Telegram connection requests
- notifications.email: Email notifications (future)

NATS Service Implementation

NatsService.cs:12
public class NatsService : IPubSub, IAsyncDisposable
{
    private readonly NatsConnection _connection;
    private readonly INatsJSContext _jetStream;
    
    public NatsService(ILogger<NatsService> logger, 
                      IOptions<NatsSettings> natsSettings)
    {
        _connection = new NatsConnection(
            new NatsOpts(){Url = natsSettings.Value.Url});
        _jetStream = new NatsJSContext(_connection);
        
        // Create notifications stream
        _jetStream.CreateStreamAsync(new StreamConfig(
            name: "notifications",
            subjects: new[] { "notifications.>" }
        )).GetAwaiter().GetResult();
    }
    
    public async Task PublishAsync<T>(string subject, T message)
    {
        var content = JsonSerializer.Serialize(message);
        var ack = await _jetStream.PublishAsync(subject, content);
        ack.EnsureSuccess();
    }
    
    public async void Subscribe<T>(string stream, 
                                   Func<T, Task> handler, 
                                   string? subject = null)
    {
        var consumer = await _jetStream
            .CreateOrUpdateConsumerAsync(stream, 
                new ConsumerConfig($"{stream}_processor_dotnet"));
                
        await foreach (var jsMsg in consumer.ConsumeAsync<string>())
        {
            if (jsMsg.Data == null) continue;
            var message = JsonSerializer.Deserialize<T>(jsMsg.Data);
            if (message != null) await handler(message);
            await jsMsg.AckAsync();
        }
    }
}
NATS JetStream provides persistence, replay, and exactly-once delivery semantics, ensuring no notifications are lost.

Rule Engine

The heart of PriceSignal - evaluates incoming price data against user-defined conditions in real-time.

Rule Evaluation Flow

1

Price Update Received

New price data arrives from the Binance WebSocket or other exchange feeds.
2

Cache Lookup

The Rule Cache is queried for all active rules watching the affected instrument.
var activeRules = ruleCache.GetRulesForInstrument(instrumentId)
    .Where(r => r.IsEnabled);
3

Condition Evaluation

Each condition type has a dedicated evaluator:
  • PriceCondition: Simple price threshold checks
  • PricePercentage: Percentage change calculations
  • TechnicalIndicator: RSI, SMA, EMA calculations from TimescaleDB
  • PriceCrossover: Moving average crossovers
4

Rule Trigger

When all conditions are met, the rule triggers:
PriceRule.cs:28
public void Trigger(decimal price)
{
    LastTriggeredPrice = price;
    LastTriggeredAt = DateTime.UtcNow;
    AddEvent(new PriceRuleTriggeredEvent(this));
    ActivationLogs.Add(new PriceRuleTriggerLog(this));
}
5

Notification Dispatch

The triggered event is handled by the notification service, which publishes to NATS.

Domain Model

PriceRule.cs:8
public class PriceRule : BaseAuditableEntity
{
    public required string Name { get; set; }
    public required string Description { get; set; }
    public bool IsEnabled { get; set; }
    public DateTime? LastTriggeredAt { get; set; }
    public decimal? LastTriggeredPrice { get; set; }
    public NotificationChannelType NotificationChannel { get; set; }
    
    public Instrument Instrument { get; set; }
    public required long InstrumentId { get; set; }
    
    public ICollection<PriceCondition> Conditions { get; set; }
    public User? User { get; set; }
    public string? UserId { get; init; }
    
    public ICollection<PriceRuleTriggerLog> ActivationLogs { get; set; }
}
PriceCondition.cs:6
public class PriceCondition : BaseAuditableEntity
{
    public required string ConditionType { get; set; }
    public decimal Value { get; set; }
    public JsonDocument AdditionalValues { get; set; }
    
    [JsonIgnore]
    public PriceRule Rule { get; set; }
}

public enum ConditionType
{
    PricePercentage,
    Price,
    PriceAction,
    PriceCrossover,
    TechnicalIndicator,
}

Notification System

Multi-channel notification system with pluggable architecture.

Telegram Channel

TelegramNotificationChannel.cs:8
public class TelegramNotificationChannel : INotificationChannel
{
    private readonly IPubSub _pubSub;
    public NotificationChannelType ChannelType => 
        NotificationChannelType.telegram;
    
    public async Task SendAsync(string userId, string message)
    {
        var telegramChats = await context.UserNotificationChannels
            .Where(unc => unc.User.Id == userId && 
                         unc.ChannelType == ChannelType)
            .Select(unc => unc.TelegramChatId)
            .ToListAsync();
        
        foreach (var chatId in telegramChats)
        {
            await _pubSub.PublishAsync("notifications.telegram", 
                new Messageinput(chatId.Value, message));
        }
    }
}
The notification system supports multiple channels per user. Future channels include Email, Discord, and Webhooks.

Telegram Bot Service

Standalone Go service that bridges NATS and Telegram’s Bot API.
telegram-bot/main.go:28
func main() {
    // Connect to NATS
    nc, _ := nats.Connect(os.Getenv("NATS_URL"))
    js, _ := jetstream.New(nc)
    
    // Create notifications stream
    s, _ := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
        Name:     "notifications",
        Subjects: []string{"notifications.>"},
    })
    
    // Create Telegram bot
    bot, _ := tgbotapi.NewBotAPI(os.Getenv("TELEGRAM_BOT_TOKEN"))
    
    // Consume notification messages
    c, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Durable:       "telegram",
        FilterSubject: "notifications.telegram",
        AckPolicy:     jetstream.AckExplicitPolicy,
    })
    
    c.Consume(func(msg jetstream.Msg) {
        var notification Notification
        json.Unmarshal(msg.Data(), &notification)
        
        telegramMsg := tgbotapi.NewMessage(
            notification.ChatID, 
            notification.Message
        )
        bot.Send(telegramMsg)
        msg.Ack()
    })
}

Connection Flow

  1. User sends /start YOUR_USER_ID to the bot
  2. Bot publishes connection request to notifications.init.telegram
  3. Backend consumes message and creates UserNotificationChannel record
  4. Bot confirms connection to user
telegram-bot/main.go:99
for update := range updates {
    chatID := update.Message.Chat.ID
    username := update.Message.From.UserName
    userId := update.Message.CommandArguments()
    
    chatIDMessage := ChatIDMessage{
        ChatID:   chatID,
        Username: username,
        UserId:   userId,
    }
    
    js.PublishAsync("notifications.init.telegram", 
                   json.Marshal(chatIDMessage))
    
    bot.Send(tgbotapi.NewMessage(chatID, 
        "Hello, your chat ID has been recorded!"))
}

React Frontend

Modern React application with TypeScript, Apollo Client, and shadcn/ui components.

Key Features

  • Real-time subscriptions: Live price charts via GraphQL subscriptions
  • Rule builder: Intuitive UI for creating complex alert conditions
  • Dashboard: Overview of active rules and recent triggers
  • Type-safe: Auto-generated GraphQL types via GraphQL Code Generator
create-rule.tsx:42
const createRuleMutation = graphql(`
    mutation CreatePriceRule($newRule: PriceRuleInput!) {
        createPriceRule(input:$newRule) {
            id
            name
            description
        }
    }
`)

const onSubmit = (data: z.infer<typeof CreateRuleFormSchema>) => {
    createRule({
        variables: {
            newRule: {
                id: uuidv4(),
                name: data.name,
                description: data.description,
                instrumentId: data.symbol,
                conditions: data.conditions.map(condition => ({
                    conditionType: condition.conditionType,
                    value: condition.threshold, 
                    additionalValues: JSON.stringify({
                        name: condition.indicator, 
                        period: condition.period, 
                        direction: condition.direction
                    })
                }))
            }
        }
    })
}

Price Ingestion Pipeline

Binance WebSocket Integration

Binance WebSocket → Price Fetcher Service → TimescaleDB Hypertable

                                    GraphQL Subscription

                                    React Frontend
1

WebSocket Connection

The BinancePriceFetcherService maintains persistent WebSocket connections to Binance for subscribed trading pairs.
2

Price Processing

Incoming kline (candlestick) data is parsed and normalized into the internal Price model.
3

Database Insert

Prices are batch-inserted into the TimescaleDB hypertable for efficient storage.
INSERT INTO one_min_candle (bucket, symbol, open, high, low, close, volume)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (bucket, symbol) DO UPDATE SET
    open = EXCLUDED.open,
    high = EXCLUDED.high,
    low = EXCLUDED.low,
    close = EXCLUDED.close,
    volume = EXCLUDED.volume;
4

Event Publishing

Price updates are published to GraphQL subscribers and trigger rule evaluation.

Deployment Architecture

Docker Compose (Development)

compose.yaml:10
services:
  server:
    image: nayth/price-signal-graph:latest
    ports:
      - 8080:8080
    environment:
      - Nats__Url=nats://host.docker.internal:4222
      - ASPNETCORE_ENVIRONMENT=Development
  
  db:
    image: timescale/timescaledb-ha:pg15
    environment:
      - POSTGRES_DB=price_signal
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=example
    ports:
      - 5432:5432

Kubernetes (Production)

Infrastructure as Code is managed with Pulumi in the /infra directory. The deployment includes separate pods for GraphQL API, Telegram bot, and background services.

Performance Characteristics

  • Price Ingestion: 10,000+ updates/second per instance
  • Rule Evaluation: Sub-millisecond per rule
  • GraphQL Latency: <50ms p99 for queries
  • Notification Delivery: <1 second from trigger to Telegram
  • Database: Handles 100M+ candle rows with automatic compression
Rule cache is in-memory. Ensure sufficient RAM allocation (recommend 2GB minimum) for production deployments with thousands of active rules.

Monitoring and Observability

  • Logging: Structured logging via Serilog
  • Metrics: Custom metrics exported for Prometheus
  • Tracing: Distributed tracing via OpenTelemetry (planned)
  • Health Checks: /health endpoints for all services

Scaling Considerations

Horizontal Scaling

  • GraphQL API: Stateless, scale behind load balancer
  • Rule Engine: Requires distributed cache (Redis) for multi-instance deployments
  • Database: TimescaleDB supports read replicas and multi-node clustering
  • NATS: Cluster mode for high availability

Vertical Scaling

  • Database: Increase RAM for larger time-series datasets
  • API Server: CPU-bound during rule evaluation spikes
For deployments with >10,000 active rules, consider implementing rule sharding across multiple rule engine instances.

Security

  • Authentication: JWT-based authentication with secure token storage
  • Authorization: User-scoped queries prevent cross-user data access
  • Database: Parameterized queries prevent SQL injection
  • API: Rate limiting prevents abuse
  • Secrets: Environment variables and secret managers for sensitive data

Next Steps

API Reference

Explore the GraphQL schema

Deployment Guide

Deploy PriceSignal to production

Build docs developers (and LLMs) love