Windowing divides a PCollection’s elements into finite groups based on timestamps. This is essential for processing unbounded (streaming) data and useful for bounded (batch) data analysis.

What is Windowing?

From the Apache Beam source code:
/**
 * {@link Window} logically divides up or groups the elements of a {@link PCollection} 
 * into finite windows according to a {@link WindowFn}. The output of {@code Window} 
 * contains the same elements as input, but they have been logically assigned to windows.
 *
 * Windowing a {@link PCollection} divides the elements into windows based on the 
 * associated event time for each element. This is especially useful for PCollections 
 * with unbounded size, since it allows operating on a sub-group of the elements placed 
 * into a related window.
 *
 * Several predefined {@link WindowFn}s are provided:
 * - {@link FixedWindows} partitions the timestamps into fixed-width intervals.
 * - {@link SlidingWindows} places data into overlapping fixed-width intervals.
 * - {@link Sessions} groups data into sessions where each item in a window is 
 *   separated from the next by no more than a specified gap.
 */

Why Windowing?

Windowing enables you to:
  • Process infinite streams by dividing them into finite chunks
  • Perform time-based aggregations (e.g., hourly totals, daily averages)
  • Handle late-arriving data with configurable lateness policies
  • Analyze time-based patterns in your data
All PCollections have a windowing strategy. By default, elements are assigned to a single global window that spans the entire dataset.

Built-in Window Functions

Fixed Windows (Tumbling)

Divide time into non-overlapping, fixed-duration intervals:
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// 5-minute fixed windows
PCollection<String> windowed = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));

// 1-hour windows
PCollection<String> hourly = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardHours(1))));
Use case: Calculating metrics over regular intervals (hourly sales, daily averages)
Time:  0:00  0:05  0:10  0:15  0:20  0:25
Window: [----][----][----][----][----][----]
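
To make the interval boundaries concrete, here is a minimal plain-Java sketch of the arithmetic behind fixed-window assignment. It uses only java.time and is an illustration, not Beam's implementation. The names FixedWindowSketch, windowStart, and windowEnd are hypothetical, and windows are assumed to be aligned to the epoch with no offset.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java sketch of fixed-window assignment (hypothetical helper, not a Beam class). */
final class FixedWindowSketch {

    /** Returns the inclusive start of the fixed window containing the timestamp. */
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long tsMillis = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch.
        return Instant.ofEpochMilli(tsMillis - Math.floorMod(tsMillis, sizeMillis));
    }

    /** Returns the exclusive end of the fixed window containing the timestamp. */
    static Instant windowEnd(Instant timestamp, Duration size) {
        return windowStart(timestamp, size).plus(size);
    }

    public static void main(String[] args) {
        Duration fiveMinutes = Duration.ofMinutes(5);
        Instant event = Instant.parse("2024-01-01T00:07:30Z");
        System.out.println(windowStart(event, fiveMinutes)); // 2024-01-01T00:05:00Z
        System.out.println(windowEnd(event, fiveMinutes));   // 2024-01-01T00:10:00Z
    }
}
```

Each timestamp maps to exactly one window, and an element stamped exactly on a boundary belongs to the window that starts there because the intervals are half-open.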

Sliding Windows (Hopping)

Create overlapping windows with a specified period and duration:
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;

// 10-minute windows, starting every 5 minutes
PCollection<String> windowed = input.apply(
    Window.<String>into(
        SlidingWindows.of(Duration.standardMinutes(10))
            .every(Duration.standardMinutes(5))));
Use case: Moving averages, overlapping analytics
Time:     0:00  0:05  0:10  0:15  0:20
Window1:  [----------]
Window2:        [----------]
Window3:              [----------]
Window4:                    [----------]
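
Because sliding windows overlap, a single element lands in several windows at once. The sketch below lists the start of every window that contains a given timestamp. It is plain Java for illustration only; SlidingWindowSketch and windowStarts are hypothetical names, all values share one time unit (minutes here), and windows are assumed to start at multiples of the period.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of sliding-window assignment (hypothetical helper, not a Beam class). */
final class SlidingWindowSketch {

    /**
     * Returns the start of every window [start, start + size) that contains
     * the timestamp, newest window first.
     */
    static List<Long> windowStarts(long timestamp, long size, long period) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, period);
        // Walk backwards one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-minute windows every 5 minutes: an event at minute 7 is in two windows.
        System.out.println(windowStarts(7, 10, 5)); // [5, 0]
    }
}
```

With a 10-minute duration and a 5-minute period, every element belongs to two windows, so downstream aggregations see each element twice.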

Session Windows

Group events that occur within a specified gap duration of each other; a period of inactivity longer than the gap starts a new session:
import org.apache.beam.sdk.transforms.windowing.Sessions;

// Session windows with 10-minute gap
PCollection<String> sessions = input.apply(
    Window.<String>into(
        Sessions.withGapDuration(Duration.standardMinutes(10))));
Use case: User sessions, activity bursts, click streams
Events:   *    *  *              *    *
Session:  [--------]             [------]
Gaps:     < 10 min   > 10 min    < 10 min
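
One way to picture session windowing: each element starts out in its own window of length equal to the gap, and overlapping windows are then merged. The plain-Java sketch below follows that idea for the timestamps of a single key. SessionSketch and sessions are hypothetical names, and the code is an illustration under those assumptions rather than Beam's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of session merging for one key (hypothetical helper, not a Beam class). */
final class SessionSketch {

    /** Returns the merged sessions as {start, end} pairs with an exclusive end. */
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts < last[1]) {
                // The element's own window [ts, ts + gap) overlaps the current session: extend it.
                last[1] = Math.max(last[1], ts + gap);
            } else {
                // No overlap: the element opens a new session.
                result.add(new long[] {ts, ts + gap});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Event times in minutes, with a 10-minute gap.
        for (long[] s : sessions(new long[] {0, 4, 7, 30, 33}, 10)) {
            System.out.println(String.format("[%d, %d)", s[0], s[1]));
        }
        // Prints:
        // [0, 17)
        // [30, 43)
    }
}
```

Note that a session's end is the last event's timestamp plus the gap, so session length depends on the data rather than on a fixed duration.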

Global Windows

The default window - all elements in a single window:
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;

// Explicitly specify global windows (usually not needed)
PCollection<String> global = input.apply(
    Window.<String>into(new GlobalWindows()));
Global windows are the default. You only need to specify them explicitly when changing from another windowing strategy.

Windowing Example: Hourly Traffic Count

Here’s a complete example from the Apache Beam examples:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

Pipeline p = Pipeline.create(options);

// Read streaming data
PCollection<String> input = p.apply(
    PubsubIO.readStrings().fromTopic("projects/myproject/topics/traffic"));

// Apply 1-hour fixed windows
PCollection<String> windowed = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardHours(1))));

// Count events per window
PCollection<KV<String, Long>> counts = windowed
    .apply("ExtractRoute", ParDo.of(new ExtractRouteFn()))
    .apply("CountPerRoute", Count.perElement());

// Write results
counts.apply("FormatResults", ParDo.of(new FormatResultsFn()))
      .apply(BigQueryIO.write()...);

p.run();

Accessing Window Information

You can access window metadata in your DoFns:
class AddWindowInfoFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(
            @Element String element,
            BoundedWindow window,
            OutputReceiver<String> out) {
        
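        // BoundedWindow only exposes maxTimestamp(). Interval-based windows
        // (fixed, sliding, session) are IntervalWindow instances from
        // org.apache.beam.sdk.transforms.windowing, which add start() and end().
        String bounds = (window instanceof IntervalWindow)
            ? String.format("[%s..%s)",
                ((IntervalWindow) window).start(),
                ((IntervalWindow) window).end())
            : window.toString();

        out.output(String.format("Element '%s' in window %s", element, bounds));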
    }
}

Advanced Windowing Configuration

Allowed Lateness

Specify how long to wait for late data after a window closes:
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

PCollection<String> windowed = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
        .withAllowedLateness(Duration.standardMinutes(10))
        .discardingFiredPanes());
Late data arriving after the allowed lateness period is discarded. Set this value based on your data’s characteristics and latency requirements.
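
The interaction between the watermark, the end of a window, and the allowed lateness can be sketched as a three-way classification. The code below is plain Java for illustration; LatenessSketch, Arrival, and classify are hypothetical names, and boundary handling is simplified compared with what a real runner does.

```java
/** Plain-Java sketch of how allowed lateness affects an arriving element (not a Beam class). */
final class LatenessSketch {

    enum Arrival { ON_TIME, LATE_BUT_ACCEPTED, DROPPED }

    /**
     * Classifies an element that belongs to a window ending at windowEndMillis,
     * given the watermark at the moment the element arrives.
     */
    static Arrival classify(long windowEndMillis, long allowedLatenessMillis, long watermarkMillis) {
        if (watermarkMillis < windowEndMillis) {
            // The watermark has not passed the window yet.
            return Arrival.ON_TIME;
        }
        if (watermarkMillis < windowEndMillis + allowedLatenessMillis) {
            // The window has closed, but it is still within the lateness horizon.
            return Arrival.LATE_BUT_ACCEPTED;
        }
        // The window has expired; the element is discarded.
        return Arrival.DROPPED;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long windowEnd = 5 * minute;  // window [0:00, 0:05)
        long lateness = 10 * minute;  // matches withAllowedLateness(10 minutes)
        System.out.println(classify(windowEnd, lateness, 3 * minute));  // ON_TIME
        System.out.println(classify(windowEnd, lateness, 9 * minute));  // LATE_BUT_ACCEPTED
        System.out.println(classify(windowEnd, lateness, 16 * minute)); // DROPPED
    }
}
```

A larger allowed lateness keeps window state alive longer, which trades memory and storage for completeness.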

Timestamp Combiner

Control how timestamps are combined when elements are grouped:
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;

// Window the pairs first, then group; GroupByKey is applied to the windowed PCollection
PCollection<KV<String, Iterable<Integer>>> grouped = pairs
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(5)))
        .withTimestampCombiner(TimestampCombiner.EARLIEST))
    .apply(GroupByKey.create());
Options:
  • EARLIEST: Use the earliest timestamp
  • LATEST: Use the latest timestamp
  • END_OF_WINDOW: Use the end of the window
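
The three options can be compared with a small plain-Java sketch that picks the output timestamp for one group of elements. TimestampCombinerSketch, Mode, and outputTimestamp are hypothetical names used for illustration; this is not Beam's TimestampCombiner.

```java
import java.util.Arrays;

/** Plain-Java sketch of the three timestamp-combining options (not a Beam class). */
final class TimestampCombinerSketch {

    enum Mode { EARLIEST, LATEST, END_OF_WINDOW }

    /** Picks the timestamp assigned to the grouped output for a non-empty group. */
    static long outputTimestamp(Mode mode, long[] elementTimestamps, long windowMaxTimestamp) {
        switch (mode) {
            case EARLIEST:
                return Arrays.stream(elementTimestamps).min().getAsLong();
            case LATEST:
                return Arrays.stream(elementTimestamps).max().getAsLong();
            case END_OF_WINDOW:
                return windowMaxTimestamp;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        long[] timestamps = {120, 45, 290}; // seconds into a 5-minute window
        long windowMax = 299;               // last timestamp inside the window
        System.out.println(outputTimestamp(Mode.EARLIEST, timestamps, windowMax));      // 45
        System.out.println(outputTimestamp(Mode.LATEST, timestamps, windowMax));        // 290
        System.out.println(outputTimestamp(Mode.END_OF_WINDOW, timestamps, windowMax)); // 299
    }
}
```

The choice matters when the grouped output is windowed again downstream, because the output timestamp determines which later window each result falls into.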

Windowing with Aggregations

Windowing is most powerful when combined with aggregations:
// Count events per 5-minute window
PCollection<Long> counts = input
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))
    // The explicit type argument assumes input is a PCollection<String>
    .apply(Count.<String>globally().withoutDefaults());

// Sum values per key per hour
PCollection<KV<String, Integer>> sums = keyedValues
    .apply(Window.into(FixedWindows.of(Duration.standardHours(1))))
    .apply(Sum.integersPerKey());

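// Average per 10-minute sliding window (assumes numbers is a PCollection<Integer>).
// Global combines on non-global windows need withoutDefaults() or asSingletonView().
PCollection<Double> averages = numbers
    .apply(Window.into(
        SlidingWindows.of(Duration.standardMinutes(10))
            .every(Duration.standardMinutes(5))))
    .apply(Mean.<Integer>globally().withoutDefaults());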

Watermarks

Watermarks track progress in event time:
A watermark is Beam's estimate of event-time progress: a timestamp before which Beam expects no more data to arrive. When the watermark passes the end of a window, the window is considered complete, and any element that arrives afterward with a timestamp inside that window is treated as late data.
Event Time:  0:00  0:05  0:10  0:15  0:20
Watermark:   ------>--------------->
                   ^
             Elements with timestamps 
             before this are complete

Best Practices

Choose appropriate window sizes: Balance between latency (smaller windows = faster results) and efficiency (larger windows = less overhead).
Session windows for user activity: Use session windows to track user sessions, where activity bursts are separated by periods of inactivity.
Beware of data skew: Ensure your keys are well-distributed across windows to avoid hotspots.

Common Patterns

  1. Hourly aggregations: Use 1-hour fixed windows for reports
  2. Real-time dashboards: Use small fixed windows (1-5 minutes)
  3. Moving averages: Use sliding windows
  4. User sessions: Use session windows with appropriate gap
  5. Daily summaries: Use 1-day fixed windows

Complete Example: Session Analysis

Pipeline p = Pipeline.create(options);

// Read click events. ParseClickEventFn is a placeholder DoFn that decodes
// each message payload into a ClickEvent.
PCollection<ClickEvent> clicks = p
    .apply(PubsubIO.readStrings().fromTopic("projects/myproject/topics/clicks"))
    .apply(ParDo.of(new ParseClickEventFn()));

// Key each click by user ID
PCollection<KV<String, ClickEvent>> userClicks = clicks.apply(
    ParDo.of(new ExtractUserIdFn()));

// Group into per-user sessions (30-minute inactivity gap)
PCollection<KV<String, Iterable<ClickEvent>>> sessions = userClicks
    .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(30))))
    .apply(GroupByKey.create());

// Calculate session metrics
PCollection<SessionMetrics> metrics = sessions.apply(
    ParDo.of(new ComputeSessionMetricsFn()));

metrics.apply(BigQueryIO.write()...);
p.run();

Next Steps

Triggers

Control when windowed results are emitted

Transforms

Apply transformations to windowed data
