Iceberg supports a comprehensive set of DDL operations in Flink for managing catalogs, databases, and tables.

CREATE CATALOG

Hive Catalog

This creates an Iceberg catalog named hive_catalog that loads tables from Hive metastore:
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
Properties:
| Property | Required | Description |
|---|---|---|
| uri | ✔️ | The Hive metastore's thrift URI |
| clients | | The Hive metastore client pool size, default value is 2 |
| warehouse | | The Hive warehouse location |
| hive-conf-dir | | Path to a directory containing a hive-site.xml configuration file |
| hadoop-conf-dir | | Path to a directory containing core-site.xml and hdfs-site.xml |
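As a sketch, the metastore connection details can also be picked up from a Hive configuration directory via hive-conf-dir (the path /etc/hive/conf below is a placeholder, not a value from this document):

```sql
-- Sketch: load hive-site.xml from a local directory in addition to the thrift URI
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'hive-conf-dir'='/etc/hive/conf'   -- hypothetical path to hive-site.xml
);
```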

Hadoop Catalog

Iceberg supports a directory-based catalog in HDFS:
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path',
  'property-version'='1'
);
Properties:
| Property | Required | Description |
|---|---|---|
| warehouse | ✔️ | The HDFS directory to store metadata files and data files |
Execute the SQL command USE CATALOG hadoop_catalog to set the current catalog.
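For example:

```sql
USE CATALOG hadoop_catalog;
```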

REST Catalog

This creates an Iceberg catalog that loads tables from a REST catalog:
CREATE CATALOG rest_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='https://localhost/'
);
Properties:
| Property | Required | Description |
|---|---|---|
| uri | ✔️ | The URL to the REST Catalog |
| credential | | A credential to exchange for a token in the OAuth2 client credentials flow |
| token | | A token which will be used to interact with the server |
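As a sketch, a REST catalog can authenticate via the OAuth2 client credentials flow by supplying credential (the client_id:client_secret value below is a placeholder, not a real credential):

```sql
-- Sketch: REST catalog with OAuth2 client credentials (placeholder values)
CREATE CATALOG rest_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='https://localhost/',
  'credential'='client_id:client_secret'  -- hypothetical credential
);
```

Alternatively, a pre-issued token can be passed directly via the token property.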

Custom Catalog

Flink supports loading a custom Iceberg Catalog implementation by specifying the catalog-impl property:
CREATE CATALOG my_catalog WITH (
  'type'='iceberg',
  'catalog-impl'='com.my.custom.CatalogImpl',
  'my-additional-catalog-config'='my-value'
);

Create via YAML Config

Catalogs can be registered in sql-client-defaults.yaml before starting the SQL client:
catalogs:
  - name: my_catalog
    type: iceberg
    catalog-type: hadoop
    warehouse: hdfs://nn:8020/warehouse/path

Create via SQL Files

The Flink SQL Client supports the -i startup option to execute an initialization SQL file:
-- define available catalogs
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);

USE CATALOG hive_catalog;
Use the -i <init.sql> option to initialize the SQL Client session:
/path/to/bin/sql-client.sh -i /path/to/init.sql

CREATE DATABASE

By default, Iceberg will use the default database in Flink. Create a separate database to avoid creating tables under the default database:
CREATE DATABASE iceberg_db;
USE iceberg_db;

CREATE TABLE

Basic Table

CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING NOT NULL
) WITH ('format-version'='2');
Table create commands support the commonly used Flink create clauses including:
  • PARTITIONED BY (column1, column2, ...) to configure partitioning (Flink does not yet support hidden partitioning)
  • COMMENT 'table document' to set a table description
  • WITH ('key'='value', ...) to set table configuration which will be stored in Iceberg table properties
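A minimal sketch combining these clauses in one statement (the table comment and property values are illustrative, not prescribed by this document):

```sql
-- Sketch: COMMENT, PARTITIONED BY, and WITH used together
CREATE TABLE `hive_catalog`.`default`.`sample_full` (
    id BIGINT COMMENT 'unique id',
    data STRING
)
COMMENT 'sample table with a description'      -- hypothetical description
PARTITIONED BY (data)
WITH (
    'format-version'='2',
    'write.format.default'='parquet'           -- stored as an Iceberg table property
);
```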

Specify Table Location

To specify the table location, use WITH ('location'='fully-qualified-uri'):
CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING NOT NULL
) WITH (
    'format-version'='2',
    'location'='hdfs://nn:8020/custom-path'
);
Computed columns and watermark definitions are not currently supported.

PRIMARY KEY

A primary key constraint can be declared on a column or a set of columns; the key columns must be unique and must not contain nulls. A primary key is required for UPSERT mode.
CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING NOT NULL,
    PRIMARY KEY(`id`) NOT ENFORCED
) WITH ('format-version'='2');

PARTITIONED BY

To create a partitioned table, use PARTITIONED BY:
CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING NOT NULL
)
PARTITIONED BY (data)
WITH ('format-version'='2');
Iceberg supports hidden partitioning, but Flink does not support partitioning by a function applied to columns, so there is currently no way to declare hidden partitions in Flink DDL.

CREATE TABLE LIKE

To create a table with the same schema, partitioning, and table properties as another table, use CREATE TABLE LIKE:
CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
);

CREATE TABLE `hive_catalog`.`default`.`sample_like` 
  LIKE `hive_catalog`.`default`.`sample`;
For more details, refer to the Flink CREATE TABLE documentation.

ALTER TABLE

Alter Table Properties

Iceberg only supports altering table properties:
ALTER TABLE `hive_catalog`.`default`.`sample` 
  SET ('write.format.default'='avro');

Rename Table

ALTER TABLE `hive_catalog`.`default`.`sample` 
  RENAME TO `hive_catalog`.`default`.`new_sample`;

DROP TABLE

To delete a table, run:
DROP TABLE `hive_catalog`.`default`.`sample`;

Examples

Complete Table Creation Flow

-- Create catalog
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);

-- Use the catalog
USE CATALOG hive_catalog;

-- Create database
CREATE DATABASE my_db;
USE my_db;

-- Create table
CREATE TABLE orders (
    order_id BIGINT,
    customer_id BIGINT,
    order_date DATE,
    amount DECIMAL(10, 2),
    PRIMARY KEY(order_id) NOT ENFORCED
) PARTITIONED BY (order_date)
WITH (
    'format-version'='2',
    'write.format.default'='parquet'
);

Alter Table Example

-- Set table properties
ALTER TABLE orders SET ('write.format.default'='orc');

-- Rename table
ALTER TABLE orders RENAME TO customer_orders;

Next Steps

Queries

Learn how to query Iceberg tables

Writes

Write data to Iceberg tables
