The kafka source reads data from Apache Kafka topics using consumer groups. It supports automatic offset management, acknowledgements, and configurable decoding.

Configuration

[sources.my_kafka]
type = "kafka"
bootstrap_servers = "localhost:9092"
topics = ["logs"]
group_id = "vector"

Parameters

bootstrap_servers
string
required
Comma-separated list of Kafka bootstrap servers in host:port format.
bootstrap_servers = "10.14.22.123:9092,10.14.23.232:9092"
topics
array
required
Kafka topic names to consume from. Supports regex patterns starting with ^.
topics = ["topic-1", "topic-2", "^(prefix1|prefix2)-.+"]
group_id
string
required
Kafka consumer group ID.
group_id = "vector-consumer-group"
auto_offset_reset
string
default:"largest"
Where to start reading if no offset exists for the consumer group. Options: smallest, earliest, beginning, largest, latest, end, error
auto_offset_reset = "earliest"
session_timeout_ms
integer
default:"10000"
Kafka session timeout in milliseconds.
session_timeout_ms = 30000
socket_timeout_ms
integer
default:"60000"
Timeout for network requests in milliseconds.
socket_timeout_ms = 30000
fetch_wait_max_ms
integer
default:"100"
Maximum time the broker may wait to fill the response in milliseconds.
fetch_wait_max_ms = 500
commit_interval_ms
integer
default:"5000"
Interval, in milliseconds, at which consumer offsets are committed.
commit_interval_ms = 10000
key_field
string
default:"message_key"
Field name for the Kafka message key.
key_field = "key"
topic_key
string
default:"topic"
Field name for the Kafka topic name.
topic_key = "kafka_topic"
partition_key
string
default:"partition"
Field name for the Kafka partition.
partition_key = "kafka_partition"
offset_key
string
default:"offset"
Field name for the Kafka message offset.
offset_key = "kafka_offset"
headers_key
string
default:"headers"
Field name for Kafka message headers.
headers_key = "kafka_headers"
librdkafka_options
object
Advanced librdkafka configuration options.
[sources.my_kafka.librdkafka_options]
"client.id" = "vector"
"fetch.error.backoff.ms" = "1000"
"socket.send.buffer.bytes" = "100"
framing
object
Framing configuration for splitting byte streams into messages.
[sources.my_kafka.framing]
method = "newline_delimited"
decoding
object
Decoding configuration for parsing messages.
[sources.my_kafka.decoding]
codec = "json"
acknowledgements
boolean
default:"false"
Enable end-to-end acknowledgements.
acknowledgements = true
sasl
object
SASL authentication configuration.
[sources.my_kafka.sasl]
enabled = true
mechanism = "PLAIN"
username = "user"
password = "pass"
tls
object
TLS configuration for encrypted connections.
[sources.my_kafka.tls]
enabled = true
ca_file = "/path/to/ca.pem"

Output Schema

The Kafka source produces log events with the following fields:
Field         Type        Description
message       string      The decoded message payload
timestamp     timestamp   Kafka message timestamp
topic         string      Kafka topic name
partition     integer     Kafka partition
offset        integer     Kafka message offset
message_key   string      Kafka message key (if present)
headers       object      Kafka message headers (if present)
source_type   string      Always "kafka"

Examples

Basic Consumer

[sources.kafka_logs]
type = "kafka"
bootstrap_servers = "localhost:9092"
topics = ["application-logs"]
group_id = "vector-logs-consumer"
auto_offset_reset = "earliest"

Multiple Topics with Pattern Matching

[sources.kafka_multi]
type = "kafka"
bootstrap_servers = "kafka1:9092,kafka2:9092,kafka3:9092"
topics = [
  "^prod-.+-logs$",
  "metrics",
  "traces"
]
group_id = "vector-observability"

JSON Decoding

[sources.kafka_json]
type = "kafka"
bootstrap_servers = "localhost:9092"
topics = ["json-logs"]
group_id = "vector"

[sources.kafka_json.decoding]
codec = "json"

SASL Authentication

[sources.kafka_secure]
type = "kafka"
bootstrap_servers = "kafka.example.com:9093"
topics = ["secure-logs"]
group_id = "vector"

[sources.kafka_secure.sasl]
enabled = true
mechanism = "SCRAM-SHA-256"
username = "vector-user"
password = "${KAFKA_PASSWORD}"

[sources.kafka_secure.tls]
enabled = true
ca_file = "/etc/ssl/certs/kafka-ca.pem"

Custom Field Mapping

[sources.kafka_custom]
type = "kafka"
bootstrap_servers = "localhost:9092"
topics = ["logs"]
group_id = "vector"
key_field = "kafka_key"
topic_key = "kafka_topic_name"
partition_key = "kafka_part"
offset_key = "kafka_off"
headers_key = "metadata"

Advanced librdkafka Configuration

[sources.kafka_tuned]
type = "kafka"
bootstrap_servers = "localhost:9092"
topics = ["high-throughput"]
group_id = "vector"

[sources.kafka_tuned.librdkafka_options]
"fetch.min.bytes" = "1048576"
"fetch.wait.max.ms" = "100"
"queued.min.messages" = "100000"
"enable.auto.commit" = "true"
"auto.commit.interval.ms" = "5000"

How It Works

Consumer Groups

The Kafka source uses consumer groups for scalability and fault tolerance. Multiple Vector instances with the same group_id automatically coordinate to distribute partition consumption.
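
For example, two Vector instances can split a topic's partitions between them simply by running identical sources with the same group_id (the instance names in the comments are illustrative):

[sources.kafka_logs]           # deployed on vector-instance-a
type = "kafka"
bootstrap_servers = "kafka1:9092"
topics = ["application-logs"]
group_id = "vector-logs-consumer"   # same group on every instance

# vector-instance-b runs this identical source; Kafka's group
# coordinator assigns each instance a disjoint subset of partitions.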

Offset Management

Offsets are automatically committed based on the commit_interval_ms setting. When acknowledgements are enabled, offsets are only committed after successful delivery to all sinks.
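
A sketch combining both behaviors: offsets are committed at most every 10 seconds, and with acknowledgements enabled a commit only covers messages that all downstream sinks have confirmed:

[sources.kafka_reliable]
type = "kafka"
bootstrap_servers = "localhost:9092"
topics = ["logs"]
group_id = "vector"
commit_interval_ms = 10000   # commit offsets at most every 10s...
acknowledgements = true      # ...and only after sinks confirm delivery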

Rebalancing

When consumers join or leave the group, Kafka automatically rebalances partition assignments. The source handles rebalancing gracefully with configurable drain timeouts.
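
Recent Vector releases expose the drain timeout as drain_timeout_ms; treat the option name as an assumption and verify it against your Vector version:

[sources.kafka_logs]
type = "kafka"
bootstrap_servers = "localhost:9092"
topics = ["logs"]
group_id = "vector"
# How long to wait for in-flight events to be acknowledged before
# releasing a revoked partition during a rebalance.
drain_timeout_ms = 2500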

Message Ordering

Messages within a partition are processed in order. Across partitions, ordering is not guaranteed.

Performance

  • Highly scalable with parallel partition consumption
  • Performance depends on message size, network latency, and broker configuration
  • Tune fetch_wait_max_ms and librdkafka_options for optimal throughput
  • Consumer lag metrics available via metrics.topic_lag_metric
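
The consumer lag metric is opt-in. A minimal sketch enabling it on the kafka_tuned source from the example above:

[sources.kafka_tuned.metrics]
topic_lag_metric = true   # emit a per-topic/partition consumer lag gauge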

Best Practices

  1. Use unique group_id values for different Vector deployments
  2. Set auto_offset_reset to earliest to avoid data loss on new topics
  3. Enable acknowledgements for mission-critical data
  4. Monitor consumer lag using Vector’s internal metrics
  5. Use topic regex patterns carefully to avoid unexpected consumption
  6. Configure appropriate session and socket timeouts for your network
  7. Test rebalance behavior under failure scenarios
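
Several of these practices can be combined in a single source definition (the source name and topic are illustrative):

[sources.kafka_critical]
type = "kafka"
bootstrap_servers = "kafka1:9092,kafka2:9092"
topics = ["payments"]               # explicit topic, no broad regex (practice 5)
group_id = "vector-payments-prod"   # unique per deployment (practice 1)
auto_offset_reset = "earliest"      # avoid data loss on new topics (practice 2)
acknowledgements = true             # end-to-end delivery for critical data (practice 3)
session_timeout_ms = 30000          # sized for the network (practice 6)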
