Kafka Connect is a popular framework for moving data in and out of Apache Kafka via connectors. There are many different connectors available, such as the S3 sink for writing data from Kafka to S3 and the Debezium source connectors for writing change data capture records from relational databases to Kafka. Kafka Connect has a straightforward, decentralized, distributed architecture: a cluster consists of a number of worker processes, and a connector runs tasks on those processes to perform the work. Connector deployment is configuration driven, so generally no code needs to be written to run a connector.

Apache Iceberg Sink Connector

The Apache Iceberg Sink Connector for Kafka Connect is a sink connector for writing data from Kafka into Iceberg tables.

Features

  • Commit coordination for centralized Iceberg commits, ensuring consistency
  • Exactly-once delivery semantics, guaranteeing no duplicate or lost data
  • Multi-table fan-out, routing records to different tables dynamically
  • Automatic table creation and schema evolution
  • Field name mapping via Iceberg’s column mapping functionality

Installation

The connector zip archive is created as part of the Iceberg build. You can run the build via:
./gradlew -x test -x integrationTest clean build
The zip archive will be found under ./kafka-connect/kafka-connect-runtime/build/distributions. There is one distribution that bundles the Hive Metastore client and related dependencies, and one that does not.
Copy the distribution archive into the Kafka Connect plugins directory on all nodes.

Requirements

The sink relies on KIP-447 for exactly-once semantics. This requires Kafka 2.5 or later.

Configuration

Core Properties

| Property | Description |
|---|---|
| iceberg.tables | Comma-separated list of destination tables |
| iceberg.tables.dynamic-enabled | Set to true to route each record to the table named in the route field instead of using route-regex; default is false |
| iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables |
| iceberg.tables.default-commit-branch | Default branch for commits; main is used if not specified |
| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) |
| iceberg.tables.default-partition-by | Default comma-separated list of partition field names to use when creating tables |
| iceberg.tables.auto-create-enabled | Set to true to automatically create destination tables; default is false |
| iceberg.tables.evolve-schema-enabled | Set to true to add any missing record fields to the table schema; default is false |
| iceberg.tables.schema-force-optional | Set to true to set columns as optional during table create and evolution; default is false (respect the record schema) |
| iceberg.tables.schema-case-insensitive | Set to true to look up table columns by case-insensitive name; default is false (case-sensitive) |
| iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create |
| iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization; these take precedence |

Table-Specific Properties

| Property | Description |
|---|---|
| iceberg.table.\<table-name\>.commit-branch | Table-specific branch for commits; iceberg.tables.default-commit-branch is used if not specified |
| iceberg.table.\<table-name\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
| iceberg.table.\<table-name\>.partition-by | Comma-separated list of partition fields to use when creating the table |
| iceberg.table.\<table-name\>.route-regex | The regex used to match a record’s route field value to this table |

Control Topic Properties

| Property | Description |
|---|---|
| iceberg.control.topic | Name of the control topic; default is control-iceberg |
| iceberg.control.group-id-prefix | Prefix for the control consumer group; default is cg-control |
| iceberg.control.commit.interval-ms | Commit interval in ms; default is 300,000 (5 min) |
| iceberg.control.commit.timeout-ms | Commit timeout interval in ms; default is 30,000 (30 sec) |
| iceberg.control.commit.threads | Number of threads to use for commits; default is (cores × 2) |
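For example, to commit more frequently than the 5-minute default, you might lower the commit interval while keeping the timeout below it (the values below are illustrative, not recommendations):

```json
"iceberg.control.commit.interval-ms": "60000",
"iceberg.control.commit.timeout-ms": "30000"
```

A shorter interval produces smaller, more frequent snapshots, so balance it against the file-size and metadata overhead of frequent commits.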

Catalog and Hadoop Properties

| Property | Description |
|---|---|
| iceberg.coordinator.transactional.prefix | Prefix for the transactional id used by the coordinator producer; default is no/empty prefix |
| iceberg.catalog | Name of the catalog; default is iceberg |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded |
| iceberg.hadoop.* | Properties passed through to the Hadoop configuration |
| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |
If iceberg.tables.dynamic-enabled is false (the default), then you must specify iceberg.tables. If iceberg.tables.dynamic-enabled is true, then you must instead specify iceberg.tables.route-field, whose value names the destination table for each record.

Catalog Configuration

The iceberg.catalog.* properties are required for connecting to the Iceberg catalog. The core catalog types are included in the default distribution, including REST, Glue, DynamoDB, Hadoop, Nessie, JDBC, Hive and BigQuery Metastore.
JDBC drivers are not included in the default distribution, so you will need to include those if needed. When using a Hive catalog, you can use the distribution that includes the Hive metastore client, otherwise you will need to include that yourself.
To set the catalog type, you can set iceberg.catalog.type to rest, hive, or hadoop. For other catalog types, you need to instead set iceberg.catalog.catalog-impl to the name of the catalog class.

REST Example

"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog-service",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse>"

Hive Example

Use the distribution that includes the HMS client (or include the HMS client yourself). Use S3FileIO when using S3 for storage and GCSFileIO when using GCS (the default is HadoopFileIO with HiveCatalog).
"iceberg.catalog.type": "hive",
"iceberg.catalog.uri": "thrift://hive:9083",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.client.region": "us-east-1",
"iceberg.catalog.s3.access-key-id": "<AWS access>",
"iceberg.catalog.s3.secret-access-key": "<AWS secret>"

Glue Example

"iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"

Nessie Example

"iceberg.catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
"iceberg.catalog.uri": "http://localhost:19120/api/v2",
"iceberg.catalog.ref": "main",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
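JDBC Example

The JDBC catalog is included in the default distribution, but as noted above the JDBC driver itself is not, so it must be added to the plugin path separately. A configuration sketch (the connection URL, credentials, and bucket below are placeholders):

```json
"iceberg.catalog.catalog-impl": "org.apache.iceberg.jdbc.JdbcCatalog",
"iceberg.catalog.uri": "jdbc:postgresql://postgres:5432/iceberg",
"iceberg.catalog.jdbc.user": "iceberg",
"iceberg.catalog.jdbc.password": "<password>",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
```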

BigQuery Metastore Example

"iceberg.catalog.catalog-impl": "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
"iceberg.catalog.gcp.bigquery.project-id": "my-project",
"iceberg.catalog.gcp.bigquery.location": "us-east1",
"iceberg.catalog.warehouse": "gs://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.gcp.gcs.GCSFileIO",
"iceberg.tables.auto-create-props.bq_connection": "projects/my-project/locations/us-east1/connections/my-connection"
Depending on your setup, you may need to also set iceberg.catalog.s3.endpoint, iceberg.catalog.s3.staging-dir, or iceberg.catalog.s3.path-style-access. See the Iceberg docs for full details on configuring catalogs.

Cloud Storage Configuration

Azure ADLS Configuration

When using ADLS, Azure requires AZURE_CLIENT_ID, AZURE_TENANT_ID, and AZURE_CLIENT_SECRET to be set for its Java SDK. If you’re running Kafka Connect in a container, be sure to inject those values as environment variables:
AZURE_CLIENT_ID=e564f687-7b89-4b48-80b8-111111111111
AZURE_TENANT_ID=95f2f365-f5b7-44b1-88a1-111111111111
AZURE_CLIENT_SECRET="XXX"
Where:
  • CLIENT_ID is the Application ID from App Registrations
  • TENANT_ID is from your Azure Tenant Properties
  • CLIENT_SECRET is created within “Certificates & Secrets”
Ensure the App Registration is granted the Role Assignment “Storage Blob Data Contributor” in your Storage Account’s Access Control (IAM).
Connector configuration:
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog:8181",
"iceberg.catalog.warehouse": "abfss://[email protected]/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.azure.adlsv2.ADLSFileIO",
"iceberg.catalog.include-credentials": "true"

Google GCS Configuration

By default, Application Default Credentials (ADC) will be used to connect to GCS. Details on how ADC works can be found in the Google Cloud documentation.
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog:8181",
"iceberg.catalog.warehouse": "gs://bucket-name/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.gcp.gcs.GCSFileIO"

Hadoop Configuration

When using HDFS or Hive, the sink will initialize the Hadoop configuration. First, config files from the classpath are loaded. Next, if iceberg.hadoop-conf-dir is specified, config files are loaded from that location. Finally, any iceberg.hadoop.* properties from the sink config are applied.
When merging these, the order of precedence is: sink config > config dir > classpath.
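As a sketch of this layering (the paths and values below are hypothetical), a sink config could load site files from a directory and still override an individual Hadoop property:

```json
"iceberg.hadoop-conf-dir": "/etc/hadoop/conf",
"iceberg.hadoop.fs.defaultFS": "hdfs://namenode:8020"
```

Here fs.defaultFS from the sink config wins over any value found in /etc/hadoop/conf or on the classpath.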

Examples

Initial Setup

1. Create source topic

This assumes the source topic already exists and is named events.
2. Create control topic

If your Kafka cluster has auto.create.topics.enable set to true (the default), then the control topic will be automatically created. If not, create it manually:
bin/kafka-topics.sh \
  --command-config command-config.props \
  --bootstrap-server ${CONNECT_BOOTSTRAP_SERVERS} \
  --create \
  --topic control-iceberg \
  --partitions 1
Clusters running on Confluent Cloud have auto.create.topics.enable set to false by default.
3. Configure Iceberg catalog

Configuration properties with the prefix iceberg.catalog. will be passed to Iceberg catalog initialization. See the Iceberg docs for details on how to configure a particular catalog.

Single Destination Table

This example writes all incoming records to a single table.
1. Create the destination table

CREATE TABLE default.events (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts))
2. Configure connector

This example config connects to an Iceberg REST catalog:
{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
  }
}

Multi-Table Fan-Out (Static Routing)

This example writes records with type set to list to the table default.events_list, and writes records with type set to create to the table default.events_create. Other records will be skipped.
1. Create two destination tables

CREATE TABLE default.events_list (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts));

CREATE TABLE default.events_create (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts));
2. Configure connector with routing

{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events_list,default.events_create",
    "iceberg.tables.route-field": "type",
    "iceberg.table.default.events_list.route-regex": "list",
    "iceberg.table.default.events_create.route-regex": "create",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
  }
}

Multi-Table Fan-Out (Dynamic Routing)

This example writes to tables with names from the value in the db_table field. If a table with the name does not exist, then the record will be skipped. For example, if the record’s db_table field is set to default.events_list, then the record is written to the default.events_list table.
{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables.dynamic-enabled": "true",
    "iceberg.tables.route-field": "db_table",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
  }
}

SMTs for the Apache Iceberg Sink Connector

This project contains some SMTs (Single Message Transforms) that could be useful when transforming Kafka data for use by the Iceberg sink connector.

CopyValue

This SMT is experimental.
The CopyValue SMT copies a value from one field to a new field. Configuration:
| Property | Description |
|---|---|
| source.field | Source field name |
| target.field | Target field name |
Example:
"transforms": "copyId",
"transforms.copyId.type": "org.apache.iceberg.connect.transforms.CopyValue",
"transforms.copyId.source.field": "id",
"transforms.copyId.target.field": "id_copy"

DmsTransform

This SMT is experimental.
The DmsTransform SMT transforms an AWS DMS formatted message for use by the sink’s CDC feature. It will promote the data element fields to top level and add the following metadata fields: _cdc.op, _cdc.ts, and _cdc.source.
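A minimal configuration sketch, following the same pattern as the CopyValue example (the alias dms is arbitrary, and the class name is assumed to live in the same transforms package):

```json
"transforms": "dms",
"transforms.dms.type": "org.apache.iceberg.connect.transforms.DmsTransform"
```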

DebeziumTransform

This SMT is experimental.
The DebeziumTransform SMT transforms a Debezium formatted message for use by the sink’s CDC feature. It will promote the before or after element fields to top level and add the following metadata fields: _cdc.op, _cdc.ts, _cdc.offset, _cdc.source, _cdc.target, and _cdc.key. Configuration:
| Property | Description |
|---|---|
| cdc.target.pattern | Pattern to use for setting the CDC target field value; default is {db}.{table} |
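A configuration sketch (the alias debezium is arbitrary, and the class name is assumed to follow the same transforms package as CopyValue):

```json
"transforms": "debezium",
"transforms.debezium.type": "org.apache.iceberg.connect.transforms.DebeziumTransform",
"transforms.debezium.cdc.target.pattern": "{db}.{table}"
```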

JsonToMapTransform

This SMT is experimental.
The JsonToMapTransform SMT parses String values as JSON object payloads in order to infer schemas. The connector’s default handling of schema-less data (e.g. the Map produced by the Kafka-supplied JsonConverter) is to convert Maps into Iceberg Structs. This is fine when the JSON is well-structured, but for JSON objects with dynamically changing keys it leads to an explosion of columns in the Iceberg table due to repeated schema evolution.
You must use StringConverter as the value.converter setting for your connector, not JsonConverter. The SMT expects JSON objects ({...}) in those strings. Message keys, tombstones, and headers are not transformed and are passed along as-is by the SMT.
Configuration:
| Property | Description (default value) |
|---|---|
| json.root | (false) Boolean value to start at root |
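A configuration sketch combining the required converter with the transform (the alias jsonToMap is arbitrary, and the class name is assumed to follow the same transforms package as CopyValue):

```json
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "jsonToMap",
"transforms.jsonToMap.type": "org.apache.iceberg.connect.transforms.JsonToMapTransform",
"transforms.jsonToMap.json.root": "true"
```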

KafkaMetadataTransform

This SMT is experimental.
The KafkaMetadataTransform SMT injects the topic, partition, offset, and timestamp properties of the Kafka record. Configuration:
| Property | Description (default value) |
|---|---|
| field_name | (_kafka_metadata) Prefix for the injected fields |
| nested | (false) If true, nests the metadata under a struct; otherwise adds prefixed fields at the top level |
| external_field | (none) Appends a constant key,value pair to the metadata (e.g. cluster name) |
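A configuration sketch (the alias kafkaMetadata is arbitrary, and the class name is assumed to follow the same transforms package as CopyValue):

```json
"transforms": "kafkaMetadata",
"transforms.kafkaMetadata.type": "org.apache.iceberg.connect.transforms.KafkaMetadataTransform",
"transforms.kafkaMetadata.field_name": "_kafka_metadata",
"transforms.kafkaMetadata.nested": "true"
```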

MongoDebeziumTransform

This SMT is experimental.
The MongoDebeziumTransform SMT transforms a MongoDB Debezium formatted message with before/after BSON strings into the before/after typed Structs that the DebeziumTransform SMT expects.
It does not (yet) support renaming columns when a MongoDB column name is not supported by your underlying catalog type.
Configuration:
| Property | Description |
|---|---|
| array_handling_mode | array or document, to set the array handling mode |
The value array (the default) encodes arrays as the array datatype. It is the user’s responsibility to ensure that all elements of a given array instance are of the same type. This option is restrictive but makes arrays easy to process for downstream clients. The value document converts the array into a struct of structs, similar to BSON serialization: the main struct contains fields named _0, _1, _2, etc., where each name represents the index of the element in the array, and each element is passed as the value of the corresponding field.
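Since this SMT produces the Structs that the DebeziumTransform SMT consumes, the two are naturally chained. A configuration sketch (the aliases mongo and debezium are arbitrary, and the class names are assumed to follow the same transforms package as CopyValue):

```json
"transforms": "mongo,debezium",
"transforms.mongo.type": "org.apache.iceberg.connect.transforms.MongoDebeziumTransform",
"transforms.mongo.array_handling_mode": "array",
"transforms.debezium.type": "org.apache.iceberg.connect.transforms.DebeziumTransform"
```

Transforms are applied in the listed order, so the MongoDB message is normalized before the Debezium transform promotes the before/after fields.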
