Apache Iceberg provides maintenance operations for Flink to optimize table performance and manage storage efficiently.

Batch Mode Maintenance

Rewrite Files Action

Iceberg provides an API to rewrite small files into larger files by submitting Flink batch jobs. The behavior is the same as Spark’s rewriteDataFiles action.
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.flink.actions.RewriteDataFilesActionResult;

TableLoader tableLoader = TableLoader.fromCatalog(
    CatalogLoader.hive("my_catalog", configuration, properties),
    TableIdentifier.of("database", "table")
);

Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
        .rewriteDataFiles()
        .execute();
For more details, see RewriteDataFilesAction.

Streaming Mode Maintenance

Overview

The TableMaintenance API in Apache Iceberg empowers Flink jobs to execute maintenance tasks natively, either embedded within existing streaming pipelines or deployed as standalone Flink jobs. This eliminates dependencies on external systems like Spark. Benefits:
  • Streamlined architecture
  • Reduced operational costs
  • Enhanced automation capabilities
  • No dependency on Spark infrastructure

Supported Features

ExpireSnapshots

Removes old snapshots and their files. Internally uses cleanExpiredFiles(true) when committing.
.add(ExpireSnapshots.builder()
    .maxSnapshotAge(Duration.ofDays(7))
    .retainLast(10)
    .deleteBatchSize(1000))

RewriteDataFiles

Compacts small files to optimize file sizes. Supports partial progress commits and limiting maximum rewritten bytes per run.
.add(RewriteDataFiles.builder()
    .targetFileSizeBytes(256 * 1024 * 1024)
    .minFileSizeBytes(32 * 1024 * 1024)
    .partialProgressEnabled(true)
    .partialProgressMaxCommits(5))

DeleteOrphanFiles

Removes files which are not referenced in any metadata files of an Iceberg table.
.add(DeleteOrphanFiles.builder()
    .minAge(Duration.ofDays(3))
    .deleteBatchSize(1000))

Lock Management

The TriggerLockFactory is essential for coordinating maintenance tasks. It prevents concurrent maintenance operations on the same table.
Why locks are needed:
  • Prevents concurrent access conflicts
  • Ensures data consistency
  • Manages resources effectively
  • Avoids duplicate work even with a single job

JDBC Lock Factory

Uses a database table to manage distributed locks:
Map<String, String> jdbcProps = new HashMap<>();
jdbcProps.put("jdbc.user", "flink");
jdbcProps.put("jdbc.password", "flinkpw");
jdbcProps.put("flink-maintenance.lock.jdbc.init-lock-tables", "true");

TriggerLockFactory lockFactory = new JdbcLockFactory(
    "jdbc:postgresql://localhost:5432/iceberg",
    "catalog.db.table",
    jdbcProps
);

ZooKeeper Lock Factory

Uses Apache ZooKeeper for distributed locks:
TriggerLockFactory lockFactory = new ZkLockFactory(
    "localhost:2181",       // ZooKeeper connection string
    "catalog.db.table",     // Lock ID
    60000,                  // sessionTimeoutMs
    15000,                  // connectionTimeoutMs
    3000,                   // baseSleepTimeMs
    3                       // maxRetries
);

Quick Start Example

The following example demonstrates automated maintenance for an Iceberg table:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

TableLoader tableLoader = TableLoader.fromCatalog(
    CatalogLoader.hive("my_catalog", configuration, properties),
    TableIdentifier.of("database", "table")
);

Map<String, String> jdbcProps = new HashMap<>();
jdbcProps.put("jdbc.user", "flink");
jdbcProps.put("jdbc.password", "flinkpw");

TriggerLockFactory lockFactory = new JdbcLockFactory(
    "jdbc:postgresql://localhost:5432/iceberg",
    "catalog.db.table",
    jdbcProps
);

TableMaintenance.forTable(env, tableLoader, lockFactory)
    .uidSuffix("my-maintenance-job")
    .rateLimit(Duration.ofMinutes(10))
    .lockCheckDelay(Duration.ofSeconds(10))
    .add(ExpireSnapshots.builder()
        .scheduleOnCommitCount(10)
        .maxSnapshotAge(Duration.ofMinutes(10))
        .retainLast(5)
        .deleteBatchSize(5)
        .parallelism(8))
    .add(RewriteDataFiles.builder()
        .scheduleOnDataFileCount(10)
        .targetFileSizeBytes(128 * 1024 * 1024)
        .partialProgressEnabled(true)
        .partialProgressMaxCommits(10))
    .append();

env.execute("Table Maintenance Job");

Configuration Options

TableMaintenance Builder

| Method | Description | Default |
| --- | --- | --- |
| uidSuffix(String) | Unique identifier suffix for the job | Random UUID |
| rateLimit(Duration) | Minimum interval between task executions | 60 seconds |
| lockCheckDelay(Duration) | Delay for checking lock availability | 30 seconds |
| parallelism(int) | Default parallelism for maintenance tasks | System default |
| maxReadBack(int) | Max snapshots to check during initialization | 100 |

Common Task Options

| Method | Description | Default | Type |
| --- | --- | --- | --- |
| scheduleOnCommitCount(int) | Trigger after N commits | No scheduling | int |
| scheduleOnDataFileCount(int) | Trigger after N data files | No scheduling | int |
| scheduleOnDataFileSize(long) | Trigger after total data file size (bytes) | No scheduling | long |
| scheduleOnPosDeleteFileCount(int) | Trigger after N positional delete files | No scheduling | int |
| scheduleOnPosDeleteRecordCount(long) | Trigger after N positional delete records | No scheduling | long |
| scheduleOnEqDeleteFileCount(int) | Trigger after N equality delete files | No scheduling | int |
| scheduleOnEqDeleteRecordCount(long) | Trigger after N equality delete records | No scheduling | long |
| scheduleOnInterval(Duration) | Trigger after time interval | No scheduling | Duration |

ExpireSnapshots Options

| Method | Description | Default | Type |
| --- | --- | --- | --- |
| maxSnapshotAge(Duration) | Maximum age of snapshots to retain | 5 days | Duration |
| retainLast(int) | Minimum number of snapshots to retain | 1 | int |
| deleteBatchSize(int) | Number of files to delete in each batch | 1000 | int |
| planningWorkerPoolSize(int) | Number of worker threads for planning | Shared pool | int |
| cleanExpiredMetadata(boolean) | Remove expired metadata files | false | boolean |
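The interplay of maxSnapshotAge and retainLast is worth spelling out: a snapshot is only eligible for expiry if it is older than the age cutoff *and* not among the retainLast most recent snapshots. The following is an illustrative model of that rule, not Iceberg's actual implementation (class and method names are made up for the sketch):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of ExpireSnapshots retention semantics: a snapshot
// expires only if it is older than (now - maxAge) AND is not one of the
// retainLast most recent snapshots.
public class ExpirePolicySketch {
    // snapshots must be ordered oldest -> newest
    static List<Instant> expired(List<Instant> snapshots, Duration maxAge,
                                 int retainLast, Instant now) {
        Instant cutoff = now.minus(maxAge);
        // The last retainLast snapshots are always kept, regardless of age.
        int keepFrom = Math.max(0, snapshots.size() - retainLast);
        List<Instant> result = new ArrayList<>();
        for (int i = 0; i < keepFrom; i++) {
            if (snapshots.get(i).isBefore(cutoff)) {
                result.add(snapshots.get(i));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-10T00:00:00Z");
        List<Instant> snaps = List.of(
            Instant.parse("2024-01-01T00:00:00Z"),
            Instant.parse("2024-01-05T00:00:00Z"),
            Instant.parse("2024-01-09T00:00:00Z"));
        // With maxAge = 7 days and retainLast = 2, only the Jan 1 snapshot is
        // both past the cutoff and outside the retained tail.
        System.out.println(expired(snaps, Duration.ofDays(7), 2, now));
    }
}
```

Because retainLast wins over maxSnapshotAge, an aggressive age setting can never empty the table's history below the retained minimum.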

RewriteDataFiles Options

| Method | Description | Default | Type |
| --- | --- | --- | --- |
| targetFileSizeBytes(long) | Target size for rewritten files | Table property or 512MB | long |
| minFileSizeBytes(long) | Minimum size for compaction | 75% of target | long |
| maxFileSizeBytes(long) | Maximum size for compaction | 180% of target | long |
| minInputFiles(int) | Minimum files to trigger rewrite | 5 | int |
| deleteFileThreshold(int) | Min delete-file count per data file | Integer.MAX_VALUE | int |
| rewriteAll(boolean) | Rewrite all data files | false | boolean |
| maxFileGroupSizeBytes(long) | Maximum total size of a file group | 100GB | long |
| partialProgressEnabled(boolean) | Enable partial progress commits | false | boolean |
| partialProgressMaxCommits(int) | Maximum commits for partial progress | 10 | int |
| maxRewriteBytes(long) | Maximum bytes to rewrite per execution | Long.MAX_VALUE | long |
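As the defaults above state, the minimum and maximum compaction thresholds are derived from the target file size (75% and 180% respectively). A quick sketch of that arithmetic for the 512 MB default target; the helper names are illustrative, not part of the Iceberg API:

```java
// Illustrates the default min/max compaction thresholds documented above,
// which are derived as fixed ratios of targetFileSizeBytes.
public class RewriteThresholds {
    // Default ratios: min = 75% of target, max = 180% of target.
    static long defaultMinFileSizeBytes(long targetFileSizeBytes) {
        return (long) (targetFileSizeBytes * 0.75);
    }

    static long defaultMaxFileSizeBytes(long targetFileSizeBytes) {
        return (long) (targetFileSizeBytes * 1.80);
    }

    public static void main(String[] args) {
        long target = 512L * 1024 * 1024; // 512 MB default target
        // Files smaller than ~384 MB become compaction candidates; rewritten
        // output is capped around ~921 MB.
        System.out.println(defaultMinFileSizeBytes(target));
        System.out.println(defaultMaxFileSizeBytes(target));
    }
}
```

Setting minFileSizeBytes or maxFileSizeBytes explicitly (as in the examples in this page) overrides these derived defaults.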

DeleteOrphanFiles Options

| Method | Description | Default | Type |
| --- | --- | --- | --- |
| location(String) | Location to start recursive listing | Table location | String |
| usePrefixListing(boolean) | Use prefix-based file listing | false | boolean |
| minAge(Duration) | Minimum age of files to be removed as orphans | 3 days | Duration |
| planningWorkerPoolSize(int) | Number of worker threads for planning | Shared pool | int |
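Conceptually, an orphan candidate is a file under the table location that no metadata file references and that is older than minAge (the age guard protects files from in-flight writes). A self-contained sketch of that rule, not Iceberg's actual listing and diffing logic (class, method, and path names are made up):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative model of orphan-file selection: unreferenced AND older than
// minAge. Real DeleteOrphanFiles additionally handles listing strategies,
// path normalization, and batched deletes.
public class OrphanSketch {
    static Set<String> orphans(Map<String, Instant> allFiles, Set<String> referenced,
                               Duration minAge, Instant now) {
        Instant cutoff = now.minus(minAge);
        return allFiles.entrySet().stream()
            .filter(e -> !referenced.contains(e.getKey()))   // not in metadata
            .filter(e -> e.getValue().isBefore(cutoff))      // older than minAge
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-10T00:00:00Z");
        Map<String, Instant> files = Map.of(
            "s3://bkt/t/a.parquet", Instant.parse("2024-01-01T00:00:00Z"),
            "s3://bkt/t/b.parquet", Instant.parse("2024-01-01T00:00:00Z"),
            "s3://bkt/t/c.parquet", Instant.parse("2024-01-09T00:00:00Z"));
        // a is referenced, c is too young: only b is deleted.
        System.out.println(orphans(files, Set.of("s3://bkt/t/a.parquet"),
            Duration.ofDays(3), now));
    }
}
```

The minAge guard is why a freshly written but not-yet-committed file is safe: it stays younger than the cutoff until its commit lands.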

Post-Commit Integration

Maintenance tasks can also run automatically after each commit: enabling compaction on the IcebergSink wires them into the sink's post-commit topology (addPostCommitTopology(...)):
Map<String, String> flinkConf = new HashMap<>();

flinkConf.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
flinkConf.put(LockConfig.LOCK_TYPE_OPTION.key(), LockConfig.JdbcLockConfig.JDBC);
flinkConf.put(LockConfig.JdbcLockConfig.JDBC_URI_OPTION.key(), 
    "jdbc:postgresql://localhost:5432/iceberg");
flinkConf.put(LockConfig.LOCK_ID_OPTION.key(), "catalog.db.table");

IcebergSink.forRowData(dataStream)
    .table(table)
    .tableLoader(tableLoader)
    .setAll(flinkConf)
    .append();

SQL Examples

Enable maintenance using SQL:
-- Enable Iceberg V2 Sink and compaction
SET 'table.exec.iceberg.use.v2.sink' = 'true';
SET 'compaction-enabled' = 'true';

-- Configure JDBC lock
SET 'flink-maintenance.lock.type' = 'jdbc';
SET 'flink-maintenance.lock.lock-id' = 'catalog.db.table';
SET 'flink-maintenance.lock.jdbc.uri' = 'jdbc:postgresql://localhost:5432/iceberg';
SET 'flink-maintenance.lock.jdbc.init-lock-tables' = 'true';

INSERT INTO db.tbl SELECT ...;
Or specify options in table DDL:
CREATE TABLE db.tbl (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'my_catalog',
  'compaction-enabled' = 'true',
  'flink-maintenance.lock.type' = 'jdbc',
  'flink-maintenance.lock.lock-id' = 'catalog.db.table',
  'flink-maintenance.lock.jdbc.uri' = 'jdbc:postgresql://localhost:5432/iceberg'
);

Best Practices

Resource Management

  • Use dedicated slot sharing groups for maintenance tasks
  • Set appropriate parallelism based on cluster resources
  • Enable checkpointing for fault tolerance

Scheduling Strategy

  • Use rateLimit to prevent overly frequent executions
  • Use scheduleOnCommitCount for write-heavy tables
  • Use scheduleOnDataFileCount for fine-grained control

Performance Tuning

  • Adjust deleteBatchSize based on storage performance
  • Enable partialProgressEnabled for large rewrite operations
  • Set reasonable maxRewriteBytes limits
  • Set appropriate maxFileGroupSizeBytes for parallel processing

Troubleshooting

OutOfMemoryError during file deletion

Cause: Large number of files being deleted in a single batch. Solution: Reduce the batch size:
.deleteBatchSize(500) // Reduce from default 1000

Lock conflicts

Cause: Multiple jobs attempting maintenance simultaneously. Solution: Increase lock check delay and rate limit:
.lockCheckDelay(Duration.ofMinutes(1))
.rateLimit(Duration.ofMinutes(10))

Slow rewrite operations

Cause: Too much data being rewritten in a single run. Solution: Enable partial progress and limit bytes:
.partialProgressEnabled(true)
.partialProgressMaxCommits(3)
.maxRewriteBytes(1L * 1024 * 1024 * 1024) // 1GB

Complete Production Example

public class TableMaintenanceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000);

        TableLoader tableLoader = TableLoader.fromCatalog(
            CatalogLoader.hive("my_catalog", configuration, properties),
            TableIdentifier.of("database", "table")
        );

        Map<String, String> jdbcProps = new HashMap<>();
        jdbcProps.put("jdbc.user", "flink");
        jdbcProps.put("jdbc.password", "flinkpw");
        jdbcProps.put("flink-maintenance.lock.jdbc.init-lock-tables", "true");

        TriggerLockFactory lockFactory = new JdbcLockFactory(
            "jdbc:postgresql://localhost:5432/iceberg",
            "catalog.db.table",
            jdbcProps
        );

        TableMaintenance.forTable(env, tableLoader, lockFactory)
            .uidSuffix("production-maintenance")
            .rateLimit(Duration.ofMinutes(15))
            .lockCheckDelay(Duration.ofSeconds(30))
            .parallelism(4)
            .add(ExpireSnapshots.builder()
                .maxSnapshotAge(Duration.ofDays(7))
                .retainLast(10))
            .add(RewriteDataFiles.builder()
                .targetFileSizeBytes(256 * 1024 * 1024)
                .minFileSizeBytes(32 * 1024 * 1024)
                .scheduleOnDataFileCount(20)
                .partialProgressEnabled(true)
                .partialProgressMaxCommits(5)
                .maxRewriteBytes(2L * 1024 * 1024 * 1024)
                .parallelism(6))
            .add(DeleteOrphanFiles.builder()
                .minAge(Duration.ofDays(5)))
            .append();

        env.execute("Iceberg Table Maintenance");
    }
}

Next Steps

  • Configuration: Configure Flink for Iceberg
  • Writes: Learn about writing data with Flink
