System Architecture
PriceSignal is built on a modern, event-driven architecture designed for real-time processing of cryptocurrency price data and intelligent alert triggering.
Architecture Overview
All components are containerized and can be deployed to Kubernetes for horizontal scaling.
Core Components
GraphQL API Layer
The GraphQL API is built with Hot Chocolate on .NET 8, providing a unified interface for queries, mutations, and subscriptions.
Key Features
Queries: Fetch instruments, prices, user rules, and historical data
Mutations: Create, update, delete, and toggle price rules
Subscriptions: Real-time price updates streamed to clients
Price Subscription (PriceSubscriptions.cs:14)
// Resolver: returns each event delivered by the source stream below.
[Subscribe(With = nameof(SubscribeToUpdates))]
public Price? OnPriceUpdated(string symbol, [EventMessage] Price price)
{
    return price;
}

// Source stream: subscribes to the price topic and forwards only the requested symbol.
public async IAsyncEnumerable<Price> SubscribeToUpdates(
    [Service] ITopicEventReceiver eventReceiver,
    string symbol,
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    var stream = await eventReceiver.SubscribeAsync<Price>(
        nameof(OnPriceUpdated), cancellationToken);

    await foreach (var price in stream.ReadEventsAsync()
        .WithCancellation(cancellationToken))
    {
        if (price.Symbol != symbol) continue;
        yield return price;
    }
}
The GraphQL schema is type-safe and auto-generated, ensuring consistency between backend and frontend.
TimescaleDB - Time-Series Database
PriceSignal uses TimescaleDB, a PostgreSQL extension optimized for time-series data, to store price candles and historical data.
Why TimescaleDB?
Time-Series Optimized: Hypertables automatically partition data by time
SQL Compatible: Use standard PostgreSQL queries with time-series functions
Compression: Automatic compression of older data saves storage
Fast Aggregations: Built-in functions for OHLCV calculations
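The aggregation point is easier to picture with a small roll-up example. In TimescaleDB this is typically written in SQL with `time_bucket` and the `first`/`last` aggregates; the TypeScript sketch below only mirrors that logic on in-memory data. The names `Candle` and `rollUp` are hypothetical and not part of PriceSignal, and plain numbers stand in for the DECIMAL columns for brevity.

```typescript
// Illustrative only: Candle and rollUp are hypothetical names, not PriceSignal code.
interface Candle {
  bucket: number; // bucket start, in milliseconds since the Unix epoch
  open: number;
  high: number;
  low: number;
  close: number;
  volume: number;
}

// Roll fine-grained candles up into buckets of `widthMs` milliseconds.
// Input must be sorted by `bucket` ascending so that open/close come from
// the first/last candle of each group.
function rollUp(candles: Candle[], widthMs: number): Candle[] {
  const out = new Map<number, Candle>();
  for (const c of candles) {
    const start = Math.floor(c.bucket / widthMs) * widthMs;
    const agg = out.get(start);
    if (agg === undefined) {
      out.set(start, { ...c, bucket: start }); // first candle sets the open
    } else {
      agg.high = Math.max(agg.high, c.high);
      agg.low = Math.min(agg.low, c.low);
      agg.close = c.close; // last candle seen sets the close
      agg.volume += c.volume;
    }
  }
  return Array.from(out.values());
}

const MINUTE_MS = 60000;
const oneMinuteCandles: Candle[] = [
  { bucket: 0 * MINUTE_MS, open: 100, high: 105, low: 99, close: 104, volume: 2 },
  { bucket: 1 * MINUTE_MS, open: 104, high: 110, low: 103, close: 108, volume: 3 },
  { bucket: 5 * MINUTE_MS, open: 108, high: 109, low: 101, close: 102, volume: 1 },
];
const fiveMinuteCandles = rollUp(oneMinuteCandles, 5 * MINUTE_MS);
```

Here the first two one-minute candles fall into the same five-minute bucket and the third starts a new one. In production the database would do this work, so the sketch is a reading aid rather than a suggested implementation.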
Schema Design
-- Instruments (Trading Pairs)
CREATE TABLE instruments (
    id BIGSERIAL PRIMARY KEY,
    entity_id UUID UNIQUE,
    symbol VARCHAR(20) NOT NULL,
    exchange_id BIGINT REFERENCES exchanges(id)
);

-- Price Rules
CREATE TABLE price_rules (
    id BIGSERIAL PRIMARY KEY,
    entity_id UUID UNIQUE,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    is_enabled BOOLEAN DEFAULT true,
    instrument_id BIGINT REFERENCES instruments(id),
    user_id VARCHAR(255),
    last_triggered_at TIMESTAMPTZ,
    last_triggered_price DECIMAL(18, 8)
);

-- Price Conditions
CREATE TABLE price_conditions (
    id BIGSERIAL PRIMARY KEY,
    entity_id UUID UNIQUE,
    condition_type VARCHAR(50) NOT NULL,
    value DECIMAL(18, 8),
    additional_values JSONB,
    rule_id BIGINT REFERENCES price_rules(id)
);
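
-- One-Minute Candles (Hypertable)
CREATE TABLE one_min_candle (
    bucket TIMESTAMPTZ NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    open DECIMAL(18, 8),
    high DECIMAL(18, 8),
    low DECIMAL(18, 8),
    close DECIMAL(18, 8),
    volume DECIMAL(18, 8),
    -- Assumed constraint: the ingestion upsert's ON CONFLICT (bucket, symbol)
    -- needs a matching unique index, and it must include the time column.
    UNIQUE (bucket, symbol)
);
SELECT create_hypertable('one_min_candle', 'bucket');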
Hypertables require careful index design. Unique indexes and primary keys on a hypertable must include the partitioning column (bucket), and including bucket in other indexes generally helps time-bounded queries.
Docker Configuration
db:
  image: timescale/timescaledb-ha:pg15
  restart: always
  user: postgres
  volumes:
    - db-data:/var/lib/postgresql/data
  environment:
    - POSTGRES_DB=price_signal
    - POSTGRES_USER=postgres
    - POSTGRES_PASSWORD=example
  ports:
    - 5432:5432
  healthcheck:
    test: ["CMD", "pg_isready"]
    interval: 10s
    timeout: 5s
    retries: 5
NATS JetStream - Event Messaging
NATS provides the event-driven backbone of PriceSignal, enabling decoupled communication between services.
Message Streams
Notifications Stream
Stream: notifications
Subjects: notifications.>
- notifications.telegram: Telegram messages
- notifications.init.telegram: Telegram connection requests
- notifications.email: Email notifications (future)
Prices Stream
Stream: prices
Subjects: prices.>
- prices.binance.*: Binance price updates
- prices.coinbase.*: Coinbase price updates (future)
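The stream definitions above rely on NATS subject wildcards: `*` matches exactly one dot-separated token, and `>` matches one or more trailing tokens. The NATS server performs this matching itself, so the sketch below is only an illustration of the rule. The function name `subjectMatches` is hypothetical, and the concrete subject `prices.binance.BTCUSDT` is an assumed example of what the final token might look like.

```typescript
// Illustrative re-implementation of NATS subject matching (the server does this for you).
// `*` matches exactly one token; `>` matches one or more trailing tokens.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // `>` is only valid as the last token and needs at least one token to consume.
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) return false; // subject ran out of tokens
    if (p[i] !== "*" && p[i] !== s[i]) return false; // literal token mismatch
  }
  return p.length === s.length; // no unmatched trailing subject tokens
}

const matchesTelegram = subjectMatches("notifications.>", "notifications.init.telegram");
const matchesBinance = subjectMatches("prices.binance.*", "prices.binance.BTCUSDT");
```

Both example calls evaluate to true. A pattern such as `prices.binance.*` would not match a subject with an extra trailing token, which is why the stream-level subjects use `>` instead.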
NATS Service Implementation
public class NatsService : IPubSub, IAsyncDisposable
{
    private readonly NatsConnection _connection;
    private readonly INatsJSContext _jetStream;

    public NatsService(ILogger<NatsService> logger,
        IOptions<NatsSettings> natsSettings)
    {
        _connection = new NatsConnection(
            new NatsOpts { Url = natsSettings.Value.Url });
        _jetStream = new NatsJSContext(_connection);

        // Create notifications stream. Blocking on async work in a constructor
        // is kept from the original; an async initializer is generally safer.
        _jetStream.CreateStreamAsync(new StreamConfig(
            name: "notifications",
            subjects: new[] { "notifications.>" }
        )).GetAwaiter().GetResult();
    }

    public async Task PublishAsync<T>(string subject, T message)
    {
        var content = JsonSerializer.Serialize(message);
        var ack = await _jetStream.PublishAsync(subject, content);
        ack.EnsureSuccess(); // throws if JetStream did not store the message
    }

    // Note: `async void` means exceptions thrown here cannot be observed by the
    // caller. The `subject` parameter is not applied in this excerpt; setting
    // ConsumerConfig.FilterSubject would restrict the consumer to one subject.
    public async void Subscribe<T>(string stream,
        Func<T, Task> handler,
        string? subject = null)
    {
        var consumer = await _jetStream
            .CreateOrUpdateConsumerAsync(stream,
                new ConsumerConfig($"{stream}_processor_dotnet"));

        await foreach (var jsMsg in consumer.ConsumeAsync<string>())
        {
            if (jsMsg.Data == null) continue;
            var message = JsonSerializer.Deserialize<T>(jsMsg.Data);
            if (message != null) await handler(message);
            await jsMsg.AckAsync(); // acknowledge only after the handler has run
        }
    }

    // Required by IAsyncDisposable: closes the underlying NATS connection.
    public ValueTask DisposeAsync() => _connection.DisposeAsync();
}
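NATS JetStream persists messages and supports replay, so notifications survive consumer restarts. Delivery is at-least-once by default; exactly-once semantics additionally require publisher-side message deduplication and double-acknowledged consumers, so handlers should tolerate redelivery.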
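JetStream redelivers a message when it does not receive the acknowledgement in time, so consumers commonly guard against handling the same notification twice. The sketch below shows one way to do that, assuming each notification carries a unique `id`. That field, the `OutboundNotification` shape, and `makeIdempotent` are all hypothetical and do not appear in PriceSignal.

```typescript
// Hypothetical message shape: the `id` field is an assumption, not part of PriceSignal.
interface OutboundNotification {
  id: string;
  chatId: number;
  text: string;
}

// Wraps a handler so that a redelivered message with an already-seen id is skipped.
// The seen-set is in-memory and unbounded here; a real service would persist and expire it.
function makeIdempotent(
  handler: (n: OutboundNotification) => void
): (n: OutboundNotification) => boolean {
  const seen = new Set<string>();
  return (n) => {
    if (seen.has(n.id)) return false; // duplicate delivery: skip the handler
    handler(n);
    seen.add(n.id); // record the id only after the handler returned without throwing
    return true;
  };
}

const sentTexts: string[] = [];
const deliverOnce = makeIdempotent((n) => {
  sentTexts.push(n.text);
});
```

The wrapper returns whether the handler actually ran, which makes duplicates easy to count in metrics. Because the id is recorded only after the handler returns, a handler that throws leaves the message eligible for a retry.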
Rule Engine
The rule engine is the heart of PriceSignal: it evaluates incoming price data against user-defined conditions in real time.
Rule Evaluation Flow
Price Update Received
New price data arrives from the Binance WebSocket or other exchange feeds.
Cache Lookup
The Rule Cache is queried for all active rules watching the affected instrument.
var activeRules = ruleCache.GetRulesForInstrument(instrumentId)
    .Where(r => r.IsEnabled);
Condition Evaluation
Each condition type has a dedicated evaluator:
PriceCondition: Simple price threshold checks
PricePercentage: Percentage change calculations
TechnicalIndicator: RSI, SMA, EMA calculations from TimescaleDB
PriceCrossover: Moving average crossovers
Rule Trigger
When all conditions are met, the rule triggers:
public void Trigger(decimal price)
{
    LastTriggeredPrice = price;
    LastTriggeredAt = DateTime.UtcNow;
    AddEvent(new PriceRuleTriggeredEvent(this));
    ActivationLogs.Add(new PriceRuleTriggerLog(this));
}
Notification Dispatch
The triggered event is handled by the notification service, which publishes to NATS.
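The evaluation flow above (cache lookup, per-type evaluators, all conditions must hold) can be sketched compactly. This is a simplified TypeScript illustration rather than the C# engine: the `Condition`, `Rule`, and `Tick` shapes, the `direction` field, and the function names are assumptions made for the example, and only two condition types are covered.

```typescript
// Hypothetical, simplified shapes; the real domain model is the C# PriceRule/PriceCondition.
type Condition =
  | { conditionType: "Price"; direction: "above" | "below"; value: number }
  | { conditionType: "PricePercentage"; value: number }; // minimum absolute % change

interface Rule {
  id: number;
  instrumentId: number;
  isEnabled: boolean;
  conditions: Condition[];
}

interface Tick {
  instrumentId: number;
  price: number;
  previousPrice: number;
}

// One evaluator branch per condition type.
function conditionMet(c: Condition, t: Tick): boolean {
  switch (c.conditionType) {
    case "Price":
      return c.direction === "above" ? t.price > c.value : t.price < c.value;
    case "PricePercentage": {
      const changePct = ((t.price - t.previousPrice) / t.previousPrice) * 100;
      return Math.abs(changePct) >= c.value;
    }
  }
}

// Cache lookup, then evaluation: a rule fires only when all of its conditions hold.
// Rules without conditions never fire.
function triggeredRules(cache: Map<number, Rule[]>, t: Tick): Rule[] {
  return (cache.get(t.instrumentId) ?? [])
    .filter((r) => r.isEnabled)
    .filter((r) => r.conditions.length > 0 && r.conditions.every((c) => conditionMet(c, t)));
}
```

Keying the cache by instrument means each price update touches only the rules that watch that instrument, which is what keeps per-update work small.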
Domain Model
public class PriceRule : BaseAuditableEntity
{
    public required string Name { get; set; }
    public required string Description { get; set; }
    public bool IsEnabled { get; set; }
    public DateTime? LastTriggeredAt { get; set; }
    public decimal? LastTriggeredPrice { get; set; }
    public NotificationChannelType NotificationChannel { get; set; }
    public Instrument Instrument { get; set; }
    public required long InstrumentId { get; set; }
    public ICollection<PriceCondition> Conditions { get; set; }
    public User? User { get; set; }
    public string? UserId { get; init; }
    public ICollection<PriceRuleTriggerLog> ActivationLogs { get; set; }
}

public class PriceCondition : BaseAuditableEntity
{
    public required string ConditionType { get; set; }
    public decimal Value { get; set; }
    public JsonDocument AdditionalValues { get; set; }
    [JsonIgnore]
    public PriceRule Rule { get; set; }
}

public enum ConditionType
{
    PricePercentage,
    Price,
    PriceAction,
    PriceCrossover,
    TechnicalIndicator,
}
Notification System
Multi-channel notification system with pluggable architecture.
Telegram Channel
TelegramNotificationChannel.cs:8
public class TelegramNotificationChannel : INotificationChannel
{
    // `context` (the data context) and `_pubSub` are injected through the
    // constructor, which this excerpt omits.
    private readonly IPubSub _pubSub;

    public NotificationChannelType ChannelType =>
        NotificationChannelType.telegram;

    public async Task SendAsync(string userId, string message)
    {
        // Look up every Telegram chat the user has connected, skipping
        // channels without a chat ID so that `.Value` below cannot throw.
        var telegramChats = await context.UserNotificationChannels
            .Where(unc => unc.User.Id == userId &&
                          unc.ChannelType == ChannelType &&
                          unc.TelegramChatId != null)
            .Select(unc => unc.TelegramChatId)
            .ToListAsync();

        foreach (var chatId in telegramChats)
        {
            await _pubSub.PublishAsync("notifications.telegram",
                new Messageinput(chatId.Value, message));
        }
    }
}
The notification system supports multiple channels per user. Future channels include Email, Discord, and Webhooks.
Telegram Bot Service
Standalone Go service that bridges NATS and Telegram’s Bot API.
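package main

import (
	"context"
	"encoding/json"
	"log"
	"os"

	tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

// must aborts startup when a setup step fails.
func must[T any](v T, err error) T {
	if err != nil {
		log.Fatal(err)
	}
	return v
}

// The Notification type (ChatID, Message) is defined elsewhere in the service.
func main() {
	ctx := context.Background()
	// Connect to NATS
	nc := must(nats.Connect(os.Getenv("NATS_URL")))
	js := must(jetstream.New(nc))
	// Create notifications stream
	s := must(js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
		Name:     "notifications",
		Subjects: []string{"notifications.>"},
	}))
	// Create Telegram bot
	bot := must(tgbotapi.NewBotAPI(os.Getenv("TELEGRAM_BOT_TOKEN")))
	// Consume notification messages
	c := must(s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Durable:       "telegram",
		FilterSubject: "notifications.telegram",
		AckPolicy:     jetstream.AckExplicitPolicy,
	}))
	must(c.Consume(func(msg jetstream.Msg) {
		var notification Notification
		if err := json.Unmarshal(msg.Data(), &notification); err != nil {
			msg.Term() // malformed payload: do not redeliver
			return
		}
		telegramMsg := tgbotapi.NewMessage(notification.ChatID, notification.Message)
		if _, err := bot.Send(telegramMsg); err != nil {
			msg.Nak() // ask JetStream to redeliver
			return
		}
		msg.Ack()
	}))
	select {} // block so the consumer keeps running
}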
Connection Flow
User sends /start YOUR_USER_ID to the bot
Bot publishes connection request to notifications.init.telegram
Backend consumes message and creates UserNotificationChannel record
Bot confirms connection to user
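// Assumes ctx, js, bot, and updates are in scope from the bot's setup code.
for update := range updates {
	// Ignore non-message updates and anything other than /start.
	if update.Message == nil || update.Message.Command() != "start" {
		continue
	}
	chatID := update.Message.Chat.ID
	payload, err := json.Marshal(ChatIDMessage{
		ChatID:   chatID,
		Username: update.Message.From.UserName,
		UserId:   update.Message.CommandArguments(),
	})
	if err != nil {
		log.Printf("encode connection request: %v", err)
		continue
	}
	// Publish synchronously so the confirmation is sent only after
	// JetStream has stored the connection request.
	if _, err := js.Publish(ctx, "notifications.init.telegram", payload); err != nil {
		log.Printf("publish connection request: %v", err)
		continue
	}
	bot.Send(tgbotapi.NewMessage(chatID,
		"Hello, your chat ID has been recorded!"))
}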
React Frontend
Modern React application with TypeScript, Apollo Client, and shadcn/ui components.
Key Features
Real-time subscriptions: Live price charts via GraphQL subscriptions
Rule builder: Intuitive UI for creating complex alert conditions
Dashboard: Overview of active rules and recent triggers
Type-safe: Auto-generated GraphQL types via GraphQL Code Generator
const createRuleMutation = graphql(`
  mutation CreatePriceRule($newRule: PriceRuleInput!) {
    createPriceRule(input: $newRule) {
      id
      name
      description
    }
  }
`)

const onSubmit = (data: z.infer<typeof CreateRuleFormSchema>) => {
  createRule({
    variables: {
      newRule: {
        id: uuidv4(),
        name: data.name,
        description: data.description,
        instrumentId: data.symbol,
        conditions: data.conditions.map(condition => ({
          conditionType: condition.conditionType,
          value: condition.threshold,
          additionalValues: JSON.stringify({
            name: condition.indicator,
            period: condition.period,
            direction: condition.direction
          })
        }))
      }
    }
  })
}
Price Ingestion Pipeline
Binance WebSocket Integration
Binance WebSocket → Price Fetcher Service → TimescaleDB Hypertable
↓
GraphQL Subscription
↓
React Frontend
WebSocket Connection
The BinancePriceFetcherService maintains persistent WebSocket connections to Binance for subscribed trading pairs.
Price Processing
Incoming kline (candlestick) data is parsed and normalized into the internal Price model.
Database Insert
Prices are batch-inserted into the TimescaleDB hypertable for efficient storage.
INSERT INTO one_min_candle (bucket, symbol, open, high, low, close, volume)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (bucket, symbol) DO UPDATE SET
    open = EXCLUDED.open,
    high = EXCLUDED.high,
    low = EXCLUDED.low,
    close = EXCLUDED.close,
    volume = EXCLUDED.volume;
Event Publishing
Price updates are published to GraphQL subscribers and trigger rule evaluation.
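The parsing and batching steps above can be illustrated with a short sketch. The actual fetcher is the C# BinancePriceFetcherService, so everything here is an assumption made for illustration: `BinanceKlineEvent` lists only the subset of Binance kline fields the example needs, and `CandleRow`, `toCandleRow`, and `CandleBatcher` are hypothetical names. Prices stay as strings to avoid floating-point rounding before they reach the DECIMAL columns.

```typescript
// Subset of the Binance kline stream payload used here (numeric values arrive as strings).
interface BinanceKlineEvent {
  e: "kline";
  s: string; // symbol, e.g. "BTCUSDT"
  k: { t: number; o: string; h: string; l: string; c: string; v: string; x: boolean };
}

// Row shape matching the one_min_candle columns.
interface CandleRow {
  bucket: Date;
  symbol: string;
  open: string;
  high: string;
  low: string;
  close: string;
  volume: string;
}

// Normalize one kline event into a row; k.t is the candle's start time in milliseconds.
function toCandleRow(ev: BinanceKlineEvent): CandleRow {
  const k = ev.k;
  return { bucket: new Date(k.t), symbol: ev.s, open: k.o, high: k.h, low: k.l, close: k.c, volume: k.v };
}

// Hypothetical buffer: collects rows and hands them to `flush` in batches of `size`.
class CandleBatcher {
  private rows: CandleRow[] = [];
  private readonly size: number;
  private readonly flush: (batch: CandleRow[]) => void;

  constructor(size: number, flush: (batch: CandleRow[]) => void) {
    this.size = size;
    this.flush = flush;
  }

  add(row: CandleRow): void {
    this.rows.push(row);
    if (this.rows.length >= this.size) this.drain();
  }

  // Flush whatever is buffered; call this on a timer and at shutdown as well.
  drain(): void {
    if (this.rows.length === 0) return;
    const batch = this.rows;
    this.rows = [];
    this.flush(batch);
  }
}
```

Binance sends several updates for the same candle until it closes (`k.x` becomes true), so a batch can contain repeated (bucket, symbol) pairs. That is the situation the upsert in the Database Insert step is there to handle.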
Deployment Architecture
Docker Compose (Development)
services:
  server:
    image: nayth/price-signal-graph:latest
    ports:
      - 8080:8080
    environment:
      - Nats__Url=nats://host.docker.internal:4222
      - ASPNETCORE_ENVIRONMENT=Development
  db:
    image: timescale/timescaledb-ha:pg15
    environment:
      - POSTGRES_DB=price_signal
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=example
    ports:
      - 5432:5432
Kubernetes (Production)
Infrastructure as Code is managed with Pulumi in the /infra directory. The deployment includes separate pods for GraphQL API, Telegram bot, and background services.
Performance Characteristics
Price Ingestion: 10,000+ updates/second per instance
Rule Evaluation: Sub-millisecond per rule
GraphQL Latency: <50ms p99 for queries
Notification Delivery: <1 second from trigger to Telegram
Database: Handles 100M+ candle rows with automatic compression
The rule cache is held in memory. Allocate sufficient RAM (2 GB minimum recommended) for production deployments with thousands of active rules.
Monitoring and Observability
Logging: Structured logging via Serilog
Metrics: Custom metrics exported for Prometheus
Tracing: Distributed tracing via OpenTelemetry (planned)
Health Checks: /health endpoints for all services
Scaling Considerations
Horizontal Scaling
GraphQL API: Stateless, scale behind load balancer
Rule Engine: Requires distributed cache (Redis) for multi-instance deployments
Database: TimescaleDB supports read replicas and multi-node clustering
NATS: Cluster mode for high availability
Vertical Scaling
Database: Increase RAM for larger time-series datasets
API Server: CPU-bound during rule evaluation spikes
For deployments with >10,000 active rules, consider implementing rule sharding across multiple rule engine instances.
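One common way to shard rules is by instrument, so that every rule watching a symbol lives on the same engine instance and each price update goes to exactly one shard. The sketch below assumes that approach; `fnv1a` and `shardFor` are hypothetical helpers, and the choice of FNV-1a with a modulo is an illustration rather than PriceSignal's design.

```typescript
// Stable 32-bit FNV-1a hash. It hashes UTF-16 code units, which equals the
// byte-wise hash for ASCII keys such as trading symbols.
function fnv1a(key: string): number {
  let h = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < key.length; i++) {
    h ^= key.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0; // multiply by the FNV prime, keep unsigned 32-bit
  }
  return h;
}

// Map an instrument symbol to one of `shardCount` rule-engine instances.
function shardFor(symbol: string, shardCount: number): number {
  if (!Number.isInteger(shardCount) || shardCount <= 0) {
    throw new Error("shardCount must be a positive integer");
  }
  return fnv1a(symbol) % shardCount;
}

const btcShard = shardFor("BTCUSDT", 4);
```

The assignment is deterministic, so any service can compute the target shard without coordination. Plain modulo does reshuffle most symbols when the shard count changes; consistent hashing is the usual alternative if instances are added or removed often.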
Security
Authentication: JWT-based authentication with secure token storage
Authorization: User-scoped queries prevent cross-user data access
Database: Parameterized queries prevent SQL injection
API: Rate limiting prevents abuse
Secrets: Environment variables and secret managers for sensitive data
Next Steps
API Reference: Explore the GraphQL schema
Deployment Guide: Deploy PriceSignal to production