This page describes configuration options for Iceberg catalogs and read/write operations in Flink.

Catalog Configuration

A catalog is created and named by executing the following query:
CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  '<config_key>'='<config_value>'
);

Global Properties

The following properties can be set globally and are not limited to a specific catalog implementation:
| Property | Required | Values | Description |
|----------|----------|--------|-------------|
| type | ✔️ | iceberg | Must be `iceberg` |
| catalog-type | | `hive`, `hadoop`, `rest`, `glue`, `jdbc`, `nessie` | The underlying Iceberg catalog implementation |
| catalog-impl | | | The fully-qualified class name of a custom catalog implementation |
| property-version | | | Version number to describe the property version; the current version is 1 |
| cache-enabled | | `true` or `false` | Whether to enable catalog cache; default is `true` |
| cache.expiration-interval-ms | | | How long catalog entries are cached (ms); `-1` disables expiration; default is `-1` |

Hive Catalog

The following properties can be set if using the Hive catalog:
| Property | Required | Description |
|----------|----------|-------------|
| uri | ✔️ | The Hive metastore's thrift URI |
| clients | | The Hive metastore client pool size; default is 2 |
| warehouse | | The Hive warehouse location |
| hive-conf-dir | | Path to a directory containing a `hive-site.xml` configuration file |
| hadoop-conf-dir | | Path to a directory containing `core-site.xml` and `hdfs-site.xml` |
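For instance, a Hive catalog might be created like this (the thrift URI, pool size, and warehouse path below are placeholders for your environment):

```sql
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
```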

Hadoop Catalog

The following properties can be set if using the Hadoop catalog:
| Property | Required | Description |
|----------|----------|-------------|
| warehouse | ✔️ | The HDFS directory to store metadata files and data files |
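A Hadoop catalog needs only the warehouse location; the HDFS path below is a placeholder:

```sql
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
```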

REST Catalog

The following properties can be set if using the REST catalog:
| Property | Required | Description |
|----------|----------|-------------|
| uri | ✔️ | The URL of the REST catalog |
| credential | | A credential to exchange for a token in the OAuth2 client credentials flow |
| token | | A token that will be used to interact with the server |
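A REST catalog might be declared as follows; the endpoint URL is a placeholder, and `credential` or `token` can be added as your server requires:

```sql
CREATE CATALOG rest_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='https://localhost/'
);
```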

Read Options

Flink read options can be passed in multiple ways:

DataStream API

IcebergSource.forRowData()
    .tableLoader(TableLoader.fromCatalog(...))
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .startSnapshotId(3821550127947089987L)
    .monitorInterval(Duration.ofMillis(10L))
    .build()

SQL Hints

SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */

Flink Configuration

env.getConfig()
    .getConfiguration()
    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
Priority: Read option > Flink configuration > Table property
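As an illustrative sketch of that priority order (assuming the `connector.iceberg.*` key is picked up from the session configuration, as in the SQL client), a per-query read option overrides the session-level setting:

```sql
-- Session-level Flink configuration (2 MB open-file cost)
SET 'connector.iceberg.split-file-open-cost' = '2097152';

-- The per-query read option takes precedence (1 MB open-file cost)
SELECT * FROM tableName /*+ OPTIONS('split-file-open-cost'='1048576') */;
```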

Available Read Options

| Read option | Flink configuration | Table property | Default | Description |
|-------------|---------------------|----------------|---------|-------------|
| snapshot-id | N/A | N/A | null | For time travel in batch mode. Read data from the specified snapshot-id |
| case-sensitive | connector.iceberg.case-sensitive | N/A | false | If true, match column names case-sensitively |
| as-of-timestamp | N/A | N/A | null | For time travel in batch mode. Read data from the most recent snapshot as of the given time (ms) |
| starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution |
| start-snapshot-timestamp | N/A | N/A | null | Start reading data from the most recent snapshot as of the given time (ms) |
| start-snapshot-id | N/A | N/A | null | Start reading data from the specified snapshot-id |
| end-snapshot-id | N/A | N/A | Latest snapshot | Specifies the end snapshot |
| branch | N/A | N/A | main | Specifies the branch to read from in batch mode |
| tag | N/A | N/A | null | Specifies the tag to read from in batch mode |
| start-tag | N/A | N/A | null | Specifies the starting tag for incremental reads |
| end-tag | N/A | N/A | null | Specifies the ending tag for incremental reads |
| split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits |
| split-lookback | connector.iceberg.split-lookback | read.split.planning-lookback | 10 | Number of bins to consider when combining input splits |
| split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4 MB | The estimated cost to open a file |
| streaming | connector.iceberg.streaming | N/A | false | Sets whether the current task runs in streaming or batch mode |
| monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots |
| include-column-stats | connector.iceberg.include-column-stats | N/A | false | Load column stats with each data file |
| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Maximum number of snapshots processed per split enumeration |
| limit | connector.iceberg.limit | N/A | -1 | Limits the number of output rows |
| max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Maximum allowed consecutive failures for scan planning |
| watermark-column | connector.iceberg.watermark-column | N/A | null | Specifies the watermark column for watermark generation |
| watermark-column-time-unit | connector.iceberg.watermark-column-time-unit | N/A | TimeUnit.MICROSECONDS | Specifies the watermark time unit |
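For example, the snapshot-range options can be combined for an incremental batch read; the snapshot ids and table name below are placeholders:

```sql
SELECT * FROM my_table /*+ OPTIONS(
  'start-snapshot-id'='3821550127947089987',
  'end-snapshot-id'='3821550127947089988'
) */;
```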

Write Options

Flink write options can be passed when configuring the FlinkSink, or as SQL hints:

DataStream API

FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .set("write-format", "orc")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");

SQL Hints

INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
SELECT * FROM source;

Available Write Options

| Flink option | Default | Description |
|--------------|---------|-------------|
| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
| upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled |
| overwrite-enabled | false | Overwrite the table's data |
| distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode |
| range-distribution-statistics-type | Auto | Range distribution data statistics collection type: Map, Sketch, Auto |
| range-distribution-sort-key-base-weight | 0.0 | Base weight for every sort key relative to the target traffic weight per writer task |
| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC |
| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |
| uid-suffix | As per table property | Overrides the uid suffix used in the underlying IcebergSink |
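For example, a hash-distributed write with an explicit writer parallelism might look like this (table names are placeholders):

```sql
INSERT INTO my_table /*+ OPTIONS(
  'distribution-mode'='hash',
  'write-parallelism'='4'
) */
SELECT * FROM source;
```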

Range Distribution Statistics Type

  • Map: Collects accurate sampling count for every single key. Use for low cardinality scenarios (hundreds or thousands)
  • Sketch: Constructs uniform random sampling via reservoir sampling. Use for high cardinality scenarios (millions)
  • Auto: Starts with Map statistics. If cardinality exceeds 10,000, switches to Sketch automatically
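For instance, forcing Sketch statistics for a high-cardinality range-distributed write could be sketched as follows (table names are placeholders):

```sql
INSERT INTO my_table /*+ OPTIONS(
  'distribution-mode'='range',
  'range-distribution-statistics-type'='Sketch'
) */
SELECT * FROM source;
```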

Range Distribution Sort Key Base Weight

If the sort order contains partition columns, each sort key maps to one partition and one data file. This relative weight prevents placing too many small files on sort keys with low traffic. A value of 0.02 means each key has a base weight of 2% of the target traffic weight per writer task.
This is only applicable to StatisticsType.Map for low-cardinality scenarios.
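A low-cardinality range write that reserves that 2% base weight per sort key might look like this (table names are placeholders):

```sql
INSERT INTO my_table /*+ OPTIONS(
  'distribution-mode'='range',
  'range-distribution-statistics-type'='Map',
  'range-distribution-sort-key-base-weight'='0.02'
) */
SELECT * FROM source;
```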

Examples

Configure Batch Read

-- Set execution mode to batch
SET execution.runtime-mode = batch;

-- Read from a specific snapshot
SELECT * FROM my_table /*+ OPTIONS('snapshot-id'='3821550127947089987') */;

-- Time travel to a specific timestamp
SELECT * FROM my_table /*+ OPTIONS('as-of-timestamp'='1609459200000') */;

Configure Streaming Read

-- Set execution mode to streaming
SET execution.runtime-mode = streaming;

-- Enable dynamic table options
SET table.dynamic-table-options.enabled=true;

-- Stream from latest snapshot
SELECT * FROM my_table /*+ OPTIONS(
  'streaming'='true',
  'monitor-interval'='10s'
) */;

Configure Write

-- Write with specific format
INSERT INTO my_table /*+ OPTIONS(
  'write-format'='orc',
  'compression-codec'='zstd'
) */
SELECT * FROM source;

-- Write with upsert enabled
INSERT INTO my_table /*+ OPTIONS('upsert-enabled'='true') */
SELECT * FROM source;

Next Steps

Queries

Learn about reading data with Flink

Writes

Learn about writing data with Flink
