Flink provides exactly-once or at-least-once fault tolerance for stateful streaming applications through checkpointing — periodic snapshots of all operator state and stream positions. When a task fails, Flink restores from the most recent successful checkpoint and replays the records that arrived after it.

How checkpointing works

Checkpointing is based on a variant of the Chandy-Lamport algorithm for distributed snapshots (asynchronous barrier snapshotting). Flink injects special checkpoint barrier records into the stream at the sources. When a barrier reaches an operator, the operator snapshots its current state and forwards the barrier downstream. The checkpoint completes once every operator has snapshotted its state and the barriers have reached all sinks. Reliable checkpointing has two prerequisites:
  1. Replayable sources: The source must be able to re-read records since the last checkpoint position (e.g., Kafka topic offsets, file byte positions).
  2. Durable state storage: Checkpoint snapshots must be written to a persistent file system (HDFS, S3, etc.) rather than JobManager memory.
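The recovery model can be sketched in plain Java (this is an illustrative simulation, not the Flink API): a replayable source tracks a read offset, a "checkpoint" captures the offset together with a copy of the operator state, and recovery rolls the state back and rewinds the source to the checkpointed offset so every record is applied exactly once.

```java
import java.util.List;

// Minimal simulation of checkpoint-and-replay (illustrative, not Flink API).
public class CheckpointReplayDemo {
    // Operator state: a running sum. A "checkpoint" = (source offset, state copy).
    static long state = 0;
    static int offset = 0;       // next record to read from the source
    static int ckptOffset = 0;   // source offset captured by the last checkpoint
    static long ckptState = 0;   // operator state captured by the last checkpoint

    static void process(long record) { state += record; }

    static void checkpoint() { ckptOffset = offset; ckptState = state; }

    // Recovery discards in-flight progress and rewinds the source.
    static void restore() { offset = ckptOffset; state = ckptState; }

    public static void main(String[] args) {
        List<Long> source = List.of(1L, 2L, 3L, 4L, 5L); // replayable source
        process(source.get(offset++));   // record 1
        process(source.get(offset++));   // record 2
        checkpoint();                    // snapshot: offset=2, state=3
        process(source.get(offset++));   // record 3 (replayed after failure)
        restore();                       // "failure": roll back to checkpoint
        while (offset < source.size()) { // replay from the checkpointed offset
            process(source.get(offset++));
        }
        System.out.println(state);       // 15: each record counted exactly once
    }
}
```

Without the rewind (a non-replayable source), record 3 would be lost; without the state rollback, it would be counted twice. Checkpointing needs both halves, which is why replayable sources and durable snapshots are listed as prerequisites.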

Enabling checkpointing

Call enableCheckpointing(interval) on the environment. The interval is the time in milliseconds between the start of consecutive checkpoints.
CheckpointingSetup.java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 1 second
env.enableCheckpointing(1_000);

Full configuration example

CheckpointingConfig.java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing every 60 seconds
env.enableCheckpointing(60_000);

CheckpointConfig config = env.getCheckpointConfig();

// Exactly-once semantics (default)
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Guarantee at least 500ms of stream progress between checkpoints
config.setMinPauseBetweenCheckpoints(500);

// Abort checkpoints that take longer than 5 minutes
config.setCheckpointTimeout(300_000);

// Tolerate up to 2 consecutive checkpoint failures before failing the job
config.setTolerableCheckpointFailureNumber(2);

// Only allow one checkpoint to be in progress at a time
config.setMaxConcurrentCheckpoints(1);

// Retain checkpoints when the job is cancelled (useful for debugging)
config.setExternalizedCheckpointRetention(
    CheckpointConfig.ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION
);

// Enable unaligned checkpoints (reduces checkpoint time under backpressure)
config.enableUnalignedCheckpoints();

// Configure checkpoint storage (use filesystem in production)
Configuration storageConfig = new Configuration();
storageConfig.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
storageConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///flink-checkpoints");
env.configure(storageConfig);

Exactly-once vs. at-least-once

Both modes use the same checkpointing mechanism. The difference is how checkpoint barriers are aligned:
Mode           Alignment                                  Latency                    Guarantees
EXACTLY_ONCE   Barriers aligned before snapshot           Higher under backpressure  Each record affects state exactly once after recovery
AT_LEAST_ONCE  No alignment; snapshot taken immediately   Lower                      Records may be replayed and affect state more than once
Most applications should use EXACTLY_ONCE. Consider AT_LEAST_ONCE only when you need consistently very low latency (a few milliseconds) and your application can tolerate duplicate processing.
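The behavioral difference can be illustrated with a plain-Java sketch (not Flink API, and a simplification of barrier alignment): under AT_LEAST_ONCE, a record that raced ahead of the barrier may already be reflected in the snapshot, yet still sits after the checkpointed source offset, so it is applied again on recovery.

```java
import java.util.List;

// Illustrative contrast (not Flink API): under AT_LEAST_ONCE, the snapshot may
// already include the effect of a record that is replayed after recovery.
public class AtLeastOnceDemo {
    public static long runAtLeastOnce(List<Long> records) {
        long state = 0;
        state += records.get(0);
        // Without barrier alignment, record index 1 is applied to the state
        // before the snapshot, even though the checkpointed source offset is 1.
        state += records.get(1);
        long snapshot = state;     // snapshot already reflects record index 1
        int replayFrom = 1;        // recovery re-reads from source offset 1
        // -- failure here --
        long restored = snapshot;
        for (int i = replayFrom; i < records.size(); i++) {
            restored += records.get(i);   // record index 1 applied a second time
        }
        return restored;
    }

    public static void main(String[] args) {
        long sum = runAtLeastOnce(List.of(10L, 20L, 30L));
        System.out.println(sum);   // 80, not 60: the value 20 is counted twice
    }
}
```

Exactly-once mode avoids this by holding back the snapshot until barriers from all input channels have aligned, which is exactly the extra latency shown in the table above.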

Checkpoint storage

By default, checkpoints are stored in the JobManager’s JVM heap. This is unsafe for production — the state is lost if the JobManager restarts. Configure filesystem storage:
Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "s3://my-bucket/flink/checkpoints");
env.configure(config);
Or configure it in conf/config.yaml for cluster-wide defaults:
config.yaml
state.checkpoints.dir: hdfs:///flink/checkpoints

Restart strategies

When a task fails, Flink restarts it according to the configured restart strategy. The strategy controls how many retries are attempted and how long to wait between them.

Fixed-delay restart

Retries up to a maximum number of times, waiting a fixed delay between each attempt.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;

Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

Exponential-delay restart

Increases the delay between retries exponentially, up to a maximum. This is the default when checkpointing is enabled.
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "exponential-delay");
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF, Duration.ofSeconds(1));
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF, Duration.ofMinutes(2));
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER, 1.4);
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_RESET_BACKOFF_THRESHOLD, Duration.ofMinutes(10));
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_JITTER_FACTOR, 0.1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
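To build intuition for these settings, here is a small standalone sketch (not Flink's implementation) of the delay schedule they imply: each retry delay is the previous one times the multiplier, capped at the maximum backoff; jitter is ignored for simplicity.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Sketch of an exponential-delay schedule (initial-backoff 1s, multiplier 1.4,
// max-backoff 2min), ignoring jitter. Not Flink's actual implementation.
public class BackoffSchedule {
    public static List<Duration> schedule(Duration initial, double multiplier,
                                          Duration max, int attempts) {
        List<Duration> delays = new ArrayList<>();
        double millis = initial.toMillis();
        for (int i = 0; i < attempts; i++) {
            // Cap each delay at the configured maximum backoff.
            delays.add(Duration.ofMillis((long) Math.min(millis, max.toMillis())));
            millis *= multiplier;   // grow the delay for the next attempt
        }
        return delays;
    }

    public static void main(String[] args) {
        List<Duration> delays =
            schedule(Duration.ofSeconds(1), 1.4, Duration.ofMinutes(2), 20);
        // Delays grow 1s, 1.4s, 1.96s, ... and are capped at 120s.
        System.out.println(delays.get(0).toMillis());   // 1000
        System.out.println(delays.get(19).toMillis());  // 120000 (capped)
    }
}
```

The reset-backoff-threshold setting (not modeled here) resets this schedule back to the initial delay once the job has run without failure for the configured duration.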

Failure-rate restart

Allows up to N failures within a time interval. If the failure rate is exceeded, the job fails.
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "failure-rate");
config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 3);
config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, Duration.ofMinutes(5));
config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY, Duration.ofSeconds(10));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

No restart

The job fails immediately on the first task failure. Use this for batch jobs where fast failure is preferable to retries.
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

Default restart strategy

If no strategy is set in application code, Flink uses the cluster-level default from conf/config.yaml. When checkpointing is enabled and no restart strategy is configured, Flink defaults to exponential-delay restart:
config.yaml
restart-strategy:
  type: exponential-delay
  exponential-delay:
    initial-backoff: 1 s
    max-backoff: 2 min
    backoff-multiplier: 1.4
    reset-backoff-threshold: 10 min
    jitter-factor: 0.1
If checkpointing is not enabled, the default is no restart (the job fails immediately on the first task failure). You can override the default cluster-wide:
config.yaml
restart-strategy:
  type: fixed-delay
  fixed-delay:
    attempts: 3
    delay: 10 s

Savepoints

A savepoint is a manually triggered checkpoint. Unlike regular checkpoints, savepoints are never automatically deleted. You use them to:
  • Upgrade application code without losing state
  • Change parallelism of a running job
  • Migrate a job to a new cluster
Trigger a savepoint via the CLI:
# Trigger savepoint without stopping the job
./bin/flink savepoint <job-id> hdfs:///flink/savepoints

# Stop a streaming job gracefully, creating a savepoint on the way
./bin/flink stop --savepointPath hdfs:///flink/savepoints <job-id>
Resume from a savepoint:
./bin/flink run --fromSavepoint hdfs:///flink/savepoints/savepoint-<id> my-job.jar

Unaligned checkpoints

Under heavy backpressure, aligned checkpoints can take a long time because barriers wait for all in-flight records to be processed before the snapshot is taken. Unaligned checkpoints include in-flight records in the checkpoint itself, so barriers no longer need to align. This reduces checkpoint duration but increases checkpoint size.
env.getCheckpointConfig().enableUnalignedCheckpoints();
Unaligned checkpoints only work with EXACTLY_ONCE mode and at most one concurrent checkpoint.

Checkpointing with finished tasks

Starting with Flink 1.14, checkpointing can continue even after some tasks have finished processing all their input (e.g., in mixed bounded/unbounded pipelines). This is enabled by default since Flink 1.15. Tasks that have finished do not contribute to subsequent checkpoints. Sinks using two-phase commit wait for a final checkpoint before the task exits, to ensure all committed transactions are included.
