The Kafka sink publishes observability events to Apache Kafka topics. It provides high-throughput data ingestion with support for partitioning, SASL authentication, compression, and exactly-once semantics.
Configuration
[sinks.kafka]
type = "kafka"
inputs = ["my_source"]
# Kafka brokers
bootstrap_servers = "10.14.22.123:9092,10.14.23.232:9092"
# Topic configuration
topic = "logs-{{ environment }}"
# Encoding
encoding.codec = "json"
# Compression
compression = "snappy"
# Batching
batch.timeout_secs = 1
batch.max_events = 1000
Core Parameters
bootstrap_servers
Comma-separated list of Kafka bootstrap servers in host:port format. These servers are used to discover the full cluster topology.
bootstrap_servers = "10.14.22.123:9092,10.14.23.232:9092"
bootstrap_servers = "kafka-1:9092,kafka-2:9092,kafka-3:9092"
topic
Kafka topic name to write events to. Supports template syntax for dynamic topic selection.
# Static topic
topic = "logs"
# Date-based topic
topic = "logs-%Y-%m-%d"
# Environment-based routing
topic = "logs-{{ environment }}"
# Service-based routing
topic = "{{ service }}-logs"
# Multi-level routing
topic = "logs-{{ environment }}-{{ service }}"
healthcheck_topic
Topic name to use for the healthcheck. If omitted, the topic field is used. Useful when topic is templated, to prevent healthcheck warnings.
topic = "logs-{{ environment }}"
healthcheck_topic = "logs-production" # Use a known topic for healthcheck
key_field
Event field to use as the Kafka message key. The key determines partition assignment. If unspecified, or if the field doesn't exist on an event, the message is sent without a key and Kafka distributes it across partitions.
key_field = "user_id" # Use user_id as partition key
key_field = "customer_id" # Group by customer
key_field = ".partition_key" # Explicit path syntax
Encoding
encoding.codec
Encoding format for events. Options:
json: JSON encoding
text: Plain text
avro: Apache Avro
logfmt: Logfmt encoding
native: Vector's native encoding
[sinks.kafka.encoding]
codec = "json"
encoding.only_fields
Include only the specified fields in the output.
[sinks.kafka.encoding]
codec = "json"
only_fields = ["timestamp", "message", "level"]
encoding.except_fields
Exclude the specified fields from the output.
[sinks.kafka.encoding]
codec = "json"
except_fields = ["_metadata", "secret_token"]
encoding.timestamp_format
Format for timestamp fields. Options: rfc3339, unix, unix_ms, unix_ns.
[sinks.kafka.encoding]
codec = "json"
timestamp_format = "unix_ms"
Authentication
The Kafka sink supports SASL and TLS authentication.
SASL Authentication
sasl.enabled
Enable SASL authentication.

sasl.mechanism
SASL mechanism. Common options:
PLAIN: Plain-text authentication
SCRAM-SHA-256: SCRAM with SHA-256
SCRAM-SHA-512: SCRAM with SHA-512
For other mechanisms (such as Kerberos), use librdkafka_options.
[sinks.kafka.sasl]
enabled = true
username = "${KAFKA_USERNAME}"
password = "${KAFKA_PASSWORD}"
mechanism = "SCRAM-SHA-256"
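For mechanisms beyond PLAIN and SCRAM, the settings go through librdkafka_options instead of the sasl block. A sketch for Kerberos (GSSAPI): the option names come from librdkafka's configuration reference, while the service name, principal, and keytab path are placeholders you would replace for your environment.

```toml
[sinks.kafka.librdkafka_options]
"security.protocol" = "sasl_ssl"
"sasl.mechanism" = "GSSAPI"
# Placeholder values -- substitute your own Kerberos setup
"sasl.kerberos.service.name" = "kafka"
"sasl.kerberos.principal" = "vector@EXAMPLE.COM"
"sasl.kerberos.keytab" = "/etc/krb5/vector.keytab"
```

If you take this route, leave the sasl block unset so the two configuration paths don't conflict.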
TLS Configuration
tls.enabled
Enable TLS/SSL encryption.

tls.ca_file
Path to the CA certificate file used to verify broker certificates.
[sinks.kafka.tls]
enabled = true
ca_file = "/path/to/ca.pem"

tls.crt_file
Path to the client certificate file for mutual TLS.

tls.key_file
Path to the client private key file for mutual TLS.

tls.verify_certificate
Verify broker TLS certificates.

tls.verify_hostname
Verify that the broker hostname matches its certificate.

[sinks.kafka.tls]
enabled = true
ca_file = "/etc/ssl/certs/kafka-ca.pem"
crt_file = "/etc/ssl/certs/client.pem"
key_file = "/etc/ssl/private/client-key.pem"
Combined SASL and TLS
# SASL over TLS (sasl_ssl)
[sinks.kafka.sasl]
enabled = true
username = "vector"
password = "${KAFKA_PASSWORD}"
mechanism = "SCRAM-SHA-512"
[sinks.kafka.tls]
enabled = true
ca_file = "/path/to/ca.pem"
Compression
Compression algorithm for messages. Options:
none: No compression
gzip: Gzip compression (good compatibility)
snappy: Snappy compression (fast, recommended)
lz4: LZ4 compression (fastest)
zstd: Zstandard compression (best ratio)
compression = "snappy" # Recommended for most use cases
compression = "zstd" # Best compression ratio
compression = "lz4" # Fastest
Batching
Batching controls are mapped to librdkafka options:
batch.timeout_secs
Maximum time to wait before sending a batch. Maps to queue.buffering.max.ms.
[sinks.kafka.batch]
timeout_secs = 1.0 # Send every second

batch.max_events
Maximum number of messages per batch. Maps to batch.num.messages.
[sinks.kafka.batch]
max_events = 1000

batch.max_bytes
Maximum batch size in bytes. Maps to batch.size.
[sinks.kafka.batch]
max_bytes = 1048576 # 1MB

# Optimized for throughput
[sinks.kafka.batch]
timeout_secs = 5.0
max_events = 10000
max_bytes = 10485760 # 10MB

# Optimized for latency
[sinks.kafka.batch]
timeout_secs = 0.1
max_events = 100
max_bytes = 102400 # 100KB
Timeouts
socket_timeout_ms
Socket timeout in milliseconds for network requests.
socket_timeout_ms = 30000 # 30 seconds

message_timeout_ms
Local message timeout in milliseconds: the maximum time a message can wait in the producer queue.
message_timeout_ms = 60000 # 1 minute
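Both timeouts are top-level sink options, so they can be set alongside the basic configuration from earlier. A sketch (the values are illustrative, not recommendations):

```toml
[sinks.kafka]
type = "kafka"
inputs = ["my_source"]
bootstrap_servers = "kafka:9092"
topic = "logs"
encoding.codec = "json"
socket_timeout_ms = 30000  # give up on unresponsive sockets after 30s
message_timeout_ms = 60000 # expire messages queued locally for over 1 minute
```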
Rate Limiting
rate_limit_duration_secs
Time window for rate limiting in seconds.

rate_limit_num
Maximum number of requests allowed within the rate limit window (integer, default: i64::MAX).

rate_limit_duration_secs = 1
rate_limit_num = 100 # Max 100 requests per second
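Putting the two options together, a sketch that caps the sink at 100 requests per second (option placement follows the top-level style used in the examples above):

```toml
[sinks.kafka]
type = "kafka"
inputs = ["my_source"]
bootstrap_servers = "kafka:9092"
topic = "logs"
encoding.codec = "json"
rate_limit_duration_secs = 1 # 1-second window
rate_limit_num = 100         # at most 100 requests per window
```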
headers_key
Event field containing a map of Kafka headers to include with each message.
headers_key = "headers"
Then in your events:
{
  "message": "log data",
  "headers": {
    "trace-id": "abc123",
    "span-id": "def456"
  }
}
Advanced librdkafka Options
Pass any librdkafka configuration option directly; see the librdkafka configuration reference.
[sinks.kafka.librdkafka_options]
"client.id" = "vector-producer"
"message.send.max.retries" = "3"
"enable.idempotence" = "true"
"acks" = "all"
Common options:
[sinks.kafka.librdkafka_options]
# Producer reliability
"acks" = "all" # Wait for all replicas
"enable.idempotence" = "true" # Idempotent producer (exactly-once per partition)
"max.in.flight.requests.per.connection" = "5"
# Performance tuning
"linger.ms" = "100" # Wait up to 100ms to batch
"compression.type" = "snappy"
"queue.buffering.max.kbytes" = "65536" # 64MB producer buffer (librdkafka's equivalent of the Java client's buffer.memory)
# Client identification
"client.id" = "vector-${HOSTNAME}"
Complete Examples
Basic Configuration
[sinks.kafka_basic]
type = "kafka"
inputs = ["logs"]
bootstrap_servers = "kafka:9092"
topic = "application-logs"
encoding.codec = "json"
compression = "snappy"
[sinks.kafka_basic.batch]
timeout_secs = 1
max_events = 1000
With SASL Authentication
[sinks.kafka_secure]
type = "kafka"
inputs = ["logs"]
bootstrap_servers = "kafka-1:9093,kafka-2:9093,kafka-3:9093"
topic = "secure-logs"
encoding.codec = "json"
compression = "zstd"
[sinks.kafka_secure.sasl]
enabled = true
username = "${KAFKA_USERNAME}"
password = "${KAFKA_PASSWORD}"
mechanism = "SCRAM-SHA-512"
[sinks.kafka_secure.tls]
enabled = true
ca_file = "/etc/kafka/ca.pem"
verify_certificate = true
verify_hostname = true
Dynamic Topic Routing
[sinks.kafka_routing]
type = "kafka"
inputs = ["classified_logs"]
bootstrap_servers = "kafka:9092"
# Route to different topics based on log level
topic = "logs-{{ level }}"
# Use host as partition key for ordering
key_field = "host"
encoding.codec = "json"
compression = "snappy"
High-Throughput Configuration
[sinks.kafka_high_volume]
type = "kafka"
inputs = ["metrics"]
bootstrap_servers = "kafka-1:9092,kafka-2:9092,kafka-3:9092"
topic = "metrics"
encoding.codec = "json"
compression = "lz4" # Fastest compression
# Large batches for throughput
[sinks.kafka_high_volume.batch]
timeout_secs = 5
max_events = 10000
max_bytes = 10485760 # 10MB
# Performance tuning
[sinks.kafka_high_volume.librdkafka_options]
"linger.ms" = "1000"
"batch.size" = "1000000"
"queue.buffering.max.kbytes" = "131072" # 128MB producer buffer
"compression.type" = "lz4"
Exactly-Once Semantics
[sinks.kafka_exactly_once]
type = "kafka"
inputs = ["critical_events"]
bootstrap_servers = "kafka:9092"
topic = "critical-events"
encoding.codec = "json"
# Key by event ID for per-key ordering
key_field = "event_id"
# Enable idempotence and acknowledgements
[sinks.kafka_exactly_once.librdkafka_options]
"enable.idempotence" = "true"
"acks" = "all"
"max.in.flight.requests.per.connection" = "5"
Multi-Datacenter Setup
# DC1 Kafka cluster
[sinks.kafka_dc1]
type = "kafka"
inputs = ["dc1_logs"]
bootstrap_servers = "dc1-kafka-1:9092,dc1-kafka-2:9092"
topic = "logs"
encoding.codec = "json"
compression = "snappy"
# DC2 Kafka cluster
[sinks.kafka_dc2]
type = "kafka"
inputs = ["dc2_logs"]
bootstrap_servers = "dc2-kafka-1:9092,dc2-kafka-2:9092"
topic = "logs"
encoding.codec = "json"
compression = "snappy"
Troubleshooting
Connection Issues
If you can’t connect to Kafka:
Verify bootstrap_servers are correct and reachable
Check network connectivity and firewall rules
Ensure Kafka is listening on the specified ports
Verify authentication credentials if SASL is enabled
Check TLS configuration matches broker settings
Authentication Failures
For SASL/TLS errors:
Verify SASL mechanism matches broker configuration
Check username and password are correct
Ensure CA certificate is valid and trusted
Verify client certificates are not expired
Check broker logs for authentication errors
Messages Not Appearing
If messages aren’t in Kafka:
Verify topic exists (create if needed)
Check Vector logs for errors
Ensure batch timeout isn’t too long
Verify topic configuration allows writes
Check Kafka broker logs
Performance Tuning
Enable compression to reduce network usage
Increase batch size for higher throughput
Tune librdkafka options like linger.ms and batch.size
Use faster compression (lz4 > snappy > zstd > gzip)
Increase queue.buffering.max.kbytes for high-volume scenarios
Add more Kafka brokers for horizontal scaling
Partition topics for parallel consumption
Best Practices
Use compression (snappy or lz4 recommended) to reduce network bandwidth
Set appropriate batch sizes to balance latency and throughput
Use partition keys (key_field) for message ordering guarantees
Enable SASL and TLS in production environments
Use dynamic topics for flexible routing based on event data
Configure acks=all for critical data
Enable idempotence for exactly-once semantics
Monitor lag on consumer side
Set realistic timeouts based on network conditions
Use separate topics for different data types or priorities
For Maximum Throughput
compression = "lz4"
[sinks.kafka.batch]
timeout_secs = 5
max_events = 10000
max_bytes = 10485760
[sinks.kafka.librdkafka_options]
"linger.ms" = "1000"
"batch.size" = "1000000"
"compression.type" = "lz4"
For Minimum Latency
compression = "none"
[sinks.kafka.batch]
timeout_secs = 0.1
max_events = 100
max_bytes = 102400
[sinks.kafka.librdkafka_options]
"linger.ms" = "0"
For Reliability
[sinks.kafka.librdkafka_options]
"enable.idempotence" = "true"
"acks" = "all"
"retries" = "2147483647"
"max.in.flight.requests.per.connection" = "5"