Flink stores operator state—windows, key/value state, and user-defined checkpointed state—in a state backend. The state backend controls how state is represented in memory and how it is persisted to durable storage during checkpointing.

Available state backends

Flink ships with two production-ready state backends:

HashMapStateBackend

Stores state as Java objects on the JVM heap. Fast random access; limited by available JVM heap memory.

EmbeddedRocksDBStateBackend

Stores state in a local RocksDB instance on the TaskManager’s disk. Supports state much larger than JVM heap; supports incremental checkpoints.
If you do not configure a state backend, Flink uses HashMapStateBackend by default.

HashMapStateBackend

The HashMapStateBackend holds state as Java objects in hash tables on the JVM heap. All reads and writes are direct in-memory operations, making it the fastest state backend for low-latency access. Best for:
  • Jobs with state that fits comfortably in JVM heap
  • Low-latency requirements where serialization overhead must be avoided
  • Jobs with many small states spread across a large key space
Limitations:
  • State size is bounded by available JVM heap memory
  • Large state increases GC pressure and pause times
  • Objects are not reusable (state must not be mutated after being handed to Flink)
When using HashMapStateBackend, set managed memory to zero so that the maximum heap is available for your state.
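As a minimal configuration sketch: since HashMapStateBackend keeps all state on the JVM heap and does not use Flink's managed (off-heap) memory, the managed memory budget can be reclaimed for the heap (the key below is a standard Flink memory option):

```
# config.yaml
# HashMapStateBackend does not use managed memory; setting it to zero
# leaves more of the TaskManager's memory budget for the JVM heap.
taskmanager.memory.managed.size: 0
```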

EmbeddedRocksDBStateBackend

The EmbeddedRocksDBStateBackend stores in-flight data in a RocksDB database in the TaskManager’s local data directories. State is serialized to byte arrays on reads and writes, which adds CPU overhead compared to heap-based access. Best for:
  • Jobs with very large state that exceeds available JVM heap
  • Long windows (hours or days)
  • All high-availability production setups with large state
Limitations:
  • Each read and write requires (de-)serialization, making it slower than HashMapStateBackend
  • Maximum key size and value size are each limited to 2 GB (RocksDB JNI limitation)
  • ListState that uses merge operations can silently accumulate values exceeding 2 GB and fail on the next read
RocksDB always performs asynchronous snapshots. It is the only built-in backend that supports incremental checkpoints.

ForStStateBackend (experimental)

The ForStStateBackend is based on the ForSt project (an extension of RocksDB) and is designed for disaggregated state management. It can store SST files on remote filesystems such as HDFS or S3, enabling state sizes that exceed local TaskManager disk capacity.
The ForStStateBackend is currently experimental and is not recommended for production deployments.

Choosing the right state backend

Consideration             | HashMapStateBackend   | EmbeddedRocksDBStateBackend
--------------------------|-----------------------|----------------------------------
State size                | Fits in JVM heap      | Any size (disk-backed)
Read/write latency        | Lowest (in-memory)    | Higher (serialization + disk I/O)
Throughput                | Highest               | Lower
Incremental checkpoints   | No                    | Yes
Large windows             | Not recommended       | Yes
Rescale support           | Yes                   | Yes
Use HashMapStateBackend when your state fits in memory and you need the lowest latency. Use EmbeddedRocksDBStateBackend when state is large, when you want incremental checkpoints, or when running long-window aggregations.
In Flink 1.13 and later, savepoints use a unified canonical format. You can switch state backends by taking a savepoint with the old backend, then restoring with the new one—provided both run on Flink 1.13 or later.
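As a sketch, a backend switch via the Flink CLI might look as follows (the savepoint path, job ID, and jar name are placeholders; `flink stop` takes a canonical-format savepoint, and `flink run -s` restores from it):

```
# Stop the job with a savepoint while the old backend is still configured
bin/flink stop --savepointPath s3://my-bucket/savepoints <jobId>

# Change state.backend.type in the configuration, then resubmit,
# restoring from the savepoint with the new backend
bin/flink run -s s3://my-bucket/savepoints/savepoint-<id> my-job.jar
```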

Configuring the state backend

Cluster-wide default

# config.yaml
state.backend.type: hashmap    # or: rocksdb, forst
execution.checkpointing.dir: hdfs://namenode:8020/flink/checkpoints

Per-job configuration

Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "hashmap");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.configure(config);
To use EmbeddedRocksDBStateBackend in your IDE or when not relying on the cluster distribution, add the Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

RocksDB incremental checkpoints

Incremental checkpoints record only the state changes since the last completed checkpoint, rather than producing a full snapshot each time. This can dramatically reduce checkpoint size and duration for large state. Enable incremental checkpoints:
execution.checkpointing.incremental: true
When incremental checkpoints are enabled, the “Checkpointed Data Size” shown in the Web UI represents only the delta for that checkpoint, not the total state size.
Recovery from an incremental checkpoint may be faster or slower than from a full checkpoint depending on your bottleneck:
  • CPU/IOPs bottleneck: incremental is faster (no need to re-build RocksDB tables from scratch)
  • Network bandwidth bottleneck: incremental may be slower (must fetch all accumulated deltas)
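Incremental checkpointing can also be requested programmatically when constructing the backend. As a sketch (assuming the RocksDB state backend classes are on the classpath; the boolean constructor flag enables incremental checkpoints):

```
// Sketch: enable incremental checkpoints on the backend itself.
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
env.setStateBackend(backend);
```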

RocksDB memory management

Flink manages RocksDB memory by default, allocating memory from the TaskManager’s managed memory budget. This ensures RocksDB stays within the memory limits enforced by the container runtime (YARN, Kubernetes, Docker).
# Default: RocksDB memory is managed by Flink's managed memory budget
state.backend.rocksdb.memory.managed: true
Flink uses a shared block cache and write buffer manager across all RocksDB instances within a single slot. You can tune the split between write and read paths:
# Fraction of managed memory used for write buffers (default: 0.5)
state.backend.rocksdb.memory.write-buffer-ratio: 0.5

# Fraction of block cache reserved for index and filter blocks (default: 0.1)
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1

RocksDB timers

Timers (for event-time windows and ProcessFunction callbacks) are stored in RocksDB by default, which scales well for large numbers of timers. For jobs with few timers, storing them on the JVM heap can improve performance:
# Store timers on JVM heap instead of RocksDB (use when timer count is small)
state.backend.rocksdb.timer-service.factory: heap
Heap-based timers combined with RocksDB do not support asynchronous snapshots for timer state. Use this option only when you have a small, bounded number of timers.

Advanced RocksDB tuning

For expert-level tuning, implement a RocksDBOptionsFactory to control column family options:
public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions,
                                     Collection<AutoCloseable> handlesToClose) {
        return currentOptions
            .setIncreaseParallelism(4)
            .setUseFsync(false);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
                                                   Collection<AutoCloseable> handlesToClose) {
        return currentOptions.setTableFormatConfig(
            new BlockBasedTableConfig()
                .setBlockRestartInterval(16));
    }

    @Override
    public RocksDBOptionsFactory configure(ReadableConfig configuration) {
        return this;
    }
}
Register the factory:
state.backend.rocksdb.options-factory: com.example.MyOptionsFactory
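Alternatively, as a sketch, the factory can be attached programmatically to the backend instance instead of via configuration (assuming the RocksDB state backend module is on the classpath):

```
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
backend.setRocksDBOptions(new MyOptionsFactory());
env.setStateBackend(backend);
```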
When state.backend.rocksdb.memory.managed is enabled, any block cache and write buffer settings in your RocksDBOptionsFactory are overridden by the managed memory configuration.

Changelog state backend

The changelog feature reduces checkpoint latency by continuously uploading state changes to durable storage, then only uploading the relevant portion of the changelog at each checkpoint boundary. Enable it:
state.changelog.enabled: true
state.changelog.storage: filesystem
state.changelog.dstl.dfs.base-path: s3://my-bucket/changelog
Changelog is compatible with both HashMapStateBackend and EmbeddedRocksDBStateBackend.
