Traditionally, Flink job parallelism is fixed at submission time. Elastic scaling changes this: Flink can adjust parallelism at runtime as resources are added or removed, without manual intervention. Flink provides two schedulers for elastic scaling:
  • Adaptive Scheduler: for streaming jobs, adjusts parallelism within a running cluster
  • Adaptive Batch Scheduler: for batch jobs, see Adaptive Batch Execution

Adaptive Scheduler

The Adaptive Scheduler monitors available task slots and adjusts job parallelism automatically:
  • If fewer slots are available than the configured parallelism (e.g., a TaskManager failed), the scheduler reduces parallelism and keeps the job running.
  • When new slots become available, the job scales back up to the configured parallelism.
  • In Reactive Mode, the configured parallelism is treated as infinity: the job always uses all available resources.
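The scale-down/scale-up behavior above boils down to a simple rule. The following is an illustrative sketch, not Flink's actual scheduler code; `effectiveParallelism` is a hypothetical helper:

```java
// Sketch: how the Adaptive Scheduler derives the parallelism to run with
// from the configured parallelism and the currently available slots.
public class ParallelismSketch {

    static int effectiveParallelism(int configured, int availableSlots, boolean reactive) {
        // In Reactive Mode the configured parallelism is treated as infinity,
        // so the job simply uses every available slot.
        int desired = reactive ? Integer.MAX_VALUE : configured;
        return Math.min(desired, availableSlots);
    }

    public static void main(String[] args) {
        // Configured parallelism 4, but a TaskManager failure left only 2 slots:
        // the scheduler keeps the job running at parallelism 2.
        System.out.println(effectiveParallelism(4, 2, false)); // 2
        // Slots recovered: the job scales back up to the configured parallelism.
        System.out.println(effectiveParallelism(4, 6, false)); // 4
        // Reactive Mode: all 6 slots are used regardless of configuration.
        System.out.println(effectiveParallelism(4, 6, true));  // 6
    }
}
```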

Enabling the Adaptive Scheduler

jobmanager.scheduler: adaptive
The Adaptive Scheduler is for streaming jobs only. Batch jobs use the Adaptive Batch Scheduler automatically.

Adaptive Scheduler configuration

# How long to wait for sufficient resources before failing the job
# -1 means wait forever (default for Reactive Mode)
jobmanager.adaptive-scheduler.resource-wait-timeout: 5 min

# How long to wait for resources to stabilize before scheduling
# (useful when TaskManagers connect gradually)
jobmanager.adaptive-scheduler.resource-stabilization-timeout: 10 s

# Minimum aggregate parallelism increase required to trigger a scale-up
jobmanager.adaptive-scheduler.min-parallelism-increase: 1

# Minimum time between two scaling operations (default: 30s)
jobmanager.adaptive-scheduler.scaling-interval.min: 30 s

# If set, force a rescale after this duration when new resources arrive,
# even if min-parallelism-increase is not satisfied (disabled by default)
jobmanager.adaptive-scheduler.scaling-interval.max: 5 min
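How `min-parallelism-increase` and `scaling-interval.max` interact can be sketched as follows. This is an illustrative simplification, not Flink's implementation; `shouldRescale` is a hypothetical helper:

```java
// Sketch: when new resources arrive, a rescale happens only if the achievable
// parallelism increase meets min-parallelism-increase, or -- when
// scaling-interval.max is set -- once that much time has passed anyway.
public class RescaleDecision {

    static boolean shouldRescale(int parallelismIncrease,
                                 int minParallelismIncrease,
                                 long waitedMillis,
                                 long maxIntervalMillis /* negative = disabled */) {
        if (parallelismIncrease <= 0) {
            return false; // nothing to gain from rescaling
        }
        if (parallelismIncrease >= minParallelismIncrease) {
            return true;  // increase is large enough on its own
        }
        // Small increase: rescale only if the forced-rescale deadline passed.
        return maxIntervalMillis >= 0 && waitedMillis >= maxIntervalMillis;
    }

    public static void main(String[] args) {
        // Increase of 1 with the default threshold of 1: rescale immediately.
        System.out.println(shouldRescale(1, 1, 0, -1));           // true
        // Increase below threshold, no max interval: keep waiting.
        System.out.println(shouldRescale(1, 2, 0, -1));           // false
        // Increase below threshold, but 6 min elapsed with a 5 min max: force it.
        System.out.println(shouldRescale(1, 2, 360_000, 300_000)); // true
    }
}
```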

Limitations

  • Streaming jobs only. Batch jobs use the Adaptive Batch Scheduler.
  • No partial failover: when a task fails, the entire job restarts (not just the affected region). This increases recovery time for embarrassingly parallel jobs compared to the default scheduler.
  • Each scaling event triggers a job restart, incrementing task attempt counters.

Reactive Mode

Reactive Mode is a special configuration of the Adaptive Scheduler designed for single-job application clusters. In Reactive Mode:
  • The configured parallelism is ignored; Flink uses all available slots.
  • Adding a TaskManager scales up; removing one scales down.
  • Job restarts use the latest completed checkpoint — no manual savepoint is needed.

Getting started with Reactive Mode

# Copy job to lib/
cp ./examples/streaming/TopSpeedWindowing.jar lib/

# Start the application in Reactive Mode with checkpointing enabled
./bin/standalone-job.sh start \
    -Dscheduler-mode=reactive \
    -Dexecution.checkpointing.interval=10s \
    -j org.apache.flink.streaming.examples.windowing.TopSpeedWindowing

# Start a TaskManager
./bin/taskmanager.sh start
Scale up by starting an additional TaskManager:
./bin/taskmanager.sh start
Scale down by stopping one:
./bin/taskmanager.sh stop
Flink automatically rescales the job and restores from the latest checkpoint when the cluster composition changes.

Reactive Mode configuration

scheduler-mode: reactive
When Reactive Mode is enabled:
  • resource-wait-timeout defaults to -1 (wait forever for resources).
  • resource-stabilization-timeout defaults to 0 (start as soon as enough resources are available).
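These two defaults correspond to setting the options explicitly like this:

```
scheduler-mode: reactive
# Defaults applied by Reactive Mode, shown explicitly for clarity:
jobmanager.adaptive-scheduler.resource-wait-timeout: -1
jobmanager.adaptive-scheduler.resource-stabilization-timeout: 0 s
```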
Max parallelism: The scheduler respects per-operator max parallelism settings. If you do not set a max parallelism, default rules apply. Excessively high max parallelism values can hurt performance because Flink allocates internal structures proportionally.
// Set max parallelism on an operator in code ("source" is assumed to be defined)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env
    .addSource(source)
    .setMaxParallelism(256);
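For reference, the default rule mentioned above rounds 1.5x the operator parallelism up to the next power of two, clamped to the range [128, 32768]. A self-contained sketch of that computation (mirroring the rule documented for Flink's `KeyGroupRangeAssignment`, reimplemented here for illustration):

```java
// Sketch of Flink's documented default max-parallelism rule:
// min(max(roundUpToPowerOfTwo(parallelism + parallelism / 2), 128), 32768)
public class DefaultMaxParallelism {

    static int compute(int operatorParallelism) {
        int candidate = roundUpToPowerOfTwo(operatorParallelism + operatorParallelism / 2);
        return Math.min(Math.max(candidate, 128), 32768);
    }

    // Smallest power of two >= x (for x >= 1).
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    public static void main(String[] args) {
        System.out.println(compute(4));     // 128 (lower bound dominates)
        System.out.println(compute(100));   // 256 (150 rounded up to a power of two)
        System.out.println(compute(40000)); // 32768 (upper bound dominates)
    }
}
```

The upper bound exists because key groups are Flink's unit of state redistribution: state is sharded into max-parallelism many groups, so an excessively high value inflates per-state bookkeeping, which is why the text above warns against it.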

Recommendations for Reactive Mode

Always enable periodic checkpointing for stateful jobs in Reactive Mode. Without checkpointing, a rescaling event causes the job to lose all state.
execution.checkpointing.interval: 30 s
  • Set a restart strategy. If no restart strategy is configured and a failure occurs, Reactive Mode fails the job instead of restarting.
  • When scaling down, Flink waits for a heartbeat timeout (~50 seconds by default) before redeploying at lower parallelism if the TaskManager process was killed forcibly (SIGKILL). Use SIGTERM to allow a clean shutdown.
  • Reduce heartbeat.timeout cautiously — setting it too low can cause spurious failures during GC pauses or network hiccups.
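Putting the first two recommendations together, a minimal configuration might look like this (the fixed-delay strategy and the 10 s delay are illustrative choices, not requirements):

```
execution.checkpointing.interval: 30 s
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
restart-strategy.fixed-delay.delay: 10 s
```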

Reactive Mode limitations

  • Only supported in standalone Application mode (including Docker and standalone Kubernetes Application clusters).
  • Not supported with active resource providers (native Kubernetes, YARN).
  • Not supported for session clusters.
  • Inherits all Adaptive Scheduler limitations.

Externalized Declarative Resource Management

Starting with Flink 1.18, you can dynamically update the parallelism bounds of a running job via the REST API without restarting it:
PUT /jobs/<job-id>/resource-requirements

{
    "<vertex-id>": {
        "parallelism": {
            "lowerBound": 3,
            "upperBound": 8
        }
    }
}
This is useful for:
  • Session clusters where multiple jobs compete for resources and you need per-job resource control.
  • Application clusters on native Kubernetes where you want Flink to scale up/down automatically while retaining explicit parallelism control.
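As a sketch, the request can be issued with curl; the JobManager address is assumed to be localhost:8081, and `<job-id>`/`<vertex-id>` are placeholders you must substitute with real IDs (e.g., from `GET /jobs`):

```shell
# Placeholders: replace <job-id> and <vertex-id> with values from your cluster.
curl -X PUT \
  -H "Content-Type: application/json" \
  -d '{"<vertex-id>": {"parallelism": {"lowerBound": 3, "upperBound": 8}}}' \
  "http://localhost:8081/jobs/<job-id>/resource-requirements"
```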
You can also adjust these parallelism bounds interactively via the scale-up/scale-down buttons in the Flink Web UI's task list.
Externalized Declarative Resource Management is an MVP feature. The Flink community actively seeks user feedback via the mailing lists.
