How checkpointing works
Checkpointing uses Chandy-Lamport distributed snapshots. Flink injects special checkpoint barrier records into the stream. When a barrier reaches an operator, the operator snapshots its current state and forwards the barrier downstream. The checkpoint completes when all operators have snapshotted their state and the barriers have reached all sinks.

Prerequisites for reliable checkpointing:

- Replayable sources: The source must be able to re-read records since the last checkpointed position (e.g., Kafka topic offsets, file byte positions).
- Durable state storage: Checkpoint snapshots must be written to a persistent file system (HDFS, S3, etc.) rather than JobManager memory.
Enabling checkpointing
Call enableCheckpointing(interval) on the execution environment. The interval is the time in milliseconds between the starts of consecutive checkpoints.
CheckpointingSetup.java
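The listing for this caption is not included here; a minimal sketch of what it would contain, assuming the Flink 1.x DataStream API (class name taken from the caption, interval value illustrative):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 10 seconds (interval in milliseconds).
        env.enableCheckpointing(10_000);

        // ... define sources, transformations, and sinks, then:
        // env.execute("checkpointed job");
    }
}
```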
Full configuration example
CheckpointingConfig.java
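A sketch of a fuller configuration, using CheckpointConfig setters from the Flink 1.x DataStream API (some of these are deprecated or renamed in newer releases; values are illustrative):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingConfig {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 seconds.
        env.enableCheckpointing(10_000);

        CheckpointConfig config = env.getCheckpointConfig();

        // Exactly-once is the default; shown here for completeness.
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Require at least 500 ms between the end of one checkpoint
        // and the start of the next.
        config.setMinPauseBetweenCheckpoints(500);

        // Abort a checkpoint that does not complete within one minute.
        config.setCheckpointTimeout(60_000);

        // Allow only one checkpoint in flight at a time.
        config.setMaxConcurrentCheckpoints(1);

        // Tolerate a bounded number of checkpoint failures
        // before failing the job.
        config.setTolerableCheckpointFailureNumber(2);
    }
}
```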
Exactly-once vs. at-least-once
Both modes use the same checkpointing mechanism. The difference is how checkpoint barriers are aligned:

| Mode | Alignment | Latency | Guarantees |
|---|---|---|---|
| EXACTLY_ONCE | Barriers aligned before snapshot | Higher under backpressure | Each record affects state exactly once after recovery |
| AT_LEAST_ONCE | No alignment; snapshot taken immediately | Lower | Records may be replayed and affect state more than once |
The default mode is EXACTLY_ONCE. Consider AT_LEAST_ONCE only when you need very low checkpoint latency (a few milliseconds) and your application can tolerate duplicate processing.
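The mode can be chosen per job; a sketch using the two-argument enableCheckpointing overload from the Flink 1.x API (interval value illustrative):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ModeSelection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Trade exactly-once guarantees for lower checkpoint latency.
        env.enableCheckpointing(1_000, CheckpointingMode.AT_LEAST_ONCE);
    }
}
```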
Checkpoint storage
By default, checkpoints are stored in the JobManager’s JVM heap. This is unsafe for production: the state is lost if the JobManager restarts. Configure filesystem storage instead, in conf/config.yaml for cluster-wide defaults:
config.yaml
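A sketch of the cluster-wide settings; option names vary across Flink versions (state.checkpoints.dir is the long-standing key) and the bucket path is a placeholder:

```yaml
# Store checkpoint data on a durable filesystem instead of JobManager heap.
state.checkpoints.dir: s3://my-bucket/flink/checkpoints

# Keep the latest completed checkpoint when the job is cancelled.
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
```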
Restart strategies
When a task fails, Flink restarts it according to the configured restart strategy. The strategy controls how many retries are attempted and how long to wait between them.

Fixed-delay restart

Retries up to a maximum number of times, waiting a fixed delay between each attempt.

Exponential-delay restart

Increases the delay between retries exponentially, up to a maximum. This is the default when checkpointing is enabled.

Failure-rate restart

Allows up to N failures within a time interval. If the failure rate is exceeded, the job fails.

No restart

The job fails immediately on the first task failure. Use for batch jobs where you prefer fast failure over retries.

Default restart strategy
If no strategy is set in application code, Flink uses the cluster-level default from conf/config.yaml. When checkpointing is enabled and no restart strategy is configured, Flink defaults to exponential-delay restart:
config.yaml
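A sketch of what the exponential-delay default looks like when spelled out explicitly; the restart-strategy.* keys are the conventional ones, and the values shown are illustrative rather than the shipped defaults:

```yaml
restart-strategy.type: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 1 s
restart-strategy.exponential-delay.max-backoff: 1 min
restart-strategy.exponential-delay.backoff-multiplier: 1.5
```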
When checkpointing is disabled, the default is no restart (the job fails immediately on the first task failure). You can override the default cluster-wide:
config.yaml
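For example, a cluster-wide override to fixed-delay restart (values illustrative):

```yaml
# Retry a failed job up to 3 times, waiting 10 seconds between attempts.
restart-strategy.type: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```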
Savepoints
A savepoint is a manually triggered checkpoint. Unlike regular checkpoints, savepoints are never automatically deleted. You use them to:

- Upgrade application code without losing state
- Change parallelism of a running job
- Migrate a job to a new cluster
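The workflows above map onto the flink CLI roughly as follows; the job ID, paths, and jar name are placeholders:

```shell
# Trigger a savepoint for a running job.
flink savepoint <jobId> s3://my-bucket/flink/savepoints

# Stop the job gracefully, taking a savepoint first.
flink stop --savepointPath s3://my-bucket/flink/savepoints <jobId>

# Resubmit the (possibly upgraded) application from the savepoint.
flink run -s s3://my-bucket/flink/savepoints/savepoint-xxxx myJob.jar
```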
Unaligned checkpoints
Under heavy backpressure, aligned checkpoints can take a long time because barriers wait for all in-flight records to be processed before the snapshot is taken. Unaligned checkpoints include in-flight records in the checkpoint itself, so barriers no longer need to align. This reduces checkpoint duration but increases checkpoint size.

Unaligned checkpoints require EXACTLY_ONCE mode and at most one concurrent checkpoint.
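A sketch of enabling unaligned checkpoints, assuming the Flink 1.x CheckpointConfig API (interval value illustrative):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpoints {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(10_000); // exactly-once is the default mode
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Let barriers overtake in-flight records under backpressure.
        env.getCheckpointConfig().enableUnalignedCheckpoints();
    }
}
```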

