Many stream processing operations need to remember information across multiple events — not just process each event in isolation. Flink’s stateful stream processing model is what makes complex, fault-tolerant applications possible at scale.

What is state?

An operation is stateful when it needs information from previous events to process the current one. Examples include:
  • Pattern detection: storing the sequence of events seen so far while searching for a complex event pattern
  • Windowed aggregations: maintaining running counts, sums, or averages within a time window
  • Machine learning: holding current model parameters while training over a stream of data points
  • Deduplication: keeping a set of already-seen IDs to filter out duplicate events
  • Session tracking: maintaining per-user session data across a series of interactions
By contrast, a simple event parser that transforms each record independently — reading fields and emitting a new record — carries no state. Flink needs to be aware of all state in order to:
  1. Make it fault tolerant via checkpoints and savepoints
  2. Enable rescaling — redistributing state across parallel instances when you change parallelism
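To make the deduplication example above concrete, here is a small, hedged toy model in plain Java (not Flink API): a per-key set of already-seen event IDs, mimicking the state Flink would keep for each key after a keyBy(). The class and method names are illustrative only.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of keyed deduplication state (NOT Flink API): each key gets its
// own "seen IDs" set, mimicking per-key state scoped by keyBy().
public class DedupToy {
    private final Map<String, Set<String>> seenPerKey = new HashMap<>();

    // Returns true if the event is new for this key, false if it is a duplicate.
    public boolean accept(String key, String eventId) {
        Set<String> seen = seenPerKey.computeIfAbsent(key, k -> new HashSet<>());
        return seen.add(eventId); // Set.add returns false when the ID was already present
    }
}
```

In a real Flink job this map would be keyed state managed by the runtime, so it is checkpointed and redistributed automatically; the toy only shows the per-key scoping.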

Keyed state

Keyed state is the most common form of state in Flink. It is maintained in what can be thought of as an embedded key/value store that is strictly partitioned and distributed together with the streams read by stateful operators. Access to keyed state is only possible on keyed streams — that is, after a keyBy() operation. Each operator instance only has access to the state for keys assigned to it.
 Stream of (userId, action) events
                 │
           keyBy(userId)
                 │
   ┌──────────────┬────────────────┐
   │  Keys A–M    │  Keys N–Z      │
   │ ┌──────────┐ │ ┌──────────┐   │
   │ │ userId=A │ │ │ userId=P │   │
   │ │  state   │ │ │  state   │   │
   │ └──────────┘ │ └──────────┘   │
   └──────────────┴────────────────┘
      Subtask 1        Subtask 2
Aligning keys of streams and state ensures all state updates are local operations, which guarantees consistency without transaction overhead.

Key groups

Internally, keyed state is organized into Key Groups — the atomic unit by which Flink redistributes state when you rescale a job. Flink creates exactly as many Key Groups as the configured maximum parallelism. When you change the parallelism of a running job (via a savepoint), each parallel instance takes ownership of a contiguous range of Key Groups.
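The assignment described above can be sketched in plain Java. This is a simplification: real Flink additionally applies a murmur hash to key.hashCode() before taking the modulo, but the contiguous-range formula for mapping key groups to subtasks matches Flink's scheme.

```java
// Simplified sketch of Flink's key-group assignment. Real Flink murmur-hashes
// key.hashCode() before the modulo; this sketch uses hashCode() directly.
public class KeyGroups {
    // A key always maps to the same key group; maxParallelism = number of key groups.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each subtask owns a contiguous range of key groups, so rescaling moves
    // whole key groups rather than individual keys.
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With maxParallelism 128 and parallelism 2, subtask 0 owns key groups 0–63 and subtask 1 owns 64–127; raising parallelism to 4 simply splits those ranges further.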

Keyed state types

Flink exposes several state primitives for use inside keyed operators:
ValueState<T> stores a single value of type T per key. Use it to track the latest known value for a key, for example the last event timestamp or a running count.
// Declare state in open()
ValueStateDescriptor<Long> descriptor =
    new ValueStateDescriptor<>("last-seen", Long.class);
ValueState<Long> lastSeen = getRuntimeContext().getState(descriptor);

// Use in processElement()
Long previous = lastSeen.value(); // null if key is new
lastSeen.update(currentTimestamp);
ListState<T> stores a list of values of type T per key. Use it when you need to accumulate multiple events before emitting results, for example collecting all events in a session.
ListStateDescriptor<String> descriptor =
    new ListStateDescriptor<>("events", String.class);
ListState<String> eventList = getRuntimeContext().getListState(descriptor);

// Append to the list
eventList.add(event.getPayload());

// Iterate over all stored values
for (String e : eventList.get()) {
    // process each stored event
}
MapState<UK, UV> stores a map of key-value pairs per stream key. Use it when you need to maintain a dictionary or lookup table scoped to each key, for example tracking per-feature values for each user.
MapStateDescriptor<String, Integer> descriptor =
    new MapStateDescriptor<>("feature-counts", String.class, Integer.class);
MapState<String, Integer> featureCounts = getRuntimeContext().getMapState(descriptor);

// Increment count for a feature
String feature = event.getFeatureName();
int current = featureCounts.contains(feature)
    ? featureCounts.get(feature) : 0;
featureCounts.put(feature, current + 1);
ReducingState<T> stores a single value that represents the aggregation of all values added to the state. Every call to add() immediately merges the new value into the stored aggregate using a provided ReduceFunction.

AggregatingState<IN, OUT> is more flexible: the input and output types may differ, and aggregation uses an AggregateFunction.
// ReducingState example: sum of longs
ReducingStateDescriptor<Long> descriptor =
    new ReducingStateDescriptor<>("sum", Long::sum, Long.class);
ReducingState<Long> sum = getRuntimeContext().getReducingState(descriptor);

sum.add(event.getValue());    // merges immediately
Long currentSum = sum.get();  // retrieve result
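AggregatingState has no example above. Its IN/accumulator/OUT shape can be illustrated without Flink by a small average aggregator that mirrors the contract of Flink's AggregateFunction (createAccumulator / add / getResult); the class here is an illustrative sketch, not Flink API.

```java
// Toy mirroring the shape of Flink's AggregateFunction<IN, ACC, OUT> (not Flink API):
// input type long, accumulator {sum, count}, output double. IN and OUT differ,
// which is exactly what AggregatingState allows and ReducingState does not.
public class AvgAggregate {
    static long[] createAccumulator() { return new long[] {0L, 0L}; } // {sum, count}

    static long[] add(long value, long[] acc) {
        acc[0] += value; // running sum
        acc[1] += 1;     // running count
        return acc;
    }

    static double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }
}
```

In Flink, the same three operations would be supplied via an AggregateFunction to an AggregatingStateDescriptor, and the runtime would keep only the accumulator in state.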

Operator state

Operator state (also called non-keyed state) is scoped to an operator instance rather than a key. Each parallel instance of the operator has its own state. This is less common than keyed state but is used by connectors — for example, a Kafka source stores the per-partition offsets it has consumed as operator state. Operator state supports two redistribution modes on rescaling:
  • Even split: the list of state items is split evenly across the new parallel instances
  • Union: every new parallel instance receives a full copy of the union of all state
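The two redistribution modes above can be sketched as plain functions over a list of state items (illustrative plain Java, not Flink API; Flink performs this redistribution internally when restoring from a savepoint at a new parallelism):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of operator-state redistribution on rescale (illustrative, not Flink API).
public class Redistribute {
    // Even split: distribute the state items round-robin across the new subtasks.
    static <T> List<List<T>> evenSplit(List<T> items, int newParallelism) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) out.add(new ArrayList<>());
        for (int i = 0; i < items.size(); i++) out.get(i % newParallelism).add(items.get(i));
        return out;
    }

    // Union: every subtask receives the full list and filters what it needs.
    static <T> List<List<T>> union(List<T> items, int newParallelism) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) out.add(new ArrayList<>(items));
        return out;
    }
}
```

A Kafka source's per-partition offsets are a natural fit for even split: each new subtask only needs the offsets of the partitions it will read.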

State backends

A state backend determines how and where a job's state is held at runtime. You can switch backends without changing application logic.
The HashMapStateBackend stores all state in the Java heap memory of the TaskManager, in hash maps. This is the default backend.
  • Fast reads and writes (in-memory)
  • State size limited by available heap memory
  • Checkpoint snapshots are serialized to the configured checkpoint storage (e.g., HDFS or S3)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
Best for jobs with small-to-medium state that fits comfortably in heap memory.
The EmbeddedRocksDBStateBackend stores state in a locally embedded RocksDB instance on each TaskManager. RocksDB is an LSM-tree-based key/value store that spills to disk, so state size is limited by available disk space rather than heap memory.
  • Supports state sizes far exceeding available JVM heap
  • Reads and writes are slower than in-memory (RocksDB uses native memory and disk I/O)
  • Supports incremental checkpoints: only changed SST files are uploaded, not the full state
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// Enable incremental checkpoints
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
Best for jobs with very large state (gigabytes to terabytes) or when state must survive TaskManager restarts without a full re-snapshot.
You can also configure the state backend in conf/config.yaml with state.backend.type: hashmap or state.backend.type: rocksdb so you don’t need to modify application code.
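For reference, such a configuration might look like the following sketch. The state.backend.type key is named above; the checkpoint directory and incremental keys are assumptions based on common Flink 1.x setups, and the S3 bucket path is purely illustrative; check your Flink version's configuration reference for the exact key names.

```yaml
# conf/config.yaml (key names other than state.backend.type are assumptions)
state.backend.type: rocksdb
# Where checkpoint snapshots are written (illustrative bucket)
state.checkpoints.dir: s3://my-bucket/flink-checkpoints
# Incremental checkpoints for the RocksDB backend
state.backend.incremental: true
```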

State persistence and checkpointing

Flink implements fault tolerance using a combination of stream replay and checkpointing. A checkpoint marks a specific point in each input stream along with the corresponding state for each operator. When a failure occurs, Flink restores all operator states to the last completed checkpoint and replays records from that point — achieving exactly-once processing semantics.
Checkpointing is disabled by default. You must explicitly enable it in your application.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing every 60 seconds
env.enableCheckpointing(60_000);

Checkpoint barriers

The core mechanism of Flink’s distributed snapshotting is stream barriers. Barriers are lightweight markers injected into the data stream at sources and flow downstream alongside records.
  Source stream:
  ... [record] [record] [barrier n] [record] [record] [barrier n+1] ...
                            ↑
                barrier n separates the records of
                snapshot n from those of snapshot n+1
Each barrier carries the ID of the snapshot it belongs to. When an operator receives snapshot barrier n from all its input streams, it:
  1. Takes a snapshot of its current state (asynchronously)
  2. Emits barrier n downstream
  3. Continues processing records
Once the sink acknowledges that it has received barrier n, the checkpoint is considered complete and is reported to the JobManager’s checkpoint coordinator.
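The steps above can be modeled as a toy two-input operator in plain Java (illustrative, not Flink API): records arriving on an input whose barrier has already been seen are buffered until the barrier has arrived on all inputs, then the snapshot is taken and the buffer drains.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of aligned barrier handling for a two-input operator (not Flink API).
public class AlignedOperator {
    private final boolean[] barrierSeen = new boolean[2];
    private final Deque<String> buffered = new ArrayDeque<>();
    final List<String> processed = new ArrayList<>();
    boolean snapshotTaken = false;

    void onRecord(int input, String record) {
        if (barrierSeen[input] && !snapshotTaken) {
            buffered.add(record); // this input is "ahead": hold its records back
        } else {
            processed.add(record);
        }
    }

    void onBarrier(int input) {
        barrierSeen[input] = true;
        if (barrierSeen[0] && barrierSeen[1]) {
            snapshotTaken = true;           // snapshot state, forward barrier downstream
            while (!buffered.isEmpty()) {   // then drain the buffered records
                processed.add(buffered.poll());
            }
        }
    }
}
```

The buffering step is exactly the alignment cost discussed in the next section: a slow input delays records from the fast input.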

Aligned vs. unaligned checkpoints

For operators with multiple input streams, Flink must align barriers before snapshotting.
Aligned checkpointing (default):
  • When barrier n arrives from input stream A but not yet from stream B, the operator buffers records from stream A
  • Once barrier n arrives from all inputs, the operator snapshots and forwards the barrier
  • This guarantees exactly-once but adds latency for slow or backpressured inputs
Unaligned checkpointing (available since Flink 1.11):
  • The operator immediately forwards barrier n downstream without waiting for other inputs
  • All in-flight records (those that overtook the barrier) are included in the checkpoint state
  • Lower checkpoint latency for pipelines with persistent backpressure, but higher I/O overhead
env.enableCheckpointing(60_000);
env.getCheckpointConfig().enableUnalignedCheckpoints();
Savepoints always use aligned checkpointing. Unaligned checkpoints are only for automatic fault-tolerance checkpoints.

Exactly-once vs. at-least-once

You can configure the checkpointing mode:
import org.apache.flink.streaming.api.CheckpointingMode;

env.enableCheckpointing(60_000);
env.getCheckpointConfig().setCheckpointingConsistencyMode(
    CheckpointingMode.EXACTLY_ONCE  // default
    // or CheckpointingMode.AT_LEAST_ONCE for lower latency
);
In AT_LEAST_ONCE mode, barrier alignment is skipped. An operator continues processing all inputs even after receiving a barrier from some inputs. On recovery, some records may be processed more than once.
For dataflows with only embarrassingly parallel operations (map(), flatMap(), filter()) that have no joins or shuffles, exactly-once semantics are achieved even in AT_LEAST_ONCE mode — because no alignment is needed in the first place.

Recovery

When a failure occurs:
  1. Flink selects the latest completed checkpoint k
  2. It re-deploys the entire distributed dataflow
  3. Each operator is restored to the state it had at checkpoint k
  4. Sources start reading from position S_k (e.g., a Kafka partition offset)
If incremental checkpointing is used (with EmbeddedRocksDBStateBackend), operators restore from the last full snapshot and then apply incremental updates.
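Incremental restore can be sketched as applying deltas on top of a base snapshot. This is a deliberate simplification of how RocksDB SST files are reapplied (illustrative plain Java, not Flink API):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of incremental-checkpoint restore (not Flink API): start from the
// last full snapshot, then replay each incremental delta in order.
public class IncrementalRestore {
    static Map<String, Long> restore(Map<String, Long> fullSnapshot,
                                     List<Map<String, Long>> deltas) {
        Map<String, Long> state = new HashMap<>(fullSnapshot);
        for (Map<String, Long> delta : deltas) {
            state.putAll(delta); // later deltas overwrite earlier values
        }
        return state;
    }
}
```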

Savepoints

Savepoints are manually triggered checkpoints that you create explicitly to capture a consistent snapshot of the entire job’s state.
                  Checkpoints                          Savepoints
  Triggered by    Flink, automatically                 User, manually
  Expiry          Expire when newer checkpoints        Never expire automatically
                  complete (configurable)
  Purpose         Automatic fault recovery             Planned migrations, upgrades,
                                                       A/B testing
  Format          May use incremental format           Always full snapshot, always aligned
  Portability     Not intended for job upgrades        Designed to survive job changes
You can create a savepoint via the CLI or REST API:
# Trigger a savepoint for a running job
./bin/flink savepoint <jobId> [targetDirectory]

# Restore a job from a savepoint
./bin/flink run \
  --fromSavepoint /path/to/savepoint \
  ./my-flink-job.jar
Savepoints are commonly used to:
  • Upgrade Flink to a new version without losing state
  • Update job logic while preserving accumulated state
  • Change parallelism — state is redistributed across the new number of parallel instances
  • Clone a job — start two versions of the same job from the same state for A/B testing
To restore state reliably from a savepoint, operators must have stable UIDs assigned. Without explicit UIDs, Flink auto-generates them from the job graph structure, which may change if you modify the job.
// kafkaSource: a KafkaSource<String> built via KafkaSource.builder() elsewhere.
// Note: KafkaSource is attached with fromSource(), not addSource().
DataStream<String> stream = env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source")
    .uid("kafka-source")
    .map(new MyMapper()).uid("my-mapper")
    .keyBy(record -> record.getUserId());

State in batch programs

Flink executes batch programs as a special case of streaming programs in BATCH execution mode, where streams are bounded. The fault tolerance model is different:
  • Batch programs do not use checkpointing for fault tolerance. Recovery happens by fully replaying the bounded input.
  • State backends in batch execution mode use simplified in-memory or out-of-core data structures rather than full key/value indexes.
This tradeoff makes regular batch processing cheaper (no checkpoint overhead) at the cost of potentially longer recovery times.
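Assuming the DataStream API, BATCH execution mode can be selected explicitly in code (it can also be set via the execution.runtime-mode option when submitting the job):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Bounded sources + BATCH mode: no checkpointing; failures recover by replaying input
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
```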
