Transforms are the fundamental building blocks for processing data in Apache Beam. They take PCollections as input and produce PCollections as output.
PTransform
The abstract base class for all transforms.
public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
expand()
Defines how the transform processes input to produce output.
public abstract OutputT expand(InputT input)
input
InputT
required
The input PCollection or PInput
output
OutputT
The output PCollection or POutput
ParDo
The core element-wise transform for processing elements independently.
ParDo.of(DoFn)
Creates a ParDo transform with the specified DoFn.
public static <InputT, OutputT> SingleOutput<InputT, OutputT> of(
DoFn<InputT, OutputT> fn)
fn
DoFn<InputT, OutputT>
required
The function to apply to each element
transform
SingleOutput<InputT, OutputT>
A ParDo transform that can be applied to a PCollection
Example:
PCollection<String> words = lines.apply(
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(
@Element String line,
OutputReceiver<String> out) {
for (String word : line.split("\\s+")) {
out.output(word);
}
}
})
);
withSideInputs(PCollectionView...)
Adds side inputs to the ParDo transform.
public SingleOutput<InputT, OutputT> withSideInputs(
PCollectionView<?>... sideInputs)
sideInputs
PCollectionView<?>...
required
Side input views accessible in the DoFn
Example:
PCollectionView<Integer> maxLengthView =
maxLength.apply(View.asSingleton());
PCollection<String> filtered = words.apply(
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
int max = c.sideInput(maxLengthView);
if (c.element().length() <= max) {
c.output(c.element());
}
}
}).withSideInputs(maxLengthView)
);
withOutputTags(TupleTag, TupleTagList)
Specifies multiple output tags for the transform.
public MultiOutput<InputT, OutputT> withOutputTags(
TupleTag<OutputT> mainOutputTag,
TupleTagList additionalOutputTags)
mainOutputTag
TupleTag<OutputT>
required
Tag for the main output
additionalOutputTags
TupleTagList
required
Tags for additional outputs
Example:
final TupleTag<String> validTag = new TupleTag<String>(){};
final TupleTag<String> invalidTag = new TupleTag<String>(){};
PCollectionTuple results = input.apply(
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(
@Element String element,
MultiOutputReceiver out) {
if (isValid(element)) {
out.get(validTag).output(element);
} else {
out.get(invalidTag).output(element);
}
}
}).withOutputTags(validTag, TupleTagList.of(invalidTag))
);
PCollection<String> valid = results.get(validTag);
PCollection<String> invalid = results.get(invalidTag);
DoFn
The function class used with ParDo to process elements.
public abstract class DoFn<InputT, OutputT>
Annotations
@ProcessElement
Marks the method that processes each element.
@ProcessElement
public void processElement(
@Element InputT element,
OutputReceiver<OutputT> out) {
// Process element and output results
}
@StartBundle
Called before processing a bundle of elements.
@StartBundle
public void startBundle(StartBundleContext c) {
// Initialize resources
}
@FinishBundle
Called after processing a bundle of elements.
@FinishBundle
public void finishBundle(FinishBundleContext c) {
// Clean up resources
}
@Setup
Called once per DoFn instance before any bundles are processed.
@Setup
public void setup() {
// One-time initialization
}
@Teardown
Called once per DoFn instance when it’s being discarded. Runners invoke it on a best-effort basis, so do not rely on it for work that must always happen.
@Teardown
public void teardown() {
// One-time cleanup
}
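The annotations above imply a call order for a single DoFn instance: setup once, then startBundle, processElement for each element, and finishBundle for every bundle, and finally teardown. The sketch below is a plain-Java model of that order, not Beam API. The names LifecycleModel and run, and the idea of passing bundles in as lists, are assumptions made only to make the sequence visible; a real runner chooses bundle boundaries itself.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the DoFn call order; no Beam classes are used.
final class LifecycleModel {
    // Records the lifecycle calls one DoFn instance would see when a
    // (hypothetical) runner feeds it the given bundles.
    static List<String> run(List<List<String>> bundles) {
        List<String> calls = new ArrayList<>();
        calls.add("setup");                        // @Setup: once per instance
        for (List<String> bundle : bundles) {
            calls.add("startBundle");              // @StartBundle: once per bundle
            for (String element : bundle) {
                calls.add("processElement(" + element + ")"); // @ProcessElement
            }
            calls.add("finishBundle");             // @FinishBundle: once per bundle
        }
        calls.add("teardown");                     // @Teardown: modeled as always called
        return calls;
    }

    public static void main(String[] args) {
        List<List<String>> bundles = List.of(List.of("a", "b"), List.of("c"));
        run(bundles).forEach(System.out::println);
    }
}
```

Because one instance can see many bundles, per-instance resources such as clients belong in setup, while per-bundle buffers belong in startBundle and finishBundle.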
GroupByKey
Groups elements by key and window.
create()
Creates a GroupByKey transform.
public static <K, V> GroupByKey<K, V> create()
A transform that groups PCollection<KV<K, V>> into PCollection<KV<K, Iterable<V>>>
Example:
PCollection<KV<String, Integer>> keyedValues = ...;
PCollection<KV<String, Iterable<Integer>>> grouped =
keyedValues.apply(GroupByKey.create());
Note: Keys are compared by encoding them with their Coder and comparing the encoded bytes. The key Coder must be deterministic.
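The note above means that grouping is decided by the encoded bytes of a key rather than by equals(). The following plain-Java model illustrates this without using Beam: GroupByEncodedKey, group, and the UTF-8 function standing in for a key Coder are all assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of grouping by encoded key; no Beam classes are used.
final class GroupByEncodedKey {
    // Groups values by the bytes that the (hypothetical) key coder produces.
    static <K, V> Map<ByteBuffer, List<V>> group(
            List<Map.Entry<K, V>> input, Function<K, byte[]> keyCoder) {
        Map<ByteBuffer, List<V>> groups = new LinkedHashMap<>();
        for (Map.Entry<K, V> kv : input) {
            // ByteBuffer equality is content-based, so equal encodings share a group.
            ByteBuffer encoded = ByteBuffer.wrap(keyCoder.apply(kv.getKey()));
            groups.computeIfAbsent(encoded, k -> new ArrayList<>()).add(kv.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
            Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
        Map<ByteBuffer, List<Integer>> groups =
            group(input, s -> s.getBytes(StandardCharsets.UTF_8));
        // duplicate() keeps the map key's position untouched while decoding.
        groups.forEach((k, v) ->
            System.out.println(StandardCharsets.UTF_8.decode(k.duplicate()) + " -> " + v));
    }
}
```

If the encoding of a key could differ between two encodes of equal values, the same logical key would land in different groups, which is why a deterministic key Coder is required.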
Combine
Combines elements in a PCollection.
globally(CombineFn)
Combines all elements in each window.
public static <InputT, OutputT> Globally<InputT, OutputT> globally(
GlobalCombineFn<? super InputT, ?, OutputT> fn)
fn
GlobalCombineFn<? super InputT, ?, OutputT>
required
The combining function
transform
Globally<InputT, OutputT>
A transform that combines all elements
Example:
PCollection<Integer> numbers = ...;
PCollection<Integer> sum = numbers.apply(
Combine.globally(Sum.ofIntegers())
);
perKey(CombineFn)
Combines values per key.
public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
GlobalCombineFn<? super InputT, ?, OutputT> fn)
fn
GlobalCombineFn<? super InputT, ?, OutputT>
required
The combining function
transform
PerKey<K, InputT, OutputT>
A transform that combines values for each key
Example:
PCollection<KV<String, Integer>> keyedValues = ...;
PCollection<KV<String, Integer>> sums = keyedValues.apply(
Combine.perKey(Sum.ofIntegers())
);
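The middle type parameter of the combining function passed to globally() or perKey() is its accumulator type. The sketch below models in plain Java how an accumulator-based combine could compute a mean by combining each bundle separately and then merging the partial results. It does not use Beam: MeanCombineModel, Accum, and combine are hypothetical, and only the four method names are borrowed from Beam's CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of an accumulator-based combine; no Beam classes are used.
final class MeanCombineModel {
    // Accumulator: running sum and element count.
    static final class Accum {
        long sum;
        long count;
    }

    static Accum createAccumulator() {
        return new Accum();
    }

    static Accum addInput(Accum acc, int input) {
        acc.sum += input;
        acc.count++;
        return acc;
    }

    static Accum mergeAccumulators(List<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum a : accums) {
            merged.sum += a.sum;
            merged.count += a.count;
        }
        return merged;
    }

    static double extractOutput(Accum acc) {
        // An empty input has no mean; NaN is this sketch's choice.
        return acc.count == 0 ? Double.NaN : (double) acc.sum / acc.count;
    }

    // Combines each bundle into a partial accumulator, then merges them.
    static double combine(List<List<Integer>> bundles) {
        List<Accum> partials = new ArrayList<>();
        for (List<Integer> bundle : bundles) {
            Accum acc = createAccumulator();
            for (int value : bundle) {
                acc = addInput(acc, value);
            }
            partials.add(acc);
        }
        return extractOutput(mergeAccumulators(partials));
    }

    public static void main(String[] args) {
        System.out.println(combine(List.of(List.of(1, 2, 3), List.of(4, 5))));
    }
}
```

Keeping the sum and count in the accumulator, rather than a running mean, is what lets partial results be merged in any order.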
Flatten
Merges multiple PCollections into a single PCollection.
pCollections()
Flattens a PCollectionList into a single PCollection.
public static <T> PCollections<T> pCollections()
A transform that flattens multiple PCollections
Example:
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollection<String> merged = PCollectionList
.of(pc1).and(pc2).and(pc3)
.apply(Flatten.pCollections());
iterables()
Flattens PCollection<Iterable<T>> into PCollection<T>.
public static <T> Iterables<T> iterables()
Example:
PCollection<Iterable<Integer>> iterables = ...;
PCollection<Integer> flattened =
iterables.apply(Flatten.iterables());
Partition
Splits a PCollection into multiple output PCollections.
of(int, PartitionFn)
Partitions elements into n partitions.
public static <T> Partition<T> of(
int numPartitions,
PartitionFn<? super T> partitionFn)
numPartitions
int
required
The number of partitions to create
partitionFn
PartitionFn<? super T>
required
Function that determines which partition each element goes to
Example:
PCollectionList<Integer> partitions = numbers.apply(
Partition.of(3, (num, numPartitions) -> Math.floorMod(num, numPartitions))
);
PCollection<Integer> partition0 = partitions.get(0);
PCollection<Integer> partition1 = partitions.get(1);
PCollection<Integer> partition2 = partitions.get(2);
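A partition function has to return an index from 0 to numPartitions - 1. With Java's remainder operator, an expression such as num % numPartitions is only safe for non-negative elements, because the result takes the sign of the dividend. The plain-Java check below compares it with Math.floorMod; PartitionIndexModel and its method names are hypothetical and no Beam classes are involved.

```java
// Plain-Java comparison of two ways to derive a partition index.
final class PartitionIndexModel {
    // Remainder keeps the sign of num, so negative inputs give negative indexes.
    static int byRemainder(int num, int numPartitions) {
        return num % numPartitions;
    }

    // floorMod always lands in [0, numPartitions) for a positive numPartitions.
    static int byFloorMod(int num, int numPartitions) {
        return Math.floorMod(num, numPartitions);
    }

    public static void main(String[] args) {
        for (int num : new int[] {-4, 0, 5}) {
            System.out.println(num + ": remainder=" + byRemainder(num, 3)
                + ", floorMod=" + byFloorMod(num, 3));
        }
    }
}
```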
Filter
Filters elements based on a predicate.
by(SerializableFunction)
public static <T> Filter<T> by(
SerializableFunction<T, Boolean> predicate)
predicate
SerializableFunction<T, Boolean>
required
Function that returns true for elements to keep
Example:
PCollection<String> filtered = words.apply(
Filter.by(word -> word.length() > 3)
);
MapElements
Maps elements to a different type.
into(TypeDescriptor).via(SerializableFunction)
MapElements.into(TypeDescriptor<OutputT> outputType)
.via(SerializableFunction<InputT, OutputT> fn)
outputType
TypeDescriptor<OutputT>
required
The type descriptor for output elements
fn
SerializableFunction<InputT, OutputT>
required
The mapping function
Example:
PCollection<Integer> lengths = words.apply(
MapElements.into(TypeDescriptors.integers())
.via(word -> word.length())
);
FlatMapElements
Maps each element to zero or more elements.
into(TypeDescriptor).via(SerializableFunction)
FlatMapElements.into(TypeDescriptor<OutputT> outputType)
.via(SerializableFunction<InputT, Iterable<OutputT>> fn)
Example:
PCollection<String> words = lines.apply(
FlatMapElements.into(TypeDescriptors.strings())
.via(line -> Arrays.asList(line.split("\\s+")))
);
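One detail of splitting on "\\s+" is worth checking: an empty line yields a single empty string, and a line with leading whitespace yields an empty first token. The sketch below uses java.util.stream as a stand-in for the zero-or-more mapping described above and drops those empty tokens. It is an analogy in plain Java, not Beam code, and FlatMapModel and words are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// java.util.stream analogy for a zero-or-more mapping; no Beam classes are used.
final class FlatMapModel {
    // Splits each line on whitespace and drops the empty tokens that
    // split() produces for blank lines and leading whitespace.
    static List<String> words(List<String> lines) {
        return lines.stream()
            .flatMap(line -> Arrays.stream(line.split("\\s+")))
            .filter(word -> !word.isEmpty())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(words(List.of("a b", "", " c")));
    }
}
```

In a pipeline, the same effect could be had by filtering empty strings after the split.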
CoGroupByKey
Joins two or more PCollections by key.
Example:
PCollection<KV<String, String>> emails = ...;
PCollection<KV<String, String>> phones = ...;
final TupleTag<String> emailTag = new TupleTag<>();
final TupleTag<String> phoneTag = new TupleTag<>();
PCollection<KV<String, CoGbkResult>> joined =
KeyedPCollectionTuple
.of(emailTag, emails)
.and(phoneTag, phones)
.apply(CoGroupByKey.create());
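Each joined element pairs a key with the values from every input that share it; in Beam those values are read from the CoGbkResult with getAll(tag). The plain-Java model below shows the shape of that result, including a key that appears in only one input. It does not use Beam, and CoGroupModel, Joined, and coGroup are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a two-input co-group; no Beam classes are used.
final class CoGroupModel {
    // Per-key result: all emails and all phones seen for that key.
    static final class Joined {
        final List<String> emails = new ArrayList<>();
        final List<String> phones = new ArrayList<>();
    }

    static Map<String, Joined> coGroup(
            List<Map.Entry<String, String>> emails,
            List<Map.Entry<String, String>> phones) {
        Map<String, Joined> result = new TreeMap<>();
        for (Map.Entry<String, String> e : emails) {
            result.computeIfAbsent(e.getKey(), k -> new Joined()).emails.add(e.getValue());
        }
        for (Map.Entry<String, String> p : phones) {
            result.computeIfAbsent(p.getKey(), k -> new Joined()).phones.add(p.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> emails = List.of(Map.entry("amy", "amy@example.com"));
        List<Map.Entry<String, String>> phones =
            List.of(Map.entry("amy", "555-0100"), Map.entry("bob", "555-0101"));
        coGroup(emails, phones).forEach((name, j) ->
            System.out.println(name + ": emails=" + j.emails + ", phones=" + j.phones));
    }
}
```

A key missing from one input still appears in the result, with an empty collection on that side, so downstream code should handle empty iterables.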