The I/O module provides transforms for reading from and writing to various data sources.
Text I/O
ReadFromText
Reads text files and returns lines as strings.
beam.io.ReadFromText(
file_pattern,
min_bundle_size=0,
compression_type=CompressionTypes.AUTO,
strip_trailing_newlines=True,
validate=True,
skip_header_lines=0,
coder=StrUtf8Coder(),
**kwargs
)
File path or glob pattern to read from. Supports GCS paths (gs://), local paths, and wildcards.
Minimum size of bundles in bytes for splitting files.
compression_type
CompressionTypes
default:"AUTO"
Compression type of the files. Options: AUTO, GZIP, BZIP2, ZSTD, UNCOMPRESSED.
Whether to strip trailing newline characters from each line.
Whether to validate that files exist and are readable.
Number of header lines to skip at the beginning of each file.
coder
Coder
default:"StrUtf8Coder()"
Coder to use for decoding bytes to strings.
Usage Example
import apache_beam as beam
from apache_beam.io import ReadFromText
with beam.Pipeline() as p:
# Read from a single file
lines = p | ReadFromText('data/input.txt')
# Read from multiple files with glob pattern
lines = p | ReadFromText('data/*.txt')
# Read from GCS
lines = p | ReadFromText('gs://my-bucket/data/*.txt')
# Read compressed files
lines = p | ReadFromText(
'data/compressed/*.gz',
compression_type=beam.io.filesystem.CompressionTypes.GZIP
)
# Skip header lines
lines = p | ReadFromText(
'data/with_headers.csv',
skip_header_lines=1
)
WriteToText
Writes elements to text files.
beam.io.WriteToText(
file_path_prefix,
file_name_suffix='',
append_trailing_newlines=True,
num_shards=0,
shard_name_template=None,
coder=ToBytesCoder(),
compression_type=CompressionTypes.AUTO,
header=None,
footer=None
)
Prefix for output files. The final filename will be prefix-SSSSS-of-NNNNN[suffix].
Suffix to append to each output filename (e.g., '.txt').
Whether to append a newline after each element.
Number of output shards. 0 means runner-determined.
Template for shard naming (e.g., '-SS-of-NN').
coder
Coder
default:"ToBytesCoder()"
Coder to encode elements to bytes.
compression_type
CompressionTypes
default:"AUTO"
Compression type for output files.
Optional header to write at the beginning of each file.
Optional footer to write at the end of each file.
Usage Example
import apache_beam as beam
from apache_beam.io import WriteToText
with beam.Pipeline() as p:
# Write to a single file (or sharded files)
lines = p | beam.Create(['Hello', 'World'])
lines | WriteToText('output/data')
# Creates: output/data-00000-of-00001
# Write with custom suffix
lines | WriteToText('output/data', file_name_suffix='.txt')
# Creates: output/data-00000-of-00001.txt
# Write to multiple shards
lines | WriteToText('output/data', num_shards=5)
# Creates: output/data-00000-of-00005, ..., output/data-00004-of-00005
# Write compressed output
lines | WriteToText(
'output/data',
file_name_suffix='.gz',
compression_type=beam.io.filesystem.CompressionTypes.GZIP
)
# Write with header and footer
lines | WriteToText(
'output/report',
file_name_suffix='.txt',
header='--- Report Start ---',
footer='--- Report End ---'
)
Avro I/O
ReadFromAvro
Reads Avro files.
beam.io.ReadFromAvro(
file_pattern,
min_bundle_size=0,
validate=True,
use_fastavro=True
)
File path or pattern for Avro files.
Whether to use the fastavro library for better performance.
import apache_beam as beam
with beam.Pipeline() as p:
records = p | beam.io.ReadFromAvro('data/*.avro')
# Each record is a dictionary with fields from the Avro schema
WriteToAvro
Writes elements to Avro files.
beam.io.WriteToAvro(
file_path_prefix,
schema,
codec='deflate',
file_name_suffix='',
num_shards=0,
use_fastavro=True
)
Avro schema as JSON string or dictionary.
Compression codec (null, deflate, snappy).
import apache_beam as beam
schema = {
'type': 'record',
'name': 'User',
'fields': [
{'name': 'name', 'type': 'string'},
{'name': 'age', 'type': 'int'}
]
}
with beam.Pipeline() as p:
users = p | beam.Create([
{'name': 'Alice', 'age': 30},
{'name': 'Bob', 'age': 25}
])
users | beam.io.WriteToAvro('output/users', schema=schema)
Parquet I/O
ReadFromParquet
Reads Parquet files.
beam.io.ReadFromParquet(
file_pattern,
min_bundle_size=0,
validate=True,
columns=None
)
File path or pattern for Parquet files.
Optional list of column names to read. Reads all columns if None.
import apache_beam as beam
with beam.Pipeline() as p:
# Read all columns
records = p | beam.io.ReadFromParquet('data/*.parquet')
# Read specific columns
records = p | beam.io.ReadFromParquet(
'data/*.parquet',
columns=['name', 'age']
)
WriteToParquet
Writes elements to Parquet files.
beam.io.WriteToParquet(
file_path_prefix,
schema,
codec='snappy',
file_name_suffix='',
num_shards=0
)
import apache_beam as beam
import pyarrow as pa
schema = pa.schema([
('name', pa.string()),
('age', pa.int64())
])
with beam.Pipeline() as p:
users = p | beam.Create([
{'name': 'Alice', 'age': 30},
{'name': 'Bob', 'age': 25}
])
users | beam.io.WriteToParquet('output/users', schema=schema)
TFRecord I/O
ReadFromTFRecord
Reads TensorFlow TFRecord files.
beam.io.ReadFromTFRecord(
file_pattern,
compression_type=CompressionTypes.AUTO,
validate=True
)
import apache_beam as beam
with beam.Pipeline() as p:
records = p | beam.io.ReadFromTFRecord('data/*.tfrecord')
# Each record is a bytes object
WriteToTFRecord
Writes elements to TFRecord files.
beam.io.WriteToTFRecord(
file_path_prefix,
coder=None,
file_name_suffix='',
num_shards=0,
compression_type=CompressionTypes.AUTO
)
import apache_beam as beam
import tensorflow as tf
with beam.Pipeline() as p:
# Create TF Examples
examples = p | beam.Create([
tf.train.Example(features=tf.train.Features(feature={
'value': tf.train.Feature(int64_list=tf.train.Int64List(value=[1]))
})).SerializeToString()
])
examples | beam.io.WriteToTFRecord('output/examples.tfrecord')
BigQuery I/O
ReadFromBigQuery
Reads from Google BigQuery.
beam.io.ReadFromBigQuery(
table=None,
query=None,
use_standard_sql=True,
flatten_results=True,
project=None,
gcs_location=None
)
BigQuery table reference (project:dataset.table or dataset.table).
SQL query string. Use either table or query, not both.
Whether to use standard SQL (vs legacy SQL).
import apache_beam as beam
with beam.Pipeline() as p:
# Read from table
rows = p | beam.io.ReadFromBigQuery(
table='my-project:my_dataset.my_table'
)
# Read with query
rows = p | beam.io.ReadFromBigQuery(
query='SELECT name, age FROM `my-project.my_dataset.my_table` WHERE age > 25'
)
WriteToBigQuery
Writes to Google BigQuery.
beam.io.WriteToBigQuery(
table,
schema=None,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND,
method=None
)
BigQuery table reference.
Table schema. Required if create_disposition is CREATE_IF_NEEDED.
create_disposition
str
default:"CREATE_IF_NEEDED"
CREATE_IF_NEEDED or CREATE_NEVER.
write_disposition
str
default:"WRITE_APPEND"
WRITE_APPEND, WRITE_TRUNCATE, or WRITE_EMPTY.
import apache_beam as beam
schema = {
'fields': [
{'name': 'name', 'type': 'STRING'},
{'name': 'age', 'type': 'INTEGER'}
]
}
with beam.Pipeline() as p:
users = p | beam.Create([
{'name': 'Alice', 'age': 30},
{'name': 'Bob', 'age': 25}
])
users | beam.io.WriteToBigQuery(
'my-project:my_dataset.my_table',
schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
)
File I/O
ReadAllFromText
Reads text files given PCollection of file paths.
beam.io.ReadAllFromText(**kwargs)
import apache_beam as beam
with beam.Pipeline() as p:
file_paths = p | beam.Create([
'data/file1.txt',
'data/file2.txt'
])
lines = file_paths | beam.io.ReadAllFromText()
MatchFiles
Matches files based on patterns.
from apache_beam.io.fileio import MatchFiles, MatchAll
with beam.Pipeline() as p:
# Match a glob pattern directly (MatchFiles takes the pattern as an argument)
matches = p | MatchFiles('data/*.txt')
# To match patterns supplied as a PCollection, use MatchAll
patterns = p | beam.Create(['data/*.txt'])
matches = patterns | MatchAll()
# Returns FileMetadata objects
Compression Types
from apache_beam.io.filesystem import CompressionTypes
# Available compression types
CompressionTypes.AUTO # Auto-detect from file extension
CompressionTypes.GZIP # .gz files
CompressionTypes.BZIP2 # .bz2 files
CompressionTypes.ZSTD # .zst files
CompressionTypes.UNCOMPRESSED # No compression