Apache Beam Python I/O Transforms Reference
The I/O module provides transforms for reading from and writing to various data sources.

Text I/O

ReadFromText

Reads text files and returns lines as strings.
beam.io.ReadFromText(
    file_pattern,
    min_bundle_size=0,
    compression_type=CompressionTypes.AUTO,
    strip_trailing_newlines=True,
    validate=True,
    skip_header_lines=0,
    coder=StrUtf8Coder(),
    **kwargs
)
file_pattern
str
File path or glob pattern to read from. Supports GCS paths (gs://), local paths, and wildcards.
min_bundle_size
int
default:"0"
Minimum size of bundles in bytes for splitting files.
compression_type
CompressionTypes
default:"AUTO"
Compression type of the files. Options: AUTO, GZIP, BZIP2, ZSTD, UNCOMPRESSED.
strip_trailing_newlines
bool
default:"True"
Whether to strip trailing newline characters from each line.
validate
bool
default:"True"
Whether to validate that files exist and are readable.
skip_header_lines
int
default:"0"
Number of header lines to skip at the beginning of each file.
coder
Coder
default:"StrUtf8Coder()"
Coder to use for decoding bytes to strings.

Usage Example

import apache_beam as beam
from apache_beam.io import ReadFromText

with beam.Pipeline() as p:
    # Read from a single file
    lines = p | ReadFromText('data/input.txt')
    
    # Read from multiple files with glob pattern
    lines = p | ReadFromText('data/*.txt')
    
    # Read from GCS
    lines = p | ReadFromText('gs://my-bucket/data/*.txt')
    
    # Read compressed files
    lines = p | ReadFromText(
        'data/compressed/*.gz',
        compression_type=beam.io.filesystem.CompressionTypes.GZIP
    )
    
    # Skip header lines
    lines = p | ReadFromText(
        'data/with_headers.csv',
        skip_header_lines=1
    )

WriteToText

Writes elements to text files.
beam.io.WriteToText(
    file_path_prefix,
    file_name_suffix='',
    append_trailing_newlines=True,
    num_shards=0,
    shard_name_template=None,
    coder=ToBytesCoder(),
    compression_type=CompressionTypes.AUTO,
    header=None,
    footer=None
)
file_path_prefix
str
Prefix for output files. The final filename will be prefix-SSSSS-of-NNNNN[suffix].
file_name_suffix
str
default:"''"
Suffix to append to each output filename (e.g., '.txt').
append_trailing_newlines
bool
default:"True"
Whether to append a newline after each element.
num_shards
int
default:"0"
Number of output shards. 0 means runner-determined.
shard_name_template
str
default:"None"
Template for shard naming (e.g., '-SS-of-NN'). When None, the default template '-SSSSS-of-NNNNN' is used.
coder
Coder
default:"ToBytesCoder()"
Coder to encode elements to bytes.
compression_type
CompressionTypes
default:"AUTO"
Compression type for output files.
header
str
Optional header to write at the beginning of each file.
footer
str
Optional footer to write at the end of each file.

Usage Example

import apache_beam as beam
from apache_beam.io import WriteToText

with beam.Pipeline() as p:
    # Write to a single file (or sharded files)
    lines = p | beam.Create(['Hello', 'World'])
    lines | WriteToText('output/data')
    # Creates: output/data-00000-of-00001
    
    # Write with custom suffix
    lines | WriteToText('output/data', file_name_suffix='.txt')
    # Creates: output/data-00000-of-00001.txt
    
    # Write to multiple shards
    lines | WriteToText('output/data', num_shards=5)
    # Creates: output/data-00000-of-00005, ..., output/data-00004-of-00005
    
    # Write compressed output
    lines | WriteToText(
        'output/data',
        file_name_suffix='.gz',
        compression_type=beam.io.filesystem.CompressionTypes.GZIP
    )
    
    # Write with header and footer
    lines | WriteToText(
        'output/report',
        file_name_suffix='.txt',
        header='--- Report Start ---',
        footer='--- Report End ---'
    )

Avro I/O

ReadFromAvro

Reads Avro files.
beam.io.ReadFromAvro(
    file_pattern,
    min_bundle_size=0,
    validate=True,
    use_fastavro=True
)
file_pattern
str
File path or pattern for Avro files.
use_fastavro
bool
default:"True"
Whether to use the fastavro library for better performance.
import apache_beam as beam

with beam.Pipeline() as p:
    records = p | beam.io.ReadFromAvro('data/*.avro')
    # Each record is a dictionary with fields from the Avro schema

WriteToAvro

Writes elements to Avro files.
beam.io.WriteToAvro(
    file_path_prefix,
    schema,
    codec='deflate',
    file_name_suffix='',
    num_shards=0,
    use_fastavro=True
)
file_path_prefix
str
Output file path prefix.
schema
str or dict
Avro schema as JSON string or dictionary.
codec
str
default:"deflate"
Compression codec (null, deflate, snappy).
import apache_beam as beam

schema = {
    'type': 'record',
    'name': 'User',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'age', 'type': 'int'}
    ]
}

with beam.Pipeline() as p:
    users = p | beam.Create([
        {'name': 'Alice', 'age': 30},
        {'name': 'Bob', 'age': 25}
    ])
    users | beam.io.WriteToAvro('output/users', schema=schema)

Parquet I/O

ReadFromParquet

