Kafka integration enables you to stream WhatsApp events to Apache Kafka topics for real-time data pipelines, stream processing, and event-driven architectures.
Configuration
Configure Kafka connection and event routing through environment variables.
Basic Connection
# Enable Kafka integration
KAFKA_ENABLED=true
# Kafka client ID
KAFKA_CLIENT_ID=evolution-api
# Comma-separated list of broker addresses
KAFKA_BROKERS=localhost:9092,localhost:9093,localhost:9094
# Connection and request timeouts (milliseconds)
KAFKA_CONNECTION_TIMEOUT=3000
KAFKA_REQUEST_TIMEOUT=30000
For production environments, specify multiple brokers for high availability. The client will automatically connect to available brokers.
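As a sketch of how these variables map onto a client, the snippet below builds a kafkajs-style configuration object from the environment (the helper names are hypothetical, not part of Evolution API; the result is what you would pass to `new Kafka(...)` from the kafkajs package):

```javascript
// Split the comma-separated KAFKA_BROKERS list into an array of addresses
function parseBrokers(brokerList) {
  return brokerList.split(',').map((b) => b.trim()).filter(Boolean);
}

// Build a kafkajs client config from the env vars documented above,
// falling back to the documented defaults when a variable is unset
function kafkaConfigFromEnv(env = process.env) {
  return {
    clientId: env.KAFKA_CLIENT_ID || 'evolution-api',
    brokers: parseBrokers(env.KAFKA_BROKERS || 'localhost:9092'),
    connectionTimeout: Number(env.KAFKA_CONNECTION_TIMEOUT || 3000),
    requestTimeout: Number(env.KAFKA_REQUEST_TIMEOUT || 30000),
  };
}
```

With all three brokers listed, the client can fail over if one broker is unreachable at connect time.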
Global vs Instance Topics
Configure Kafka to work in two modes:
Per-Instance Topics
Global Topics
Each instance publishes to its own topics:
KAFKA_ENABLED=true
KAFKA_GLOBAL_ENABLED=false
KAFKA_TOPIC_PREFIX=evolution
Creates topics like:
evolution.my_instance.messages.upsert
evolution.my_instance.qrcode.updated
evolution.my_instance.connection.update
All instances publish to shared global topics:
KAFKA_ENABLED=true
KAFKA_GLOBAL_ENABLED=true
KAFKA_TOPIC_PREFIX=evolution
Creates topics like:
evolution.global.messages.upsert
evolution.global.qrcode.updated
evolution.global.connection.update
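The naming scheme above can be expressed as a small helper (hypothetical, for illustration only — Evolution API builds these names internally):

```javascript
// Build a topic name from the prefix, routing mode, and event name.
// Events like MESSAGES_UPSERT map to dotted lowercase: messages.upsert
function topicName(prefix, event, { globalMode = false, instance } = {}) {
  const scope = globalMode ? 'global' : instance;
  const suffix = event.toLowerCase().replace(/_/g, '.');
  return `${prefix}.${scope}.${suffix}`;
}
```

Per-instance mode gives you isolation per WhatsApp instance; global mode keeps the topic count fixed no matter how many instances you run, which is easier to manage on clusters with many instances.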
Topic Configuration
# Topic prefix for namespacing
KAFKA_TOPIC_PREFIX=evolution
# Number of partitions per topic
KAFKA_NUM_PARTITIONS=1
# Replication factor
KAFKA_REPLICATION_FACTOR=1
# Auto-create topics if they don't exist
KAFKA_AUTO_CREATE_TOPICS=false
In production, set KAFKA_AUTO_CREATE_TOPICS=false and create topics manually with appropriate partition and replication settings for your cluster.
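One way to pre-create topics is with the kafkajs admin client. The sketch below builds topic definitions (the partition and replication values are examples, not requirements) and shows the admin call as commented usage, since it needs a live cluster:

```javascript
// Build createTopics definitions for a list of event suffixes
function topicDefinitions(events, { prefix = 'evolution', scope = 'global' } = {}) {
  return events.map((event) => ({
    topic: `${prefix}.${scope}.${event}`,
    numPartitions: 3,      // sized for expected throughput
    replicationFactor: 3,  // survives the loss of two brokers
  }));
}

// With a kafkajs client (not created here):
// const admin = kafka.admin();
// await admin.connect();
// await admin.createTopics({
//   topics: topicDefinitions(['messages.upsert', 'connection.update']),
// });
// await admin.disconnect();
```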
Consumer Configuration
# Consumer group ID for reading events
KAFKA_CONSUMER_GROUP_ID=evolution-api-consumers
SASL Authentication
Secure your Kafka connection with SASL authentication:
# Enable SASL authentication
KAFKA_SASL_ENABLED=true
# SASL mechanism: plain, scram-sha-256, scram-sha-512
KAFKA_SASL_MECHANISM=plain
# SASL credentials
KAFKA_SASL_USERNAME=your_username
KAFKA_SASL_PASSWORD=your_password
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
Simple username/password authentication:
KAFKA_SASL_MECHANISM=plain
KAFKA_SASL_USERNAME=admin
KAFKA_SASL_PASSWORD=admin-secret
Salted Challenge Response Authentication Mechanism (SCRAM) with SHA-256 hashing:
KAFKA_SASL_MECHANISM=scram-sha-256
KAFKA_SASL_USERNAME=your_username
KAFKA_SASL_PASSWORD=your_password
A stronger variant of SCRAM using SHA-512 hashing:
KAFKA_SASL_MECHANISM=scram-sha-512
KAFKA_SASL_USERNAME=your_username
KAFKA_SASL_PASSWORD=your_password
SSL Configuration
Enable SSL/TLS encryption for Kafka connections:
# Enable SSL
KAFKA_SSL_ENABLED=true
# Reject unauthorized certificates
KAFKA_SSL_REJECT_UNAUTHORIZED=true
# Path to CA certificate (optional)
KAFKA_SSL_CA=/path/to/ca-cert.pem
# Path to client certificate (optional)
KAFKA_SSL_CERT=/path/to/client-cert.pem
# Path to client key (optional)
KAFKA_SSL_KEY=/path/to/client-key.pem
For development with self-signed certificates, you can set KAFKA_SSL_REJECT_UNAUTHORIZED=false, but this is not recommended for production.
Available Events
Configure which events are sent to Kafka:
Instance Events
Message Events
Contact Events
Chat & Group Events
Other Events
# Application lifecycle
KAFKA_EVENTS_APPLICATION_STARTUP=false
KAFKA_EVENTS_INSTANCE_CREATE=false
KAFKA_EVENTS_INSTANCE_DELETE=false
# Connection events
KAFKA_EVENTS_QRCODE_UPDATED=true
KAFKA_EVENTS_CONNECTION_UPDATE=true
# Message events
KAFKA_EVENTS_MESSAGES_SET=true
KAFKA_EVENTS_MESSAGES_UPSERT=true
KAFKA_EVENTS_MESSAGES_EDITED=true
KAFKA_EVENTS_MESSAGES_UPDATE=true
KAFKA_EVENTS_MESSAGES_DELETE=true
KAFKA_EVENTS_SEND_MESSAGE=true
KAFKA_EVENTS_SEND_MESSAGE_UPDATE=true
# Contact events
KAFKA_EVENTS_CONTACTS_SET=true
KAFKA_EVENTS_CONTACTS_UPSERT=true
KAFKA_EVENTS_CONTACTS_UPDATE=true
KAFKA_EVENTS_PRESENCE_UPDATE=true
# Chat events
KAFKA_EVENTS_CHATS_SET=true
KAFKA_EVENTS_CHATS_UPSERT=true
KAFKA_EVENTS_CHATS_UPDATE=true
KAFKA_EVENTS_CHATS_DELETE=true
# Group events
KAFKA_EVENTS_GROUPS_UPSERT=true
KAFKA_EVENTS_GROUPS_UPDATE=true
KAFKA_EVENTS_GROUP_PARTICIPANTS_UPDATE=true
# Label events
KAFKA_EVENTS_LABELS_EDIT=true
KAFKA_EVENTS_LABELS_ASSOCIATION=true
# Call events
KAFKA_EVENTS_CALL=true
# Typebot integration
KAFKA_EVENTS_TYPEBOT_START=false
KAFKA_EVENTS_TYPEBOT_CHANGE_STATUS=false
Per-Instance Configuration
Configure Kafka events for a specific instance via API:
curl -X POST https://your-api.com/kafka/set/instance_name \
-H "apikey: YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"kafka": {
"enabled": true,
"events": [
"MESSAGES_UPSERT",
"MESSAGES_UPDATE",
"QRCODE_UPDATED",
"CONNECTION_UPDATE"
]
}
}'
Messages published to Kafka include both headers and a body:
Message Headers
{
  "event": "messages.upsert",
  "instance": "my_instance",
  "origin": "WhatsApp",
  "timestamp": "2024-03-04T10:30:00.000Z"
}
Message Body
{
  "event": "messages.upsert",
  "instance": "my_instance",
  "data": {
    "key": {
      "remoteJid": "[email protected]",
      "fromMe": false,
      "id": "3EB0XXXXX"
    },
    "message": {
      "conversation": "Hello from WhatsApp!"
    },
    "messageTimestamp": 1709550600,
    "pushName": "John Doe"
  },
  "server_url": "https://your-evolution-api.com",
  "date_time": "2024-03-04T10:30:00.000Z",
  "sender": "5511999999999",
  "apikey": "instance_api_key",
  "timestamp": 1709550600000
}
The message key is set to the instance name for per-instance topics, or instanceName-event for global topics. This ensures proper partitioning.
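That keying rule can be written as a one-line helper (hypothetical, for illustration — Evolution API assigns keys internally). Because Kafka hashes the message key to choose a partition, every message with the same key lands on the same partition, preserving per-instance ordering:

```javascript
// Per-instance topics: key is the instance name.
// Global topics: key is "<instanceName>-<event>" so each instance/event
// stream maps to a stable partition even when topics are shared.
function messageKey(instance, event, globalMode) {
  return globalMode ? `${instance}-${event}` : instance;
}
```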
Consuming Events
Examples of consuming events from Kafka:
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-consumer',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({
  groupId: 'evolution-consumers'
});

const run = async () => {
  await consumer.connect();

  // Subscribe to topics
  await consumer.subscribe({
    topics: [
      'evolution.global.messages.upsert',
      'evolution.global.qrcode.updated'
    ],
    fromBeginning: false
  });

  // Process messages
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value.toString());

      console.log({
        topic,
        partition,
        offset: message.offset,
        event: event.event,
        instance: event.instance,
        timestamp: message.timestamp
      });

      // Process the event
      await processEvent(event);
    },
  });
};

run().catch(console.error);
Connection Resilience
Evolution API implements automatic reconnection:
Initial delay: 5 seconds
Maximum attempts: 10
Exponential backoff: delay doubles on each attempt
Retry on send: up to 3 attempts per message
The producer is configured with idempotent: true and maxInFlightRequests: 1, which prevents retries from producing duplicate writes and preserves message ordering within a partition.
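The reconnection schedule described above works out to the following delays (an illustrative sketch of the stated parameters, not the actual Evolution API implementation):

```javascript
// Exponential backoff: 5 s initial delay, doubling per attempt,
// giving up after the configured maximum number of attempts
function reconnectDelay(attempt, { initial = 5000, maxAttempts = 10 } = {}) {
  if (attempt >= maxAttempts) return null; // give up
  return initial * 2 ** attempt;           // 5 s, 10 s, 20 s, ...
}
```

With these parameters the final (10th) attempt waits 5000 * 2^9 ms, roughly 43 minutes, before the client gives up.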
Best Practices
Choose Appropriate Partitions
Set KAFKA_NUM_PARTITIONS based on your throughput needs:
More partitions = higher parallelism
Start with 3-6 partitions per topic
Can increase later, but can’t decrease
Set Replication Factor
For production:
KAFKA_REPLICATION_FACTOR=3
Protects against broker failures.
Use Consumer Groups
Multiple consumers in the same group share the load:
const consumer = kafka.consumer({
  groupId: 'evolution-processors'
});
Handle Deserialization Errors
Always wrap JSON parsing in try-catch:
try {
  const event = JSON.parse(message.value.toString());
  await processEvent(event);
} catch (error) {
  console.error('Failed to process message:', error);
  // Log to dead letter topic
}
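One way to act on the "dead letter topic" comment is to classify each raw message before processing, so unparseable payloads are routed to a dead-letter topic instead of crashing the consumer. A sketch (the topic name `evolution.dead-letter` and the helper are assumptions, not Evolution API features):

```javascript
// Returns { event } on success, or { deadLetter } describing the
// producer.send() payload that preserves the raw bytes and failure reason
function classify(message, deadLetterTopic = 'evolution.dead-letter') {
  try {
    return { event: JSON.parse(message.value.toString()) };
  } catch (error) {
    return {
      deadLetter: {
        topic: deadLetterTopic,
        messages: [{
          key: message.key,
          value: message.value,
          headers: { error: String(error.message) },
        }],
      },
    };
  }
}

// In eachMessage:
// const { event, deadLetter } = classify(message);
// if (deadLetter) await producer.send(deadLetter);
// else await processEvent(event);
```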
Monitor Lag
Set up monitoring for consumer lag to detect processing issues early.
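Lag is the gap between a partition's log-end offset and the group's committed offset. A sketch of the computation, with the kafkajs admin calls shown as commented usage since they need a live cluster (the helper name is hypothetical):

```javascript
// topicOffsets: [{ partition, high, ... }] from admin.fetchTopicOffsets()
// committedPartitions: [{ partition, offset }] from admin.fetchOffsets()
function computeLag(topicOffsets, committedPartitions) {
  const committed = new Map(
    committedPartitions.map((p) => [p.partition, Number(p.offset)])
  );
  return topicOffsets.map(({ partition, high }) => {
    const c = committed.get(partition);
    // kafkajs reports '-1' when the group has no committed offset yet
    const committedOffset = c === undefined || c < 0 ? 0 : c;
    return { partition, lag: Number(high) - committedOffset };
  });
}

// With a kafkajs admin client (not created here):
// const admin = kafka.admin();
// const high = await admin.fetchTopicOffsets('evolution.global.messages.upsert');
// const [{ partitions }] = await admin.fetchOffsets({
//   groupId: 'evolution-api-consumers',
//   topics: ['evolution.global.messages.upsert'],
// });
// console.log(computeLag(high, partitions));
```

Growing lag on a partition means consumers are falling behind producers and is usually the first visible symptom of slow processing.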
Kafka requires proper disk space and memory allocation. Monitor your cluster resources to prevent message loss.
Troubleshooting
Cannot connect to brokers
Verify Kafka brokers are running and accessible
Check firewall rules allow connections to broker ports
Increase KAFKA_CONNECTION_TIMEOUT if network is slow
Verify KAFKA_BROKERS addresses are correct
SASL authentication failed
Verify credentials in .env file
Check SASL mechanism matches Kafka server configuration
Ensure user has proper ACLs in Kafka
Review Kafka server logs for authentication errors
Messages not appearing in topics
Check that specific events are enabled in configuration
Verify KAFKA_ENABLED=true
If AUTO_CREATE_TOPICS=false, manually create topics
Check Evolution API logs for Kafka errors
Verify instance is connected to WhatsApp
High consumer lag
Increase number of consumers in consumer group
Optimize message processing code
Increase number of partitions (requires topic recreation)
Check for slow downstream dependencies
SSL/TLS connection errors
Verify certificate paths are correct
Check certificate validity dates
Ensure CA certificate is properly formatted
For testing, temporarily set KAFKA_SSL_REJECT_UNAUTHORIZED=false