Coders define how to encode and decode values in Apache Beam pipelines. They are used to serialize data when it needs to be materialized, such as when data is shuffled between workers or written to external storage.
Coder
The abstract base class for all coders.
public abstract class Coder<T> implements Serializable
Core Methods
encode()
Encodes a value to an output stream.
public abstract void encode(T value, OutputStream outStream)
    throws CoderException, IOException
value (T): The value to encode
outStream (OutputStream): The output stream to write encoded bytes to
Example:
Coder<String> coder = StringUtf8Coder.of();
ByteArrayOutputStream output = new ByteArrayOutputStream();
coder.encode("hello", output);
byte[] encoded = output.toByteArray();
decode()
Decodes a value from an input stream.
public abstract T decode(InputStream inStream)
    throws CoderException, IOException
inStream (InputStream): The input stream to read encoded bytes from
Example:
Coder<String> coder = StringUtf8Coder.of();
ByteArrayInputStream input = new ByteArrayInputStream(encoded);
String decoded = coder.decode(input);
getCoderArguments()
Returns the component coders for parameterized types.
public abstract List<? extends Coder<?>> getCoderArguments()
Returns: A list of component coders, or an empty list if the coder is not parameterized
Example:
Coder<List<String>> listCoder = ListCoder.of(StringUtf8Coder.of());
List<? extends Coder<?>> components = listCoder.getCoderArguments();
// components contains [StringUtf8Coder.of()]
verifyDeterministic()
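Throws NonDeterministicException if this coder cannot guarantee a deterministic encoding. Beam requires a deterministic coder for keys used in grouping operations such as GroupByKey.
public abstract void verifyDeterministic()
    throws NonDeterministicException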
Deterministic coders must satisfy:
- Two values that compare as equal produce the same encoding
- The coder always produces a canonical encoding
Example:
try {
    coder.verifyDeterministic();
    // Coder is deterministic
} catch (NonDeterministicException e) {
    // Coder is not deterministic
}
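To make the two requirements above concrete, here is a minimal sketch that uses only the JDK and no Beam classes. The class DeterminismSketch and both of its methods are hypothetical names introduced for illustration, and the byte layout is a simplification. It encodes a map once in iteration order and once in sorted-key order; only the sorted form gives equal maps the same bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration; this is not Beam's implementation.
final class DeterminismSketch {

  // Writes entries in whatever order the map iterates them.
  // Simplified layout: one byte per count, key length, and value.
  static byte[] encodeInIterationOrder(Map<String, Integer> map) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(map.size());
    for (Map.Entry<String, Integer> entry : map.entrySet()) {
      byte[] key = entry.getKey().getBytes(StandardCharsets.UTF_8);
      out.write(key.length);
      out.write(key, 0, key.length);
      out.write(entry.getValue()); // low 8 bits only; enough for this demo
    }
    return out.toByteArray();
  }

  // Sorts by key first, so equal maps always yield the same bytes.
  static byte[] encodeCanonical(Map<String, Integer> map) {
    return encodeInIterationOrder(new TreeMap<>(map));
  }

  public static void main(String[] args) {
    Map<String, Integer> first = new LinkedHashMap<>();
    first.put("a", 1);
    first.put("b", 2);
    Map<String, Integer> second = new LinkedHashMap<>();
    second.put("b", 2);
    second.put("a", 1);

    System.out.println(first.equals(second)); // true: equal as maps
    System.out.println(Arrays.equals(
        encodeInIterationOrder(first), encodeInIterationOrder(second))); // false
    System.out.println(Arrays.equals(
        encodeCanonical(first), encodeCanonical(second))); // true
  }
}
```

A coder that behaves like encodeInIterationOrder would be expected to throw from verifyDeterministic(), since equal values can produce different encodings.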
consistentWithEquals()
Returns whether this coder is consistent with equals.
public boolean consistentWithEquals()
Returns: true if equal encoded bytes imply equal values according to equals(). The default implementation returns false.
structuralValue()
Returns an object with structural equality for the given value.
public Object structuralValue(T value)
value (T): The value to get a structural representation for
Returns: An object whose equals() represents structural equality
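Byte arrays are a common case where this matters: two arrays with identical contents encode to the same bytes, yet equals() on the arrays compares identity. The sketch below uses only the JDK; StructuralValueSketch and its structuralValue method are hypothetical stand-ins, and wrapping in a ByteBuffer is an assumption made for illustration rather than what any Beam coder returns.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration; this is not Beam's implementation.
final class StructuralValueSketch {

  // Stand-in for structuralValue(): wraps a copy of the array in an object
  // whose equals() compares contents rather than identity.
  static Object structuralValue(byte[] value) {
    return ByteBuffer.wrap(value.clone());
  }

  public static void main(String[] args) {
    byte[] a = {1, 2, 3};
    byte[] b = {1, 2, 3};

    System.out.println(a.equals(b));          // false: identity comparison
    System.out.println(Arrays.equals(a, b));  // true: same contents
    System.out.println(structuralValue(a).equals(structuralValue(b))); // true
  }
}
```

For a type like this, consistentWithEquals() would return false, and callers that need value equality would compare the structural values instead.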
Common Coder Implementations
StringUtf8Coder
Encodes strings in UTF-8.
public class StringUtf8Coder extends AtomicCoder<String>
of()
Returns the singleton instance.
public static StringUtf8Coder of()
Example:
Coder<String> coder = StringUtf8Coder.of();
PCollection<String> strings = ...
strings.setCoder(coder);
VarIntCoder
Encodes integers using variable-length encoding (1-5 bytes).
public class VarIntCoder extends AtomicCoder<Integer>
of()
Returns the singleton instance.
public static VarIntCoder of()
Example:
Coder<Integer> coder = VarIntCoder.of();
Note: Negative numbers always take 5 bytes. Use BigEndianIntegerCoder if numbers are often large or negative.
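The note above follows from how variable-length integer encodings typically work. The sketch below assumes a base-128 scheme with a continuation bit, written with the JDK only; VarIntSketch is a hypothetical class, and the text above states only the 1-5 byte range, so treat the exact bit layout as an assumption.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical illustration of base-128 variable-length encoding.
final class VarIntSketch {

  // Emits 7 bits per byte, low bits first; the high bit means "more bytes follow".
  static byte[] encode(int value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    do {
      int low7 = value & 0x7F;
      value >>>= 7; // unsigned shift, so negative inputs still terminate
      out.write(value != 0 ? (low7 | 0x80) : low7);
    } while (value != 0);
    return out.toByteArray();
  }

  // Reassembles the 7-bit groups until a byte without the continuation bit.
  static int decode(byte[] bytes) {
    int result = 0;
    int shift = 0;
    for (byte b : bytes) {
      result |= (b & 0x7F) << shift;
      shift += 7;
      if ((b & 0x80) == 0) {
        break;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(encode(1).length);   // 1
    System.out.println(encode(300).length); // 2
    System.out.println(encode(-1).length);  // 5: all 32 bits are significant
  }
}
```

Because a negative int has its top bit set, all five 7-bit groups are needed, which is why a fixed 4-byte encoding can be smaller for such values.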
VarLongCoder
Encodes longs using variable-length encoding (1-10 bytes). As with VarIntCoder, negative numbers always take the maximum length.
public class VarLongCoder extends AtomicCoder<Long>
of()
public static VarLongCoder of()
DoubleCoder
Encodes doubles as 8 bytes.
public class DoubleCoder extends AtomicCoder<Double>
of()
public static DoubleCoder of()
BooleanCoder
Encodes booleans as a single byte.
public class BooleanCoder extends AtomicCoder<Boolean>
of()
public static BooleanCoder of()
ByteArrayCoder
Encodes byte arrays with length prefix.
public class ByteArrayCoder extends AtomicCoder<byte[]>
of()
public static ByteArrayCoder of()
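A length prefix is what lets a decoder find where one array ends when several values share a stream. The following JDK-only sketch illustrates the idea; LengthPrefixSketch is a hypothetical class, and the fixed 4-byte prefix is an assumption chosen for simplicity, not necessarily the prefix format ByteArrayCoder uses.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of length-prefixed framing.
final class LengthPrefixSketch {

  // Writes each array as a 4-byte length followed by its contents.
  static byte[] encodeAll(List<byte[]> arrays) {
    int total = 0;
    for (byte[] array : arrays) {
      total += Integer.BYTES + array.length;
    }
    ByteBuffer buffer = ByteBuffer.allocate(total);
    for (byte[] array : arrays) {
      buffer.putInt(array.length);
      buffer.put(array);
    }
    return buffer.array();
  }

  // Reads a length, then exactly that many bytes, until the input is consumed.
  static List<byte[]> decodeAll(byte[] encoded) {
    ByteBuffer buffer = ByteBuffer.wrap(encoded);
    List<byte[]> result = new ArrayList<>();
    while (buffer.hasRemaining()) {
      byte[] array = new byte[buffer.getInt()];
      buffer.get(array);
      result.add(array);
    }
    return result;
  }

  public static void main(String[] args) {
    List<byte[]> input = new ArrayList<>();
    input.add(new byte[] {1, 2});
    input.add(new byte[0]);
    input.add(new byte[] {3});
    List<byte[]> output = decodeAll(encodeAll(input));
    System.out.println(output.size()); // 3, including the empty array
  }
}
```

Without the prefix, the empty array in the middle would be indistinguishable from no array at all.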
InstantCoder
Encodes Joda-Time Instant values.
public class InstantCoder extends AtomicCoder<Instant>
of()
public static InstantCoder of()
Composite Coders
ListCoder
Encodes lists of elements.
public class ListCoder<T> extends IterableLikeCoder<T, List<T>>
of(Coder)
Creates a list coder for the given element coder.
public static <T> ListCoder<T> of(Coder<T> elementCoder)
elementCoder (Coder<T>): The coder for list elements
Example:
Coder<List<String>> listCoder = ListCoder.of(StringUtf8Coder.of());
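Composite coders work by delegating to their component coders. The sketch below shows that delegation pattern with the JDK only. The Codec interface, the STRING codec, and listOf are all hypothetical names, and the single-byte count and length fields are simplifications that limit this demo to fewer than 128 elements and strings under 128 bytes; none of this is Beam's wire format.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of coder composition; not Beam's classes.
final class ListCodecSketch {

  // Minimal stand-in for a coder.
  interface Codec<T> {
    void encode(T value, ByteArrayOutputStream out);

    T decode(ByteBuffer in);
  }

  // Element codec: one length byte followed by the UTF-8 bytes.
  static final Codec<String> STRING = new Codec<String>() {
    @Override
    public void encode(String value, ByteArrayOutputStream out) {
      byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
      out.write(utf8.length);
      out.write(utf8, 0, utf8.length);
    }

    @Override
    public String decode(ByteBuffer in) {
      byte[] utf8 = new byte[in.get()];
      in.get(utf8);
      return new String(utf8, StandardCharsets.UTF_8);
    }
  };

  // Composite codec: writes the element count, then delegates each element.
  static <T> Codec<List<T>> listOf(Codec<T> element) {
    return new Codec<List<T>>() {
      @Override
      public void encode(List<T> values, ByteArrayOutputStream out) {
        out.write(values.size());
        for (T value : values) {
          element.encode(value, out);
        }
      }

      @Override
      public List<T> decode(ByteBuffer in) {
        int count = in.get();
        List<T> values = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
          values.add(element.decode(in));
        }
        return values;
      }
    };
  }

  public static void main(String[] args) {
    Codec<List<String>> codec = listOf(STRING);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    codec.encode(Arrays.asList("hello", "world"), out);
    System.out.println(codec.decode(ByteBuffer.wrap(out.toByteArray()))); // [hello, world]
  }
}
```

The list codec knows nothing about strings; swapping in a different element codec changes the element format without touching the list logic.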
SetCoder
Encodes sets of elements.
public class SetCoder<T> extends IterableLikeCoder<T, Set<T>>
of(Coder)
public static <T> SetCoder<T> of(Coder<T> elementCoder)
MapCoder
Encodes maps.
public class MapCoder<K, V> extends StructuredCoder<Map<K, V>>
of(Coder, Coder)
Creates a map coder.
public static <K, V> MapCoder<K, V> of(
    Coder<K> keyCoder,
    Coder<V> valueCoder)
Example:
Coder<Map<String, Integer>> mapCoder = MapCoder.of(
    StringUtf8Coder.of(),
    VarIntCoder.of()
);
KvCoder
Encodes key-value pairs.
public class KvCoder<K, V> extends StructuredCoder<KV<K, V>>
of(Coder, Coder)
Creates a KV coder.
public static <K, V> KvCoder<K, V> of(
    Coder<K> keyCoder,
    Coder<V> valueCoder)
Example:
Coder<KV<String, Integer>> kvCoder = KvCoder.of(
    StringUtf8Coder.of(),
    VarIntCoder.of()
);
IterableCoder
Encodes iterables of elements.
public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>>
of(Coder)
public static <T> IterableCoder<T> of(Coder<T> elementCoder)
NullableCoder
Wraps a coder to handle null values.
public class NullableCoder<T> extends StructuredCoder<T>
of(Coder)
public static <T> NullableCoder<T> of(Coder<T> valueCoder)
valueCoder (Coder<T>): The coder for non-null values
Example:
Coder<String> nullableCoder = NullableCoder.of(StringUtf8Coder.of());
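One common way to make an encoding null-safe is to write a presence marker before the value. The JDK-only sketch below illustrates that idea; NullableSketch is a hypothetical class, and the specific marker values (0 for null, 1 for present) are an assumption for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of a null marker byte.
final class NullableSketch {

  // Writes a marker byte, then the UTF-8 bytes only when a value is present.
  static byte[] encode(String value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    if (value == null) {
      out.write(0); // marker: no value follows
    } else {
      out.write(1); // marker: value follows
      byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
      out.write(utf8, 0, utf8.length);
    }
    return out.toByteArray();
  }

  // Checks the marker first, then decodes the remainder.
  static String decode(byte[] bytes) {
    if (bytes[0] == 0) {
      return null;
    }
    return new String(bytes, 1, bytes.length - 1, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(decode(encode(null)));       // null
    System.out.println(decode(encode("hello")));    // hello
    System.out.println(decode(encode("")).isEmpty()); // true: empty is not null
  }
}
```

The marker costs one extra byte per value, which is the price of distinguishing null from an empty string.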
CoderRegistry
Manages coder inference and registration.
public class CoderRegistry
Methods
createDefault()
Creates a registry with default coders.
public static CoderRegistry createDefault()
Example:
CoderRegistry registry = CoderRegistry.createDefault();
getCoder(TypeDescriptor)
Infers a coder for a type.
public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor)
    throws CannotProvideCoderException
typeDescriptor (TypeDescriptor<T>, required): The type to get a coder for
Example:
CoderRegistry registry = pipeline.getCoderRegistry();
Coder<String> coder = registry.getCoder(
    TypeDescriptor.of(String.class)
);
registerCoderProvider(CoderProvider)
Registers a custom coder provider.
public void registerCoderProvider(CoderProvider coderProvider)
coderProvider (CoderProvider): The coder provider to register
registerCoderForClass(Class, Coder)
Registers a coder for a specific class.
public void registerCoderForClass(
    Class<?> clazz,
    Coder<?> coder)
clazz (Class<?>): The class to register the coder for
coder (Coder<?>): The coder to use for the class
Example:
CoderRegistry registry = pipeline.getCoderRegistry();
registry.registerCoderForClass(
    MyCustomType.class,
    MyCustomCoder.of()
);
Custom Coder Example
Creating a custom coder:
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class MyCustomCoder extends AtomicCoder<MyCustomType> {
    private static final MyCustomCoder INSTANCE = new MyCustomCoder();

    public static MyCustomCoder of() {
        return INSTANCE;
    }

    private MyCustomCoder() {}

    @Override
    public void encode(MyCustomType value, OutputStream outStream)
            throws CoderException, IOException {
        // Encode the value's fields in a fixed order
        StringUtf8Coder.of().encode(value.getName(), outStream);
        VarIntCoder.of().encode(value.getId(), outStream);
    }

    @Override
    public MyCustomType decode(InputStream inStream)
            throws CoderException, IOException {
        // Decode the fields in the same order they were encoded
        String name = StringUtf8Coder.of().decode(inStream);
        int id = VarIntCoder.of().decode(inStream);
        return new MyCustomType(name, id);
    }

    @Override
    public void verifyDeterministic() throws NonDeterministicException {
        // Both component coders are deterministic, so nothing is thrown
    }

    // getCoderArguments() is not overridden here: AtomicCoder already
    // provides it and returns an empty list.
}
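The key constraint in a field-by-field coder is that decode reads fields in exactly the order encode wrote them. The sketch below demonstrates that with the JDK only, using DataOutputStream and DataInputStream as stand-ins for the component coders. FieldOrderSketch and its Item type are hypothetical; Item simply mirrors the name and id fields used above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration of field-order symmetry; not a Beam coder.
final class FieldOrderSketch {

  // Stand-in value type with the same two fields as MyCustomType.
  static final class Item {
    final String name;
    final int id;

    Item(String name, int id) {
      this.name = name;
      this.id = id;
    }
  }

  static byte[] encode(Item item) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(item.name); // field 1: name
      out.writeInt(item.id);   // field 2: id
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static Item decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      String name = in.readUTF(); // read first, matching encode
      int id = in.readInt();      // read second, matching encode
      return new Item(name, id);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Item copy = decode(encode(new Item("widget", 42)));
    System.out.println(copy.name + " " + copy.id); // widget 42
  }
}
```

A round-trip check like the one in main is a cheap first test for any custom coder before it is used in a pipeline.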
Using Coders with @DefaultCoder
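Annotate your class to specify a default coder. Per the DefaultCoder Javadoc, the referenced coder class is expected to provide a public static CoderProvider getCoderProvider() method, so a coder that only exposes of() may need that method added before the annotation resolves.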
import org.apache.beam.sdk.coders.DefaultCoder;
@DefaultCoder(MyCustomCoder.class)
public class MyCustomType {
    private String name;
    private int id;
    // Constructor, getters, setters
}
Complete Example
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.transforms.Create;

public class CoderExample {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        // Create data with an explicit coder
        PCollection<String> strings = p.apply(
            Create.of("hello", "world")
                .withCoder(StringUtf8Coder.of())
        );

        // Create KV data with a composite coder
        Coder<KV<String, Integer>> kvCoder = KvCoder.of(
            StringUtf8Coder.of(),
            VarIntCoder.of()
        );
        PCollection<KV<String, Integer>> kvs = p.apply(
            Create.of(
                KV.of("a", 1),
                KV.of("b", 2)
            ).withCoder(kvCoder)
        );

        // Register the custom coder from the Custom Coder Example section
        CoderRegistry registry = p.getCoderRegistry();
        registry.registerCoderForClass(
            MyCustomType.class,
            MyCustomCoder.of()
        );

        // Run the pipeline and block until it completes
        p.run().waitUntilFinish();
    }
}