Reads Parquet files.
beam.io.ReadFromParquet(
    file_pattern,
    min_bundle_size=0,
    validate=True,
    columns=None
)
file_pattern
str
File path or pattern for Parquet files.
columns
list[str]
Optional list of column names to read. Reads all columns if None.
import apache_beam as beam

with beam.Pipeline() as p:
    # Read all columns
    records = p | beam.io.ReadFromParquet('data/*.parquet')
    
    # Read specific columns
    records = p | beam.io.ReadFromParquet(
        'data/*.parquet',
        columns=['name', 'age']
    )

WriteToParquet

Writes elements to Parquet files.
beam.io.WriteToParquet(
    file_path_prefix,
    schema,
    codec='snappy',
    file_name_suffix='',
    num_shards=0
)
import apache_beam as beam
import pyarrow as pa

schema = pa.schema([
    ('name', pa.string()),
    ('age', pa.int64())
])

with beam.Pipeline() as p:
    users = p | beam.Create([
        {'name': 'Alice', 'age': 30},
        {'name': 'Bob', 'age': 25}
    ])
    users | beam.io.WriteToParquet('output/users', schema=schema)

TFRecord I/O

ReadFromTFRecord

Reads TensorFlow TFRecord files.
beam.io.ReadFromTFRecord(
    file_pattern,
    compression_type=CompressionTypes.AUTO,
    validate=True
)
import apache_beam as beam

with beam.Pipeline() as p:
    records = p | beam.io.ReadFromTFRecord('data/*.tfrecord')
    # Each record is a bytes object

WriteToTFRecord

Writes elements to TFRecord files.
beam.io.WriteToTFRecord(
    file_path_prefix,
    coder=None,
    file_name_suffix='',
    num_shards=0,
    compression_type=CompressionTypes.AUTO
)
import apache_beam as beam
import tensorflow as tf

with beam.Pipeline() as p:
    # Create TF Examples
    examples = p | beam.Create([
        tf.train.Example(features=tf.train.Features(feature={
            'value': tf.train.Feature(int64_list=tf.train.Int64List(value=[1]))
        })).SerializeToString()
    ])
    
    examples | beam.io.WriteToTFRecord('output/examples.tfrecord')

BigQuery I/O

ReadFromBigQuery

Reads from Google BigQuery.
beam.io.ReadFromBigQuery(
    table=None,
    query=None,
    use_standard_sql=True,
    flatten_results=True,
    project=None,
    gcs_location=None
)
table
str
BigQuery table reference (project:dataset.table or dataset.table).
query
str
SQL query string. Use either table or query, not both.
use_standard_sql
bool
default:"True"
Whether to use standard SQL (vs legacy SQL).
import apache_beam as beam

with beam.Pipeline() as p:
    # Read from table
    rows = p | beam.io.ReadFromBigQuery(
        table='my-project:my_dataset.my_table'
    )
    
    # Read with query
    rows = p | beam.io.ReadFromBigQuery(
        query='SELECT name, age FROM `my-project.my_dataset.my_table` WHERE age > 25'
    )

WriteToBigQuery

Writes to Google BigQuery.
beam.io.WriteToBigQuery(
    table,
    schema=None,
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=BigQueryDisposition.WRITE_APPEND,
    method=None
)
table
str
BigQuery table reference.
schema
str or dict
Table schema. Required if create_disposition is CREATE_IF_NEEDED.
create_disposition
str
default:"CREATE_IF_NEEDED"
CREATE_IF_NEEDED or CREATE_NEVER.
write_disposition
str
default:"WRITE_APPEND"
WRITE_APPEND, WRITE_TRUNCATE, or WRITE_EMPTY.
import apache_beam as beam

schema = {
    'fields': [
        {'name': 'name', 'type': 'STRING'},
        {'name': 'age', 'type': 'INTEGER'}
    ]
}

with beam.Pipeline() as p:
    users = p | beam.Create([
        {'name': 'Alice', 'age': 30},
        {'name': 'Bob', 'age': 25}
    ])
    
    users | beam.io.WriteToBigQuery(
        'my-project:my_dataset.my_table',
        schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
    )

File I/O

ReadAllFromText

Reads text files given PCollection of file paths.
beam.io.ReadAllFromText(**kwargs)
import apache_beam as beam

with beam.Pipeline() as p:
    file_paths = p | beam.Create([
        'data/file1.txt',
        'data/file2.txt'
    ])
    
    lines = file_paths | beam.io.ReadAllFromText()

MatchFiles

Matches files based on patterns.
from apache_beam.io.fileio import MatchFiles, MatchAll

with beam.Pipeline() as p:
    # MatchFiles takes a file pattern and is applied directly to the pipeline
    matches = p | MatchFiles('data/*.txt')
    # Returns FileMetadata objects

    # To match a PCollection of patterns, use MatchAll instead
    patterns = p | beam.Create(['data/*.txt'])
    all_matches = patterns | MatchAll()

Compression Types

from apache_beam.io.filesystem import CompressionTypes

# Available compression types
CompressionTypes.AUTO      # Auto-detect from file extension
CompressionTypes.GZIP       # .gz files
CompressionTypes.BZIP2      # .bz2 files
CompressionTypes.ZSTD       # .zst files
CompressionTypes.UNCOMPRESSED  # No compression

See the Apache Beam Python SDK pydoc for the complete apache_beam.io API reference.