Flink supports three notions of time in streaming programs:
- Event time: The time embedded in the event itself (e.g., a log timestamp). Results are reproducible and correct even if events arrive out-of-order.
- Processing time: Wall-clock time on the machine processing the record. Fast but non-deterministic.
- Ingestion time: The time a record enters the Flink source. A middle ground that is monotonic but assigned at ingestion, not when the event occurred.
Event time is the preferred choice for most production pipelines because it produces correct results regardless of processing speed or event ordering.
WatermarkStrategy
A WatermarkStrategy bundles two concerns:
TimestampAssigner: Extracts the event-time timestamp from a record (a long in milliseconds since the Unix epoch).
WatermarkGenerator: Emits watermarks — special markers that tell operators "no event with timestamp ≤ T will arrive in the future."
You attach a WatermarkStrategy directly to a source (preferred) or after an arbitrary operator:
// Preferred: at the source, per-split watermarks work correctly
DataStream<MyEvent> stream = env.fromSource(
        mySource,
        WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, ts) -> event.getEventTime()),
        "my-source"
);
// After an operator: less accurate for multi-partition sources
DataStream<MyEvent> stream = rawStream
        .filter(e -> e.isValid())
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, ts) -> event.getEventTime())
        );
Both timestamps and watermarks are expressed as milliseconds since 1970-01-01T00:00:00Z.
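If your events carry an ISO-8601 string or a `java.time.Instant` rather than a raw long, the timestamp assigner boils down to a conversion to epoch milliseconds. A minimal plain-Java sketch of that conversion (the helper name is illustrative, not a Flink API):

```java
import java.time.Instant;

public class EpochMillis {

    // Convert an ISO-8601 timestamp string to the epoch-millis long Flink expects.
    public static long toEpochMillis(String isoTimestamp) {
        return Instant.parse(isoTimestamp).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("1970-01-01T00:00:01Z")); // prints 1000
    }
}
```

Inside a `withTimestampAssigner` lambda, this is simply `(event, ts) -> Instant.parse(event.getIsoTime()).toEpochMilli()`, assuming a hypothetical `getIsoTime()` accessor on your event type.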
Built-in watermark strategies
Monotonously increasing timestamps
Use this when events in each parallel partition arrive in strictly ascending timestamp order (e.g., a single Kafka partition with monotonic event times). The current maximum timestamp is used directly as the watermark.
WatermarkStrategy.<MyEvent>forMonotonousTimestamps()
        .withTimestampAssigner((event, ts) -> event.getTimestamp());
Bounded out-of-orderness
Use this when events can arrive out-of-order, but only up to a known maximum lateness. The watermark lags behind the maximum observed timestamp by the specified duration.
// Assume events arrive at most 10 seconds late
WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((event, ts) -> event.getTimestamp());
A watermark W emitted by this strategy means: "no more events with timestamp ≤ W are expected." W trails the maximum observed timestamp by the 10-second bound, so a window fires once the watermark passes its end timestamp.
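Concretely, the bounded strategy's watermark is the maximum observed timestamp minus the bound, minus one millisecond. A plain-Java sketch of the arithmetic (a simplified model, not Flink internals):

```java
import java.time.Duration;

public class BoundedWatermarkMath {

    // Watermark = max observed timestamp - out-of-orderness bound - 1 ms.
    public static long watermarkFor(long maxObservedTimestamp, Duration bound) {
        return maxObservedTimestamp - bound.toMillis() - 1;
    }

    public static void main(String[] args) {
        // Max observed event time 00:00:30.000 with a 10-second bound
        long maxTs = 30_000L;
        System.out.println(watermarkFor(maxTs, Duration.ofSeconds(10))); // prints 19999
    }
}
```

The extra `- 1` is what makes "≤ W" (rather than "< W") the completeness guarantee.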
No watermarks
For sources where you don’t use event time (e.g., processing-time windows only):
WatermarkStrategy.noWatermarks();
Dealing with idle sources
If a source partition produces no events for a while, its watermark stops advancing. Because Flink takes the minimum watermark across all partitions, a single idle partition blocks all event-time progress.
Use withIdleness to mark idle partitions so they are excluded from the watermark minimum:
WatermarkStrategy
        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));
After a partition produces no events for 1 minute, it is marked idle and no longer holds back the watermark.
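The effect on the merged watermark can be sketched in plain Java: the operator-level watermark is the minimum over all partitions that are not currently marked idle (a simplified model, not the Flink internals):

```java
public class WatermarkMerge {

    // Simplified model: combined watermark = min over active (non-idle) partitions.
    public static long combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
            }
        }
        return min;
    }

    public static void main(String[] args) {
        long[] wm = {5_000L, 9_000L, 1_000L};
        // All partitions active: the slowest one (1000) holds everything back.
        System.out.println(combine(wm, new boolean[]{false, false, false})); // prints 1000
        // Partition 2 marked idle: it no longer holds back the watermark.
        System.out.println(combine(wm, new boolean[]{false, false, true}));  // prints 5000
    }
}
```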
Watermark alignment
When parallel partitions produce watermarks at very different speeds, fast partitions force downstream operators to buffer large amounts of data waiting for slow partitions to catch up. Watermark alignment prevents any partition from advancing its watermark more than a given amount ahead of the slowest.
WatermarkStrategy
        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withWatermarkAlignment(
                "alignment-group-1",    // group name — sources sharing a name are aligned
                Duration.ofSeconds(20), // max drift from the minimum watermark in the group
                Duration.ofSeconds(1)   // how often to update the max watermark on the JobManager
        );
Watermark alignment is supported only for FLIP-27 sources. It does not work with legacy SourceFunction-based sources or when applied via assignTimestampsAndWatermarks.
Writing custom watermark generators
When built-in strategies don’t fit your use case, implement WatermarkGenerator<T> directly.
Periodic generator
A periodic generator tracks timestamps in onEvent() and emits a watermark periodically via onPeriodicEmit(). The emission interval is controlled by env.getConfig().setAutoWatermarkInterval(ms) (default: 200 ms).
BoundedOutOfOrdernessGenerator.java
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3_500L; // 3.5 seconds in ms

    // Start just high enough that the first emitted watermark cannot underflow
    // (emitting from a raw Long.MIN_VALUE would wrap around).
    private long currentMaxTimestamp = Long.MIN_VALUE + 3_500L + 1;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Watermark = max seen so far - out-of-orderness bound - 1 ms
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}
Wrap the generator in a WatermarkStrategy via forGenerator:
WatermarkStrategy
        .<MyEvent>forGenerator(ctx -> new BoundedOutOfOrdernessGenerator())
        .withTimestampAssigner((e, ts) -> e.getTimestamp());
Punctuated generator
A punctuated generator emits a watermark only when it sees special “marker” events. This gives per-record watermark control but can flood the system with watermarks if overused.
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.isWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // No periodic emission — we emit in onEvent()
    }
}
Emitting a watermark for every single event degrades performance. Batch watermark emissions by only emitting when the timestamp advances significantly.
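The batching rule itself is simple bookkeeping: remember the last emitted watermark and emit again only once the candidate has advanced by some minimum step. A plain-Java sketch of the decision logic (the class and its 1-second step are illustrative, not a Flink API):

```java
public class BatchedEmitter {

    private final long minAdvanceMillis;
    private long lastEmitted = Long.MIN_VALUE; // sentinel: nothing emitted yet

    public BatchedEmitter(long minAdvanceMillis) {
        this.minAdvanceMillis = minAdvanceMillis;
    }

    // Returns true if a watermark for this candidate should actually be emitted.
    public boolean shouldEmit(long candidateWatermark) {
        if (lastEmitted == Long.MIN_VALUE
                || candidateWatermark - lastEmitted >= minAdvanceMillis) {
            lastEmitted = candidateWatermark;
            return true;
        }
        return false;
    }
}
```

Inside a punctuated `onEvent()`, you would call `shouldEmit(...)` before `output.emitWatermark(...)` so that markers arriving in quick succession collapse into one watermark per step.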
Windowing with event time
Event-time windows fire when the watermark passes the window's end boundary. This means Flink handles out-of-order events correctly as long as they arrive before the watermark passes the window end; the out-of-orderness bound of the watermark strategy controls how far the watermark lags, and allowed lateness (below) extends the window's lifetime further.
EventTimeWindowExample.java
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import java.time.Duration;
DataStream<SensorReading> readings = env.fromSource(
        source,
        WatermarkStrategy
                .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((r, ts) -> r.getTimestamp()),
        "sensors"
);
// Tumbling 5-minute windows per sensor
DataStream<AggregatedReading> result = readings
        .keyBy(SensorReading::getSensorId)
        .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
        .aggregate(new AverageAggregator());
Allowed lateness
Elements that arrive after a window has fired (because the watermark has already passed their timestamp) are dropped by default. Set allowedLateness to keep the window's state around so late arrivals trigger updated results:
readings
        .keyBy(SensorReading::getSensorId)
        .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
        .allowedLateness(Duration.ofSeconds(30)) // window stays alive 30s after firing
        .aggregate(new AverageAggregator());
Late elements that arrive after the allowed lateness period can be routed to a side output:
OutputTag<SensorReading> lateTag = new OutputTag<SensorReading>("late-data") {};

SingleOutputStreamOperator<AggregatedReading> result = readings
        .keyBy(SensorReading::getSensorId)
        .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
        .allowedLateness(Duration.ofSeconds(30))
        .sideOutputLateData(lateTag)
        .aggregate(new AverageAggregator());

DataStream<SensorReading> lateData = result.getSideOutput(lateTag);
Kafka per-partition watermarks
When consuming from Kafka, each partition may have its own event-time pattern. Flink’s Kafka connector generates per-partition watermarks internally and merges them:
KafkaSource<MyEvent> source = KafkaSource.<MyEvent>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("events")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(KafkaRecordDeserializationSchema.of(new MyDeserializer()))
        .build();
DataStream<MyEvent> stream = env.fromSource(
        source,
        WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((e, ts) -> e.getEventTime()),
        "kafka-source"
);
Per-partition watermark generation ensures that a slow Kafka partition does not block watermark progress beyond what is required — each partition independently tracks its watermark.
Setting the watermark emission interval
The interval at which periodic watermark generators call onPeriodicEmit is configured globally:
env.getConfig().setAutoWatermarkInterval(100); // emit watermarks every 100ms
Lower values reduce event-time latency but increase overhead. The default is 200 ms.