The Apache Beam Go SDK brings the power of Beam to Go developers, offering native Go idioms and excellent performance for data processing pipelines.

Installation

1. Install Go

Ensure you have Go 1.21 or later installed:
go version
2. Create a new Go module

mkdir my-beam-pipeline
cd my-beam-pipeline
go mod init github.com/yourusername/my-beam-pipeline
3. Install the Apache Beam Go SDK

go get github.com/apache/beam/sdks/v2/go/pkg/beam
4. Install additional packages (optional)

# For I/O connectors
go get github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio
go get github.com/apache/beam/sdks/v2/go/pkg/beam/io/avroio

# For running pipelines
go get github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx

Quick Start

Here’s a complete word count example:
package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "regexp"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

var (
    input  = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File to read.")
    output = flag.String("output", "output.txt", "Output file.")
)

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

// extractWords splits a line into words.
func extractWords(line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
    }
}

// formatCounts formats a word count as a string.
func formatCounts(word string, count int) string {
    return fmt.Sprintf("%s: %d", word, count)
}

func init() {
    // Register functions for serialization
    register.Function2x0(extractWords)
    register.Function2x1(formatCounts)
}

func main() {
    flag.Parse()
    beam.Init()

    p := beam.NewPipeline()
    s := p.Root()

    // Read lines from input file
    lines := textio.Read(s, *input)

    // Split lines into words
    words := beam.ParDo(s, extractWords, lines)

    // Count occurrences of each word
    counted := stats.Count(s, words)

    // Format the counts
    formatted := beam.ParDo(s, formatCounts, counted)

    // Write results to output file
    textio.Write(s, *output, formatted)

    // Run the pipeline
    if err := beamx.Run(context.Background(), p); err != nil {
        log.Fatalf("Failed to execute pipeline: %v", err)
    }
}
Run the pipeline:
go run wordcount.go --output=output.txt

Core Concepts

Pipeline

