Apache Iceberg supports both Apache Flink’s DataStream API and Table API for streaming and batch analytics.
For a general setup path, start with the Quickstart.

Feature Support

Iceberg provides comprehensive Flink integration with both SQL and DataStream APIs:
| Feature | Support | Notes |
|---------|---------|-------|
| SQL create catalog | ✔️ | |
| SQL create database | ✔️ | |
| SQL create table | ✔️ | |
| SQL create table like | ✔️ | |
| SQL alter table | ✔️ | Only altering table properties is supported; column and partition changes are not |
| SQL drop table | ✔️ | |
| SQL select | ✔️ | Supports both streaming and batch mode |
| SQL insert into | ✔️ | Supports both streaming and batch mode |
| SQL insert overwrite | ✔️ | |
| DataStream read | ✔️ | |
| DataStream append | ✔️ | |
| DataStream overwrite | ✔️ | |
| Metadata tables | ✔️ | |
| Rewrite files action | ✔️ | |

Getting Started

Prerequisites

Download Flink from the Apache download page. Iceberg uses Scala 2.12 when compiling the Apache iceberg-flink-runtime jar, so it’s recommended to use Flink bundled with Scala 2.12.
FLINK_VERSION=1.17.0
SCALA_VERSION=2.12
APACHE_FLINK_URL=https://archive.apache.org/dist/flink/
wget ${APACHE_FLINK_URL}/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz
tar xzvf flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz
Start the Flink SQL client with the Iceberg runtime jar:
# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# The command below works for Flink 1.15 or earlier
./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-1.15-1.4.0.jar shell

# Flink 1.16 and above have a regression loading external jars via the -j option,
# so put iceberg-flink-runtime-1.16-1.4.0.jar in Flink's lib/ directory instead
./bin/sql-client.sh embedded shell
By default, Iceberg ships with Hadoop jars for the Hadoop catalog. To use the Hive catalog, load the Hive jars when opening the Flink SQL client.
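Because Flink 1.16+ loads the runtime jar from `flink/lib` rather than via `-j`, it can help to confirm the jar is actually in place before starting the SQL client. A minimal stdlib sketch (`find_runtime_jar` is a hypothetical helper, not part of Iceberg or Flink):

```python
from pathlib import Path

def find_runtime_jar(lib_dir):
    """Return the first iceberg-flink-runtime jar in Flink's lib/ dir, or None.

    Hypothetical helper: checks that the jar was copied into flink/lib,
    which is where Flink 1.16+ must find it.
    """
    matches = sorted(Path(lib_dir).glob("iceberg-flink-runtime-*.jar"))
    return matches[0] if matches else None
```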
Install the Apache Flink dependency using pip:
pip install apache-flink==1.17.0
Add the Iceberg runtime jar:
import os
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
iceberg_flink_runtime_jar = os.path.join(os.getcwd(), "iceberg-flink-runtime-1.17-1.4.0.jar")

env.add_jars("file://{}".format(iceberg_flink_runtime_jar))
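`add_jars` expects `file://` URIs rather than bare filesystem paths. The string formatting above can be made more robust with `pathlib` (`jar_uri` is a hypothetical helper, shown as a sketch):

```python
from pathlib import Path

def jar_uri(path):
    """Convert a jar path to the file:// URI form that env.add_jars expects.

    Hypothetical helper: resolve() makes a relative path absolute against
    the working directory, and as_uri() percent-escapes special characters.
    """
    return Path(path).resolve().as_uri()
```

This also handles paths containing spaces or non-ASCII characters, which naive `"file://{}".format(...)` does not.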
Create a StreamTableEnvironment and execute Flink SQL:
from pyflink.table import StreamTableEnvironment

table_env = StreamTableEnvironment.create(env)
table_env.execute_sql("""
CREATE CATALOG my_catalog WITH (
    'type'='iceberg',
    'catalog-impl'='com.my.custom.CatalogImpl',
    'my-additional-catalog-config'='my-value'
)
""")
Run a query:
(table_env
    .sql_query("SELECT PULocationID, DOLocationID, passenger_count FROM my_catalog.nyc.taxis LIMIT 5")
    .execute()
    .print())

Type Conversion

Iceberg’s integration for Flink automatically converts between Flink and Iceberg types.
| Flink | Iceberg | Notes |
|-------|---------|-------|
| boolean | boolean | |
| tinyint | integer | |
| smallint | integer | |
| integer | integer | |
| bigint | long | |
| float | float | |
| double | double | |
| char | string | |
| varchar | string | |
| string | string | |
| binary | binary | |
| varbinary | fixed | |
| decimal | decimal | |
| date | date | |
| time | time | |
| timestamp | timestamp without timezone | |
| timestamp_ltz | timestamp with timezone | |
| array | list | |
| map | map | |
| multiset | map | |
| row | struct | |
| raw | | Not supported |
| interval | | Not supported |
| structured | | Not supported |
| timestamp with zone | | Not supported |
| distinct | | Not supported |
| null | | Not supported |
| symbol | | Not supported |
| logical | | Not supported |
| Iceberg | Flink | Notes |
|---------|-------|-------|
| boolean | boolean | |
| struct | row | |
| list | array | |
| map | map | |
| integer | integer | |
| long | bigint | |
| float | float | |
| double | double | |
| date | date | |
| time | time | |
| timestamp without timezone | timestamp(6) | |
| timestamp with timezone | timestamp_ltz(6) | |
| string | varchar(2147483647) | |
| uuid | binary(16) | |
| fixed(N) | binary(N) | |
| binary | varbinary(2147483647) | |
| decimal(P, S) | decimal(P, S) | |
| nanosecond timestamp | timestamp(9) | |
| nanosecond timestamp with timezone | timestamp_ltz(9) | |
| unknown | null | |
| variant | | Not supported |
| geometry | | Not supported |
| geography | | Not supported |
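Note that the two tables are not inverses of each other: several Flink types collapse into one Iceberg type, so a round trip can widen the declared type. A small sketch (hypothetical lookup tables, abridged from the tables above; the real conversion is implemented inside the iceberg-flink runtime) illustrates this:

```python
# Abridged from the conversion tables above (hypothetical lookup tables).
FLINK_TO_ICEBERG = {
    "tinyint": "integer",
    "smallint": "integer",
    "integer": "integer",
    "char": "string",
    "varchar": "string",
    "string": "string",
    "timestamp": "timestamp without timezone",
}

ICEBERG_TO_FLINK = {
    "integer": "integer",
    "string": "varchar(2147483647)",
    "timestamp without timezone": "timestamp(6)",
}

def round_trip(flink_type):
    # Map a Flink type to Iceberg and back; narrow types widen
    # because the forward mapping is many-to-one.
    return ICEBERG_TO_FLINK[FLINK_TO_ICEBERG[flink_type]]
```

For example, a `tinyint` column written through Iceberg reads back as `integer`, and a `char` column reads back as `varchar(2147483647)`.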

Next Steps

- Connector Setup: Configure the Flink connector for Iceberg tables
- DDL Operations: Create and manage Iceberg tables with Flink DDL
- Queries: Read data from Iceberg tables using Flink
- Writes: Write data to Iceberg tables with Flink
