Apache Beam Transforms Reference
Transforms are the fundamental operations in Apache Beam that process data in a pipeline.

ParDo

Applies a DoFn to each element of a PCollection.
beam.ParDo(fn, *args, **kwargs)
fn
DoFn
A DoFn instance to apply to each element.
*args
Positional arguments passed to the DoFn; these may include side inputs.
**kwargs
Keyword arguments passed to the DoFn.

Usage Example

import apache_beam as beam

class SplitWords(beam.DoFn):
    """Emit each whitespace-separated word of an input line."""

    def process(self, element):
        # str.split() with no arguments splits on runs of whitespace.
        yield from element.split()

with beam.Pipeline() as p:
    lines = p | beam.Create(['Hello World', 'Apache Beam'])
    words = lines | beam.ParDo(SplitWords())

with_exception_handling()

Provides automatic error handling for DoFn processing.
# Route elements into two PCollections: `good` holds the outputs of
# maybe_erroring_fn; `bad` collects records for inputs where the fn
# raised, instead of failing the whole pipeline.
good, bad = (inputs 
             | beam.Map(maybe_erroring_fn)
             .with_exception_handling())
main_tag
str
default:"good"
Tag for successfully processed outputs.
dead_letter_tag
str
default:"bad"
Tag for failed inputs.
exc_class
Exception
default:"Exception"
Exception class or tuple of classes to catch.
partial
bool
default:"False"
Whether to emit partial outputs before errors.
threshold
float
default:"1.0"
Maximum ratio of bad inputs before pipeline fails (0.0-1.0).

DoFn

Base class for defining custom element-wise processing logic.
class MyDoFn(beam.DoFn):
    """Example DoFn that doubles every element it receives."""

    def process(self, element):
        # Each yielded value becomes one element of the output PCollection.
        doubled = element * 2
        yield doubled

Lifecycle Methods

setup()

Called once per DoFn instance before processing.
class MyDoFn(beam.DoFn):
    """DoFn that opens one connection per instance and queries it per element."""

    def setup(self):
        # setup() runs once per DoFn instance, so the connection is
        # reused across all elements this instance processes.
        self.connection = create_connection()

    def process(self, element):
        yield self.connection.query(element)

start_bundle()

Called at the start of each bundle of elements.
class MyDoFn(beam.DoFn):
    """DoFn that buffers each bundle's elements while passing them through."""

    def start_bundle(self):
        # A fresh buffer for every bundle, so no state leaks between
        # bundles (the runner's unit of work).
        self.batch = list()

    def process(self, element):
        self.batch.append(element)
        yield element

process()

Processes a single element. This is the only method a DoFn subclass must define.
def process(self, element, timestamp=DoFn.TimestampParam, 
            window=DoFn.WindowParam):
    """Pass each element through unchanged.

    The defaults DoFn.TimestampParam and DoFn.WindowParam are markers:
    at execution time Beam replaces them with the element's event
    timestamp and its window.
    """
    yield element

finish_bundle()

Called at the end of each bundle.
def finish_bundle(self):
    """Flush any elements buffered during the bundle.

    Called once after all process() calls of a bundle; assumes
    start_bundle() initialized `self.batch` — confirm in the full DoFn.
    """
    # Flush batch processing
    # Only write when the bundle actually buffered something.
    if self.batch:
        self.write_batch(self.batch)

teardown()

Called once when the DoFn is discarded.
def teardown(self):
    if hasattr(self, 'connection'):
        self.connection.close()

Process Parameters

DoFn.process() can accept special parameters:
  • DoFn.ElementParam: The element being processed (default)
  • DoFn.TimestampParam: Element’s timestamp
  • DoFn.WindowParam: Element’s window
  • DoFn.PaneInfoParam: Pane information for the element
  • DoFn.SideInputParam: Access to side inputs
