The transforms package provides the fundamental PTransforms for processing data in Apache Beam pipelines.
ParDo
ParDo is the core element-wise PTransform in Apache Beam, invoking a user-specified function on each element of the input PCollection.
ParDo
Applies a DoFn to each element, producing a single output PCollection.
func ParDo(s Scope, dofn any, col PCollection, opts ...Option) PCollection
s: The scope to insert the transform into
dofn: A DoFn function or a struct with a ProcessElement method
col: The input PCollection to process
opts: Optional parameters, including SideInput and TypeDefinition
Returns: the output PCollection of transformed elements
Example:
import (
    "strings"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

// Define a DoFn
func toUpper(word string) string {
    return strings.ToUpper(word)
}

// Register it
func init() {
    register.Function1x1(toUpper)
}

// Use in pipeline
words := beam.Create(root, "hello", "world")
upperWords := beam.ParDo(root, toUpper, words)
ParDo0
Applies a DoFn with zero outputs (side effects only).
func ParDo0(s Scope, dofn any, col PCollection, opts ...Option)
s: The scope to insert the transform into
dofn: A DoFn that produces no output elements
col: The input PCollection to process
Example:
import "log"

func logElement(element string) {
    log.Printf("Processing: %s", element)
}

func init() {
    register.Function1x0(logElement)
}

beam.ParDo0(root, logElement, data)
ParDo2
Applies a DoFn with two output PCollections.
func ParDo2(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection)
Returns (PCollection, PCollection): the two output PCollections from the DoFn
Example:
// Partition words by length
func partitionByLength(word string, cutoff int, short, long func(string)) {
    if len(word) < cutoff {
        short(word)
    } else {
        long(word)
    }
}

func init() {
    register.Function4x0(partitionByLength)
}

cutoff := beam.Create(root, 5)
shortWords, longWords := beam.ParDo2(root, partitionByLength, words,
    beam.SideInput{Input: cutoff})
ParDo3, ParDo4, ParDo5, ParDo6, ParDo7
Apply a DoFn with 3-7 output PCollections.
func ParDo3(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection)
func ParDo4(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection)
// ... etc
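The higher-arity variants follow the same emitter pattern as ParDo2. As an illustrative sketch (the function name `classifyWord` and the length cutoffs 4 and 8 are assumptions, not part of the SDK), a DoFn taking three emitters routes each element into one of three output PCollections; the trailing comment shows how it would be registered and wired with ParDo3:

```go
package main

// classifyWord routes each word to one of three emitters by length.
// (Illustrative only: the name and the cutoffs 4 and 8 are arbitrary.)
func classifyWord(word string, short, medium, long func(string)) {
	switch {
	case len(word) < 4:
		short(word)
	case len(word) < 8:
		medium(word)
	default:
		long(word)
	}
}

// Pipeline wiring (assumes the beam and register imports shown earlier):
//   func init() { register.Function4x0(classifyWord) }
//   shortW, medW, longW := beam.ParDo3(root, classifyWord, words)
```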
ParDoN
Applies a DoFn with any number of outputs.
func ParDoN(s Scope, dofn any, col PCollection, opts ...Option) []PCollection
Returns: a slice of output PCollections
DoFn Structure
DoFns can be simple functions or structs with lifecycle methods.
Simple Function DoFn
func processElement(element InputType) OutputType {
    // Process element
    return transformed
}
Struct DoFn with Lifecycle
type MyDoFn struct {
    // State fields (JSON serializable)
    Config string
}

func (fn *MyDoFn) Setup(ctx context.Context) error {
    // One-time initialization
    return nil
}

func (fn *MyDoFn) StartBundle(ctx context.Context) error {
    // Per-bundle initialization
    return nil
}

func (fn *MyDoFn) ProcessElement(ctx context.Context, element InputType, emit func(OutputType)) error {
    // Process each element
    emit(transform(element))
    return nil
}

func (fn *MyDoFn) FinishBundle(ctx context.Context) error {
    // Per-bundle finalization
    return nil
}

func (fn *MyDoFn) Teardown() error {
    // Cleanup
    return nil
}
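To make the call order concrete, the sketch below drives these lifecycle methods by hand in the order a runner follows for a single bundle. `UpperFn` and `runBundle` are illustrative names introduced here, not SDK API; in a real pipeline the runner performs this sequencing for you.

```go
package main

import (
	"context"
	"strings"
)

// UpperFn is an illustrative struct DoFn; Prefix is construction-time
// configuration and must be JSON-serializable.
type UpperFn struct {
	Prefix string
}

func (fn *UpperFn) Setup(ctx context.Context) error       { return nil }
func (fn *UpperFn) StartBundle(ctx context.Context) error { return nil }

func (fn *UpperFn) ProcessElement(ctx context.Context, word string, emit func(string)) error {
	emit(fn.Prefix + strings.ToUpper(word))
	return nil
}

func (fn *UpperFn) FinishBundle(ctx context.Context) error { return nil }
func (fn *UpperFn) Teardown() error                        { return nil }

// runBundle mimics the order a runner uses for one bundle:
// Setup, StartBundle, ProcessElement per element, FinishBundle, Teardown.
func runBundle(fn *UpperFn, elements []string) []string {
	ctx := context.Background()
	var out []string
	fn.Setup(ctx)
	fn.StartBundle(ctx)
	for _, e := range elements {
		fn.ProcessElement(ctx, e, func(s string) { out = append(out, s) })
	}
	fn.FinishBundle(ctx)
	fn.Teardown()
	return out
}
```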
Side Inputs
Access additional PCollections within a DoFn.
func filterByCutoff(word string, cutoff int, emit func(string)) {
    if len(word) >= cutoff {
        emit(word)
    }
}

func init() {
    register.Function3x0(filterByCutoff)
}

cutoff := beam.Create(root, 5)
filtered := beam.ParDo(root, filterByCutoff, words,
    beam.SideInput{Input: cutoff})
GroupByKey
Groups values by key and window, converting PCollection<KV<K,V>> to PCollection<KV<K,Iter<V>>>.
GroupByKey
func GroupByKey(s Scope, a PCollection) PCollection
s: The scope to insert the transform into
a: Input PCollection of type KV<K,V>
Returns: output PCollection of type KV<K,Iter<V>> with values grouped by key
Example:
import (
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

// Create key-value pairs
type WordCount struct {
    Word  string
    Count int
}

func extractWord(wc WordCount) (string, int) {
    return wc.Word, wc.Count
}

func init() {
    register.Function1x2(extractWord)
}

wordCounts := beam.Create(root,
    WordCount{"hello", 1},
    WordCount{"world", 1},
    WordCount{"hello", 1})

kvPairs := beam.ParDo(root, extractWord, wordCounts)
grouped := beam.GroupByKey(root, kvPairs)
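Downstream of GroupByKey, a DoFn receives the grouped values as an iterator function. The sketch below shows that iterator shape; `sumPerKey` is an illustrative name, and the trailing comment indicates how it would be registered and applied in a pipeline:

```go
package main

// sumPerKey consumes one grouped KV<string, Iter<int>> element: values is
// an iterator that writes the next value into *v and reports whether a
// value was available.
func sumPerKey(word string, values func(v *int) bool, emit func(string, int)) {
	total := 0
	var v int
	for values(&v) {
		total += v
	}
	emit(word, total)
}

// In a pipeline (assumes the imports shown earlier):
//   func init() { register.Function3x0(sumPerKey) }
//   totals := beam.ParDo(root, sumPerKey, grouped)
```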
CoGroupByKey
Groups multiple PCollections by a common key.
func CoGroupByKey(s Scope, cols ...PCollection) PCollection
s: The scope to insert the transform into
cols: Multiple input PCollections of type KV<K,V> sharing the same key type
Returns: a CoGBK result with one iterator per input collection
Example:
// Join two datasets by a common key.
// (KV is shorthand here; in practice, build KV<K,V> PCollections with a
// ParDo whose DoFn returns (key, value) pairs.)
emails := beam.Create(root,
    KV{"user1", "[email protected]"},
    KV{"user2", "[email protected]"})
names := beam.Create(root,
    KV{"user1", "Alice"},
    KV{"user2", "Bob"})

joined := beam.CoGroupByKey(root, emails, names)
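The CoGBK result carries the key plus one iterator per input, in the order the PCollections were passed to CoGroupByKey. A sketch of the consuming DoFn (`formatUser` is an illustrative name, not SDK API):

```go
package main

// formatUser consumes one CoGBK element: the key, then one iterator per
// input PCollection in the order they were passed to CoGroupByKey.
func formatUser(key string, emails, names func(*string) bool, emit func(string)) {
	var email, name string
	emails(&email) // take the first email, if any
	names(&name)   // take the first name, if any
	emit(key + ": " + name + " <" + email + ">")
}

// In a pipeline (assumes the imports shown earlier):
//   func init() { register.Function4x0(formatUser) }
//   rows := beam.ParDo(root, formatUser, joined)
```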
Reshuffle
Breaks fusion and allows different sharding for subsequent operations.
func Reshuffle(s Scope, col PCollection) PCollection
s: The scope to insert the transform into
col: Input PCollection to reshuffle
Returns: a reshuffled PCollection with the same elements and windowing
Example:
// Expensive, highly parallel computation
processed := beam.ParDo(root, expensiveComputation, input)

// Reshuffle to break fusion and redistribute elements before writing
reshuffled := beam.Reshuffle(root, processed)
beam.ParDo0(root, writeToFile, reshuffled)
Combine
Combines all elements in a PCollection or groups using a CombineFn.
Combine
Globally combines all elements in a PCollection.
func Combine(s Scope, combinefn any, col PCollection, opts ...Option) PCollection
s: The scope to insert the transform into
combinefn: A CombineFn or a simple combining function
col: Input PCollection to combine
opts: Optional type definitions
Returns: a single-element PCollection holding the combined result
Example:
// Simple combining function
func sum(a, b int) int {
    return a + b
}

func init() {
    register.Function2x1(sum)
}

numbers := beam.Create(root, 1, 2, 3, 4, 5)
total := beam.Combine(root, sum, numbers)
CombinePerKey
Combines values for each key after GroupByKey.
func CombinePerKey(s Scope, combinefn any, col PCollection, opts ...Option) PCollection
s: The scope to insert the transform into
combinefn: A CombineFn, optionally taking a key parameter
col: Input PCollection of type KV<K,V>
opts: Optional type definitions
Returns: a PCollection of type KV<K,CombinedValue>
Example:
// Word count using CombinePerKey

// Convert each word to a KV pair with count 1
func wordToKV(word string) (string, int) {
    return word, 1
}

func init() {
    register.Function1x2(wordToKV)
    register.Function2x1(sum) // sum as defined above
}

words := beam.Create(root, "hello", "world", "hello")
kvPairs := beam.ParDo(root, wordToKV, words)
counts := beam.CombinePerKey(root, sum, kvPairs)
CombineFn Structure
For complex combining logic, implement a CombineFn:
type AverageFn struct{}

type acc struct {
    Sum   float64
    Count int
}

func (fn *AverageFn) CreateAccumulator() acc {
    return acc{}
}

func (fn *AverageFn) AddInput(a acc, value float64) acc {
    return acc{Sum: a.Sum + value, Count: a.Count + 1}
}

func (fn *AverageFn) MergeAccumulators(a, b acc) acc {
    return acc{Sum: a.Sum + b.Sum, Count: a.Count + b.Count}
}

func (fn *AverageFn) ExtractOutput(a acc) float64 {
    if a.Count == 0 {
        return 0
    }
    return a.Sum / float64(a.Count)
}
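Because accumulators merge associatively, a runner can fold partial inputs on different workers and combine the results afterwards. That behavior can be checked by calling the CombineFn's methods by hand; the `average` helper below simulates two workers and is illustrative only (the type definitions are repeated so the sketch is self-contained):

```go
package main

// AverageFn as defined above, repeated here so the check is self-contained.
type AverageFn struct{}

type acc struct {
	Sum   float64
	Count int
}

func (fn *AverageFn) CreateAccumulator() acc { return acc{} }

func (fn *AverageFn) AddInput(a acc, value float64) acc {
	return acc{Sum: a.Sum + value, Count: a.Count + 1}
}

func (fn *AverageFn) MergeAccumulators(a, b acc) acc {
	return acc{Sum: a.Sum + b.Sum, Count: a.Count + b.Count}
}

func (fn *AverageFn) ExtractOutput(a acc) float64 {
	if a.Count == 0 {
		return 0
	}
	return a.Sum / float64(a.Count)
}

// average simulates two workers each folding part of the input into a
// partial accumulator, then merging and extracting the final result.
func average(fn *AverageFn, left, right []float64) float64 {
	a, b := fn.CreateAccumulator(), fn.CreateAccumulator()
	for _, v := range left {
		a = fn.AddInput(a, v)
	}
	for _, v := range right {
		b = fn.AddInput(b, v)
	}
	return fn.ExtractOutput(fn.MergeAccumulators(a, b))
}
```

In a pipeline, the CombineFn itself would be applied with beam.Combine(root, &AverageFn{}, numbers) after the usual registration.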
Registration
All DoFns and functions must be registered for distributed execution:
import "github.com/apache/beam/sdks/v2/go/pkg/beam/register"

func init() {
    // Simple functions
    register.Function1x1(myFunc1x1) // 1 input, 1 output
    register.Function2x1(myFunc2x1) // 2 inputs, 1 output
    register.Function1x2(myFunc1x2) // 1 input, 2 outputs

    // DoFn structs
    register.DoFn3x1[context.Context, InputType, func(OutputType), error](&MyDoFn{})

    // Emitters
    register.Emitter1[OutputType]()
    register.Emitter2[KeyType, ValueType]()

    // Iterators
    register.Iter1[ValueType]()
}
Best Practices
- Keep DoFns stateless when possible
- Use struct fields for construction-time configuration
- Implement Setup/Teardown for expensive resource initialization
- Make ProcessElement deterministic for fault tolerance
- GroupByKey triggers a shuffle operation (expensive)
- Ensure key types have deterministic coders
- Consider CombinePerKey instead of GroupByKey + ParDo for aggregations
- Be aware of memory implications with large value lists per key
- Use Combine instead of GroupByKey when aggregating all elements
- CombineFns enable efficient parallel and incremental combining
- Associative and commutative operations work best
- Implement proper accumulator types for complex aggregations