Speculative execution is a mechanism to reduce the impact of slow tasks (stragglers) in batch jobs. Stragglers are often caused by hardware problems, high CPU load, or I/O contention on specific nodes. A single slow task can significantly delay the completion of an entire batch stage.

How speculative execution works

1. Slow task detection

Flink periodically monitors the execution time of running tasks. When a sufficient fraction of tasks in a stage have finished, the scheduler computes a baseline execution time (median of finished tasks × a configurable multiplier). Any still-running task that exceeds this baseline is marked as slow.
2. Node blocklisting

Nodes (TaskManagers) hosting slow tasks are identified as problematic and added to a blocklist. No new task attempts are scheduled on blocked nodes until the block duration expires.
3. Speculative attempt launch

New backup task attempts are created for the slow tasks and deployed on nodes that are not blocked. These speculative attempts process the same input data as the original attempt.
4. First-finished wins

The first attempt (original or speculative) to finish is accepted; its output is passed to downstream tasks, and all other running attempts for the same task are cancelled.
The original slow attempt continues running in parallel with the speculative attempt. Flink does not cancel the original attempt preemptively.
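The four steps can be illustrated with a toy walk-through. This is a sketch with made-up numbers and node names (`tm-1` … `tm-3` are hypothetical TaskManager IDs), not Flink's scheduler code:

```java
import java.util.List;
import java.util.Set;

/** Toy walk-through of the four steps above; illustrative only, not Flink's code. */
public class SpeculationFlowSketch {

    /** Step 3: pick a deployment target that is not on the blocklist. */
    static String pickNode(List<String> nodes, Set<String> blocklist) {
        return nodes.stream()
                .filter(n -> !blocklist.contains(n))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("all nodes blocked"));
    }

    public static void main(String[] args) {
        // Step 1: 3 of 4 tasks finished (ratio 0.75); median 50 s, multiplier 1.5.
        double baseline = 50 * 1.5;          // 75 s
        double stragglerRunningFor = 120;    // seconds
        boolean slow = stragglerRunningFor > baseline;

        // Step 2: the straggler's TaskManager is blocked for new attempts.
        Set<String> blocklist = slow ? Set.of("tm-3") : Set.of();

        // Step 3: deploy the speculative attempt on an unblocked node.
        String target = pickNode(List.of("tm-1", "tm-2", "tm-3"), blocklist);
        System.out.println("speculative attempt on " + target); // tm-1

        // Step 4: whichever attempt finishes first wins; the rest are cancelled.
    }
}
```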

Prerequisites

Speculative execution is only supported by the Adaptive Batch Scheduler, which is the default batch scheduler in Flink.

Enabling speculative execution

Set the following option in the Flink configuration:

execution.batch.speculative.enabled: true

Tuning configuration

Scheduler options

# Maximum number of concurrent attempts for one task (original + speculative)
# Default: 2 (original + 1 speculative)
execution.batch.speculative.max-concurrent-executions: 3

# How long a node stays blocked after hosting a slow task
# Default: 1 min
execution.batch.speculative.block-slow-node-duration: 5 min

Slow task detector options

# How often the detector checks for slow tasks (default: 1 s)
slow-task-detector.check-interval: 2 s

# Minimum finished-task ratio before computing the baseline (default: 0.75)
# The detector waits until this fraction of tasks in a stage have finished
slow-task-detector.execution-time.baseline-ratio: 0.75

# Multiplier applied to the median execution time to get the baseline (default: 1.5)
# Tasks taking longer than median × multiplier are considered slow
slow-task-detector.execution-time.baseline-multiplier: 2.0

# Lower bound for the computed baseline (default: 1 min)
# Prevents very fast tasks from triggering speculative execution
slow-task-detector.execution-time.baseline-lower-bound: 30 s

How the baseline is computed

  1. Wait until baseline-ratio fraction of tasks in the stage have finished.
  2. Compute the median execution time of the finished tasks.
  3. Multiply by baseline-multiplier to get the baseline.
  4. Any running task with execution time > baseline and > baseline-lower-bound is marked slow.
Execution time is weighted by input data volume, so tasks processing proportionally more data are not falsely flagged as slow due to data skew.
For source tasks and tasks using Hybrid Shuffle mode, input data volume is unknown. Execution-time weighting does not apply, and more conservative baseline-lower-bound settings are recommended.
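The rule above can be condensed into a few lines. This is an illustrative sketch of the computation, not Flink's actual detector code; marking a task slow when its time exceeds both the median × multiplier product and the lower bound is equivalent to comparing against the maximum of the two:

```java
import java.util.Arrays;

/** Illustrative sketch of the baseline rule above; not Flink's actual code. */
public class BaselineSketch {

    /** baseline = median(finished execution times) × multiplier, floored by the lower bound. */
    static double baseline(double[] finishedTimes, double multiplier, double lowerBound) {
        double[] sorted = finishedTimes.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        double median = (n % 2 == 1)
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
        // "time > baseline and time > lower bound" is the same as
        // "time > max(median × multiplier, lower bound)".
        return Math.max(median * multiplier, lowerBound);
    }

    static boolean isSlow(double runningTime, double baseline) {
        return runningTime > baseline;
    }

    public static void main(String[] args) {
        double[] finished = {40, 55, 60, 70, 90}; // seconds; median = 60
        double b = baseline(finished, 1.5, 60);   // 60 × 1.5 = 90 s
        System.out.println(b);                    // 90.0
        System.out.println(isSlow(120, b));       // true: task exceeds the baseline
        System.out.println(isSlow(80, b));        // false
    }
}
```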

Enabling sources for speculative execution

Most sources work with speculative execution without changes. However, if your custom source uses SourceEvent, you must implement SupportsHandleExecutionAttemptSourceEvent on the SplitEnumerator:
public interface SupportsHandleExecutionAttemptSourceEvent {
    void handleSourceEvent(
        int subtaskId,
        int attemptNumber,
        SourceEvent sourceEvent
    );
}
This allows the SplitEnumerator to route source events to the correct attempt. Without this interface, the job fails when an event arrives from a speculative attempt. All built-in Apache Flink source connectors support speculative execution.
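A sketch of why the attempt number matters when handling events. `SourceEvent` and the interface below are local stand-ins mirroring the Flink types so the example compiles without Flink on the classpath, and the bookkeeping (counting events per attempt) is purely illustrative:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of attempt-aware event handling in a SplitEnumerator.
 * The nested types are stand-ins for the Flink API; illustrative only.
 */
public class AttemptAwareEnumeratorSketch {

    interface SourceEvent {}

    interface SupportsHandleExecutionAttemptSourceEvent {
        void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent);
    }

    static class MyEnumerator implements SupportsHandleExecutionAttemptSourceEvent {
        // With speculation, several attempts of one subtask talk to the enumerator,
        // so per-reader state must be keyed by (subtask, attempt), not subtask alone.
        final Map<String, Integer> eventsPerAttempt = new HashMap<>();

        @Override
        public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent event) {
            eventsPerAttempt.merge(subtaskId + "#" + attemptNumber, 1, Integer::sum);
        }
    }

    public static void main(String[] args) {
        MyEnumerator e = new MyEnumerator();
        SourceEvent ev = new SourceEvent() {};
        e.handleSourceEvent(0, 0, ev); // original attempt of subtask 0
        e.handleSourceEvent(0, 1, ev); // speculative attempt of the same subtask
        System.out.println(e.eventsPerAttempt.size()); // one entry per (subtask, attempt)
    }
}
```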

Enabling sinks for speculative execution

Sinks are excluded from speculative execution by default for safety reasons. To allow a sink to run speculatively, implement the SupportsConcurrentExecutionAttempts marker interface:
// Marker interface - no methods required
public interface SupportsConcurrentExecutionAttempts {}
This works for Sink, SinkFunction, and OutputFormat implementations.
Even if a Sink implements SupportsConcurrentExecutionAttempts, the Committer portion of a two-phase commit sink is never speculatively executed. Flink disables speculative execution for Committer operators automatically.
If any operator in a task does not support speculative execution, the entire task is marked as non-speculative.
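A sketch of what opting in looks like in practice. The marker interface below is a local stand-in for the Flink one so the example compiles without Flink on the classpath, and `IdempotentSink` is a hypothetical sink: the point is that a sink declaring the marker must tolerate the same record arriving from concurrent attempts, for example by making writes idempotent:

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch of opting a sink into speculative execution; stand-in types, illustrative only. */
public class SpeculativeSinkSketch {

    /** Stand-in for the Flink marker interface; declaring it is the only API change. */
    interface SupportsConcurrentExecutionAttempts {}

    /** Hypothetical sink whose keyed writes are idempotent, so duplicate
     *  emissions from the original and a speculative attempt are harmless. */
    static class IdempotentSink implements SupportsConcurrentExecutionAttempts {
        final Set<String> written = new HashSet<>();

        void write(String recordKey) {
            written.add(recordKey); // re-writing the same key changes nothing
        }
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.write("order-42"); // from the original attempt
        sink.write("order-42"); // from the speculative attempt
        System.out.println(sink.written.size()); // 1
    }
}
```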

Checking effectiveness

After enabling speculative execution, you can check its effect in the Web UI and through metrics.

Web UI:
  • Navigate to your job’s vertex detail page and click the SubTasks tab to see speculative attempts listed alongside original attempts.
  • Check the cluster Overview and Task Managers pages to see which TaskManagers are currently blocked.
Metrics:
  • Flink exposes metrics under the speculative-execution scope, including the number of speculative task attempts started and the number of blocked nodes. See the metrics documentation for the full list.
