The Kafka connector lets you read from and write to Apache Kafka topics using Flink SQL and the Table API. It is externalized from the main Flink repository and maintained separately in apache/flink-connector-kafka. Add the flink-sql-connector-kafka dependency to your project to use it.
Dependency
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-kafka</artifactId>
  <version>${flink-connector-kafka.version}</version>
</dependency>
```
Source table
The Kafka connector creates an unbounded scan source that continuously reads records from one or more Kafka topics.
```sql
CREATE TABLE user_events (
  user_id BIGINT,
  item_id BIGINT,
  category STRING,
  behavior STRING,
  event_ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  proctime AS PROCTIME(),
  WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'flink-consumer-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```
Scan startup modes
The scan.startup.mode option controls where the Kafka consumer begins reading:
| Mode | Description |
|---|---|
| `earliest-offset` | Start from the earliest available offset in each partition. |
| `latest-offset` | Start from the latest offset, reading only newly arriving records. |
| `group-offsets` | Resume from the committed offsets of the consumer group. If no committed offset exists, the behavior follows the `properties.auto.offset.reset` setting (Kafka's default is `latest`). |
| `timestamp` | Start from the first offset whose timestamp is greater than or equal to `scan.startup.timestamp-millis`. |
| `specific-offsets` | Start from per-partition offsets specified in `scan.startup.specific-offsets`. |
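For example, the `timestamp` mode can replay every record produced on or after a given wall-clock time. A minimal sketch (the table name, topic, and timestamp below are illustrative):

```sql
CREATE TABLE replayed_events (
  user_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'timestamp',
  -- 2024-01-01 00:00:00 UTC, expressed in epoch milliseconds
  'scan.startup.timestamp-millis' = '1704067200000',
  'format' = 'json'
);
```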
The Kafka connector exposes the following metadata fields:
| Key | Type | Description |
|---|---|---|
| `topic` | STRING NOT NULL | Name of the topic the record belongs to. |
| `partition` | INT NOT NULL | Partition ID. |
| `headers` | MAP&lt;STRING, BYTES&gt; | Kafka record headers. |
| `leader-epoch` | INT | Leader epoch of the partition. |
| `offset` | BIGINT NOT NULL | Offset of the record within its partition. |
| `timestamp` | TIMESTAMP_LTZ(3) NOT NULL | Kafka record timestamp. |
| `timestamp-type` | STRING NOT NULL | Timestamp type: NoTimestampType, CreateTime, or LogAppendTime. |
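Metadata fields are declared with the METADATA keyword; fields that cannot be written back to Kafka, such as the partition and offset, are marked VIRTUAL so they are excluded when the table is used as a sink. A short sketch (the table name, topic, and columns are illustrative):

```sql
CREATE TABLE events_with_metadata (
  `topic` STRING NOT NULL METADATA VIRTUAL,
  `partition` INT NOT NULL METADATA VIRTUAL,
  `offset` BIGINT NOT NULL METADATA VIRTUAL,
  event_ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);
```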
Sink table
The Kafka connector creates a streaming sink that appends records to a Kafka topic.
```sql
CREATE TABLE order_events_sink (
  order_id BIGINT,
  product STRING,
  amount DECIMAL(10, 2),
  created_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'order_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);
```
Insert into the sink from a source table:
```sql
INSERT INTO order_events_sink
SELECT order_id, product, amount, created_at
FROM orders_source
WHERE amount > 0;
```
The format option selects the serialization format. For example, to read Avro-encoded records:

```sql
CREATE TABLE avro_events (
  user_id BIGINT,
  message STRING,
  event_ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'avro_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'avro-group',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'avro'
);
```
To configure a separate key format (for example, for upsert semantics), use the key.format and value.format options:
```sql
CREATE TABLE keyed_events (
  user_id BIGINT,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'keyed_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'key.fields' = 'user_id',
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY'
);
```
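With EXCEPT_KEY, the fields listed in key.fields are serialized only into the message key; with the default ALL, they appear in both the key and the value. For the table above, produced records would look roughly like this (illustrative payloads):

```
key:                {"user_id": 42}
value (EXCEPT_KEY): {"payload": "hello"}
value (ALL):        {"user_id": 42, "payload": "hello"}
```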
Key connector options
| Option | Required | Default | Description |
|---|---|---|---|
| `connector` | Yes | — | Must be `'kafka'`. |
| `topic` | Yes for sinks; sources need either `topic` or `topic-pattern` | — | Topic name(s). Use semicolons to separate multiple topics. |
| `topic-pattern` | No | — | Java regex matched against topic names; cannot be combined with `topic`. |
| `properties.bootstrap.servers` | Yes | — | Comma-separated list of Kafka broker addresses. |
| `properties.group.id` | Yes for sources using `group-offsets` | — | Consumer group ID. |
| `scan.startup.mode` | No | `group-offsets` | Where the source starts reading; see the table above. |
| `scan.startup.timestamp-millis` | No | — | Start timestamp in epoch milliseconds. Used with `scan.startup.mode` = `'timestamp'`. |
| `scan.startup.specific-offsets` | No | — | Per-partition start offsets, e.g. `partition:0,offset:42;partition:1,offset:300`. |
| `format` | Yes, unless `key.format`/`value.format` are set | — | Format for both key and value. |
| `key.format` | No | — | Format for the message key. |
| `key.fields` | No | — | Semicolon-separated list of fields that form the key. |
| `value.format` | No | — | Format for the message value. |
| `value.fields-include` | No | `ALL` | Which fields the value format serializes: `ALL` or `EXCEPT_KEY`. |
| `sink.partitioner` | No | `default` | Partitioning strategy for the sink: `default`, `fixed`, `round-robin`, or a custom partitioner class name. |
| `sink.delivery-guarantee` | No | `at-least-once` | Delivery guarantee for the sink: `none`, `at-least-once`, or `exactly-once`. Exactly-once requires Kafka transactions. |
| `sink.transactional-id-prefix` | No | — | Prefix for Kafka transactional IDs. Required when `sink.delivery-guarantee` = `'exactly-once'`. |
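As an example of `topic-pattern`, a single source table can subscribe to every topic matching a regular expression. A sketch (the table name and pattern are illustrative):

```sql
CREATE TABLE all_order_events (
  order_id BIGINT,
  amount DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  -- matches order_events_eu, order_events_us, ...
  'topic-pattern' = 'order_events_.*',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orders-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```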
Exactly-once sink
To use exactly-once delivery from the Kafka sink, enable Kafka transactions and Flink checkpointing:
```sql
CREATE TABLE exactly_once_sink (
  id BIGINT,
  -- VALUE is a reserved keyword in Flink SQL, so it must be quoted
  `value` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'output_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'sink.delivery-guarantee' = 'exactly-once',
  'sink.transactional-id-prefix' = 'flink-txn'
);
```
Exactly-once delivery requires that checkpointing is enabled in your Flink job and that the Kafka brokers are version 0.11 or later. Note that consumers using Kafka's default isolation level, read_uncommitted, still see records from open and aborted transactions. Set isolation.level to read_committed in downstream consumers so they read only committed records.
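When the downstream consumer is itself a Flink Kafka source, the isolation level can be set through the pass-through properties.* options, which are forwarded to the underlying Kafka consumer. A sketch (the table name and topic are illustrative):

```sql
CREATE TABLE committed_only_source (
  id BIGINT,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'output_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'downstream-group',
  -- read only records from committed transactions
  'properties.isolation.level' = 'read_committed',
  'format' = 'json'
);
```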