Coders are used to encode and decode data for transmission and storage in Apache Beam pipelines.

Overview

Coders handle the serialization and deserialization of elements in a PCollection. They are essential for:
  • Moving data between pipeline stages
  • Storing intermediate results
  • Shuffling data in GroupByKey operations
  • Checkpointing and state management

Coder Base Class

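Base class for all coders. A custom coder subclasses beam.coders.Coder and implements encode and decode. Override is_deterministic to return True only when equal values always encode to identical bytes.
import apache_beam as beam

class MyCoder(beam.coders.Coder):
    """Encodes str values as UTF-8 bytes."""

    def encode(self, value):
        # str -> bytes
        return value.encode('utf-8')

    def decode(self, encoded):
        # bytes -> str
        return encoded.decode('utf-8')

    def is_deterministic(self):
        # UTF-8 maps equal strings to identical bytes
        return True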

Methods

  • encode(value: T) -> bytes: Encodes a value to bytes.
  • decode(encoded: bytes) -> T: Decodes bytes back to the original value.
  • is_deterministic() -> bool: Returns whether equal values always encode to identical bytes. The base class returns False; key coders used by GroupByKey must be deterministic.
  • estimate_size(value: T) -> int: Estimates the encoded size in bytes.
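To make this contract concrete without depending on Beam, here is a minimal stand-in base class with the same four methods. The names SketchCoder and Utf8SketchCoder are hypothetical, not part of Beam, and the default estimate_size (encode, then measure the bytes) is a simplifying assumption for the sketch.

```python
class SketchCoder:
    """Hypothetical stand-in mirroring the encode/decode/is_deterministic/estimate_size contract."""

    def encode(self, value) -> bytes:
        raise NotImplementedError

    def decode(self, encoded: bytes):
        raise NotImplementedError

    def is_deterministic(self) -> bool:
        # Conservative default: subclasses must opt in explicitly.
        return False

    def estimate_size(self, value) -> int:
        # Simple default: encode the value and measure the resulting bytes.
        return len(self.encode(value))


class Utf8SketchCoder(SketchCoder):
    """Encodes str values as UTF-8, like the MyCoder example above."""

    def encode(self, value: str) -> bytes:
        return value.encode("utf-8")

    def decode(self, encoded: bytes) -> str:
        return encoded.decode("utf-8")

    def is_deterministic(self) -> bool:
        return True


coder = Utf8SketchCoder()
print(coder.estimate_size("héllo"))  # 6: 'é' takes two bytes in UTF-8
```

Note that the size estimate counts encoded bytes, not characters.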

Common Coders

BytesCoder

Encodes bytes without modification.
from apache_beam.coders import BytesCoder

coder = BytesCoder()
encoded = coder.encode(b'hello')
decoded = coder.decode(encoded)
Properties:
  • Deterministic: Yes
  • Use case: Binary data, raw bytes

StrUtf8Coder

Encodes strings using UTF-8 encoding.
from apache_beam.coders import StrUtf8Coder

coder = StrUtf8Coder()
encoded = coder.encode('Hello, World!')
decoded = coder.decode(encoded)  # Returns 'Hello, World!'
Properties:
  • Deterministic: Yes
  • Use case: Text data, string elements

VarIntCoder

Encodes integers using variable-length encoding.
from apache_beam.coders import VarIntCoder

coder = VarIntCoder()
encoded = coder.encode(12345)
decoded = coder.decode(encoded)  # Returns 12345
Properties:
  • Deterministic: Yes
  • Use case: Integer values; compact for small non-negative integers (negative values always take 10 bytes)
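A variable-length integer format stores 7 bits per byte and uses the high bit to signal that more bytes follow. The pure-Python sketch below assumes a protobuf-style unsigned varint over a 64-bit two's-complement value; the function names are hypothetical and this is not Beam's implementation. It shows why small non-negative integers are compact while negative ones are not.

```python
def encode_varint(value: int) -> bytes:
    """Encode a signed 64-bit integer as an unsigned base-128 varint."""
    if not -(1 << 63) <= value < (1 << 63):
        raise ValueError("value does not fit in 64 bits")
    value &= (1 << 64) - 1  # negative values become large unsigned numbers
    out = bytearray()
    while True:
        low_bits = value & 0x7F
        value >>= 7
        if value:
            out.append(low_bits | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low_bits)  # high bit clear: this is the last byte
            return bytes(out)


def decode_varint(data: bytes) -> int:
    """Decode one varint from the start of data back to a signed 64-bit integer."""
    result = 0
    shift = 0
    for byte in data:
        result |= (byte & 0x7F) << shift
        shift += 7
        if not byte & 0x80:
            break
    if result >= 1 << 63:
        result -= 1 << 64  # reinterpret as two's complement
    return result


for n in (0, 127, 128, 12345, -1):
    print(n, encode_varint(n).hex(), len(encode_varint(n)))
```

With this layout, 12345 fits in two bytes, while -1 becomes a 64-bit unsigned value and needs ten.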

FloatCoder

Encodes floating-point numbers (double precision).
from apache_beam.coders import FloatCoder

coder = FloatCoder()
encoded = coder.encode(3.14159)
decoded = coder.decode(encoded)  # Returns 3.14159
Properties:
  • Deterministic: Yes (is_deterministic() returns True), although values that compare equal, such as 0.0 and -0.0, encode to different bytes
  • Use case: Floating-point numbers
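Floating-point keys deserve care because values that compare equal can have different bit patterns. The sketch below uses the standard struct module and assumes an 8-byte big-endian IEEE 754 layout for illustration; encode_double and decode_double are hypothetical helper names. In Python 0.0 == -0.0, yet their encodings differ, so anything that groups by encoded bytes treats them as different keys.

```python
import struct


def encode_double(value: float) -> bytes:
    # 8-byte big-endian IEEE 754 double (assumed layout for this sketch).
    return struct.pack(">d", value)


def decode_double(encoded: bytes) -> float:
    return struct.unpack(">d", encoded)[0]


print(0.0 == -0.0)                                # True
print(encode_double(0.0) == encode_double(-0.0))  # False: the sign bit differs
print(decode_double(encode_double(3.14159)))      # 3.14159
```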

BooleanCoder

Encodes boolean values.
from apache_beam.coders import BooleanCoder

coder = BooleanCoder()
encoded = coder.encode(True)
decoded = coder.decode(encoded)  # Returns True
Properties:
  • Deterministic: Yes
  • Use case: Boolean flags

TupleCoder

Encodes tuples with a coder for each component.
from apache_beam.coders import TupleCoder, StrUtf8Coder, VarIntCoder

# Coder for (str, int) tuples
coder = TupleCoder([StrUtf8Coder(), VarIntCoder()])
encoded = coder.encode(('Alice', 30))
decoded = coder.decode(encoded)  # Returns ('Alice', 30)
Properties:
  • Deterministic: If all component coders are deterministic
  • Use case: Key-value pairs, structured data
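Conceptually, a tuple coder runs each component through its own coder. Because the parts are concatenated, variable-length components need a length prefix so the decoder knows where each one ends. The sketch below illustrates that idea with a 4-byte length prefix; all helper names are hypothetical, and Beam's actual wire format differs.

```python
import struct
from typing import Any, Callable, List, Sequence

Encoder = Callable[[Any], bytes]
Decoder = Callable[[bytes], Any]


def encode_tuple(encoders: Sequence[Encoder], value: tuple) -> bytes:
    """Encode each component with its own encoder, length-prefixing every part."""
    if len(encoders) != len(value):
        raise ValueError("need exactly one encoder per tuple component")
    out = bytearray()
    for encode, component in zip(encoders, value):
        payload = encode(component)
        out += struct.pack(">I", len(payload))  # 4-byte length marks the boundary
        out += payload
    return bytes(out)


def decode_tuple(decoders: Sequence[Decoder], encoded: bytes) -> tuple:
    """Read each length-prefixed part and decode it with the matching decoder."""
    parts: List[Any] = []
    pos = 0
    for decode in decoders:
        (length,) = struct.unpack_from(">I", encoded, pos)
        pos += 4
        parts.append(decode(encoded[pos:pos + length]))
        pos += length
    return tuple(parts)


def encode_str(s: str) -> bytes:
    return s.encode("utf-8")


def decode_str(b: bytes) -> str:
    return b.decode("utf-8")


def encode_int(i: int) -> bytes:
    return struct.pack(">q", i)


def decode_int(b: bytes) -> int:
    return struct.unpack(">q", b)[0]


encoded = encode_tuple([encode_str, encode_int], ("Alice", 30))
print(decode_tuple([decode_str, decode_int], encoded))  # ('Alice', 30)
```

Since the output is just the component encodings laid end to end, it is deterministic exactly when every component encoding is, which matches the rule in the Properties list.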

ListCoder

Encodes lists with a coder for elements.
from apache_beam.coders import ListCoder, VarIntCoder

coder = ListCoder(VarIntCoder())
encoded = coder.encode([1, 2, 3, 4, 5])
decoded = coder.decode(encoded)  # Returns [1, 2, 3, 4, 5]
Properties:
  • Deterministic: If element coder is deterministic
  • Use case: Lists, arrays

IterableCoder

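Similar to ListCoder, but for any iterable.
from apache_beam.coders import IterableCoder, StrUtf8Coder

coder = IterableCoder(StrUtf8Coder())
encoded = coder.encode(['a', 'b', 'c'])
decoded = coder.decode(encoded)  # Decodes to an iterable of 'a', 'b', 'c'
Properties:
  • Deterministic: If element coder is deterministic
  • Use case: Grouped values, such as the values produced by GroupByKey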

MapCoder

Encodes dictionaries.
from apache_beam.coders import MapCoder, StrUtf8Coder, VarIntCoder

coder = MapCoder(StrUtf8Coder(), VarIntCoder())
data = {'a': 1, 'b': 2, 'c': 3}
encoded = coder.encode(data)
decoded = coder.decode(encoded)  # Returns dictionary
Properties:
  • Deterministic: No (dict order may vary)
  • Use case: Dictionary/map data
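Why dict order matters: two dicts that compare equal can iterate their items in different orders, so an encoder that writes items in iteration order produces different bytes for equal values. Sorting the entries first is one way to restore determinism. A minimal sketch for Dict[str, int], using a hypothetical layout (entry count, then length-prefixed UTF-8 keys and 8-byte integer values):

```python
import struct
from typing import Dict, Iterable, Tuple


def _encode_entries(entries: Iterable[Tuple[str, int]], count: int) -> bytes:
    out = bytearray(struct.pack(">I", count))
    for key, value in entries:
        key_bytes = key.encode("utf-8")
        out += struct.pack(">I", len(key_bytes)) + key_bytes
        out += struct.pack(">q", value)
    return bytes(out)


def encode_in_iteration_order(d: Dict[str, int]) -> bytes:
    # Not deterministic: the output depends on insertion order.
    return _encode_entries(d.items(), len(d))


def encode_sorted(d: Dict[str, int]) -> bytes:
    # Deterministic for str keys: entries are written in sorted key order.
    return _encode_entries(sorted(d.items()), len(d))


a = {"a": 1, "b": 2}
b = {"b": 2, "a": 1}
print(a == b)                                                        # True
print(encode_in_iteration_order(a) == encode_in_iteration_order(b))  # False
print(encode_sorted(a) == encode_sorted(b))                          # True
```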

PickleCoder

Encodes arbitrary Python objects using pickle.
from apache_beam.coders import PickleCoder

class CustomClass:
    def __init__(self, value):
        self.value = value

coder = PickleCoder()
obj = CustomClass(42)
encoded = coder.encode(obj)
decoded = coder.decode(encoded)  # Returns CustomClass instance
Properties:
  • Deterministic: No
  • Use case: Custom objects, fallback coder
PickleCoder is not deterministic and should not be used for GroupByKey keys. Use deterministic coders when possible.

ProtoCoder

Encodes Protocol Buffer messages.
from apache_beam.coders import ProtoCoder
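import my_proto_pb2  # placeholder: your generated protobuf module

coder = ProtoCoder(my_proto_pb2.MyMessage)
message = my_proto_pb2.MyMessage(field='value')  # 'field' is a placeholder field name
encoded = coder.encode(message)
decoded = coder.decode(encoded)
Properties:
  • Deterministic: No (messages containing map fields have no guaranteed serialization order); DeterministicProtoCoder serializes deterministically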
  • Use case: Protocol Buffer messages

AvroGenericCoder

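Encodes Avro records using an Avro schema. The schema is passed as a JSON string, and values are wrapped in AvroRecord.
import json

from apache_beam.coders import AvroGenericCoder
from apache_beam.coders.avro_record import AvroRecord

schema = {
    'type': 'record',
    'name': 'User',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'age', 'type': 'int'}
    ]
}

coder = AvroGenericCoder(json.dumps(schema))
data = AvroRecord({'name': 'Alice', 'age': 30})
encoded = coder.encode(data)
decoded = coder.decode(encoded)  # AvroRecord; the dict is in decoded.record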

WindowedValueCoder

Encodes windowed values (value + timestamp + windows).
from apache_beam.coders import WindowedValueCoder, StrUtf8Coder
from apache_beam.transforms.window import GlobalWindows

value_coder = StrUtf8Coder()
window_coder = GlobalWindows().get_window_coder()  # coder for the global window

coder = WindowedValueCoder(value_coder, window_coder)

Coder Registry

Apache Beam automatically selects coders based on type hints.
from apache_beam.coders import registry

# Get coder for a type
coder = registry.get_coder(int)  # Returns VarIntCoder
coder = registry.get_coder(str)  # Returns StrUtf8Coder

# Register a custom coder class for a custom type
# (MyCustomType and MyCustomCoder are placeholders for your own classes)
registry.register_coder(MyCustomType, MyCustomCoder)

Type Hints and Coders

import apache_beam as beam


# Define custom types at module level so they can be pickled and sent to workers
class Person:
    def __init__(self, name: str, age: int):
        self.name = name
        self.age = age


with beam.Pipeline() as p:
    # Type hints help Beam select the right coder (VarIntCoder for int)
    numbers = (
        p
        | beam.Create([1, 2, 3]).with_output_types(int)
    )

    # Without a registered coder for Person, Beam uses a fallback coder
    # that pickles the objects
    people = (
        p
        | beam.Create([Person('Alice', 30)]).with_output_types(Person)
    )

Deterministic Coders

For GroupByKey operations, keys must use deterministic coders.
import apache_beam as beam
from apache_beam.coders import TupleCoder, StrUtf8Coder, VarIntCoder

with beam.Pipeline() as p:
    # Good: deterministic key coder
    pairs = p | beam.Create([
        ('key1', 1),
        ('key2', 2)
    ])
    grouped = pairs | beam.GroupByKey()
    # Uses TupleCoder([StrUtf8Coder(), VarIntCoder()])
    
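    # Bad: non-deterministic key (dict)
    # Avoid dict keys: their encoding is not guaranteed to be deterministic,
    # so GroupByKey may reject them or group equal keys inconsistently:
    # bad_pairs = p | beam.Create([({'a': 1}, 'value')])
    # bad_grouped = bad_pairs | beam.GroupByKey()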
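Why keys need deterministic coders: a shuffle groups elements by the encoded bytes of their keys. The simulation below is a hypothetical stand-in for a runner's shuffle (simulated_group_by_key is not Beam code). With plain json.dumps, two equal dict keys serialize differently and land in separate groups; with sort_keys=True they land together.

```python
import json
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Tuple


def simulated_group_by_key(
    pairs: Iterable[Tuple[Any, Any]],
    encode_key: Callable[[Any], bytes],
) -> Dict[bytes, List[Any]]:
    """Group values by the encoded bytes of their keys, as a shuffle would."""
    groups: Dict[bytes, List[Any]] = defaultdict(list)
    for key, value in pairs:
        groups[encode_key(key)].append(value)
    return dict(groups)


pairs = [({"a": 1, "b": 2}, "x"), ({"b": 2, "a": 1}, "y")]  # the keys are equal

naive = simulated_group_by_key(pairs, lambda k: json.dumps(k).encode("utf-8"))
stable = simulated_group_by_key(
    pairs, lambda k: json.dumps(k, sort_keys=True).encode("utf-8")
)
print(len(naive))   # 2: equal keys were split into separate groups
print(len(stable))  # 1: both values grouped under one key
```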

Custom Coders

Creating a Custom Coder

import apache_beam as beam
from apache_beam.coders import Coder
import json

class JsonCoder(Coder):
    """Encodes objects as JSON."""
    
    def encode(self, value):
        return json.dumps(value).encode('utf-8')
    
    def decode(self, encoded):
        return json.loads(encoded.decode('utf-8'))
    
    def is_deterministic(self):
        # json.dumps writes dict keys in insertion order, so equal dicts
        # can encode to different bytes
        return False
    
    def estimate_size(self, value):
        return len(self.encode(value))

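# This snippet only declares the output type; Beam picks the coder for
# Dict[str, Any] from the registry. JsonCoder is applied only to types it
# has been registered for (see Registering Custom Coders below).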
with beam.Pipeline() as p:
    data = p | beam.Create([{'name': 'Alice', 'age': 30}])
    data = data | beam.Map(lambda x: x).with_output_types(
        beam.typehints.Dict[str, beam.typehints.Any]
    )
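Beyond determinism, JSON is not a lossless round trip for every Python value: tuples come back as lists, and non-string dict keys come back as strings. The check below reuses the same json.dumps/json.loads logic as JsonCoder.encode and decode, as plain functions without Beam:

```python
import json


def encode(value) -> bytes:
    # Same logic as JsonCoder.encode above.
    return json.dumps(value).encode("utf-8")


def decode(encoded: bytes):
    # Same logic as JsonCoder.decode above.
    return json.loads(encoded.decode("utf-8"))


print(decode(encode({"point": (1, 2)})))             # {'point': [1, 2]}: tuple became a list
print(decode(encode({1: "one"})))                    # {'1': 'one'}: int key became a string
print(decode(encode({"name": "Alice", "age": 30})))  # round-trips unchanged
```

If elements contain tuples or non-string keys, a coder that preserves those types avoids these silent changes.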

Registering Custom Coders

import apache_beam as beam
from apache_beam.coders import Coder, registry

class Person:
    def __init__(self, name, age):
        self.name = name
        self.age = age

class PersonCoder(Coder):
    def encode(self, person):
        # Simple "name,age" encoding; assumes names never contain a comma
        return f"{person.name},{person.age}".encode('utf-8')
    
    def decode(self, encoded):
        name, age = encoded.decode('utf-8').split(',')
        return Person(name, int(age))
    
    def is_deterministic(self):
        return True

# Register the coder
registry.register_coder(Person, PersonCoder)

# Now Beam will automatically use PersonCoder for Person objects
with beam.Pipeline() as p:
    people = p | beam.Create([Person('Alice', 30), Person('Bob', 25)])
    # PersonCoder is automatically used
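The "name,age" format in PersonCoder breaks when a name contains a comma, because split(',') then yields more than two fields. The sketch below reproduces the encode/decode logic as plain functions outside Beam (using a dataclass for Person, an assumption made for easy equality checks) and shows one minimal fix: split only on the last comma with rsplit(',', 1), which works because the age never contains a comma.

```python
from dataclasses import dataclass


@dataclass
class Person:
    name: str
    age: int


def encode(person: Person) -> bytes:
    return f"{person.name},{person.age}".encode("utf-8")


def decode_naive(encoded: bytes) -> Person:
    name, age = encoded.decode("utf-8").split(",")  # fails if the name has a comma
    return Person(name, int(age))


def decode_fixed(encoded: bytes) -> Person:
    name, age = encoded.decode("utf-8").rsplit(",", 1)  # split on the last comma only
    return Person(name, int(age))


tricky = Person("Doe, Jane", 40)
try:
    decode_naive(encode(tricky))
except ValueError as err:
    print("naive decode failed:", err)

print(decode_fixed(encode(tricky)) == tricky)  # True
```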

FastPrimitivesCoder

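Optimized general-purpose coder for common Python types such as int, float, str, bytes, bool, None, and lists, tuples, and dicts of these. Objects of other types fall back to pickling.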
from apache_beam.coders import FastPrimitivesCoder

coder = FastPrimitivesCoder()

# Can encode various types
encoded_int = coder.encode(42)
encoded_str = coder.encode("hello")
encoded_list = coder.encode([1, 2, 3])
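One way a single coder can handle several Python types is to write a type tag before each value and dispatch on it when decoding. The sketch below illustrates only that general idea; its tag values, layout, and function names are hypothetical and do not match FastPrimitivesCoder's actual format. Unsupported types raise here, where a general-purpose coder would fall back to another strategy.

```python
import struct
from typing import Any, Tuple

# Hypothetical one-byte type tags (not FastPrimitivesCoder's real values).
NONE_TAG, BOOL_TAG, INT_TAG, STR_TAG, LIST_TAG = b"N", b"B", b"I", b"S", b"L"


def encode_value(value: Any) -> bytes:
    """Prefix each value with a type tag so the decoder knows how to read it."""
    if value is None:
        return NONE_TAG
    if isinstance(value, bool):  # check bool before int: bool subclasses int
        return BOOL_TAG + (b"\x01" if value else b"\x00")
    if isinstance(value, int):
        return INT_TAG + struct.pack(">q", value)
    if isinstance(value, str):
        data = value.encode("utf-8")
        return STR_TAG + struct.pack(">I", len(data)) + data
    if isinstance(value, list):
        items = b"".join(encode_value(item) for item in value)
        return LIST_TAG + struct.pack(">I", len(value)) + items
    raise TypeError(f"unsupported type in this sketch: {type(value).__name__}")


def _decode_at(data: bytes, pos: int) -> Tuple[Any, int]:
    """Decode one value starting at pos; return it with the next read position."""
    tag = data[pos:pos + 1]
    pos += 1
    if tag == NONE_TAG:
        return None, pos
    if tag == BOOL_TAG:
        return data[pos] == 1, pos + 1
    if tag == INT_TAG:
        return struct.unpack_from(">q", data, pos)[0], pos + 8
    if tag == STR_TAG:
        (length,) = struct.unpack_from(">I", data, pos)
        pos += 4
        return data[pos:pos + length].decode("utf-8"), pos + length
    if tag == LIST_TAG:
        (count,) = struct.unpack_from(">I", data, pos)
        pos += 4
        items = []
        for _ in range(count):
            item, pos = _decode_at(data, pos)
            items.append(item)
        return items, pos
    raise ValueError(f"unknown type tag {tag!r}")


def decode_value(data: bytes) -> Any:
    value, _ = _decode_at(data, 0)
    return value


for original in (42, "hello", [1, 2, 3], [None, True, "a", [7]]):
    print(decode_value(encode_value(original)))
```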

Coder Best Practices

  1. Use deterministic coders for keys: GroupByKey requires deterministic key coders
  2. Prefer built-in coders: They are optimized and well-tested
  3. Declare types when needed: Use .with_output_types() so Beam can look up the right coder, and register coders for custom types with registry.register_coder()
  4. Test custom coders: Ensure encode/decode round-trips correctly
  5. Consider size: Efficient coders reduce shuffle costs
import apache_beam as beam

with beam.Pipeline() as p:
    # Declare the output type so Beam selects VarIntCoder from the registry
    numbers = (
        p
        | beam.Create([1, 2, 3])
        | beam.Map(lambda x: x * 2).with_output_types(int)
    )

    # For PCollections after GroupByKey
    pairs = p | beam.Create([('a', 1), ('b', 2)])
    grouped = pairs | beam.GroupByKey()
    # The grouped values are encoded with an IterableCoder wrapping the value coder
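Best practice 4 can be automated with a small helper that checks decode(encode(v)) == v for sample values and, when a coder claims determinism, that equal samples encode to identical bytes. The helper name check_coder and its signature are hypothetical. Because it takes plain encode/decode callables, the same check can be pointed at a Beam coder's encode and decode methods.

```python
from typing import Any, Callable, Iterable


def check_coder(
    encode: Callable[[Any], bytes],
    decode: Callable[[bytes], Any],
    samples: Iterable[Any],
    deterministic: bool = False,
) -> None:
    """Raise AssertionError if a round trip or (optionally) determinism fails."""
    samples = list(samples)
    for value in samples:
        encoded = encode(value)
        assert isinstance(encoded, bytes), f"encode({value!r}) did not return bytes"
        decoded = decode(encoded)
        assert decoded == value, f"round trip changed {value!r} into {decoded!r}"
    if deterministic:
        # Equal values must produce byte-identical encodings.
        for a in samples:
            for b in samples:
                if a == b:
                    assert encode(a) == encode(b), f"{a!r} and {b!r} encode differently"


def utf8_encode(value: str) -> bytes:
    return value.encode("utf-8")


def utf8_decode(encoded: bytes) -> str:
    return encoded.decode("utf-8")


check_coder(utf8_encode, utf8_decode, ["", "hello", "héllo"], deterministic=True)
print("UTF-8 coder passed")
```

Include edge cases in the samples (empty values, non-ASCII text, values containing your format's delimiters), since those are where hand-written coders usually break.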
