Skip to main content
The Apache Beam Python SDK enables you to create powerful batch and streaming data pipelines using Python’s expressive syntax and rich ecosystem of libraries.

Installation

1

Install Python

Ensure you have Python 3.8 or later installed:
python --version
2

Create a virtual environment (recommended)

python -m venv beam-env
source beam-env/bin/activate  # On Windows: beam-env\Scripts\activate
3

Install Apache Beam

pip install apache-beam
4

Verify installation

python -c "import apache_beam as beam; print(beam.__version__)"

Quick Start

Here’s a simple word count example:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
import re

class WordExtractingDoFn(beam.DoFn):
    """DoFn that tokenizes a line of text into its individual words."""

    def process(self, element):
        """Return the list of word tokens found in ``element``.

        A token is a maximal run of word characters and apostrophes.
        """
        words = re.findall(r'[\w\']+', element, re.UNICODE)
        return words

def run():
    """Build and execute the word-count pipeline.

    Reads King Lear from Cloud Storage, counts the occurrences of each word,
    and writes ``word: count`` lines to files prefixed ``output/wordcount``.
    """
    options = PipelineOptions()

    # Leaving the ``with`` block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=options) as p:
        lines = p | 'Read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
        words = lines | 'Split' >> beam.ParDo(WordExtractingDoFn())
        counts = (
            words
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)
        )
        formatted = counts | 'Format' >> beam.MapTuple(lambda word, count: f'{word}: {count}')
        formatted | 'Write' >> WriteToText('output/wordcount')

if __name__ == '__main__':
    run()
Run the pipeline:
python wordcount.py

Core Concepts

Pipeline

A pipeline encapsulates your data processing workflow:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()
with beam.Pipeline(options=options) as pipeline:
    # Build your pipeline here
    pass

PCollection

A PCollection represents a distributed dataset:
# Create from memory
data = pipeline | beam.Create(['Hello', 'World', 'Beam'])

# Read from file
lines = pipeline | beam.io.ReadFromText('input.txt')

# PCollections are immutable
processed = data | beam.Map(str.upper)

Transforms

Transforms process data in your pipeline:
# FlatMap: 1-to-many transformation
words = lines | beam.FlatMap(lambda line: line.split())

# Map: 1-to-1 transformation
upper = words | beam.Map(str.upper)

# Filter: Keep elements matching condition
long_words = words | beam.Filter(lambda word: len(word) > 5)

# CombinePerKey: Aggregate values by key (input must be (key, value) pairs)
pairs = words | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
sums = pairs | beam.CombinePerKey(sum)

DoFn (Do Functions)

DoFn allows for more complex processing:
class SplitWords(beam.DoFn):
    """DoFn that breaks each input line into whitespace-separated words."""

    def process(self, element):
        """Yield every word of ``element``, one at a time."""
        yield from element.split()

words = lines | beam.ParDo(SplitWords())

Python-Specific Features

Type Hints

Improve pipeline validation with type hints:
import apache_beam as beam
from apache_beam.typehints import TypeCheckError

def process_data(element: str) -> int:
    """Return the number of characters in ``element``."""
    length = len(element)
    return length

results = (
    pipeline
    | beam.Create(['a', 'bb', 'ccc'])
    | beam.Map(process_data).with_output_types(int)
)

Lambda Functions

Use Python lambdas for simple transformations:
results = (
    data
    | beam.Map(lambda x: x * 2)
    | beam.Filter(lambda x: x > 10)
    | beam.FlatMap(lambda x: [x, x + 1])
)

DataFrame API

Use familiar pandas-like operations:
import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe, to_pcollection

with beam.Pipeline() as pipeline:
    # Create a schema-aware PCollection: to_dataframe() needs named, typed
    # fields, which beam.Row provides and plain dicts do not.
    data = pipeline | beam.Create([
        beam.Row(word='hello', count=3),
        beam.Row(word='world', count=5),
    ])

    # Convert to a deferred DataFrame
    df = to_dataframe(data)

    # Use pandas operations
    df['count'] = df['count'] * 2
    result_df = df.groupby('word').sum()

    # Convert back to PCollection; include_indexes=True keeps the 'word'
    # group key, which groupby() moved into the index.
    result = to_pcollection(result_df, pipeline=pipeline, include_indexes=True)

Machine Learning with RunInference

Integrate ML models directly into your pipeline:
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
import pickle

# Load your trained model
model_handler = SklearnModelHandlerNumpy(
    model_uri='gs://my-bucket/model.pkl'
)

predictions = (
    data
    | 'Preprocess' >> beam.Map(preprocess_fn)
    | 'RunInference' >> RunInference(model_handler)
    | 'Postprocess' >> beam.Map(postprocess_fn)
)

Streaming Pipelines

Process unbounded data streams:
import json

import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Enable streaming mode so the runner treats the source as unbounded.
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
            subscription='projects/myproject/subscriptions/mysub'
        )
        | 'Parse JSON' >> beam.Map(json.loads)
        | 'Window' >> beam.WindowInto(window.FixedWindows(60))  # 1-minute windows
        | 'Count per key' >> beam.CombinePerKey(sum)
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            'myproject:mydataset.mytable'
        )
    )

Windowing

Group streaming data into windows:
from apache_beam import window

