Skip to main content
Timely stream processing is an extension of stateful stream processing in which time plays a role in the computation. This is the case when you do time series analysis, aggregations over time periods (windows), or event processing where the moment an event occurred matters — not just when it was processed.

Notions of time

Flink supports three distinct notions of time in streaming programs:
Event time is the time at which each individual event occurred on its producing device. This timestamp is typically embedded in the event record before it enters Flink.With event time, the progress of time depends on the data, not on any wall clock. This makes event time processing deterministic and consistent:
  • An hourly event-time window always contains exactly the events whose timestamps fall in that hour — regardless of when they arrive or are processed
  • Reprocessing historical data produces the same results as live processing
  • Out-of-order events are handled correctly (up to a configurable lateness bound)
Event time programs must specify how to generate watermarks, which signal progress in event time. See Watermarks below.Best for: production pipelines where correctness matters more than latency, reprocessing historical data, and any scenario where events can arrive out of order.
Processing time refers to the wall-clock time of the machine executing the operation — the system clock at the moment a record is processed.A one-hour processing-time window includes all records that arrived at an operator within a real wall-clock hour. For example, if your application starts at 9:15 AM, the first window covers 9:15–10:00, the next covers 10:00–11:00, and so on.Processing time has the lowest latency and requires no coordination between streams and machines. However, it is non-deterministic: results depend on how fast records arrive and how quickly operators process them. Replaying the same data at a different time produces different window assignments.Best for: applications where approximate results and minimal latency are more important than correctness, such as real-time dashboards that don’t need exact windowing.
Ingestion time is the time at which events enter Flink at the source. Each source operator assigns the current wall-clock time as the event timestamp when a record is ingested.Ingestion time sits between event time and processing time:
  • More stable than processing time (timestamps are assigned once at the source, not per-operator)
  • Cannot handle out-of-order events that occurred before ingestion
  • Useful when events have no reliable timestamps but you still want watermark-based windowing
In practice, most production use cases choose either event time or processing time. Ingestion time is increasingly rare.

Event time and watermarks

A stream processor that supports event time needs a way to measure progress in event time — not just in wall-clock time. Without this, a window operator building hourly windows would never know when to close a window and emit results. Flink’s mechanism for measuring event-time progress is watermarks.

What is a watermark?

A watermark is a special marker that flows as part of the data stream, carrying a timestamp t. A Watermark(t) is a declaration that all events with a timestamp ≤ t have already arrived. In other words, no event older than t should appear after this point in the stream. When a window operator receives Watermark(t), it can safely close all windows whose end time ≤ t and emit their results.
In-order stream:

  e(1)  e(2)  e(3)  W(3)  e(4)  e(5)  W(5)  e(6) ...
                     │                  │
              Watermark at t=3   Watermark at t=5
              (all events ≤3      (all events ≤5
               have arrived)       have arrived)

Watermarks for out-of-order streams

In real-world systems, events routinely arrive out of order due to network delays, retries, or distributed producers. Watermarks handle this by including a lateness bound:
Out-of-order stream:

  e(4)  e(2)  e(7)  e(3)  e(5)  W(4)  e(6)  e(8)  W(7) ...
                                  │                   │
                           W(4): events up to t=4    W(7): events up to t=7
                           assumed complete           assumed complete
                           (even though e(4)         
                           arrived before e(2))
A common strategy is to emit watermarks that lag behind the maximum observed timestamp by a fixed amount (the out-of-order bound). For example, if the maximum seen event timestamp is t_max, a Watermark(t_max - 5s) says “events up to 5 seconds ago are complete.”

Watermarks in parallel streams

In a parallel execution, each source subtask generates watermarks independently. As watermarks flow downstream, each operator tracks the minimum watermark across all its input channels as its current event time.
  Source subtask 1: ... W(10) ... W(15) ...

                         Operator: event time = min(10, 8) = 8

  Source subtask 2: ... W(8)  ... W(12) ...
This minimum semantics ensures that an operator only advances its event time when all upstream sources have advanced past that point.

WatermarkStrategy

You configure watermark generation when reading from a source using WatermarkStrategy. A WatermarkStrategy combines:
  • A TimestampAssigner: extracts the event timestamp from each record
  • A WatermarkGenerator: decides when and how to emit watermarks
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import java.time.Duration;

DataStream<MyEvent> stream = env
    .fromSource(
        mySource,
        WatermarkStrategy
            .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner(
                (event, recordTimestamp) -> event.getEventTimestamp()
            ),
        "my-source"
    );

Built-in watermark strategies

Assumes events can arrive out of order, but no later than a fixed maximum delay. Emits watermarks as max_timestamp - max_out_of_orderness.
WatermarkStrategy
    .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, ts) -> event.getTimestamp());
Use this when your events have a known maximum delay and you want to wait for late arrivals before closing windows.
Assumes events arrive in strictly ascending timestamp order (no out-of-order events). Emits a watermark equal to the maximum observed timestamp.
WatermarkStrategy
    .<MyEvent>forMonotonousTimestamps()
    .withTimestampAssigner((event, ts) -> event.getTimestamp());
