Apache Iceberg Sink Connector
The Apache Iceberg Sink Connector for Kafka Connect writes data from Kafka into Iceberg tables.

Features
- Commit coordination for centralized Iceberg commits
- Exactly-once delivery semantics
- Multi-table fan-out
- Automatic table creation and schema evolution
- Field name mapping via Iceberg’s column mapping functionality
Installation
The connector zip archive is created as part of the Iceberg build. After running the Gradle build, the distribution archives can be found under `./kafka-connect/kafka-connect-runtime/build/distributions`. There is one distribution that bundles the Hive Metastore client and related dependencies, and one that does not.
Requirements
The sink relies on KIP-447 for exactly-once semantics. This requires Kafka 2.5 or later.
Configuration
Core Properties
| Property | Description |
|---|---|
| `iceberg.tables` | Comma-separated list of destination tables |
| `iceberg.tables.dynamic-enabled` | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`; default is `false` |
| `iceberg.tables.route-field` | For multi-table fan-out, the name of the field used to route records to tables |
| `iceberg.tables.default-commit-branch` | Default branch for commits; `main` is used if not specified |
| `iceberg.tables.default-id-columns` | Default comma-separated list of columns that identify a row in tables (primary key) |
| `iceberg.tables.default-partition-by` | Default comma-separated list of partition field names to use when creating tables |
| `iceberg.tables.auto-create-enabled` | Set to `true` to automatically create destination tables; default is `false` |
| `iceberg.tables.evolve-schema-enabled` | Set to `true` to add any missing record fields to the table schema; default is `false` |
| `iceberg.tables.schema-force-optional` | Set to `true` to set columns as optional during table creation and evolution; default is `false` (respect the record schema) |
| `iceberg.tables.schema-case-insensitive` | Set to `true` to look up table columns by case-insensitive name; default is `false` (case-sensitive) |
| `iceberg.tables.auto-create-props.*` | Properties set on new tables during auto-create |
| `iceberg.tables.write-props.*` | Properties passed through to Iceberg writer initialization; these take precedence |
Table-Specific Properties
| Property | Description |
|---|---|
| `iceberg.table.<table-name>.commit-branch` | Table-specific branch for commits; `iceberg.tables.default-commit-branch` is used if not specified |
| `iceberg.table.<table-name>.id-columns` | Comma-separated list of columns that identify a row in the table (primary key) |
| `iceberg.table.<table-name>.partition-by` | Comma-separated list of partition fields to use when creating the table |
| `iceberg.table.<table-name>.route-regex` | The regex used to match a record's `routeField` to a table |
Control Topic Properties
| Property | Description |
|---|---|
| `iceberg.control.topic` | Name of the control topic; default is `control-iceberg` |
| `iceberg.control.group-id-prefix` | Prefix for the control consumer group; default is `cg-control` |
| `iceberg.control.commit.interval-ms` | Commit interval in ms; default is 300,000 (5 min) |
| `iceberg.control.commit.timeout-ms` | Commit timeout in ms; default is 30,000 (30 sec) |
| `iceberg.control.commit.threads` | Number of threads to use for commits; default is (cores × 2) |
Catalog and Hadoop Properties
| Property | Description |
|---|---|
| `iceberg.coordinator.transactional.prefix` | Prefix for the transactional ID of the coordinator producer; default is no prefix |
| `iceberg.catalog` | Name of the catalog; default is `iceberg` |
| `iceberg.catalog.*` | Properties passed through to Iceberg catalog initialization |
| `iceberg.hadoop-conf-dir` | If specified, Hadoop config files in this directory will be loaded |
| `iceberg.hadoop.*` | Properties passed through to the Hadoop configuration |
| `iceberg.kafka.*` | Properties passed through to control topic Kafka client initialization |
If `iceberg.tables.dynamic-enabled` is `false` (the default), then you must specify `iceberg.tables`. If `iceberg.tables.dynamic-enabled` is `true`, then you must specify `iceberg.tables.route-field`, which will contain the name of the table.

Catalog Configuration
The `iceberg.catalog.*` properties are required for connecting to the Iceberg catalog. The core catalog types are included in the default distribution: REST, Glue, DynamoDB, Hadoop, Nessie, JDBC, Hive, and BigQuery Metastore.
To set the catalog type, you can set iceberg.catalog.type to rest, hive, or hadoop. For other catalog types, you need to instead set iceberg.catalog.catalog-impl to the name of the catalog class.
REST Example
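As a sketch, the catalog portion of the connector configuration for a REST catalog might look like the following (the URI and credential values are placeholders):

```json
{
  "iceberg.catalog.type": "rest",
  "iceberg.catalog.uri": "https://localhost",
  "iceberg.catalog.credential": "<credential>",
  "iceberg.catalog.warehouse": "<warehouse>"
}
```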
Hive Example
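A sketch of the catalog properties for a Hive Metastore catalog backed by S3 (the URI, warehouse, and region values are placeholders; the `io-impl` and `client.region` properties shown are assumptions based on Iceberg's S3 integration):

```json
{
  "iceberg.catalog.type": "hive",
  "iceberg.catalog.uri": "thrift://hive:9083",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
  "iceberg.catalog.warehouse": "s3a://bucket/warehouse",
  "iceberg.catalog.client.region": "us-east-1"
}
```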
Use the distribution that includes the HMS client (or include the HMS client yourself). Use `S3FileIO` when using S3 for storage and `GCSFileIO` when using GCS (the default is `HadoopFileIO` with `HiveCatalog`).

Glue Example
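A sketch of the catalog properties for AWS Glue, using the `catalog-impl` pattern described above (the warehouse path is a placeholder):

```json
{
  "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
  "iceberg.catalog.warehouse": "s3a://bucket/warehouse",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
}
```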
Nessie Example
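A sketch of the catalog properties for Nessie (the URI, branch, and warehouse values are placeholders):

```json
{
  "iceberg.catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
  "iceberg.catalog.uri": "http://localhost:19120/api/v2",
  "iceberg.catalog.ref": "main",
  "iceberg.catalog.warehouse": "s3a://bucket/warehouse"
}
```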
BigQuery Metastore Example
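Following the same `catalog-impl` pattern, a BigQuery Metastore catalog configuration might look roughly like this. The class and property names here are assumptions; verify them against the Iceberg GCP module documentation for your Iceberg version:

```json
{
  "iceberg.catalog.catalog-impl": "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
  "iceberg.catalog.gcp.bigquery.project-id": "<project-id>",
  "iceberg.catalog.gcp.bigquery.location": "<location>",
  "iceberg.catalog.warehouse": "gs://bucket/warehouse"
}
```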
Cloud Storage Configuration
Azure ADLS Configuration
When using ADLS, Azure requires passing `AZURE_CLIENT_ID`, `AZURE_TENANT_ID`, and `AZURE_CLIENT_SECRET` to its Java SDK. If you're running Kafka Connect in a container, be sure to inject those values as environment variables.
Example Azure environment variables
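For example, when launching a Kafka Connect worker, the variables could be set like this (all values are placeholders):

```shell
export AZURE_CLIENT_ID=<application-id>
export AZURE_TENANT_ID=<tenant-id>
export AZURE_CLIENT_SECRET=<client-secret>
```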
- `AZURE_CLIENT_ID` is the Application ID from App Registrations
- `AZURE_TENANT_ID` is from your Azure Tenant Properties
- `AZURE_CLIENT_SECRET` is created within "Certificates & Secrets"
Google GCS Configuration
By default, Application Default Credentials (ADC) will be used to connect to GCS. Details on how ADC works can be found in the Google Cloud documentation.

Hadoop Configuration
When using HDFS or Hive, the sink will initialize the Hadoop configuration. First, config files from the classpath are loaded. Next, if `iceberg.hadoop-conf-dir` is specified, config files are loaded from that location. Finally, any `iceberg.hadoop.*` properties from the sink config are applied.
When merging these, the order of precedence is: sink config > config dir > classpath.
Examples
Initial Setup
Create control topic
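When manual creation is needed, a sketch using the standard `kafka-topics.sh` tool (the bootstrap server is a placeholder; the topic name matches the default `control-iceberg`):

```shell
bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create \
  --topic control-iceberg \
  --partitions 1
```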
If your Kafka cluster has auto.create.topics.enable set to true (the default), then the control topic will be created automatically. If not, create it manually. Note that clusters running on Confluent Cloud have
auto.create.topics.enable set to false by default.

Single Destination Table
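A sketch of a complete connector configuration for this case (the connector name, topic, table, and catalog settings are placeholders; the connector class shown is the one used by the Apache distribution):

```json
{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost"
  }
}
```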
This example writes all incoming records to a single table.

Multi-Table Fan-Out (Static Routing)
This example writes records with `type` set to `list` to the table `default.events_list`, and records with `type` set to `create` to the table `default.events_create`. Other records will be skipped.
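A sketch of the configuration for this routing (names, topic, and catalog settings are placeholders):

```json
{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events_list,default.events_create",
    "iceberg.tables.route-field": "type",
    "iceberg.table.default.events_list.route-regex": "list",
    "iceberg.table.default.events_create.route-regex": "create",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost"
  }
}
```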
Multi-Table Fan-Out (Dynamic Routing)
This example writes to tables whose names come from the value of the `db_table` field. If a table with that name does not exist, the record will be skipped. For example, if the record's `db_table` field is set to `default.events_list`, then the record is written to the `default.events_list` table.
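A sketch of the configuration for dynamic routing (names, topic, and catalog settings are placeholders):

```json
{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables.dynamic-enabled": "true",
    "iceberg.tables.route-field": "db_table",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost"
  }
}
```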
SMTs for the Apache Iceberg Sink Connector
This project contains some SMTs (Single Message Transforms) that could be useful when transforming Kafka data for use by the Iceberg sink connector.

CopyValue
This SMT is experimental.
CopyValue SMT copies a value from one field to a new field.
Configuration:
| Property | Description |
|---|---|
| `source.field` | Source field name |
| `target.field` | Target field name |
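A sketch of using the SMT in a connector config. The fully-qualified class name shown is an assumption; verify it against the transforms jar in your distribution:

```json
{
  "transforms": "copyId",
  "transforms.copyId.type": "org.apache.iceberg.connect.transforms.CopyValue",
  "transforms.copyId.source.field": "id",
  "transforms.copyId.target.field": "id_copy"
}
```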
DmsTransform
This SMT is experimental.
DmsTransform SMT transforms an AWS DMS formatted message for use by the sink’s CDC feature. It will promote the data element fields to top level and add the following metadata fields: _cdc.op, _cdc.ts, and _cdc.source.
DebeziumTransform
This SMT is experimental.
DebeziumTransform SMT transforms a Debezium formatted message for use by the sink’s CDC feature. It will promote the before or after element fields to top level and add the following metadata fields: _cdc.op, _cdc.ts, _cdc.offset, _cdc.source, _cdc.target, and _cdc.key.
Configuration:
| Property | Description |
|---|---|
| `cdc.target.pattern` | Pattern to use for setting the CDC target field value; default is `{db}.{table}` |
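A sketch of enabling the transform in a connector config. As with the other SMTs, the class name is an assumption to be verified against your distribution:

```json
{
  "transforms": "debezium",
  "transforms.debezium.type": "org.apache.iceberg.connect.transforms.DebeziumTransform",
  "transforms.debezium.cdc.target.pattern": "{db}.{table}"
}
```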
JsonToMapTransform
This SMT is experimental.
JsonToMapTransform SMT parses Strings as JSON object payloads to infer schemas. The default behavior of the iceberg-kafka-connect connector for schema-less data (e.g. the Map produced by the Kafka-supplied JsonConverter) is to convert Maps into Iceberg Structs.
This is fine when the JSON is well-structured, but when you have JSON objects with dynamically changing keys, it will lead to an explosion of columns in the Iceberg table due to schema evolutions.
Configuration:
| Property | Description (default value) |
|---|---|
| `json.root` | (`false`) Boolean value to start at the root |
KafkaMetadataTransform
This SMT is experimental.
KafkaMetadata injects the topic, partition, offset, and timestamp of the Kafka record.
Configuration:
| Property | Description (default value) |
|---|---|
| `field_name` | (`_kafka_metadata`) prefix for the metadata fields |
| `nested` | (`false`) if `true`, nests the metadata on a struct; otherwise adds prefixed fields at the top level |
| `external_field` | (none) appends a constant `key,value` to the metadata (e.g. cluster name) |
MongoDebeziumTransform
This SMT is experimental.
MongoDebeziumTransform SMT transforms a Mongo Debezium formatted message with before/after BSON strings into before/after typed Structs that the DebeziumTransform SMT expects.
It does not (yet) support renaming columns if a MongoDB column name is not supported by your underlying catalog type.
Configuration:

| Property | Description |
|---|---|
| `array_handling_mode` | `array` or `document`, to set the array handling mode |
`array` (the default) encodes arrays as the array datatype. It is the user's responsibility to ensure that all elements of a given array instance are of the same type. This option is more restrictive but allows easy processing of arrays by downstream clients.

The value `document` converts the array into a struct of structs, in a similar way to BSON serialization. The main struct contains fields named `_0`, `_1`, `_2`, etc., where each name represents the index of the element in the array. Every element is then passed as the value for the given field.