The DataStream API supports two runtime execution modes: STREAMING and BATCH. A third mode, AUTOMATIC, lets Flink choose based on whether the job’s sources are bounded.

When to use each mode

STREAMING is the default and the only mode for unbounded jobs. It processes records as soon as they arrive, maintains running state, and uses watermarks to reason about event time. Use it for continuous pipelines that run indefinitely. BATCH is an optimization for bounded jobs — jobs where you know all input up front. It applies strategies borrowed from traditional batch frameworks: sequential task scheduling, blocking network shuffles, and sort-based state management. The final output of a BATCH job equals what STREAMING would produce, but resource usage and failure recovery are more efficient.
You cannot use BATCH mode with unbounded sources. If any source is unbounded, the job must run in STREAMING mode.
AUTOMATIC inspects all sources at submission time and selects BATCH if every source is bounded, otherwise STREAMING.

Configuring execution mode

You can set the mode via the command line or in code. The command-line approach is preferred because it keeps application code configuration-free and lets you reuse the same JAR in both modes.
bin/flink run -Dexecution.runtime-mode=BATCH my-job.jar
bin/flink run -Dexecution.runtime-mode=STREAMING my-job.jar
bin/flink run -Dexecution.runtime-mode=AUTOMATIC my-job.jar
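If you do set the mode in code, it goes on the execution environment before the pipeline is built. A minimal fragment (this hard-codes the mode into the application, so the CLI flags above remain preferable when the same JAR should run in both modes):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Pins this application to BATCH; RuntimeExecutionMode.STREAMING and
// RuntimeExecutionMode.AUTOMATIC are the other options.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
```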

Behavioral differences

Task scheduling and network shuffle

In STREAMING mode, all tasks must be online simultaneously. Records flow directly from upstream tasks to downstream tasks through pipelined (in-memory) channels. This enables low-latency processing of continuous data. In BATCH mode, Flink breaks the job into stages separated by shuffle boundaries (such as keyBy() or rebalance()). Each stage runs completely before the next begins. Intermediate results are materialized to non-ephemeral storage so they can be read after upstream tasks finish. For a job structured like this:
source.name("source")
    .map(...).name("map1")
    .map(...).name("map2")
    .rebalance()
    .map(...).name("map3")
    .keyBy(value -> value)
    .map(...).name("map4")
    .sinkTo(...);
Flink creates three stages in BATCH mode:
Stage  Operators
1      source, map1, map2
2      map3
3      map4, sink
Stages 1 and 2 are separated by rebalance(), stages 2 and 3 by keyBy(). Stage 1 completes before Stage 2 starts.
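The scheduling difference can be illustrated outside of Flink. The sketch below (plain Java with made-up names, not Flink internals) mimics a blocking shuffle boundary: stage 1 materializes its entire output before stage 2 reads any of it.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class StagedExecution {
    // Runs one "stage" to completion and materializes its full output,
    // mimicking a blocking exchange between BATCH-mode stages.
    static <I, O> List<O> runStage(List<I> input, Function<I, O> task) {
        return input.stream().map(task).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stage 1 finishes entirely and its result is materialized...
        List<Integer> intermediate = runStage(List.of(1, 2, 3), x -> x * 10);
        // ...before stage 2 starts reading that materialized result.
        List<Integer> result = runStage(intermediate, x -> x + 1);
        System.out.println(result); // [11, 21, 31]
    }
}
```

In a pipelined exchange, by contrast, each record would flow to the next task as soon as it is produced, so both stages would need to be online at once.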

State management

In STREAMING mode, state is stored in the configured state backend (HashMapStateBackend or EmbeddedRocksDBStateBackend). Checkpoints persist state to durable storage. In BATCH mode, the state backend configuration is ignored. Flink groups records by key using an external sort and processes all records for a single key before moving to the next. Only one key’s state is live in memory at a time, which dramatically reduces memory requirements for large keyed state.
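The effect of sort-based keyed processing can be sketched in plain Java (an illustration of the idea, not Flink's implementation): sort records by key, then keep only the current key's accumulator in memory, emitting and discarding it whenever the key changes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SortBasedAggregation {
    // Sorts records by key, then streams over them holding only one
    // key's running state at a time.
    static Map<String, Integer> countByKey(List<String> records) {
        List<String> sorted = new ArrayList<>(records);
        sorted.sort(Comparator.naturalOrder());

        Map<String, Integer> result = new LinkedHashMap<>();
        String currentKey = null;
        int count = 0; // state for the single "live" key
        for (String key : sorted) {
            if (!key.equals(currentKey)) {
                if (currentKey != null) {
                    result.put(currentKey, count); // emit and discard old key's state
                }
                currentKey = key;
                count = 0;
            }
            count++;
        }
        if (currentKey != null) {
            result.put(currentKey, count);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(countByKey(List.of("a", "b", "a", "c", "b", "a")));
        // {a=3, b=2, c=1}
    }
}
```

However many distinct keys there are, memory for keyed state stays constant; the cost moves into the external sort.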

Order of processing

STREAMING mode makes no ordering guarantees. Records are processed as they arrive. BATCH mode guarantees specific ordering when mixing input types in operators with multiple inputs:
  1. Broadcast inputs are processed first.
  2. Regular (non-keyed) inputs are processed second.
  3. Keyed inputs are processed last, with all records for a single key fully processed before moving to the next key.
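The three rules above can be simulated in a few lines of plain Java (a hypothetical sketch, not Flink internals): drain the broadcast side completely, then the regular side, then the keyed side with each key's records grouped together.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class BatchInputOrdering {
    // Simulates BATCH-mode ordering for an operator with mixed input types.
    static List<String> process(List<String> broadcast, List<String> regular, List<String> keyed) {
        List<String> log = new ArrayList<>();
        broadcast.forEach(b -> log.add("broadcast:" + b)); // 1. broadcast inputs first
        regular.forEach(r -> log.add("regular:" + r));     // 2. regular inputs second
        List<String> grouped = new ArrayList<>(keyed);
        grouped.sort(Comparator.naturalOrder());           // all records for a key together
        grouped.forEach(k -> log.add("keyed:" + k));       // 3. keyed inputs last
        return log;
    }

    public static void main(String[] args) {
        System.out.println(process(List.of("rule"), List.of("x"), List.of("b", "a", "b")));
        // [broadcast:rule, regular:x, keyed:a, keyed:b, keyed:b]
    }
}
```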

Event time and watermarks

In STREAMING mode, Flink uses watermarks as a heuristic for event-time progress. Because events arrive out of order, the system cannot know when it has seen all events for a given time period. In BATCH mode, the entire input is available upfront. Flink treats this as “perfect watermarks” — it processes records in event-time order and fires all event-time timers at the end of input. Custom WatermarkGenerator implementations are ignored, but the TimestampAssigner portion of a WatermarkStrategy still runs to assign timestamps to records.
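The “perfect watermarks” idea can be sketched in plain Java (an illustration, not Flink's implementation): because the input is bounded, records can be sorted by event time up front, and every pending event-time timer fires once the input is exhausted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

public class PerfectWatermarks {
    public record Event(long timestamp, String value) {}

    static List<String> run(List<Event> events) {
        // Bounded input: sort by event time before processing.
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong(Event::timestamp));

        List<String> output = new ArrayList<>();
        TreeMap<Long, String> timers = new TreeMap<>();
        for (Event e : sorted) {
            output.add("event@" + e.timestamp());
            timers.put(e.timestamp() + 10, "timer-for-" + e.value()); // register a timer
        }
        // End of input: the watermark jumps to +infinity, every pending timer fires.
        timers.forEach((ts, payload) -> output.add("fired@" + ts + ":" + payload));
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(new Event(5, "b"), new Event(1, "a"))));
        // [event@1, event@5, fired@11:timer-for-a, fired@15:timer-for-b]
    }
}
```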

Processing time

In STREAMING mode, processing time is wall-clock time, and processing-time timers fire at the wall-clock time they were scheduled for. In BATCH mode, processing time does not advance during job execution. All processing-time timers fire at the end of input, as if time was fast-forwarded to infinity once all records have been processed.
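The same “fast-forward” behavior applies to processing-time timers; a plain-Java sketch (not Flink internals) of the end-of-input semantics:

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessingTimeInBatch {
    // Processing-time timers registered while a BATCH job runs never fire
    // mid-job, because processing time does not advance; they all fire once
    // the input is exhausted.
    static List<String> run(List<String> records) {
        List<String> output = new ArrayList<>();
        List<String> pendingTimers = new ArrayList<>();
        for (String r : records) {
            output.add("processed:" + r);
            pendingTimers.add("timer-for-" + r); // scheduled "one minute from now"
        }
        // End of input: time fast-forwards to infinity and every timer fires.
        for (String t : pendingTimers) {
            output.add("fired:" + t);
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b")));
        // [processed:a, processed:b, fired:timer-for-a, fired:timer-for-b]
    }
}
```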

Failure recovery

In STREAMING mode, Flink restores from the most recent checkpoint. All running tasks restart from that checkpoint’s state. In BATCH mode, checkpointing is disabled. Instead, Flink uses the materialized intermediate results as recovery points. Only the failed stage and its predecessors are restarted, not the entire job. This is more efficient than checkpoint-based recovery for bounded workloads.

Important limitations in BATCH mode

The following features do not work in BATCH mode:
  • Checkpointing and anything that depends on it (Kafka exactly-once, OnCheckpointRollingPolicy in FileSink)
  • CheckpointListener
  • Rolling aggregations in STREAMING style: reduce() and sum() emit only the final result, not incremental updates
If you need a transactional sink in BATCH mode, use a sink that implements the Unified Sink API (FLIP-143) rather than a legacy SinkFunction with two-phase commit.

Example: reading a bounded file in BATCH mode

BatchWordCount.java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BatchWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        FileSource<String> source = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///data/words.txt"))
            .build();

        DataStream<Tuple2<String, Integer>> wordCounts = env
            .fromSource(source, WatermarkStrategy.noWatermarks(), "words")
            .flatMap(new Tokenizer())
            .keyBy(t -> t.f0)
            .sum(1);

        wordCounts.print();

        env.execute("Batch Word Count");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
When running this job in BATCH mode, sum() emits a single final count per word rather than a running total. In STREAMING mode the same code would emit an updated count every time a word is seen.
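The contrast can be simulated without Flink. The sketch below (plain Java, made-up method names) emits a running total per key in the STREAMING style and a single final total per key in the BATCH style:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RollingVsFinal {
    // STREAMING-style sum(): emits an updated total every time a key is seen.
    static List<String> streamingSum(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String w : words) {
            counts.merge(w, 1, Integer::sum);
            emitted.add(w + "=" + counts.get(w)); // incremental update
        }
        return emitted;
    }

    // BATCH-style sum(): emits a single final total per key.
    static List<String> batchSum(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String w : words) {
            counts.merge(w, 1, Integer::sum);
        }
        List<String> emitted = new ArrayList<>();
        counts.forEach((w, c) -> emitted.add(w + "=" + c));
        return emitted;
    }

    public static void main(String[] args) {
        List<String> words = List.of("to", "be", "to");
        System.out.println(streamingSum(words)); // [to=1, be=1, to=2]
        System.out.println(batchSum(words));     // [to=2, be=1]
    }
}
```

Downstream consumers that expect one record per key therefore work unchanged in BATCH mode, while consumers built to handle retractions or updates see far fewer records.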

Writing custom operators for BATCH mode

Custom operators must not assume that watermarks increase monotonically across keys. In BATCH mode, the watermark jumps from MAX_VALUE at the end of one key’s input back to MIN_VALUE before the next key’s records, so do not cache the last seen watermark in an operator field and assume it will only grow. Timers fire in key order first, then in timestamp order within each key. Operations that manually change the current key (for example via setCurrentKey()) are not supported in BATCH mode.
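A small plain-Java check (hypothetical, not Flink code) makes the broken assumption concrete: a watermark sequence that resets between keys is not monotonic, so any cached "last watermark" comparison will misfire.

```java
import java.util.List;

public class WatermarkPerKey {
    // Returns whether a sequence of watermarks only ever grows.
    static boolean isMonotonic(List<Long> watermarks) {
        long cached = Long.MIN_VALUE;
        for (long wm : watermarks) {
            if (wm < cached) {
                return false; // the "watermarks only grow" assumption breaks here
            }
            cached = wm;
        }
        return true;
    }

    public static void main(String[] args) {
        // What an operator observes in BATCH mode: MAX_VALUE at the end of
        // key A's input, then a reset to MIN_VALUE before key B's records.
        List<Long> observed = List.of(Long.MAX_VALUE, Long.MIN_VALUE, Long.MAX_VALUE);
        System.out.println("monotonic across keys: " + isMonotonic(observed)); // false
    }
}
```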
