Checkpoints make state in Flink fault tolerant by allowing state and the corresponding stream positions to be recovered, giving your application the same semantics as a failure-free execution. When a failure occurs, Flink restarts from the most recent completed checkpoint. Flink’s checkpointing is based on the Chandy-Lamport distributed snapshot algorithm: checkpoint barriers flow through the data stream from sources to sinks, and each operator snapshots its state when a barrier arrives. Once every operator has acknowledged the barrier, the checkpoint, a consistent snapshot of the entire job’s state, is complete and persisted in durable storage.
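The barrier mechanism can be illustrated with a toy, self-contained sketch (plain Java, not the Flink API): a single operator sums integers, snapshots its state when a barrier marker arrives, and after a simulated failure restores that snapshot and replays only the post-barrier elements, reaching the same result as a failure-free run.

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of barrier-based snapshotting; not Flink's implementation.
public class BarrierSnapshotDemo {
    static final Object BARRIER = new Object();

    /** Failure-free run: sum every integer in the stream. */
    static long runToCompletion(List<Object> stream) {
        long sum = 0;
        for (Object e : stream) {
            if (e != BARRIER) sum += (Integer) e;
        }
        return sum;
    }

    /**
     * Run that "fails" after the last element, then recovers from the
     * snapshot taken at the barrier and replays the post-barrier elements.
     */
    static long runWithFailureAndRecovery(List<Object> stream) {
        long sum = 0;
        long snapshot = 0;
        List<Integer> replay = new ArrayList<>();
        for (Object e : stream) {
            if (e == BARRIER) {
                snapshot = sum;   // consistent state at the barrier
                replay.clear();   // source rewinds to the barrier position
            } else {
                sum += (Integer) e;
                replay.add((Integer) e);
            }
        }
        // Failure: discard the in-flight sum, restore the snapshot, replay.
        long recovered = snapshot;
        for (int e : replay) recovered += e;
        return recovered;
    }

    public static void main(String[] args) {
        List<Object> stream = List.of(1, 2, 3, BARRIER, 4, 5);
        System.out.println(runToCompletion(stream));           // 15
        System.out.println(runWithFailureAndRecovery(stream)); // 15
    }
}
```

Both runs produce the same sum, which is the guarantee checkpointing provides: recovery is indistinguishable from never having failed.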

Enabling checkpointing

Checkpointing is disabled by default. Enable it by calling enableCheckpointing on the StreamExecutionEnvironment:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing every 60 seconds
env.enableCheckpointing(60_000);
You can also configure checkpointing options on the CheckpointConfig:
CheckpointConfig config = env.getCheckpointConfig();

// Use exactly-once semantics (default)
config.setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);

// Minimum 30 seconds between checkpoints
config.setMinPauseBetweenCheckpoints(30_000);

// Maximum of one checkpoint in progress at a time
config.setMaxConcurrentCheckpoints(1);

// Checkpoint must complete within 10 minutes or is aborted
config.setCheckpointTimeout(600_000);
Option                       Default        Description
checkpointInterval           (required)     How often to trigger a checkpoint, in milliseconds
checkpointingMode            EXACTLY_ONCE   Consistency mode: EXACTLY_ONCE or AT_LEAST_ONCE
minPauseBetweenCheckpoints   0              Minimum time between the end of one checkpoint and the start of the next
maxConcurrentCheckpoints     1              Maximum number of in-progress checkpoints
checkpointTimeout            600,000 ms     Time after which an in-progress checkpoint is aborted
Set minPauseBetweenCheckpoints to prevent the system from spending too much time checkpointing when state is large and individual checkpoints take a long time. A value of 30–60 seconds works well for most production jobs.

Checkpoint storage

When checkpointing is enabled, state must be persisted somewhere. Flink provides two checkpoint storage options:

JobManagerCheckpointStorage

Stores checkpoint snapshots in the JobManager’s JVM heap. Suitable only for small state and local development.

FileSystemCheckpointStorage

Writes state snapshots to files on a distributed filesystem (HDFS, S3, GCS, etc.). Required for production.
If you configure a checkpoint directory, Flink automatically uses FileSystemCheckpointStorage. Otherwise it falls back to JobManagerCheckpointStorage.
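The checkpoint directory can also be set programmatically on the CheckpointConfig. A minimal sketch, assuming the setCheckpointStorage API; the HDFS path is a placeholder for your own durable storage:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);

// Passing a directory URI selects FileSystemCheckpointStorage automatically
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/flink/checkpoints");
```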

JobManagerCheckpointStorage

The JobManagerCheckpointStorage stores checkpoint snapshots in the JobManager heap. It is not suitable for high-availability setups and is subject to the following limitations:
  • Each individual state is limited to 5 MB by default (configurable in the constructor)
  • The aggregate state must fit in the JobManager’s memory
  • State cannot be larger than the network frame size
// Raise the default 5 MB per-state limit to 10 MB and use this storage
env.getCheckpointConfig().setCheckpointStorage(
    new JobManagerCheckpointStorage(10 * 1024 * 1024));
Use JobManagerCheckpointStorage for:
  • Local development and debugging
  • Jobs with very little state (e.g., only stateless operators or a Kafka consumer with no windowing)

FileSystemCheckpointStorage

The FileSystemCheckpointStorage writes state snapshots into files on a configured filesystem. Only minimal metadata is kept in the JobManager’s memory.
execution.checkpointing.dir: hdfs://namenode:8020/flink/checkpoints
Use FileSystemCheckpointStorage for:
  • All production deployments
  • All high-availability setups
  • Any job where state must survive JobManager restarts
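Instead of the configuration key above, the storage can be configured in code with an explicit FileSystemCheckpointStorage instance. A hedged sketch; the S3 bucket and path are illustrative:

```java
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);

// Equivalent to setting execution.checkpointing.dir in the configuration;
// the bucket/path below is a placeholder for your own durable storage.
env.getCheckpointConfig().setCheckpointStorage(
    new FileSystemCheckpointStorage("s3://my-bucket/flink/checkpoints"));
```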

Checkpoint directory structure

Flink organises checkpoint data under the configured directory as follows:
/user-defined-checkpoint-dir
    /{job-id}/
        shared/        # State shared across multiple checkpoints (RocksDB SST files)
        taskowned/     # State that must never be dropped by the JobManager
        chk-1/         # Checkpoint 1 data
        chk-2/         # Checkpoint 2 data
        chk-3/
        ...
The checkpoint directory layout is not a public API and may change between Flink releases.

Retained checkpoints

By default, checkpoints are deleted when a job is cancelled. You can configure checkpoints to be retained so that you can restart a job from them later:
CheckpointConfig config = env.getCheckpointConfig();
config.setExternalizedCheckpointRetention(
    ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION
);
The two retention modes are:
Mode                     Behaviour
RETAIN_ON_CANCELLATION   The checkpoint is kept when the job is cancelled; you must clean it up manually.
DELETE_ON_CANCELLATION   The checkpoint is deleted when the job is cancelled; it remains available only if the job fails.

Restoring from a checkpoint

You can restart a job from a retained checkpoint using the Flink CLI, exactly as you would with a savepoint:
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
The :checkpointMetaDataPath is the path to the _metadata file inside the checkpoint directory. The JobManager must be able to access all data files referenced by this metadata file.
Checkpoints are not self-contained in the same way savepoints are. If you move checkpoint data, the JobManager must still be able to read all referenced files from their original paths. For a fully portable snapshot, use a savepoint instead.
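Concretely, resuming from a retained checkpoint might look like the following; the host, job ID, checkpoint number, and jar name are all placeholders for values from your own deployment:

```shell
# Point -s at the chk-N directory containing the _metadata file
bin/flink run -s hdfs://namenode:8020/flink/checkpoints/<job-id>/chk-42 my-job.jar
```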
