The Beam Cookbook provides practical examples of common data processing patterns. Each example demonstrates a specific technique you can adapt for your own pipelines.

Overview

These examples show you how to:
  • Filter and transform data
  • Join multiple datasets
  • Combine values per key
  • Use side inputs for enrichment
  • Aggregate data with custom combiners
All cookbook examples use BigQuery as the data source, but the patterns apply to any I/O connector.

Filtering Data

The FilterExamples pipeline demonstrates multiple filtering techniques and shows how to use pipeline-computed values as side inputs.

Basic Filtering

Filter data based on field values:
static class FilterSingleMonthDataFn extends DoFn<TableRow, TableRow> {
  Integer monthFilter;

  public FilterSingleMonthDataFn(Integer monthFilter) {
    this.monthFilter = monthFilter;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    Integer month = (Integer) row.get("month");

    if (month.equals(this.monthFilter)) {
      c.output(row);
    }
  }
}

// Use the filter
PCollection<TableRow> monthFilteredRows = rows
    .apply(ParDo.of(new FilterSingleMonthDataFn(7)));
Pattern: Create a custom DoFn that only outputs elements matching your criteria.

Projection

Select specific fields from your data:
static class ProjectionFn extends DoFn<TableRow, TableRow> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    
    // Extract only the fields you need
    Integer year = Integer.parseInt((String) row.get("year"));
    Integer month = Integer.parseInt((String) row.get("month"));
    Integer day = Integer.parseInt((String) row.get("day"));
    Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
    
    // Create new row with selected fields
    TableRow outRow = new TableRow()
        .set("year", year)
        .set("month", month)
        .set("day", day)
        .set("mean_temp", meanTemp);
        
    c.output(outRow);
  }
}

PCollection<TableRow> projected = rows.apply(ParDo.of(new ProjectionFn()));
Pattern: Extract and transform only the fields you need to reduce data size and processing costs.

Side Inputs for Dynamic Filtering

Use computed values to filter data:
// Compute global mean temperature
PCollection<Double> meanTemps = rows.apply(ParDo.of(new ExtractTempFn()));
final PCollectionView<Double> globalMeanTemp = 
    meanTemps.apply(Mean.globally()).apply(View.asSingleton());

// Filter using the computed mean as a side input
PCollection<TableRow> filteredRows = monthFilteredRows.apply(
    "FilterBelowMean",
    ParDo.of(new DoFn<TableRow, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Double meanTemp = Double.parseDouble(
            c.element().get("mean_temp").toString());
        Double globalMean = c.sideInput(globalMeanTemp);
        
        if (meanTemp < globalMean) {
          c.output(c.element());
        }
      }
    }).withSideInputs(globalMeanTemp));
Key concepts:
  • Side inputs: Pass computed values to other transforms
  • View.asSingleton(): Convert a PCollection to a single value
  • Dynamic filtering: Use pipeline-computed values for filtering
1. Compute aggregate value: Calculate the global mean temperature across all data.
2. Create side input view: Convert the computed value to a PCollectionView for use as a side input.
3. Apply filter with side input: Use the computed mean to filter records in a downstream transform.
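The two-pass shape of this pipeline (first compute an aggregate, then filter every record against it) can be sketched in plain Java with no Beam dependency. The temperature values below are made up for illustration:

```java
import java.util.List;
import java.util.stream.Collectors;

public class MeanFilterSketch {
  // Keep only readings strictly below the global mean, mirroring
  // the FilterBelowMean transform above.
  static List<Double> filterBelowMean(List<Double> temps) {
    // Pass 1: compute the aggregate (the role of Mean.globally()).
    double mean = temps.stream()
        .mapToDouble(Double::doubleValue).average().orElse(0.0);
    // Pass 2: filter against the computed value (the side input's role).
    return temps.stream().filter(t -> t < mean).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Double> temps = List.of(10.0, 20.0, 30.0, 40.0);  // mean = 25.0
    System.out.println(filterBelowMean(temps));  // [10.0, 20.0]
  }
}
```

In the Beam version, the side input mechanism exists precisely because the mean is not known until the pipeline runs; the sketch collapses the two passes into one method for clarity.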

Joining Data

The JoinExamples pipeline demonstrates how to join two datasets using CoGroupByKey:
static PCollection<String> joinEvents(
    PCollection<TableRow> eventsTable, 
    PCollection<TableRow> countryCodes) {
  
  final TupleTag<String> eventInfoTag = new TupleTag<>();
  final TupleTag<String> countryInfoTag = new TupleTag<>();

  // Transform both collections to KV pairs with country code as key
  PCollection<KV<String, String>> eventInfo = 
      eventsTable.apply(ParDo.of(new ExtractEventDataFn()));
  PCollection<KV<String, String>> countryInfo = 
      countryCodes.apply(ParDo.of(new ExtractCountryInfoFn()));

  // Join using CoGroupByKey
  PCollection<KV<String, CoGbkResult>> joined = 
      KeyedPCollectionTuple.of(eventInfoTag, eventInfo)
          .and(countryInfoTag, countryInfo)
          .apply(CoGroupByKey.create());

  // Process joined results
  PCollection<KV<String, String>> result = joined.apply(
      "ProcessJoin",
      ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          KV<String, CoGbkResult> e = c.element();
          String countryCode = e.getKey();
          String countryName = e.getValue().getOnly(countryInfoTag);
          
          // Process all events for this country
          for (String eventInfo : e.getValue().getAll(eventInfoTag)) {
            c.output(KV.of(countryCode, 
                "Country: " + countryName + ", Event: " + eventInfo));
          }
        }
      }));

  return result;
}

Extract Join Keys

Transform your data to key-value pairs:
static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    String countryCode = (String) row.get("ActionGeo_CountryCode");
    String eventInfo = buildEventInfo(row);
    c.output(KV.of(countryCode, eventInfo));
  }
}

static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    String countryCode = (String) row.get("FIPSCC");
    String countryName = (String) row.get("HumanName");
    c.output(KV.of(countryCode, countryName));
  }
}
1. Transform to KV pairs: Convert both datasets to PCollection<KV<K, V>> with the join key as K.
2. Apply CoGroupByKey: Group both collections by the common key using KeyedPCollectionTuple.
3. Process joined results: Access values from both collections using CoGbkResult.getOnly() or getAll().
Use getOnly() when each key has exactly one value:
String countryName = e.getValue().getOnly(countryInfoTag);
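Stripped of the Beam machinery, the join amounts to grouping events by key and pairing each group with the single country name registered for that key. A plain-Java sketch (data values are illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JoinSketch {
  // Mirrors the CoGroupByKey join: for each key, pair every event
  // (getAll's role) with the one country name (getOnly's role).
  static List<String> join(Map<String, List<String>> eventsByCode,
                           Map<String, String> countryNames) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, List<String>> entry : eventsByCode.entrySet()) {
      String name = countryNames.get(entry.getKey());
      for (String event : entry.getValue()) {
        out.add("Country: " + name + ", Event: " + event);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, List<String>> events = new LinkedHashMap<>();
    events.put("US", List.of("election", "storm"));
    Map<String, String> names = Map.of("US", "United States");
    System.out.println(join(events, names));
  }
}
```

The difference in the Beam version is scale: CoGroupByKey performs this grouping in parallel across workers, with a shuffle to co-locate values that share a key.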

Combining Per Key

The CombinePerKeyExamples pipeline shows how to aggregate values for each unique key:
public static class ConcatWords 
    implements SerializableFunction<Iterable<String>, String> {
  @Override
  public String apply(Iterable<String> input) {
    StringBuilder result = new StringBuilder();
    for (String item : input) {
      if (!item.isEmpty()) {
        if (result.length() == 0) {
          result.append(item);
        } else {
          result.append(",").append(item);
        }
      }
    }
    return result.toString();
  }
}

// Extract words and play names
PCollection<KV<String, String>> words = 
    rows.apply(ParDo.of(new ExtractLargeWordsFn()));

// Combine all play names for each word
PCollection<KV<String, String>> wordAllPlays = 
    words.apply(Combine.perKey(new ConcatWords()));

Built-in Combiners

Beam provides common aggregation functions:
PCollection<KV<String, Integer>> totals = 
    data.apply(Sum.integersPerKey());
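Beam also ships Mean.perKey(), Min/Max.integersPerKey(), and Count.perKey(), among others. What Sum.integersPerKey() computes — one total per distinct key — can be sketched in plain Java (the key/value pairs are illustrative):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SumPerKeySketch {
  // One total per distinct key, the result Sum.integersPerKey() produces.
  static Map<String, Integer> sumPerKey(List<Map.Entry<String, Integer>> pairs) {
    return pairs.stream().collect(
        Collectors.groupingBy(Map.Entry::getKey,
                              Collectors.summingInt(Map.Entry::getValue)));
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> pairs = List.of(
        new SimpleEntry<>("a", 1),
        new SimpleEntry<>("b", 2),
        new SimpleEntry<>("a", 3));
    System.out.println(sumPerKey(pairs));  // a -> 4, b -> 2
  }
}
```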

Custom Combine Functions

Create custom aggregation logic:
public static class AverageFn 
    extends Combine.CombineFn<Integer, AverageFn.Accum, Double> {
  
  public static class Accum {
    int sum = 0;
    int count = 0;
  }

  @Override
  public Accum createAccumulator() {
    return new Accum();
  }

  @Override
  public Accum addInput(Accum accum, Integer input) {
    accum.sum += input;
    accum.count++;
    return accum;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.sum += accum.sum;
      merged.count += accum.count;
    }
    return merged;
  }

  @Override
  public Double extractOutput(Accum accum) {
    return ((double) accum.sum) / accum.count;
  }
}

// Use custom combiner
PCollection<KV<String, Double>> averages = 
    data.apply(Combine.perKey(new AverageFn()));
When to use custom combiners:
  • Computing statistics beyond built-in functions
  • Optimizing performance with incremental aggregation
  • Implementing domain-specific aggregation logic
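The reason mergeAccumulators matters is that Beam may split the input across workers and combine the partial results in any order. A standalone sketch of the same accumulator logic (plain Java, no Beam dependency) shows that merging partials gives the same answer as one sequential pass:

```java
public class AverageMergeSketch {
  // Plain-Java copy of AverageFn's accumulator state and transitions.
  static class Accum {
    int sum = 0;
    int count = 0;
    Accum add(int input) { sum += input; count++; return this; }     // addInput
    Accum merge(Accum other) {                                       // mergeAccumulators
      sum += other.sum;
      count += other.count;
      return this;
    }
    double extract() { return ((double) sum) / count; }              // extractOutput
  }

  public static void main(String[] args) {
    // Two "workers" each accumulate a partition of the input...
    Accum a = new Accum().add(1).add(2);
    Accum b = new Accum().add(3).add(6);
    // ...and the merged result equals the average over all four values.
    System.out.println(a.merge(b).extract());  // 3.0
  }
}
```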

Running Cookbook Examples

Prerequisites

1. Set up BigQuery: Create a BigQuery dataset for output tables.
2. Configure credentials: Set up authentication for your chosen runner.
3. Install dependencies: Ensure you have the Beam SDK and I/O connectors installed.

FilterExamples

mvn compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.cookbook.FilterExamples \
  -Dexec.args="--output=YOUR_PROJECT:DATASET.TABLE \
               --monthFilter=7"

JoinExamples

mvn compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.cookbook.JoinExamples \
  -Dexec.args="--output=./join-results"

CombinePerKeyExamples

mvn compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.cookbook.CombinePerKeyExamples \
  -Dexec.args="--output=YOUR_PROJECT:DATASET.TABLE"

Pattern Comparison

Filtering

  • Use when: You need to reduce dataset size based on criteria
  • Performance: Very efficient, happens in parallel
  • Example: Keep only records from a specific time period

Projection

  • Use when: You only need specific fields from your data
  • Performance: Reduces memory and I/O costs
  • Example: Extract only user ID and timestamp from logs

Joining

  • Use when: Combining data from multiple sources
  • Performance: Can be expensive, requires shuffling
  • Example: Enrich events with user profile data

Combining

  • Use when: Aggregating values per key
  • Performance: Efficient with incremental combiners
  • Example: Calculate total sales per customer

Best Practices

Apply filters and projections as early as possible in your pipeline to reduce the amount of data processed by downstream transforms.
// Good: Filter before expensive operations
PCollection<Record> filtered = input
    .apply("EarlyFilter", Filter.by(r -> r.isValid()))
    .apply("ExpensiveTransform", ParDo.of(new ExpensiveFn()));
For small reference data, consider using side inputs instead of CoGroupByKey for better performance.
// For small lookup tables
PCollectionView<Map<String, String>> countryMap = 
    countryCodes.apply(View.asMap());
    
PCollection<EnrichedEvent> enriched = events.apply(
    ParDo.of(new EnrichWithSideInput())
        .withSideInputs(countryMap));
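The EnrichWithSideInput DoFn and EnrichedEvent type above are illustrative names, not shown in full; inside the DoFn, the enrichment amounts to a map lookup per element. A plain-Java sketch of that lookup (field values are made up):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class EnrichSketch {
  // Each element is enriched by looking up its key in the small
  // reference map -- the role the side input plays in the pipeline.
  static List<String> enrich(List<String> codes, Map<String, String> countryMap) {
    return codes.stream()
        .map(code -> code + ":" + countryMap.getOrDefault(code, "unknown"))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, String> countryMap = Map.of("US", "United States", "DE", "Germany");
    System.out.println(enrich(List.of("US", "DE", "XX"), countryMap));
  }
}
```

This is why side inputs beat CoGroupByKey for small reference data: the map is broadcast to every worker once, and no shuffle of the main input is needed.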
Use built-in combiners when possible, as they’re optimized for performance. Only create custom combiners when you need specific logic.
// Built-in combiners are optimized
data.apply(Sum.integersPerKey());  // Preferred

// Custom combiner only when needed
data.apply(Combine.perKey(new MyComplexAggregation()));
Track important pipeline behavior with counters and distributions.
private final Counter filteredRecords = 
    Metrics.counter(MyDoFn.class, "filteredRecords");
    
@ProcessElement
public void processElement(ProcessContext c) {
  if (shouldFilter(c.element())) {
    filteredRecords.inc();
    return;
  }
  c.output(c.element());
}

Next Steps

WordCount Examples

Learn Beam fundamentals with the classic WordCount example.

Windowing

Process streaming data with time-based windows.

I/O Transforms

Connect to various data sources and sinks.

Advanced Examples

Explore streaming pipelines and real-time processing.

Testing Pipelines

Learn how to test your Beam pipelines.

Metrics & Monitoring

Add observability to your pipelines.
