The Pipeline module provides the top-level API for constructing and executing Apache Beam data processing pipelines.

Pipeline

A Pipeline object manages a DAG (Directed Acyclic Graph) of PTransforms and their PValues.

Constructor

beam.Pipeline(
    runner=None,
    options=None,
    argv=None,
    display_data=None
)
runner
Union[str, PipelineRunner]
default:"DirectRunner"
The runner to execute the pipeline. Can be a runner name string (e.g., 'DirectRunner', 'DataflowRunner') or a PipelineRunner object.
options
PipelineOptions
A configured PipelineOptions object containing arguments for running the pipeline.
argv
list[str]
Command line arguments for building PipelineOptions. Only used if options is None.
display_data
dict[str, Any]
Static data associated with the pipeline that can be displayed when it runs.

Methods

run()

Executes the pipeline and returns the result.
result = pipeline.run()
Returns: PipelineResult - Result object from the runner

apply()

Applies a transform to the pipeline.
pipeline.apply(transform, pvalueish=None, label=None)
transform
PTransform
The PTransform to apply.
pvalueish
PValue
Input PValue(s) for the transform (typically a PCollection). If None, the transform is applied to the pipeline root.
label
str
Label for the transform.

Usage Example

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Create a pipeline with default DirectRunner
with beam.Pipeline() as p:
    lines = p | 'Create' >> beam.Create(['Hello', 'World'])
    lines | 'Print' >> beam.Map(print)

# Create a pipeline with options
options = PipelineOptions([
    '--runner=DirectRunner',
    '--project=my-project'
])

with beam.Pipeline(options=options) as p:
    (p 
     | 'ReadData' >> beam.Create([1, 2, 3, 4, 5])
     | 'MultiplyByTwo' >> beam.Map(lambda x: x * 2)
     | 'WriteOutput' >> beam.io.WriteToText('./output'))

PipelineOptions

Container class for command line options and pipeline configuration.

Constructor

PipelineOptions(flags=None, **kwargs)
flags
list[str]
Command line arguments to parse. Uses sys.argv if not specified.
**kwargs
dict
Override values for specific options.

Methods

view_as()

Returns a view of the options as a specific options class.
standard_options = options.view_as(StandardOptions)
runner = standard_options.runner

get_all_options()

Returns a dictionary of all defined options.
all_opts = options.get_all_options()

Common Option Classes

StandardOptions

Basic pipeline execution options.
from apache_beam.options.pipeline_options import StandardOptions

options = PipelineOptions()
standard = options.view_as(StandardOptions)
standard.runner = 'DataflowRunner'
standard.streaming = False
Key attributes:
  • runner (str): Pipeline runner to use
  • streaming (bool): Whether to run in streaming mode

GoogleCloudOptions

Options for Google Cloud Dataflow runner.
from apache_beam.options.pipeline_options import GoogleCloudOptions

gcp_options = options.view_as(GoogleCloudOptions)
gcp_options.project = 'my-gcp-project'
gcp_options.job_name = 'my-beam-job'
gcp_options.staging_location = 'gs://my-bucket/staging'
gcp_options.temp_location = 'gs://my-bucket/temp'
gcp_options.region = 'us-central1'
Key attributes:
  • project (str): GCP project ID
  • job_name (str): Dataflow job name
  • staging_location (str): GCS path for staging files
  • temp_location (str): GCS path for temporary files
  • region (str): GCP region for job execution

SetupOptions

Options for environment setup.
from apache_beam.options.pipeline_options import SetupOptions

setup_options = options.view_as(SetupOptions)
setup_options.save_main_session = True
setup_options.requirements_file = 'requirements.txt'
Key attributes:
  • save_main_session (bool): Save main session state for pickling
  • requirements_file (str): Path to requirements file for dependencies
  • setup_file (str): Path to setup.py file

Usage Example

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions

# Create options from command line
options = PipelineOptions([
    '--project=my-project',
    '--region=us-central1',
    '--runner=DataflowRunner',
    '--temp_location=gs://my-bucket/temp',
    '--staging_location=gs://my-bucket/staging'
])

# Access specific option groups
gcp_options = options.view_as(GoogleCloudOptions)
print(f"Project: {gcp_options.project}")

# Create pipeline with options
with beam.Pipeline(options=options) as p:
    result = (p 
              | beam.Create([1, 2, 3])
              | beam.Map(lambda x: x * 2))

Context Managers

transform_annotations()

Attach annotations to a set of transforms.
with pipeline.transform_annotations(annotation_key='value'):
    # All transforms applied here will have the annotations
    data | 'Transform' >> beam.Map(lambda x: x)

Properties

options

Access the pipeline’s PipelineOptions.
pipeline_options = pipeline.options
runner = pipeline_options.view_as(StandardOptions).runner
