Skip to main content
Coders define how to encode and decode values in Apache Beam pipelines. They are used to serialize data when it needs to be materialized, such as when data is shuffled between workers or written to external storage.

Coder

The abstract base class for all coders.
public abstract class Coder<T> implements Serializable

Core Methods

encode()

Encodes a value to an output stream.
public abstract void encode(T value, OutputStream outStream)
    throws CoderException, IOException
value
T
required
The value to encode
outStream
OutputStream
required
The output stream to write encoded bytes to
Example:
Coder<String> coder = StringUtf8Coder.of();
ByteArrayOutputStream output = new ByteArrayOutputStream();
coder.encode("hello", output);
byte[] encoded = output.toByteArray();

decode()

Decodes a value from an input stream.
public abstract T decode(InputStream inStream)
    throws CoderException, IOException
inStream
InputStream
required
The input stream to read encoded bytes from
value
T
The decoded value
Example:
Coder<String> coder = StringUtf8Coder.of();
ByteArrayInputStream input = new ByteArrayInputStream(encoded);
String decoded = coder.decode(input);

getCoderArguments()

Returns the component coders for parameterized types.
public abstract List<? extends Coder<?>> getCoderArguments()
coders
List<? extends Coder<?>>
List of component coders, or empty list if not parameterized
Example:
Coder<List<String>> listCoder = ListCoder.of(StringUtf8Coder.of());
List<? extends Coder<?>> components = listCoder.getCoderArguments();
// components contains [StringUtf8Coder.of()]

verifyDeterministic()

Verifies that this coder is deterministic.
public abstract void verifyDeterministic()
    throws NonDeterministicException
Deterministic coders must satisfy:
  • Two values that compare as equal produce the same encoding
  • The coder always produces a canonical encoding
Example:
try {
    coder.verifyDeterministic();
    // Coder is deterministic
} catch (NonDeterministicException e) {
    // Coder is not deterministic
}

consistentWithEquals()

Returns whether this coder is consistent with equals.
public boolean consistentWithEquals()
consistent
boolean
True if equal encoded bytes imply equal values

structuralValue()

Returns an object with structural equality for the given value.
public Object structuralValue(T value)
value
T
required
The value to get structural representation for
structural
Object
An object whose equals() represents structural equality

Common Coder Implementations

StringUtf8Coder

Encodes strings in UTF-8.
public class StringUtf8Coder extends AtomicCoder<String>

of()

Returns the singleton instance.
public static StringUtf8Coder of()
Example:
Coder<String> coder = StringUtf8Coder.of();
PCollection<String> strings = ...
strings.setCoder(coder);

VarIntCoder

Encodes integers using variable-length encoding (1-5 bytes).
public class VarIntCoder extends AtomicCoder<Integer>

of()

Returns the singleton instance.
public static VarIntCoder of()
Example:
Coder<Integer> coder = VarIntCoder.of();
Note: Negative numbers always take 5 bytes. Use BigEndianIntegerCoder if numbers are often large or negative.

VarLongCoder

Encodes longs using variable-length encoding.
public class VarLongCoder extends AtomicCoder<Long>

of()

public static VarLongCoder of()

DoubleCoder

Encodes doubles as 8 bytes.
public class DoubleCoder extends AtomicCoder<Double>

of()

public static DoubleCoder of()

BooleanCoder

Encodes booleans as a single byte.
public class BooleanCoder extends AtomicCoder<Boolean>

of()

public static BooleanCoder of()

ByteArrayCoder

Encodes byte arrays with a length prefix.
public class ByteArrayCoder extends AtomicCoder<byte[]>

of()

public static ByteArrayCoder of()

InstantCoder

Encodes Joda-Time Instant values.
public class InstantCoder extends AtomicCoder<Instant>

of()

public static InstantCoder of()

Composite Coders

ListCoder

Encodes lists of elements.
public class ListCoder<T> extends StructuredCoder<List<T>>

of(Coder)

Creates a list coder for the given element coder.
public static <T> ListCoder<T> of(Coder<T> elementCoder)
elementCoder
Coder<T>
required
The coder for list elements
Example:
Coder<List<String>> listCoder = ListCoder.of(StringUtf8Coder.of());

