Skip to main content
State lets operators remember information across events. In a streaming word count, the running count per word is state. In fraud detection, the history of recent transactions per user is state. Flink manages state reliably — it is checkpointed along with stream positions so the application recovers correctly after failures. There are two categories of state:
  • Keyed state: State that is scoped to a key, accessed only inside a KeyedStream. Each key has its own isolated state. This is the most common type.
  • Operator state: State that is scoped to a parallel operator instance (subtask), not a key. Used primarily in sources and sinks.

Keyed state

To use keyed state, first partition your stream with keyBy():
DataStream<Transaction> transactions = ...;
KeyedStream<Transaction, String> byUserId = transactions.keyBy(t -> t.getUserId());
State is accessed through the RuntimeContext inside a RichFunction. Get the RuntimeContext via getRuntimeContext(). Declare state in the open() method so Flink registers it before the function processes any elements.

ValueState

Stores a single value per key. Use it for running counts, last-seen values, or any scalar per-key data.
CountTransactions.java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.util.Collector;

public class CountTransactions extends RichFlatMapFunction<Transaction, String> {

    private ValueState<Long> count;

    @Override
    public void open(OpenContext ctx) throws Exception {
        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("tx-count", Long.class);
        count = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Transaction tx, Collector<String> out) throws Exception {
        Long current = count.value();
        long newCount = (current == null ? 0L : current) + 1;
        count.update(newCount);

        if (newCount % 100 == 0) {
            out.collect("User " + tx.getUserId() + " reached " + newCount + " transactions");
        }
    }
}

transactions.keyBy(Transaction::getUserId)
            .flatMap(new CountTransactions())
            .print();
Key methods:
  • value() — returns the current value (null if never set)
  • update(T) — sets the value
  • clear() — removes the value

ListState

Stores an ordered list of elements per key.
ListStateDescriptor<String> descriptor =
    new ListStateDescriptor<>("recent-events", String.class);
ListState<String> recentEvents = getRuntimeContext().getListState(descriptor);

// In processElement:
recentEvents.add(event.toString());

Iterable<String> all = recentEvents.get();
recentEvents.update(newList);   // replace the entire list
recentEvents.clear();

MapState

Stores a key-value map per stream key. Efficient for per-key lookup tables.
MapStateDescriptor<String, Double> descriptor =
    new MapStateDescriptor<>("per-category-totals", String.class, Double.class);
MapState<String, Double> totals = getRuntimeContext().getMapState(descriptor);

// In processElement:
String category = event.getCategory();
double current = totals.contains(category) ? totals.get(category) : 0.0;
totals.put(category, current + event.getAmount());

for (Map.Entry<String, Double> entry : totals.entries()) {
    System.out.println(entry.getKey() + " -> " + entry.getValue());
}

ReducingState

Stores a single value that is the running aggregation of all added values. Use a ReduceFunction with the same input and output type.
ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
    "running-sum",
    Long::sum,      // ReduceFunction<Long>
    Long.class
);
ReducingState<Long> runningSum = getRuntimeContext().getReducingState(descriptor);

// In processElement:
runningSum.add(event.getAmount());  // internally calls reduce(current, amount)
Long total = runningSum.get();

AggregatingState

Like ReducingState but the accumulator type can differ from the input and output types. Uses AggregateFunction<IN, ACC, OUT>.
AggregatingStateDescriptor<Long, double[], Double> descriptor =
    new AggregatingStateDescriptor<>(
        "running-average",
        new AverageAggregateFunction(), // AggregateFunction<Long, double[], Double>
        double[].class
    );
AggregatingState<Long, Double> avgState = getRuntimeContext().getAggregatingState(descriptor);

// In processElement:
avgState.add(event.getAmount());
Double avg = avgState.get();

Complete example: count-based windowed average

CountWindowAverage.java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Emits the average every 2 elements per key
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private ValueState<Tuple2<Long, Long>> sumState; // (count, sum)

    @Override
    public void open(OpenContext ctx) throws Exception {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
            "count-sum",
            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
        );
        sumState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> current = sumState.value();
        if (current == null) {
            current = Tuple2.of(0L, 0L);
        }

        current.f0 += 1;       // increment count
        current.f1 += input.f1; // add value

        sumState.update(current);

        if (current.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, current.f1 / current.f0));
            sumState.clear();
        }
    }
}

env.fromElements(
        Tuple2.of(1L, 3L),
        Tuple2.of(1L, 5L),
        Tuple2.of(1L, 7L)
    )
    .keyBy(t -> t.f0)
    .flatMap(new CountWindowAverage())
    .print();
// Output: (1, 4) after elements 3 and 5; (1, 7) is buffered waiting for a fourth element

State TTL (time-to-live)

State TTL automatically expires and removes keyed state entries after a configured duration. Use it to avoid unbounded state growth when old keys are no longer active.
StateTtlExample.java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import java.time.Duration;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofHours(24))
    // Reset TTL clock on every write (default)
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    // Also reset on reads
    // .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
    // Never return expired values (default)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<UserProfile> descriptor =
    new ValueStateDescriptor<>("user-profile", UserProfile.class);
descriptor.enableTimeToLive(ttlConfig);

ValueState<UserProfile> userProfile = getRuntimeContext().getState(descriptor);
TTL is based on processing time only. Event-time TTL is not supported.

Cleanup strategies

Expired values are removed lazily when accessed (the value() call returns null). You can also configure background cleanup:
Checks a batch of state entries on each state access:
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofMinutes(30))
    .cleanupIncrementally(10, true) // check 10 entries; also trigger per record
    .build();

Operator state

Operator state (non-keyed state) is bound to a single parallel subtask, not a key. It is used mainly in sources and sinks to track offsets or buffer records.

CheckpointedFunction

Implement CheckpointedFunction to manage operator state:
OperatorStateExample.java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {

    private final int threshold;
    private List<String> bufferedElements = new ArrayList<>();
    private ListState<String> checkpointedState;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            flush();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Save buffered elements to state so they survive a failure
        checkpointedState.update(bufferedElements);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> descriptor = new ListStateDescriptor<>(
            "buffered-elements", String.class
        );
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (String element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }

    private void flush() throws Exception {
        // write bufferedElements to external system
        bufferedElements.clear();
    }
}

Redistribution on rescaling

When you change a job’s parallelism, Flink redistributes operator state across the new number of subtasks:
  • getListState — even-split redistribution: the state list is split evenly across subtasks.
  • getUnionListState — union redistribution: every subtask receives the complete list. Be careful with large lists as this can cause OOM.
// Even split (each subtask gets a portion)
ListState<Long> splitState = context.getOperatorStateStore().getListState(descriptor);

// Union (each subtask gets everything)
ListState<Long> unionState = context.getOperatorStateStore().getUnionListState(descriptor);

State backends

The state backend determines how state is stored locally on TaskManagers:
  • HashMapStateBackend (default): Stores state as Java objects in the JVM heap. Fast for small-to-medium state.
  • EmbeddedRocksDBStateBackend: Stores state in RocksDB on disk. Supports very large state (terabytes) with incremental checkpointing.
Configure the backend in code:
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental checkpoints
Or in conf/config.yaml:
state.backend.type: rocksdb
state.backend.incremental: true

Build docs developers (and LLMs) love