The DataflowRunner executes Apache Beam pipelines on Google Cloud Dataflow, a fully managed service for stream and batch data processing.

Overview

Google Cloud Dataflow provides:
  • Fully Managed: No cluster management required
  • Autoscaling: Automatic resource scaling based on workload
  • Optimization: Automatic pipeline optimization and execution
  • Monitoring: Built-in monitoring and logging with Cloud Monitoring
  • Security: Integration with Google Cloud IAM and VPC

When to Use DataflowRunner

Best For

  • Production workloads on GCP
  • Large-scale data processing
  • Auto-scaling requirements
  • Managed infrastructure
  • Integration with GCP services

Consider Alternatives

  • Small local datasets (use DirectRunner)
  • Non-GCP environments
  • Existing Spark/Flink clusters
  • Cost-sensitive batch jobs where a self-managed runner may be cheaper

Setup and Configuration

Prerequisites

  1. Google Cloud Project: Create a project in Google Cloud Console
  2. Enable APIs: Enable Cloud Dataflow, Compute Engine, and Cloud Storage APIs
  3. Authentication: Set up authentication credentials
  4. Cloud Storage: Create a GCS bucket for staging and temp files

Dependencies

Add the Dataflow runner dependency (in addition to the core Beam SDK dependency):
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
  <version>{beam-version}</version>
</dependency>
For Gradle:
implementation 'org.apache.beam:beam-runners-google-cloud-dataflow-java:{beam-version}'

Authentication

Set up Google Cloud credentials:
# Install gcloud CLI
curl https://sdk.cloud.google.com | bash

# Authenticate
gcloud auth application-default login

# Set your project
gcloud config set project YOUR_PROJECT_ID

Running a Pipeline

Basic Example

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MyDataflowPipeline {
  public static void main(String[] args) {
    DataflowPipelineOptions options = 
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(DataflowPipelineOptions.class);
    
    // Required: Set the runner
    options.setRunner(DataflowRunner.class);
    
    // Required: GCP project
    options.setProject("your-project-id");
    
    // Required: GCP region
    options.setRegion("us-central1");
    
    // Required: Staging location
    options.setStagingLocation("gs://your-bucket/staging");
    
    // Required: Temp location
    options.setTempLocation("gs://your-bucket/temp");
    
    Pipeline p = Pipeline.create(options);
    
    // Build your pipeline, for example:
    // p.apply(TextIO.read().from("gs://your-bucket/input/*"));
    
    // Execute on Dataflow
    p.run();
  }
}
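
By default, p.run() submits the job to the Dataflow service and returns without waiting for it to complete. A minimal sketch, using the Beam SDK's PipelineResult and the runner's DataflowPipelineJob, of blocking until the job finishes and logging its job ID:

import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.sdk.PipelineResult;

// Submit the pipeline; DataflowRunner returns a DataflowPipelineJob
PipelineResult result = p.run();
if (result instanceof DataflowPipelineJob) {
  // The Dataflow-specific result exposes the service-side job ID
  System.out.println("Dataflow job ID: " + ((DataflowPipelineJob) result).getJobId());
}

// Block until the job reaches a terminal state
result.waitUntilFinish();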

Command Line Execution

# Java
mvn compile exec:java -Dexec.mainClass=com.example.MyPipeline \
  -Dexec.args="--runner=DataflowRunner \
               --project=your-project-id \
               --region=us-central1 \
               --stagingLocation=gs://your-bucket/staging \
               --tempLocation=gs://your-bucket/temp"

# Python
python my_pipeline.py \
  --runner=DataflowRunner \
  --project=your-project-id \
  --region=us-central1 \
  --staging_location=gs://your-bucket/staging \
  --temp_location=gs://your-bucket/temp

DataflowPipelineOptions

Key configuration options for the DataflowRunner:

Required Options

project
string
required
Google Cloud project ID.
options.setProject("my-gcp-project");
region
string
required
Google Cloud region for job execution (e.g., us-central1, europe-west1).
options.setRegion("us-central1");
stagingLocation
string
required
Cloud Storage path for staging files (must start with gs://).
options.setStagingLocation("gs://my-bucket/staging");
tempLocation
string
required
Cloud Storage path for temporary files.
options.setTempLocation("gs://my-bucket/temp");

Worker Configuration

numWorkers
integer
default:"autoscaling"
Initial number of workers. Dataflow will autoscale from this value.
options.setNumWorkers(10);
maxNumWorkers
integer
Maximum number of workers for autoscaling.
options.setMaxNumWorkers(100);
workerMachineType
string
default:"n1-standard-1"
Compute Engine machine type for workers.
options.setWorkerMachineType("n1-standard-4");
diskSizeGb
integer
default:"250"
Disk size in GB for each worker.
options.setDiskSizeGb(500);

Streaming Options

streaming
boolean
default:"false"
Enable streaming mode for unbounded sources.
options.setStreaming(true);
enableStreamingEngine
boolean
default:"false"
Use Dataflow Streaming Engine for streaming pipelines.
options.setEnableStreamingEngine(true);

Network Configuration

network
string
Compute Engine network for launching workers.
options.setNetwork("projects/my-project/global/networks/my-network");
subnetwork
string
Compute Engine subnetwork for launching workers.
options.setSubnetwork("regions/us-central1/subnetworks/my-subnet");
usePublicIps
boolean
default:"true"
Whether workers should have public IP addresses.
options.setUsePublicIps(false); // Private IPs only

Advanced Configuration

Autoscaling

Dataflow automatically scales workers based on workload:
DataflowPipelineOptions options = 
    PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// Set autoscaling parameters
options.setAutoscalingAlgorithm(
    DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED
);
options.setNumWorkers(2);        // Initial workers
options.setMaxNumWorkers(20);    // Maximum workers

Classic Templates

Create a reusable classic Dataflow template by setting a template location:
DataflowPipelineOptions options = 
    PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// Specify template location
options.setTemplateLocation("gs://my-bucket/templates/my-template");

Pipeline p = Pipeline.create(options);
// Build pipeline
p.run(); // Creates template instead of running

Update Existing Jobs

Update a running Dataflow job:
// Java
options.setUpdate(true);
options.setJobName("existing-job-name");

# Python
python my_pipeline.py \
  --runner=DataflowRunner \
  --update \
  --job_name=existing-job-name \
  # other options...
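
For an update to succeed, the replacement pipeline must be compatible with the running job, and Dataflow maps old steps to new ones by transform name. Giving transforms stable, explicit names helps; a brief sketch (the topic names and MyParseFn are placeholders):

// Explicit transform names stay stable across code changes
p.apply("ReadEvents", PubsubIO.readStrings().fromTopic("projects/my-project/topics/events"))
    .apply("ParseEvents", ParDo.of(new MyParseFn()))
    .apply("WriteResults", PubsubIO.writeStrings().to("projects/my-project/topics/results"));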

Monitoring and Debugging

Cloud Console

Monitor jobs in the Dataflow Console:
  • View job graph and metrics
  • Monitor worker resource usage
  • Inspect logs and errors
  • Track data throughput

Logging

Logs are automatically sent to Cloud Logging:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyDoFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);
  
  @ProcessElement
  public void processElement(ProcessContext c) {
    LOG.info("Processing: {}", c.element());
    c.output(c.element());
  }
}
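
Worker log levels can also be adjusted through pipeline options; a small sketch using the runner's DataflowWorkerLoggingOptions (these options may differ or be superseded in newer Beam versions):

import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;

// Emit more verbose logs from Dataflow workers
DataflowWorkerLoggingOptions loggingOptions =
    options.as(DataflowWorkerLoggingOptions.class);
loggingOptions.setDefaultWorkerLogLevel(DataflowWorkerLoggingOptions.Level.DEBUG);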

Metrics

Dataflow provides built-in metrics:
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;

public class MyDoFn extends DoFn<String, String> {
  private final Counter elementsProcessed = 
      Metrics.counter(MyDoFn.class, "elementsProcessed");
  
  @ProcessElement
  public void processElement(ProcessContext c) {
    elementsProcessed.inc();
    c.output(c.element());
  }
}
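
Metric values can also be read back from the PipelineResult after the job has run; a sketch using the Beam metrics query API (the namespace and counter name match the example above):

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

PipelineResult result = p.run();
result.waitUntilFinish();

// Query the "elementsProcessed" counter declared in MyDoFn
MetricQueryResults metrics = result.metrics().queryMetrics(
    MetricsFilter.builder()
        .addNameFilter(MetricNameFilter.named(MyDoFn.class, "elementsProcessed"))
        .build());

for (MetricResult<Long> counter : metrics.getCounters()) {
  // Use getAttempted() on runners that do not report committed values
  System.out.println(counter.getName() + ": " + counter.getCommitted());
}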

Best Practices

Cost Optimization

  1. Use Appropriate Machine Types
    // For CPU-intensive workloads
    options.setWorkerMachineType("n1-highcpu-4");
    
    // For memory-intensive workloads
    options.setWorkerMachineType("n1-highmem-4");
    
  2. Enable Autoscaling
    options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
    options.setMaxNumWorkers(50);
    
  3. Use Streaming Engine (for streaming jobs)
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);
    

Performance

  1. Optimize Windowing
    • Use appropriate window sizes
    • Consider allowed lateness for late data
  2. Batch Elements
    • Use GroupIntoBatches for downstream API calls (see the sketch after this list)
    • Reduce per-element overhead
  3. Use Side Inputs Wisely
    • Keep side inputs small
    • Consider using external lookups for large datasets
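
A sketch of GroupIntoBatches in front of a downstream call (the key type, batch size, and input collection are illustrative):

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// keyedRequests is assumed to be a PCollection<KV<String, String>>,
// e.g. KV<customerId, requestPayload> produced by upstream transforms
PCollection<KV<String, Iterable<String>>> batched =
    keyedRequests.apply("BatchRequests", GroupIntoBatches.<String, String>ofSize(100));

// A downstream DoFn can now issue one external call per batch
// instead of one call per element.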

Security

  1. Use VPC Networks
    options.setNetwork("projects/my-project/global/networks/my-vpc");
    options.setUsePublicIps(false);
    
  2. Service Accounts
    options.setServiceAccount("[email protected]");
    
  3. Encryption
    • Data is encrypted at rest and in transit by default
    • Use Customer Managed Encryption Keys (CMEK) for additional control (see the example below)
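
For example, a CMEK key can be supplied through the pipeline options (the key resource name below is a placeholder):

// Encrypt job state and worker disks with a customer-managed key
options.setDataflowKmsKey(
    "projects/my-project/locations/us-central1/keyRings/my-ring/cryptoKeys/my-key");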

Streaming vs Batch

Batch Pipeline

DataflowPipelineOptions options = 
    PipelineOptionsFactory.as(DataflowPipelineOptions.class);

options.setRunner(DataflowRunner.class);
// streaming defaults to false

Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://bucket/input/*"))
    .apply(/* transforms */)
    .apply(TextIO.write().to("gs://bucket/output"));

Streaming Pipeline

DataflowPipelineOptions options = 
    PipelineOptionsFactory.as(DataflowPipelineOptions.class);

options.setRunner(DataflowRunner.class);
options.setStreaming(true);
options.setEnableStreamingEngine(true);

Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"))
    .apply(/* transforms */)
    .apply(PubsubIO.writeStrings().to("projects/my-project/topics/output-topic"));
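
Aggregations over the unbounded Pub/Sub stream require windowing; a brief sketch using fixed windows (the window size and allowed lateness are illustrative):

import org.joda.time.Duration;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Count messages per one-minute window, tolerating data up to 30 seconds late
PCollection<KV<String, Long>> counts = p
    .apply(PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"))
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .withAllowedLateness(Duration.standardSeconds(30))
        .discardingFiredPanes())
    .apply(Count.perElement());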

Troubleshooting

Common Issues

Quota exceeded
Increase quotas in the GCP Console:
  • Go to IAM & Admin > Quotas
  • Filter by service (Compute Engine)
  • Request a quota increase

Job or workers fail to start
Check:
  • Service account permissions
  • Network/firewall configuration
  • Region availability
  • Machine type availability in the region

High costs
Optimize:
  • Reduce worker machine sizes
  • Set an appropriate maximum number of workers
  • Use Flex Templates for repeated jobs
  • Enable Streaming Engine for streaming
  • Set appropriate worker disk sizes

Slow pipeline
Consider:
  • Increasing worker count or machine type
  • Optimizing transforms (reduce shuffles)
  • Using Combiner functions
  • Partitioning data appropriately

Next Steps

  • Dataflow Console: Monitor and manage your Dataflow jobs
  • FlinkRunner: Alternative for self-managed clusters
  • Monitoring Guide: Learn about metrics and monitoring
  • Pricing: Understand Dataflow pricing