The DirectRunner executes your pipeline locally within the process that constructed it. It’s designed for development, testing, and debugging on small-scale data.

Overview

The DirectRunner is suitable for:
  • Development: Quick iteration on pipeline logic
  • Testing: Unit and integration tests
  • Debugging: Local debugging with familiar tools
  • Small datasets: Processing data that fits on a single machine
The DirectRunner enforces correctness checks that may cause pipelines to fail if they violate Beam model assumptions. This helps catch bugs before production deployment.

When to Use DirectRunner

Good For

  • Local development
  • Unit testing
  • Debugging pipelines
  • Small data processing
  • Validating pipeline logic

Not Suitable For

  • Large datasets
  • Production workloads
  • Distributed processing
  • High throughput streaming
  • Long-running jobs

Setup and Configuration

Dependencies

For Java, the DirectRunner is published as its own artifact, beam-runners-direct-java, separate from the core SDK artifact.
Add it to your pom.xml:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>{beam-version}</version>
</dependency>
Or for Gradle:
implementation 'org.apache.beam:beam-runners-direct-java:{beam-version}'

Running a Pipeline

Basic Usage

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.runners.direct.DirectRunner;

public class MyPipeline {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectRunner.class);
    
    Pipeline p = Pipeline.create(options);
    
    // Build your pipeline
    p.apply(/* transforms */);
    
    // Run and wait for completion
    p.run().waitUntilFinish();
  }
}

Command Line

Run with DirectRunner from the command line:
# Java
mvn compile exec:java -Dexec.mainClass=com.example.MyPipeline \
  -Dexec.args="--runner=DirectRunner"

# Python
python my_pipeline.py --runner=DirectRunner

# Go
go run my_pipeline.go --runner=direct
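A `--runner=DirectRunner` flag only takes effect if the program parses its arguments into pipeline options. The Basic Usage sample calls `PipelineOptionsFactory.create()`, which does not read `args`. The following is a minimal sketch of argument parsing with `PipelineOptionsFactory.fromArgs`; the class name `ArgsPipeline` and the helper `parse` are hypothetical and exist only for illustration.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

// Hypothetical class name, used for illustration only.
class ArgsPipeline {
  // Parses flags such as --runner=DirectRunner into PipelineOptions.
  static PipelineOptions parse(String[] args) {
    return PipelineOptionsFactory.fromArgs(args).withValidation().create();
  }

  public static void main(String[] args) {
    PipelineOptions options = parse(args);
    Pipeline p = Pipeline.create(options);

    // Placeholder input; replace with your own transforms.
    p.apply(Create.of("a", "b", "c"));

    p.run().waitUntilFinish();
  }
}
```

With this shape, the same program can be pointed at a different runner by changing only the command-line flag, provided that runner's artifact is on the classpath.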

DirectRunner Options

Configure DirectRunner behavior with these options:

Core Options

blockOnRun (boolean, default: true)
If true, Pipeline.run() blocks until the pipeline completes. Set to false for async execution.
DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
options.setBlockOnRun(false);
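As a rough sketch of what asynchronous execution can look like, the example below disables blockOnRun, starts the pipeline, and then waits explicitly on the returned `PipelineResult`. The class name `AsyncRun` and the method `runAsync` are hypothetical; the input is a placeholder.

```java
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

// Hypothetical class name, used for illustration only.
class AsyncRun {
  static PipelineResult.State runAsync() {
    DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
    options.setRunner(DirectRunner.class);
    options.setBlockOnRun(false);

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of(1, 2, 3)); // placeholder input

    // With blockOnRun disabled, run() is expected to return before completion.
    PipelineResult result = p.run();

    // Other work can happen here while the pipeline executes.

    // Block explicitly once the outcome is needed.
    return result.waitUntilFinish();
  }

  public static void main(String[] args) {
    System.out.println("Final state: " + runAsync());
  }
}
```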
targetParallelism (integer, default: max(availableProcessors, 3))
Controls the amount of parallelism the DirectRunner will use.
options.setTargetParallelism(8);
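To make the targetParallelism default concrete, here is a small standalone sketch of the arithmetic. It only mirrors the documented formula `max(availableProcessors, 3)`; it is not the runner's own code, and the class and method names are hypothetical.

```java
// Hypothetical helper that mirrors the documented default, for illustration only.
class DefaultParallelism {
  // Returns the larger of the processor count and 3.
  static int defaultTargetParallelism(int availableProcessors) {
    return Math.max(availableProcessors, 3);
  }

  public static void main(String[] args) {
    int cores = Runtime.getRuntime().availableProcessors();
    System.out.println(
        "cores=" + cores + ", default parallelism=" + defaultTargetParallelism(cores));
  }
}
```

On a 2-core machine the formula yields 3, so the default never drops below 3 even on small hosts.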

Enforcement Options

The DirectRunner enforces correctness to catch bugs early:
enforceImmutability (boolean, default: true)
Ensures elements are not mutated. Detects illegal modifications to input or output elements.
options.setEnforceImmutability(true);
enforceEncodability (boolean, default: true)
Validates that all elements can be encoded and decoded by their Coder.
options.setEnforceEncodability(true);
The enforcement options help catch bugs that would only appear in distributed runners. Keep them enabled during development.
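The sketch below shows the kind of bug that immutability enforcement is meant to surface: a DoFn that overwrites its input element in place. The class names `MutationDemo` and `MutatingFn` are hypothetical. The exact exception type and message are not asserted here, so the example catches a general `RuntimeException` and prints it.

```java
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// Hypothetical class names, used for illustration only.
class MutationDemo {
  // Deliberately buggy: modifies the input element in place.
  static class MutatingFn extends DoFn<byte[], Integer> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.element()[0] = 0xa; // illegal: input elements must not be mutated
      c.output(c.element().length);
    }
  }

  // Returns true if the runner rejected the pipeline.
  static boolean runFails() {
    DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
    options.setRunner(DirectRunner.class);
    options.setEnforceImmutability(true);

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of(new byte[] {1, 2, 3}).withCoder(ByteArrayCoder.of()))
        .apply(ParDo.of(new MutatingFn()));

    try {
      p.run().waitUntilFinish();
      return false;
    } catch (RuntimeException e) {
      System.out.println("Pipeline rejected: " + e);
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("Mutation detected: " + runFails());
  }
}
```

Copying the element before modifying it (for example with `Arrays.copyOf`) is the usual way to avoid this class of failure.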

Example Configuration

import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);

// Runner configuration
options.setRunner(DirectRunner.class);
options.setBlockOnRun(true);
options.setTargetParallelism(4);

// Enforcement (recommended for development)
options.setEnforceImmutability(true);
options.setEnforceEncodability(true);

Pipeline p = Pipeline.create(options);

Runner Capabilities

Supported Features

  • ✅ ParDo and Map operations
  • ✅ GroupByKey and CoGroupByKey
  • ✅ Windowing (all types)
  • ✅ Triggers
  • ✅ Side inputs
  • ✅ Bounded and unbounded sources
  • ✅ Metrics
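As one illustration of the features listed above, the following sketch defines a counter metric inside a DoFn and reads it back from the `PipelineResult` after a DirectRunner run. The class names `MetricsDemo` and `CountingFn` and the metric name `elementsSeen` are hypothetical.

```java
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// Hypothetical class names, used for illustration only.
class MetricsDemo {
  static class CountingFn extends DoFn<String, String> {
    private final Counter seen = Metrics.counter(CountingFn.class, "elementsSeen");

    @ProcessElement
    public void processElement(ProcessContext c) {
      seen.inc(); // one increment per element
      c.output(c.element());
    }
  }

  static long countElements() {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectRunner.class);

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of("a", "b", "c")).apply(ParDo.of(new CountingFn()));

    PipelineResult result = p.run();
    result.waitUntilFinish();

    // Query only the counter defined above.
    MetricQueryResults metrics = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named(CountingFn.class, "elementsSeen"))
            .build());

    long total = 0;
    for (MetricResult<Long> counter : metrics.getCounters()) {
      total += counter.getAttempted();
    }
    return total;
  }

  public static void main(String[] args) {
    System.out.println("elementsSeen=" + countElements());
  }
}
```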

Limitations

  • ❌ Not designed for large-scale data
  • ❌ Single machine execution only
  • ❌ Limited streaming capabilities
  • ❌ No distributed state management
  • ❌ No automatic scaling

Testing with DirectRunner

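The DirectRunner works well for unit tests. TestPipeline uses it by default when no other runner is configured and the DirectRunner is on the test classpath, and PAssert verifies the contents of the output: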
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;

public class MyPipelineTest {
  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();
  
  @Test
  public void testPipeline() {
    PCollection<String> output = pipeline
        .apply(Create.of("Hello", "World"))
        .apply(MapElements.into(TypeDescriptors.strings())
            .via(String::toUpperCase));
    
    // Assertions are checked when the pipeline runs
    PAssert.that(output).containsInAnyOrder("HELLO", "WORLD");
    
    pipeline.run();
  }
}

Debugging

Because the DirectRunner executes inside your own process, two familiar debugging techniques apply directly: breakpoints and logging.

Using Breakpoints

Since the pipeline runs locally, you can use standard debugging tools:
import org.apache.beam.sdk.transforms.DoFn;

// Java - set breakpoints in your DoFn
public class MyDoFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    String element = c.element();
    // Set breakpoint here
    c.output(element.toUpperCase());
  }
}

Logging

Use standard logging to inspect pipeline execution:
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyDoFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);
  
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Logs each element as it is processed
    LOG.info("Processing: {}", c.element());
    c.output(c.element());
  }
}

Performance Considerations

The DirectRunner is not optimized for performance:
  • Memory: All data is processed in memory
  • Parallelism: Limited to single machine resources
  • Optimization: Minimal pipeline-level optimization compared to distributed runners
For production workloads or large datasets, use a distributed runner like DataflowRunner, FlinkRunner, or SparkRunner.

Next Steps

PrismRunner

Try the modern portable local runner

DataflowRunner

Scale to production with Dataflow

Testing Guide

Learn more about testing pipelines

Pipeline Options

Explore all configuration options