Use this for sources where ordering is guaranteed — for example, a single Kafka partition consumed in order.
Disables watermark generation. Use this for processing-time programs where watermarks are not needed.
WatermarkStrategy.noWatermarks();
For more control, implement the WatermarkGenerator interface:
public class CustomWatermarkGenerator
        implements WatermarkGenerator<MyEvent> {

    private long maxTimestamp = Long.MIN_VALUE + 1;
    private final long maxOutOfOrderness;

    public CustomWatermarkGenerator(Duration maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness.toMillis();
    }

    @Override
    public void onEvent(
            MyEvent event,
            long eventTimestamp,
            WatermarkOutput output) {
        // Track the maximum observed event timestamp
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Emit a watermark lagging behind max by the out-of-orderness bound
        output.emitWatermark(
            new Watermark(maxTimestamp - maxOutOfOrderness - 1)
        );
    }
}

// Register the custom strategy
WatermarkStrategy
    .<MyEvent>forGenerator(
        ctx -> new CustomWatermarkGenerator(Duration.ofSeconds(10))
    )
    .withTimestampAssigner((event, ts) -> event.getTimestamp());
You can also use withIdleness(Duration) to handle idle source partitions. If a source partition emits no records for the specified duration, Flink advances the watermark past it so other partitions don’t stall progress.
WatermarkStrategy
    .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, ts) -> event.getTimestamp())
    .withIdleness(Duration.ofMinutes(1));

Lateness

Even with a bounded-out-of-orderness watermark strategy, some events may arrive later than the watermark allows. These are called late elements — events whose timestamp is less than or equal to the current watermark when they arrive. By default, late elements are dropped after a window fires. To handle them explicitly, use allowedLateness:
DataStream<MyEvent> result = stream
    .keyBy(MyEvent::getUserId)
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
    .allowedLateness(Duration.ofMinutes(5))  // keep window state for 5 more minutes
    .sum("value");
With allowedLateness, Flink keeps the window state alive for an additional period after the watermark passes the window’s end. Late elements that arrive within this grace period trigger a recomputation of the window result.
Setting a large allowedLateness value or a large out-of-orderness bound increases result correctness but also increases state size (window state is kept longer) and output latency (windows fire later).

Windowing

Aggregations on streams (counts, sums, averages) work differently than in batch processing. Because a stream is potentially infinite, you cannot aggregate over the entire stream. Instead, you scope aggregations to windows — finite chunks of the stream. Windows are defined by an assignor (which events belong to which window) and a trigger (when to evaluate the window).

Window types

Fixed-size, non-overlapping windows. Each event belongs to exactly one window.
// Tumbling event-time window of 1 hour
stream
    .keyBy(MyEvent::getUserId)
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
    .sum("value");

// Tumbling processing-time window of 30 seconds
stream
    .keyBy(MyEvent::getUserId)
    .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(30)))
    .count();
Fixed-size windows that overlap. An event may belong to multiple windows. Defined by a window size and a slide interval.
// Sliding event-time window: 1-hour size, slides every 15 minutes
// Each event belongs to 4 windows
stream
    .keyBy(MyEvent::getUserId)
    .window(SlidingEventTimeWindows.of(
        Duration.ofHours(1),    // window size
        Duration.ofMinutes(15)  // slide interval
    ))
    .sum("value");
Dynamic windows defined by a gap of inactivity. A new window starts whenever there is a gap between events larger than the session timeout. Session windows do not have a fixed size.
// Session window: new window after 30 minutes of inactivity
stream
    .keyBy(MyEvent::getUserId)
    .window(EventTimeSessionWindows.withGap(Duration.ofMinutes(30)))
    .sum("value");
Data-driven windows that trigger after a fixed number of elements, regardless of time.
// Tumbling count window: fires every 100 elements
stream
    .keyBy(MyEvent::getUserId)
    .countWindow(100)
    .sum("value");

// Sliding count window: size 100, slide 10
stream
    .keyBy(MyEvent::getUserId)
    .countWindow(100, 10)
    .sum("value");
All events with the same key go into a single, unbounded window. You must provide a custom trigger — otherwise the window never fires.
stream
    .keyBy(MyEvent::getUserId)
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountTrigger.of(100)))
    .sum("value");

How time affects windows

The choice between event time and processing time has a fundamental impact on window behavior:
AspectEvent timeProcessing time
Window boundariesBased on event timestampsBased on wall-clock time
Out-of-order handlingCorrect (with watermarks)Not applicable
DeterminismDeterministic for same dataNon-deterministic
ReprocessingSame resultsDifferent results
LatencyHigher (waits for watermark)Lower
Requires timestampsYesNo
With event time, a window fires when a watermark advances past the window’s end time. With processing time, a window fires when the system clock advances past the window’s end time.
Even in event-time programs, some operators may use processing time internally to ensure progress. For example, if no events arrive for a long time, a processing-time timer can fire a window based on inactivity rather than waiting indefinitely for a watermark.

Build docs developers (and LLMs) love