Iceberg supports batch and streaming writes with Apache Flink’s DataStream API and Table API. The Flink Iceberg sink guarantees exactly-once semantics.

Writing with SQL

Iceberg supports both INSERT INTO and INSERT OVERWRITE.

INSERT INTO

To append new data to a table with a Flink streaming job, use INSERT INTO:
INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');

INSERT INTO `hive_catalog`.`default`.`sample` 
  SELECT id, data FROM other_kafka_table;

INSERT OVERWRITE

To replace data in the table with the result of a query, use INSERT OVERWRITE in a batch job. Overwrites are atomic operations for Iceberg tables.
Flink streaming jobs do not support INSERT OVERWRITE.
Partitions that have rows produced by the SELECT query will be replaced:
INSERT OVERWRITE sample VALUES (1, 'a');
Iceberg also supports overwriting a given partition with the values produced by a SELECT query:
INSERT OVERWRITE `hive_catalog`.`default`.`sample` 
  PARTITION(data='a') SELECT 6;
For a partitioned table, when all partition columns are set in the PARTITION clause, it’s a static partition insert. If only some partition columns are set, it’s a dynamic partition insert. For an unpartitioned table, its data will be completely overwritten by INSERT OVERWRITE.
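For instance, a dynamic partition insert can simply omit the PARTITION clause and let the query supply the partition values. A hedged sketch against the same `sample` table, assuming a source table `another_table` (a hypothetical name) with matching columns:

```sql
-- Dynamic partition insert: the value of the partition column `data` comes
-- from each row of the query result, so only the partitions present in the
-- result are replaced.
INSERT OVERWRITE `hive_catalog`.`default`.`sample`
  SELECT id, data FROM another_table;
```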

UPSERT

Iceberg supports UPSERT based on the primary key when writing data to a table in the v2 table format. There are two ways to enable upsert:

Table-Level Property

Enable the UPSERT mode as table-level property write.upsert.enabled:
CREATE TABLE `hive_catalog`.`default`.`sample` (
    `id` INT COMMENT 'unique id',
    `data` STRING NOT NULL,
    PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
    'format-version'='2',
    'write.upsert.enabled'='true'
);

Write Option

Enable UPSERT mode using upsert-enabled in the write options:
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
SELECT * FROM source;
OVERWRITE and UPSERT modes are mutually exclusive: they cannot be enabled at the same time. When using UPSERT mode with a partitioned table, the source columns of the corresponding partition fields must be included in the equality fields (the primary key).
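For example, for a v2 table partitioned by `data`, the partition source column must be part of the primary key. A sketch, assuming the same catalog as above:

```sql
CREATE TABLE `hive_catalog`.`default`.`sample_partitioned` (
    `id` INT,
    `data` STRING NOT NULL,
    -- the partition source column `data` is included in the primary key,
    -- satisfying the equality-fields requirement for UPSERT mode
    PRIMARY KEY (`id`, `data`) NOT ENFORCED
) PARTITIONED BY (`data`) WITH (
    'format-version' = '2',
    'write.upsert.enabled' = 'true'
);
```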

Writing with DataStream

Appending Data

Flink supports writing DataStream<RowData> and DataStream<Row> to Iceberg tables:
StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable(
    "hdfs://nn:8020/warehouse/path", 
    hadoopConf
);

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .append();

env.execute("Test Iceberg DataStream");

Overwrite Data

Set the overwrite flag in FlinkSink builder to overwrite data:
StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable(
    "hdfs://nn:8020/warehouse/path",
    hadoopConf
);

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .append();

env.execute("Test Iceberg DataStream");

Upsert Data

Set the upsert flag in FlinkSink builder to upsert data. The table must use v2 table format and have a primary key:
StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable(
    "hdfs://nn:8020/warehouse/path",
    hadoopConf
);

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .upsert(true)
    .append();

env.execute("Test Iceberg DataStream");

Branch Writes

Writing to branches in Iceberg tables is supported via the toBranch API:
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .toBranch("audit-branch")
    .append();
For more information on branches, refer to branching.
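In Flink SQL, a branch can also be targeted through a write option in an OPTIONS hint. A sketch that assumes a `branch` write option is available in your Iceberg version (check the write-options reference to confirm):

```sql
-- Append to the 'audit-branch' branch instead of the main branch
INSERT INTO `hive_catalog`.`default`.`sample` /*+ OPTIONS('branch'='audit-branch') */
SELECT id, data FROM other_kafka_table;
```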

Write Metrics

The Flink Iceberg sink provides metrics for monitoring write operations.

Writer Metrics

Parallel writer metrics are added under the subgroup IcebergStreamWriter. Tags:
  • table: full table name (like iceberg.my_db.my_table)
  • subtask_index: writer subtask index starting from 0
| Metric Name | Type | Description |
|---|---|---|
| lastFlushDurationMs | Gauge | Duration (in ms) that writer subtasks take to flush and upload files during a checkpoint |
| flushedDataFiles | Counter | Number of data files flushed and uploaded |
| flushedDeleteFiles | Counter | Number of delete files flushed and uploaded |
| flushedReferencedDataFiles | Counter | Number of data files referenced by the flushed delete files |
| dataFilesSizeHistogram | Histogram | Histogram distribution of data file sizes (in bytes) |
| deleteFilesSizeHistogram | Histogram | Histogram distribution of delete file sizes (in bytes) |

Committer Metrics

Committer metrics are added under the subgroup IcebergFilesCommitter. Tags:
  • table: full table name (like iceberg.my_db.my_table)
| Metric Name | Type | Description |
|---|---|---|
| lastCheckpointDurationMs | Gauge | Duration (in ms) that the committer operator takes to checkpoint its state |
| lastCommitDurationMs | Gauge | Duration (in ms) that the Iceberg table commit takes |
| committedDataFilesCount | Counter | Number of data files committed |
| committedDataFilesRecordCount | Counter | Number of records in the committed data files |
| committedDataFilesByteCount | Counter | Number of bytes in the committed data files |
| committedDeleteFilesCount | Counter | Number of delete files committed |
| committedDeleteFilesRecordCount | Counter | Number of records in the committed delete files |
| committedDeleteFilesByteCount | Counter | Number of bytes in the committed delete files |
| elapsedSecondsSinceLastSuccessfulCommit | Gauge | Elapsed time (in seconds) since the last successful Iceberg commit |
elapsedSecondsSinceLastSuccessfulCommit is an ideal alerting metric to detect failed or missing Iceberg commits. If the checkpoint interval is 5 minutes, set up an alert with a rule like elapsedSecondsSinceLastSuccessfulCommit > 3600 to detect issues.

Write Options

Flink write options are passed when configuring the FlinkSink:
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .set("write-format", "orc")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
For Flink SQL, write options can be passed via SQL hints:
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
SELECT * FROM source;
Check out all the options at write-options.

Distribution Mode

Flink streaming writer supports both HASH and RANGE distribution modes.

Hash Distribution

HASH distribution shuffles data by partition key (partitioned table) or equality fields (non-partitioned table). Limitations:
  • Doesn’t handle skewed data well
  • Can result in unbalanced traffic distribution if cardinality is low
  • Writer parallelism is limited to the cardinality of the hash key
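The distribution mode is configured on the sink builder. A minimal sketch, reusing the `input` stream and `tableLoader` from the appending example above (DistributionMode here refers to org.apache.iceberg.DistributionMode):

```java
// Shuffle rows by partition key (or equality fields) before writing
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .distributionMode(DistributionMode.HASH)
    .append();
```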

Range Distribution (Experimental)

RANGE distribution shuffles data by partition key or sort order via a custom range partitioner. It collects traffic statistics to evenly distribute traffic to writer tasks.
Range distribution only shuffles the data via range partitioner. Rows are not sorted within a data file.

Use Cases

  • Tables partitioned by event time with skewed distribution
  • Tables partitioned by country code with varying traffic
  • Tables where queries include predicates on non-partition columns

Usage

Enable range distribution in Java:
FlinkSink.forRowData(input)
    .distributionMode(DistributionMode.RANGE)
    .rangeDistributionStatisticsType(StatisticsType.Auto)
    .rangeDistributionSortKeyBaseWeight(0.0d)
    .append();

Sink V2 Implementation

In Flink 1.15, the SinkV2 interface was introduced. A new IcebergSink implementation based on SinkV2 is available.

Writing with SQL

To enable the SinkV2-based implementation in SQL:
SET table.exec.iceberg.use-v2-sink = true;

Writing with DataStream

To use the SinkV2-based implementation in the DataStream API, replace FlinkSink with IcebergSink.
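A minimal sketch of the SinkV2-based builder, mirroring the FlinkSink append example above and assuming the same `input` stream and `tableLoader`:

```java
// Same builder pattern as FlinkSink, backed by the SinkV2 interface
IcebergSink.forRowData(input)
    .tableLoader(tableLoader)
    .append();
```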
Differences from FlinkSink:
  • The RANGE distribution mode is not yet available for IcebergSink
  • Use uidSuffix instead of uidPrefix

Important Notes

Flink streaming write jobs rely on the snapshot summary to keep the last committed checkpoint ID. Expiring snapshots or deleting orphan files can therefore corrupt the state of a Flink job. Make sure to:
  • Keep the last snapshot created by the Flink job (identified by the flink.job-id property)
  • Only delete orphan files that are old enough

Next Steps

Configuration

Configure write options for Flink

Maintenance

Maintain Iceberg tables with Flink