class TimestampDoFn(beam.DoFn):
    """Pairs every element with its event-time timestamp."""

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        # The default value is a marker: Beam substitutes the element's
        # real timestamp for it when it invokes process().
        pair = (element, timestamp)
        yield pair

Map, FlatMap, Filter

Convenience transforms for common operations.

Map

Applies a function to each element and outputs the result.
beam.Map(fn, *args, **kwargs)
# Double each number
numbers = p | beam.Create([1, 2, 3, 4, 5])
doubled = numbers | beam.Map(lambda x: x * 2)
# Result: [2, 4, 6, 8, 10]

# With additional arguments
# Extra positional/keyword arguments are forwarded to fn on every call.
scaled = numbers | beam.Map(lambda x, factor: x * factor, factor=10)

FlatMap

Applies a function that returns an iterable, flattening the results.
beam.FlatMap(fn, *args, **kwargs)
# Split lines into words
# Unlike Map, each iterable fn returns is flattened into individual elements.
lines = p | beam.Create(['Hello World', 'Apache Beam'])
words = lines | beam.FlatMap(lambda line: line.split())
# Result: ['Hello', 'World', 'Apache', 'Beam']

# Generate ranges
numbers = p | beam.Create([3, 5, 2])
ranges = numbers | beam.FlatMap(lambda n: range(n))
# Result: [0, 1, 2, 0, 1, 2, 3, 4, 0, 1]

Filter

Keeps only elements for which the predicate returns True.
beam.Filter(fn)
# Keep only even numbers
# Elements for which the predicate returns a falsy value are dropped.
numbers = p | beam.Create([1, 2, 3, 4, 5, 6])
even = numbers | beam.Filter(lambda x: x % 2 == 0)
# Result: [2, 4, 6]

GroupByKey

Groups key-value pairs by key.
beam.GroupByKey()
import apache_beam as beam

with beam.Pipeline() as p:
    pairs = p | beam.Create([
        ('cat', 1),
        ('dog', 5),
        ('cat', 3),
        ('dog', 2),
        ('cat', 8)
    ])
    
    # Each distinct key maps to an iterable of all its values.
    grouped = pairs | beam.GroupByKey()
    # Result: [('cat', [1, 3, 8]), ('dog', [5, 2])]
    
    # NOTE: the order of values within each group is not guaranteed.
    result = grouped | beam.Map(lambda kv: (kv[0], sum(kv[1])))
    # Result: [('cat', 12), ('dog', 7)]
GroupByKey requires input elements to be key-value pairs (tuples). The key must be hashable and deterministically encodable.

CombineGlobally

Combines all elements in a PCollection into a single value.
beam.CombineGlobally(fn, *args, **kwargs)
fn
CombineFn or callable
A CombineFn instance or a callable that combines values.

Usage Example

import apache_beam as beam

# Using a simple function
with beam.Pipeline() as p:
    numbers = p | beam.Create([1, 2, 3, 4, 5])
    total = numbers | beam.CombineGlobally(sum)
    # Result: [15]

# Using CombineFn for more control
class AverageFn(beam.CombineFn):
    """Compute the arithmetic mean via a (sum, count) accumulator."""

    def create_accumulator(self):
        # Start with an empty running total and a zero element count.
        return (0.0, 0)

    def add_input(self, accumulator, input):
        running_total, seen = accumulator
        return running_total + input, seen + 1

    def merge_accumulators(self, accumulators):
        # Combine partial (sum, count) pairs produced independently.
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        running_total, seen = accumulator
        # Guard against an empty accumulator to avoid ZeroDivisionError.
        return running_total / seen if seen else 0

with beam.Pipeline() as p:
    numbers = p | beam.Create([1, 2, 3, 4, 5])
    average = numbers | beam.CombineGlobally(AverageFn())
    # Result: [3.0]

Methods

with_defaults()

