For Flink applications to run reliably at large scale, two conditions must be met:
  1. The application must be able to take checkpoints reliably and within the checkpoint interval
  2. There must be enough resources to catch up with the input stream after a failure
This page covers practical techniques for achieving both.

Monitoring checkpoints

Before tuning, establish a baseline by monitoring checkpoint behaviour in the Flink Web UI under the Checkpoints tab. The two most important metrics are:
  • Time to first barrier: The time from triggering the checkpoint until operators receive their first checkpoint barrier. High values indicate back-pressure upstream.
  • Alignment duration: The time between receiving the first and last checkpoint barrier at an operator. High values with aligned exactly-once checkpoints indicate back-pressure or data skew.
Both values should remain low. Consistently high values indicate the system is operating under sustained back-pressure, not just a transient spike.

Tuning checkpoint intervals and pauses

When individual checkpoints take longer than the checkpoint interval, Flink starts the next checkpoint immediately after the previous one completes. This creates a feedback loop where the system spends all its time checkpointing. Break this loop by setting a minimum pause between checkpoints:
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // 30 seconds
This ensures at least 30 seconds of normal processing between the end of one checkpoint and the start of the next, regardless of how long checkpoints take.
Do not allow multiple concurrent checkpoints for jobs with large state. Concurrent checkpoints multiply the resource usage (network bandwidth, disk I/O) and usually make the situation worse.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // default, keep it
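The same limits can also be set cluster-wide in the configuration file instead of in code; a sketch using Flink's standard checkpointing keys (the 10-minute interval is illustrative — choose one that fits your state size):

```yaml
execution.checkpointing.interval: 10min
execution.checkpointing.min-pause: 30s
execution.checkpointing.max-concurrent-checkpoints: 1
```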

Incremental checkpoints with RocksDB

The most impactful single change for large RocksDB state is enabling incremental checkpoints. Instead of uploading the entire state on every checkpoint, Flink uploads only the SST files that changed since the last checkpoint.
execution.checkpointing.incremental: true
Incremental checkpoints build on RocksDB’s internal compaction mechanism. Old checkpoint data is automatically subsumed as new incremental checkpoints are taken, so the checkpoint history does not grow indefinitely. Recovery time trade-offs:
  • If recovery is CPU/IOPs bound: incremental is faster (no need to rebuild all SST tables from scratch)
  • If recovery is network bound: incremental may be slower (must fetch all accumulated deltas)

Tuning RocksDB memory

RocksDB performance is heavily influenced by memory allocation. By default, Flink allocates RocksDB memory from the TaskManager’s managed memory budget.

Increase managed memory

The first step for any RocksDB performance problem is to increase the managed memory fraction:
taskmanager.memory.managed.fraction: 0.5  # default is 0.4; increase for large state jobs
With multi-GB TaskManagers, you can often raise this to 0.5 or 0.6; the extra managed memory is taken from the task heap, which RocksDB-backed jobs with little on-heap state typically do not need.

Tune write buffer vs. block cache ratio

If you see frequent RocksDB MemTable flushes (write-side bottleneck), increase the write buffer ratio above its default:
# Fraction of managed memory allocated to write buffers (default: 0.5)
state.backend.rocksdb.memory.write-buffer-ratio: 0.6

# Fraction of the shared block cache reserved for index and filter blocks (default: 0.1)
# Do not set this to 0 — without the reserved pool, index/filter blocks are evicted
# by data blocks and must be re-read from disk
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1

Understand per-state memory

RocksDB allocates one ColumnFamily per state per operator. Each ColumnFamily needs its own write buffers. Jobs with many states therefore need proportionally more memory to achieve the same write performance. To diagnose this, compare:
  • Number of states × operators × tasks in your job
  • Current managed memory per slot
If the ratio is unfavourable, consider increasing managed memory or reducing the number of distinct states.
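The comparison above can be sketched as simple arithmetic. This is a rough estimate, not RocksDB's exact accounting (which also involves arena blocks and immutable MemTables), and the figures in main are hypothetical:

```java
public class WriteBufferBudget {

    /** Approximate write-buffer memory available per column family, in bytes. */
    static long perColumnFamilyBytes(long managedMemoryPerSlot,
                                     double writeBufferRatio,
                                     int columnFamiliesPerSlot) {
        // Share of managed memory dedicated to write buffers...
        long writeBufferBudget = (long) (managedMemoryPerSlot * writeBufferRatio);
        // ...split across all column families (one per state per operator) in the slot.
        return writeBufferBudget / columnFamiliesPerSlot;
    }

    public static void main(String[] args) {
        // Hypothetical slot: 512 MiB managed memory, default 0.5 write-buffer ratio,
        // 3 chained operators x 4 states each = 12 column families.
        long perCf = perColumnFamilyBytes(512L << 20, 0.5, 12);
        System.out.println(perCf / (1 << 20) + " MiB per column family"); // 21 MiB per column family
    }
}
```

If the per-column-family figure is far below RocksDB's default 64 MB MemTable size, flushes will be frequent and small — the signal to grow managed memory or consolidate states.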

Expert: per-column-family options

For jobs with many states that suffer from frequent MemTable flushes, use a custom RocksDBOptionsFactory to increase background flush threads and reduce the arena block size, then register it with the state.backend.rocksdb.options-factory option:
import java.util.Collection;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.contrib.streaming.state.ConfigurableRocksDBOptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions,
                                     Collection<AutoCloseable> handlesToClose) {
        // More background flush threads for jobs with many column families
        return currentOptions.setMaxBackgroundFlushes(4);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
                                                   Collection<AutoCloseable> handlesToClose) {
        // Reduce arena block size from the default 8 MB to 1 MB when there are many states
        return currentOptions.setArenaBlockSize(1024 * 1024);
    }

    @Override
    public RocksDBOptionsFactory configure(ReadableConfig configuration) {
        return this;
    }
}

Timers: RocksDB vs. heap

Timers are stored in RocksDB by default. For jobs with a small, bounded number of timers (no windows, minimal ProcessFunction timer usage), storing timers on the JVM heap can reduce checkpoint overhead:
state.backend.rocksdb.timer-service.factory: heap
Heap-based timers do not support asynchronous snapshots. Only use this option when timer count is small and predictable.

Task-local recovery

In a typical failure scenario, a failed task is rescheduled to the same TaskManager. However, it still reads its state from the distributed store (HDFS, S3), which can be slow for large state. Task-local recovery keeps a secondary copy of state on local disk. On recovery, Flink first tries to restore from the local copy (fast), and transparently falls back to the distributed store if the local copy is unavailable. Enable task-local recovery:
state.backend.local-recovery: true
For EmbeddedRocksDBStateBackend with incremental checkpoints, the local copy reuses RocksDB’s native checkpoint mechanism. The local copy shares active SST files via hard links, so it consumes no additional disk space for those files.
Task-local recovery only covers keyed state. Unaligned checkpoints do not currently support task-local recovery.
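If your TaskManagers have fast local disks, you can direct the local state copies to them with taskmanager.state.local.root-dirs (the path below is illustrative):

```yaml
state.backend.local-recovery: true
# Illustrative path: point at a fast local disk on each TaskManager
taskmanager.state.local.root-dirs: /mnt/fast-ssd/flink-local-recovery
```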

Checkpoint compression

Flink supports Snappy compression for checkpoints and savepoints. Compression is off by default:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setUseSnapshotCompression(true); // configure the job's own ExecutionConfig
Compression works at the key-group granularity, so individual key groups can be decompressed independently during rescaling.
Compression has no effect on incremental RocksDB checkpoints, which already use Snappy internally via RocksDB’s native format.

Capacity planning

When planning resources for a Flink job with large state:
  1. Baseline throughput: establish the resources needed to run without back-pressure during normal operation. Always measure with checkpointing enabled, since checkpointing consumes CPU and network bandwidth.
  2. Recovery headroom: provision extra resources to allow catch-up after a failure. The required headroom depends on how long state recovery takes and how fast the source accumulates lag during recovery.
  3. Maximum parallelism: set setMaxParallelism() to a value high enough to allow future scaling, since this cannot be changed after a job has started without discarding state.
  4. Window spikes: operators downstream of large windows experience bursty load when the window fires. Ensure downstream parallelism can handle the peak output rate.
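The recovery-headroom reasoning in step 2 can be sketched as back-of-the-envelope arithmetic; all rates below are hypothetical and should come from your own baseline measurements:

```java
public class CatchUpEstimate {

    /**
     * Seconds needed to drain the backlog that accumulates while the job is down.
     * Requires maxThroughput > sourceRate, i.e. the job has catch-up headroom.
     */
    static double catchUpSeconds(double sourceRate, double maxThroughput, double recoverySeconds) {
        double backlog = sourceRate * recoverySeconds;   // events accumulated during recovery
        return backlog / (maxThroughput - sourceRate);   // drained at the surplus rate
    }

    public static void main(String[] args) {
        // Hypothetical job: source produces 50k events/s, the job sustains 80k events/s
        // with checkpointing enabled, and restoring state takes 120 s.
        double seconds = catchUpSeconds(50_000, 80_000, 120);
        System.out.println(seconds); // 200.0 — over three minutes of catch-up
    }
}
```

Note how the catch-up time explodes as maxThroughput approaches sourceRate: a job provisioned to run at 95% utilisation in steady state has almost no surplus to drain a backlog with.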
