The I/O packages provide transforms for reading from and writing to various data sources and sinks.
textio
Package textio provides transforms for reading and writing text files.
Read
Reads a set of files matching a glob pattern and returns lines as PCollection<string>.
func Read(s beam.Scope, glob string, opts ...ReadOptionFn) beam.PCollection
The scope to insert the transform into
File path or glob pattern (e.g., "gs://bucket/*.txt", "/path/to/*.log")
Optional configuration: ReadAutoCompression(), ReadGzip(), ReadUncompressed()
PCollection<string> of lines (newlines removed)
Example:
import (
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
)

lines := textio.Read(root, "gs://my-bucket/input/*.txt")

// With compression options
gzLines := textio.Read(root, "data/*.gz", textio.ReadGzip())
autoLines := textio.Read(root, "data/*", textio.ReadAutoCompression())
ReadAll
Expands and reads filenames from an input PCollection<string> of globs.
func ReadAll(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection
The scope to insert the transform into
PCollection<string> of file paths or glob patterns
Optional compression configuration
PCollection<string> of all lines from all matched files
Example:
patterns := beam.Create(root, "data/part-*.txt", "logs/*.log")
allLines := textio.ReadAll(root, patterns)
ReadWithFilename
Reads files and returns PCollection<KV<string,string>> of filename and line.
func ReadWithFilename(s beam.Scope, glob string, opts ...ReadOptionFn) beam.PCollection
PCollection<KV<string,string>> where key is filename and value is line content
Example:
filesAndLines := textio.ReadWithFilename(root, "logs/*.log")

// Process with filename context
func logWithFile(filename, line string) {
    log.Printf("[%s] %s", filename, line)
}

beam.ParDo0(root, logWithFile, filesAndLines)
Write
Writes a PCollection<string> to a file as separate lines.
func Write(s beam.Scope, filename string, col beam.PCollection)
The scope to insert the transform into
PCollection<string> to write (each element becomes a line)
Example:
lines := beam.Create(root, "line 1", "line 2", "line 3")
textio.Write(root, "output.txt", lines)

// GCS path
textio.Write(root, "gs://my-bucket/output.txt", results)
Immediate
Reads a local file at pipeline construction time and embeds its contents in the pipeline.
func Immediate(s beam.Scope, filename string) (beam.PCollection, error)
The scope to insert the transform into
Local file path to read immediately
Returns
(beam.PCollection, error)
PCollection<string> with file contents, or error if read fails
Note: Only use for small files as data is embedded in the pipeline.
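Example (illustrative sketch; the file path, the `lines` input, and the `filterStopwords` DoFn are assumptions, not part of the API above):

```go
// Read a small config file while the pipeline graph is being built.
// The contents are embedded in the pipeline itself, so keep the file small.
stopwords, err := textio.Immediate(root, "config/stopwords.txt")
if err != nil {
    log.Fatalf("failed to read stopwords: %v", err)
}

// Use the embedded lines, for example as a side input to a DoFn.
filtered := beam.ParDo(root, filterStopwords, lines,
    beam.SideInput{Input: stopwords})
```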
fileio
Package fileio provides lower-level transforms for matching and reading files with more control.
MatchFiles
Finds all files matching a glob pattern and returns PCollection<FileMetadata>.
func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollection
The scope to insert the transform into
File path or glob pattern
Options: MatchEmptyAllow(), MatchEmptyDisallow(), MatchEmptyAllowIfWildcard()
PCollection<FileMetadata> with file path, size, and last modified time
Example:
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"

matches := fileio.MatchFiles(root, "data/*.json")

// Allow empty results (use a distinct variable to avoid redeclaring matches)
optional := fileio.MatchFiles(root, "optional/*.txt", fileio.MatchEmptyAllow())
MatchAll
Matches files from an input PCollection<string> of glob patterns.
func MatchAll(s beam.Scope, col beam.PCollection, opts ...MatchOptionFn) beam.PCollection
PCollection<string> of glob patterns
PCollection<FileMetadata> of all matching files
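Example (illustrative; the patterns and the follow-up ReadMatches call are assumptions):

```go
// Expand several glob patterns supplied as a PCollection.
patterns := beam.Create(root, "data/2024-*/part-*.json", "backfill/*.json")
matches := fileio.MatchAll(root, patterns, fileio.MatchEmptyAllow())

// matches is a PCollection<FileMetadata>; convert it for reading.
files := fileio.ReadMatches(root, matches)
```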
MatchContinuously
Continuously watches for new files matching a pattern at regular intervals.
func MatchContinuously(
    s beam.Scope,
    glob string,
    interval time.Duration,
    opts ...MatchContOptionFn,
) beam.PCollection
The scope to insert the transform into
How often to check for new files
Options: MatchStart(), MatchEnd(), MatchDuplicateSkip(), MatchDuplicateAllowIfModified(), MatchApplyWindow()
Unbounded PCollection<FileMetadata> of matching files over time
Example:
import "time"

// Check for new files every 30 seconds
newFiles := fileio.MatchContinuously(
    root,
    "gs://bucket/incoming/*.json",
    30*time.Second,
    fileio.MatchDuplicateSkip(),
)
ReadMatches
Converts PCollection<FileMetadata> to PCollection<ReadableFile>.
func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection
The scope to insert the transform into
PCollection<FileMetadata> from MatchFiles/MatchAll
Options: ReadAutoCompression(), ReadGzip(), ReadUncompressed(), ReadDirectorySkip()
PCollection<ReadableFile> ready for reading
Example:
matches := fileio.MatchFiles(root, "data/*.gz")
files := fileio.ReadMatches(root, matches, fileio.ReadGzip())

// Custom processing
func processFile(ctx context.Context, file fileio.ReadableFile, emit func(string)) error {
    data, err := file.Read(ctx)
    if err != nil {
        return err
    }
    // Process data...
    emit(string(data))
    return nil
}

results := beam.ParDo(root, processFile, files)
FileMetadata
Contains metadata about a matched file.
type FileMetadata struct {
    Path         string    // full path of the file
    Size         int64     // size in bytes
    LastModified time.Time // when the file was last modified
}
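Example (illustrative; the 1 MiB threshold and the `keepLarge` DoFn are assumptions, showing how the metadata fields can drive a filter):

```go
// Keep only files larger than 1 MiB, using metadata from MatchFiles.
func keepLarge(md fileio.FileMetadata, emit func(fileio.FileMetadata)) {
    if md.Size > 1<<20 {
        emit(md)
    }
}

matches := fileio.MatchFiles(root, "data/*.json")
large := beam.ParDo(root, keepLarge, matches)
```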
ReadableFile
Wrapper around FileMetadata providing methods to read file contents.
type ReadableFile struct {
    Metadata    FileMetadata
    Compression compressionType
}
Open
Opens the file for reading.
func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error)
Context for the file operation
Reader for file contents (caller must close) or error
Example:
func processFile(ctx context.Context, file fileio.ReadableFile) error {
    reader, err := file.Open(ctx)
    if err != nil {
        return err
    }
    defer reader.Close()

    scanner := bufio.NewScanner(reader)
    for scanner.Scan() {
        line := scanner.Text()
        _ = line // Process line...
    }
    return scanner.Err()
}
Read
Reads the entire file into memory.
func (f ReadableFile) Read(ctx context.Context) ([]byte, error)
Context for the file operation
Complete file contents or error
ReadString
Reads the entire file as a string.
func (f ReadableFile) ReadString(ctx context.Context) (string, error)
File contents as string or error
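Example (illustrative; `readWhole` and the `files` input are assumptions, and like Read this loads each file fully into memory):

```go
// Emit each file's path and full contents as a KV pair.
func readWhole(ctx context.Context, f fileio.ReadableFile, emit func(string, string)) error {
    contents, err := f.ReadString(ctx)
    if err != nil {
        return err
    }
    emit(f.Metadata.Path, contents)
    return nil
}

fileContents := beam.ParDo(root, readWhole, files)
```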
Other I/O Packages
The Go SDK includes I/O connectors for various data sources:
avroio
Read and write Avro files.
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/avroio"
bigqueryio
Read from and write to Google BigQuery.
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
parquetio
Read and write Parquet files.
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/parquetio"
pubsubio
Read from and write to Google Cloud Pub/Sub.
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
databaseio
Generic database I/O for SQL databases.
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/databaseio"
mongodbio
Read from and write to MongoDB.
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio"
Complete Example
package main

import (
    "context"
    "fmt"
    "log"
    "strings"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    register.Function2x0(extractWords)
    register.Function2x1(sumCounts)
    register.Function2x1(formatOutput)
    register.Emitter2[string, int]()
}

// extractWords emits (word, 1) for every whitespace-separated word in a line.
func extractWords(line string, emit func(string, int)) {
    for _, word := range strings.Fields(line) {
        emit(word, 1)
    }
}

func sumCounts(a, b int) int {
    return a + b
}

func formatOutput(word string, count int) string {
    return fmt.Sprintf("%s: %d", word, count)
}

func main() {
    beam.Init()
    p := beam.NewPipeline()
    root := p.Root()

    // Read input files
    lines := textio.Read(root, "input/*.txt")

    // Count words
    wordCounts := beam.ParDo(root, extractWords, lines)
    grouped := beam.CombinePerKey(root, sumCounts, wordCounts)

    // Format and write output
    formatted := beam.ParDo(root, formatOutput, grouped)
    textio.Write(root, "output.txt", formatted)

    if err := beamx.Run(context.Background(), p); err != nil {
        log.Fatalf("Failed to execute pipeline: %v", err)
    }
}
Best Practices
Use appropriate URI schemes: gs:// for GCS, s3:// for S3, local paths for local files
Glob patterns support * and ** wildcards
Always validate file paths before pipeline execution
Use ReadAutoCompression() for mixed compression types
Explicit compression options are faster (no detection overhead)
Gzip is detected automatically from the .gz extension when auto compression is used
Use MatchContinuously for streaming file ingestion
Configure deduplication based on your use case
Consider windowing strategies for time-based processing
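The streaming-related practices above can be combined into one sketch (the bucket path, interval, and option choices are illustrative, not prescriptive):

```go
// Watch a bucket for new files every minute, skip files already seen,
// and window the matches so downstream aggregations can fire per window.
metadata := fileio.MatchContinuously(
    root,
    "gs://bucket/incoming/*.json",
    1*time.Minute,
    fileio.MatchDuplicateSkip(),
    fileio.MatchApplyWindow(),
)
files := fileio.ReadMatches(root, metadata)
```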