# Fixed time windows
windowed_data = (
    data
    | beam.WindowInto(window.FixedWindows(60))  # 60 seconds
)

# Sliding windows
sliding_windows = (
    data
    | beam.WindowInto(window.SlidingWindows(60, 30))  # 60s windows every 30s
)

# Session windows
session_windows = (
    data
    | beam.WindowInto(window.Sessions(600))  # 10-minute gap
)

I/O Connectors

The Python SDK supports various data sources:

Files

# Text files
lines = pipeline | beam.io.ReadFromText('input.txt')
lines | beam.io.WriteToText('output.txt')

# Avro
records = pipeline | beam.io.ReadFromAvro('data.avro')
data | beam.io.WriteToAvro('output.avro', schema=avro_schema)

# Parquet
df = pipeline | beam.io.ReadFromParquet('data.parquet')
data | beam.io.WriteToParquet('output.parquet')

Google Cloud Platform

# BigQuery
rows = pipeline | beam.io.ReadFromBigQuery(
    table='project:dataset.table'
)
data | beam.io.WriteToBigQuery(
    'project:dataset.table',
    schema={'fields': [{'name': 'field1', 'type': 'STRING'}]}
)

# Cloud Pub/Sub
messages = pipeline | beam.io.ReadFromPubSub(
    topic='projects/myproject/topics/mytopic'
)
data | beam.io.WriteToPubSub(
    topic='projects/myproject/topics/output'
)

# Cloud Storage
files = pipeline | beam.io.ReadFromText('gs://bucket/path/*.txt')

Databases

# MongoDB
from apache_beam.io.mongodbio import ReadFromMongoDB

data = pipeline | ReadFromMongoDB(
    uri='mongodb://localhost:27017',
    db='mydb',
    coll='mycollection'
)

# JDBC (via Java)
from apache_beam.io.jdbc import ReadFromJdbc

rows = pipeline | ReadFromJdbc(
    table_name='my_table',
    driver_class_name='org.postgresql.Driver',
    jdbc_url='jdbc:postgresql://localhost:5432/mydb',
    username='user',
    password='pass'
)

Running Pipelines

Direct Runner (Local)

python my_pipeline.py \
  --runner DirectRunner \
  --output output.txt

Google Cloud Dataflow

python my_pipeline.py \
  --runner DataflowRunner \
  --project YOUR_PROJECT_ID \
  --region us-central1 \
  --temp_location gs://YOUR_BUCKET/temp \
  --staging_location gs://YOUR_BUCKET/staging

Apache Flink

python my_pipeline.py \
  --runner FlinkRunner \
  --flink_master localhost:8081 \
  --environment_type=DOCKER \
  --environment_config=apache/beam_python3.9_sdk:latest

Best Practices

Define configurable options for your pipeline:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
    """Pipeline options that add required ``--input`` and ``--output`` flags."""

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register this pipeline's command-line arguments on ``parser``."""
        parser.add_argument('--input', required=True)
        parser.add_argument('--output', required=True)

options = MyOptions()
with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | beam.io.ReadFromText(options.input)
        | beam.io.WriteToText(options.output)
    )
Manage pipeline dependencies properly:
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Use setup.py for custom dependencies
options = PipelineOptions(
    setup_file='./setup.py'
)

# Or specify requirements
options.view_as(SetupOptions).requirements_file = 'requirements.txt'
Use lifecycle methods for expensive operations:
class ProcessWithResource(beam.DoFn):
    """DoFn that queries an external client for each element.

    The client is created in ``setup`` and closed in ``teardown`` rather than
    being rebuilt for every element.
    """

    def setup(self):
        """Create the client that ``process`` reuses."""
        self.client = create_client()

    def process(self, element):
        """Yield the client's query result for ``element``."""
        yield self.client.query(element)

    def teardown(self):
        """Close the client created in ``setup``."""
        self.client.close()
Prefer Combine transforms for better performance:
import apache_beam as beam
from apache_beam import combiners

# Good: Uses Combine for efficient aggregation
totals = data | beam.CombinePerKey(sum)

# Also good: Built-in combiners
stats = data | beam.CombineGlobally(
    combiners.MeanCombineFn()
)

Interactive Beam

Develop and debug pipelines in Jupyter notebooks:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
import apache_beam.runners.interactive.interactive_beam as ib

# Create pipeline
p = beam.Pipeline(interactive_runner.InteractiveRunner())

data = p | beam.Create([1, 2, 3, 4, 5])
results = data | beam.Map(lambda x: x * 2)

# Show results interactively
ib.show(results)

Testing Pipelines

Test your pipelines effectively:
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

class MyPipelineTest(unittest.TestCase):
    """Unit tests for the word-count transforms."""

    def test_word_count(self):
        """Counting words across two lines yields one total per distinct word."""
        lines = ['hello world', 'hello beam']
        expected_counts = [('hello', 2), ('world', 1), ('beam', 1)]

        with TestPipeline() as p:
            words = p | beam.Create(lines) | beam.FlatMap(lambda x: x.split())
            counts = words | beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum)

            assert_that(counts, equal_to(expected_counts))

Resources

API Reference

Complete Python API documentation

Code Examples

Example pipelines and patterns

ML Guide

Machine learning with Beam

Interactive Beam

Jupyter notebook development

Next Steps

Build docs developers (and LLMs) love