When a task fails, Flink needs to restart the failed task—and possibly other affected tasks—to return the job to a running state. Two independent mechanisms control this process:
- Restart strategy: decides whether and when tasks are restarted
- Failover strategy: decides which tasks are restarted
Restart strategies
You can set a default restart strategy for the entire cluster in config.yaml, and override it per-job programmatically.
If checkpointing is disabled, Flink defaults to the none (no restart) strategy. If checkpointing is enabled and no restart strategy is configured, Flink defaults to the exponential-delay strategy.
Fixed delay
Retries the job a fixed number of times with a fixed wait time between attempts. If the maximum number of attempts is exceeded, the job fails permanently.
restart-strategy.type: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
Duration.ofSeconds(10));
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

config = Configuration()
config.set_string('restart-strategy.type', 'fixed-delay')
config.set_string('restart-strategy.fixed-delay.attempts', '3')
config.set_string('restart-strategy.fixed-delay.delay', '10 s')
env = StreamExecutionEnvironment.get_execution_environment(config)
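The retry budget is easy to reason about. As a back-of-the-envelope sketch (plain Python, not the Flink API), a persistently failing job with the configuration above spends at most attempts × delay waiting before it fails permanently:

```python
# Illustrative arithmetic, not Flink code: under fixed-delay, a job that
# keeps failing waits `delay_s` seconds before each of `attempts` retries,
# then fails permanently.
def fixed_delay_schedule(attempts, delay_s):
    """Return the list of waits (in seconds) before each retry."""
    return [delay_s] * attempts

# With 3 attempts and a 10 s delay:
print(fixed_delay_schedule(3, 10))       # [10, 10, 10]
print(sum(fixed_delay_schedule(3, 10)))  # 30 seconds of waiting in total
```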
Exponential delay (default when checkpointing is enabled)
Starts with a short delay and exponentially increases the wait time between retries up to a configured maximum. After the job runs stably for a configured threshold period, the delay resets to its initial value.
This is the recommended default strategy because:
- Short initial delays allow fast recovery from transient failures
- Growing delays prevent thundering-herd problems when an external system (e.g., Kafka) is unhealthy
- Jitter spreads retries over time, so multiple jobs are unlikely to all retry simultaneously
restart-strategy.type: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 10 s
restart-strategy.exponential-delay.max-backoff: 2 min
restart-strategy.exponential-delay.backoff-multiplier: 1.4
restart-strategy.exponential-delay.reset-backoff-threshold: 10 min
restart-strategy.exponential-delay.jitter-factor: 0.1
restart-strategy.exponential-delay.attempts-before-reset-backoff: 10
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "exponential-delay");
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF,
Duration.ofSeconds(10));
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF,
Duration.ofMinutes(2));
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER,
1.4);
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_RESET_BACKOFF_THRESHOLD,
Duration.ofMinutes(10));
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_JITTER_FACTOR, 0.1);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
Example delay progression with initial-backoff: 1s, backoff-multiplier: 2, max-backoff: 10s:
| Retry | Delay |
|---|---|
| 1st | 1 s |
| 2nd | 2 s |
| 3rd | 4 s |
| 4th | 8 s |
| 5th and beyond | 10 s (capped at max-backoff) |
With jitter-factor: 0.1, each delay is randomised ±10% to prevent multiple jobs from retrying at exactly the same time.
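The progression in the table follows min(initial · multiplier^(n−1), max). A small sketch (plain Python, not Flink code; Flink's exact internal jitter formula is not reproduced here) makes the formula concrete:

```python
def exponential_delays(initial_s, multiplier, max_s, retries):
    """Delay before the n-th retry: min(initial * multiplier**(n-1), max)."""
    return [min(initial_s * multiplier ** (n - 1), max_s)
            for n in range(1, retries + 1)]

# Reproduces the table above (initial-backoff: 1 s, multiplier: 2, max-backoff: 10 s):
print(exponential_delays(1, 2, 10, 6))  # [1, 2, 4, 8, 10, 10]
```

A jitter-factor of 0.1 would then randomise each of these values by up to ±10% before the delay is applied.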
Failure rate
Restarts the job after failures, but gives up permanently once the failure rate (failures per time interval) exceeds a configured threshold.
restart-strategy.type: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "failure-rate");
config.set(
RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 3);
config.set(
RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL,
Duration.ofMinutes(5));
config.set(
RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY,
Duration.ofSeconds(10));
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
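The bookkeeping behind failure-rate can be pictured as a sliding window of failure timestamps. The following is a simplified sketch in plain Python (class and method names are illustrative, not Flink internals):

```python
from collections import deque

class FailureRateGate:
    """Sketch: allow a restart while the number of failures inside the
    sliding interval stays at or below the configured maximum."""

    def __init__(self, max_failures, interval_s):
        self.max_failures = max_failures
        self.interval_s = interval_s
        self.failures = deque()  # timestamps of recent failures

    def can_restart(self, now_s):
        self.failures.append(now_s)
        # Drop failures that have fallen out of the sliding interval.
        while self.failures and now_s - self.failures[0] > self.interval_s:
            self.failures.popleft()
        return len(self.failures) <= self.max_failures

# max-failures-per-interval: 3, failure-rate-interval: 5 min (300 s)
gate = FailureRateGate(max_failures=3, interval_s=300)
print(gate.can_restart(0))    # True  (1 failure in the window)
print(gate.can_restart(60))   # True  (2)
print(gate.can_restart(120))  # True  (3)
print(gate.can_restart(180))  # False (4 failures within 5 minutes: give up)
print(gate.can_restart(500))  # True  (old failures have expired)
```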
No restart
The job fails immediately on any task failure, with no retry attempts.
restart-strategy.type: none
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
Failover strategies
The failover strategy determines which tasks must be restarted when a task fails. Configure it in config.yaml:
jobmanager.execution.failover-strategy: region # or: full
Restart all
Value: full
Restarts every task in the job on any failure. This is the simplest strategy but causes the most disruption: a single task failure stops and restarts the entire pipeline.
Restart pipelined region
Value: region (default and recommended)
Groups tasks into regions based on pipelined data exchanges. When a task fails, Flink computes the smallest set of regions that must be restarted to recover from the failure. Tasks connected only via batch exchanges are in separate regions and are not restarted unnecessarily.
The regions to restart are determined as follows:
Restart the failed region
The region containing the failed task is always restarted.
Restart regions with unavailable inputs
If any region that needs to be restarted requires data from a result partition that is no longer available, the region producing that partition is also restarted.
Restart downstream consumer regions
All regions that consume output from a region being restarted are also restarted, to guarantee data consistency in the presence of non-deterministic operators.
For streaming jobs whose operators are all connected through all-to-all pipelined exchanges (for example after a keyBy or rebalance), the whole job forms a single region, and regional failover degenerates to a full restart. For embarrassingly parallel jobs with only one-to-one (forward) connections, each parallel pipeline is its own region, so only the failed pipeline restarts. The benefit is also large for jobs that mix streaming and batch-style exchanges, and for SQL jobs with blocking shuffles.
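The region rules above can be sketched as a union-find over pipelined edges plus a downstream walk over blocking edges. This is a simplified plain-Python illustration, not Flink's scheduler code, and it omits the "unavailable inputs" rule, which can additionally pull in upstream producer regions:

```python
from collections import defaultdict

def tasks_to_restart(tasks, pipelined_edges, blocking_edges, failed_task):
    # Union-find: tasks joined by pipelined exchanges share one region.
    parent = {t: t for t in tasks}
    def find(t):
        while parent[t] != t:
            parent[t] = parent[parent[t]]  # path compression
            t = parent[t]
        return t
    for a, b in pipelined_edges:
        parent[find(a)] = find(b)

    # Region-level edges induced by blocking (batch) exchanges.
    downstream = defaultdict(set)
    for a, b in blocking_edges:
        ra, rb = find(a), find(b)
        if ra != rb:
            downstream[ra].add(rb)

    # Restart the failed region, then every consumer region downstream of it.
    to_restart, stack = set(), [find(failed_task)]
    while stack:
        r = stack.pop()
        if r not in to_restart:
            to_restart.add(r)
            stack.extend(downstream[r])
    return {t for t in tasks if find(t) in to_restart}

# A -> B pipelined, B -> C blocking, C -> D pipelined:
tasks = ["A", "B", "C", "D"]
print(sorted(tasks_to_restart(tasks, [("A", "B"), ("C", "D")], [("B", "C")], "A")))
# ['A', 'B', 'C', 'D']  (failed region {A, B} plus downstream region {C, D})
print(sorted(tasks_to_restart(tasks, [("A", "B"), ("C", "D")], [("B", "C")], "C")))
# ['C', 'D']  (only the failed region; upstream {A, B} keeps its result partitions)
```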
Recovery process
When Flink recovers from a failure:
Detect the failure
The JobManager is notified of the failed task by its TaskManager, or detects a lost TaskManager via the heartbeat mechanism.
Cancel affected tasks
All tasks in the affected region(s) are cancelled and their slots are released.
Wait for the restart delay
The configured restart strategy waits the appropriate delay before proceeding.
Restore from checkpoint
Flink locates the latest completed checkpoint and sends state handles to the restarting tasks.
Resume processing
Tasks restore their state from the checkpoint and resume consuming from the corresponding stream positions.
Enable checkpointing before testing failure recovery. Without checkpoints, Flink can still restart tasks but cannot restore state, which means it replays all data from the beginning of the source.
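As a minimal example, checkpointing can be switched on cluster-wide in config.yaml with a single interval setting (the 30 s value here is illustrative), which also changes the default restart strategy from none to exponential-delay:

```yaml
execution.checkpointing.interval: 30 s
```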