The transforms package provides the fundamental PTransforms for processing data in Apache Beam pipelines.

ParDo

ParDo is the core element-wise PTransform in Apache Beam, invoking a user-specified function on each element of the input PCollection.

ParDo

Applies a DoFn to each element, producing a single output PCollection.
func ParDo(s Scope, dofn any, col PCollection, opts ...Option) PCollection
s
Scope
The scope to insert the transform into
dofn
any
A DoFn: either a plain function or a struct with a ProcessElement method
col
PCollection
The input PCollection to process
opts
...Option
Optional parameters including SideInput and TypeDefinition
Returns
PCollection
Output PCollection with transformed elements
Example:
import (
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

// Define a DoFn
func toUpper(word string) string {
    return strings.ToUpper(word)
}

// Register it
func init() {
    register.Function1x1(toUpper)
}

// Use in pipeline
words := beam.Create(root, "hello", "world")
upperWords := beam.ParDo(root, toUpper, words)

ParDo0

Applies a DoFn with zero outputs (side effects only).
func ParDo0(s Scope, dofn any, col PCollection, opts ...Option)
s
Scope
The scope to insert the transform into
dofn
any
A DoFn that produces no output elements
col
PCollection
The input PCollection to process
opts
...Option
Optional parameters
Example:
func logElement(element string) {
    log.Printf("Processing: %s", element)
}

func init() {
    register.Function1x0(logElement)
}

beam.ParDo0(root, logElement, data)

ParDo2

Applies a DoFn with two output PCollections.
func ParDo2(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection)
Returns
(PCollection, PCollection)
Two output PCollections from the DoFn
Example:
// Partition words by length
func partitionByLength(word string, cutoff int, short, long func(string)) {
    if len(word) < cutoff {
        short(word)
    } else {
        long(word)
    }
}

func init() {
    register.Function4x0(partitionByLength)
}

cutoff := beam.Create(root, 5)
shortWords, longWords := beam.ParDo2(root, partitionByLength, words, 
    beam.SideInput{Input: cutoff})

ParDo3, ParDo4, ParDo5, ParDo6, ParDo7

Apply a DoFn with 3-7 output PCollections.
func ParDo3(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection)
func ParDo4(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection)
// ... etc

ParDoN

Applies a DoFn with any number of outputs.
func ParDoN(s Scope, dofn any, col PCollection, opts ...Option) []PCollection
Returns
[]PCollection
Slice of output PCollections

DoFn Structure

DoFns can be simple functions or structs with lifecycle methods.

Simple Function DoFn

func processElement(element InputType) OutputType {
    // Process element
    return transformed
}

Struct DoFn with Lifecycle

type MyDoFn struct {
    // State fields (JSON serializable)
    Config string
}

func (fn *MyDoFn) Setup(ctx context.Context) error {
    // One-time initialization
    return nil
}

func (fn *MyDoFn) StartBundle(ctx context.Context) error {
    // Per-bundle initialization
    return nil
}

func (fn *MyDoFn) ProcessElement(ctx context.Context, element InputType, emit func(OutputType)) error {
    // Process each element
    emit(transform(element))
    return nil
}

func (fn *MyDoFn) FinishBundle(ctx context.Context) error {
    // Per-bundle finalization
    return nil
}

func (fn *MyDoFn) Teardown() error {
    // Cleanup
    return nil
}

Side Inputs

Access additional PCollections within a DoFn.
func filterByCutoff(word string, cutoff int, emit func(string)) {
    if len(word) >= cutoff {
        emit(word)
    }
}

func init() {
    register.Function3x0(filterByCutoff)
}

cutoff := beam.Create(root, 5)
filtered := beam.ParDo(root, filterByCutoff, words, 
    beam.SideInput{Input: cutoff})

GroupByKey

Groups values by key and window, converting PCollection<KV<K,V>> to PCollection<KV<K,Iter<V>>>.

GroupByKey

func GroupByKey(s Scope, a PCollection) PCollection
s
Scope
The scope to insert the transform into
a
PCollection
Input PCollection of type KV<K,V>
Returns
PCollection
Output PCollection of type KV<K,Iter<V>> with values grouped by key
Example:
import "github.com/apache/beam/sdks/v2/go/pkg/beam"

// Create key-value pairs
type WordCount struct {
    Word  string
    Count int
}

func extractWord(wc WordCount) (string, int) {
    return wc.Word, wc.Count
}

func init() {
    register.Function1x2(extractWord)
}

wordCounts := beam.Create(root, 
    WordCount{"hello", 1},
    WordCount{"world", 1},
    WordCount{"hello", 1})

kvPairs := beam.ParDo(root, extractWord, wordCounts)
grouped := beam.GroupByKey(root, kvPairs)

CoGroupByKey

Groups multiple PCollections by a common key.
func CoGroupByKey(s Scope, cols ...PCollection) PCollection
s
Scope
The scope to insert the transform into
cols
...PCollection
Multiple input PCollections of type KV<K,V> with the same key type
Returns
PCollection
CoGBK result with iterators for each input collection
Example:
// Join two datasets by common key
emails := beam.Create(root, 
    KV{"user1", "[email protected]"},
    KV{"user2", "[email protected]"})

names := beam.Create(root,
    KV{"user1", "Alice"},
    KV{"user2", "Bob"})

joined := beam.CoGroupByKey(root, emails, names)

Reshuffle

Breaks fusion and allows different sharding for subsequent operations.
func Reshuffle(s Scope, col PCollection) PCollection
s
Scope
The scope to insert the transform into
col
PCollection
Input PCollection to reshuffle
Returns
PCollection
Reshuffled PCollection with same elements and windowing
Example:
// High parallelism computation
processed := beam.ParDo(root, expensiveComputation, input)

// Reshuffle to break fusion so the runner can reshard before writing
reshuffled := beam.Reshuffle(root, processed)
beam.ParDo0(root, writeToFile, reshuffled)

Combine

Combines all elements in a PCollection or groups using a CombineFn.

Combine

Globally combines all elements in a PCollection.
func Combine(s Scope, combinefn any, col PCollection, opts ...Option) PCollection
s
Scope
The scope to insert the transform into
combinefn
any
A CombineFn or simple combining function
col
PCollection
Input PCollection to combine
opts
...Option
Optional type definitions
Returns
PCollection
Single-element PCollection with the combined result
Example:
// Simple combining function
func sum(a, b int) int {
    return a + b
}

func init() {
    register.Function2x1(sum)
}

numbers := beam.Create(root, 1, 2, 3, 4, 5)
total := beam.Combine(root, sum, numbers)

CombinePerKey

Combines values for each key after GroupByKey.
func CombinePerKey(s Scope, combinefn any, col PCollection, opts ...Option) PCollection
s
Scope
The scope to insert the transform into
combinefn
any
A CombineFn, optionally taking a key parameter
col
PCollection
Input PCollection of type KV<K,V>
opts
...Option
Optional type definitions
Returns
PCollection
PCollection of type KV<K,CombinedValue>
Example:
// Word count using CombinePerKey
func init() {
    register.Function2x1(sum)
}

words := beam.Create(root, "hello", "world", "hello")

// Convert to KV pairs with count 1
func wordToKV(word string) (string, int) {
    return word, 1
}

func init() {
    register.Function1x2(wordToKV)
}

kvPairs := beam.ParDo(root, wordToKV, words)
counts := beam.CombinePerKey(root, sum, kvPairs)

CombineFn Structure

For complex combining logic, implement a CombineFn:
type AverageFn struct{}

type acc struct {
    Sum   float64
    Count int
}

func (fn *AverageFn) CreateAccumulator() acc {
    return acc{}
}

func (fn *AverageFn) AddInput(a acc, value float64) acc {
    return acc{Sum: a.Sum + value, Count: a.Count + 1}
}

func (fn *AverageFn) MergeAccumulators(a, b acc) acc {
    return acc{Sum: a.Sum + b.Sum, Count: a.Count + b.Count}
}

func (fn *AverageFn) ExtractOutput(a acc) float64 {
    if a.Count == 0 {
        return 0
    }
    return a.Sum / float64(a.Count)
}

Registration

All DoFns and functions must be registered for distributed execution:
import "github.com/apache/beam/sdks/v2/go/pkg/beam/register"

func init() {
    // Simple functions
    register.Function1x1(myFunc1x1)  // 1 input, 1 output
    register.Function2x1(myFunc2x1)  // 2 inputs, 1 output
    register.Function1x2(myFunc1x2)  // 1 input, 2 outputs
    
    // DoFn structs
    register.DoFn3x1[context.Context, InputType, func(OutputType), error](&MyDoFn{})
    
    // Emitters
    register.Emitter1[OutputType]()
    register.Emitter2[KeyType, ValueType]()
    
    // Iterators
    register.Iter1[ValueType]()
}

Best Practices

  • Keep DoFns stateless when possible
  • Use struct fields for construction-time configuration
  • Implement Setup/Teardown for expensive resource initialization
  • Make ProcessElement deterministic for fault tolerance
  • GroupByKey triggers a shuffle operation (expensive)
  • Ensure key types have deterministic coders
  • Consider CombinePerKey instead of GroupByKey + ParDo for aggregations
  • Be aware of memory implications with large value lists per key
  • Use Combine instead of GroupByKey when aggregating all elements
  • CombineFns enable efficient parallel and incremental combining
  • Associative and commutative operations work best
  • Implement proper accumulator types for complex aggregations