Create and execute pipelines:
import (
    "context"
    "log"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func main() {
    beam.Init()
    
    p := beam.NewPipeline()
    s := p.Root()
    
    // Build your pipeline using scope 's'
    
    if err := beamx.Run(context.Background(), p); err != nil {
        log.Fatal(err)
    }
}

PCollection

PCollections represent distributed datasets:
// Create from in-memory data
data := beam.CreateList(s, []string{"Hello", "World", "Beam"})

// Read from files
lines := textio.Read(s, "input.txt")

// PCollections are immutable and strongly typed;
// element types are inferred from the transforms that produce them
var processed beam.PCollection

Scopes

Scopes organize and name pipeline components:
s := p.Root()

// Create named subscopes
readScope := s.Scope("Read")
processScope := s.Scope("Process")

lines := textio.Read(readScope, "input.txt")
processed := beam.ParDo(processScope, transformFn, lines)

Transforms

Apply transforms to process data:
// ParDo: Element-wise transformation
result := beam.ParDo(s, myDoFn, input)

// Combine: Aggregate values
total := stats.Sum(s, numbers)

// Flatten: Merge multiple PCollections
merged := beam.Flatten(s, pcol1, pcol2, pcol3)

// GroupByKey: Group by key (for KV pairs)
grouped := beam.GroupByKey(s, kvPairs)

DoFns and Functions

Simple Functions

Use regular Go functions for transformations:
// Simple 1-to-1 transform
func double(x int) int {
    return x * 2
}

// 1-to-many transform (using emit)
func splitWords(line string, emit func(string)) {
    for _, word := range strings.Fields(line) {
        emit(word)
    }
}

// Register functions for serialization
func init() {
    register.Function1x1(double)
    register.Function2x0(splitWords)
}

// Use in pipeline
doubled := beam.ParDo(s, double, numbers)
words := beam.ParDo(s, splitWords, lines)

Structural DoFns

Use struct-based DoFns for complex logic:
import "github.com/apache/beam/sdks/v2/go/pkg/beam"

// FilterByThresholdFn filters elements above a threshold
type FilterByThresholdFn struct {
    Threshold int
}

func (f *FilterByThresholdFn) ProcessElement(x int, emit func(int)) {
    if x > f.Threshold {
        emit(x)
    }
}

func init() {
    // Type parameters list the ProcessElement argument types, emitters included
    register.DoFn2x0[int, func(int)](&FilterByThresholdFn{})
    register.Emitter1[int]()
}

// Use in pipeline
filtered := beam.ParDo(s, &FilterByThresholdFn{Threshold: 10}, numbers)

Side Inputs

Access additional data during processing:
func enrichWithSideInput(word string, sideInput func(*string) bool, emit func(string)) {
    var prefix string
    if sideInput(&prefix) {
        emit(prefix + ": " + word)
    }
}

func init() {
    register.Function3x0(enrichWithSideInput)
    register.Iter1[string]()
    register.Emitter1[string]()
}

sideData := beam.CreateList(s, []string{"PREFIX"})
sideIter := beam.SideInput{Input: sideData}

enriched := beam.ParDo(s, enrichWithSideInput, words, sideIter)

Go-Specific Features

Type Safety with Generics

The Go SDK leverages Go’s type system:
// Strongly typed transforms
func processInt(x int) int { return x * 2 }
func processString(s string) string { return strings.ToUpper(s) }

intResults := beam.ParDo(s, processInt, intCollection)
strResults := beam.ParDo(s, processString, strCollection)

Registration System

Register functions for proper serialization:
import (
    "reflect"

    "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

func myTransform(x int) int { return x + 1 }
func myEmitter(x int, emit func(int, int)) { emit(x, x*2) }

func init() {
    // Register simple functions
    register.Function1x1(myTransform)
    register.Function2x0(myEmitter)
    
    // Register DoFns (type parameters list the ProcessElement argument types)
    register.DoFn2x0[int, func(int)](&MyDoFn{})
    
    // Register types
    register.Type(reflect.TypeOf((*MyStruct)(nil)).Elem())
}

Combiners

Implement efficient aggregations:
import "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"

// Built-in combiners
sum := stats.Sum(s, numbers)
count := stats.Count(s, elements)
max := stats.Max(s, values)

// Count per key
counted := stats.Count(s, words)  // Returns PCollection<KV<string, int>>

I/O Connectors

Text Files

import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"

// Read text files
lines := textio.Read(s, "input.txt")

// Read with glob pattern
allFiles := textio.Read(s, "data/*.txt")

// Write text files
textio.Write(s, "output.txt", results)

// Immediate reads a local file at pipeline construction time and
// embeds the data directly in the pipeline (handy for small test inputs)
data, err := textio.Immediate(s, "input.txt")

Avro Files

import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/avroio"

// Define your schema type
type MyRecord struct {
    Name  string `avro:"name"`
    Age   int    `avro:"age"`
    Email string `avro:"email"`
}

func init() {
    register.Type(reflect.TypeOf((*MyRecord)(nil)).Elem())
}

// Read Avro files
records := avroio.Read(s, "data.avro", reflect.TypeOf(MyRecord{}))

// Write Avro files; avroio.Write takes the Avro JSON schema as a string
schema := `{"type":"record","name":"MyRecord","fields":[
    {"name":"name","type":"string"},
    {"name":"age","type":"int"},
    {"name":"email","type":"string"}]}`
avroio.Write(s, "output.avro", schema, myRecords)

Cross-Language I/O

Use I/O transforms from other SDKs:
import (
    "reflect"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)

// Read from BigQuery via an external SDK's transform
expansionAddr := "localhost:8097"  // Expansion service address

// The configuration struct must be schema-encoded into the payload
payload := beam.CrossLanguagePayload(ReadFromBigQueryConfig{
    Query: "SELECT * FROM dataset.table",
})

// RowType is a registered Go struct matching the output schema
outputs := beam.CrossLanguage(
    s,
    "beam:transform:org.apache.beam:bigquery_read:v1",
    payload,
    expansionAddr,
    beam.UnnamedInput(beam.Impulse(s)),
    beam.UnnamedOutput(typex.New(reflect.TypeOf(RowType{}))),
)
rows := outputs[beam.UnnamedOutputTag()]

Running Pipelines

Direct Runner (Local)

go run main.go \
  --runner=direct \
  --output=output.txt

Dataflow Runner

go run main.go \
  --runner=dataflow \
  --project=YOUR_PROJECT_ID \
  --region=us-central1 \
  --staging_location=gs://YOUR_BUCKET/staging \
  --worker_harness_container_image=gcr.io/YOUR_PROJECT/beam_go_sdk:latest

Flink Runner

go run main.go \
  --runner=flink \
  --flink_master=localhost:8081 \
  --environment_type=DOCKER

Prism (Portable Local Runner)

# Install Prism
go install github.com/apache/beam/sdks/v2/go/cmd/prism@latest

# Run your pipeline
go run main.go --runner=prism

Best Practices

Use the init() function to register all custom types and functions:
func init() {
    register.Function1x1(myFunc)
    register.DoFn2x0[string](&MyDoFn{})
    register.Type(reflect.TypeOf((*MyStruct)(nil)).Elem())
}
Organize your pipeline with descriptive scope names:
s := p.Root()

readScope := s.Scope("ReadInput")
lines := textio.Read(readScope, *input)

processScope := s.Scope("ProcessData")
results := beam.ParDo(processScope, processFn, lines)

writeScope := s.Scope("WriteOutput")
textio.Write(writeScope, *output, results)
Use the stats package for common aggregations:
import "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"

// Efficient built-in combiners
wordCounts := stats.Count(s, words)
totalSum := stats.Sum(s, numbers)
maxValue := stats.Max(s, values)
minValue := stats.Min(s, values)
Check errors from pipeline execution:
import "context"

if err := beamx.Run(context.Background(), p); err != nil {
    log.Fatalf("Pipeline failed: %v", err)
}

Composite Transforms

Create reusable pipeline components:
// CountWords is a composite transform
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")
    
    words := beam.ParDo(s.Scope("ExtractWords"), extractWords, lines)
    counted := stats.Count(s.Scope("Count"), words)
    formatted := beam.ParDo(s.Scope("Format"), formatCounts, counted)
    
    return formatted
}

// Use the composite transform
results := CountWords(s, inputLines)

Testing

Test your pipeline components:
import (
    "testing"
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func TestMyTransform(t *testing.T) {
    p, s := beam.NewPipelineWithRoot()
    
    input := beam.CreateList(s, []int{1, 2, 3, 4, 5})
    output := beam.ParDo(s, double, input)
    
    // passert.Equals takes the expected values as variadic arguments
    passert.Equals(s, output, 2, 4, 6, 8, 10)
    
    if err := ptest.Run(p); err != nil {
        t.Fatalf("Pipeline failed: %v", err)
    }
}

// TestMain lets ptest manage runner flags for the test binary
func TestMain(m *testing.M) {
    ptest.Main(m)
}

Building Container Images

For portable runners, build SDK harness containers:
# Using Gradle from Beam source
./gradlew :sdks:go:container:docker -Pdocker-repository-root=YOUR_REPO

# Push to registry
docker push YOUR_REPO/beam_go_sdk:latest

# Use in pipeline
go run main.go \
  --runner=dataflow \
  --worker_harness_container_image=YOUR_REPO/beam_go_sdk:latest

Resources

- Go SDK Reference: complete Go package documentation
- Code Examples: sample pipelines and patterns
- Prism Runner: local portable runner for testing
- Build Guide: building and testing the Go SDK
