Apache Beam provides I/O connectors for reading from and writing to various data sources and sinks.
Read
Transform for reading from a Source.
Read.from(BoundedSource)
Reads from a bounded source.
public static <T> Bounded<T> from(BoundedSource<T> source)
Parameters: source (BoundedSource<T>) — the bounded source to read from.
Returns: a PTransform that reads from the source.
Example:
Pipeline p = Pipeline.create();
PCollection<String> data = p.apply(
    Read.from(new MyBoundedSource())
);
Read.from(UnboundedSource)
Reads from an unbounded (streaming) source.
public static <T> Unbounded<T> from(UnboundedSource<T, ?> source)
Parameters: source (UnboundedSource<T, ?>, required) — the unbounded source to read from.
Returns: a PTransform that reads from the streaming source.
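Example (a sketch only — MyUnboundedSource stands in for a hypothetical UnboundedSource implementation, which the SDK does not provide):

```java
// MyUnboundedSource is a hypothetical UnboundedSource<String, ?> implementation.
Pipeline p = Pipeline.create();
PCollection<String> stream = p.apply(
    Read.from(new MyUnboundedSource())
);
```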
TextIO
Transforms for reading and writing text files.
TextIO.read()
Creates a read transform for text files.
public static Read read()
from(String)
Specifies the file path or pattern to read from.
public Read from(String filepattern)
Parameters: filepattern — file path or glob pattern (e.g., "gs://bucket/files*.txt")
Example:
PCollection<String> lines = pipeline.apply(
    TextIO.read().from("/path/to/files*.txt")
);
withDelimiter(byte[])
Sets a custom delimiter for splitting records.
public Read withDelimiter(byte[] delimiter)
Parameters: delimiter — the delimiter bytes (default is newline)
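For example, to split records on CRLF line endings instead of the default newline (a sketch, reusing the pipeline and file pattern from the examples above):

```java
// Split records on CRLF ("\r\n") instead of '\n'.
PCollection<String> lines = pipeline.apply(
    TextIO.read()
        .from("/path/to/files*.txt")
        .withDelimiter(new byte[] {'\r', '\n'})
);
```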
withCompression(Compression)
Specifies the compression type.
public Read withCompression(Compression compression)
Parameters: compression — the compression type (AUTO, GZIP, BZIP2, ZIP, DEFLATE, ZSTD, LZ4, LZOP, or UNCOMPRESSED)
Example:
PCollection<String> lines = pipeline.apply(
    TextIO.read()
        .from("input.txt.gz")
        .withCompression(Compression.GZIP)
);
withEmptyMatchTreatment(EmptyMatchTreatment)
Specifies how to handle empty file matches.
public Read withEmptyMatchTreatment(
EmptyMatchTreatment emptyMatchTreatment)
Parameters: emptyMatchTreatment (EmptyMatchTreatment, required) — treatment for empty matches (ALLOW, DISALLOW, or ALLOW_IF_WILDCARD)
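A sketch of tolerating a pattern that may match no files (EmptyMatchTreatment lives in org.apache.beam.sdk.io.fs; the file pattern is illustrative):

```java
// Do not fail the pipeline if the glob matches zero files.
PCollection<String> lines = pipeline.apply(
    TextIO.read()
        .from("optional-input*.txt")
        .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)
);
```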
watchForNewFiles(Duration, TerminationCondition)
Continuously watches for new files matching the pattern.
public Read watchForNewFiles(
Duration pollInterval,
TerminationCondition<String, ?> terminationCondition)
Parameters:
pollInterval (Duration, required) — how often to check for new files
terminationCondition (TerminationCondition, required) — when to stop watching for new files
Example:
PCollection<String> lines = pipeline.apply(
    TextIO.read()
        .from("input*.txt")
        .watchForNewFiles(
            Duration.standardMinutes(1),
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))
        )
);
TextIO.write()
Creates a write transform for text files.
public static Write write()
Returns: a Write transform builder
to(String)
Specifies the output file prefix.
public Write to(String outputPrefix)
Parameters: outputPrefix — the prefix for output files
Example:
lines.apply(
    TextIO.write().to("/path/to/output")
);
withSuffix(String)
Sets the file suffix.
public Write withSuffix(String suffix)
Parameters: suffix — the file suffix (e.g., ".txt")
withNumShards(int)
Sets the number of output shards.
public Write withNumShards(int numShards)
Parameters: numShards — the number of output files to create (0 means runner-determined)
withCompression(Compression)
Sets the compression type for output files.
public Write withCompression(Compression compression)
Example:
lines.apply(
    TextIO.write()
        .to("output/results")
        .withSuffix(".txt")
        .withNumShards(10)
        .withCompression(Compression.GZIP)
);
withHeader(String)
Adds a header to each output file.
public Write withHeader(String header)
Parameters: header — the header line to write at the start of each file
withFooter(String)
Adds a footer to each output file.
public Write withFooter(String footer)
Parameters: footer — the footer line to write at the end of each file
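A sketch combining both (each output shard gets its own header and footer; the path and strings are illustrative):

```java
// Each shard starts with the header line and ends with the footer line.
lines.apply(
    TextIO.write()
        .to("output/report")
        .withHeader("name,score")
        .withFooter("-- end of shard --")
);
```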
withWritableByteChannelFactory(WritableByteChannelFactory)
Customizes how output channels are created.
public Write withWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory)
TextIO.readFiles()
Reads files that have already been matched.
public static ReadFiles readFiles()
Example:
PCollection<String> lines = pipeline
    .apply(FileIO.match().filepattern("*.txt"))
    .apply(FileIO.readMatches())
    .apply(TextIO.readFiles());
FileIO
Low-level file I/O operations.
FileIO.match()
Matches files based on a pattern.
public static Match match()
Example:
PCollection<MatchResult.Metadata> matches = pipeline.apply(
    FileIO.match().filepattern("gs://bucket/*.csv")
);
filepattern(String)
Sets the file pattern to match.
public Match filepattern(String filepattern)
Parameters: filepattern — the glob pattern for files to match
continuously(Duration, TerminationCondition)
Continuously watches for new files.
public Match continuously(
Duration pollInterval,
TerminationCondition<String, ?> terminationCondition)
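Termination conditions come from Watch.Growth; for example, Watch.Growth.never() keeps watching indefinitely. A sketch (the pattern and interval are illustrative):

```java
// Poll the pattern every 30 seconds; never stop watching.
PCollection<MatchResult.Metadata> matches = pipeline.apply(
    FileIO.match()
        .filepattern("gs://bucket/incoming/*.csv")
        .continuously(
            Duration.standardSeconds(30),
            Watch.Growth.never()
        )
);
```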
FileIO.readMatches()
Reads matched files as ReadableFile objects.
public static ReadMatches readMatches()
Example:
PCollection<ReadableFile> files = pipeline
    .apply(FileIO.match().filepattern("*.txt"))
    .apply(FileIO.readMatches());
withCompression(Compression)
Sets the compression for reading files.
public ReadMatches withCompression(Compression compression)
withDirectoryTreatment(DirectoryTreatment)
Specifies how to handle directories.
public ReadMatches withDirectoryTreatment(
DirectoryTreatment directoryTreatment)
Parameters: directoryTreatment (DirectoryTreatment, required) — SKIP_DIRECTORIES or PROHIBIT_DIRECTORIES
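A sketch combining both options (DirectoryTreatment is nested under FileIO.ReadMatches; the pattern is illustrative):

```java
// Skip any directories in the match results, and infer each file's
// compression from its extension.
PCollection<ReadableFile> files = pipeline
    .apply(FileIO.match().filepattern("input/**"))
    .apply(FileIO.readMatches()
        .withCompression(Compression.AUTO)
        .withDirectoryTreatment(DirectoryTreatment.SKIP_DIRECTORIES));
```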
FileIO.write()
Writes elements to files.
public static <UserT, DestinationT> Write<DestinationT, UserT> write()
Example:
elements.apply(
    FileIO.<String>write()
        .via(TextIO.sink())
        .to("output")
);
via(Sink)
Specifies the sink for writing elements.
public <ElementT> TypedWrite<ElementT, DestinationT, UserT> via(
Sink<ElementT> sink)
to(String)
Sets the output directory or prefix.
public TypedWrite<ElementT, DestinationT, UserT> to(
String outputPrefix)
withNaming(FileNaming)
Customizes output file naming.
public TypedWrite<ElementT, DestinationT, UserT> withNaming(
SerializableFunction<UserT, FileNaming> namingFn)
withNumShards(int)
Sets the number of output shards.
public TypedWrite<ElementT, DestinationT, UserT> withNumShards(
int numShards)
withCompression(Compression)
Sets output compression.
public TypedWrite<ElementT, DestinationT, UserT> withCompression(
Compression compression)
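The builders above chain together. A sketch combining them (FileIO.Write.defaultNaming generates shard names from the given prefix and suffix; the values are illustrative):

```java
// Write strings via the TextIO sink into four GZIP-compressed
// shards named part-*.txt under the "output" prefix.
elements.apply(
    FileIO.<String>write()
        .via(TextIO.sink())
        .to("output")
        .withNaming(FileIO.Write.defaultNaming("part", ".txt"))
        .withNumShards(4)
        .withCompression(Compression.GZIP)
);
```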
AvroIO
Reads and writes Avro files.
AvroIO.read()
Creates a read transform for Avro files containing records of the given class.
public static <T> Read<T> read(Class<T> recordClass)
Example:
PCollection<MyRecord> records = pipeline.apply(
    AvroIO.read(MyRecord.class).from("data/*.avro")
);
AvroIO.write()
Creates a write transform for Avro files containing records of the given class.
public static <T> Write<T> write(Class<T> recordClass)
Example:
records.apply(
    AvroIO.write(MyRecord.class).to("output/data")
);
Complete Example
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class FileProcessing {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).create()
    );

    // Read from text files
    p.apply("Read", TextIO.read()
            .from("input/*.txt")
            .withCompression(Compression.AUTO))
        // Process the data
        .apply("ToUpper", MapElements
            .into(TypeDescriptors.strings())
            .via(String::toUpperCase))
        // Write to output files
        .apply("Write", TextIO.write()
            .to("output/results")
            .withSuffix(".txt")
            .withNumShards(5)
            .withHeader("=== Results ===")
            .withCompression(Compression.GZIP));

    p.run().waitUntilFinish();
  }
}