Kafka integration enables you to stream WhatsApp events to Apache Kafka topics for real-time data pipelines, stream processing, and event-driven architectures.

Configuration

Configure Kafka connection and event routing through environment variables.

Basic Connection

.env
# Enable Kafka integration
KAFKA_ENABLED=true

# Kafka client ID
KAFKA_CLIENT_ID=evolution-api

# Comma-separated list of broker addresses
KAFKA_BROKERS=localhost:9092,localhost:9093,localhost:9094

# Connection and request timeouts (milliseconds)
KAFKA_CONNECTION_TIMEOUT=3000
KAFKA_REQUEST_TIMEOUT=30000
For production environments, specify multiple brokers for high availability. The client will automatically connect to available brokers.

Global vs Instance Topics

Configure Kafka to work in two modes: instance mode, where each instance publishes to its own topics, and global mode, where all instances publish to shared topics.

Instance mode (KAFKA_GLOBAL_ENABLED=false):
.env
KAFKA_ENABLED=true
KAFKA_GLOBAL_ENABLED=false
KAFKA_TOPIC_PREFIX=evolution
Creates topics like:
  • evolution.my_instance.messages.upsert
  • evolution.my_instance.qrcode.updated
  • evolution.my_instance.connection.update
Global mode (KAFKA_GLOBAL_ENABLED=true) publishes all instances' events to shared topics such as evolution.global.messages.upsert, with the originating instance identified in the message key and payload.
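The naming pattern shown above can be expressed as a small helper. This is an illustrative sketch (the function name and shape are ours, not part of Evolution API): it maps an event name like MESSAGES_UPSERT to the prefix.instance.event topic form.

```javascript
// Build a Kafka topic name following the prefix.instance.event pattern.
// Event names like "MESSAGES_UPSERT" become "messages.upsert" in the topic.
function topicName(prefix, instance, event) {
  const eventPart = event.toLowerCase().replace(/_/g, '.');
  return `${prefix}.${instance}.${eventPart}`;
}

console.log(topicName('evolution', 'my_instance', 'MESSAGES_UPSERT'));
// → evolution.my_instance.messages.upsert
console.log(topicName('evolution', 'global', 'QRCODE_UPDATED'));
// → evolution.global.qrcode.updated
```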

Topic Configuration

.env
# Topic prefix for namespacing
KAFKA_TOPIC_PREFIX=evolution

# Number of partitions per topic
KAFKA_NUM_PARTITIONS=1

# Replication factor
KAFKA_REPLICATION_FACTOR=1

# Auto-create topics if they don't exist
KAFKA_AUTO_CREATE_TOPICS=false
In production, set KAFKA_AUTO_CREATE_TOPICS=false and create topics manually with appropriate partition and replication settings for your cluster.
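For example, topics can be created with the kafka-topics.sh tool that ships with Apache Kafka. The topic name below follows the per-instance pattern from this page; adjust the partition count, replication factor, and broker address to your cluster.

```shell
# Create one event topic explicitly instead of relying on auto-creation
kafka-topics.sh --create \
  --topic evolution.my_instance.messages.upsert \
  --partitions 3 \
  --replication-factor 3 \
  --bootstrap-server localhost:9092
```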

Consumer Configuration

.env
# Consumer group ID for reading events
KAFKA_CONSUMER_GROUP_ID=evolution-api-consumers

SASL Authentication

Secure your Kafka connection with SASL authentication:
.env
# Enable SASL authentication
KAFKA_SASL_ENABLED=true

# SASL mechanism: plain, scram-sha-256, scram-sha-512
KAFKA_SASL_MECHANISM=plain

# SASL credentials
KAFKA_SASL_USERNAME=your_username
KAFKA_SASL_PASSWORD=your_password
For example, the plain mechanism with simple username/password credentials:
KAFKA_SASL_MECHANISM=plain
KAFKA_SASL_USERNAME=admin
KAFKA_SASL_PASSWORD=admin-secret

SSL Configuration

Enable SSL/TLS encryption for Kafka connections:
.env
# Enable SSL
KAFKA_SSL_ENABLED=true

# Reject unauthorized certificates
KAFKA_SSL_REJECT_UNAUTHORIZED=true

# Path to CA certificate (optional)
KAFKA_SSL_CA=/path/to/ca-cert.pem

# Path to client certificate (optional)
KAFKA_SSL_CERT=/path/to/client-cert.pem

# Path to client key (optional)
KAFKA_SSL_KEY=/path/to/client-key.pem
For development with self-signed certificates, you can set KAFKA_SSL_REJECT_UNAUTHORIZED=false, but this is not recommended for production.

Available Events

Configure which events are sent to Kafka:
.env
# Application lifecycle
KAFKA_EVENTS_APPLICATION_STARTUP=false
KAFKA_EVENTS_INSTANCE_CREATE=false
KAFKA_EVENTS_INSTANCE_DELETE=false

# Connection events
KAFKA_EVENTS_QRCODE_UPDATED=true
KAFKA_EVENTS_CONNECTION_UPDATE=true

Per-Instance Configuration

Configure Kafka events for a specific instance via API:
curl -X POST https://your-api.com/kafka/set/instance_name \
  -H "apikey: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "kafka": {
      "enabled": true,
      "events": [
        "MESSAGES_UPSERT",
        "MESSAGES_UPDATE",
        "QRCODE_UPDATED",
        "CONNECTION_UPDATE"
      ]
    }
  }'

Message Format

Messages published to Kafka include both headers and body:

Message Headers

{
  "event": "messages.upsert",
  "instance": "my_instance",
  "origin": "WhatsApp",
  "timestamp": "2024-03-04T10:30:00.000Z"
}

Message Body

{
  "event": "messages.upsert",
  "instance": "my_instance",
  "data": {
    "key": {
      "remoteJid": "[email protected]",
      "fromMe": false,
      "id": "3EB0XXXXX"
    },
    "message": {
      "conversation": "Hello from WhatsApp!"
    },
    "messageTimestamp": 1709550600,
    "pushName": "John Doe"
  },
  "server_url": "https://your-evolution-api.com",
  "date_time": "2024-03-04T10:30:00.000Z",
  "sender": "5511999999999",
  "apikey": "instance_api_key",
  "timestamp": 1709550600000
}
The message key is set to the instance name for per-instance topics, or instanceName-event for global topics. This ensures proper partitioning.
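In kafkajs, header values arrive as Buffers rather than strings, so a consumer typically decodes them before use. A minimal sketch (the helper name is ours):

```javascript
// Decode kafkajs-style message headers (Buffer values) into plain strings.
function decodeHeaders(headers) {
  const decoded = {};
  for (const [key, value] of Object.entries(headers || {})) {
    decoded[key] = value ? value.toString('utf8') : null;
  }
  return decoded;
}

const headers = {
  event: Buffer.from('messages.upsert'),
  instance: Buffer.from('my_instance'),
};
console.log(decodeHeaders(headers));
// → { event: 'messages.upsert', instance: 'my_instance' }
```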

Consuming Events

Example of consuming events from Kafka using kafkajs:
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-consumer',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ 
  groupId: 'evolution-consumers' 
});

const run = async () => {
  await consumer.connect();
  
  // Subscribe to topics
  await consumer.subscribe({
    topics: [
      'evolution.global.messages.upsert',
      'evolution.global.qrcode.updated'
    ],
    fromBeginning: false
  });
  
  // Process messages
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value.toString());
      
      console.log({
        topic,
        partition,
        offset: message.offset,
        event: event.event,
        instance: event.instance,
        timestamp: message.timestamp
      });
      
      // Process the event
      await processEvent(event);
    },
  });
};

run().catch(console.error);

Connection Resilience

Evolution API implements automatic reconnection:
  • Initial delay: 5 seconds
  • Maximum attempts: 10
  • Exponential backoff: Delay doubles each attempt
  • Retry on send: Up to 3 attempts per message
The producer is configured with idempotent: true and maxInFlightRequests: 1 to ensure exactly-once semantics and message ordering.
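The reconnection schedule described above can be sketched as a simple helper. This is illustrative only (Evolution API's internal implementation may differ): an initial delay of 5 seconds that doubles on each attempt, giving up after 10 attempts.

```javascript
// Exponential backoff delay for reconnection attempt n (1-based).
// Returns null once the maximum number of attempts is exceeded.
function reconnectDelayMs(attempt, initialMs = 5000, maxAttempts = 10) {
  if (attempt > maxAttempts) return null; // give up
  return initialMs * 2 ** (attempt - 1);
}

console.log(reconnectDelayMs(1));  // → 5000
console.log(reconnectDelayMs(4));  // → 40000
console.log(reconnectDelayMs(11)); // → null
```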

Best Practices

1. Choose Appropriate Partitions

Set KAFKA_NUM_PARTITIONS based on your throughput needs:
  • More partitions = higher parallelism
  • Start with 3-6 partitions per topic
  • Can increase later, but can’t decrease
2. Set Replication Factor

For production:
KAFKA_REPLICATION_FACTOR=3
Protects against broker failures.
3. Use Consumer Groups

Multiple consumers in the same group share the load:
const consumer = kafka.consumer({ 
  groupId: 'evolution-processors' 
});
4. Handle Deserialization Errors

Always wrap JSON parsing in try-catch:
try {
  const event = JSON.parse(message.value.toString());
  await processEvent(event);
} catch (error) {
  console.error('Failed to process message:', error);
  // Log to dead letter topic
}
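The pattern above can be factored into a reusable helper that separates parsing from processing, so failed messages can be routed to a dead-letter topic afterward. The helper name and result shape here are ours, not part of Evolution API:

```javascript
// Safely parse a raw Kafka message value; never throws.
// Returns { ok: true, event } on success, { ok: false, error, raw } on failure.
function safeParseEvent(rawValue) {
  try {
    return { ok: true, event: JSON.parse(rawValue.toString()) };
  } catch (error) {
    return { ok: false, error, raw: rawValue };
  }
}

console.log(safeParseEvent(Buffer.from('{"event":"messages.upsert"}')));
// → { ok: true, event: { event: 'messages.upsert' } }
console.log(safeParseEvent(Buffer.from('not json')).ok);
// → false
```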
5. Monitor Lag

Set up monitoring for consumer lag to detect processing issues early.
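Consumer lag per partition is the difference between the latest (high-water) offset and the group's committed offset. A pure sketch of the arithmetic; the flat offset maps used here are an assumption for illustration (an admin client such as kafkajs's admin API returns richer per-partition structures you would map into this shape):

```javascript
// Compute per-partition consumer lag: latest offset minus committed offset.
// latest:    { [partition]: latestOffset }
// committed: { [partition]: committedOffset }
function consumerLag(latest, committed) {
  const lag = {};
  for (const [partition, high] of Object.entries(latest)) {
    const current = committed[partition] ?? 0;
    lag[partition] = Math.max(0, high - current);
  }
  return lag;
}

console.log(consumerLag({ 0: 1500, 1: 900 }, { 0: 1450, 1: 900 }));
// → { '0': 50, '1': 0 }
```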
Kafka requires proper disk space and memory allocation. Monitor your cluster resources to prevent message loss.

Troubleshooting

Connection Failures

  1. Verify Kafka brokers are running and accessible
  2. Check firewall rules allow connections to broker ports
  3. Increase KAFKA_CONNECTION_TIMEOUT if network is slow
  4. Verify KAFKA_BROKERS addresses are correct

Authentication Errors

  1. Verify credentials in .env file
  2. Check SASL mechanism matches Kafka server configuration
  3. Ensure user has proper ACLs in Kafka
  4. Review Kafka server logs for authentication errors

Events Not Being Published

  1. Check that specific events are enabled in configuration
  2. Verify KAFKA_ENABLED=true
  3. If AUTO_CREATE_TOPICS=false, manually create topics
  4. Check Evolution API logs for Kafka errors
  5. Verify instance is connected to WhatsApp

High Consumer Lag

  1. Increase number of consumers in consumer group
  2. Optimize message processing code
  3. Increase number of partitions (requires topic recreation)
  4. Check for slow downstream dependencies

SSL/TLS Errors

  1. Verify certificate paths are correct
  2. Check certificate validity dates
  3. Ensure CA certificate is properly formatted
  4. For testing, temporarily set KAFKA_SSL_REJECT_UNAUTHORIZED=false
