The WordCount examples demonstrate core Beam concepts through progressively complex implementations. Each example builds on the previous one, introducing new features and best practices.

Overview

WordCount is the “Hello World” of data processing pipelines. These examples show you how to:
  • Read text data from files
  • Transform and process data using Beam transforms
  • Count and aggregate results
  • Write output to files
  • Configure and run pipelines on different runners
1. Start with MinimalWordCount

Begin with the simplest implementation to understand the basic pipeline structure.
2. Explore WordCount

Learn best practices including custom transforms, pipeline options, and metrics.
3. Debug with DebuggingWordCount

Add logging, metrics, and testing to your pipelines.
4. Handle streaming with WindowedWordCount

Process unbounded data with windowing and timestamps.

MinimalWordCount

The simplest WordCount implementation focuses on the core pipeline structure without additional complexity.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import java.util.Arrays;

public class MinimalWordCount {
  public static void main(String[] args) {
    // Create default pipeline options (DirectRunner unless overridden)
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
        // Split lines into words
        .apply(FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        // Filter empty words
        .apply(Filter.by((String word) -> !word.isEmpty()))
        // Count occurrences of each unique word
        .apply(Count.perElement())
        // Format each KV pair as text
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) ->
                wordCount.getKey() + ": " + wordCount.getValue()))
        // Write results
        .apply(TextIO.write().to("wordcounts"));

    p.run().waitUntilFinish();
  }
}
Key concepts:
  • TextIO.read(): Reads text files line by line
  • FlatMapElements: Splits each line into multiple words
  • Filter: Removes empty strings
  • Count.perElement(): Counts occurrences of each unique word
  • MapElements: Formats output as “word: count”
  • TextIO.write(): Writes results to files
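The same split/filter/count/format logic can be traced with plain Java streams, which is a handy way to reason about what each Beam transform contributes before running a pipeline. The sketch below (a hypothetical `WordCountSketch` class with no Beam dependency, operating on a made-up sample input) mirrors the pipeline's steps:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountSketch {
    // Plain-Java analogue of the Beam pipeline: split on runs of non-letters
    // (FlatMapElements), drop empties (Filter), count per unique word
    // (Count.perElement)
    static Map<String, Long> countWords(String[] lines) {
        return Arrays.stream(lines)
            .flatMap(line -> Arrays.stream(line.split("[^\\p{L}]+")))
            .filter(word -> !word.isEmpty())
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        String[] lines = {"To be, or not to be:", "that is the question"};
        // Format each entry as "word: count", mirroring the MapElements step
        countWords(lines).forEach((word, n) -> System.out.println(word + ": " + n));
    }
}
```

The key difference in Beam is that the same logic runs in parallel across a distributed dataset rather than in a single in-memory stream.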

WordCount with Best Practices

The full WordCount example demonstrates production-ready patterns:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public class WordCount {

  // Custom DoFn with metrics
  static class ExtractWordsFn extends DoFn<String, String> {
    private final Counter emptyLines =
        Metrics.counter(ExtractWordsFn.class, "emptyLines");
    private final Distribution lineLenDist =
        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
    private final Pattern splitPattern = Pattern.compile("[^\\p{L}]+");

    @ProcessElement
    public void processElement(@Element String element,
                               OutputReceiver<String> receiver) {
      lineLenDist.update(element.length());

      if (element.trim().isEmpty()) {
        emptyLines.inc();
      }

      Stream<String> stream = splitPattern.splitAsStream(element);
      stream.forEach(word -> {
        if (!word.isEmpty()) {
          receiver.output(word);
        }
      });
    }
  }

  // Formats a word count as a "word: count" text line; declared as a named
  // class so other pipelines (such as WindowedWordCount) can reuse it
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  // Composite transform for reusability
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
      return wordCounts;
    }
  }

  // Custom pipeline options
  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);

    @Description("Path of the file to write to")
    @Validation.Required
    String getOutput();
    void setOutput(String value);
  }

  static void runWordCount(WordCountOptions options) {
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.write().to(options.getOutput()));

    p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    WordCountOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(WordCountOptions.class);
    runWordCount(options);
  }
}
New concepts:
  • Custom DoFn classes: Define reusable processing logic
  • Metrics: Track pipeline behavior (counters, distributions)
  • Composite transforms: Bundle multiple transforms for reuse
  • Pipeline options: Configure pipelines via command-line arguments
  • Named transforms: Easier debugging and monitoring
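The heart of ExtractWordsFn can be exercised without a Beam runner. The sketch below (a hypothetical `ExtractWordsSketch` class, with an `AtomicLong` standing in for the `Metrics.counter()` that Beam would provide) mirrors the logic of `processElement` in plain Java:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

public class ExtractWordsSketch {
    private static final Pattern SPLIT = Pattern.compile("[^\\p{L}]+");

    // Stand-in for the emptyLines counter a Beam Metrics.counter() would track
    static final AtomicLong emptyLines = new AtomicLong();

    // Mirrors ExtractWordsFn.processElement: one input line in, words out
    static List<String> processElement(String element) {
        if (element.trim().isEmpty()) {
            emptyLines.incrementAndGet();
        }
        List<String> out = new ArrayList<>();
        SPLIT.splitAsStream(element)
            .filter(word -> !word.isEmpty())   // same empty-word filter as the DoFn
            .forEach(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(processElement("Now is the winter"));
        processElement("   ");
        System.out.println("empty lines seen: " + emptyLines.get());
    }
}
```

In the real pipeline, the emitted words go to an `OutputReceiver` and the counter values surface in the runner's monitoring UI rather than in local state.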

Running WordCount

Local Execution

Run with the DirectRunner for local testing:
mvn compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--output=./wordcounts"

Running on Cloud Dataflow

mvn compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--runner=DataflowRunner \
               --project=YOUR_PROJECT_ID \
               --region=us-central1 \
               --tempLocation=gs://YOUR_BUCKET/temp/ \
               --output=gs://YOUR_BUCKET/output/wordcounts"
Running on Apache Flink

mvn compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--runner=FlinkRunner \
               --flinkMaster=localhost:8081 \
               --output=./wordcounts"

WindowedWordCount

The windowed example shows how to process streaming data with time-based windows:
// Add timestamps to data (AddTimestampFn, defined in the full example,
// assigns each element an event-time timestamp in the given range)
PCollection<String> input = pipeline
    .apply(TextIO.read().from(options.getInputFile()))
    .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));

// Apply fixed windows
PCollection<String> windowedWords = input.apply(
    Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));

// Reuse CountWords transform
PCollection<KV<String, Long>> wordCounts = 
    windowedWords.apply(new WordCount.CountWords());

// Write one file per window
wordCounts
    .apply(MapElements.via(new WordCount.FormatAsTextFn()))
    .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
Windowing concepts:
  • Timestamps: Associate data with event times
  • Windows: Group data into time-based buckets
  • FixedWindows: Non-overlapping time intervals
  • Per-window processing: Process each window independently
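Fixed-window assignment is simple arithmetic: a window's start is the element's timestamp rounded down to a multiple of the window size. The sketch below (a hypothetical `FixedWindowSketch` class, simplified to ignore Beam's window offset parameter and negative timestamps) illustrates the idea:

```java
import java.time.Instant;

public class FixedWindowSketch {
    // Simplified FixedWindows assignment: the window start is the element's
    // timestamp rounded down to a multiple of the window size
    static long windowStartMillis(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        long ts = Instant.parse("2024-01-01T00:01:30Z").toEpochMilli();
        // An element at 00:01:30 falls in the [00:01:00, 00:02:00) window
        long start = windowStartMillis(ts, oneMinute);
        System.out.println("window start: " + Instant.ofEpochMilli(start));
    }
}
```

Every element whose timestamp lands in the same interval is grouped and counted together, which is why each window can be written out as its own file.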

Key Takeaways

Start Simple

Begin with MinimalWordCount to understand core concepts before adding complexity.

Add Metrics

Use counters and distributions to monitor pipeline behavior in production.

Create Composite Transforms

Bundle related transforms for reusability and better testing.

Use Pipeline Options

Make pipelines configurable via command-line arguments instead of hardcoding values.

Next Steps

Cookbook Examples

Explore common patterns like filtering, joining, and combining data.

Transforms Guide

Learn about all available Beam transforms.

I/O Connectors

Read from and write to various data sources.

Runners

Execute pipelines on different distributed processing backends.
