Architecture Overview
The queue system is implemented in queue.go and uses a multi-queue architecture:
- Multiple Transaction Queues: Transactions are distributed across N queues based on balance ID hashing
- Inflight Expiry Queue: Handles automatic expiration of inflight transactions
- Index Queue: Manages search index updates for Typesense
- Webhook Queue: Processes webhook notifications asynchronously
Queue Types
| Queue Type | Purpose | Concurrency |
|---|---|---|
| `new:transaction_{1-N}` | Process financial transactions | 1 per queue |
| `new:inflight-expiry` | Expire stuck inflight transactions | Configurable |
| `new:index` | Update search indexes | Configurable |
| `new:webhook` | Send webhook notifications | 20 (default) |
Transaction Distribution
Transactions are distributed across multiple queues using consistent hashing so that all transactions for the same balance are processed serially in the same queue. The `hashBalanceID` function (queue.go:234) uses FNV-1a hashing to ensure consistent distribution.
Configuration
Queue Settings
Configure queue behavior in your blnk.json:
- `number_of_queues`: Number of transaction queues (default: 20). Higher values increase parallelism but use more resources.
- `webhook_concurrency`: Number of concurrent webhook workers (default: 20)
- `monitoring_port`: Asynq monitoring dashboard port (default: "5004")
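These settings might look like the following in blnk.json (the key names are from the list above; their nesting under a top-level `queue` key is an assumption, so check the configuration reference for your version):

```json
{
  "queue": {
    "number_of_queues": 20,
    "webhook_concurrency": 20,
    "monitoring_port": "5004"
  }
}
```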
Redis Connection
The queue system uses the Redis connection settings from the `redis` section of blnk.json.
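A minimal `redis` section might look like this (the `dns` key name reflects common Blnk configurations and may differ in your version):

```json
{
  "redis": {
    "dns": "redis://localhost:6379"
  }
}
```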
Queue Operations
Enqueuing Transactions
Transactions are enqueued with automatic retry configuration:
- Automatic retry up to 5 times on failure
- Transaction ID used as task ID (prevents duplicates)
- Scheduled transactions supported via the `scheduled_for` field
Inflight Expiry
Inflight transactions are automatically expired after their expiry date.

Index Queue
Search index updates are batched and queued to avoid blocking transaction processing.

Queue Recovery
Blnk includes automatic recovery for stuck queued transactions (queue_recovery.go).

Recovery Processor
The recovery processor runs continuously to detect and recover stuck transactions:
- Poll interval: 30 seconds
- Stuck threshold: 2 hours
- Max recovery attempts: 3
- Batch size: `maxWorkers * 100`
Recovery Logic
Transactions stuck in QUEUED status are automatically reprocessed.

Manual Recovery Trigger
You can manually trigger recovery via the API.

Queue Inspection
Retrieve a queued transaction by its ID.

Monitoring
Asynq provides a web UI for monitoring queues:
- Queue sizes and latencies
- Task success/failure rates
- Retry statistics
- Scheduled tasks
- Dead letter queue
Performance Tuning
Optimal Queue Count
Choose `number_of_queues` based on your workload:
- Low volume (< 100 TPS): 10-20 queues
- Medium volume (100-1000 TPS): 20-50 queues
- High volume (> 1000 TPS): 50-100 queues
As a rule of thumb: `number_of_queues = concurrent_workers * 2`
Redis Connection Pool
Size the Redis pool to handle queue throughput:
- `pool_size`: at least `number_of_queues + webhook_concurrency + 20`
- `min_idle_conns`: 20% of `pool_size`
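For example, with `number_of_queues: 20` and `webhook_concurrency: 20`, the guidance above gives `pool_size = 20 + 20 + 20 = 60` and `min_idle_conns = 12` (20% of 60). As a sketch, with key placement under `redis` assumed:

```json
{
  "redis": {
    "pool_size": 60,
    "min_idle_conns": 12
  }
}
```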
Worker Configuration
Adjust the number of transaction processing workers to match your load.

Best Practices
- Use Unique References: Always provide unique transaction references to prevent duplicate processing
- Monitor Queue Depth: Set up alerts for when queue depth exceeds your thresholds
- Handle Scheduled Transactions: Use the `scheduled_for` field for future-dated transactions instead of external schedulers
- Enable Recovery: The recovery processor runs automatically; ensure it is not disabled
- Set Appropriate Timeouts: Configure lock duration based on your transaction complexity
Troubleshooting
Transactions Stuck in QUEUED Status
Symptom: Transactions remain in QUEUED status for extended periods.
Solutions:
- Check that the queue recovery processor is running
- Verify Redis connectivity
- Check worker health: `curl http://localhost:5004/health`
- Manually trigger recovery via the API
High Queue Latency
Symptom: Transactions take too long to process.
Solutions:
- Increase `number_of_queues`
- Increase `max_workers`
- Scale Redis vertically (more CPU/memory)
- Enable Redis persistence for durability
Duplicate Transaction Processing
Symptom: The same transaction is processed multiple times.
Solutions:
- Ensure transaction references are unique
- Check for queue recovery conflicts
- Verify Asynq task ID uniqueness