Transforms are the fundamental operations in Apache Beam that process data in a pipeline.
ParDo
Applies a DoFn to each element of a PCollection.
beam.ParDo(fn, *args, **kwargs)
A DoFn instance to apply to each element.
Positional arguments passed to the DoFn, may include side inputs.
Keyword arguments passed to the DoFn.
Usage Example
import apache_beam as beam

class SplitWords(beam.DoFn):
    def process(self, element):
        words = element.split()
        for word in words:
            yield word

with beam.Pipeline() as p:
    lines = p | beam.Create(['Hello World', 'Apache Beam'])
    words = lines | beam.ParDo(SplitWords())
with_exception_handling()
Provides automatic error handling for DoFn processing.
good, bad = (inputs
             | beam.Map(maybe_erroring_fn)
               .with_exception_handling())
main_tag
Tag for successfully processed outputs.
exc_class
Exception class or tuple of exception classes to catch (default: Exception).
partial
Whether to emit partial outputs before errors.
threshold
Maximum ratio of bad inputs before the pipeline fails (0.0-1.0).
DoFn
Base class for defining custom element-wise processing logic.
class MyDoFn(beam.DoFn):
    def process(self, element):
        # Process element and yield results
        yield element * 2
Lifecycle Methods
setup()
Called once per DoFn instance before processing.
class MyDoFn(beam.DoFn):
    def setup(self):
        self.connection = create_connection()

    def process(self, element):
        result = self.connection.query(element)
        yield result
start_bundle()
Called at the start of each bundle of elements.
class MyDoFn(beam.DoFn):
    def start_bundle(self):
        self.batch = []

    def process(self, element):
        self.batch.append(element)
        yield element
process()
Processes a single element. Must be defined.
def process(self, element, timestamp=DoFn.TimestampParam,
            window=DoFn.WindowParam):
    yield element
finish_bundle()
Called at the end of each bundle.
def finish_bundle(self):
    # Flush batch processing
    if self.batch:
        self.write_batch(self.batch)
teardown()
Called once when the DoFn is discarded.
def teardown(self):
    if hasattr(self, 'connection'):
        self.connection.close()
Process Parameters
DoFn.process() can accept special parameters:
DoFn.ElementParam: The element being processed (default)
DoFn.TimestampParam: Element’s timestamp
DoFn.WindowParam: Element’s window
DoFn.PaneInfoParam: Pane information for the element
DoFn.SideInputParam: Access to side inputs
class TimestampDoFn(beam.DoFn):
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        yield (element, timestamp)
Map, FlatMap, Filter
Convenience transforms for common operations.
Map
Applies a function to each element and outputs the result.
beam.Map(fn, *args, **kwargs)
# Double each number
numbers = p | beam.Create([1, 2, 3, 4, 5])
doubled = numbers | beam.Map(lambda x: x * 2)
# Result: [2, 4, 6, 8, 10]
# With additional arguments
scaled = numbers | beam.Map(lambda x, factor: x * factor, factor=10)
FlatMap
Applies a function that returns an iterable, flattening the results.
beam.FlatMap(fn, *args, **kwargs)
# Split lines into words
lines = p | beam.Create(['Hello World', 'Apache Beam'])
words = lines | beam.FlatMap(lambda line: line.split())
# Result: ['Hello', 'World', 'Apache', 'Beam']
# Generate ranges
numbers = p | beam.Create([3, 5, 2])
ranges = numbers | beam.FlatMap(lambda n: range(n))
# Result: [0, 1, 2, 0, 1, 2, 3, 4, 0, 1]
Filter
Keeps only elements for which the predicate returns True.
# Keep only even numbers
numbers = p | beam.Create([1, 2, 3, 4, 5, 6])
even = numbers | beam.Filter(lambda x: x % 2 == 0)
# Result: [2, 4, 6]
GroupByKey
Groups key-value pairs by key.
import apache_beam as beam

with beam.Pipeline() as p:
    pairs = p | beam.Create([
        ('cat', 1),
        ('dog', 5),
        ('cat', 3),
        ('dog', 2),
        ('cat', 8)
    ])
    grouped = pairs | beam.GroupByKey()
    # Result: [('cat', [1, 3, 8]), ('dog', [5, 2])]
    result = grouped | beam.Map(lambda kv: (kv[0], sum(kv[1])))
    # Result: [('cat', 12), ('dog', 7)]
