Blnk uses Asynq, a Redis-based distributed task queue, to manage transaction processing with high reliability and scalability. The queue system ensures transactions are processed serially per balance to prevent race conditions while maintaining high throughput.

Architecture Overview

The queue system is implemented in /queue.go and uses a multi-queue architecture:
  • Multiple Transaction Queues: Transactions are distributed across N queues based on balance ID hashing
  • Inflight Expiry Queue: Handles automatic expiration of inflight transactions
  • Index Queue: Manages search index updates for Typesense
  • Webhook Queue: Processes webhook notifications asynchronously

Queue Types

Queue Type            | Purpose                            | Concurrency
new:transaction_{1-N} | Process financial transactions     | 1 per queue
new:inflight-expiry   | Expire stuck inflight transactions | Configurable
new:index             | Update search indexes              | Configurable
new:webhook           | Send webhook notifications         | 20 (default)

Transaction Distribution

Transactions are distributed across multiple queues by hashing the source balance ID, so that all transactions for the same balance land in the same queue and are processed serially:
// Source: queue.go:216-224
func (q *Queue) geTask(transaction *model.Transaction, payload []byte) *asynq.Task {
    // Pick a queue deterministically from the source balance ID so that all
    // transactions touching the same balance are processed serially.
    queueIndex := hashBalanceID(transaction.Source) % q.config.Queue.NumberOfQueues
    queueName := fmt.Sprintf("%s_%d", q.config.Queue.TransactionQueue, queueIndex+1)
    
    taskOptions := []asynq.Option{asynq.TaskID(transaction.TransactionID), asynq.Queue(queueName)}
    if !transaction.ScheduledFor.IsZero() {
        // Future-dated transactions are delayed until their scheduled time.
        taskOptions = append(taskOptions, asynq.ProcessIn(time.Until(transaction.ScheduledFor)))
    }
    
    return asynq.NewTask(queueName, payload, taskOptions...)
}
The hashBalanceID function (queue.go:234) uses FNV-1a hashing to ensure consistent distribution:
func hashBalanceID(balanceID string) int {
    hasher := fnv.New32a()
    _, _ = hasher.Write([]byte(balanceID))
    return int(hasher.Sum32())
}
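To make the mapping concrete, here is a small self-contained sketch (the balance IDs are invented for illustration) that reproduces the hash-and-modulo step with the default of 20 queues:
package main

import (
    "fmt"
    "hash/fnv"
)

// Mirrors hashBalanceID from queue.go.
func hashBalanceID(balanceID string) int {
    hasher := fnv.New32a()
    _, _ = hasher.Write([]byte(balanceID))
    return int(hasher.Sum32())
}

func main() {
    numberOfQueues := 20 // the default number_of_queues
    for _, id := range []string{"bln_alice", "bln_bob"} { // hypothetical balance IDs
        idx := hashBalanceID(id) % numberOfQueues
        fmt.Printf("%s -> new:transaction_%d\n", id, idx+1)
    }
}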

Configuration

Queue Settings

Configure queue behavior in your blnk.json:
{
  "queue": {
    "transaction_queue": "new:transaction",
    "webhook_queue": "new:webhook",
    "index_queue": "new:index",
    "inflight_expiry_queue": "new:inflight-expiry",
    "number_of_queues": 20,
    "webhook_concurrency": 20,
    "monitoring_port": "5004"
  }
}
Parameters:
  • number_of_queues: Number of transaction queues (default: 20). Higher values increase parallelism but use more resources.
  • webhook_concurrency: Number of concurrent webhook workers (default: 20)
  • monitoring_port: Asynq monitoring dashboard port (default: "5004")

Redis Connection

The queue system uses the Redis configuration from the redis section:
{
  "redis": {
    "dns": "redis://localhost:6379",
    "pool_size": 100,
    "min_idle_conns": 20,
    "skip_tls_verify": false
  }
}
Or via environment variables:
BLNK_REDIS_DNS=redis://localhost:6379
BLNK_REDIS_POOL_SIZE=100
BLNK_REDIS_MIN_IDLE_CONNS=20

Queue Operations

Enqueuing Transactions

Transactions are enqueued with automatic retry configuration:
// Source: queue.go:169-184
func (q *Queue) Enqueue(ctx context.Context, transaction *model.Transaction) error {
    payload, err := json.Marshal(transaction)
    if err != nil {
        return err
    }
    _, err = q.Client.EnqueueContext(ctx, q.geTask(transaction, payload), asynq.MaxRetry(5))
    if err != nil {
        logrus.WithError(err).WithField("reference", transaction.Reference).Error("failed to enqueue transaction")
        return err
    }
    return nil
}
Features:
  • Automatic retry up to 5 times on failure
  • Transaction ID used as task ID (prevents duplicates)
  • Scheduled transactions supported via scheduled_for field
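A hedged usage sketch (the field values are hypothetical, and q is assumed to be a *Queue built from your configuration):
// Hypothetical usage of Enqueue; values are illustrative only.
txn := &model.Transaction{
    TransactionID: "txn_123",                 // becomes the Asynq task ID
    Reference:     "ref_001",                 // must be unique per transaction
    Source:        "bln_alice",               // determines the target queue
    ScheduledFor:  time.Now().Add(time.Hour), // optional: delays processing
}
if err := q.Enqueue(ctx, txn); err != nil {
    return err // enqueue failures are also logged inside Enqueue
}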

Inflight Expiry

Inflight transactions are automatically expired after their expiry date:
// Source: queue.go:76-93
func (q *Queue) queueInflightExpiry(transactionID string, expiresAt time.Time) error {
    IPayload, err := json.Marshal(transactionID)
    if err != nil {
        return err
    }
    taskOptions := []asynq.Option{
        asynq.TaskID(transactionID),
        asynq.Queue(q.config.Queue.InflightExpiryQueue),
        asynq.ProcessIn(time.Until(expiresAt)),
    }
    task := asynq.NewTask(q.config.Queue.InflightExpiryQueue, IPayload, taskOptions...)
    _, err = q.Client.Enqueue(task)
    return err
}
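Because the transaction ID doubles as the Asynq task ID, scheduling the same expiry twice is effectively idempotent: Asynq rejects a second enqueue with a duplicate task ID while the first task is still pending.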

Index Queue

Search index updates are batched and queued to avoid blocking transaction processing:
// Source: queue.go:105-124
func (q *Queue) queueIndexBatch(batch interface{}) error {
    if q.config.TypeSense.Dns == "" {
        return nil
    }
    
    payload, err := json.Marshal(batch)
    if err != nil {
        return err
    }
    
    taskOptions := []asynq.Option{asynq.Queue(q.config.Queue.IndexQueue)}
    task := asynq.NewTask("new:index:batch", payload, taskOptions...)
    _, err = q.Client.Enqueue(task)
    return err
}
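Note the early return: when no TypeSense DNS is configured, index batches are silently dropped rather than queued, so search indexing is effectively optional.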

Queue Recovery

Blnk includes automatic recovery for stuck queued transactions (queue_recovery.go).

Recovery Processor

The recovery processor runs continuously to detect and recover stuck transactions:
// Source: queue_recovery.go:43-58
func NewQueuedTransactionRecoveryProcessor(blnk *Blnk) *QueuedTransactionRecoveryProcessor {
    maxWorkers := 10
    cfg, err := config.Fetch()
    if err == nil && cfg.Transaction.MaxWorkers > 0 {
        maxWorkers = cfg.Transaction.MaxWorkers
    }
    
    return &QueuedTransactionRecoveryProcessor{
        blnk:                blnk,
        batchSize:           maxWorkers * 100,
        maxWorkers:          maxWorkers,
        pollInterval:        30 * time.Second,
        stuckThreshold:      2 * time.Hour,
        maxRecoveryAttempts: 3,
        stopCh:              make(chan struct{}),
    }
}
Default Settings:
  • Poll interval: 30 seconds
  • Stuck threshold: 2 hours
  • Max recovery attempts: 3
  • Batch size: maxWorkers * 100
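The polling loop itself is not excerpted above; a minimal sketch of the pattern, with findStuckTransactions as a hypothetical stand-in for the real stuck-transaction query, might look like this:
// Sketch of the recovery poll loop; findStuckTransactions is a hypothetical
// stand-in for the actual lookup in queue_recovery.go.
func (p *QueuedTransactionRecoveryProcessor) run(ctx context.Context) {
    ticker := time.NewTicker(p.pollInterval)
    defer ticker.Stop()
    for {
        select {
        case <-p.stopCh:
            return
        case <-ticker.C:
            cutoff := time.Now().Add(-p.stuckThreshold)
            for _, txn := range p.findStuckTransactions(ctx, cutoff, p.batchSize) {
                _ = p.processStuckTransaction(ctx, txn)
            }
        }
    }
}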

Recovery Logic

Transactions stuck in "QUEUED" status are automatically reprocessed:
// Source: queue_recovery.go:165-232
func (p *QueuedTransactionRecoveryProcessor) processStuckTransaction(ctx context.Context, stuckTxn *model.Transaction) error {
    // attempts is read from the transaction's recovery metadata (lookup elided in this excerpt)
    attempts++
    
    if attempts > p.maxRecoveryAttempts {
        logrus.Warnf("Stuck transaction %s exceeded max recovery attempts (%d), rejecting", stuckTxn.TransactionID, p.maxRecoveryAttempts)
        rejectionCopy := createQueueCopy(stuckTxn, stuckTxn.Reference)
        _, err := p.blnk.RejectTransaction(ctx, rejectionCopy, "exceeded max queued recovery attempts")
        return err
    }
    
    queueCopy := createQueueCopy(stuckTxn, stuckTxn.Reference)
    _, err := p.blnk.RecordTransaction(ctx, queueCopy)
    if err != nil {
        if isReferenceAlreadyUsedError(err) {
            // Already processed, mark as recovered
            p.updateRecoveryMetadata(ctx, stuckTxn, attempts, "already_processed")
            return nil
        }
        p.updateRecoveryMetadata(ctx, stuckTxn, attempts, "failed")
        return err
    }
    
    p.updateRecoveryMetadata(ctx, stuckTxn, attempts, "recovered")
    return nil
}
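Note how recovery leans on reference uniqueness for idempotency: if the stuck transaction actually completed, re-recording the copy fails with a reference-already-used error, which the processor records as already_processed and treats as success.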

Manual Recovery Trigger

You can manually trigger recovery via API:
POST /admin/recover-queued-transactions
Content-Type: application/json

{
  "threshold_minutes": 120
}
Response:
{
  "recovered_count": 15,
  "message": "Recovery completed"
}
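For example, with curl (the host and port are assumptions; point them at your Blnk API server):
curl -X POST http://localhost:5001/admin/recover-queued-transactions \
  -H "Content-Type: application/json" \
  -d '{"threshold_minutes": 120}'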

Queue Inspection

Retrieve a queued transaction by ID:
// Source: queue.go:248-261
func (q *Queue) GetTransactionFromQueue(transactionID string) (*model.Transaction, error) {
    for i := 1; i <= q.config.Queue.NumberOfQueues; i++ {
        queueName := fmt.Sprintf("%s_%d", q.config.Queue.TransactionQueue, i)
        task, err := q.Inspector.GetTaskInfo(queueName, transactionID)
        if err == nil && task != nil {
            var txn model.Transaction
            if err := json.Unmarshal(task.Payload, &txn); err != nil {
                return nil, err
            }
            return &txn, nil
        }
    }
    return nil, nil // Not found in any queue
}
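A usage sketch; note that a nil transaction together with a nil error means the ID was not found in any queue:
// Hypothetical lookup of a queued transaction by ID.
txn, err := q.GetTransactionFromQueue("txn_123")
if err != nil {
    return err
}
if txn == nil {
    fmt.Println("not queued (already processed, or never enqueued)")
} else {
    fmt.Printf("still queued with reference %s\n", txn.Reference)
}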

Monitoring

Asynq provides a web UI for monitoring queues:
# Access monitoring dashboard
open http://localhost:5004
The dashboard shows:
  • Queue sizes and latencies
  • Task success/failure rates
  • Retry statistics
  • Scheduled tasks
  • Dead letter queue
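Queue stats can also be read programmatically with Asynq's Inspector. A self-contained sketch (the Redis address and queue count are assumptions; match them to your config):
package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

func main() {
    inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "localhost:6379"})
    for i := 1; i <= 20; i++ { // 20 matches the default number_of_queues
        qname := fmt.Sprintf("new:transaction_%d", i)
        info, err := inspector.GetQueueInfo(qname)
        if err != nil {
            log.Printf("queue %s: %v", qname, err)
            continue
        }
        fmt.Printf("%s: pending=%d active=%d retry=%d latency=%s\n",
            qname, info.Pending, info.Active, info.Retry, info.Latency)
    }
}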

Performance Tuning

Optimal Queue Count

Choose number_of_queues based on your workload:
  • Low volume (< 100 TPS): 10-20 queues
  • Medium volume (100-1000 TPS): 20-50 queues
  • High volume (> 1000 TPS): 50-100 queues
Formula: number_of_queues = concurrent_workers * 2
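For example, with the default max_workers of 10 (see Worker Configuration below), the formula gives 20 queues, matching the default number_of_queues.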

Redis Connection Pool

Size the Redis pool to handle queue throughput:
{
  "redis": {
    "pool_size": 100,      // Max connections
    "min_idle_conns": 20   // Minimum idle connections
  }
}
Sizing Guide:
  • pool_size: At least number_of_queues + webhook_concurrency + 20
  • min_idle_conns: 20% of pool_size
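With the sample configuration above (20 transaction queues plus 20 webhook workers), the first rule gives a pool_size of at least 60; the sample pool_size of 100 with min_idle_conns of 20 (20% of 100) satisfies both rules.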

Worker Configuration

Adjust transaction processing workers:
{
  "transaction": {
    "max_workers": 10,           // Concurrent transaction processors
    "batch_size": 100000,        // Batch size for bulk operations
    "max_queue_size": 1000       // Max in-memory queue size
  }
}

Best Practices

  1. Use Unique References: Always provide unique transaction references to prevent duplicate processing
  2. Monitor Queue Depth: Set up alerts when queue depth exceeds thresholds:
    # Example: alert when pending tasks > 10,000. Asynq keeps pending tasks
    # in a per-queue Redis list (key layout may vary by Asynq version):
    redis-cli LLEN asynq:{new:transaction_1}:pending
    
  3. Handle Scheduled Transactions: Use scheduled_for for future-dated transactions instead of external schedulers
  4. Enable Recovery: The recovery processor runs automatically; ensure it's not disabled
  5. Set Appropriate Timeouts: Configure lock duration based on your transaction complexity:
    {
      "transaction": {
        "lock_duration": "30m"  // 30 minutes
      }
    }
    

Troubleshooting

Transactions Stuck in QUEUED Status

Symptom: Transactions remain in QUEUED status for extended periods.
Solutions:
  1. Check queue recovery processor is running
  2. Verify Redis connectivity
  3. Check worker health: curl http://localhost:5004/health
  4. Manually trigger recovery via API

High Queue Latency

Symptom: Transactions take too long to process.
Solutions:
  1. Increase number_of_queues
  2. Increase max_workers
  3. Scale Redis vertically (more CPU/memory)
  4. Enable Redis persistence for durability

Duplicate Transaction Processing

Symptom: The same transaction is processed multiple times.
Solutions:
  1. Ensure unique transaction references
  2. Check for queue recovery conflicts
  3. Verify Asynq task ID uniqueness
