Operators transform one or more DataStream instances into new ones. You chain them together to build your processing pipeline. Flink builds the pipeline lazily: nothing runs until you call env.execute().

Basic transformations

map

Transforms each element one-to-one and returns a new element, potentially of a different type.
DataStream<Integer> numbers = env.fromData(1, 2, 3, 4, 5);

DataStream<String> strings = numbers.map(n -> "value-" + n);

// With an anonymous class for complex logic
DataStream<Integer> doubled = numbers.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) {
        return value * 2;
    }
});

flatMap

Transforms each element into zero, one, or more output elements. Use a Collector to emit them.
DataStream<String> sentences = env.fromData("hello world", "apache flink rocks");

DataStream<String> words = sentences.flatMap(
    (String sentence, Collector<String> out) -> {
        for (String word : sentence.split(" ")) {
            out.collect(word);
        }
    }
).returns(String.class);

filter

Keeps only elements for which the function returns true.
DataStream<Integer> numbers = env.fromData(1, 2, 3, 4, 5, 6);

DataStream<Integer> evens = numbers.filter(n -> n % 2 == 0);
// result: 2, 4, 6

Keyed streams

keyBy

Partitions a stream by key. All elements with the same key are routed to the same parallel task. keyBy returns a KeyedStream, which is required for keyed state and keyed operators like reduce and window.
DataStream<Tuple2<String, Integer>> wordCounts = ...;

KeyedStream<Tuple2<String, Integer>, String> keyed =
    wordCounts.keyBy(tuple -> tuple.f0);
A type cannot serve as a key if:
  1. It is a POJO that does not override hashCode() and relies on Object.hashCode().
  2. It is an array.
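
As a rough sketch of a type that avoids the first restriction, the class below overrides equals() and hashCode() so that two instances with the same field values hash identically. The SensorKey name and its fields are assumptions made for illustration; the code uses only the JDK and is not taken from Flink.

```java
import java.util.Objects;

// Hypothetical composite key; the name and fields are illustrative only.
class SensorKey {
    public String site;
    public int sensorId;

    // POJO-style no-argument constructor.
    public SensorKey() {}

    public SensorKey(String site, int sensorId) {
        this.site = site;
        this.sensorId = sensorId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SensorKey)) return false;
        SensorKey other = (SensorKey) o;
        return sensorId == other.sensorId && Objects.equals(site, other.site);
    }

    // Derived from field values rather than object identity,
    // so equal keys always produce the same hash.
    @Override
    public int hashCode() {
        return Objects.hash(site, sensorId);
    }
}
```

With Object.hashCode(), two separately created keys for the same sensor would usually hash differently, so records that belong together could be routed to different parallel tasks.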

reduce

Applies a rolling reduce on a KeyedStream. Combines the current element with the previously reduced value and emits the new value after each element.
KeyedStream<Tuple2<String, Integer>, String> keyed = wordCounts.keyBy(t -> t.f0);

DataStream<Tuple2<String, Integer>> summedCounts = keyed.reduce(
    (a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1)
);
In BATCH mode, reduce emits only the final accumulated value per key. In STREAMING mode, it emits an updated value for each incoming element.
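
The difference between the two modes can be modelled in plain Java. The sketch below is not Flink code: RollingReduceModel and its methods are hypothetical names, and the output order across keys in the batch case is an artifact of the map used here, not a guarantee Flink makes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two emission patterns; this is not Flink API.
class RollingReduceModel {

    // STREAMING-style: emit the updated sum after every input element.
    static List<String> streaming(List<Map.Entry<String, Integer>> input) {
        Map<String, Integer> state = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Integer> e : input) {
            int sum = state.merge(e.getKey(), e.getValue(), Integer::sum);
            emitted.add(e.getKey() + "=" + sum);
        }
        return emitted;
    }

    // BATCH-style: consume everything, then emit one final sum per key.
    static List<String> batch(List<Map.Entry<String, Integer>> input) {
        Map<String, Integer> state = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : input) {
            state.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        List<String> emitted = new ArrayList<>();
        state.forEach((k, v) -> emitted.add(k + "=" + v));
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
            Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
        System.out.println(streaming(input)); // [a=1, b=2, a=4]
        System.out.println(batch(input));     // [a=4, b=2]
    }
}
```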

Windowing

window

Groups elements of a KeyedStream into windows. Windows collect elements and trigger computation when a condition is met (time elapsed, element count reached, etc.).
KeyedStream<SensorReading, String> byId = readings.keyBy(r -> r.sensorId);

// Tumbling 5-second event-time windows
WindowedStream<SensorReading, String, TimeWindow> windowed =
    byId.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)));

// Apply an aggregate
DataStream<Double> avgTemp = windowed.aggregate(new AverageAggregator());
Common window assigners:

| Assigner | Description |
| --- | --- |
| TumblingEventTimeWindows.of(size) | Non-overlapping fixed-size event-time windows |
| TumblingProcessingTimeWindows.of(size) | Non-overlapping fixed-size processing-time windows |
| SlidingEventTimeWindows.of(size, slide) | Overlapping sliding windows |
| EventTimeSessionWindows.withGap(gap) | Session windows, closed after a period of inactivity |
| GlobalWindows.create() | A single window containing all elements (requires custom trigger) |
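
To make the tumbling and sliding rows concrete, the following plain-Java sketch computes which windows a timestamp falls into. It assumes non-negative timestamps in milliseconds and a zero window offset; WindowAssignmentModel and its methods are hypothetical and do not call Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of window assignment; assumes timestamp >= 0 and no offset.
class WindowAssignmentModel {

    // Start of the single tumbling window [start, start + size) containing the timestamp.
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Starts of every sliding window [start, start + size) containing the timestamp,
    // newest window first.
    static List<Long> slidingStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(7_300L, 5_000L));            // 5000
        System.out.println(slidingStarts(12_000L, 10_000L, 5_000L));  // [10000, 5000]
    }
}
```

An element belongs to exactly one tumbling window, while a sliding assigner places it in roughly size / slide windows, which is why sliding windows with a small slide multiply the work done per element.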

windowAll

Opens windows on a non-keyed DataStream. All records are routed to a single task, so the operator runs with a parallelism of 1. Use it only for low-volume aggregations:
DataStream<Tuple2<String, Integer>> top10 = stream
    .windowAll(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))
    .apply(new Top10WindowFunction());

Window functions

After defining a window, you apply a function to the contents:
// Runs incrementally; holds only the current aggregate in state
windowedStream.reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
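
The idea of holding only the current aggregate can be sketched for an average as a running (sum, count) pair. The method names below echo the createAccumulator/add/getResult/merge shape of Flink's AggregateFunction, but RunningAverage is a hypothetical stand-in that compiles without Flink; it is not the AverageAggregator used in the earlier window example.

```java
// Hypothetical stand-in for an incremental aggregate; no Flink dependency.
class RunningAverage {

    // The accumulator: the only data kept between elements.
    static final class Acc {
        double sum;
        long count;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    // Fold one element into the accumulator; the element itself is not stored.
    static Acc add(double value, Acc acc) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    // Combine two partial aggregates, for example when two windows are merged.
    static Acc merge(Acc a, Acc b) {
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }

    // Produce the final result; an empty accumulator yields 0.0 by convention here.
    static double getResult(Acc acc) {
        return acc.count == 0 ? 0.0 : acc.sum / acc.count;
    }
}
```

State size stays constant regardless of how many elements the window receives, in contrast to a function that needs the full window contents at once.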

Stream merging and joining

union

Merges two or more streams of the same type into one. Elements from all streams interleave:
DataStream<Event> stream1 = ...;
DataStream<Event> stream2 = ...;
DataStream<Event> stream3 = ...;

DataStream<Event> merged = stream1.union(stream2, stream3);

connect

Connects two streams of potentially different types. The two inputs can share state inside one function, but each input is handled by its own method: map1/map2 (or flatMap1/flatMap2):
DataStream<Integer> numbers = env.fromData(1, 2, 3);
DataStream<String> labels  = env.fromData("a", "b", "c");

ConnectedStreams<Integer, String> connected = numbers.connect(labels);

DataStream<String> result = connected.map(new CoMapFunction<Integer, String, String>() {
    @Override
    public String map1(Integer value) {
        return "number:" + value;
    }

    @Override
    public String map2(String value) {
        return "label:" + value;
    }
});

Window join

Joins two streams within a time window on a common key:
stream1.join(stream2)
    .where(e -> e.id)
    .equalTo(e -> e.id)
    .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
    .apply((left, right) -> new JoinedEvent(left, right));
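
One consequence worth seeing in miniature: with tumbling windows, two elements are joined only if they share a key and land in the same window, even when their timestamps are close. The plain-Java model below illustrates this; WindowJoinModel, Event, and the output format are hypothetical and the code does not use Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a tumbling-window join; assumes non-negative timestamps.
class WindowJoinModel {

    // Hypothetical event: a join key plus an event-time timestamp in milliseconds.
    static final class Event {
        final String id;
        final long timestampMs;

        Event(String id, long timestampMs) {
            this.id = id;
            this.timestampMs = timestampMs;
        }
    }

    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Emit one result per (left, right) pair with equal key and equal window.
    static List<String> join(List<Event> left, List<Event> right, long sizeMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.id.equals(r.id);
                boolean sameWindow =
                    windowStart(l.timestampMs, sizeMs) == windowStart(r.timestampMs, sizeMs);
                if (sameKey && sameWindow) {
                    out.add(l.id + ":" + l.timestampMs + "," + r.timestampMs);
                }
            }
        }
        return out;
    }
}
```

With 5-second windows, a left element at 6000 ms and a right element at 4000 ms are not joined in this model, because they fall on opposite sides of the 5000 ms boundary.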

Interval join

Joins keyed stream elements where timestamps fall within a relative time range:
// Join elements where right.timestamp ∈ [left.timestamp - 2ms, left.timestamp + 2ms]
keyedStream1.intervalJoin(keyedStream2)
    .between(Duration.ofMillis(-2), Duration.ofMillis(2))
    .process(new ProcessJoinFunction<Left, Right, Output>() {
        @Override
        public void processElement(Left l, Right r, Context ctx, Collector<Output> out) {
            out.collect(new Output(l, r));
        }
    });
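
The matching condition can be written out as a small predicate. This is a plain-Java sketch, assuming both bounds are inclusive as in the comment above; IntervalJoinModel and matches are hypothetical names, not Flink API.

```java
// Plain-Java model of the interval-join condition for two elements with the same key.
class IntervalJoinModel {

    // True when rightTs lies in [leftTs + lowerMs, leftTs + upperMs], bounds inclusive.
    // lowerMs is typically negative or zero, upperMs positive or zero.
    static boolean matches(long leftTs, long rightTs, long lowerMs, long upperMs) {
        return rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs;
    }

    public static void main(String[] args) {
        System.out.println(matches(100L, 98L, -2L, 2L));  // true
        System.out.println(matches(100L, 103L, -2L, 2L)); // false
    }
}
```

Unlike a window join, the range is relative to each left element, so there are no fixed window boundaries that can separate two nearby elements.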

ProcessFunction

ProcessFunction gives you access to low-level runtime features: element timestamps, timers (event-time and processing-time), and side outputs. It is the most powerful operator, but also the most verbose. Keyed state and timers are available only on a keyed stream, so the example below extends KeyedProcessFunction and is applied after keyBy:
ProcessFunctionExample.java
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AlertOnInactivity extends KeyedProcessFunction<String, Event, Alert> {

    private ValueState<Long> lastSeen;

    @Override
    public void open(OpenContext ctx) {
        lastSeen = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-seen", Long.class)
        );
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Alert> out) throws Exception {
        lastSeen.update(ctx.timestamp());

        // Register an event-time timer 5 minutes after this element's timestamp
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 300_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
        Long ts = lastSeen.value();
        // If no new event arrived in the last 5 minutes, emit an alert
        if (ts != null && ts + 300_000L <= timestamp) {
            out.collect(new Alert(ctx.getCurrentKey(), "inactive for 5 minutes"));
            lastSeen.clear();
        }
    }
}

stream.keyBy(Event::getUserId)
      .process(new AlertOnInactivity())
      .print();
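
Because every element registers its own timer, older timers still fire after newer elements arrive; the check in onTimer is what suppresses them. The single-key simulation below walks through that logic in plain Java. It is a sketch under simplifying assumptions (one key, all events processed before the watermark advances), and InactivityTimerModel with its methods is hypothetical, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Single-key, plain-Java simulation of the inactivity-timer logic.
class InactivityTimerModel {
    static final long TIMEOUT_MS = 300_000L;

    private Long lastSeen;                                 // stands in for ValueState<Long>
    private final TreeSet<Long> timers = new TreeSet<>();  // registered event-time timers
    final List<Long> alerts = new ArrayList<>();           // timer timestamps that raised an alert

    // Mirrors processElement: remember the event time and register a timer.
    void onEvent(long eventTimeMs) {
        lastSeen = eventTimeMs;
        timers.add(eventTimeMs + TIMEOUT_MS);
    }

    // Mirrors watermark progress: fire every timer at or before the watermark, in order.
    void advanceWatermark(long watermarkMs) {
        while (!timers.isEmpty() && timers.first() <= watermarkMs) {
            long timerTs = timers.pollFirst();
            // Same check as onTimer: alert only if no newer event moved lastSeen forward.
            if (lastSeen != null && lastSeen + TIMEOUT_MS <= timerTs) {
                alerts.add(timerTs);
                lastSeen = null;
            }
        }
    }
}
```

For events at 0 ms and 100,000 ms followed by a watermark of 500,000 ms, the timer at 300,000 ms fires without an alert (the key was seen at 100,000 ms), and the timer at 400,000 ms raises the only alert.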

Physical partitioning

These operators control how data is distributed across parallel tasks without changing the stream’s type:
// Redistribute evenly round-robin
stream.rebalance();

// Random redistribution
stream.shuffle();

// Send all records to every downstream subtask
stream.broadcast();

// Redistribute round-robin to a subset of downstream tasks
stream.rescale();

// Custom partitioner; floorMod keeps the index non-negative when hashCode() is negative
stream.partitionCustom(
    (key, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions),
    event -> event.getCategory()
);
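
A custom partitioner has to return an index in the range [0, numPartitions), and hashCode() can be negative. The short plain-Java sketch below shows how the % operator and Math.floorMod differ in that case; SafePartitionModel is a hypothetical helper, not part of Flink.

```java
// Plain-Java illustration of mapping a hash code to a valid partition index.
class SafePartitionModel {

    // Math.floorMod always returns a value in [0, numPartitions) for a positive divisor.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int hash = Integer.valueOf(-7).hashCode();   // an Integer's hash code is its value
        System.out.println(hash % 4);                // -3, not a valid partition index
        System.out.println(partitionFor(-7, 4));     // 1
    }
}
```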

Operator chaining

Flink automatically chains operators that have a 1-to-1 connection pattern (e.g., consecutive map calls). Chained operators run in the same thread and pass records directly without network serialization. To control chaining manually:
// Start a new chain at map(f1): f1 is not chained to its upstream operator, but f2 can still chain to f1
stream.map(f1).startNewChain().map(f2);

// Disable chaining for a specific operator
stream.map(f1).disableChaining();

// Disable chaining for the entire job
env.disableOperatorChaining();

Slot sharing groups

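By default, all operators belong to the same slot sharing group, so subtasks of different operators can run in the same slot. Assign an operator to a separate slot sharing group to isolate it in dedicated slots: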
stream.filter(heavyFilter)
      .slotSharingGroup("heavy-computation");

Naming operators

Assign names and descriptions to operators for easier debugging in the Flink Web UI:
stream
    .filter(e -> e.isValid()).name("filter-invalid")
    .map(e -> e.enrich()).name("enrich")
    .setDescription("Adds geo and user metadata to each event");
