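Transforms are the operations that process data in your Beam pipeline. A PTransform typically takes one or more PCollections as input and produces one or more PCollections as output. Root transforms such as reads start from the pipeline itself rather than from a PCollection, and write transforms may produce no PCollection.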

What is a PTransform?

From the Apache Beam source code:
/**
 * A {@code PTransform<InputT, OutputT>} is an operation that takes an {@code InputT} 
 * (some subtype of {@link PInput}) and produces an {@code OutputT} (some subtype of 
 * {@link POutput}).
 *
 * PTransforms include root PTransforms like TextIO.Read, processing and conversion 
 * operations like ParDo, GroupByKey, Combine, and Count, and outputting PTransforms 
 * like TextIO.Write.
 *
 * Example usage:
 * PCollection<T1> pc1 = ...;
 * PCollection<T2> pc2 =
 *     pc1.apply(ParDo.of(new MyDoFn<T1,KV<K,V>>()))
 *        .apply(GroupByKey.<K, V>create())
 *        .apply(Combine.perKey(new MyKeyedCombineFn<K,V>()))
 *        .apply(ParDo.of(new MyDoFn2<KV<K,V>,T2>()));
 */
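The chained apply calls in the Javadoc example work because each transform's output type is the next transform's input type. The sketch below models only that idea in plain Java, with no Beam dependency. MiniTransform and MiniCollection are hypothetical stand-ins invented for illustration, not Beam classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for PTransform: a typed step from InputT to OutputT. */
interface MiniTransform<InputT, OutputT> {
    OutputT expand(InputT input);
}

/** Hypothetical stand-in for PCollection: an immutable list of elements. */
final class MiniCollection<T> {
    private final List<T> elements;

    MiniCollection(List<T> elements) {
        this.elements = Collections.unmodifiableList(new ArrayList<>(elements));
    }

    List<T> elements() {
        return elements;
    }

    /** Returns the transform's output, so calls can be chained. */
    <OutputT> OutputT apply(MiniTransform<? super MiniCollection<T>, OutputT> transform) {
        return transform.expand(this);
    }

    /** Builds an element-wise transform from a plain function. */
    static <A, B> MiniTransform<MiniCollection<A>, MiniCollection<B>> map(Function<A, B> fn) {
        return input -> {
            List<B> out = new ArrayList<>();
            for (A element : input.elements()) {
                out.add(fn.apply(element));
            }
            return new MiniCollection<>(out);
        };
    }
}
```

A chain that does not type-check, such as applying an Integer transform directly to a collection of strings, is rejected by the compiler. The generic parameters on PTransform give the same guarantee.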

Core Transform Types

Beam provides several categories of built-in transforms:

1. Element-wise Transforms

Process each element independently.

2. Aggregating Transforms

Combine multiple elements into aggregated results.

3. Composite Transforms

Combine multiple transforms into reusable components.

4. I/O Transforms

Read from and write to external systems.

ParDo

ParDo is the most general element-wise transform: it processes each element with a user-defined function (a DoFn), which may emit zero, one, or many outputs per input.

Basic ParDo

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

static class ComputeLengthFn extends DoFn<String, Integer> {
    @ProcessElement
    public void processElement(@Element String word, OutputReceiver<Integer> out) {
        out.output(word.length());
    }
}

PCollection<String> words = ...;
PCollection<Integer> lengths = words.apply(ParDo.of(new ComputeLengthFn()));

DoFn Lifecycle

DoFns have a lifecycle with several methods:
class MyDoFn extends DoFn<String, String> {
    
    @Setup
    public void setup() {
        // Called once per DoFn instance before processing
        // Initialize expensive resources
    }
    
    @StartBundle
    public void startBundle() {
        // Called at the start of each bundle
    }
    
    @ProcessElement  
    public void processElement(@Element String element, OutputReceiver<String> out) {
        // Called for each element
        out.output(element.toUpperCase());
    }
    
    @FinishBundle
    public void finishBundle(FinishBundleContext context) {
        // Called at the end of each bundle
    }
    
    @Teardown
    public void teardown() {
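        // Called when the runner discards this DoFn instance; this is best effort
        // and may be skipped if the worker crashes, so do not rely on it for correctness
        // Clean up resources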
    }
}
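DoFn restrictions: A DoFn must be serializable, because the runner ships instances to workers. It must also be thread-compatible: each instance is called from one thread at a time, but several instances may run concurrently. Avoid mutable state shared between instances (for example, static fields) unless you synchronize access to it.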
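To make the order of the lifecycle methods above concrete, here is a minimal plain-Java simulation with no Beam dependency. LifecycleRecorder and MiniRunner are hypothetical names, and the driver loop is an assumption about how a runner might sequence the calls, not Beam's actual implementation. In a real pipeline the runner chooses the bundle boundaries.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a DoFn that records each lifecycle call it receives. */
final class LifecycleRecorder {
    final List<String> calls = new ArrayList<>();

    void setup() { calls.add("setup"); }
    void startBundle() { calls.add("startBundle"); }
    void processElement(String element) { calls.add("process:" + element); }
    void finishBundle() { calls.add("finishBundle"); }
    void teardown() { calls.add("teardown"); }
}

/** Hypothetical driver showing the order in which a runner might invoke the methods. */
final class MiniRunner {
    static List<String> run(List<List<String>> bundles) {
        LifecycleRecorder fn = new LifecycleRecorder();
        fn.setup();                              // once per instance
        for (List<String> bundle : bundles) {
            fn.startBundle();                    // once per bundle
            for (String element : bundle) {
                fn.processElement(element);      // once per element
            }
            fn.finishBundle();                   // once per bundle
        }
        fn.teardown();                           // once per instance (best effort in a real runner)
        return fn.calls;
    }
}
```

Because setup runs once while startBundle and finishBundle run per bundle, expensive resources such as connections belong in setup, and per-bundle buffers should be flushed in finishBundle.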

Multiple Outputs

DoFns can output to multiple PCollections:
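import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// The trailing {} creates an anonymous subclass, which keeps the element
// type available at runtime for coder inference
final TupleTag<Integer> evenTag = new TupleTag<Integer>(){};
final TupleTag<Integer> oddTag = new TupleTag<Integer>(){};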

PCollectionTuple results = numbers.apply(
    ParDo.of(new DoFn<Integer, Integer>() {
        @ProcessElement
        public void processElement(@Element Integer number, MultiOutputReceiver out) {
            if (number % 2 == 0) {
                out.get(evenTag).output(number);
            } else {
                out.get(oddTag).output(number);
            }
        }
    }).withOutputTags(evenTag, TupleTagList.of(oddTag))
);

PCollection<Integer> evens = results.get(evenTag);
PCollection<Integer> odds = results.get(oddTag);
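The routing logic of the multi-output DoFn above can be sketched in plain Java without Beam. EvenOddSplitter and its string keys are hypothetical stand-ins for the DoFn and its TupleTags. The sketch shows that every declared output exists in the result, even when nothing is routed to it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a multi-output ParDo: routes each number to a named output. */
final class EvenOddSplitter {
    static final String EVEN = "even";
    static final String ODD = "odd";

    static Map<String, List<Integer>> split(List<Integer> numbers) {
        Map<String, List<Integer>> outputs = new HashMap<>();
        // Every declared output is present, even if it ends up empty.
        outputs.put(EVEN, new ArrayList<>());
        outputs.put(ODD, new ArrayList<>());
        for (int n : numbers) {
            // n % 2 is -1 for negative odd numbers, so comparing against 0
            // (rather than against 1) classifies them correctly as odd.
            outputs.get(n % 2 == 0 ? EVEN : ODD).add(n);
        }
        return outputs;
    }
}
```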

Common Built-in Transforms

Map

Apply a simple function to each element:
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

// Using MapElements
PCollection<Integer> lengths = words.apply(
    MapElements.into(TypeDescriptors.integers())
        .via(String::length));

// Using a lambda; declaring the parameter type keeps type inference unambiguous
PCollection<String> upper = words.apply(
    MapElements.into(TypeDescriptors.strings())
        .via((String word) -> word.toUpperCase()));

FlatMap

Produce zero or more outputs per input:
import org.apache.beam.sdk.transforms.FlatMapElements;
import java.util.Arrays;

PCollection<String> words = lines.apply(
    FlatMapElements.into(TypeDescriptors.strings())
        .via((String line) -> Arrays.asList(line.split("\\s+"))));

Filter

Keep only elements that match a predicate:
import org.apache.beam.sdk.transforms.Filter;

// Keep words longer than 5 characters
PCollection<String> longWords = words.apply(
    Filter.by((String word) -> word.length() > 5));

// Remove empty strings
PCollection<String> nonEmpty = words.apply(
    Filter.by((String word) -> !word.isEmpty()));
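MapElements, FlatMapElements, and Filter have the same per-element contracts as map, flatMap, and filter on java.util.stream. The sketch below is an analogy only, written in plain Java with a hypothetical class name. A stream is ordered and lives in one process, whereas a PCollection is unordered and may be distributed across workers.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class ElementWiseAnalogy {
    /** Like MapElements: exactly one output per input. */
    static List<Integer> lengths(List<String> words) {
        return words.stream().map(String::length).collect(Collectors.toList());
    }

    /** Like FlatMapElements: zero or more outputs per input. */
    static List<String> splitWords(List<String> lines) {
        return lines.stream()
            .flatMap(line -> Arrays.stream(line.split("\\s+")))
            .collect(Collectors.toList());
    }

    /** Like Filter: each input is either kept unchanged or dropped. */
    static List<String> longerThan(List<String> words, int minLength) {
        return words.stream()
            .filter(word -> word.length() > minLength)
            .collect(Collectors.toList());
    }
}
```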

GroupByKey

Group values by their key:
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;

// Input: [("cat", 1), ("dog", 5), ("cat", 2), ("dog", 1)]
PCollection<KV<String, Integer>> pairs = ...;

// Group by key
PCollection<KV<String, Iterable<Integer>>> grouped = 
    pairs.apply(GroupByKey.create());
// Output: [("cat", [1, 2]), ("dog", [5, 1])]
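GroupByKey triggers a shuffle, which moves data between workers and can be expensive. If you only need an aggregate per key (a sum, a count, a maximum), prefer a Combine-based transform such as Sum.integersPerKey(), which can combine values on each worker before the shuffle.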
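The grouping that GroupByKey performs can be modelled in a few lines of plain Java. GroupByKeyModel is a hypothetical name, and the sketch runs in a single process. In Beam the same result is produced across workers, and the order of values within each group is not guaranteed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupByKeyModel {
    /** Collects all values that share a key into one list per key. */
    static Map<String, List<Integer>> group(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), key -> new ArrayList<>())
                   .add(pair.getValue());
        }
        return grouped;
    }
}
```

Each group holds every value for its key in memory at once in this model, which hints at why keys with very many values are costly to group.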

CoGroupByKey

Join multiple PCollections by key:
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

final TupleTag<String> emailsTag = new TupleTag<>();
final TupleTag<String> phonesTag = new TupleTag<>();

PCollection<KV<String, String>> emails = ...; // (name, email)
PCollection<KV<String, String>> phones = ...; // (name, phone)

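PCollection<KV<String, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(emailsTag, emails)
        .and(phonesTag, phones)
        .apply(CoGroupByKey.create());
// For each key, CoGbkResult.getAll(emailsTag) and getAll(phonesTag) return
// that key's values from each input; either may be empty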
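The shape of a CoGroupByKey result is easiest to see on a small example. The plain-Java sketch below uses hypothetical names (CoGroupModel, Joined) and made-up sample rows. It shows that a key appears in the output if it occurs in either input, with an empty list for the side that lacks it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CoGroupModel {
    /** Per-key result: every value from each input; either list may be empty. */
    static final class Joined {
        final List<String> emails = new ArrayList<>();
        final List<String> phones = new ArrayList<>();
    }

    /** Each input row is a two-element array: {name, value}. */
    static Map<String, Joined> coGroup(List<String[]> emails, List<String[]> phones) {
        Map<String, Joined> result = new TreeMap<>();
        for (String[] row : emails) {
            result.computeIfAbsent(row[0], name -> new Joined()).emails.add(row[1]);
        }
        for (String[] row : phones) {
            result.computeIfAbsent(row[0], name -> new Joined()).phones.add(row[1]);
        }
        return result;
    }
}
```

To get inner-join behavior, a downstream step would skip keys for which either list is empty.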

Combine

Aggregate values:
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Count;

// Count globally
PCollection<Long> count = words.apply(Count.globally());

// Count per element
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

// Sum integers
PCollection<Integer> sum = numbers.apply(Sum.integersGlobally());

// Sum per key
PCollection<KV<String, Integer>> sums = pairs.apply(Sum.integersPerKey());

// Maximum
PCollection<Integer> max = numbers.apply(Max.integersGlobally());

Custom CombineFn

Create custom aggregation logic:
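import java.io.Serializable;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

static class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
    // Serializable so that Beam can fall back to SerializableCoder for the accumulator
    static class Accum implements Serializable {
        // long avoids overflow when summing many int inputs
        long sum = 0;
        long count = 0;
    }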
    
    @Override
    public Accum createAccumulator() {
        return new Accum();
    }
    
    @Override
    public Accum addInput(Accum accum, Integer input) {
        accum.sum += input;
        accum.count++;
        return accum;
    }
    
    @Override
    public Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            merged.sum += accum.sum;
            merged.count += accum.count;
        }
        return merged;
    }
    
    @Override
    public Double extractOutput(Accum accum) {
        return accum.count == 0 ? 0.0 : (double) accum.sum / accum.count;
    }
}

PCollection<Double> average = numbers.apply(Combine.globally(new AverageFn()));
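The four CombineFn methods exist so that a runner can accumulate parts of the input independently and merge the partial results later. The plain-Java sketch below simulates that under an assumed bundle split. AverageModel is a hypothetical name and the bundling is illustrative, not Beam's scheduling.

```java
import java.util.ArrayList;
import java.util.List;

final class AverageModel {
    /** Mirrors the accumulator above: a running sum and count. */
    static final class Accum {
        long sum = 0;
        long count = 0;
    }

    static Accum addInput(Accum accum, int input) {
        accum.sum += input;
        accum.count++;
        return accum;
    }

    static Accum merge(List<Accum> accums) {
        Accum merged = new Accum();
        for (Accum accum : accums) {
            merged.sum += accum.sum;
            merged.count += accum.count;
        }
        return merged;
    }

    static double extract(Accum accum) {
        return accum.count == 0 ? 0.0 : (double) accum.sum / accum.count;
    }

    /** Accumulates each bundle separately, then merges the partial accumulators. */
    static double average(List<List<Integer>> bundles) {
        List<Accum> partials = new ArrayList<>();
        for (List<Integer> bundle : bundles) {
            Accum accum = new Accum();
            for (int value : bundle) {
                addInput(accum, value);
            }
            partials.add(accum);
        }
        return extract(merge(partials));
    }
}
```

The accumulator keeps a sum and a count rather than a running average because averages do not merge correctly: for bundles [1, 2] and [3, 4, 5], averaging the two bundle averages (1.5 and 4.0) gives 2.75, while the true average is 3.0.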

Flatten

Merge multiple PCollections of the same type:
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollectionList;

PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;

PCollection<String> merged = PCollectionList
    .of(pc1).and(pc2).and(pc3)
    .apply(Flatten.pCollections());

Partition

Split a PCollection into multiple partitions:
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.values.PCollectionList;

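// The function must return an index in [0, numPartitions); floorMod keeps
// negative inputs in range, where the % operator would return a negative index
PCollectionList<Integer> partitioned = numbers.apply(
    Partition.of(3, (Integer num, int numPartitions) -> Math.floorMod(num, numPartitions)));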

PCollection<Integer> partition0 = partitioned.get(0);
PCollection<Integer> partition1 = partitioned.get(1);
PCollection<Integer> partition2 = partitioned.get(2);
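A partition function must return an index from 0 to numPartitions - 1 for every element. Java's % operator returns a negative result for a negative left operand, so a remainder-based function is only safe for non-negative input. The plain-Java sketch below, with the hypothetical name PartitionModel, uses Math.floorMod to stay in range.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionModel {
    /** Routes each number to exactly one of numPartitions lists. */
    static List<List<Integer>> partition(List<Integer> numbers, int numPartitions) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int n : numbers) {
            // floorMod(-1, 3) is 2, whereas -1 % 3 is -1 (an invalid index).
            int index = Math.floorMod(n, numPartitions);
            partitions.get(index).add(n);
        }
        return partitions;
    }
}
```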

Composite Transforms

Create reusable transform pipelines:
static class CountWords 
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
        return lines
            .apply("ExtractWords", FlatMapElements
                .into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(line.split("\\s+"))))
            .apply("FilterEmpty", Filter.by((String word) -> !word.isEmpty()))
            .apply("CountPerElement", Count.perElement());
    }
}

// Usage
PCollection<String> lines = ...;
PCollection<KV<String, Long>> counts = lines.apply(new CountWords());
Composite transforms improve code reusability and make your pipeline structure clearer.
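What the three steps inside CountWords compute can be checked with a plain-Java equivalent. CountWordsModel is a hypothetical name, and the stream pipeline is an in-memory analogy rather than Beam code. It also shows why the FilterEmpty step is needed: splitting a line that starts with whitespace yields a leading empty string.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

final class CountWordsModel {
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
            // ExtractWords: split each line on runs of whitespace
            .flatMap(line -> Arrays.stream(line.split("\\s+")))
            // FilterEmpty: drop the empty token produced by leading whitespace
            .filter(word -> !word.isEmpty())
            // CountPerElement: count occurrences of each distinct word
            .collect(Collectors.groupingBy(
                Function.identity(), TreeMap::new, Collectors.counting()));
    }
}
```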

Best Practices

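Name your transforms: Pass a descriptive name as the first argument to apply, as in apply("ExtractWords", ...). Runners use these names in the pipeline graph and in monitoring output, which helps with debugging.
Avoid non-deterministic operations: A runner may retry a failed bundle, so a transform should produce the same output for the same input. Otherwise a retry can yield different results.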
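One common source of non-determinism is assigning random identifiers inside a transform. As a minimal sketch in plain Java (DeterministicIds and its method names are hypothetical), an ID derived from the record's content stays stable across retries, while a random one does not.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

final class DeterministicIds {
    /** Deterministic: the same record always yields the same ID, even after a retry. */
    static String idFor(String record) {
        return UUID.nameUUIDFromBytes(record.getBytes(StandardCharsets.UTF_8)).toString();
    }

    /** Non-deterministic: a retry would assign a different ID to the same record. */
    static String randomId() {
        return UUID.randomUUID().toString();
    }
}
```

The same reasoning applies to reading the wall clock or iterating over collections with unspecified order inside a DoFn.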

Next Steps

Windowing: Learn how to divide data into time-based windows
Triggers: Control when results are emitted
