Background
In real-world batch workloads, data volumes and distributions are difficult to predict accurately before execution. Static execution plan optimizers must rely on incomplete or inaccurate statistics, which can lead to poor parallelism choices, skewed data distribution, and suboptimal join strategies. The Adaptive Batch Scheduler addresses this by incrementally building the execution plan as the job runs, using the actual observed data sizes and distributions of each completed stage to optimize the next stage.

Enabling the Adaptive Batch Scheduler
The Adaptive Batch Scheduler is the default batch scheduler in Flink, so no configuration is required unless you have explicitly switched to a different scheduler. If you have overridden the scheduler, you can restore the default.

Note that the Adaptive Batch Scheduler only applies to batch jobs. Streaming jobs use the Default Scheduler or the Adaptive Scheduler.
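Assuming the standard Flink configuration file (`config.yaml` in recent releases, `flink-conf.yaml` in older ones), restoring the default might look like:

```yaml
# Restore the default batch scheduler.
# AdaptiveBatch is already the default, so this line is only needed
# if a different scheduler was configured previously.
jobmanager.scheduler: AdaptiveBatch
```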
Automatically deciding parallelism
The Adaptive Batch Scheduler can determine the parallelism for each operator based on the size of its input data. This means operators closer to large data sources get higher parallelism, while operators processing filtered-down data use lower parallelism.

Enabling auto-parallelism
Auto-parallelism is enabled by default.

Usage rules
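A sketch of the related configuration options (option names per recent Flink releases; the values shown are illustrative, not recommendations — check your version's defaults):

```yaml
# Enabled by default; shown here for completeness.
execution.batch.adaptive.auto-parallelism.enabled: true

# Bounds on the parallelism the scheduler may decide.
execution.batch.adaptive.auto-parallelism.min-parallelism: 1
execution.batch.adaptive.auto-parallelism.max-parallelism: 128

# Target data volume per subtask; the scheduler derives a parallelism
# roughly as (actual input size / this value), clamped to the bounds above.
execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task: 16m
```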
- The scheduler only auto-determines parallelism for operators where you have not explicitly set parallelism.
- If you call `operator.setParallelism(n)`, that operator uses `n` and is excluded from auto-tuning.
- To get the best results across an entire pipeline, avoid setting parallelism on individual operators.
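To illustrate the rules above, a minimal DataStream sketch (the pipeline itself is illustrative; the API calls are standard Flink DataStream API):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AutoParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromSequence(0, 1_000_000)
           // No setParallelism() here: the Adaptive Batch Scheduler decides
           // this operator's parallelism from its actual input size.
           .filter(x -> x % 2 == 0)
           // Explicit parallelism: this operator is pinned to 2 and
           // excluded from auto-tuning.
           .filter(x -> x % 3 == 0).setParallelism(2)
           .print();

        env.execute("auto-parallelism-sketch");
    }
}
```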
Dynamic source parallelism inference
Sources can implement the `DynamicParallelismInference` interface to help the scheduler decide their parallelism:
`Context` provides the upper bound for inferred parallelism, the expected average data size per task, and dynamic filtering information. If a source does not implement this interface, `execution.batch.adaptive.auto-parallelism.default-source-parallelism` is used.
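A minimal sketch of what an implementation might look like. The interface and `Context` accessor names follow FLIP-379; the `estimateTotalBytes()` helper is hypothetical, and a real source would also implement the `Source` interface:

```java
import org.apache.flink.api.connector.source.DynamicParallelismInference;

public class MySourceParallelismInference implements DynamicParallelismInference {
    @Override
    public int inferParallelism(Context context) {
        // Hypothetical helper: estimate the total bytes this source will read,
        // e.g. by listing files or querying the storage system.
        long totalBytes = estimateTotalBytes();

        // Aim for roughly getDataVolumePerTask() bytes per subtask,
        // capped by the scheduler-provided upper bound.
        int parallelism =
                (int) Math.ceil((double) totalBytes / context.getDataVolumePerTask());
        return Math.max(1,
                Math.min(parallelism, context.getParallelismInferenceUpperBound()));
    }

    private long estimateTotalBytes() {
        return 1L << 30; // placeholder: 1 GiB
    }
}
```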
Performance tuning
Automatic data distribution balancing
The scheduler automatically balances data across downstream subtasks, ensuring each subtask processes roughly the same data volume. This applies to point-wise connections (Rescale) and all-to-all connections (Hash, Rebalance, Custom). This feature requires auto-parallelism to be enabled and works without additional configuration.

Automatic balancing does not fully resolve single-key hotspots: if one key contains far more data than all others, that key cannot be split across subtasks without breaking correctness. Use Adaptive Skewed Join Optimization for those cases.
Adaptive Broadcast Join
In batch joins, broadcasting the smaller table to all nodes can be significantly faster than shuffling both tables. The Adaptive Batch Scheduler supports runtime Broadcast Join decisions based on actual observed data sizes.

Supported join types for broadcasting
| Join type | Left input can broadcast | Right input can broadcast |
|---|---|---|
| Inner | Yes | Yes |
| LeftOuter | No | Yes |
| RightOuter | Yes | No |
| FullOuter | No | No |
| Semi | No | Yes |
| Anti | No | Yes |
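Adaptive broadcast join is controlled through a table planner option. Assuming a recent Flink release, it can be set per session in SQL:

```sql
-- 'auto' (default): allow both planner-time and runtime broadcast decisions;
-- 'runtime_only': defer the decision entirely to runtime statistics;
-- 'none': disable adaptive broadcast join.
SET 'table.optimizer.adaptive-broadcast-join.strategy' = 'auto';
```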
Adaptive Skewed Join Optimization
When certain keys appear much more frequently than others in a Join, some tasks end up processing far more data, slowing down the entire job. The Adaptive Skewed Join Optimization dynamically splits skewed partitions to distribute the load.

Supported join types for skew splitting
| Join type | Left input can split | Right input can split |
|---|---|---|
| Inner | Yes | Yes |
| LeftOuter | Yes | No |
| RightOuter | No | Yes |
| FullOuter | No | No |
| Semi | Yes | No |
| Anti | Yes | No |
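Skewed join optimization is likewise configured through table planner options (option names per recent Flink releases; the threshold values shown are illustrative, check your version's defaults):

```sql
-- 'auto' (default): apply when it does not introduce extra cost;
-- 'forced': apply even at the cost of extra shuffles; 'none': disable.
SET 'table.optimizer.skewed-join-optimization.strategy' = 'auto';

-- A partition is considered skewed if it exceeds this size...
SET 'table.optimizer.skewed-join-optimization.skewed-threshold' = '256m';

-- ...and is this many times larger than the median partition size.
SET 'table.optimizer.skewed-join-optimization.skewed-factor' = '4.0';
```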
Limitations
- AdaptiveBatchScheduler only: All adaptive features require the Adaptive Batch Scheduler.
- Blocking shuffle modes only: supported shuffle modes are `ALL_EXCHANGES_BLOCKING`, `ALL_EXCHANGES_HYBRID_FULL`, and `ALL_EXCHANGES_HYBRID_SELECTIVE`.
- FileInputFormat sources not supported: use the FileSystem DataStream Connector or FileSystem SQL Connector instead of the legacy `readFile()`/`createInput(FileInputFormat, ...)` APIs.
- Web UI broadcast metrics discrepancy: for broadcast results, the byte/record counts on the Web UI may not match between upstream and downstream tasks. This is expected behavior.
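For reference, the shuffle mode is selected with a single configuration option (`ALL_EXCHANGES_BLOCKING` is the default for batch jobs):

```yaml
# Any of the three blocking/hybrid modes listed above works with the
# Adaptive Batch Scheduler; ALL_EXCHANGES_PIPELINED does not.
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
```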

