Overview

KAIU Natural Living uses Redis as the message broker for BullMQ, which handles asynchronous processing of WhatsApp messages and AI responses.

Why Redis?

Queue Management

BullMQ requires Redis for job queue persistence and management

Message Deduplication

Prevents duplicate message processing using job IDs

Real-time Performance

In-memory processing for fast WhatsApp response times

Worker Coordination

Coordinates multiple workers for horizontal scaling

Prerequisites

  • Redis Server 6.0 or higher
  • At least 256MB available RAM (512MB recommended)

Installation

# Install Redis
sudo apt update
sudo apt install redis-server

# Start Redis service
sudo systemctl start redis-server
sudo systemctl enable redis-server

# Verify installation
redis-cli ping
# Should return: PONG

Configuration

Recommended for production and cloud providers:
.env.local
# Complete Redis connection URL
REDIS_URL="redis://localhost:6379"

# With password
REDIS_URL="redis://default:yourpassword@localhost:6379"

# Upstash (with TLS); substitute your own password and endpoint host
REDIS_URL="rediss://default:<password>@<your-upstash-endpoint>:6379"
The connection is handled in backend/whatsapp/queue.js:18-27:
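const redisUrl = process.env.REDIS_URL;
// BullMQ requires maxRetriesPerRequest: null on connections used by workers
const redisOpts = { maxRetriesPerRequest: null };

// Without a URL, ioredis falls back to its defaults (localhost:6379)
const queueConnection = redisUrl
  ? new IORedis(redisUrl, redisOpts)
  : new IORedis(redisOpts);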
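If REDIS_URL is provided, it takes precedence over individual parameters. In the excerpt above, an unset REDIS_URL means ioredis connects with its defaults (localhost, port 6379).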
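
The precedence rule can be sketched as a small pure function. This is an illustration, not the code in queue.js: `resolveRedisConfig` is a hypothetical helper, and the REDIS_HOST, REDIS_PORT, and REDIS_PASSWORD names are borrowed from the Node.js test script on this page.

```javascript
// Hypothetical helper: decide how to connect from environment variables.
// Assumption: REDIS_URL wins; otherwise fall back to host/port/password.
function resolveRedisConfig(env) {
  const base = { maxRetriesPerRequest: null }; // same option the queue code passes to ioredis
  if (env.REDIS_URL) {
    const parsed = new URL(env.REDIS_URL);
    return {
      url: env.REDIS_URL,
      tls: parsed.protocol === 'rediss:', // rediss:// means a TLS connection
      options: base,
    };
  }
  return {
    url: null,
    tls: false,
    options: {
      ...base,
      host: env.REDIS_HOST || 'localhost',
      port: Number(env.REDIS_PORT) || 6379,
      password: env.REDIS_PASSWORD || undefined,
    },
  };
}
```

The result could then be passed to ioredis as `new IORedis(cfg.url, cfg.options)` when `cfg.url` is set, or `new IORedis(cfg.options)` otherwise.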

BullMQ Integration

KAIU uses BullMQ for WhatsApp message processing with the following configuration:

Queue Definition

backend/whatsapp/queue.js
import { Queue, Worker } from 'bullmq';
import IORedis from 'ioredis';

// queueConnection and workerConnection are IORedis instances
// defined elsewhere in this file.

// Create queue
export const whatsappQueue = new Queue('whatsapp-ai', {
  connection: queueConnection
});

// Create worker
export const worker = new Worker('whatsapp-ai', async job => {
  const { wamid, from, text } = job.data;
  // Process message...
}, {
  connection: workerConnection,
  limiter: {
    max: 10,       // Start at most 10 jobs...
    duration: 1000 // ...per 1000 ms window (a rate limit, not a concurrency cap)
  }
});
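
The `limiter` option is a rate limit: at most `max` jobs are started per `duration` milliseconds. The sketch below is a simplified model of that behavior, not BullMQ's internal implementation, and it assumes every job is already waiting at time 0. `earliestStartTimes` is a hypothetical name.

```javascript
// Simplified model of a { max, duration } rate limiter.
function earliestStartTimes(jobCount, { max, duration }) {
  const starts = [];
  for (let i = 0; i < jobCount; i++) {
    // Jobs are admitted in groups of `max`, one group per `duration` ms.
    starts.push(Math.floor(i / max) * duration);
  }
  return starts;
}

const starts = earliestStartTimes(25, { max: 10, duration: 1000 });
console.log(starts[9], starts[10], starts[24]); // 0 1000 2000
```

To cap how many jobs a worker runs in parallel, BullMQ workers take a separate `concurrency` option.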

Job Processing Flow

1. Webhook Receives Message

The WhatsApp webhook receives the incoming message, enqueues it, and immediately returns 200 OK:
backend/whatsapp/webhook.js:89-97
await whatsappQueue.add('process-message', {
  wamid,
  from: message.from,
  text: message.text.body,
  timestamp: message.timestamp
}, {
  jobId: wamid, // Deduplication
  removeOnComplete: true
});
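
The excerpt reads `message.text.body` directly, which would throw for non-text messages (images, audio) unless the surrounding code already guards against them. A minimal sketch of mapping a webhook payload to job descriptors follows. `extractTextJobs` is a hypothetical helper, and it assumes `wamid` is the message `id` field, as in the mock payload in the Testing section.

```javascript
// Hypothetical helper: turn a webhook payload into job descriptors.
// Assumption: `wamid` is the `id` field of each WhatsApp message.
function extractTextJobs(payload) {
  const jobs = [];
  for (const entry of payload.entry ?? []) {
    for (const change of entry.changes ?? []) {
      for (const message of change.value?.messages ?? []) {
        // Skip anything that is not a text message with a body.
        if (message.type !== 'text' || !message.text?.body) continue;
        jobs.push({
          name: 'process-message',
          data: {
            wamid: message.id,
            from: message.from,
            text: message.text.body,
            timestamp: message.timestamp,
          },
          opts: { jobId: message.id, removeOnComplete: true },
        });
      }
    }
  }
  return jobs;
}
```

Each descriptor could then be enqueued with `whatsappQueue.add(job.name, job.data, job.opts)`.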
2. Worker Processes Job

BullMQ worker picks up the job from Redis and:
  • Checks session status
  • Filters PII from message
  • Calls AI service for response
  • Sends reply via WhatsApp API
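
The four steps above can be sketched as a processor function with injected dependencies. Every dependency name here (`isSessionActive`, `filterPII`, `generateReply`, `sendWhatsAppReply`) is hypothetical, and skipping the reply when the session check fails is an assumption; the project's real worker is not shown on this page.

```javascript
// Sketch of the worker steps; all `deps` functions are hypothetical stand-ins.
async function processMessage(job, deps) {
  const { wamid, from, text } = job.data;

  // 1. Check session status (assumption: inactive sessions get no AI reply).
  if (!(await deps.isSessionActive(from))) {
    return { wamid, status: 'skipped' };
  }

  // 2. Filter PII before the text leaves the system.
  const safeText = deps.filterPII(text);

  // 3. Ask the AI service for a response.
  const reply = await deps.generateReply(from, safeText);

  // 4. Send the reply through the WhatsApp API.
  await deps.sendWhatsAppReply(from, reply);
  return { wamid, status: 'replied' };
}
```

Injecting the dependencies keeps the flow testable without Redis, the AI service, or the WhatsApp API.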
3. Job Completion

Completed jobs are automatically removed from Redis to save memory.

Queue Features

Deduplication

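Using jobId: wamid prevents duplicate processing of the same WhatsApp message while a job with that ID still exists in Redis. Because removeOnComplete: true deletes the job when it finishes, a redelivery that arrives after completion is enqueued again.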
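
A toy in-memory model shows how job-ID deduplication interacts with removal on completion. It is not BullMQ itself; `ToyQueue` is a hypothetical class written only for illustration.

```javascript
// Toy in-memory model of job-ID deduplication (not BullMQ).
class ToyQueue {
  constructor() {
    this.jobs = new Map();
  }
  add(jobId, data) {
    if (this.jobs.has(jobId)) return false; // same ID already present: ignored
    this.jobs.set(jobId, data);
    return true;
  }
  complete(jobId, { removeOnComplete }) {
    if (removeOnComplete) this.jobs.delete(jobId); // the ID is forgotten with the job
  }
}

const q = new ToyQueue();
console.log(q.add('wamid.A', {})); // true
console.log(q.add('wamid.A', {})); // false
q.complete('wamid.A', { removeOnComplete: true });
console.log(q.add('wamid.A', {})); // true
```

If redeliveries after completion are a concern, BullMQ also accepts an object such as `removeOnComplete: { age: 3600 }` to keep finished jobs, and their IDs, for a while. The one-hour value is only an example.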

Rate Limiting

Limits processing to 10 jobs per second to avoid hitting API rate limits

Retry Logic

Exponential backoff strategy for failed jobs
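
As a rough illustration of exponential backoff, the delay before each retry doubles with every failed attempt. The `attempts` and `delay` values below are assumptions, since the project's actual retry settings are not shown on this page, and `exponentialDelay` is a hypothetical helper that mirrors the usual formula.

```javascript
// Delay before the next retry: baseDelayMs * 2^(attemptsMade - 1).
function exponentialDelay(attemptsMade, baseDelayMs) {
  return Math.round(Math.pow(2, attemptsMade - 1) * baseDelayMs);
}

// Assumed values, in the shape BullMQ accepts as job options.
const jobOptions = { attempts: 3, backoff: { type: 'exponential', delay: 1000 } };

// With 3 attempts there are at most 2 retries.
for (let failed = 1; failed < jobOptions.attempts; failed++) {
  console.log(`after failure ${failed}: wait ${exponentialDelay(failed, jobOptions.backoff.delay)} ms`);
}
// after failure 1: wait 1000 ms
// after failure 2: wait 2000 ms
```

Options of this shape would be passed as the third argument to `whatsappQueue.add(...)`, alongside `jobId` and `removeOnComplete`.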

Auto Cleanup

Completed jobs are removed automatically to conserve memory

Redis Configuration (Optional)

For production deployments, optimize Redis settings:

redis.conf Recommendations

/etc/redis/redis.conf
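# Memory Management
maxmemory 512mb
# BullMQ requires noeviction; an LRU policy can evict queue keys and corrupt job state
maxmemory-policy noeviction

# Persistence (disabled here: waiting jobs are lost if Redis restarts;
# set "appendonly yes" if queued messages must survive a restart)
save ""
appendonly no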

# Performance
tcp-backlog 511
timeout 0
tcp-keepalive 300

# Security
bind 127.0.0.1
protected-mode yes
requirepass your-secure-password
If you set requirepass, add the password to your REDIS_PASSWORD or REDIS_URL environment variable.

Apply Configuration

# Restart Redis
sudo systemctl restart redis-server

# Verify configuration
redis-cli CONFIG GET maxmemory
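
Before restarting, a config file can be sanity-checked for the settings that matter most to a queue workload. The sketch below is a hypothetical checker (`checkRedisConf` is not part of Redis or BullMQ); it relies only on BullMQ's documented requirement that `maxmemory-policy` be `noeviction`.

```javascript
// Hypothetical checker: parse redis.conf text and flag queue-relevant settings.
function checkRedisConf(text) {
  const settings = {};
  for (const line of text.split('\n')) {
    const trimmed = line.trim();
    if (!trimmed || trimmed.startsWith('#')) continue; // skip blanks and comments
    const [key, ...rest] = trimmed.split(/\s+/);
    settings[key] = rest.join(' ');
  }

  const warnings = [];
  const policy = settings['maxmemory-policy'];
  if (policy && policy !== 'noeviction') {
    warnings.push(`maxmemory-policy is ${policy}; BullMQ expects noeviction`);
  }
  if (settings['appendonly'] !== 'yes' && settings['save'] === '""') {
    warnings.push('persistence is disabled; waiting jobs are lost on restart');
  }
  if (!settings['requirepass']) {
    warnings.push('requirepass is not set');
  }
  return warnings;
}
```

The file contents could be loaded with `fs.readFileSync('/etc/redis/redis.conf', 'utf8')` and the returned warnings printed before applying the configuration.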

Monitoring

Check Queue Status

# Connect to Redis CLI
redis-cli

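# List queue keys (KEYS scans the whole keyspace and blocks Redis; prefer SCAN on busy servers)
KEYS bull:whatsapp-ai:*

# Get queue length (waiting jobs are stored in a list)
LLEN bull:whatsapp-ai:wait

# Get failed jobs count (failed jobs are stored in a sorted set, so use ZCARD)
ZCARD bull:whatsapp-ai:failed

# Monitor real-time commands (adds overhead; use briefly)
MONITOR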
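
The same counts can be read from Node.js. The sketch below accepts any client that exposes `llen` and `zcard` (ioredis does) and assumes the default `bull` key prefix; `getQueueCounts` is a hypothetical helper.

```javascript
// Sketch: read queue sizes through a Redis client passed in by the caller.
// Assumption: default BullMQ key prefix "bull" and queue name "whatsapp-ai".
async function getQueueCounts(redis, queueName = 'whatsapp-ai', prefix = 'bull') {
  const key = (suffix) => `${prefix}:${queueName}:${suffix}`;
  const [waiting, active, failed] = await Promise.all([
    redis.llen(key('wait')),    // waiting jobs: list
    redis.llen(key('active')),  // active jobs: list
    redis.zcard(key('failed')), // failed jobs: sorted set
  ]);
  return { waiting, active, failed };
}
```

In application code, BullMQ's own `whatsappQueue.getJobCounts()` returns similar numbers without depending on the key layout.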

BullMQ Dashboard (Optional)

Install Bull Board for a web UI:
npm install @bull-board/express @bull-board/api
server.mjs
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { whatsappQueue } from './backend/whatsapp/queue.js';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [new BullMQAdapter(whatsappQueue)],
  serverAdapter: serverAdapter,
});

// `app` is the Express application created elsewhere in server.mjs
app.use('/admin/queues', serverAdapter.getRouter());
Access at: http://localhost:3001/admin/queues

Testing

1. Test Redis Connection

redis-cli ping
# Expected: PONG
2. Test From Node.js

test-redis.js
import IORedis from 'ioredis';

const redis = new IORedis(process.env.REDIS_URL || {
  host: process.env.REDIS_HOST || 'localhost',
  port: Number(process.env.REDIS_PORT) || 6379,
  password: process.env.REDIS_PASSWORD
});

redis.ping().then(result => {
  console.log('Redis OK:', result);
  process.exit(0);
}).catch(err => {
  console.error('Redis Error:', err);
  process.exit(1);
});
node test-redis.js
3. Test Queue Processing

Start the server and check worker logs:
npm run api:dev
You should see:
🚀 Initializing WhatsApp AI Worker...
🔌 Socket.IO instance injected into Queue Worker
4. Simulate Job

Send a test WhatsApp message or use the mock webhook:
curl -X POST http://localhost:3001/api/whatsapp/webhook \
  -H "Content-Type: application/json" \
  -d '{
    "object": "whatsapp_business_account",
    "entry": [{
      "changes": [{
        "value": {
          "messages": [{
            "id": "test-msg-123",
            "from": "573001234567",
            "timestamp": "1234567890",
            "type": "text",
            "text": { "body": "Hola" }
          }]
        }
      }]
    }]
  }'

Production Recommendations

Use Cloud Redis

Services like Upstash, Redis Cloud, or AWS ElastiCache provide managed Redis with automatic backups

Enable TLS

Use rediss:// URLs for encrypted connections in production

Set Memory Limits

Configure maxmemory with the noeviction policy (required by BullMQ) and monitor usage so the limit is not reached

Monitor Performance

Track queue length, job processing time, and failure rates

Troubleshooting

If you get ECONNREFUSED errors:
# Check if Redis is running
sudo systemctl status redis-server

# Check if listening on correct port
sudo netstat -plunt | grep 6379

# Try connecting manually
redis-cli ping
If using Docker:
docker ps | grep redis
docker logs kaiu-redis
If you get NOAUTH or authentication errors:
# Check if a password is required (this itself returns NOAUTH if one is set and not supplied)
redis-cli CONFIG GET requirepass

# Test with password
redis-cli -a your-password ping
Update .env.local:
REDIS_URL="redis://default:your-password@localhost:6379"
# OR
REDIS_PASSWORD="your-password"
If jobs aren’t processing:
# Check worker is running
# Look for this in server logs:
# "🚀 Initializing WhatsApp AI Worker..."

# Check failed jobs (stored in a sorted set of job IDs)
redis-cli ZRANGE bull:whatsapp-ai:failed 0 -1

# Clear stuck jobs (DANGER: data loss)
redis-cli DEL bull:whatsapp-ai:wait
redis-cli DEL bull:whatsapp-ai:active
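
Deleting the `wait` and `active` keys directly can leave the per-job data behind. A sketch of clearing jobs through the BullMQ queue API instead follows. `clearBacklog` is a hypothetical helper, the limit of 1000 is an arbitrary example, and this still discards jobs.

```javascript
// Sketch: clear jobs through the queue API rather than deleting keys.
// `queue` is expected to be a BullMQ Queue (for example whatsappQueue).
async function clearBacklog(queue) {
  // Removes waiting and delayed jobs; active jobs are left alone.
  await queue.drain();
  // Removes up to 1000 failed jobs of any age (grace period 0 ms).
  const removedFailed = await queue.clean(0, 1000, 'failed');
  return removedFailed.length;
}
```

It returns the number of failed jobs that were removed, which is worth logging before retrying the flow.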
If Redis consumes too much memory:
# Check memory usage
redis-cli INFO memory

# Set memory limit (BullMQ requires the noeviction policy)
redis-cli CONFIG SET maxmemory 512mb
redis-cli CONFIG SET maxmemory-policy noeviction

# Flush all keys in the current database, including queued jobs (DANGER)
redis-cli FLUSHDB
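
The output of `INFO memory` is plain `key:value` text, so usage against the limit can be computed with a small parser. `parseMemoryInfo` is a hypothetical helper; it reads only the `used_memory` and `maxmemory` fields.

```javascript
// Parse the text returned by `INFO memory` into the numbers of interest.
function parseMemoryInfo(infoText) {
  const fields = {};
  for (const line of infoText.split(/\r?\n/)) {
    if (!line || line.startsWith('#')) continue; // skip blanks and section headers
    const idx = line.indexOf(':');
    if (idx === -1) continue;
    fields[line.slice(0, idx)] = line.slice(idx + 1);
  }
  const usedBytes = Number(fields.used_memory);
  const maxBytes = Number(fields.maxmemory);
  return {
    usedBytes,
    maxBytes,
    // maxmemory of 0 means "no limit", so no ratio can be computed.
    ratio: maxBytes > 0 ? usedBytes / maxBytes : null,
  };
}
```

With ioredis, the input text could come from `await redis.info('memory')`.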

Next Steps

WhatsApp Integration

Configure WhatsApp Cloud API to complete the messaging flow

Environment Variables

Review all configuration options
