The Beam Cookbook provides practical examples of common data processing patterns. Each example demonstrates a specific technique you can adapt for your own pipelines.
The following example computes a global mean temperature across the dataset, then uses that value to filter records:

```java
// Compute the global mean temperature
PCollection<Double> meanTemps = rows.apply(ParDo.of(new ExtractTempFn()));
final PCollectionView<Double> globalMeanTemp =
    meanTemps.apply(Mean.<Double>globally()).apply(View.asSingleton());

// Filter using the computed mean as a side input
PCollection<TableRow> filteredRows = monthFilteredRows.apply(
    "FilterBelowMean",
    ParDo.of(new DoFn<TableRow, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Double meanTemp = Double.parseDouble(
            c.element().get("mean_temp").toString());
        Double globalMean = c.sideInput(globalMeanTemp);
        if (meanTemp < globalMean) {
          c.output(c.element());
        }
      }
    }).withSideInputs(globalMeanTemp));
```
Key concepts:

- Side inputs: pass computed values to other transforms
- View.asSingleton(): convert a PCollection to a single value
- Dynamic filtering: use pipeline-computed values for filtering
1. Compute aggregate value: calculate the global mean temperature across all data.
2. Create side input view: convert the computed value to a PCollectionView for use as a side input.
3. Apply filter with side input: use the computed mean to filter records in a downstream transform.
The CombinePerKeyExamples pipeline shows how to aggregate values for each unique key:
```java
public static class ConcatWords
    implements SerializableFunction<Iterable<String>, String> {
  @Override
  public String apply(Iterable<String> input) {
    StringBuilder result = new StringBuilder();
    for (String item : input) {
      if (!item.isEmpty()) {
        if (result.length() == 0) {
          result.append(item);
        } else {
          result.append(",").append(item);
        }
      }
    }
    return result.toString();
  }
}

// Extract words and play names
PCollection<KV<String, String>> words =
    rows.apply(ParDo.of(new ExtractLargeWordsFn()));

// Combine all play names for each word
PCollection<KV<String, String>> wordAllPlays =
    words.apply(Combine.perKey(new ConcatWords()));
```
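The accumulation logic inside ConcatWords can be exercised outside a pipeline. This is a plain-Java sketch of the same join-with-commas behavior; the ConcatWordsDemo class and its concat helper are illustrative names, not part of the example pipeline:

```java
import java.util.Arrays;

public class ConcatWordsDemo {
  // Mirrors the ConcatWords combiner: join non-empty strings with commas.
  public static String concat(Iterable<String> input) {
    StringBuilder result = new StringBuilder();
    for (String item : input) {
      if (!item.isEmpty()) {
        if (result.length() == 0) {
          result.append(item);
        } else {
          result.append(",").append(item);
        }
      }
    }
    return result.toString();
  }

  public static void main(String[] args) {
    // Empty strings are skipped, so no stray commas appear.
    System.out.println(concat(Arrays.asList("hamlet", "", "othello"))); // prints "hamlet,othello"
  }
}
```

Note that the empty-string check keeps the output free of leading, trailing, or doubled commas.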
Filtering
Use when: You need to reduce dataset size based on criteria
Performance: Very efficient, happens in parallel
Example: Keep only records from a specific time period

Projection
Use when: You only need specific fields from your data
Performance: Reduces memory and I/O costs
Example: Extract only user ID and timestamp from logs

Joining
Use when: Combining data from multiple sources
Performance: Can be expensive, requires shuffling
Example: Enrich events with user profile data

Combining
Use when: Aggregating values per key
Performance: Efficient with incremental combiners
Example: Calculate total sales per customer
For small reference data, consider using side inputs instead of CoGroupByKey for better performance.
```java
// For small lookup tables
PCollectionView<Map<String, String>> countryMap =
    countryCodes.apply(View.asMap());

PCollection<EnrichedEvent> enriched = events.apply(
    ParDo.of(new EnrichWithSideInput())
        .withSideInputs(countryMap));
```
Choose the Right Combiner
Use built-in combiners when possible, as they’re optimized for performance. Only create custom combiners when you need specific logic.
```java
// Built-in combiners are optimized
data.apply(Sum.integersPerKey());  // Preferred

// Custom combiner only when needed
data.apply(Combine.perKey(new MyComplexAggregation()));
```
Add Metrics for Monitoring
Track important pipeline behavior with counters and distributions.
```java
private final Counter filteredRecords =
    Metrics.counter(MyDoFn.class, "filteredRecords");

@ProcessElement
public void processElement(ProcessContext c) {
  if (shouldFilter(c.element())) {
    filteredRecords.inc();
    return;
  }
  c.output(c.element());
}
```
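Outside a runner, the same count-what-you-drop pattern can be sketched in plain Java. Here FilterWithCounter, the negative-value criterion in shouldFilter, and the AtomicLong standing in for Beam's Counter are all assumptions made for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class FilterWithCounter {
  // Plain-Java stand-in for Metrics.counter: tracks dropped records.
  public static final AtomicLong filteredRecords = new AtomicLong();

  // Hypothetical filter criterion: drop negative values.
  static boolean shouldFilter(int value) {
    return value < 0;
  }

  public static List<Integer> filter(int[] input) {
    List<Integer> kept = new ArrayList<>();
    for (int v : input) {
      if (shouldFilter(v)) {
        filteredRecords.incrementAndGet(); // count the drop, emit nothing
        continue;
      }
      kept.add(v);
    }
    return kept;
  }

  public static void main(String[] args) {
    List<Integer> kept = filter(new int[] {3, -1, 7, -4});
    System.out.println(kept + " filtered=" + filteredRecords.get()); // prints "[3, 7] filtered=2"
  }
}
```

Incrementing the counter on every dropped element makes silent data loss visible in monitoring, which is the point of the pattern.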