What is state?
An operation is stateful when it needs information from previous events to process the current one. Examples include:
- Pattern detection: storing the sequence of events seen so far while searching for a complex event pattern
- Windowed aggregations: maintaining running counts, sums, or averages within a time window
- Machine learning: holding current model parameters while training over a stream of data points
- Deduplication: keeping a set of already-seen IDs to filter out duplicate events
- Session tracking: maintaining per-user session data across a series of interactions
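To make the idea concrete, deduplication from the list above can be sketched as a plain-Python stateful operation, where the set of already-seen IDs is the state carried across events (illustrative only, not the Flink API):

```python
# Minimal sketch of a stateful operation: the set of already-seen IDs
# is "state" that must survive from one event to the next.
def deduplicate(events):
    seen = set()          # state: IDs observed so far
    for event_id, payload in events:
        if event_id in seen:
            continue      # duplicate: filter it out
        seen.add(event_id)
        yield (event_id, payload)

stream = [(1, "a"), (2, "b"), (1, "a-again"), (3, "c")]
print(list(deduplicate(stream)))  # the second event with ID 1 is dropped
```

If this operator crashed and lost `seen`, duplicates would slip through, which is exactly why Flink must manage such state explicitly.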
Flink needs to be aware of this state in order to:
- Make it fault tolerant via checkpoints and savepoints
- Enable rescaling — redistributing state across parallel instances when you change parallelism
Keyed state
Keyed state is the most common form of state in Flink. It is maintained in what can be thought of as an embedded key/value store that is strictly partitioned and distributed together with the streams read by stateful operators. Access to keyed state is only possible on keyed streams — that is, after a keyBy() operation. Each operator instance only has access to the state for keys assigned to it.
Key groups
Internally, keyed state is organized into Key Groups — the atomic unit by which Flink redistributes state when you rescale a job. Flink creates exactly as many Key Groups as the configured maximum parallelism. When you change the parallelism of a running job (via a savepoint), each parallel instance takes ownership of a contiguous range of Key Groups.
Keyed state types
Flink exposes several state primitives for use inside keyed operators:
ValueState
Stores a single value of type T per key. Use it to track the latest known value for a key — for example, the last event timestamp or a running count.
ListState
Stores a list of values of type T per key. Use it when you need to accumulate multiple events before emitting results — for example, collecting all events in a session.
MapState
Stores a map of key-value pairs per stream key. Use it when you need to maintain a dictionary or lookup table scoped to each key — for example, tracking per-feature values for each user.
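A rough way to picture these three primitives is as per-stream-key slots in dictionaries keyed by the current key. This plain-Python sketch mimics the semantics of ValueState, ListState, and MapState; the names and structure are illustrative, not the real Flink API:

```python
from collections import defaultdict

# Plain-Python models of keyed state. Each primitive is scoped to the
# current stream key, which is passed in explicitly here.
value_state = {}                   # ValueState: one value per key
list_state = defaultdict(list)     # ListState: a list of values per key
map_state = defaultdict(dict)      # MapState: a key/value map per key

def on_event(key, event):
    value_state[key] = event["ts"]                      # like ValueState.update()
    list_state[key].append(event)                       # like ListState.add()
    map_state[key][event["feature"]] = event["value"]   # like MapState.put()

on_event("user-1", {"ts": 100, "feature": "clicks", "value": 3})
on_event("user-1", {"ts": 160, "feature": "views", "value": 7})
on_event("user-2", {"ts": 120, "feature": "clicks", "value": 1})

print(value_state["user-1"])   # 160: latest timestamp seen for user-1
print(map_state["user-1"])     # {'clicks': 3, 'views': 7}
```

In real Flink code these would be accessed through state descriptors inside a rich function, and Flink would resolve the current key implicitly.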
ReducingState and AggregatingState
ReducingState<T> stores a single value that represents the aggregation of all values added to the state. Every call to add() immediately merges the new value into the stored aggregate using a provided ReduceFunction. AggregatingState<IN, OUT> is more flexible — the input and output types may differ, and aggregation uses an AggregateFunction.
Operator state
Operator state (also called non-keyed state) is scoped to an operator instance rather than a key. Each parallel instance of the operator has its own state. This is less common than keyed state but is used by connectors — for example, a Kafka source stores the per-partition offsets it has consumed as operator state. Operator state supports two redistribution modes on rescaling:
- Even split: the list of state items is split evenly across the new parallel instances
- Union: every new parallel instance receives a full copy of the union of all state
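The two redistribution modes can be sketched in plain Python (an illustration, not Flink's internal implementation): even-split shares the combined state items out across the new instances, while union hands every instance a full copy:

```python
# Sketch of operator-state redistribution on rescaling.
# state_lists holds one list of state items per old parallel instance.
def even_split(state_lists, new_parallelism):
    all_items = [item for lst in state_lists for item in lst]
    # Round-robin the combined items across the new instances.
    new_lists = [[] for _ in range(new_parallelism)]
    for i, item in enumerate(all_items):
        new_lists[i % new_parallelism].append(item)
    return new_lists

def union(state_lists, new_parallelism):
    all_items = [item for lst in state_lists for item in lst]
    # Every new instance receives the full union of all state items.
    return [list(all_items) for _ in range(new_parallelism)]

old = [["p0-offset"], ["p1-offset"], ["p2-offset"]]  # e.g. Kafka offsets
print(even_split(old, 2))  # items divided between 2 new instances
print(union(old, 2))       # both new instances receive all 3 items
```

A Kafka source uses even-split semantics: after rescaling, each instance resumes only the partitions it was handed.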
State backends
A state backend determines how and where operator state is stored at runtime. You can switch backends without changing application logic.
HashMapStateBackend
Stores all state in the Java heap memory of the TaskManager as a HashMap. This is the default backend. Best for jobs with small-to-medium state that fits comfortably in heap memory.
- Fast reads and writes (in-memory)
- State size limited by available heap memory
- Checkpoint snapshots are serialized to the configured checkpoint storage (e.g., HDFS or S3)
EmbeddedRocksDBStateBackend
Stores state in a locally embedded RocksDB instance on each TaskManager. RocksDB is an LSM-tree-based key/value store that spills to disk, so state size is limited only by available disk space rather than heap memory. Best for jobs with very large state (gigabytes to terabytes) or when state must survive TaskManager restarts without a full re-snapshot.
- Supports state sizes far exceeding available JVM heap
- Reads and writes are slower than in-memory (RocksDB uses native memory and disk I/O)
- Supports incremental checkpoints: only changed SST files are uploaded, not the full state
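The incremental-checkpoint idea can be reduced to "upload only the files the previous checkpoint did not already contain". A minimal sketch (plain Python; the SST file names are made up for illustration):

```python
# Sketch of incremental checkpointing: given the set of SST files in the
# previous checkpoint and the current set, upload only what is new.
def incremental_upload(previous_files, current_files):
    return sorted(current_files - previous_files)

ckpt1 = {"sst-001", "sst-002"}
ckpt2 = {"sst-001", "sst-002", "sst-003", "sst-004"}
print(incremental_upload(ckpt1, ckpt2))  # ['sst-003', 'sst-004']
```

Because LSM trees write immutable files, unchanged files can be referenced from the previous checkpoint instead of being re-uploaded, which is what makes this scheme cheap for large, slowly-changing state.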
State persistence and checkpointing
Flink implements fault tolerance using a combination of stream replay and checkpointing. A checkpoint marks a specific point in each input stream along with the corresponding state for each operator. When a failure occurs, Flink restores all operator states to the last completed checkpoint and replays records from that point — achieving exactly-once processing semantics.
Checkpoint barriers
The core mechanism of Flink’s distributed snapshotting is stream barriers. Barriers are lightweight markers injected into the data stream at sources and flow downstream alongside records. When an operator has received barrier n from all its input streams, it:
- Takes a snapshot of its current state (asynchronously)
- Emits barrier n downstream
- Continues processing records
Once barrier n has passed through the entire dataflow and all snapshots are acknowledged, the checkpoint is considered complete and is reported to the JobManager’s checkpoint coordinator.
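The barrier flow can be simulated for a single two-input operator (a plain-Python sketch of the aligned protocol, not Flink internals): records update a running sum, and the operator snapshots its state only once the barrier has arrived on every input:

```python
# Simulate one operator with multiple input channels. A record is
# ("rec", value); a barrier is ("barrier", n). Aligned semantics: once a
# channel's barrier arrives, that channel is held back (its later records
# are buffered) until the barrier has arrived on every input.
def run_operator(channels):
    state = 0                 # operator state: running sum of values
    pending = set()           # channels whose barrier has arrived
    snapshots = []            # state captured at each checkpoint
    cursors = [0] * len(channels)
    while any(cursors[i] < len(ch) for i, ch in enumerate(channels)):
        for i, ch in enumerate(channels):
            if i in pending or cursors[i] >= len(ch):
                continue      # blocked behind its barrier, or exhausted
            kind, v = ch[cursors[i]]
            cursors[i] += 1
            if kind == "barrier":
                pending.add(i)
                if pending == set(range(len(channels))):
                    snapshots.append(state)  # aligned: snapshot now
                    pending.clear()          # forward barrier, unblock all
            else:
                state += v
    return state, snapshots

a = [("rec", 1), ("barrier", 1), ("rec", 10)]
b = [("rec", 2), ("rec", 3), ("barrier", 1)]
final, snaps = run_operator([a, b])
print(snaps)   # [6]: snapshot of 1+2+3, taken at full alignment
print(final)   # 16: the post-barrier record is applied afterwards
```

Note how the record arriving after the barrier on input A (value 10) is excluded from the snapshot; that exclusion is exactly what alignment buys.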
Aligned vs. unaligned checkpoints
For operators with multiple input streams, Flink must align barriers before snapshotting.
Aligned checkpointing (default):
- When barrier n arrives from input stream A but not yet from stream B, the operator buffers records from stream A
- Once barrier n arrives from all inputs, the operator snapshots and forwards the barrier
- This guarantees exactly-once but adds latency for slow or backpressured inputs
Unaligned checkpointing:
- The operator immediately forwards barrier n downstream without waiting for other inputs
- All in-flight records (those that overtook the barrier) are included in the checkpoint state
- Lower checkpoint latency for pipelines with persistent backpressure, but higher I/O overhead
Savepoints always use aligned checkpointing. Unaligned checkpoints are only for automatic fault-tolerance checkpoints.
Exactly-once vs. at-least-once
You can configure the checkpointing mode: EXACTLY_ONCE (the default) or AT_LEAST_ONCE. In AT_LEAST_ONCE mode, barrier alignment is skipped. An operator continues processing all inputs even after receiving a barrier from some inputs. On recovery, some records may be processed more than once.
For dataflows with only embarrassingly parallel operations (map(), flatMap(), filter()) that have no joins or shuffles, exactly-once semantics are achieved even in AT_LEAST_ONCE mode — because no alignment is needed in the first place.
Recovery
When a failure occurs:
- Flink selects the latest completed checkpoint k
- It re-deploys the entire distributed dataflow
- Each operator is restored to the state it had at checkpoint k
- Sources start reading from position S_k (e.g., a Kafka partition offset)
With incremental checkpoints (EmbeddedRocksDBStateBackend), operators restore from the last full snapshot and then apply incremental updates.
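The recovery sequence can be sketched end-to-end in plain Python: the "source" is a replayable list (standing in for a Kafka partition), and a checkpoint stores both the operator state and the source position S_k. The checkpoint position and failure point below are chosen arbitrarily for illustration:

```python
# Sketch of checkpoint-and-replay recovery. Because the source is
# replayable, processing can resume from the checkpointed offset and
# reach the same final state as a failure-free run.
def process(source, checkpoint=None, fail_at=None):
    # Restore state and reading position from the checkpoint, if any.
    state = checkpoint["state"] if checkpoint else 0
    offset = checkpoint["offset"] if checkpoint else 0
    for pos in range(offset, len(source)):
        if pos == fail_at:
            raise RuntimeError("TaskManager lost")
        state += source[pos]
        if pos == 2:  # pretend a checkpoint completes after record 2
            checkpoint = {"state": state, "offset": pos + 1}
    return state, checkpoint

source = [1, 2, 3, 4, 5]
try:
    process(source, fail_at=4)  # crash before the last record
except RuntimeError:
    pass
# The run above completed a checkpoint {"state": 6, "offset": 3} before
# failing. Restart from it: records 4 and 5 are (re)played, state is not.
restored, _ = process(source, checkpoint={"state": 6, "offset": 3})
print(restored)  # 15: identical to a failure-free run
```

Record 4 is read twice across the two runs, but because the state is rolled back to the checkpoint first, its effect is counted exactly once.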
Savepoints
Savepoints are manually triggered checkpoints that you create explicitly to capture a consistent snapshot of the entire job’s state.
| | Checkpoints | Savepoints |
|---|---|---|
| Triggered by | Flink automatically | User manually |
| Expiry | Expire when newer checkpoints complete (configurable) | Never expire automatically |
| Purpose | Automatic fault recovery | Planned migrations, upgrades, A/B testing |
| Format | May use incremental format | Always full snapshot, always aligned |
| Portability | Not intended for job upgrades | Designed to survive job changes |
Typical savepoint use cases:
- Upgrade Flink to a new version without losing state
- Update job logic while preserving accumulated state
- Change parallelism — state is redistributed across the new number of parallel instances
- Clone a job — start two versions of the same job from the same state for A/B testing
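The parallelism-change use case works because of the key groups described earlier: keys hash into a fixed number of groups, and each instance owns a contiguous range of them. This plain-Python sketch is a simplified illustration of that assignment scheme, not Flink's exact implementation (which uses a murmur hash, among other details):

```python
# Sketch: keys hash into a fixed number of key groups (the maximum
# parallelism); each parallel instance owns a contiguous range of
# groups, so rescaling moves whole groups and never splits one.
MAX_PARALLELISM = 128  # number of key groups, fixed for the job

def key_group(key):
    return hash(key) % MAX_PARALLELISM

def owner(key, parallelism):
    # Contiguous range assignment: instance i owns key groups in
    # [i * max_p // p, (i + 1) * max_p // p).
    return key_group(key) * parallelism // MAX_PARALLELISM

# After rescaling from 2 to 3 instances, every key still maps to exactly
# one instance; only group ownership changes.
for p in (2, 3):
    owners = [owner(f"user-{i}", p) for i in range(5)]
    assert all(0 <= o < p for o in owners)
```

Because the assignment depends only on the key group and the target parallelism, restoring a savepoint at a new parallelism is a deterministic reshuffle of group ranges.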
State in batch programs
Flink executes batch programs as a special case of streaming programs in BATCH execution mode, where streams are bounded. The fault tolerance model is different:
- Batch programs do not use checkpointing for fault tolerance. Recovery happens by fully replaying the bounded input.
- State backends in batch execution mode use simplified in-memory or out-of-core data structures rather than full key/value indexes.