SetCoder

Encodes sets of elements.
public class SetCoder<T> extends StructuredCoder<Set<T>>

of(Coder)

public static <T> SetCoder<T> of(Coder<T> elementCoder)

MapCoder

Encodes maps.
public class MapCoder<K, V> extends StructuredCoder<Map<K, V>>

of(Coder, Coder)

Creates a map coder.
public static <K, V> MapCoder<K, V> of(
    Coder<K> keyCoder,
    Coder<V> valueCoder)
keyCoder
Coder<K>
required
The coder for map keys
valueCoder
Coder<V>
required
The coder for map values
Example:
Coder<Map<String, Integer>> mapCoder = MapCoder.of(
    StringUtf8Coder.of(),
    VarIntCoder.of()
);

KvCoder

Encodes key-value pairs.
public class KvCoder<K, V> extends StructuredCoder<KV<K, V>>

of(Coder, Coder)

Creates a KV coder.
public static <K, V> KvCoder<K, V> of(
    Coder<K> keyCoder,
    Coder<V> valueCoder)
keyCoder
Coder<K>
required
The coder for keys
valueCoder
Coder<V>
required
The coder for values
Example:
Coder<KV<String, Integer>> kvCoder = KvCoder.of(
    StringUtf8Coder.of(),
    VarIntCoder.of()
);

IterableCoder

Encodes iterables of elements.
public class IterableCoder<T> extends StructuredCoder<Iterable<T>>

of(Coder)

public static <T> IterableCoder<T> of(Coder<T> elementCoder)

NullableCoder

Wraps a coder to handle null values.
public class NullableCoder<T> extends StructuredCoder<T>

of(Coder)

public static <T> NullableCoder<T> of(Coder<T> valueCoder)
valueCoder
Coder<T>
required
The coder for non-null values
Example:
Coder<String> nullableCoder = NullableCoder.of(StringUtf8Coder.of());

CoderRegistry

Manages coder inference and registration.
public class CoderRegistry

Methods

createDefault()

Creates a registry with default coders.
public static CoderRegistry createDefault()
Example:
CoderRegistry registry = CoderRegistry.createDefault();

getCoder(TypeDescriptor)

Infers a coder for a type.
public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor)
    throws CannotProvideCoderException
typeDescriptor
TypeDescriptor<T>
required
The type to get a coder for
coder
Coder<T>
The inferred coder
Example:
CoderRegistry registry = pipeline.getCoderRegistry();
Coder<String> coder = registry.getCoder(
    TypeDescriptor.of(String.class)
);

registerCoderProvider(CoderProvider)

Registers a custom coder provider.
public void registerCoderProvider(CoderProvider coderProvider)
coderProvider
CoderProvider
required
The coder provider to register

registerCoderForClass(Class, Coder)

Registers a coder for a specific class.
public <T> void registerCoderForClass(
    Class<T> clazz,
    Coder<T> coder)
clazz
Class<T>
required
The class to register the coder for
coder
Coder<T>
required
The coder to use for the class
Example:
CoderRegistry registry = pipeline.getCoderRegistry();
registry.registerCoderForClass(
    MyCustomType.class,
    MyCustomCoder.of()
);

Custom Coder Example

