A Pipeline encapsulates your entire data processing task, from start to finish. It represents a directed acyclic graph (DAG) of data transformations.

What is a Pipeline?

A Pipeline manages the execution of data processing operations. The Javadoc in the Beam SDK describes it as follows:
/**
 * A {@link Pipeline} manages a directed acyclic graph of {@link PTransform PTransforms}, and the
 * {@link PCollection PCollections} that the {@link PTransform PTransforms} consume and produce.
 *
 * Each {@link Pipeline} is self-contained and isolated from any other {@link Pipeline}. The
 * {@link PValue PValues} that are inputs and outputs of each of a {@link Pipeline Pipeline's}
 * {@link PTransform PTransforms} are also owned by that {@link Pipeline}.
 */

Creating a Pipeline

Basic Pipeline Creation

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Create default options
PipelineOptions options = PipelineOptionsFactory.create();

// Create the pipeline
Pipeline p = Pipeline.create(options);

Pipeline Options

Pipeline options configure pipeline execution, including runner selection and runtime parameters.

Standard Pipeline Options

import org.apache.beam.sdk.options.*;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;

// Parse from command line
PipelineOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .create();

// Configure for specific runner (e.g., Dataflow)
DataflowPipelineOptions dataflowOptions = 
    options.as(DataflowPipelineOptions.class);
dataflowOptions.setRunner(DataflowRunner.class);
dataflowOptions.setProject("my-project-id");
dataflowOptions.setRegion("us-central1");
dataflowOptions.setTempLocation("gs://my-bucket/temp");

Pipeline p = Pipeline.create(dataflowOptions);

Custom Pipeline Options

Define custom options for application-specific parameters:
import org.apache.beam.sdk.options.*;

public interface MyOptions extends PipelineOptions {
    @Description("Input file path or pattern")
    @Default.String("gs://my-bucket/input/*.txt")
    String getInputFile();
    void setInputFile(String value);
    
    @Description("Output directory")
    @Validation.Required
    String getOutput();
    void setOutput(String value);
    
    @Description("Minimum word length to count")
    @Default.Integer(5)
    Integer getMinWordLength();
    void setMinWordLength(Integer value);
}

// Usage
MyOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .as(MyOptions.class);

Pipeline p = Pipeline.create(options);

// Access custom options
String inputFile = options.getInputFile();
Integer minLength = options.getMinWordLength();

Running a Pipeline

Direct Execution

Pipeline p = Pipeline.create(options);

// Build pipeline
p.apply(...);

// Run and wait for completion
PipelineResult result = p.run();
result.waitUntilFinish();

Running with Different Runners

The runner is selected at launch time through pipeline options. For example, the DirectRunner executes pipelines locally and is intended for development and testing:
# Java
mvn compile exec:java -Dexec.mainClass=com.example.MyPipeline \
    -Pdirect-runner

# Python
python my_pipeline.py --runner=DirectRunner

# Go
go run my_pipeline.go --runner=direct

Pipeline Lifecycle

1. Construction Phase

During construction, you build the pipeline DAG by applying transforms:
Pipeline p = Pipeline.create(options);

// Each apply() adds nodes to the DAG
PCollection<String> lines = p.apply("Read", TextIO.read().from("input.txt"));
PCollection<String> words = lines.apply("Split", new SplitWords());
PCollection<KV<String, Long>> counts = words.apply("Count", Count.perElement());

// No actual data processing happens yet

2. Validation Phase

Before execution, the pipeline is validated:
  • Type checking ensures transforms are compatible
  • Graph structure is verified (no cycles, etc.)
  • Runner capabilities are checked

3. Optimization Phase

The runner may optimize the pipeline:
  • Fusing compatible transforms
  • Reordering operations
  • Eliminating redundant work

4. Execution Phase

PipelineResult result = p.run();
The runner executes the pipeline:
  • Distributes work across workers
  • Processes data elements
  • Manages state and checkpointing
  • Handles failures and retries

5. Monitoring and Completion

// Wait for completion
result.waitUntilFinish();

// Or monitor state
PipelineResult.State state = result.getState();
if (state == PipelineResult.State.DONE) {
    // Pipeline completed successfully
}
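For long-running jobs, you may prefer a bounded wait instead of blocking indefinitely. A minimal sketch using `waitUntilFinish(Duration)` and `cancel()` from the `PipelineResult` API (the 30-minute timeout is illustrative, and the exact behavior on timeout varies by runner):

```java
import java.io.IOException;
import org.apache.beam.sdk.PipelineResult;
import org.joda.time.Duration;

// Wait up to 30 minutes for the job to reach a terminal state.
PipelineResult result = p.run();
PipelineResult.State state = result.waitUntilFinish(Duration.standardMinutes(30));

// If the job is still running after the timeout, ask the runner to stop it.
if (state == null || !state.isTerminal()) {
    try {
        result.cancel();
    } catch (IOException e) {
        // cancel() can fail if the runner does not support cancellation
    }
}
```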

Complete Example: Word Count Pipeline

Here’s a complete pipeline from the Apache Beam examples:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import java.util.Arrays;

public class MinimalWordCount {
    public static void main(String[] args) {
        // Create pipeline options
        PipelineOptions options = PipelineOptionsFactory.create();
        
        // Create the pipeline
        Pipeline p = Pipeline.create(options);
        
        // Read input
        p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
        
            // Split into words
            .apply(FlatMapElements.into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        
            // Filter empty strings
            .apply(Filter.by((String word) -> !word.isEmpty()))
        
            // Count occurrences
            .apply(Count.perElement())
        
            // Format output
            .apply(MapElements.into(TypeDescriptors.strings())
                .via((KV<String, Long> wordCount) ->
                    wordCount.getKey() + ": " + wordCount.getValue()))
        
            // Write output
            .apply(TextIO.write().to("wordcounts"));
        
        // Run the pipeline
        p.run().waitUntilFinish();
    }
}
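The split pattern used above, `[^\p{L}]+`, breaks each line on any run of non-letter characters, so punctuation and digits never end up inside a word; a line that starts with a delimiter yields an empty leading token, which is exactly what the Filter step removes. A standalone illustration of the pattern in plain Java, with no Beam dependency:

```java
import java.util.Arrays;
import java.util.List;

public class SplitDemo {
    public static void main(String[] args) {
        // Same pattern as the pipeline: split on runs of non-letter characters.
        String line = "Blow, winds, and crack your cheeks!";
        List<String> words = Arrays.asList(line.split("[^\\p{L}]+"));
        System.out.println(words);  // [Blow, winds, and, crack, your, cheeks]

        // A line beginning with punctuation produces an empty first token:
        String[] tokens = ", said Lear".split("[^\\p{L}]+");
        System.out.println(tokens[0].isEmpty());  // true
    }
}
```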

Best Practices

Immutability: Pipelines are immutable after construction. You cannot modify a pipeline after calling run().
Resource Management: Always clean up resources properly. Use try-with-resources (Java) or context managers (Python) when appropriate.
Testing: Use TestPipeline for unit testing. It provides the same API but runs in-memory for fast testing.
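The TestPipeline pattern mentioned above looks like this in the Java SDK, using PAssert to check the contents of an output PCollection (the test class name and the input values are illustrative):

```java
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class WordCountTest {
    // TestPipeline is applied as a JUnit rule; it runs in-memory.
    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @Test
    public void testCount() {
        // Build the pipeline against in-memory input
        PCollection<KV<String, Long>> counts =
            p.apply(Create.of("a", "b", "a"))
             .apply(Count.perElement());

        // Assert on the contents of the output PCollection
        PAssert.that(counts).containsInAnyOrder(
            KV.of("a", 2L), KV.of("b", 1L));

        p.run().waitUntilFinish();
    }
}
```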

Performance Considerations

  1. Minimize I/O: Read and write efficiently
  2. Avoid shuffles: Reduce GroupByKey operations when possible
  3. Use appropriate runners: Choose the right runner for your use case
  4. Monitor metrics: Track pipeline performance and optimize bottlenecks
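As an example of point 2, expressing an aggregation with a Combine-based transform lets the runner compute partial results on each worker before the shuffle (combiner lifting), rather than shuffling every raw element as a GroupByKey would. A sketch, where `eventsPerKey` is an assumed `PCollection<KV<String, Long>>`:

```java
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Prefer: combiner-based aggregation; partial sums happen before the shuffle.
PCollection<KV<String, Long>> totals =
    eventsPerKey.apply(Sum.longsPerKey());

// Avoid: GroupByKey followed by manual summation, which shuffles every element:
// PCollection<KV<String, Iterable<Long>>> grouped =
//     eventsPerKey.apply(GroupByKey.create());
```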

Next Steps

PCollections

Learn about distributed data collections

Transforms

Explore data transformation operations
