The Kafka sink publishes observability events to Apache Kafka topics. It provides high-throughput delivery with support for partition keys, SASL and TLS authentication, compression, and exactly-once production semantics.

Configuration

[sinks.kafka]
type = "kafka"
inputs = ["my_source"]

# Kafka brokers
bootstrap_servers = "10.14.22.123:9092,10.14.23.232:9092"

# Topic configuration
topic = "logs-{{ environment }}"

# Encoding
encoding.codec = "json"

# Compression
compression = "snappy"

# Batching
batch.timeout_secs = 1
batch.max_events = 1000

Core Parameters

bootstrap_servers
string
required
Comma-separated list of Kafka bootstrap servers in host:port format. These servers are used to discover the full cluster topology.
bootstrap_servers = "10.14.22.123:9092,10.14.23.232:9092"
bootstrap_servers = "kafka-1:9092,kafka-2:9092,kafka-3:9092"
topic
string
required
Kafka topic name to write events to. Supports template syntax for dynamic topic selection.
healthcheck_topic
string
Topic name to use for the healthcheck. If omitted, the topic field is used. Useful when topic is templated, to prevent healthcheck warnings.
topic = "logs-{{ environment }}"
healthcheck_topic = "logs-production"  # Use a known topic for healthcheck
key_field
string
Event field to use as the Kafka message key. The key determines partition assignment. If unspecified, or if the field doesn’t exist, Kafka uses round-robin partitioning.
key_field = "user_id"      # Use user_id as partition key
key_field = "customer_id"  # Group by customer
key_field = ".partition_key" # Use nested field

Encoding

encoding.codec
string
required
Encoding format for events. Options:
  • json: JSON encoding
  • text: Plain text
  • avro: Apache Avro
  • logfmt: Logfmt encoding
  • native: Vector’s native encoding
[sinks.kafka.encoding]
codec = "json"
encoding.only_fields
array
Include only specified fields in output.
[sinks.kafka.encoding]
codec = "json"
only_fields = ["timestamp", "message", "level"]
encoding.except_fields
array
Exclude specified fields from output.
[sinks.kafka.encoding]
codec = "json"
except_fields = ["_metadata", "secret_token"]
encoding.timestamp_format
string
default:"rfc3339"
Format for timestamp fields. Options: rfc3339, unix, unix_ms, unix_ns.
[sinks.kafka.encoding]
codec = "json"
timestamp_format = "unix_ms"

Authentication

The Kafka sink supports SASL and TLS authentication.

SASL Authentication

sasl.enabled
boolean
default:"false"
Enable SASL authentication.
sasl.username
string
SASL username.
sasl.password
string
SASL password.
sasl.mechanism
string
SASL mechanism. Common options:
  • PLAIN: Plain text authentication
  • SCRAM-SHA-256: SCRAM with SHA-256
  • SCRAM-SHA-512: SCRAM with SHA-512
For other mechanisms (like Kerberos), use librdkafka_options.
[sinks.kafka.sasl]
enabled = true
username = "${KAFKA_USERNAME}"
password = "${KAFKA_PASSWORD}"
mechanism = "SCRAM-SHA-256"

TLS Configuration

tls.enabled
boolean
default:"false"
Enable TLS/SSL encryption.
tls.ca_file
string
Path to CA certificate file for verifying broker certificates.
[sinks.kafka.tls]
enabled = true
ca_file = "/path/to/ca.pem"
tls.crt_file
string
Path to client certificate file for mutual TLS.
tls.key_file
string
Path to client private key file for mutual TLS.
tls.verify_certificate
boolean
default:"true"
Verify broker TLS certificates.
tls.verify_hostname
boolean
default:"true"
Verify broker hostname matches certificate.
[sinks.kafka.tls]
enabled = true
ca_file = "/etc/ssl/certs/kafka-ca.pem"
crt_file = "/etc/ssl/certs/client.pem"
key_file = "/etc/ssl/private/client-key.pem"

Combined SASL and TLS

# SASL over TLS (sasl_ssl)
[sinks.kafka.sasl]
enabled = true
username = "vector"
password = "${KAFKA_PASSWORD}"
mechanism = "SCRAM-SHA-512"

[sinks.kafka.tls]
enabled = true
ca_file = "/path/to/ca.pem"

Compression

compression
string
default:"none"
Compression algorithm for messages. Options:
  • none: No compression
  • gzip: Gzip compression (good compatibility)
  • snappy: Snappy compression (fast, recommended)
  • lz4: LZ4 compression (fastest)
  • zstd: Zstandard compression (best ratio)
compression = "snappy"  # Recommended for most use cases
compression = "zstd"    # Best compression ratio
compression = "lz4"     # Fastest

Batching

Batching controls are mapped to librdkafka options:
batch.timeout_secs
float
Maximum time to wait before sending a batch. Maps to queue.buffering.max.ms.
[sinks.kafka.batch]
timeout_secs = 1.0  # Send every second
batch.max_events
integer
Maximum number of messages per batch. Maps to batch.num.messages.
[sinks.kafka.batch]
max_events = 1000
batch.max_bytes
integer
Maximum batch size in bytes. Maps to batch.size.
[sinks.kafka.batch]
max_bytes = 1048576  # 1MB
# Optimized for throughput
[sinks.kafka.batch]
timeout_secs = 5.0
max_events = 10000
max_bytes = 10485760  # 10MB

# Optimized for latency
[sinks.kafka.batch]
timeout_secs = 0.1
max_events = 100
max_bytes = 102400  # 100KB

Timeouts

socket_timeout_ms
integer
default:"60000"
Socket timeout in milliseconds for network requests.
socket_timeout_ms = 30000  # 30 seconds
message_timeout_ms
integer
default:"300000"
Local message timeout in milliseconds. Maximum time a message can wait in the producer queue.
message_timeout_ms = 60000  # 1 minute

Rate Limiting

rate_limit_duration_secs
integer
default:"1"
Time window for rate limiting in seconds.
rate_limit_num
integer
default:"i64::MAX"
Maximum number of requests allowed within the rate limit window.
rate_limit_duration_secs = 1
rate_limit_num = 100  # Max 100 requests per second

Headers

headers_key
string
Event field containing a map of Kafka headers to include with each message.
headers_key = "headers"
Then in your events:
{
  "message": "log data",
  "headers": {
    "trace-id": "abc123",
    "span-id": "def456"
  }
}

Advanced librdkafka Options

librdkafka_options
object
Pass any librdkafka configuration option directly. See librdkafka configuration.
[sinks.kafka.librdkafka_options]
"client.id" = "vector-producer"
"message.send.max.retries" = "3"
"enable.idempotence" = "true"
"acks" = "all"
Common options:
[sinks.kafka.librdkafka_options]
# Producer reliability
"acks" = "all"  # Wait for all replicas
"enable.idempotence" = "true"  # Exactly-once semantics
"max.in.flight.requests.per.connection" = "5"

# Performance tuning
"linger.ms" = "100"  # Wait up to 100ms to batch
"compression.type" = "snappy"
"queue.buffering.max.kbytes" = "65536"  # 64MB producer buffer

# Client identification
"client.id" = "vector-${HOSTNAME}"

Complete Examples

Basic Configuration

[sinks.kafka_basic]
type = "kafka"
inputs = ["logs"]

bootstrap_servers = "kafka:9092"
topic = "application-logs"

encoding.codec = "json"
compression = "snappy"

[sinks.kafka_basic.batch]
timeout_secs = 1
max_events = 1000

With SASL Authentication

[sinks.kafka_secure]
type = "kafka"
inputs = ["logs"]

bootstrap_servers = "kafka-1:9093,kafka-2:9093,kafka-3:9093"
topic = "secure-logs"

encoding.codec = "json"
compression = "zstd"

[sinks.kafka_secure.sasl]
enabled = true
username = "${KAFKA_USERNAME}"
password = "${KAFKA_PASSWORD}"
mechanism = "SCRAM-SHA-512"

[sinks.kafka_secure.tls]
enabled = true
ca_file = "/etc/kafka/ca.pem"
verify_certificate = true
verify_hostname = true

Dynamic Topic Routing

[sinks.kafka_routing]
type = "kafka"
inputs = ["classified_logs"]

bootstrap_servers = "kafka:9092"
# Route to different topics based on log level
topic = "logs-{{ level }}"

# Use host as partition key for ordering
key_field = "host"

encoding.codec = "json"
compression = "snappy"

High-Throughput Configuration

[sinks.kafka_high_volume]
type = "kafka"
inputs = ["metrics"]

bootstrap_servers = "kafka-1:9092,kafka-2:9092,kafka-3:9092"
topic = "metrics"

encoding.codec = "json"
compression = "lz4"  # Fastest compression

# Large batches for throughput
[sinks.kafka_high_volume.batch]
timeout_secs = 5
max_events = 10000
max_bytes = 10485760  # 10MB

# Performance tuning
[sinks.kafka_high_volume.librdkafka_options]
"linger.ms" = "1000"
"batch.size" = "1000000"
"queue.buffering.max.kbytes" = "131072"  # 128MB producer buffer
"compression.type" = "lz4"

Exactly-Once Semantics

[sinks.kafka_exactly_once]
type = "kafka"
inputs = ["critical_events"]

bootstrap_servers = "kafka:9092"
topic = "critical-events"

encoding.codec = "json"

# Enable idempotence and acknowledgements
[sinks.kafka_exactly_once.librdkafka_options]
"enable.idempotence" = "true"
"acks" = "all"
"max.in.flight.requests.per.connection" = "5"

# Use the event ID as the partition key for per-event ordering
key_field = "event_id"

Multi-Datacenter Setup

# DC1 Kafka cluster
[sinks.kafka_dc1]
type = "kafka"
inputs = ["dc1_logs"]
bootstrap_servers = "dc1-kafka-1:9092,dc1-kafka-2:9092"
topic = "logs"
encoding.codec = "json"
compression = "snappy"

# DC2 Kafka cluster
[sinks.kafka_dc2]
type = "kafka"
inputs = ["dc2_logs"]
bootstrap_servers = "dc2-kafka-1:9092,dc2-kafka-2:9092"
topic = "logs"
encoding.codec = "json"
compression = "snappy"

Troubleshooting

Connection Issues

If you can’t connect to Kafka:
  1. Verify bootstrap_servers are correct and reachable
  2. Check network connectivity and firewall rules
  3. Ensure Kafka is listening on the specified ports
  4. Verify authentication credentials if SASL is enabled
  5. Check TLS configuration matches broker settings
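The checks above can be run from the host where Vector is deployed. This is an illustrative sketch that requires a reachable cluster; the host names, port, and CA path are placeholders, and kafka-topics.sh is assumed to be available from an Apache Kafka distribution:

```shell
# Check basic TCP reachability to each bootstrap server
# (host/port values are placeholders -- substitute your own).
nc -vz kafka-1 9092

# For TLS listeners, confirm the handshake completes and inspect
# the certificate the broker presents (CA path is a placeholder).
openssl s_client -connect kafka-1:9093 -CAfile /path/to/ca.pem </dev/null

# List topics to confirm a client can complete cluster metadata discovery.
kafka-topics.sh --bootstrap-server kafka-1:9092 --list
```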

Authentication Failures

For SASL/TLS errors:
  1. Verify SASL mechanism matches broker configuration
  2. Check username and password are correct
  3. Ensure CA certificate is valid and trusted
  4. Verify client certificates are not expired
  5. Check broker logs for authentication errors
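Steps 3 and 4 can be checked locally with openssl before involving the broker at all; this sketch assumes the certificate paths from the TLS examples above, which are placeholders:

```shell
# Inspect the CA certificate's subject and expiry date.
openssl x509 -in /path/to/ca.pem -noout -subject -enddate

# Check whether the client certificate has expired.
openssl x509 -in /etc/ssl/certs/client.pem -noout -enddate

# Verify the client certificate chains to the expected CA.
openssl verify -CAfile /path/to/ca.pem /etc/ssl/certs/client.pem
```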

Messages Not Appearing

If messages aren’t in Kafka:
  1. Verify topic exists (create if needed)
  2. Check Vector logs for errors
  3. Ensure batch timeout isn’t too long
  4. Verify topic configuration allows writes
  5. Check Kafka broker logs
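Topic existence and delivery can be verified with the standard Kafka CLI tools. An illustrative sketch, assuming a broker at kafka:9092 and the application-logs topic from the basic example (adjust partitions and replication to your cluster):

```shell
# Confirm the topic exists and inspect its partition/replication settings.
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic application-logs

# Create the topic if it is missing.
kafka-topics.sh --bootstrap-server kafka:9092 --create \
  --topic application-logs --partitions 6 --replication-factor 3

# Tail a few messages to verify data is actually arriving.
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic application-logs --from-beginning --max-messages 5
```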

Performance Issues

  1. Enable compression to reduce network usage
  2. Increase batch size for higher throughput
  3. Tune librdkafka options like linger.ms and batch.size
  4. Use faster compression (lz4 > snappy > zstd > gzip, fastest to slowest)
  5. Increase queue.buffering.max.kbytes (the librdkafka producer buffer) for high-volume scenarios
  6. Add more Kafka brokers for horizontal scaling
  7. Partition topics for parallel consumption
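Before tuning Vector, it can help to establish a baseline for what the cluster itself can sustain. A sketch using the perf-test tool that ships with Apache Kafka; the topic name and property values here are illustrative, not recommendations:

```shell
# Benchmark raw producer throughput to the cluster, independent of Vector.
# Writes 1M 1KB records as fast as possible (--throughput -1 = unthrottled).
kafka-producer-perf-test.sh \
  --topic perf-test \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput -1 \
  --producer-props bootstrap.servers=kafka:9092 compression.type=lz4 linger.ms=100
```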

Best Practices

  1. Use compression (snappy or lz4 recommended) to reduce network bandwidth
  2. Set appropriate batch sizes to balance latency and throughput
  3. Use partition keys (key_field) for message ordering guarantees
  4. Enable SASL and TLS in production environments
  5. Use dynamic topics for flexible routing based on event data
  6. Configure acks=all for critical data
  7. Enable idempotence for exactly-once semantics
  8. Monitor lag on consumer side
  9. Set realistic timeouts based on network conditions
  10. Use separate topics for different data types or priorities
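Consumer lag (practice 8) can be monitored with the group tool from the Kafka distribution; the group name below is a placeholder:

```shell
# Report per-partition lag for a consumer group; the LAG column shows
# how far consumers trail the latest offsets on each partition.
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-consumer-group
```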

Performance Tuning

For Maximum Throughput

compression = "lz4"

[sinks.kafka.batch]
timeout_secs = 5
max_events = 10000
max_bytes = 10485760

[sinks.kafka.librdkafka_options]
"linger.ms" = "1000"
"batch.size" = "1000000"
"compression.type" = "lz4"

For Minimum Latency

compression = "none"

[sinks.kafka.batch]
timeout_secs = 0.1
max_events = 100
max_bytes = 102400

[sinks.kafka.librdkafka_options]
"linger.ms" = "0"

For Reliability

[sinks.kafka.librdkafka_options]
"enable.idempotence" = "true"
"acks" = "all"
"retries" = "2147483647"
"max.in.flight.requests.per.connection" = "5"
