A savepoint is a consistent snapshot of a streaming job’s execution state, created manually or as part of a stop operation. Unlike checkpoints, savepoints are designed to be durable and portable, and to survive job upgrades. You use savepoints to:
  • Stop and resume a job, preserving all state
  • Fork a job into two separate instances from the same point in time
  • Update job logic or Flink version while retaining state
  • Rescale parallelism up or down
Savepoints consist of two parts: binary state files on stable storage (HDFS, S3, etc.) and a small metadata file containing pointers to those state files.

Savepoints vs checkpoints

             | Checkpoints                               | Savepoints
-------------+-------------------------------------------+---------------------------------
Triggered by | Flink automatically                       | User explicitly
Purpose      | Automatic fault tolerance                 | Planned maintenance and upgrades
Lifetime     | Deleted on job cancel (unless retained)   | Kept until you delete them
Portability  | Not self-contained; paths may be absolute | Self-contained; can be moved
Format       | State-backend specific                    | Canonical (default) or native

Assigning operator IDs

Before relying on savepoints, assign stable UIDs to all stateful operators in your job. Flink uses these UIDs to map state from a savepoint back to the correct operator after a restart.
env
    .addSource(new StatefulSource())
    .uid("source-id")            // ID for the stateful source
    .shuffle()
    .map(new StatefulMapper())
    .uid("mapper-id")            // ID for the stateful mapper
    .print();                    // stateless sink: auto-generated ID
If you do not assign UIDs, Flink auto-generates them from your job graph. Those auto-generated IDs change whenever you restructure the job, which will break savepoint compatibility.
Always assign explicit UIDs to stateful operators before going to production. Without them, any structural change to your job graph—even adding or removing a stateless operator—can make it impossible to restore from an existing savepoint.
A savepoint holds a map of Operator ID → State:
Operator ID  | State
-------------+---------------------------
source-id    | State of StatefulSource
mapper-id    | State of StatefulMapper
Stateless operators are not included in the savepoint.
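The restore-time mapping can be pictured as a dictionary lookup keyed by operator UID. The sketch below is a simplified illustration, not Flink's actual implementation; the function and names are hypothetical. It also shows the semantics behind --allowNonRestoredState, covered later.

```python
# Simplified illustration of how state in a savepoint is matched to
# operators by UID at restore time. Not Flink's real implementation.

def match_state(savepoint_state, job_uids, allow_non_restored=False):
    """Map savepoint state (uid -> state) onto the operators of a job."""
    restored = {}
    for uid, state in savepoint_state.items():
        if uid in job_uids:
            restored[uid] = state          # state goes back to its operator
        elif not allow_non_restored:
            # default: fail fast instead of silently dropping state
            raise RuntimeError(f"No operator found for state of '{uid}'")
    # Operators present in the job but absent from the savepoint
    # simply start with empty state.
    return restored

savepoint = {"source-id": "source state", "mapper-id": "mapper state"}

# Job graph unchanged: all state is restored.
assert match_state(savepoint, {"source-id", "mapper-id"}) == savepoint

# Mapper removed: restore fails unless non-restored state is allowed.
assert match_state(savepoint, {"source-id"}, allow_non_restored=True) == {
    "source-id": "source state"
}
```

This is why auto-generated UIDs are dangerous: if a restructured job graph produces different UIDs, no key matches, and the restore fails even though the logical operators still exist.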

Triggering a savepoint

$ bin/flink savepoint :jobId [:targetDirectory]
This triggers a savepoint for the running job and prints the path to the created savepoint. You can specify a format:
# Canonical format (default, cross-backend compatible)
$ bin/flink savepoint --type canonical :jobId [:targetDirectory]

# Native format (faster, backend-specific)
$ bin/flink savepoint --type native :jobId [:targetDirectory]
For large state where the client may time out, use detached mode:
$ bin/flink savepoint :jobId [:targetDirectory] -detached
In detached mode the client returns a trigger ID immediately. Poll the REST API to check completion status.
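The polling logic can be sketched as below. It assumes the response shape of Flink's asynchronous-operation REST endpoints (GET /jobs/:jobid/savepoints/:triggerid returning a status.id of IN_PROGRESS or COMPLETED, plus an operation object); verify the exact fields against the REST API reference for your Flink version.

```python
import json

def savepoint_result(body: str):
    """Return the savepoint path if complete, None if still in progress.

    `body` is the JSON response of GET /jobs/:jobid/savepoints/:triggerid.
    """
    resp = json.loads(body)
    if resp["status"]["id"] != "COMPLETED":
        return None                       # still in progress: keep polling
    op = resp.get("operation", {})
    if "failure-cause" in op:
        # the trigger completed, but the savepoint itself failed
        raise RuntimeError(str(op["failure-cause"]))
    return op["location"]                 # path of the finished savepoint

assert savepoint_result('{"status": {"id": "IN_PROGRESS"}}') is None
assert savepoint_result(
    '{"status": {"id": "COMPLETED"},'
    ' "operation": {"location": "s3://bucket/savepoints/savepoint-abc"}}'
) == "s3://bucket/savepoints/savepoint-abc"
```

In practice you would call this in a loop with a backoff between requests until it returns a non-None location or raises.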

Triggering on YARN

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

Stopping a job with a savepoint

To atomically take a savepoint and stop the job:
$ bin/flink stop --type canonical --savepointPath [:targetDirectory] :jobId
This is the safest way to stop a job because the savepoint and the job stop are a single atomic operation. The job will not stop until the savepoint has been successfully written.
Starting with Flink 1.15, intermediate savepoints (triggered with flink savepoint, not flink stop --savepointPath) are not used for recovery and do not commit transactional side effects. Use flink stop --savepointPath when you need side effects (e.g., Kafka transactions) to be committed at exactly the savepoint boundary.

Savepoint directory structure

Flink writes savepoints under the configured or specified directory:
/savepoints/
    savepoint-:shortjobid-:savepointid/
        _metadata      # Metadata file with pointers to state files
        ...            # State files
Savepoints are self-contained and relocatable. You can move the entire savepoint directory and restore from the new location.

Savepoint format

Canonical (default): a unified format that works across all state backends. You can take a savepoint with one backend and restore it with a different one. This is the most compatible format and is recommended for upgrades and long-term storage.
Native: the state backend's own on-disk format. Native savepoints are faster to create and restore, but can only be restored with the same state backend that produced them.

Resuming from a savepoint

$ bin/flink run -s :savepointPath [:runArgs]
You can pass either the savepoint directory or the _metadata file path.
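The two forms are interchangeable because the directory form is resolved to the _metadata file inside it. A small illustrative sketch (not Flink's code):

```python
from pathlib import Path

def metadata_path(savepoint_path: str) -> str:
    """Resolve a savepoint directory or _metadata path to the _metadata file."""
    p = Path(savepoint_path)
    # A directory path is resolved to the _metadata file it contains;
    # a path that already ends in _metadata is used as-is.
    return str(p if p.name == "_metadata" else p / "_metadata")

assert metadata_path("/savepoints/savepoint-ab12cd-0123") == \
    "/savepoints/savepoint-ab12cd-0123/_metadata"
assert metadata_path("/savepoints/savepoint-ab12cd-0123/_metadata") == \
    "/savepoints/savepoint-ab12cd-0123/_metadata"
```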

Allowing non-restored state

If you removed an operator from your job, Flink will fail to restore by default because there is state in the savepoint that cannot be mapped to any operator. Allow this with the -n flag:
$ bin/flink run -s :savepointPath -n [:runArgs]
Using --allowNonRestoredState (-n) can silently drop state. Verify carefully that dropped state is intentional and that remaining state maps correctly to the right operators.

Claim mode

The claim mode controls who takes ownership of the savepoint files after restore.
NO_CLAIM (default): Flink does not take ownership of the savepoint files. You retain full control and can start multiple jobs from the same savepoint. Flink forces the first checkpoint after restore to be a full checkpoint, then returns to incremental checkpointing as configured.
$ bin/flink run -s :savepointPath -claimMode NO_CLAIM [:runArgs]
CLAIM: Flink takes ownership of the savepoint and treats it like a regular checkpoint; it may delete the savepoint once it is no longer needed for recovery. Do not manually delete the savepoint or start multiple jobs from it.
$ bin/flink run -s :savepointPath -claimMode CLAIM [:runArgs]
LEGACY: Flink never deletes the initial checkpoint, but ownership is ambiguous. This mode is deprecated since Flink 1.15 and will be removed in Flink 2.0; use NO_CLAIM or CLAIM instead.

Disposing a savepoint

$ bin/flink savepoint -d :savepointPath
You can also delete savepoint files manually via filesystem commands. Each savepoint is self-contained, so deleting one savepoint directory does not affect other savepoints or checkpoints.

Configuration

Set a default savepoint directory to avoid specifying it on every trigger command:
execution.checkpointing.savepoint-dir: hdfs:///flink/savepoints
The savepoint target directory must be accessible by both JobManagers and TaskManagers, such as a distributed filesystem (HDFS, S3, GCS). A local path will not work in a multi-node cluster.

FAQ

Should I assign UIDs to all operators?
Yes, as a rule of thumb. Strictly speaking, only stateful operators need UIDs because savepoints only contain state for stateful operators. In practice, assign UIDs to all operators: built-in operators such as the window operator are stateful, and it is not always obvious which built-in operators hold state.
What happens if I add a new operator that requires state?
The new operator starts with empty state, just as it would on a fresh job start.
What happens if I delete an operator that has state?
By default, the restore fails because the savepoint contains state that cannot be matched to any operator. Use --allowNonRestoredState (-n) to skip the unmatched state.
Can I change the parallelism of my job when restoring?
Yes. Specify the new parallelism in the run command; Flink redistributes the keyed state across the new number of parallel tasks.
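Rescaling works because keyed state is partitioned into key groups, a fixed number of partitions bounded by the job's max parallelism; a key's group never changes, so whole key groups can be reassigned to subtasks deterministically. A simplified sketch of the idea, with Python's hash() standing in for Flink's hash function:

```python
# Simplified sketch of Flink's key-group mechanism. Keys hash into a
# fixed number of key groups (bounded by max parallelism); contiguous
# ranges of key groups are assigned to operator subtasks. Rescaling only
# moves key groups between subtasks. Python's hash() stands in for
# Flink's actual hash function here.

MAX_PARALLELISM = 128  # fixed for the life of the job

def key_group(key) -> int:
    """The key group a key belongs to: independent of parallelism."""
    return hash(key) % MAX_PARALLELISM

def subtask_index(group: int, parallelism: int) -> int:
    """The subtask that owns a key group at the given parallelism."""
    return group * parallelism // MAX_PARALLELISM

# A key's group does not depend on parallelism, so when rescaling from
# 2 to 4 subtasks its state can be handed to the new owner directly.
g = key_group("user-42")
old_owner = subtask_index(g, parallelism=2)
new_owner = subtask_index(g, parallelism=4)
assert 0 <= old_owner < 2 and 0 <= new_owner < 4
```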
Can I move savepoint files to a different location?
Yes. Savepoints are self-contained: move the entire savepoint directory and restore from the new path. Two exceptions apply: (1) if S3 entropy injection is enabled, the savepoint contains absolute paths and cannot be moved; (2) if the job uses task-owned state (e.g., the GenericWriteAheadLog sink), the savepoint is not relocatable.
