Overview
KAIU Natural Living uses Redis as the message broker for BullMQ, which handles asynchronous processing of WhatsApp messages and AI responses.
Why Redis?
Queue Management: BullMQ requires Redis for job queue persistence and management
Message Deduplication: Prevents duplicate message processing using job IDs
Real-time Performance: In-memory processing for fast WhatsApp response times
Worker Coordination: Coordinates multiple workers for horizontal scaling
Prerequisites
Redis Server 6.0 or higher
At least 256MB available RAM (512MB recommended)
Installation
Ubuntu/Debian
# Install Redis
sudo apt update
sudo apt install redis-server
# Start Redis service
sudo systemctl start redis-server
sudo systemctl enable redis-server
# Verify installation
redis-cli ping
# Should return: PONG
macOS
# Using Homebrew
brew install redis
# Start Redis service
brew services start redis
# Verify installation
redis-cli ping
# Should return: PONG
Docker
# Pull Redis image
docker pull redis:7-alpine
# Run Redis container
docker run -d \
  --name kaiu-redis \
  -p 6379:6379 \
  redis:7-alpine
# Test connection
docker exec -it kaiu-redis redis-cli ping
# Should return: PONG
Upstash (Cloud)
Create Database
Go to Upstash Console
Click "Create Database"
Choose Redis and select your region
Click "Create"
Get Connection URL
Copy the connection string from the dashboard:
rediss://default:[PASSWORD]@[ENDPOINT].upstash.io:6379
Configure Environment
Add to .env.local:
REDIS_URL="rediss://default:[PASSWORD]@example.upstash.io:6379"
Configuration
Recommended for production and cloud providers:
# Complete Redis connection URL
REDIS_URL="redis://localhost:6379"
# With password
REDIS_URL="redis://default:yourpassword@localhost:6379"
# Upstash (with TLS)
REDIS_URL="rediss://default:[PASSWORD]@example.upstash.io:6379"
The connection is handled in backend/whatsapp/queue.js:18-27:
const redisUrl = process.env.REDIS_URL;
const redisOpts = { maxRetriesPerRequest: null };
const queueConnection = redisUrl
  ? new IORedis(redisUrl, redisOpts)
  : new IORedis(redisOpts);
Common for local development:
REDIS_HOST="localhost"
REDIS_PORT="6379"
REDIS_PASSWORD="" # Leave empty if no password
Connection logic:
if (!redisUrl) {
  redisOpts.host = process.env.REDIS_HOST || 'localhost';
  redisOpts.port = process.env.REDIS_PORT || 6379;
  redisOpts.password = process.env.REDIS_PASSWORD;
}
If REDIS_URL is provided, it takes precedence over individual parameters.
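The precedence rule can be sketched as one pure function, which makes it easy to unit test without a running Redis. This is a minimal sketch, not the code in queue.js: `resolveRedisConnection` is a hypothetical helper, and converting the port to a number and dropping an empty password are choices made for this example.

```javascript
// Sketch: resolve ioredis connection settings from environment variables.
// `resolveRedisConnection` is a hypothetical helper, not part of queue.js.
function resolveRedisConnection(env) {
  // queue.js sets maxRetriesPerRequest: null, so the sketch does the same.
  const options = { maxRetriesPerRequest: null };

  // REDIS_URL wins whenever it is set to a non-empty value.
  if (env.REDIS_URL) {
    return { url: env.REDIS_URL, options };
  }

  options.host = env.REDIS_HOST || 'localhost';
  // Environment variables are strings, so convert the port explicitly.
  options.port = Number(env.REDIS_PORT) || 6379;
  // Treat an empty REDIS_PASSWORD as "no password".
  if (env.REDIS_PASSWORD) {
    options.password = env.REDIS_PASSWORD;
  }
  return { url: null, options };
}

// Usage (assumes ioredis is installed and imported as IORedis):
//   const { url, options } = resolveRedisConnection(process.env);
//   const connection = url ? new IORedis(url, options) : new IORedis(options);
```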
BullMQ Integration
KAIU uses BullMQ for WhatsApp message processing with the following configuration:
Queue Definition
backend/whatsapp/queue.js
import { Queue, Worker } from 'bullmq';
import IORedis from 'ioredis';
// Create queue
export const whatsappQueue = new Queue('whatsapp-ai', {
  connection: queueConnection
});
// Create worker
export const worker = new Worker('whatsapp-ai', async job => {
  const { wamid, from, text } = job.data;
  // Process message...
}, {
  connection: workerConnection,
  limiter: {
    max: 10,       // Start at most 10 jobs...
    duration: 1000 // ...per 1000 ms window (a rate limit, not a concurrency cap)
  }
});
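BullMQ has two separate throttles that are easy to confuse: `limiter` caps how many jobs start per time window, while `concurrency` caps how many jobs one worker runs at the same time. The sketch below builds a Worker options object with both. `buildWorkerOptions` and the default of 5 concurrent jobs are assumptions for illustration and do not come from the KAIU code.

```javascript
// Sketch: build BullMQ Worker options that keep the two throttles apart.
// `buildWorkerOptions` and its defaults are illustrative assumptions.
function buildWorkerOptions(connection, { concurrency = 5, maxPerSecond = 10 } = {}) {
  return {
    connection,
    // How many jobs this worker process may run at the same time.
    concurrency,
    // How many jobs may be started per `duration` milliseconds.
    limiter: { max: maxPerSecond, duration: 1000 },
  };
}

// Usage (assumes bullmq is installed and a processor function exists):
//   new Worker('whatsapp-ai', processor, buildWorkerOptions(workerConnection));
```

If the AI or WhatsApp API limits requests per second, `limiter` is the setting to tune. If the bottleneck is memory or CPU on the worker host, `concurrency` is the one to adjust.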
Job Processing Flow
Webhook Receives Message
The WhatsApp webhook receives the incoming message and immediately returns 200 OK:
backend/whatsapp/webhook.js:89-97
await whatsappQueue.add('process-message', {
  wamid,
  from: message.from,
  text: message.text.body,
  timestamp: message.timestamp
}, {
  jobId: wamid, // Deduplication
  removeOnComplete: true
});
Worker Processes Job
BullMQ worker picks up the job from Redis and:
Checks session status
Filters PII from message
Calls AI service for response
Sends reply via WhatsApp API
Job Completion
Completed jobs are automatically removed from Redis to save memory.
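The worker steps above can be sketched as a processor function with its dependencies injected, which keeps each step testable in isolation. Every dependency name here (`sessions.getStatus`, `filterPii`, `ai.generateReply`, `whatsapp.sendText`) and the `botEnabled` flag are hypothetical stand-ins; the real KAIU services may look different.

```javascript
// Sketch of the worker's processing steps with injected, hypothetical services.
// None of these dependency names come from the KAIU codebase.
function createMessageProcessor({ sessions, filterPii, ai, whatsapp }) {
  return async function processMessage(job) {
    const { wamid, from, text } = job.data;

    // 1. Skip the AI reply when the session is not handled by the bot.
    const session = await sessions.getStatus(from);
    if (!session.botEnabled) {
      return { wamid, skipped: true };
    }

    // 2. Strip personal data before the text is sent to the AI service.
    const safeText = filterPii(text);

    // 3. Ask the AI service for a reply.
    const reply = await ai.generateReply(from, safeText);

    // 4. Send the reply; an error thrown here causes BullMQ to fail the job.
    await whatsapp.sendText(from, reply);
    return { wamid, skipped: false };
  };
}

// Usage (assumes bullmq is installed):
//   new Worker('whatsapp-ai', createMessageProcessor(deps), { connection });
```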
Queue Features
Deduplication: Using jobId: wamid prevents the same WhatsApp message from being queued twice while a job with that ID still exists in Redis
Rate Limiting: The limiter starts at most 10 jobs per second to stay within API rate limits
Retry Logic: Exponential backoff strategy for failed jobs
Auto Cleanup: Completed jobs are removed automatically to conserve memory
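The features above map to BullMQ job options. The sketch below shows one way they could be combined and what retry schedule results. The attempt count and base delay are illustrative values, and the schedule assumes BullMQ's built-in exponential strategy waits `delay * 2^(retry - 1)` before each retry; check the BullMQ version you run before relying on exact timings.

```javascript
// Sketch: job options that combine deduplication, retries, and cleanup.
// The attempt count and base delay are illustrative values.
function buildJobOptions(wamid, { attempts = 3, baseDelayMs = 1000 } = {}) {
  return {
    jobId: wamid,            // same ID is ignored while the job still exists
    attempts,                // total tries, including the first one
    backoff: { type: 'exponential', delay: baseDelayMs },
    removeOnComplete: true,  // free memory once the job succeeds
  };
}

// Delay before each retry, assuming delay * 2^(retry - 1).
function retryDelays({ attempts, backoff }) {
  const delays = [];
  for (let retry = 1; retry < attempts; retry += 1) {
    delays.push(backoff.delay * 2 ** (retry - 1));
  }
  return delays;
}

// Usage:
//   await whatsappQueue.add('process-message', data, buildJobOptions(wamid));
```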
Redis Configuration (Optional)
For production deployments, optimize Redis settings:
redis.conf Recommendations
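# Memory Management
maxmemory 512mb
# BullMQ expects noeviction; an LRU policy can evict queue keys and lose jobs
maxmemory-policy noeviction
# Persistence (disabled here: queued jobs are lost if Redis restarts)
save ""
appendonly no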
# Performance
tcp-backlog 511
timeout 0
tcp-keepalive 300
# Security
bind 127.0.0.1
protected-mode yes
requirepass your-secure-password
If you set requirepass, add the password to your REDIS_PASSWORD or REDIS_URL environment variable.
Apply Configuration
# Restart Redis
sudo systemctl restart redis-server
# Verify configuration
redis-cli CONFIG GET maxmemory
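The same check can run from Node at startup. BullMQ's production guidance asks for `maxmemory-policy noeviction` so that queue keys are never evicted, and the sketch below reports whether the connected server matches. `checkEvictionPolicy` is a hypothetical helper; it assumes an ioredis-style client whose `config('GET', name)` call resolves to a flat `[name, value]` array, and some managed Redis services may reject the CONFIG command entirely.

```javascript
// Sketch: report whether Redis may evict BullMQ keys under memory pressure.
// `client` is any object with an ioredis-style config('GET', name) method.
async function checkEvictionPolicy(client) {
  // Assumed reply shape: ['maxmemory-policy', '<policy>'].
  const reply = await client.config('GET', 'maxmemory-policy');
  const policy = reply[1];
  return { policy, safeForQueues: policy === 'noeviction' };
}

// Usage (assumes an existing ioredis connection named queueConnection):
//   const { policy, safeForQueues } = await checkEvictionPolicy(queueConnection);
//   if (!safeForQueues) console.warn(`Redis eviction policy is ${policy}`);
```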
Monitoring
Check Queue Status
# Connect to Redis CLI
redis-cli
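# List all keys for the queue (KEYS blocks Redis; prefer SCAN on busy production instances)
KEYS bull:whatsapp-ai:*
# Get queue length
LLEN bull:whatsapp-ai:wait
# Get failed jobs count (BullMQ keeps failed jobs in a sorted set)
ZCARD bull:whatsapp-ai:failed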
# Monitor real-time commands
MONITOR
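Reading raw keys ties your monitoring to BullMQ's internal key layout. As an alternative, the queue object can report its own counts through `getJobCounts`. The sketch below wraps that call; `getQueueSummary` and the backlog threshold of 100 are assumptions for illustration, and the returned key names are read defensively because they may differ between BullMQ versions.

```javascript
// Sketch: summarize queue depth through BullMQ instead of raw Redis keys.
// `queue` is expected to be a BullMQ Queue (for example whatsappQueue).
async function getQueueSummary(queue, maxWaiting = 100) {
  const counts = await queue.getJobCounts('wait', 'active', 'delayed', 'failed');
  // Key names can differ between BullMQ versions, so read defensively.
  const waiting = counts.wait ?? counts.waiting ?? 0;
  return {
    waiting,
    active: counts.active ?? 0,
    delayed: counts.delayed ?? 0,
    failed: counts.failed ?? 0,
    // Flag a backlog when more jobs are waiting than the chosen threshold.
    backlog: waiting > maxWaiting,
  };
}

// Usage:
//   console.log(await getQueueSummary(whatsappQueue));
```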
BullMQ Dashboard (Optional)
Install Bull Board for a web UI:
npm install @bull-board/express @bull-board/api
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { whatsappQueue } from './backend/whatsapp/queue.js';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
  queues: [new BullMQAdapter(whatsappQueue)],
  serverAdapter: serverAdapter,
});
app.use('/admin/queues', serverAdapter.getRouter());
Access at: http://localhost:3001/admin/queues
Testing
Test Redis Connection
redis-cli ping
# Expected: PONG
Test From Node.js
import IORedis from 'ioredis';
const redis = new IORedis(process.env.REDIS_URL || {
  host: process.env.REDIS_HOST || 'localhost',
  port: process.env.REDIS_PORT || 6379,
  password: process.env.REDIS_PASSWORD
});
redis.ping().then(result => {
  console.log('Redis OK:', result);
  process.exit(0);
}).catch(err => {
  console.error('Redis Error:', err);
  process.exit(1);
});
Test Queue Processing
Start the server and check the worker logs. You should see:
Initializing WhatsApp AI Worker...
Socket.IO instance injected into Queue Worker
Simulate Job
Send a test WhatsApp message or use the mock webhook:
curl -X POST http://localhost:3001/api/whatsapp/webhook \
  -H "Content-Type: application/json" \
  -d '{
"object": "whatsapp_business_account",
"entry": [{
"changes": [{
"value": {
"messages": [{
"id": "test-msg-123",
"from": "573001234567",
"timestamp": "1234567890",
"type": "text",
"text": { "body": "Hola" }
}]
}
}]
}]
}'
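To see how a payload of this shape turns into queue jobs, the sketch below flattens the nested `entry` / `changes` / `messages` structure into the job data used by the webhook. `extractTextMessages` is a hypothetical helper: the field names follow the sample payload, but the real webhook handler may validate or filter differently.

```javascript
// Sketch: flatten a WhatsApp webhook payload into job payloads.
// `extractTextMessages` is a hypothetical helper, not the KAIU handler.
function extractTextMessages(payload) {
  const jobs = [];
  for (const entry of payload.entry ?? []) {
    for (const change of entry.changes ?? []) {
      for (const message of change.value?.messages ?? []) {
        // Only text messages carry text.body; skip other types.
        if (message.type !== 'text' || !message.text) continue;
        jobs.push({
          wamid: message.id,
          from: message.from,
          text: message.text.body,
          timestamp: message.timestamp,
        });
      }
    }
  }
  return jobs;
}

// Usage inside a webhook handler:
//   for (const data of extractTextMessages(req.body)) {
//     await whatsappQueue.add('process-message', data, { jobId: data.wamid });
//   }
```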
Production Recommendations
Use Cloud Redis: Services like Upstash, Redis Cloud, or AWS ElastiCache provide managed Redis with automatic backups
Enable TLS: Use rediss:// URLs for encrypted connections in production
Set Memory Limits: Configure maxmemory to prevent OOM errors, and keep the eviction policy at noeviction so queue keys are not evicted
Monitor Performance: Track queue length, job processing time, and failure rates
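Some of these recommendations can be enforced with a small startup check. The sketch below inspects a Redis URL and returns warnings when it lacks TLS or a password. `auditRedisUrl` is a hypothetical helper, and the two rules it applies are only a starting point.

```javascript
// Sketch: flag a REDIS_URL that is unencrypted or has no password.
// `auditRedisUrl` is a hypothetical helper for a startup check.
function auditRedisUrl(redisUrl) {
  const url = new URL(redisUrl);
  const warnings = [];
  // rediss:// (double "s") is the TLS variant of the redis:// scheme.
  if (url.protocol !== 'rediss:') {
    warnings.push('connection is not using TLS (expected rediss://)');
  }
  if (!url.password) {
    warnings.push('no password in URL');
  }
  return warnings;
}

// Usage:
//   if (process.env.NODE_ENV === 'production') {
//     auditRedisUrl(process.env.REDIS_URL).forEach((w) => console.warn(w));
//   }
```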
Troubleshooting
If you get ECONNREFUSED errors:
# Check if Redis is running
sudo systemctl status redis-server
# Check if listening on correct port
sudo netstat -plunt | grep 6379
# Try connecting manually
redis-cli ping
If using Docker:
docker ps | grep redis
docker logs kaiu-redis
If you get NOAUTH or authentication errors:
# Check if password is required (a NOAUTH reply here confirms that one is set)
redis-cli CONFIG GET requirepass
# Test with password
redis-cli -a your-password ping
Update .env.local:
REDIS_URL="redis://default:your-password@localhost:6379"
# OR
REDIS_PASSWORD="your-password"
If jobs aren't processing:
# Check worker is running
# Look for this in server logs:
# "Initializing WhatsApp AI Worker..."
# Check failed jobs (BullMQ keeps them in a sorted set, so use ZRANGE)
redis-cli ZRANGE bull:whatsapp-ai:failed 0 -1
# Clear stuck jobs (DANGER: data loss)
redis-cli DEL bull:whatsapp-ai:wait
redis-cli DEL bull:whatsapp-ai:active
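Before deleting keys by hand, it may be worth retrying failed jobs through BullMQ itself, which keeps the queue's internal state consistent. The sketch below uses the `getFailed` and `retry` methods of BullMQ's Queue and Job classes. `retryFailedJobs` and the limit of 50 are assumptions for illustration.

```javascript
// Sketch: retry failed jobs through BullMQ rather than deleting Redis keys.
// `queue` is expected to be a BullMQ Queue; `limit` is an illustrative cap.
async function retryFailedJobs(queue, limit = 50) {
  // getFailed(start, end) returns Job instances from the failed set.
  const failedJobs = await queue.getFailed(0, limit - 1);
  const retried = [];
  for (const job of failedJobs) {
    // retry() puts the job back in the queue for another attempt.
    await job.retry();
    retried.push(job.id);
  }
  return retried;
}

// Usage:
//   const ids = await retryFailedJobs(whatsappQueue);
//   console.log(`Retried ${ids.length} failed jobs`);
```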
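If Redis consumes too much memory:
# Check memory usage
redis-cli INFO memory
# Set memory limit (keep noeviction so BullMQ keys are never evicted)
redis-cli CONFIG SET maxmemory 512mb
redis-cli CONFIG SET maxmemory-policy noeviction
# Flush old data (DANGER: deletes every key in the current database, including queued jobs)
redis-cli FLUSHDB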
Next Steps
WhatsApp Integration: Configure WhatsApp Cloud API to complete the messaging flow
Environment Variables: Review all configuration options