Returns a default value when combining empty collections.
# with_defaults() makes the global combine emit its default value
# (here: sum of no inputs) even when the input PCollection is empty.
total = numbers | beam.CombineGlobally(sum).with_defaults()
# Returns [0] for empty input instead of []

without_defaults()

Does not return a value for empty collections.
# without_defaults() emits nothing at all for an empty input PCollection.
total = numbers | beam.CombineGlobally(sum).without_defaults()
# Returns [] for empty input

CombinePerKey

Combines values for each key in a key-value PCollection.
beam.CombinePerKey(fn, *args, **kwargs)
import apache_beam as beam

with beam.Pipeline() as p:
    pairs = p | beam.Create([
        ('cat', 1),
        ('dog', 5),
        ('cat', 3),
        ('dog', 2)
    ])
    
    # Combines the values of each key with fn (here: sum).
    sums = pairs | beam.CombinePerKey(sum)
    # Result: [('cat', 4), ('dog', 7)]

Flatten

Merges multiple PCollections into a single PCollection.
beam.Flatten()
import apache_beam as beam

with beam.Pipeline() as p:
    pc1 = p | 'Create1' >> beam.Create([1, 2, 3])
    pc2 = p | 'Create2' >> beam.Create([4, 5, 6])
    pc3 = p | 'Create3' >> beam.Create([7, 8, 9])
    
    # A tuple (or list) of PCollections is piped into Flatten as one input.
    merged = (pc1, pc2, pc3) | beam.Flatten()
    # Result: [1, 2, 3, 4, 5, 6, 7, 8, 9]
All input PCollections must have the same element type.

Create

Creates a PCollection from in-memory data.
beam.Create(values, reshuffle=True)
values
iterable
An iterable of values to create the PCollection from.
reshuffle
bool
default:"True"
Whether to reshuffle the data for better parallelization.
import apache_beam as beam

# Create accepts any iterable of in-memory values.
with beam.Pipeline() as p:
    # Create from list
    numbers = p | beam.Create([1, 2, 3, 4, 5])
    
    # Create from dict items
    pairs = p | beam.Create([('a', 1), ('b', 2)])
    
    # Create from generator
    range_values = p | beam.Create(range(10))

CombineFn

Base class for defining custom combining logic.
class MyCombineFn(beam.CombineFn):
    """Skeleton CombineFn — a running-sum combiner showing the four hooks."""

    def create_accumulator(self):
        # Fresh accumulator representing "no inputs seen yet".
        return 0

    def add_input(self, accumulator, input):
        # Fold a single element into the running accumulator.
        return accumulator + input

    def merge_accumulators(self, accumulators):
        # Combine partial accumulators built independently (e.g. per worker).
        return sum(accumulators)

    def extract_output(self, accumulator):
        # Here the accumulator already is the final result.
        return accumulator

Required Methods

create_accumulator
() -> Accumulator
Creates and returns a new accumulator.
add_input
(Accumulator, Input) -> Accumulator
Adds an input element to the accumulator.
merge_accumulators
(Iterable[Accumulator]) -> Accumulator
Merges multiple accumulators into one.
extract_output
(Accumulator) -> Output
Extracts the final output value from the accumulator.

WindowInto

Assigns elements to windows.
beam.WindowInto(windowfn, trigger=None, accumulation_mode=None)
from apache_beam import window

# Fixed windows of 60 seconds
# Non-overlapping windows: [0, 60), [60, 120), ...
windowed = (
    events 
    | beam.WindowInto(window.FixedWindows(60))
)

# Sliding windows
# SlidingWindows(size, period): 30-second windows starting every 5 seconds,
# so windows overlap and each element lands in several of them.
windowed = (
    events
    | beam.WindowInto(window.SlidingWindows(30, 5))
)

# Session windows
# A session closes after the given gap of inactivity between events.
windowed = (
    events
    | beam.WindowInto(window.Sessions(10 * 60))  # 10 minute gap
)

Build docs developers (and LLMs) love