The Beam programming model provides a structured approach to building data processing pipelines that work across batch and streaming data sources.

Pipeline Construction

A Beam pipeline follows a clear pattern:
  1. Create a Pipeline object
  2. Read data from one or more sources into PCollections
  3. Apply transforms to the PCollections to process the data
  4. Write results to one or more sinks
  5. Run the pipeline on a runner

Basic Pipeline Pattern

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BasicPipeline {
    public static void main(String[] args) {
        // 1. Create the Pipeline object, configured from command-line options
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);
        
        // 2. Read data from a source into a PCollection
        PCollection<String> lines = p.apply(
            TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"));
        
        // 3. Apply a transform: split each line into words
        PCollection<String> words = lines.apply(
            FlatMapElements.into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))));
        
        // 4. Write results to a sink
        words.apply(TextIO.write().to("output"));
        
        // 5. Run the pipeline on the configured runner and wait for it to finish
        p.run().waitUntilFinish();
    }
}
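
One detail of the split step above is easy to miss: `String.split` with the pattern `[^\p{L}]+` returns an empty leading token when a line starts with a non-letter character, and a blank line yields a single empty string. The following plain-Java sketch shows one way to drop those empty tokens. The class name `Tokenizer` and the method `tokenize` are hypothetical helpers, not part of the Beam SDK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the Beam SDK.
final class Tokenizer {
    private Tokenizer() {}

    /** Splits a line on runs of non-letter characters and drops empty tokens. */
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.split("[^\\p{L}]+")) {
            // A leading delimiter (or a blank line) produces an empty token; skip it.
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }
}
```

Assuming this helper is on the classpath, it could be passed to the transform as `.via((String line) -> Tokenizer.tokenize(line))` in place of the inline `Arrays.asList(...)` call.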

Pipeline Options

Pipeline options configure how your pipeline executes. They specify:
  • Which runner to use (DirectRunner, DataflowRunner, FlinkRunner, etc.)
  • Runner-specific settings (project ID, staging location, etc.)
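  • Custom application parameters

Custom options are declared as an interface that extends PipelineOptions, with a getter and a setter for each option: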
import org.apache.beam.sdk.options.*;

public interface MyOptions extends PipelineOptions {
    @Description("Input file pattern")
    @Default.String("gs://my-bucket/input/*.txt")
    String getInputFile();
    void setInputFile(String value);
    
    @Description("Output path")
    @Validation.Required
    String getOutputPath();
    void setOutputPath(String value);
}

// Parse options from command line
MyOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .as(MyOptions.class);

Pipeline p = Pipeline.create(options);
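
As a rough sketch of how command-line flags map onto the getters above: Beam derives each flag name from the property name, so `getOutputPath()` is set with `--outputPath=...`. The class name `OptionsDemo`, the interface name `DemoOptions`, and the `/tmp/beam-output` path are illustrative assumptions.

```java
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;

// Illustrative sketch; class name, interface name, and paths are assumptions.
final class OptionsDemo {
    /** Same shape as the MyOptions interface shown above. */
    public interface DemoOptions extends PipelineOptions {
        @Description("Input file pattern")
        @Default.String("gs://my-bucket/input/*.txt")
        String getInputFile();
        void setInputFile(String value);

        @Description("Output path")
        @Validation.Required
        String getOutputPath();
        void setOutputPath(String value);
    }

    /** Parses flags such as --outputPath=... into a typed options object. */
    static DemoOptions parse(String... args) {
        // Registering the interface lets --help list its options.
        PipelineOptionsFactory.register(DemoOptions.class);
        return PipelineOptionsFactory.fromArgs(args)
            .withValidation() // fails if a @Validation.Required option is missing
            .as(DemoOptions.class);
    }

    public static void main(String[] args) {
        DemoOptions options = parse("--outputPath=/tmp/beam-output");
        // inputFile was not passed, so its @Default.String value is used.
        System.out.println(options.getInputFile());
        System.out.println(options.getOutputPath());
    }
}
```

With `withValidation()` in place, omitting `--outputPath` causes option parsing to fail before the pipeline is built, which is usually preferable to failing at write time.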

Transform Application Patterns

Chaining Transforms

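Transforms can be chained so that the output of each step becomes the input of the next. Here TransformFn1, TransformFn2, and MyCombineFn stand for your own DoFn and CombineFn implementations:
PCollection<String> results = input
    .apply("Step1", ParDo.of(new TransformFn1()))
    .apply("Step2", ParDo.of(new TransformFn2()))
    .apply("Step3", Combine.globally(new MyCombineFn()));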
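
The fragment above relies on placeholder classes. As a hedged, self-contained sketch of the same chaining pattern using only built-in transforms, the example below trims each string, drops empty ones, and counts what remains. The class name `ChainingDemo`, the method `countNonEmpty`, and the sample strings are made up for illustration, and running it assumes a runner such as the DirectRunner is on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Illustrative sketch; names and sample data are assumptions.
final class ChainingDemo {
    /** Chains three steps: trim, drop empty strings, count the remaining elements. */
    static PCollection<Long> countNonEmpty(PCollection<String> input) {
        return input
            .apply("Trim", MapElements.into(TypeDescriptors.strings())
                .via((String s) -> s.trim()))
            .apply("DropEmpty", Filter.by((String s) -> !s.isEmpty()))
            .apply("CountAll", Count.globally());
    }

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // Each apply() returns a new PCollection; the input is never modified.
        countNonEmpty(p.apply("CreateInput", Create.of(" a ", "", "b")));
        p.run().waitUntilFinish();
    }
}
```

Wrapping the chain in a method that takes and returns a PCollection keeps the steps reusable and makes the chain straightforward to exercise from a test.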

Branching Pipelines

A single PCollection can be used as input to multiple transforms:
PCollection<String> input = p.apply(TextIO.read().from("input.txt"));

// Branch 1: Count words
PCollection<KV<String, Long>> wordCounts = input
    .apply("CountWords", new CountWords());

// Branch 2: Find long lines  
PCollection<String> longLines = input
    .apply("FilterLongLines", Filter.by(line -> line.length() > 100));

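// Each branch processes independently.
// TextIO.write() accepts only strings, so format each KV before writing.
wordCounts
    .apply("FormatCounts", MapElements.into(TypeDescriptors.strings())
        .via((KV<String, Long> kv) -> kv.getKey() + ": " + kv.getValue()))
    .apply("WriteCount", TextIO.write().to("counts"));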
longLines.apply("WriteLong", TextIO.write().to("long-lines"));

Composite Transforms

Create reusable transform components by combining multiple transforms:
static class CountWords 
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
        return lines
            .apply("ExtractWords", 
                FlatMapElements.into(TypeDescriptors.strings())
                    .via(line -> Arrays.asList(line.split("\\s+"))))
            .apply("FilterEmpty", 
                Filter.by(word -> !word.isEmpty()))
            .apply("CountPerElement", 
                Count.perElement());
    }
}

// Usage
PCollection<String> lines = ...;
PCollection<KV<String, Long>> counts = lines.apply("CountWords", new CountWords());

Composite transforms improve code organization, enable reuse, and make pipelines easier to understand and maintain.

Multiple Inputs and Outputs

Multiple Outputs (Tagged)

A single ParDo can emit elements to several output PCollections, each identified by a TupleTag:
// Define output tags
final TupleTag<String> upperTag = new TupleTag<String>(){};
final TupleTag<String> lowerTag = new TupleTag<String>(){};

PCollectionTuple results = input.apply(
    ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(@Element String word, MultiOutputReceiver out) {
            if (word.equals(word.toUpperCase())) {
                out.get(upperTag).output(word);
            } else {
                out.get(lowerTag).output(word);
            }
        }
    }).withOutputTags(upperTag, TupleTagList.of(lowerTag))
);

PCollection<String> upperWords = results.get(upperTag);
PCollection<String> lowerWords = results.get(lowerTag);

Merging Multiple Inputs

Flatten merges several PCollections of the same element type into one:
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;

// Flatten multiple PCollections
PCollection<String> merged = PCollectionList
    .of(pc1).and(pc2).and(pc3)
    .apply(Flatten.pCollections());

Best Practices

Naming transforms: Always provide meaningful names to transforms. This helps with monitoring, debugging, and understanding the pipeline structure.
Avoid side effects: The functions you pass to transforms (such as DoFns) should be deterministic and free of side effects. A runner may retry or re-execute the processing of an element, so the same input should always produce the same output, however many times it is processed.
Start simple: Begin with a minimal pipeline and add complexity gradually. Test each transform independently before combining them.
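
To make the advice on side effects and independent testing concrete, one possible arrangement keeps the per-element logic in a pure static method and lets the DoFn delegate to it. Because the method depends only on its argument, a retried element produces the same output, and the logic can be unit-tested without starting a pipeline. This is a sketch only: the names `NormalizeFn` and `normalize` are hypothetical.

```java
import java.util.Locale;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical example for illustration; not taken from the Beam SDK.
final class NormalizeFn extends DoFn<String, String> {
    /** Pure function: the result depends only on the input string. */
    static String normalize(String word) {
        return word.trim().toLowerCase(Locale.ROOT);
    }

    @ProcessElement
    public void processElement(@Element String word, OutputReceiver<String> out) {
        // No shared mutable state and no external writes: safe to re-execute.
        out.output(normalize(word));
    }
}
```

In a pipeline it would be applied with a descriptive name, for example `input.apply("NormalizeWords", ParDo.of(new NormalizeFn()))`.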

Testing Pipelines

Beam provides testing utilities to validate pipeline logic:
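import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;

// The class name is illustrative
public class LengthsTest {
    // TestPipeline is a JUnit rule, so it must be a public field
    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @Test
    public void testPipeline() {
        PCollection<String> input = p.apply(Create.of("hello", "world"));
        PCollection<Integer> lengths = input.apply(
            MapElements.into(TypeDescriptors.integers())
                .via(String::length));

        // Both input strings have five characters
        PAssert.that(lengths).containsInAnyOrder(5, 5);
        p.run().waitUntilFinish();
    }
}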

Next Steps

  • Pipelines: Deep dive into Pipeline creation and configuration
  • PCollections: Learn about working with distributed data
  • Transforms: Explore the full range of available transforms
  • Windowing: Handle unbounded data with windows
