The Kafka connector lets you read from and write to Apache Kafka topics using Flink SQL and the Table API. It is externalized from the main Flink repository and maintained separately in apache/flink-connector-kafka. Add the flink-sql-connector-kafka dependency to your project to use it.

Dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-kafka</artifactId>
  <version>${flink-connector-kafka.version}</version>
</dependency>

Source table

The Kafka connector creates an unbounded scan source that continuously reads records from one or more Kafka topics.
CREATE TABLE user_events (
  user_id    BIGINT,
  item_id    BIGINT,
  category   STRING,
  behavior   STRING,
  event_ts   TIMESTAMP(3) METADATA FROM 'timestamp',
  proctime   AS PROCTIME(),
  WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND
) WITH (
  'connector'                  = 'kafka',
  'topic'                      = 'user_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id'        = 'flink-consumer-group',
  'scan.startup.mode'          = 'earliest-offset',
  'format'                     = 'json'
)

Scan startup modes

The scan.startup.mode option controls where the Kafka consumer begins reading:
earliest-offset: Start from the earliest available offset in each partition.
latest-offset: Start from the latest offset, reading only new records.
group-offsets: Resume from the committed offsets of the consumer group. If no committed offset exists, behavior follows the properties.auto.offset.reset setting.
timestamp: Start from the first offset whose timestamp is greater than or equal to scan.startup.timestamp-millis.
specific-offsets: Start from per-partition offsets specified in scan.startup.specific-offsets.
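For example, the timestamp mode can be combined with scan.startup.timestamp-millis to replay a topic from a point in time. A minimal sketch (the table name and timestamp here are illustrative; 1704067200000 corresponds to 2024-01-01T00:00:00Z):

```sql
CREATE TABLE events_from_timestamp (
  user_id  BIGINT,
  payload  STRING
) WITH (
  'connector'                      = 'kafka',
  'topic'                          = 'user_events',
  'properties.bootstrap.servers'   = 'localhost:9092',
  -- begin reading at the first offset whose record timestamp is >= the given epoch millis
  'scan.startup.mode'              = 'timestamp',
  'scan.startup.timestamp-millis'  = '1704067200000',
  'format'                         = 'json'
)
```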

Metadata columns

The Kafka connector exposes the following metadata fields:
topic (STRING NOT NULL): Name of the topic the record belongs to.
partition (INT NOT NULL): Partition ID.
headers (MAP<STRING, BYTES>): Kafka record headers.
leader-epoch (INT): Leader epoch of the partition.
offset (BIGINT NOT NULL): Offset within the partition.
timestamp (TIMESTAMP_LTZ(3) NOT NULL): Kafka record timestamp.
timestamp-type (STRING NOT NULL): Timestamp type: NoTimestampType, CreateTime, or LogAppendTime.
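Metadata fields are declared as METADATA columns in the table schema. A minimal sketch (column names are illustrative; VIRTUAL marks the columns as read-only so the table can still be used as a sink):

```sql
CREATE TABLE events_with_metadata (
  user_id        BIGINT,
  payload        STRING,
  -- map connector metadata into the table schema
  topic_name     STRING METADATA FROM 'topic' VIRTUAL,
  partition_id   INT    METADATA FROM 'partition' VIRTUAL,
  record_offset  BIGINT METADATA FROM 'offset' VIRTUAL
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'user_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id'          = 'metadata-demo',
  'scan.startup.mode'            = 'earliest-offset',
  'format'                       = 'json'
)
```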

Sink table

The Kafka connector creates a streaming sink that appends records to a Kafka topic.
CREATE TABLE order_events_sink (
  order_id   BIGINT,
  product    STRING,
  amount     DECIMAL(10, 2),
  created_at TIMESTAMP(3)
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'order_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'                       = 'json'
)
Insert into the sink from a source table:
INSERT INTO order_events_sink
SELECT order_id, product, amount, created_at
FROM orders_source
WHERE amount > 0;

Using Avro format

CREATE TABLE avro_events (
  user_id   BIGINT,
  message   STRING,
  event_ts  TIMESTAMP(3)
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'avro_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id'          = 'avro-group',
  'scan.startup.mode'            = 'latest-offset',
  'format'                       = 'avro'
)

Key and value formats

To configure a separate key format (for example, for upsert semantics), use the key.format and value.format options:
CREATE TABLE keyed_events (
  user_id  BIGINT,
  payload  STRING
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'keyed_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format'                   = 'json',
  'key.fields'                   = 'user_id',
  'value.format'                 = 'json',
  'value.fields-include'         = 'EXCEPT_KEY'
)

Key connector options

connector (required): Must be 'kafka'.
topic (required for source unless topic-pattern is set): Topic name(s) to read from. Separate multiple topics with semicolons.
topic-pattern (optional): Java regex pattern to match topic names dynamically.
properties.bootstrap.servers (required): Comma-separated list of Kafka broker addresses.
properties.group.id (required for source): Consumer group ID.
scan.startup.mode (optional, default group-offsets): Where to start reading.
scan.startup.timestamp-millis (optional): Start timestamp in epoch milliseconds. Used with scan.startup.mode = 'timestamp'.
scan.startup.specific-offsets (optional): Per-partition start offsets, e.g. partition:0,offset:42;partition:1,offset:300.
format (required unless key.format / value.format are set): Format for both key and value.
key.format (optional): Format for the message key.
key.fields (optional): Semicolon-separated list of fields to include in the key.
value.format (optional): Format for the message value.
value.fields-include (optional, default ALL): Which fields to include in the value: ALL or EXCEPT_KEY.
sink.partitioner (optional, default default): Partitioning strategy for the sink: default, fixed, round-robin, or a custom class name.
sink.delivery-guarantee (optional, default at-least-once): Delivery guarantee for the sink: at-least-once or exactly-once. Exactly-once requires Kafka transactions.
sink.transactional-id-prefix (optional): Prefix for Kafka transactional IDs. Required when sink.delivery-guarantee = 'exactly-once'.
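As an example of topic-pattern, a single source table can subscribe to every topic matching a regex. A minimal sketch (the pattern and table name are illustrative):

```sql
CREATE TABLE pattern_source (
  user_id  BIGINT,
  payload  STRING
) WITH (
  'connector'                    = 'kafka',
  -- subscribe to all topics whose names match this Java regex,
  -- e.g. user_events_eu, user_events_us
  'topic-pattern'                = 'user_events_.*',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id'          = 'pattern-group',
  'scan.startup.mode'            = 'earliest-offset',
  'format'                       = 'json'
)
```

Note that topic and topic-pattern are mutually exclusive; set exactly one of them on a source table.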

Exactly-once sink

To use exactly-once delivery from the Kafka sink, enable Kafka transactions and Flink checkpointing:
CREATE TABLE exactly_once_sink (
  id    BIGINT,
  `value` STRING
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'output_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'                       = 'json',
  'sink.delivery-guarantee'      = 'exactly-once',
  'sink.transactional-id-prefix' = 'flink-txn'
)
Exactly-once delivery requires that checkpointing is enabled in your Flink job and that the Kafka brokers are version 0.11 or later. Uncommitted transactions remain visible to consumers using the read_uncommitted isolation level, so set isolation.level to read_committed on downstream consumers.
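If the downstream consumer is itself a Flink table, the isolation level can be passed through as a Kafka client property. A minimal sketch (the table and group names are illustrative):

```sql
CREATE TABLE committed_only_source (
  id      BIGINT,
  `value` STRING
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'output_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id'          = 'downstream-group',
  -- skip records from transactions that have not yet been committed
  'properties.isolation.level'   = 'read_committed',
  'format'                       = 'json'
)
```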
