Skip to main content
This page demonstrates advanced streaming patterns in Apache Beam, including windowing strategies, trigger mechanisms, and handling late data.

Streaming Word Count

A basic streaming pipeline that reads from PubSub and processes data in fixed windows.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms import window

def run_streaming_wordcount(input_topic, output_topic):
    """Run a streaming word count from one Pub/Sub topic to another.

    Each Pub/Sub message payload is decoded as UTF-8 and split on whitespace;
    words are counted in 15-second fixed windows and every (word, count)
    result is published to ``output_topic`` as the UTF-8 bytes of
    ``"word: count"``.

    Args:
        input_topic: Pub/Sub topic to read messages from.
        output_topic: Pub/Sub topic to publish the counts to.
    """
    # The streaming keyword override sets StandardOptions.streaming, so a
    # second view_as(StandardOptions) assignment is unnecessary.
    pipeline_options = PipelineOptions(streaming=True)

    with beam.Pipeline(options=pipeline_options) as p:
        # Read raw message payloads (bytes) from Pub/Sub and decode them.
        lines = (
            p
            | 'Read' >> beam.io.ReadFromPubSub(
                topic=input_topic
            ).with_output_types(bytes)
            | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
        )

        # Window the (word, 1) pairs and sum them per word per window.
        # CombinePerKey lets the runner pre-aggregate partial sums before the
        # shuffle, which GroupByKey followed by a Map cannot do.
        counts = (
            lines
            | 'ExtractWords' >> beam.FlatMap(lambda x: x.split())
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'Window' >> beam.WindowInto(window.FixedWindows(15))
            | 'Sum' >> beam.CombinePerKey(sum)
        )

        # Format and write results; the sink's output PCollection is unused.
        _ = (
            counts
            | 'Format' >> beam.Map(lambda wc: f'{wc[0]}: {wc[1]}'.encode('utf-8'))
            | 'Write' >> beam.io.WriteToPubSub(output_topic)
        )
Key Concepts:
  • Set streaming=True in pipeline options
  • Use WindowInto with FixedWindows for 15-second windows
  • Data is processed continuously as it arrives

Advanced Windowing with Triggers

Demonstrating different trigger types for controlling when results are emitted.

Default Trigger (Watermark-based)

The default trigger fires when the watermark passes the end of the window.
import apache_beam as beam
from apache_beam.transforms import window, trigger

# Default trigger - fires once when the watermark passes the window end.
# The Python SDK takes window sizes and lateness as numbers of seconds;
# Joda-Time's Duration belongs to the Java SDK and has no Python module.
# `data` is the input PCollection (defined elsewhere).
default_windowed = (
    data
    | 'FixedWindows' >> beam.WindowInto(
        window.FixedWindows(30 * 60),  # 30 minutes
        trigger=trigger.AfterWatermark(),
        allowed_lateness=0,  # no lateness: data after the window end is dropped
        accumulation_mode=trigger.AccumulationMode.DISCARDING
    )
)
Behavior:
  • Fires once when the watermark passes the window end
  • Produces ON_TIME results
  • Late data is dropped (zero allowed lateness)

Handling Late Data

Allow late data processing with allowed lateness.
import apache_beam as beam
from apache_beam.transforms import window, trigger

# Allow late data for up to 1 day.
# AfterWatermark() on its own fires a single time at the end of the window and
# then finishes, so late elements would still be dropped. The late=AfterCount(1)
# sub-trigger (repeated by AfterWatermark) emits a further LATE pane for each
# late element that arrives while allowed_lateness keeps the window open.
with_late_data = (
    data
    | 'WindowWithLateness' >> beam.WindowInto(
        window.FixedWindows(30 * 60),  # 30 minutes
        trigger=trigger.AfterWatermark(late=trigger.AfterCount(1)),
        allowed_lateness=24 * 60 * 60,  # 1 day
        accumulation_mode=trigger.AccumulationMode.DISCARDING
    )
    | 'CountPerWindow' >> beam.CombinePerKey(sum)
)
Key Points:
  • Windows stay open for 1 day after watermark passes
  • Each late element triggers a new pane (LATE timing)
  • Use DISCARDING mode to get incremental updates

Speculative Results (Early Firings)

Get early approximations before all data arrives.
import apache_beam as beam
from apache_beam.transforms import window, trigger

# Fire early results every minute.
# AfterProcessingTime(60) fires 60 seconds of processing time after the first
# element of the current pane; Repeatedly re-arms it after every firing.
# ACCUMULATING makes each pane include everything seen so far in the window.
speculative = (
    data
    | 'SpeculativeWindow' >> beam.WindowInto(
        window.FixedWindows(30 * 60),
        trigger=trigger.Repeatedly(
            trigger.AfterProcessingTime(60)  # Every 1 minute
        ),
        allowed_lateness=24 * 60 * 60,
        accumulation_mode=trigger.AccumulationMode.ACCUMULATING
    )
    | 'Aggregate' >> beam.CombinePerKey(sum)
)
Use Case:
  • Get quick approximations for dashboards
  • Progressive refinement of results
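  • Firings are driven by processing time alone, not by the watermark. Panes fired before the watermark passes the window end are marked EARLY; panes fired after that point are not EARLY.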

Combined Trigger Strategy

Combine early firings, on-time results, and late data handling.
import apache_beam as beam
from apache_beam.transforms import window, trigger

# Complete trigger strategy: early, on-time and late firings.
# AfterWatermark(early=..., late=...) repeats the early trigger until the
# watermark passes the end of the window (which emits the ON_TIME pane), then
# repeats the late trigger while allowed_lateness keeps the window open.
# AfterAll is not a substitute: it only fires once all of its sub-triggers
# are ready together, which does not produce this timeline.
combined = (
    data
    | 'CombinedTrigger' >> beam.WindowInto(
        window.FixedWindows(30 * 60),
        trigger=trigger.AfterWatermark(
            early=trigger.AfterProcessingTime(60),  # Early: every 1 min
            late=trigger.AfterProcessingTime(5 * 60)  # Late: every 5 min
        ),
        allowed_lateness=24 * 60 * 60,
        accumulation_mode=trigger.AccumulationMode.ACCUMULATING
    )
)
Timeline:
  1. EARLY panes: Every 1 minute before window closes
  2. ON_TIME pane: When watermark passes window end
  3. LATE panes: Every 5 minutes after window closes

Windowing with Timestamps

Access window information in your pipeline for metadata enrichment.
import apache_beam as beam
from apache_beam.transforms import window

class FormatWithWindow(beam.DoFn):
    """Turn each (word, count) pair into a dict tagged with its window bounds.

    Beam supplies the element's window through the ``WindowParam`` default.
    Assumes an interval-style window exposing ``start`` and ``end`` (e.g. the
    output of FixedWindows) -- TODO confirm for other window types.
    """

    def process(self, element, window=beam.DoFn.WindowParam):
        """Yield one record with the word, its count and ISO-8601 window bounds."""
        word, count = element
        # Render start then end as UTC datetimes in ISO-8601 form.
        start_iso, end_iso = (
            bound.to_utc_datetime().isoformat()
            for bound in (window.start, window.end)
        )
        yield dict(
            word=word,
            count=count,
            window_start=start_iso,
            window_end=end_iso,
        )

# Apply in pipeline: attach window metadata to every windowed (word, count) pair.
results = windowed_counts | 'AddWindowInfo' >> beam.ParDo(FormatWithWindow())

Session Windows

Group events based on activity sessions with gaps of inactivity.
import apache_beam as beam
from apache_beam.transforms import window

# Create session windows with 10-minute gaps.
# Session windows are merged per key, so events must be keyed (for example by
# user id) for one user's activity to land in the same session. Count.PerKey
# then yields (key, number_of_events) per session.
# NOTE(review): assumes `events` is a PCollection of (key, event) pairs.
sessions = (
    events
    | 'SessionWindows' >> beam.WindowInto(
        window.Sessions(10 * 60)  # 10-minute gap duration
    )
    | 'CountPerSession' >> beam.combiners.Count.PerKey()
)
Use Cases:
  • User session analytics
  • Detecting periods of activity
  • Grouping related events

Sliding Windows

Create overlapping windows for moving averages and continuous analysis.
import apache_beam as beam
from apache_beam.transforms import window

# 1-hour windows, sliding every 5 minutes: each element falls into
# 60 / 5 = 12 overlapping windows.
# NOTE(review): assumes `metrics` is a PCollection of (key, numeric value) pairs.
sliding = (
    metrics
    | 'SlidingWindows' >> beam.WindowInto(
        window.SlidingWindows(
            size=60 * 60,      # 1 hour window
            period=5 * 60      # Slide every 5 minutes
        )
    )
    # Mean.PerKey() is the packaged form of CombinePerKey(MeanCombineFn()).
    | 'ComputeAverage' >> beam.combiners.Mean.PerKey()
)

Best Practices

Choose Appropriate Windows

  • Fixed windows for regular intervals
  • Session windows for user activity
  • Sliding windows for moving calculations

Configure Allowed Lateness

  • Balance completeness vs. resource usage
  • Consider your data’s lateness characteristics
  • Use watermark estimators for better accuracy

Select Accumulation Mode

  • DISCARDING for independent updates
  • ACCUMULATING for cumulative results
  • Consider storage and computation trade-offs

Monitor Watermarks

  • Track watermark lag in production
  • Adjust allowed lateness based on metrics
  • Use custom watermark estimators if needed

Common Patterns

Traffic Analysis Example

Based on examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java:160-337
// Process traffic sensor data with multiple trigger strategies.
// All four variants window the same input into 30-minute fixed windows and
// differ only in trigger, allowed lateness, and accumulation mode.
// Duration here is presumably org.joda.time.Duration (the type Beam's Java SDK
// uses for time spans); imports are not shown in this fragment.
// Placeholder: replace the initializer with the real input PCollection.
PCollection<KV<String, Integer>> flowInfo = /* input data */;

// 1. Default trigger - watermark only.
// Fires when the watermark passes the end of each window; with zero allowed
// lateness, elements arriving after that point are dropped.
flowInfo
    .apply(Window
        .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(30)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

// 2. With allowed lateness - capture late data.
// Windows are kept for one extra day; because the watermark trigger is
// repeated, late data within that day produces further panes, and
// discardingFiredPanes() makes each pane hold only data since the previous one.
flowInfo
    .apply(Window
        .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(30)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardDays(1)));

// 3. Speculative - early approximations.
// Fires one minute (processing time) after the first element of each pane,
// repeatedly; accumulatingFiredPanes() makes every pane include all data seen
// so far for the window.
flowInfo
    .apply(Window
        .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(30)))
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardDays(1)));

// 4. Sequential - early, on-time, and late.
// AfterEach.inOrder runs its sub-triggers one after another. The first phase
// is early firings every minute, ended by orFinally(...) once the watermark
// passes the end of the window. The second phase is late firings five minutes
// after the first element of each subsequent pane.
flowInfo
    .apply(Window
        .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(30)))
        .triggering(AfterEach.inOrder(
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(1)))
                .orFinally(AfterWatermark.pastEndOfWindow()),
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(5)))))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardDays(1)));

Windowing Guide

Learn windowing fundamentals

Triggers Guide

Deep dive into trigger mechanisms

Watermarks

Understanding watermarks and event time

Streaming I/O

Streaming sources and sinks

Build docs developers (and LLMs) love