Enabling checkpointing
Checkpointing is disabled by default. Enable it by calling enableCheckpointing on the StreamExecutionEnvironment:
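The original snippet is missing here; a minimal sketch (the interval value is illustrative):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableCheckpointingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 10 seconds; the interval argument is required.
        env.enableCheckpointing(10_000);
    }
}
```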
Further checkpointing behaviour is controlled through the CheckpointConfig:
| Option | Default | Description |
|---|---|---|
| checkpointInterval | (required) | How often to trigger a checkpoint, in milliseconds |
| checkpointingMode | EXACTLY_ONCE | Consistency mode: EXACTLY_ONCE or AT_LEAST_ONCE |
| minPauseBetweenCheckpoints | 0 | Minimum time between the end of the last checkpoint and the start of the next |
| maxConcurrentCheckpoints | 1 | Maximum number of in-progress checkpoints |
| checkpointTimeout | 600,000 ms | Time after which an in-progress checkpoint is aborted |
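As a sketch, the options above map onto setters on CheckpointConfig (the values chosen here are illustrative, not recommendations):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpointInterval: checkpoint every 60 seconds
        env.enableCheckpointing(60_000);

        CheckpointConfig config = env.getCheckpointConfig();

        // checkpointingMode: EXACTLY_ONCE is already the default
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // minPauseBetweenCheckpoints: wait at least 30 s after a checkpoint completes
        config.setMinPauseBetweenCheckpoints(30_000);

        // maxConcurrentCheckpoints: at most one checkpoint in flight (the default)
        config.setMaxConcurrentCheckpoints(1);

        // checkpointTimeout: abort checkpoints that run longer than 2 minutes
        config.setCheckpointTimeout(120_000);
    }
}
```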
Checkpoint storage
When checkpointing is enabled, state must be persisted somewhere. Flink provides two checkpoint storage options:

JobManagerCheckpointStorage
Stores checkpoint snapshots in the JobManager’s JVM heap. Suitable only for small state and local development.
FileSystemCheckpointStorage
Writes state snapshots to files on a distributed filesystem (HDFS, S3, GCS, etc.). Required for production.
If a checkpoint directory is configured, Flink uses FileSystemCheckpointStorage. Otherwise it falls back to JobManagerCheckpointStorage.
JobManagerCheckpointStorage
The JobManagerCheckpointStorage stores checkpoint snapshots in the JobManager heap. It is limited to small state sizes and is not suitable for high-availability setups.
Limitations:
- Each individual state is limited to 5 MB by default (configurable in the constructor)
- The aggregate state must fit in the JobManager’s memory
- State cannot be larger than the RPC frame size, irrespective of the configured maximum state size
Use JobManagerCheckpointStorage for:
- Local development and debugging
- Jobs with very little state (e.g., only stateless operators or a Kafka consumer with no windowing)
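A sketch of selecting this storage explicitly; the constructor argument raising the per-state limit is taken from the note above (value illustrative):

```java
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobManagerStorageExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000);

        // Keep snapshots on the JobManager heap; raise the per-state limit
        // from the 5 MB default to 10 MB via the constructor (bytes).
        env.getCheckpointConfig()
           .setCheckpointStorage(new JobManagerCheckpointStorage(10 * 1024 * 1024));
    }
}
```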
FileSystemCheckpointStorage
The FileSystemCheckpointStorage writes state snapshots into files on a configured filesystem. Only minimal metadata is kept in the JobManager’s memory.
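The original snippets are missing; a sketch of the per-job Java configuration (paths are illustrative; in config.yaml the equivalent is the state.checkpoints.dir option):

```java
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSystemStorageExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // Simplest form: point at a durable directory.
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");

        // Or construct the storage explicitly to tune it: state smaller than
        // the threshold (bytes) is inlined into the checkpoint metadata file
        // instead of being written as a separate file.
        env.getCheckpointConfig().setCheckpointStorage(
                new FileSystemCheckpointStorage("hdfs:///flink/checkpoints",
                                                1024 * 1024));
    }
}
```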
Use FileSystemCheckpointStorage for:
- All production deployments
- All high-availability setups
- Any job where state must survive JobManager restarts
Checkpoint directory structure
Flink organises checkpoint data under the configured directory as follows:
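The original listing is missing; the layout below is a sketch of the typical structure (shared/ holds state shared across checkpoints, taskowned/ holds state owned by tasks, and each chk-<n>/ holds the data of one numbered checkpoint):

```
/user-defined-checkpoint-dir
    /{job-id}
        |
        + --shared/
        + --taskowned/
        + --chk-1/
        + --chk-2/
        + --chk-3/
```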
Retained checkpoints

By default, checkpoints are deleted when a job is cancelled. You can configure checkpoints to be retained so that you can restart a job from them later:

| Mode | Behaviour |
|---|---|
| RETAIN_ON_CANCELLATION | Checkpoint is kept when the job is cancelled. You must clean it up manually. |
| DELETE_ON_CANCELLATION | Checkpoint is deleted when the job is cancelled. It remains available only if the job fails. |
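A sketch of enabling retention; this assumes the ExternalizedCheckpointCleanup enum on CheckpointConfig (the exact method and enum names have shifted across Flink versions):

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // Keep the latest checkpoint after cancellation so the job can
        // later be restarted from it (manual cleanup is then your job).
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
```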
Restoring from a checkpoint
You can restart a job from a retained checkpoint using the Flink CLI, exactly as you would with a savepoint. Here, checkpointMetaDataPath is the path to the _metadata file inside the checkpoint directory. The JobManager must be able to access all data files referenced by this metadata file.
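The command itself is missing from the text; assuming the standard run-from-savepoint syntax of the Flink CLI, it looks like:

```shell
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
```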
Checkpoints are not self-contained in the same way savepoints are. If you move checkpoint data, the JobManager must still be able to read all referenced files from their original paths. For a fully portable snapshot, use a savepoint instead.