GroupByKey requires input elements to be key-value pairs (tuples). The key must be hashable and deterministically encodable.
CombineGlobally
Combines all elements in a PCollection into a single value.
beam.CombineGlobally(fn, *args, **kwargs)
A CombineFn instance or a callable that combines values.
Usage Example
import apache_beam as beam

# Using a simple function
with beam.Pipeline() as p:
    numbers = p | beam.Create([1, 2, 3, 4, 5])
    total = numbers | beam.CombineGlobally(sum)
    # Result: [15]

# Using CombineFn for more control
class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0)  # (sum, count)

    def add_input(self, accumulator, input):
        sum_val, count = accumulator
        return sum_val + input, count + 1

    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return sum(sums), sum(counts)

    def extract_output(self, accumulator):
        sum_val, count = accumulator
        return sum_val / count if count else 0

with beam.Pipeline() as p:
    numbers = p | beam.Create([1, 2, 3, 4, 5])
    average = numbers | beam.CombineGlobally(AverageFn())
    # Result: [3.0]
Methods
with_defaults()
Returns a default value when combining empty collections.
total = numbers | beam.CombineGlobally(sum).with_defaults()
# Returns [0] for empty input instead of []
without_defaults()
Does not return a value for empty collections.
total = numbers | beam.CombineGlobally(sum).without_defaults()
# Returns [] for empty input
CombinePerKey
Combines values for each key in a key-value PCollection.
beam.CombinePerKey(fn, *args, **kwargs)
import apache_beam as beam

with beam.Pipeline() as p:
    pairs = p | beam.Create([
        ('cat', 1),
        ('dog', 5),
        ('cat', 3),
        ('dog', 2)
    ])
    sums = pairs | beam.CombinePerKey(sum)
    # Result: [('cat', 4), ('dog', 7)]
Flatten
Merges multiple PCollections into a single PCollection.
import apache_beam as beam

with beam.Pipeline() as p:
    pc1 = p | 'Create1' >> beam.Create([1, 2, 3])
    pc2 = p | 'Create2' >> beam.Create([4, 5, 6])
    pc3 = p | 'Create3' >> beam.Create([7, 8, 9])
    merged = (pc1, pc2, pc3) | beam.Flatten()
    # Result: [1, 2, 3, 4, 5, 6, 7, 8, 9]
All input PCollections must have the same element type.
Create
Creates a PCollection from in-memory data.
beam.Create(values, reshuffle=True)
An iterable of values to create the PCollection from.
Whether to reshuffle the data for better parallelization.
import apache_beam as beam

with beam.Pipeline() as p:
    # Create from a list
    numbers = p | beam.Create([1, 2, 3, 4, 5])
    # Create from a list of key-value tuples
    pairs = p | beam.Create([('a', 1), ('b', 2)])
    # Create from a range
    range_values = p | beam.Create(range(10))
CombineFn
Base class for defining custom combining logic.
class MyCombineFn(beam.CombineFn):
    def create_accumulator(self):
        # Return initial accumulator value
        return 0

    def add_input(self, accumulator, input):
        # Add input to accumulator
        return accumulator + input

    def merge_accumulators(self, accumulators):
        # Merge multiple accumulators
        return sum(accumulators)

    def extract_output(self, accumulator):
        # Extract final output from accumulator
        return accumulator
Required Methods
create_accumulator
() -> Accumulator
Creates and returns a new accumulator.
add_input
(Accumulator, Input) -> Accumulator
Adds an input element to the accumulator.
merge_accumulators
(Iterable[Accumulator]) -> Accumulator
Merges multiple accumulators into one.
extract_output
(Accumulator) -> Output
Extracts the final output value from the accumulator.
WindowInto
Assigns elements to windows.
beam.WindowInto(windowfn, trigger=None, accumulation_mode=None)
from apache_beam import window

# Fixed windows of 60 seconds
windowed = (
    events
    | beam.WindowInto(window.FixedWindows(60))
)

# Sliding windows (30-second windows, starting every 5 seconds)
windowed = (
    events
    | beam.WindowInto(window.SlidingWindows(30, 5))
)

# Session windows
windowed = (
    events
    | beam.WindowInto(window.Sessions(10 * 60))  # 10 minute gap
)