Traditional batch job execution in Flink requires users to configure parallelism before the job runs. The Adaptive Batch Scheduler eliminates this requirement by dynamically determining execution plans at runtime, based on actual intermediate data characteristics.

Background

In real-world batch workloads, data volumes and distributions are difficult to predict accurately before execution. Static execution plan optimizers must rely on incomplete or inaccurate statistics, which can lead to poor parallelism choices, skewed data distribution, and suboptimal join strategies. The Adaptive Batch Scheduler addresses this by incrementally building the execution plan as the job runs, using actual observed data sizes and distributions at each stage to optimize the next stage.

Enabling the Adaptive Batch Scheduler

The Adaptive Batch Scheduler is the default batch scheduler in Flink. No configuration is required unless you have explicitly switched to a different scheduler. If you have overridden the scheduler, you can restore the default:
# Default for batch jobs; do not set this unless you changed it
jobmanager.scheduler: AdaptiveBatch
The Adaptive Batch Scheduler only applies to batch jobs. Streaming jobs use the default scheduler or the Adaptive Scheduler.

Automatically deciding parallelism

The Adaptive Batch Scheduler can determine the parallelism for each operator based on the size of its input data. Operators that consume large volumes of data receive higher parallelism, while operators whose input has been filtered down run at lower parallelism.

Enabling auto-parallelism

Auto-parallelism is enabled by default:
# Toggle auto-parallelism (default: true)
execution.batch.adaptive.auto-parallelism.enabled: true

# Lower bound for auto-determined parallelism (default: 1)
execution.batch.adaptive.auto-parallelism.min-parallelism: 1

# Upper bound for auto-determined parallelism
# Falls back to parallelism.default if not set
execution.batch.adaptive.auto-parallelism.max-parallelism: 128

# Target average data volume per task (default: 16mb)
execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task: 128mb

# Default parallelism for source operators or upper bound for source parallelism inference
execution.batch.adaptive.auto-parallelism.default-source-parallelism: 32
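To see how these settings interact, here is a simplified sketch of the arithmetic: divide the observed input size by the target volume per task, then clamp into the configured range. This is illustrative only; the scheduler's actual algorithm is more involved (for example, it aligns the result with subpartition boundaries).

```java
public class ParallelismSketch {
    // Illustrative sketch of auto-parallelism: size each task to roughly
    // avgBytesPerTask, clamped into [minParallelism, maxParallelism].
    static int decideParallelism(long inputBytes, long avgBytesPerTask,
                                 int minParallelism, int maxParallelism) {
        // Number of tasks needed so each processes ~avgBytesPerTask (ceiling division).
        long needed = (inputBytes + avgBytesPerTask - 1) / avgBytesPerTask;
        // Clamp into the configured range.
        return (int) Math.max(minParallelism, Math.min(maxParallelism, needed));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // 10 GB of input with 128 MB per task -> 80 tasks.
        System.out.println(decideParallelism(10_240 * mb, 128 * mb, 1, 128));
        // 5 MB of filtered-down data -> clamped to the minimum.
        System.out.println(decideParallelism(5 * mb, 128 * mb, 1, 128));
    }
}
```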

Usage rules

  • The scheduler only auto-determines parallelism for operators where you have not explicitly set parallelism.
  • If you call operator.setParallelism(n), that operator uses n and is excluded from auto-tuning.
  • To get the best results across an entire pipeline, avoid setting parallelism on individual operators.

Dynamic source parallelism inference

Sources can implement the DynamicParallelismInference interface to help the scheduler decide their parallelism:
public interface DynamicParallelismInference {
    int inferParallelism(Context context);
}
The Context provides the upper bound for inferred parallelism, the expected average data size per task, and dynamic filtering information. If a source does not implement this interface, execution.batch.adaptive.auto-parallelism.default-source-parallelism is used.
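The kind of logic a source might put behind inferParallelism can be sketched without Flink on the classpath. The stand-in InferenceContext record below mimics the information the real Context exposes; its field names and the file-size estimation are illustrative, not Flink's API.

```java
// Stand-in for the scheduler-provided Context; field names are illustrative.
record InferenceContext(int parallelismUpperBound, long avgBytesPerTask) {}

public class FileSourceInference {
    // A file source might estimate total input size from the files it will
    // read and size its parallelism accordingly, capped by the scheduler's bound.
    static int inferParallelism(InferenceContext ctx, long[] fileSizes) {
        long total = 0;
        for (long s : fileSizes) {
            total += s;
        }
        // Ceiling division: enough tasks for ~avgBytesPerTask each.
        long needed = (total + ctx.avgBytesPerTask() - 1) / ctx.avgBytesPerTask();
        return (int) Math.max(1, Math.min(ctx.parallelismUpperBound(), needed));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        InferenceContext ctx = new InferenceContext(32, 128 * mb);
        // Three files totalling 600 MB -> 5 tasks of ~128 MB each.
        System.out.println(inferParallelism(ctx, new long[]{200 * mb, 200 * mb, 200 * mb}));
    }
}
```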

Performance tuning

# Decouple network memory requirements from parallelism
# (recommended for large-scale jobs)
taskmanager.network.memory.buffers-per-channel: 0

# Set max-parallelism to the worst-case value you expect to need
# Excessive values degrade hash shuffle and network performance
execution.batch.adaptive.auto-parallelism.max-parallelism: 128

Automatic data distribution balancing

The scheduler automatically balances data across downstream subtasks, ensuring each subtask processes roughly the same data volume. This applies to point-wise connections (Rescale) and all-to-all connections (Hash, Rebalance, Custom). This feature requires auto-parallelism to be enabled and works without additional configuration.
Automatic balancing does not fully resolve single-key hotspots: if one key contains far more data than all others, that key cannot be split across subtasks without breaking correctness. Use Adaptive Skewed Join Optimization for those cases.
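The core idea behind volume balancing can be sketched as assigning consecutive subpartitions to contiguous downstream ranges of roughly equal byte counts. This greedy version is a simplification under the assumption that per-subpartition byte counts are known; Flink's actual splitting logic is more sophisticated.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeBalancer {
    // Split consecutive subpartitions into `tasks` contiguous ranges so each
    // range carries roughly the same number of bytes (greedy sketch).
    static List<int[]> balance(long[] subpartitionBytes, int tasks) {
        long total = 0;
        for (long b : subpartitionBytes) {
            total += b;
        }
        long target = total / tasks;
        List<int[]> ranges = new ArrayList<>();
        int start = 0;
        long acc = 0;
        for (int i = 0; i < subpartitionBytes.length; i++) {
            acc += subpartitionBytes[i];
            // Close the current range once it reaches the target volume,
            // unless it is the last range or the last subpartition.
            if (ranges.size() < tasks - 1 && acc >= target
                    && i < subpartitionBytes.length - 1) {
                ranges.add(new int[]{start, i});
                start = i + 1;
                acc = 0;
            }
        }
        ranges.add(new int[]{start, subpartitionBytes.length - 1});
        return ranges;
    }

    public static void main(String[] args) {
        // Six subpartitions with uneven sizes, split across 3 downstream tasks:
        // each resulting range [0..1], [2..3], [4..5] carries 100 bytes.
        long[] bytes = {10, 90, 50, 50, 60, 40};
        for (int[] r : balance(bytes, 3)) {
            System.out.println(r[0] + ".." + r[1]);
        }
    }
}
```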

Adaptive Broadcast Join

In batch joins, broadcasting the smaller table to all nodes can be significantly faster than shuffling both tables. The Adaptive Batch Scheduler supports runtime Broadcast Join decisions based on actual observed data sizes.
# Control when Broadcast Join is applied
# Options: COMPILE_TIME_ONLY, RUNTIME_ONLY, AUTO (default)
table.optimizer.adaptive-broadcast-join.strategy: AUTO

# Maximum table size that can be broadcast (default: 1mb; increase for large TM memory)
table.optimizer.join.broadcast-threshold: 10mb
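The runtime decision reduces to a simple check once the input sizes are observed. The sketch below is illustrative decision logic, not Flink's planner code; the strategy enum mirrors the option values above.

```java
public class BroadcastDecision {
    enum Strategy { COMPILE_TIME_ONLY, RUNTIME_ONLY, AUTO }

    // Illustrative runtime decision: broadcast a join input if the strategy
    // permits runtime decisions, the join type allows broadcasting that side,
    // and the observed size stays under the configured threshold.
    static boolean broadcastAtRuntime(Strategy strategy, long observedBytes,
                                      long thresholdBytes, boolean sideCanBroadcast) {
        if (strategy == Strategy.COMPILE_TIME_ONLY) {
            return false; // runtime re-planning disabled
        }
        return sideCanBroadcast && observedBytes <= thresholdBytes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // 8 MB build side under a 10 MB threshold -> broadcast it.
        System.out.println(broadcastAtRuntime(Strategy.AUTO, 8 * mb, 10 * mb, true));
        // 2 GB build side -> fall back to a shuffle join.
        System.out.println(broadcastAtRuntime(Strategy.AUTO, 2048 * mb, 10 * mb, true));
    }
}
```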

Supported join types for broadcasting

Join type    Left input can broadcast    Right input can broadcast
Inner        Yes                         Yes
LeftOuter    No                          Yes
RightOuter   Yes                         No
FullOuter    No                          No
Semi         No                          Yes
Anti         No                          Yes
Adaptive Broadcast Join does not support Join operators inside MultiInput operators. It also cannot be enabled at the same time as batch job progress recovery.

Adaptive Skewed Join Optimization

When certain keys appear much more frequently than others in a Join, some tasks end up processing far more data, slowing down the entire job. The Adaptive Skewed Join Optimization dynamically splits skewed partitions to distribute the load.
# Strategy: none, auto (default), or forced
table.optimizer.skewed-join-optimization.strategy: auto

# Minimum data amount that triggers skew optimization
table.optimizer.skewed-join-optimization.skewed-threshold: 256mb

# Target ratio: reduce max/median data ratio below this value
table.optimizer.skewed-join-optimization.skewed-factor: 3.0
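A simplified reading of how these two settings combine: a partition is a splitting candidate when it exceeds both the skewed-threshold and skewed-factor times the median partition size. The sketch below illustrates that check; it is not Flink's actual implementation.

```java
import java.util.Arrays;

public class SkewDetection {
    // Illustrative skew check mirroring the settings above: a partition is
    // treated as skewed when it exceeds both the absolute threshold and
    // skewedFactor times the median partition size.
    static boolean isSkewed(long partitionBytes, long[] allPartitionBytes,
                            long skewedThreshold, double skewedFactor) {
        long[] sorted = allPartitionBytes.clone();
        Arrays.sort(sorted);
        long median = sorted[sorted.length / 2];
        return partitionBytes > skewedThreshold
                && partitionBytes > skewedFactor * median;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long[] partitions = {100 * mb, 120 * mb, 110 * mb, 900 * mb};
        // The 900 MB partition dwarfs the ~120 MB median -> split it further.
        System.out.println(isSkewed(900 * mb, partitions, 256 * mb, 3.0));
        // A 120 MB partition is under the 256 MB threshold -> leave it alone.
        System.out.println(isSkewed(120 * mb, partitions, 256 * mb, 3.0));
    }
}
```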

Supported join types for skew splitting

Join type    Left input can split    Right input can split
Inner        Yes                     Yes
LeftOuter    Yes                     No
RightOuter   No                      Yes
FullOuter    No                      No
Semi         Yes                     No
Anti         Yes                     No
Skewed Join Optimization requires auto-parallelism to be enabled.

Limitations

  • AdaptiveBatchScheduler only: All adaptive features require the Adaptive Batch Scheduler.
  • Blocking shuffle modes only: Supported shuffle modes are ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, and ALL_EXCHANGES_HYBRID_SELECTIVE.
  • FileInputFormat sources not supported: Use the FileSystem DataStream Connector or FileSystem SQL Connector instead of legacy readFile() / createInput(FileInputFormat, ...) APIs.
  • Web UI broadcast metrics discrepancy: For broadcast results, the byte/record counts on the Web UI may not match between upstream and downstream tasks. This is expected behavior.
