The Pipeline module provides the top-level API for constructing and executing Apache Beam data processing pipelines.
Pipeline
A Pipeline object manages a DAG (Directed Acyclic Graph) of PTransforms and their PValues.
Constructor
beam.Pipeline(
runner=None,
options=None,
argv=None,
display_data=None
)
runner (Union[str, PipelineRunner], default: 'DirectRunner')
The runner to execute the pipeline. Can be a runner name string (e.g., 'DirectRunner', 'DataflowRunner') or a PipelineRunner object.
options (PipelineOptions)
A configured PipelineOptions object containing arguments for running the pipeline.
argv (List[str])
Command line arguments for building PipelineOptions. Only used if options is None.
display_data (dict)
Static data associated with the pipeline that can be displayed when it runs.
Methods
run()
Executes the pipeline and returns the result.
Returns: PipelineResult - Result object from the runner
apply()
Applies a transform to the pipeline.
pipeline.apply(transform, pvalueish=None, label=None)
transform (PTransform)
The transform to apply to the pipeline.
pvalueish
Input PCollection for the transform.
label (str)
Optional label to identify the transform in the pipeline graph.
Usage Example
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Create a pipeline with default DirectRunner
with beam.Pipeline() as p:
    lines = p | 'Create' >> beam.Create(['Hello', 'World'])
    lines | 'Print' >> beam.Map(print)
# Create a pipeline with options
options = PipelineOptions([
    '--runner=DirectRunner',
    '--project=my-project'
])
with beam.Pipeline(options=options) as p:
    (p
     | 'ReadData' >> beam.Create([1, 2, 3, 4, 5])
     | 'MultiplyByTwo' >> beam.Map(lambda x: x * 2)
     | 'WriteOutput' >> beam.io.WriteToText('./output'))
PipelineOptions
Container class for command line options and pipeline configuration.
Constructor
PipelineOptions(flags=None, **kwargs)
flags (List[str])
Command line arguments to parse. Uses sys.argv if not specified.
kwargs
Override values for specific options.
Methods
view_as()
Returns a view of the options as a specific options class.
standard_options = options.view_as(StandardOptions)
runner = standard_options.runner
get_all_options()
Returns a dictionary of all defined options.
all_opts = options.get_all_options()
Common Option Classes
StandardOptions
Basic pipeline execution options.
from apache_beam.options.pipeline_options import StandardOptions
options = PipelineOptions()
standard = options.view_as(StandardOptions)
standard.runner = 'DataflowRunner'
standard.streaming = False
Key attributes:
runner (str): Pipeline runner to use
streaming (bool): Whether to run in streaming mode
GoogleCloudOptions
Options for Google Cloud Dataflow runner.
from apache_beam.options.pipeline_options import GoogleCloudOptions
gcp_options = options.view_as(GoogleCloudOptions)
gcp_options.project = 'my-gcp-project'
gcp_options.job_name = 'my-beam-job'
gcp_options.staging_location = 'gs://my-bucket/staging'
gcp_options.temp_location = 'gs://my-bucket/temp'
gcp_options.region = 'us-central1'
Key attributes:
project (str): GCP project ID
job_name (str): Dataflow job name
staging_location (str): GCS path for staging files
temp_location (str): GCS path for temporary files
region (str): GCP region for job execution
SetupOptions
Options for environment setup.
from apache_beam.options.pipeline_options import SetupOptions
setup_options = options.view_as(SetupOptions)
setup_options.save_main_session = True
setup_options.requirements_file = 'requirements.txt'
Key attributes:
save_main_session (bool): Save main session state for pickling
requirements_file (str): Path to requirements file for dependencies
setup_file (str): Path to setup.py file
Usage Example
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
# Create options from command line
options = PipelineOptions([
    '--project=my-project',
    '--region=us-central1',
    '--runner=DataflowRunner',
    '--temp_location=gs://my-bucket/temp',
    '--staging_location=gs://my-bucket/staging'
])
# Access specific option groups
gcp_options = options.view_as(GoogleCloudOptions)
print(f"Project: {gcp_options.project}")
# Create pipeline with options
with beam.Pipeline(options=options) as p:
    result = (p
              | beam.Create([1, 2, 3])
              | beam.Map(lambda x: x * 2))
Context Managers
transform_annotations()
Attach annotations to a set of transforms.
with pipeline.transform_annotations(annotation_key='value'):
    # All transforms applied here will have the annotations
    data | 'Transform' >> beam.Map(lambda x: x)
Properties
options
Access the pipeline's PipelineOptions.
pipeline_options = pipeline.options
runner = pipeline_options.view_as(StandardOptions).runner