The DirectRunner executes your pipeline locally within the process that constructed it. It’s designed for development, testing, and debugging on small-scale data.
Overview
The DirectRunner is suitable for:
Development: Quick iteration on pipeline logic
Testing: Unit and integration tests
Debugging: Local debugging with familiar tools
Small datasets: Processing data that fits on a single machine
The DirectRunner enforces correctness checks that may cause pipelines to fail if they violate Beam model assumptions. This helps catch bugs before production deployment.
When to Use DirectRunner
Good For
Local development
Unit testing
Debugging pipelines
Small data processing
Validating pipeline logic
Not Suitable For
Large datasets
Production workloads
Distributed processing
High-throughput streaming
Long-running jobs
Setup and Configuration
Dependencies
Python: The DirectRunner is included in the core Beam SDK (pip install apache-beam).
Java: Add the DirectRunner artifact to your pom.xml:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>{beam-version}</version>
</dependency>
Or for Gradle: implementation 'org.apache.beam:beam-runners-direct-java:{beam-version}'
Go: The DirectRunner is included with the Beam Go SDK. Import the direct runner package:
import "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
Running a Pipeline
Basic Usage
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
public class MyPipeline {
  public static void main(String[] args) {
    // Parse command-line flags such as --runner=DirectRunner
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    options.setRunner(DirectRunner.class);
    Pipeline p = Pipeline.create(options);
    // Build your pipeline
    p.apply(Create.of("Hello", "World"));
    // Run and wait for completion
    p.run().waitUntilFinish();
  }
}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
def run():
    options = PipelineOptions([
        '--runner=DirectRunner',
    ])
    with beam.Pipeline(options=options) as p:
        # Build your pipeline
        (p
         | beam.Create(['Hello', 'World'])
         | beam.Map(print))
if __name__ == '__main__':
    run()
package main
import (
	"context"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)
func main() {
	beam.Init()
	p := beam.NewPipeline()
	s := p.Root()
	// Build your pipeline
	words := beam.Create(s, "Hello", "World")
	debug.Print(s, words)
	// Execute returns a PipelineResult and an error
	if _, err := direct.Execute(context.Background(), p); err != nil {
		panic(err)
	}
}
Command Line
Run with DirectRunner from the command line:
# Java
mvn compile exec:java -Dexec.mainClass=com.example.MyPipeline \
  -Dexec.args="--runner=DirectRunner"
# Python
python my_pipeline.py --runner=DirectRunner
# Go
go run my_pipeline.go --runner=direct
DirectRunner Options
Configure DirectRunner behavior with these options (Java DirectOptions):
Core Options
blockOnRun
boolean
default: true
If true, Pipeline.run() blocks until the pipeline completes. Set to false for asynchronous execution.
DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
options.setBlockOnRun(false);
targetParallelism
integer
default: max(availableProcessors, 3)
Controls the amount of parallelism the DirectRunner will use.
options.setTargetParallelism(8);
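The documented default for targetParallelism is simple arithmetic: the number of available processors, but never fewer than 3. The stdlib-only Python function below only illustrates that formula; it is not the runner's own code, and default_target_parallelism is a hypothetical name.

```python
import os


def default_target_parallelism(available_processors=None):
    """Illustrates the documented default: max(availableProcessors, 3)."""
    if available_processors is None:
        # os.cpu_count() can return None on some platforms; fall back to 1.
        available_processors = os.cpu_count() or 1
    return max(available_processors, 3)


print(default_target_parallelism(2))   # 3: a 2-core machine still gets 3
print(default_target_parallelism(16))  # 16
```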
Enforcement Options
The DirectRunner enforces correctness checks to catch bugs early:
enforceImmutability
boolean
default: true
Ensures elements are not mutated by detecting illegal modifications to input or output elements.
options.setEnforceImmutability(true);
enforceEncodability
boolean
default: true
Validates that all elements can be encoded and decoded by their Coder.
options.setEnforceEncodability(true);
These checks catch bugs that might otherwise surface only on distributed runners, where elements are encoded and moved between workers. Keep them enabled during development.
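Conceptually, both checks work on an element's encoded form: immutability enforcement flags an element whose encoding changes after your code has handled it, and encodability enforcement flags an element that does not survive an encode/decode round trip. The stdlib-only Python sketch below illustrates that idea with pickle standing in for a Beam Coder. check_immutability, check_encodability, bad_fn and good_fn are hypothetical names, not DirectRunner APIs, and the runner's real checks are more involved.

```python
import pickle


class IllegalMutationError(Exception):
    """Raised when a function modifies an element it was given."""


def check_immutability(fn, element):
    """Call fn(element) and fail if the input's encoded form changed."""
    before = pickle.dumps(element)
    result = fn(element)
    if pickle.dumps(element) != before:
        raise IllegalMutationError(f'input mutated: {element!r}')
    return result


def check_encodability(element):
    """Fail if the element does not round-trip to an equal value."""
    decoded = pickle.loads(pickle.dumps(element))
    if decoded != element:
        raise ValueError(f'round trip changed {element!r}')
    return decoded


def bad_fn(record):
    record['count'] += 1  # mutates its input: violates the Beam model
    return record


def good_fn(record):
    return {**record, 'count': record['count'] + 1}  # emits a new element


print(check_immutability(good_fn, {'count': 1}))  # {'count': 2}
try:
    check_immutability(bad_fn, {'count': 1})
except IllegalMutationError as err:
    print('caught:', err)
```

The fix the check pushes you toward is the good_fn pattern: build a new output element instead of modifying the input.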
Example Configuration
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
// Runner configuration
options.setRunner(DirectRunner.class);
options.setBlockOnRun(true);
options.setTargetParallelism(4);
// Enforcement (recommended for development)
options.setEnforceImmutability(true);
options.setEnforceEncodability(true);
Pipeline p = Pipeline.create(options);
from apache_beam.options.pipeline_options import PipelineOptions
options = PipelineOptions([
    '--runner=DirectRunner',
    '--direct_num_workers=4',
    '--direct_running_mode=multi_threading',
])
Runner Capabilities
Supported Features
✅ ParDo and Map operations
✅ GroupByKey and CoGroupByKey
✅ Windowing (all types)
✅ Triggers
✅ Side inputs
✅ Bounded and unbounded sources
✅ Metrics
Limitations
❌ Not designed for large-scale data
❌ Single-machine execution only
❌ Limited streaming capabilities
❌ No distributed state management
❌ No automatic scaling
Testing with DirectRunner
The DirectRunner is well suited to tests; unless you configure another runner, TestPipeline runs on it:
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;
public class MyPipelineTest {
  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();
  @Test
  public void testPipeline() {
    PCollection<String> output = pipeline
        .apply(Create.of("Hello", "World"))
        .apply(MapElements.into(TypeDescriptors.strings())
            .via(String::toUpperCase));
    PAssert.that(output).containsInAnyOrder("HELLO", "WORLD");
    pipeline.run();
  }
}
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class MyPipelineTest(unittest.TestCase):
    def test_pipeline(self):
        with TestPipeline() as p:
            output = (
                p
                | beam.Create(['Hello', 'World'])
                | beam.Map(str.upper)
            )
            assert_that(output, equal_to(['HELLO', 'WORLD']))
if __name__ == '__main__':
    unittest.main()
Debugging
The DirectRunner supports standard local debugging techniques such as breakpoints and logging.
Using Breakpoints
Since the pipeline runs locally, you can use standard debugging tools:
// Java - set breakpoints in your DoFn
import org.apache.beam.sdk.transforms.DoFn;
public class MyDoFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    String element = c.element();
    // Set breakpoint here
    c.output(element.toUpperCase());
  }
}
Logging
Use standard logging to inspect pipeline execution:
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyDoFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);
  @ProcessElement
  public void processElement(ProcessContext c) {
    LOG.info("Processing: {}", c.element());
    c.output(c.element());
  }
}
import logging
import apache_beam as beam
class MyDoFn(beam.DoFn):
    def process(self, element):
        logging.info(f'Processing: {element}')
        yield element.upper()
Performance Considerations
The DirectRunner is not optimized for performance:
Memory: All data is processed in memory
Parallelism: Limited to single-machine resources
Optimization: Minimal pipeline optimization compared to distributed runners
Next Steps
PrismRunner: Try the modern portable local runner
DataflowRunner: Scale to production with Dataflow
Testing Guide: Learn more about testing pipelines
Pipeline Options: Explore all configuration options