Iceberg supports a comprehensive set of DDL operations in Flink for managing catalogs, databases, and tables.
CREATE CATALOG
Hive Catalog
This creates an Iceberg catalog named hive_catalog that loads tables from Hive metastore:
CREATE CATALOG hive_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://localhost:9083',
  'clients' = '5',
  'property-version' = '1',
  'warehouse' = 'hdfs://nn:8020/warehouse/path'
);
Properties:
| Property | Required | Description |
|---|---|---|
| uri | ✔️ | The Hive metastore's thrift URI |
| clients | | The Hive metastore client pool size, default value is 2 |
| warehouse | | The Hive warehouse location |
| hive-conf-dir | | Path to a directory containing a hive-site.xml configuration file |
| hadoop-conf-dir | | Path to a directory containing core-site.xml and hdfs-site.xml |
Hadoop Catalog
Iceberg supports a directory-based catalog in HDFS:
CREATE CATALOG hadoop_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse' = 'hdfs://nn:8020/warehouse/path',
  'property-version' = '1'
);
Properties:
| Property | Required | Description |
|---|---|---|
| warehouse | ✔️ | The HDFS directory to store metadata files and data files |
Execute the SQL command USE CATALOG hadoop_catalog to set the current catalog.
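For example, assuming the catalog above has been registered:

```sql
-- Switch the SQL client session to the Hadoop catalog
USE CATALOG hadoop_catalog;
```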
REST Catalog
This creates an Iceberg catalog that loads tables from a REST catalog:
CREATE CATALOG rest_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'rest',
  'uri' = 'https://localhost/'
);
Properties:
| Property | Required | Description |
|---|---|---|
| uri | ✔️ | The URL to the REST Catalog |
| credential | | A credential to exchange for a token in the OAuth2 client credentials flow |
| token | | A token which will be used to interact with the server |
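As a sketch, a REST catalog that authenticates with a bearer token might be declared like this (the endpoint and token value are placeholders, not a real server):

```sql
CREATE CATALOG secure_rest_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'rest',
  -- hypothetical endpoint; replace with your REST catalog's URL
  'uri' = 'https://rest-server:8181/',
  -- hypothetical token; alternatively set 'credential' for the OAuth2 flow
  'token' = '<bearer-token>'
);
```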
Custom Catalog
Flink supports loading a custom Iceberg Catalog implementation by specifying the catalog-impl property:
CREATE CATALOG my_catalog WITH (
  'type' = 'iceberg',
  'catalog-impl' = 'com.my.custom.CatalogImpl',
  'my-additional-catalog-config' = 'my-value'
);
Create via YAML Config
Catalogs can be registered in sql-client-defaults.yaml before starting the SQL client:
catalogs:
  - name: my_catalog
    type: iceberg
    catalog-type: hadoop
    warehouse: hdfs://nn:8020/warehouse/path
Create via SQL Files
The Flink SQL Client supports the -i startup option to execute an initialization SQL file:
-- define available catalogs
CREATE CATALOG hive_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://localhost:9083',
  'warehouse' = 'hdfs://nn:8020/warehouse/path'
);
USE CATALOG hive_catalog;
Use the -i <init.sql> option to initialize the SQL Client session:
/path/to/bin/sql-client.sh -i /path/to/init.sql
CREATE DATABASE
By default, Iceberg will use the default database in Flink. Create a separate database to avoid creating tables under the default database:
CREATE DATABASE iceberg_db;
USE iceberg_db;
CREATE TABLE
Basic Table
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id BIGINT COMMENT 'unique id',
  data STRING NOT NULL
) WITH ('format-version' = '2');
Table create commands support the commonly used Flink create clauses including:
PARTITIONED BY (column1, column2, ...) to configure partitioning (Flink does not yet support hidden partitioning)
COMMENT 'table document' to set a table description
WITH ('key'='value', ...) to set table configuration which will be stored in Iceberg table properties
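The clauses above can be combined in a single statement; a minimal sketch (the table and column names are illustrative):

```sql
CREATE TABLE `hive_catalog`.`default`.`sample_documented` (
  id BIGINT COMMENT 'unique id',
  data STRING
)
COMMENT 'table document'
PARTITIONED BY (data)
WITH ('format-version' = '2');
```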
Specify Table Location
To specify the table location, use WITH ('location'='fully-qualified-uri'):
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id BIGINT COMMENT 'unique id',
  data STRING NOT NULL
) WITH (
  'format-version' = '2',
  'location' = 'hdfs://nn:8020/custom-path'
);
Computed columns and watermark definitions are not currently supported.
PRIMARY KEY
A primary key constraint can be declared on a column or a set of columns; the key columns must be unique and must not contain nulls. A primary key is required for UPSERT mode.
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id BIGINT COMMENT 'unique id',
  data STRING NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH ('format-version' = '2');
PARTITIONED BY
To create a partition table, use PARTITIONED BY:
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id BIGINT COMMENT 'unique id',
  data STRING NOT NULL
)
PARTITIONED BY (data)
WITH ('format-version' = '2');
Iceberg supports hidden partitioning, but Flink does not support partitioning by a function applied to columns, so hidden partitions cannot be defined through Flink DDL.
CREATE TABLE LIKE
To create a table with the same schema, partitioning, and table properties as another table, use CREATE TABLE LIKE:
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id BIGINT COMMENT 'unique id',
  data STRING
);

CREATE TABLE `hive_catalog`.`default`.`sample_like`
LIKE `hive_catalog`.`default`.`sample`;
For more details, refer to the Flink CREATE TABLE documentation.
ALTER TABLE
Alter Table Properties
Iceberg only supports altering table properties:
ALTER TABLE `hive_catalog`.`default`.`sample`
SET ('write.format.default' = 'avro');
Rename Table
ALTER TABLE `hive_catalog`.`default`.`sample`
RENAME TO `hive_catalog`.`default`.`new_sample`;
DROP TABLE
To delete a table, run:
DROP TABLE `hive_catalog`.`default`.`sample`;
Examples
Complete Table Creation Flow
-- Create catalog
CREATE CATALOG hive_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://localhost:9083',
  'warehouse' = 'hdfs://nn:8020/warehouse/path'
);
-- Use the catalog
USE CATALOG hive_catalog;
-- Create database
CREATE DATABASE my_db;
USE my_db;
-- Create table
CREATE TABLE orders (
  order_id BIGINT,
  customer_id BIGINT,
  order_date DATE,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) PARTITIONED BY (order_date)
WITH (
  'format-version' = '2',
  'write.format.default' = 'parquet'
);
Alter Table Example
-- Set table properties
ALTER TABLE orders SET ('write.format.default' = 'orc');
-- Rename table
ALTER TABLE orders RENAME TO customer_orders;
Next Steps
Queries: learn how to query Iceberg tables
Writes: write data to Iceberg tables