State lets operators remember information across events. In a streaming word count, the running count per word is state. In fraud detection, the history of recent transactions per user is state. Flink manages state reliably — it is checkpointed along with stream positions so the application recovers correctly after failures.
There are two categories of state:
- Keyed state: State that is scoped to a key, accessed only inside a KeyedStream. Each key has its own isolated state. This is the most common type.
- Operator state: State that is scoped to a parallel operator instance (subtask), not a key. Used primarily in sources and sinks.
Keyed state
To use keyed state, first partition your stream with keyBy():
DataStream<Transaction> transactions = ...;
KeyedStream<Transaction, String> byUserId = transactions.keyBy(t -> t.getUserId());
State is accessed through the RuntimeContext inside a RichFunction. Get the RuntimeContext via getRuntimeContext(). Declare state in the open() method so Flink registers it before the function processes any elements.
ValueState
Stores a single value per key. Use it for running counts, last-seen values, or any scalar per-key data.
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.util.Collector;
public class CountTransactions extends RichFlatMapFunction<Transaction, String> {
private ValueState<Long> count;
@Override
public void open(OpenContext ctx) throws Exception {
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>("tx-count", Long.class);
count = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Transaction tx, Collector<String> out) throws Exception {
Long current = count.value();
long newCount = (current == null ? 0L : current) + 1;
count.update(newCount);
if (newCount % 100 == 0) {
out.collect("User " + tx.getUserId() + " reached " + newCount + " transactions");
}
}
}
transactions.keyBy(Transaction::getUserId)
.flatMap(new CountTransactions())
.print();
Key methods:
value() — returns the current value (null if never set)
update(T) — sets the value
clear() — removes the value
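To build intuition for these methods, keyed ValueState behaves like a per-key map that Flink maintains for you: value(), update(), and clear() always act on the slot belonging to the key of the element currently being processed. A conceptual model (plain Java, not Flink's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model only: Flink keeps one ValueState slot per key.
// Here the key is passed explicitly; in Flink it is implicit in the
// element being processed on the KeyedStream.
public class KeyedValueStateModel {
    private final Map<String, Long> slots = new HashMap<>();

    Long value(String key)          { return slots.get(key); }   // null if never set
    void update(String key, Long v) { slots.put(key, v); }
    void clear(String key)          { slots.remove(key); }

    public static void main(String[] args) {
        KeyedValueStateModel state = new KeyedValueStateModel();
        state.update("alice", 1L);
        state.update("bob", 5L);
        System.out.println(state.value("alice")); // 1
        state.clear("alice");
        System.out.println(state.value("alice")); // null — cleared keys read as never set
        System.out.println(state.value("bob"));   // 5 — other keys are unaffected
    }
}
```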
ListState
Stores an ordered list of elements per key.
ListStateDescriptor<String> descriptor =
new ListStateDescriptor<>("recent-events", String.class);
ListState<String> recentEvents = getRuntimeContext().getListState(descriptor);
// In processElement:
recentEvents.add(event.toString());
Iterable<String> all = recentEvents.get();
recentEvents.update(newList); // replace the entire list
recentEvents.clear();
MapState
Stores a key-value map per stream key. Efficient for per-key lookup tables.
MapStateDescriptor<String, Double> descriptor =
new MapStateDescriptor<>("per-category-totals", String.class, Double.class);
MapState<String, Double> totals = getRuntimeContext().getMapState(descriptor);
// In processElement:
String category = event.getCategory();
double current = totals.contains(category) ? totals.get(category) : 0.0;
totals.put(category, current + event.getAmount());
for (Map.Entry<String, Double> entry : totals.entries()) {
System.out.println(entry.getKey() + " -> " + entry.getValue());
}
ReducingState
Stores a single value that is the running aggregation of all added values. Use a ReduceFunction with the same input and output type.
ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
"running-sum",
Long::sum, // ReduceFunction<Long>
Long.class
);
ReducingState<Long> runningSum = getRuntimeContext().getReducingState(descriptor);
// In processElement:
runningSum.add(event.getAmount()); // internally calls reduce(current, amount)
Long total = runningSum.get();
AggregatingState
Like ReducingState but the accumulator type can differ from the input and output types. Uses AggregateFunction<IN, ACC, OUT>.
AggregatingStateDescriptor<Long, double[], Double> descriptor =
new AggregatingStateDescriptor<>(
"running-average",
new AverageAggregateFunction(), // AggregateFunction<Long, double[], Double>
double[].class
);
AggregatingState<Long, Double> avgState = getRuntimeContext().getAggregatingState(descriptor);
// In processElement:
avgState.add(event.getAmount());
Double avg = avgState.get();
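The AverageAggregateFunction referenced above is not defined in this document; a minimal sketch of what it might look like, using a double[2] accumulator holding {sum, count}:

```java
import org.apache.flink.api.common.functions.AggregateFunction;

// Hypothetical helper assumed by the AggregatingState example above:
// accumulates {sum, count} in a double[2] and emits the running average.
public class AverageAggregateFunction implements AggregateFunction<Long, double[], Double> {

    @Override
    public double[] createAccumulator() {
        return new double[] {0.0, 0.0}; // {sum, count}
    }

    @Override
    public double[] add(Long value, double[] acc) {
        acc[0] += value;
        acc[1] += 1;
        return acc;
    }

    @Override
    public Double getResult(double[] acc) {
        return acc[1] == 0 ? null : acc[0] / acc[1];
    }

    @Override
    public double[] merge(double[] a, double[] b) {
        return new double[] {a[0] + b[0], a[1] + b[1]};
    }
}
```

Adding 3 and 5 yields an accumulator of {8.0, 2.0}, so get() returns 4.0.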
Complete example: count-based windowed average
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
// Emits the average every 2 elements per key
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private ValueState<Tuple2<Long, Long>> sumState; // (count, sum)
@Override
public void open(OpenContext ctx) throws Exception {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
"count-sum",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
);
sumState = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
Tuple2<Long, Long> current = sumState.value();
if (current == null) {
current = Tuple2.of(0L, 0L);
}
current.f0 += 1; // increment count
current.f1 += input.f1; // add value
sumState.update(current);
if (current.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, current.f1 / current.f0));
sumState.clear();
}
}
}
env.fromElements(
Tuple2.of(1L, 3L),
Tuple2.of(1L, 5L),
Tuple2.of(1L, 7L)
)
.keyBy(t -> t.f0)
.flatMap(new CountWindowAverage())
.print();
// Output: (1, 4) after elements 3 and 5; the 7 starts a new window and stays buffered until one more element arrives for key 1
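The per-key state transitions can be traced without running Flink; a plain-Java walkthrough of the same logic (count and sum standing in for the ValueState, resetting in place of clear()):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java trace of CountWindowAverage's state transitions for key 1.
public class CountWindowAverageTrace {
    public static void main(String[] args) {
        long[] inputs = {3L, 5L, 7L};      // the values for key 1 above
        long count = 0, sum = 0;           // stands in for sumState
        List<Long> emitted = new ArrayList<>();
        for (long v : inputs) {
            count += 1;
            sum += v;
            if (count >= 2) {
                emitted.add(sum / count);  // integer division, as in the example
                count = 0;                 // stands in for sumState.clear()
                sum = 0;
            }
        }
        System.out.println(emitted);       // [4] — the 7 remains buffered
    }
}
```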
State TTL (time-to-live)
State TTL automatically expires and removes keyed state entries after a configured duration. Use it to avoid unbounded state growth when old keys are no longer active.
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import java.time.Duration;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofHours(24))
// Reset TTL clock on every write (default)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// Also reset on reads
// .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
// Never return expired values (default)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<UserProfile> descriptor =
new ValueStateDescriptor<>("user-profile", UserProfile.class);
descriptor.enableTimeToLive(ttlConfig);
ValueState<UserProfile> userProfile = getRuntimeContext().getState(descriptor);
TTL is based on processing time only. Event-time TTL is not supported.
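The combined effect of OnCreateAndWrite and NeverReturnExpired can be modeled in plain Java (a conceptual sketch, not Flink's implementation; timestamps are passed in explicitly so the behavior is easy to follow):

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model of processing-time TTL with the two defaults above:
// OnCreateAndWrite (every write resets the clock) and NeverReturnExpired
// (expired entries behave as if absent).
public class TtlValueStateModel {
    private final long ttlMillis;
    private final Map<String, Long> values = new HashMap<>();
    private final Map<String, Long> lastWrite = new HashMap<>();

    TtlValueStateModel(long ttlMillis) { this.ttlMillis = ttlMillis; }

    void update(String key, Long v, long nowMillis) {
        values.put(key, v);
        lastWrite.put(key, nowMillis); // OnCreateAndWrite: writes reset the clock
    }

    Long value(String key, long nowMillis) {
        Long ts = lastWrite.get(key);
        if (ts == null || nowMillis - ts >= ttlMillis) {
            return null; // NeverReturnExpired: reads never see expired values
        }
        return values.get(key);
    }

    public static void main(String[] args) {
        TtlValueStateModel state = new TtlValueStateModel(1000);
        state.update("u1", 42L, 0);
        System.out.println(state.value("u1", 500));  // 42 (still within TTL)
        System.out.println(state.value("u1", 1500)); // null (expired)
    }
}
```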
Cleanup strategies
Expired values are removed lazily when accessed (the value() call returns null). You can also configure background cleanup:
Incremental cleanup — checks a batch of state entries on each state access:
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofMinutes(30))
    .cleanupIncrementally(10, true) // check 10 entries per access; also run cleanup for every processed record
    .build();
RocksDB compaction filter — hooks into RocksDB compaction to remove expired entries:
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofMinutes(30))
    .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
    .build();
Full snapshot cleanup — excludes expired entries from full checkpoint snapshots, reducing snapshot size. Not available with incremental checkpointing in RocksDB:
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofMinutes(30))
    .cleanupFullSnapshot()
    .build();
Operator state
Operator state (non-keyed state) is bound to a single parallel subtask, not a key. It is used mainly in sources and sinks to track offsets or buffer records.
CheckpointedFunction
Implement CheckpointedFunction to manage operator state:
BufferingSink.java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {
private final int threshold;
private List<String> bufferedElements = new ArrayList<>();
private ListState<String> checkpointedState;
public BufferingSink(int threshold) {
this.threshold = threshold;
}
@Override
public void invoke(String value, Context context) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() >= threshold) {
flush();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// Save buffered elements to state so they survive a failure
checkpointedState.update(bufferedElements);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<String> descriptor = new ListStateDescriptor<>(
"buffered-elements", String.class
);
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (String element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
private void flush() throws Exception {
// write bufferedElements to external system
bufferedElements.clear();
}
}
Redistribution on rescaling
When you change a job’s parallelism, Flink redistributes operator state across the new number of subtasks:
getListState — even-split redistribution: the state list is split evenly across subtasks.
getUnionListState — union redistribution: every subtask receives the complete list. Be careful with large lists as this can cause OOM.
// Even split (each subtask gets a portion)
ListState<Long> splitState = context.getOperatorStateStore().getListState(descriptor);
// Union (each subtask gets everything)
ListState<Long> unionState = context.getOperatorStateStore().getUnionListState(descriptor);
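The difference between the two schemes can be illustrated in plain Java (a conceptual sketch of the restored lists after rescaling, not Flink's internal partitioning logic):

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch: given the checkpointed list entries of an operator,
// what does each subtask's restored list look like at the new parallelism?
public class Redistribution {

    // Even-split: each subtask receives a contiguous slice of the entries.
    static List<List<String>> evenSplit(List<String> entries, int parallelism) {
        List<List<String>> result = new ArrayList<>();
        int n = entries.size();
        for (int i = 0; i < parallelism; i++) {
            int from = i * n / parallelism;
            int to = (i + 1) * n / parallelism;
            result.add(new ArrayList<>(entries.subList(from, to)));
        }
        return result;
    }

    // Union: every subtask receives the complete list.
    static List<List<String>> union(List<String> entries, int parallelism) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            result.add(new ArrayList<>(entries));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> entries = List.of("a", "b", "c", "d");
        System.out.println(evenSplit(entries, 2)); // [[a, b], [c, d]]
        System.out.println(union(entries, 2));     // [[a, b, c, d], [a, b, c, d]]
    }
}
```

The union variant is why large union state is risky: total restored state grows linearly with parallelism.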
State backends
The state backend determines how state is stored locally on TaskManagers:
HashMapStateBackend (default): Stores state as Java objects in the JVM heap. Fast for small-to-medium state.
EmbeddedRocksDBStateBackend: Stores state in RocksDB on disk. Supports very large state (terabytes) with incremental checkpointing.
Configure the backend in code:
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental checkpoints
Or in conf/config.yaml:
state.backend.type: rocksdb
state.backend.incremental: true