Creating a custom coder:
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderProvider;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
 * Coder for {@code MyCustomType} values.
 *
 * <p>The encoded form is the name written as a UTF-8 string followed by the id written as a
 * variable-length integer; {@link #decode} reads the fields back in that same order. The coder
 * holds no state, so one shared instance is handed out by {@link #of()}.
 */
public class MyCustomCoder extends AtomicCoder<MyCustomType> {

    private static final MyCustomCoder INSTANCE = new MyCustomCoder();

    /** Returns the shared coder instance. */
    public static MyCustomCoder of() {
        return INSTANCE;
    }

    /**
     * Returns a provider that supplies this coder for {@code MyCustomType}.
     *
     * <p>{@code @DefaultCoder} looks this static method up on the coder class it names, so it
     * has to exist for {@code @DefaultCoder(MyCustomCoder.class)} to resolve.
     */
    public static CoderProvider getCoderProvider() {
        return CoderProviders.forCoder(TypeDescriptor.of(MyCustomType.class), of());
    }

    // Instances are only obtained through of().
    private MyCustomCoder() {}

    /**
     * Writes {@code value} to {@code outStream} as name followed by id.
     *
     * @param value the value to encode; must not be null
     * @param outStream the stream that receives the encoded bytes
     * @throws CoderException if {@code value} is null or a field cannot be encoded
     * @throws IOException if writing to {@code outStream} fails
     */
    @Override
    public void encode(MyCustomType value, OutputStream outStream)
            throws CoderException, IOException {
        // Fail with the coder-level exception rather than a bare NullPointerException.
        if (value == null) {
            throw new CoderException("cannot encode a null MyCustomType");
        }
        // Field order here defines the wire format; decode() must mirror it.
        StringUtf8Coder.of().encode(value.getName(), outStream);
        VarIntCoder.of().encode(value.getId(), outStream);
    }

    /**
     * Reads one value from {@code inStream}, consuming the name and then the id.
     *
     * @param inStream the stream positioned at the start of an encoded value
     * @return the decoded value
     * @throws CoderException if a field cannot be decoded
     * @throws IOException if reading from {@code inStream} fails
     */
    @Override
    public MyCustomType decode(InputStream inStream)
            throws CoderException, IOException {
        // Same order as encode(): name first, then id.
        String name = StringUtf8Coder.of().decode(inStream);
        int id = VarIntCoder.of().decode(inStream);
        return new MyCustomType(name, id);
    }

    /** Intentionally empty: returning without throwing reports this coder as deterministic. */
    @Override
    public void verifyDeterministic() throws NonDeterministicException {
    }

    /** Returns an empty list because this coder is not parameterized by other coders. */
    @Override
    public List<? extends Coder<?>> getCoderArguments() {
        return Collections.emptyList();
    }
}

Using Coders with @DefaultCoder

Annotate your class to specify a default coder:
import org.apache.beam.sdk.coders.DefaultCoder;

/**
 * Example value type with a name and a numeric id.
 *
 * <p>The {@code @DefaultCoder} annotation names {@code MyCustomCoder} as the coder to use for
 * this class by default.
 */
@DefaultCoder(MyCustomCoder.class)
public class MyCustomType {
    private String name;
    private int id;

    /**
     * Creates a value with the given fields.
     *
     * @param name the name to store
     * @param id the id to store
     */
    public MyCustomType(String name, int id) {
        this.name = name;
        this.id = id;
    }

    /** Returns the name. */
    public String getName() {
        return name;
    }

    /** Replaces the name. */
    public void setName(String name) {
        this.name = name;
    }

    /** Returns the id. */
    public int getId() {
        return id;
    }

    /** Replaces the id. */
    public void setId(int id) {
        this.id = id;
    }
}

Complete Example

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.transforms.Create;

/**
 * End-to-end example showing three ways coders are supplied to a pipeline: an explicit coder
 * on a {@code Create} transform, a composite coder built from component coders, and a coder
 * registered for a class on the pipeline's {@code CoderRegistry}.
 */
public class CoderExample {
    /**
     * Builds and runs the example pipeline.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        // Create data with explicit coder
        PCollection<String> strings = p.apply(
            Create.of("hello", "world")
                  .withCoder(StringUtf8Coder.of())
        );

        // Create KV data with composite coder
        Coder<KV<String, Integer>> kvCoder = KvCoder.of(
            StringUtf8Coder.of(),
            VarIntCoder.of()
        );

        PCollection<KV<String, Integer>> kvs = p.apply(
            Create.of(
                KV.of("a", 1),
                KV.of("b", 2)
            ).withCoder(kvCoder)
        );

        // Use coder registry. The registered pair is the custom type and coder defined in
        // the custom coder section, so the example refers only to classes shown on this page.
        CoderRegistry registry = p.getCoderRegistry();
        registry.registerCoderForClass(
            MyCustomType.class,
            MyCustomCoder.of()
        );

        p.run();
    }
}

Build docs developers (and LLMs) love