Coders are used to encode and decode data for transmission and storage in Apache Beam pipelines.
Overview
Coders handle the serialization and deserialization of elements in a PCollection. They are essential for:
- Moving data between pipeline stages
- Storing intermediate results
- Shuffling data in GroupByKey operations
- Checkpointing and state management
Coder Base Class
Base class for all coders.
class MyCoder(beam.coders.Coder):
def encode(self, value):
# Return bytes representation
return value.encode('utf-8')
def decode(self, encoded):
# Return decoded value
return encoded.decode('utf-8')
def is_deterministic(self):
# Return True if encoding is deterministic
return True
Methods
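- encode(value): Encodes a value to bytes.
- decode(encoded): Decodes bytes back to the original value.
- is_deterministic(): Returns whether the encoding is deterministic. A deterministic coder is required for GroupByKey keys.
- estimate_size(value): Estimates the encoded size in bytes.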
Common Coders
BytesCoder
Encodes bytes without modification.
from apache_beam.coders import BytesCoder
coder = BytesCoder()
encoded = coder.encode(b'hello')
decoded = coder.decode(encoded)
Properties:
- Deterministic: Yes
- Use case: Binary data, raw bytes
StrUtf8Coder
Encodes strings using UTF-8 encoding.
from apache_beam.coders import StrUtf8Coder
coder = StrUtf8Coder()
encoded = coder.encode('Hello, World!')
decoded = coder.decode(encoded) # Returns 'Hello, World!'
Properties:
- Deterministic: Yes
- Use case: Text data, string elements
VarIntCoder
Encodes integers using variable-length encoding.
from apache_beam.coders import VarIntCoder
coder = VarIntCoder()
encoded = coder.encode(12345)
decoded = coder.decode(encoded) # Returns 12345
Properties:
- Deterministic: Yes
- Use case: Integer values, efficient for small integers
FloatCoder
Encodes floating-point numbers (double precision).
from apache_beam.coders import FloatCoder
coder = FloatCoder()
encoded = coder.encode(3.14159)
decoded = coder.decode(encoded) # Returns 3.14159
Properties:
- Deterministic: Yes (fixed-width 8-byte encoding)
- Use case: Floating-point numbers
BooleanCoder
Encodes boolean values.
from apache_beam.coders import BooleanCoder
coder = BooleanCoder()
encoded = coder.encode(True)
decoded = coder.decode(encoded) # Returns True
Properties:
- Deterministic: Yes
- Use case: Boolean flags
TupleCoder
Encodes tuples with a coder for each component.
from apache_beam.coders import TupleCoder, StrUtf8Coder, VarIntCoder
# Coder for (str, int) tuples
coder = TupleCoder([StrUtf8Coder(), VarIntCoder()])
encoded = coder.encode(('Alice', 30))
decoded = coder.decode(encoded) # Returns ('Alice', 30)
Properties:
- Deterministic: If all component coders are deterministic
- Use case: Key-value pairs, structured data
ListCoder
Encodes lists with a coder for elements.
from apache_beam.coders import ListCoder, VarIntCoder
coder = ListCoder(VarIntCoder())
encoded = coder.encode([1, 2, 3, 4, 5])
decoded = coder.decode(encoded) # Returns [1, 2, 3, 4, 5]
Properties:
- Deterministic: If element coder is deterministic
- Use case: Lists, arrays
IterableCoder
Similar to ListCoder but for iterables.
from apache_beam.coders import IterableCoder, StrUtf8Coder
coder = IterableCoder(StrUtf8Coder())
encoded = coder.encode(['a', 'b', 'c'])
decoded = coder.decode(encoded) # Returns iterable
MapCoder
Encodes dictionaries.
from apache_beam.coders import MapCoder, StrUtf8Coder, VarIntCoder
coder = MapCoder(StrUtf8Coder(), VarIntCoder())
data = {'a': 1, 'b': 2, 'c': 3}
encoded = coder.encode(data)
decoded = coder.decode(encoded) # Returns dictionary
Properties:
- Deterministic: No (dict order may vary)
- Use case: Dictionary/map data
PickleCoder
Encodes arbitrary Python objects using pickle.
from apache_beam.coders import PickleCoder
class CustomClass:
def __init__(self, value):
self.value = value
coder = PickleCoder()
obj = CustomClass(42)
encoded = coder.encode(obj)
decoded = coder.decode(encoded) # Returns CustomClass instance
Properties:
- Deterministic: No
- Use case: Custom objects, fallback coder
PickleCoder is not deterministic and should not be used for GroupByKey keys. Use deterministic coders when possible.
ProtoCoder
Encodes Protocol Buffer messages.
from apache_beam.coders import ProtoCoder
import my_proto_pb2
coder = ProtoCoder(my_proto_pb2.MyMessage)
message = my_proto_pb2.MyMessage(field='value')
encoded = coder.encode(message)
decoded = coder.decode(encoded)
Properties:
- Deterministic: No (map fields may serialize in varying order; use DeterministicProtoCoder for GroupByKey keys)
- Use case: Protocol Buffer messages
AvroGenericCoder
Encodes data using Avro schema.
from apache_beam.coders import AvroGenericCoder
schema = {
'type': 'record',
'name': 'User',
'fields': [
{'name': 'name', 'type': 'string'},
{'name': 'age', 'type': 'int'}
]
}
coder = AvroGenericCoder(schema)
data = {'name': 'Alice', 'age': 30}
encoded = coder.encode(data)
decoded = coder.decode(encoded)
WindowedValueCoder
Encodes windowed values (value + timestamp + windows).
from apache_beam.coders import WindowedValueCoder, StrUtf8Coder
from apache_beam.transforms.window import GlobalWindows
value_coder = StrUtf8Coder()
window_coder = GlobalWindows().get_window_coder()
coder = WindowedValueCoder(value_coder, window_coder)
Coder Registry
Apache Beam automatically selects coders based on type hints.
from apache_beam.coders import registry
# Get coder for a type
coder = registry.get_coder(int) # Returns VarIntCoder
coder = registry.get_coder(str) # Returns StrUtf8Coder
# Register custom coder
registry.register_coder(MyCustomType, MyCustomCoder)
Type Hints and Coders
import apache_beam as beam
from apache_beam import typehints
with beam.Pipeline() as p:
# Type hints help Beam select the right coder
numbers = (
p
| beam.Create([1, 2, 3])
.with_output_types(int)
)
# For custom types
class Person:
def __init__(self, name: str, age: int):
self.name = name
self.age = age
people = (
p
| beam.Create([Person('Alice', 30)])
.with_output_types(Person)
)
Deterministic Coders
For GroupByKey operations, keys must use deterministic coders.
import apache_beam as beam
from apache_beam.coders import TupleCoder, StrUtf8Coder, VarIntCoder
with beam.Pipeline() as p:
# Good: deterministic key coder
pairs = p | beam.Create([
('key1', 1),
('key2', 2)
])
grouped = pairs | beam.GroupByKey()
# Uses TupleCoder([StrUtf8Coder(), VarIntCoder()])
# Bad: non-deterministic key (dict)
# This would fail:
# bad_pairs = p | beam.Create([({'a': 1}, 'value')])
# bad_grouped = bad_pairs | beam.GroupByKey()
Custom Coders
Creating a Custom Coder
import apache_beam as beam
from apache_beam.coders import Coder
import json
class JsonCoder(Coder):
"""Encodes objects as JSON."""
def encode(self, value):
return json.dumps(value).encode('utf-8')
def decode(self, encoded):
return json.loads(encoded.decode('utf-8'))
def is_deterministic(self):
# JSON dict encoding order may vary
return False
def estimate_size(self, value):
return len(self.encode(value))
# Use custom coder
with beam.Pipeline() as p:
data = p | beam.Create([{'name': 'Alice', 'age': 30}])
data = data | beam.Map(lambda x: x).with_output_types(
beam.typehints.Dict[str, beam.typehints.Any]
)
Registering Custom Coders
from apache_beam.coders import registry
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
class PersonCoder(Coder):
def encode(self, person):
return f"{person.name},{person.age}".encode('utf-8')
def decode(self, encoded):
name, age = encoded.decode('utf-8').split(',')
return Person(name, int(age))
def is_deterministic(self):
return True
# Register the coder
registry.register_coder(Person, PersonCoder)
# Now Beam will automatically use PersonCoder for Person objects
with beam.Pipeline() as p:
people = p | beam.Create([Person('Alice', 30), Person('Bob', 25)])
# PersonCoder is automatically used
FastPrimitivesCoder
Optimized coder for Python primitives.
from apache_beam.coders import FastPrimitivesCoder
coder = FastPrimitivesCoder()
# Can encode various types
encoded_int = coder.encode(42)
encoded_str = coder.encode("hello")
encoded_list = coder.encode([1, 2, 3])
Coder Best Practices
- Use deterministic coders for keys: GroupByKey requires deterministic key coders
- Prefer built-in coders: They are optimized and well-tested
- Specify coders explicitly when needed: Declare element types with .with_output_types() and register a coder for custom types with registry.register_coder()
- Test custom coders: Ensure encode/decode round-trips correctly
- Consider size: Efficient coders reduce shuffle costs
import apache_beam as beam
from apache_beam.coders import VarIntCoder
with beam.Pipeline() as p:
# Declare the output type so Beam selects the matching coder (VarIntCoder for int)
numbers = (
p
| beam.Create([1, 2, 3])
| beam.Map(lambda x: x * 2).with_output_types(int)
)
# For PCollections after GroupByKey
pairs = p | beam.Create([('a', 1), ('b', 2)])
grouped = pairs | beam.GroupByKey()
# Values are automatically wrapped in IterableCoder