Flink supports testing at multiple levels:
- Unit tests for stateless functions — no Flink runtime required.
- Test harness tests for stateful operators and functions that use timers, keyed state, or watermarks.
- Integration tests using a local MiniCluster that runs an embedded Flink cluster inside the test JVM.
Unit testing stateless functions
Simple functions that contain no Flink state or timers can be tested as plain Java:
IncrementMapFunctionTest.java
public class IncrementMapFunction implements MapFunction<Long, Long> {
    @Override
    public Long map(Long value) {
        return value + 1;
    }
}

// Test
public class IncrementMapFunctionTest {
    @Test
    public void testIncrement() throws Exception {
        IncrementMapFunction fn = new IncrementMapFunction();
        assertEquals(3L, (long) fn.map(2L));
    }
}
For functions that use a Collector (e.g., FlatMapFunction, ProcessFunction), mock the collector with a test framework like Mockito:
public class IncrementFlatMapFunction implements FlatMapFunction<Long, Long> {
    @Override
    public void flatMap(Long value, Collector<Long> out) {
        out.collect(value + 1);
    }
}

// Test
public class IncrementFlatMapFunctionTest {
    @Test
    public void testIncrement() throws Exception {
        IncrementFlatMapFunction fn = new IncrementFlatMapFunction();
        Collector<Long> collector = mock(Collector.class);
        fn.flatMap(2L, collector);
        verify(collector, times(1)).collect(3L);
    }
}
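If you would rather avoid a mocking framework, a small list-backed collector works just as well and lets you assert directly on the emitted values. The sketch below defines a minimal stand-in for `org.apache.flink.util.Collector` (same `collect`/`close` shape) so it runs without Flink on the classpath; in a real test you would implement Flink's own interface instead.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for org.apache.flink.util.Collector, so this sketch runs
// without Flink on the classpath. In a real test, implement Flink's own
// Collector interface instead.
interface Collector<T> {
    void collect(T record);
    void close();
}

// A list-backed collector: assert directly on what was emitted, no mocks needed.
class TestCollector<T> implements Collector<T> {
    final List<T> emitted = new ArrayList<>();

    @Override
    public void collect(T record) {
        emitted.add(record);
    }

    @Override
    public void close() {}
}
```

A test then calls `fn.flatMap(2L, collector)` and asserts on `collector.emitted`, which also preserves emission order — something `verify(...)` alone does not check.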
Test harnesses for stateful operators
Functions that use managed state, timers, or event-time logic require a Flink runtime context to run. Flink provides test harnesses that wrap operators and simulate the runtime:
| Harness | Use for |
|---|---|
| OneInputStreamOperatorTestHarness | Operators on a DataStream |
| KeyedOneInputStreamOperatorTestHarness | Operators on a KeyedStream |
| TwoInputStreamOperatorTestHarness | Operators on ConnectedStreams |
| KeyedTwoInputStreamOperatorTestHarness | Operators on keyed ConnectedStreams |
Add the test dependency to your build:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <classifier>tests</classifier>
    <scope>test</scope>
</dependency>
Testing a stateful operator
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;

public class StatefulFlatMapTest {

    private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
    private StatefulFlatMapFunction function;

    @Before
    public void setup() throws Exception {
        function = new StatefulFlatMapFunction();
        // Wrap the function in its corresponding operator
        testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(function));
        // Configure execution settings if needed
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
        // open() is called here, which also calls open() on RichFunctions
        testHarness.open();
    }

    @Test
    public void testStateAccumulation() throws Exception {
        // Push elements with timestamps
        testHarness.processElement(2L, 100L);
        testHarness.processElement(3L, 200L);
        // Advance event time by emitting a watermark
        testHarness.processWatermark(150L);
        // Advance processing time
        testHarness.setProcessingTime(500L);
        // Inspect output
        List<Long> output = testHarness.extractOutputValues();
        assertThat(output, containsInAnyOrder(3L, 4L));
    }

    @After
    public void teardown() throws Exception {
        testHarness.close();
    }
}
Testing a keyed operator
For keyed operators, provide a KeySelector and the key’s TypeInformation:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;

KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, Long>, Long> harness =
    new KeyedOneInputStreamOperatorTestHarness<>(
        new StreamFlatMap<>(new MyKeyedFlatMap()),
        tuple -> tuple.f0,   // key selector
        Types.STRING         // key type information
    );

harness.open();
harness.processElement(new Tuple2<>("user-1", 10L), 100L);
harness.processElement(new Tuple2<>("user-2", 20L), 100L);
harness.processElement(new Tuple2<>("user-1", 15L), 200L);

// Trigger event-time timers
harness.processWatermark(300L);

List<Long> results = harness.extractOutputValues();
What the test harness gives you
- processElement(element, timestamp) — injects a record into the operator
- processWatermark(timestamp) — advances event time and fires event-time timers
- setProcessingTime(timestamp) — advances processing time and fires processing-time timers
- getOutput() — returns all emitted records as a Queue
- extractOutputValues() — returns output values as a List
- getSideOutput(outputTag) — returns records emitted to a side output
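The difference between getOutput() and extractOutputValues() is worth spelling out: the queue holds timestamped StreamRecords (interleaved with any watermarks), while extractOutputValues() unwraps just the values in emission order. A rough model of that unwrapping, using a stand-in StreamRecord type so it runs without Flink on the classpath:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Stand-in for Flink's StreamRecord: a value plus its timestamp. The real harness
// queue also interleaves watermarks; this model keeps only records for brevity.
class StreamRecord<T> {
    final T value;
    final long timestamp;

    StreamRecord(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

class OutputModel {
    // Roughly what extractOutputValues() does: strip timestamps, keep emission order.
    static <T> List<T> extractOutputValues(Queue<StreamRecord<T>> output) {
        List<T> values = new ArrayList<>();
        for (StreamRecord<T> record : output) {
            values.add(record.value);
        }
        return values;
    }
}
```

Use getOutput() when a test needs to assert on timestamps or watermark positions; use extractOutputValues() when only the payloads matter.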
ProcessFunctionTestHarnesses
For ProcessFunction and its variants, Flink provides a factory that simplifies harness setup:
ProcessFunctionHarnessTest.java
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;

public class PassThroughProcessFunction extends ProcessFunction<Integer, Integer> {
    @Override
    public void processElement(Integer value, Context ctx, Collector<Integer> out) {
        out.collect(value);
    }
}

// Test
@Test
public void testPassThrough() throws Exception {
    PassThroughProcessFunction fn = new PassThroughProcessFunction();
    OneInputStreamOperatorTestHarness<Integer, Integer> harness =
        ProcessFunctionTestHarnesses.forProcessFunction(fn);
    harness.processElement(1, 10L);
    harness.processElement(2, 20L);
    assertEquals(Arrays.asList(1, 2), harness.extractOutputValues());
}
ProcessFunctionTestHarnesses also has factories for:
- forKeyedProcessFunction(fn, keySelector, keyType) — KeyedProcessFunction
- forCoProcessFunction(fn) — CoProcessFunction
- forKeyedCoProcessFunction(fn, keySelector, keyType) — KeyedCoProcessFunction
Testing a KeyedProcessFunction with timers
KeyedProcessFunctionTimerTest.java
@Test
public void testEventTimeTimer() throws Exception {
    MyKeyedProcessFunction fn = new MyKeyedProcessFunction();
    KeyedOneInputStreamOperatorTestHarness<String, Event, Alert> harness =
        ProcessFunctionTestHarnesses.forKeyedProcessFunction(
            fn, Event::getUserId, Types.STRING);
    harness.open();

    // Send an event — the function registers a timer 5 minutes (300,000 ms) from now
    harness.processElement(new Event("user-1", 100L), 100L);

    // Timer should not have fired yet
    assertTrue(harness.extractOutputValues().isEmpty());

    // Advance event time past the timer
    harness.processWatermark(100L + 300_001L);

    // Now the timer fires
    List<Alert> alerts = harness.extractOutputValues();
    assertEquals(1, alerts.size());
    assertEquals("user-1", alerts.get(0).getUserId());
}
Integration testing with MiniCluster
For end-to-end tests that run a full Flink pipeline, use MiniClusterWithClientResource as a JUnit @ClassRule. It starts an embedded Flink cluster inside the test JVM.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import org.junit.ClassRule;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.junit.Assert.assertTrue;

public class PipelineIntegrationTest {

    @ClassRule
    public static MiniClusterWithClientResource flink = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(2)
            .build());

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        CollectSink.values.clear();

        env.fromData(1L, 21L, 22L)
            .map(n -> n + 1)
            .addSink(new CollectSink());

        env.execute("increment-test");

        assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
    }

    // A sink that collects results in a static list (static because Flink serializes sinks)
    private static class CollectSink implements SinkFunction<Long> {
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, Context ctx) {
            values.add(value);
        }
    }
}
Integration testing tips
Use @ClassRule instead of @Rule so all test methods in the class share the same Flink cluster. Starting and stopping a cluster is slow — sharing it across tests saves significant time.
- Always clear static collector state between tests (CollectSink.values.clear()).
- Run tests with parallelism > 1 (env.setParallelism(2)) to catch concurrency bugs.
- Make sources and sinks pluggable in your production code so you can inject test sources that produce deterministic data.
- To test failure recovery, throw an exception from a test-only UDF after a specific number of records. Use env.enableCheckpointing(100) so a checkpoint is taken before the failure.
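The failure-recovery tip can be sketched as a test-only mapper that throws exactly once after a set number of records. The stand-in MapFunction interface below mirrors Flink's, so the sketch runs without Flink on the classpath; FailingOnceMapFunction and its field names are illustrative, not a Flink API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for org.apache.flink.api.common.functions.MapFunction, so this
// sketch compiles without Flink on the classpath.
interface MapFunction<T, O> {
    O map(T value) throws Exception;
}

// Test-only mapper: fails exactly once, after `failAfter` records. The fields
// are static because Flink re-deserializes the function on restart; with a
// MiniCluster the restarted task runs in the same test JVM, so the statics
// survive and the job recovers from the last checkpoint instead of failing
// on every restart attempt.
class FailingOnceMapFunction implements MapFunction<Long, Long> {
    private static final AtomicInteger recordsSeen = new AtomicInteger();
    private static volatile boolean hasFailed = false;

    private final int failAfter;

    FailingOnceMapFunction(int failAfter) {
        this.failAfter = failAfter;
    }

    @Override
    public Long map(Long value) throws Exception {
        if (!hasFailed && recordsSeen.incrementAndGet() >= failAfter) {
            hasFailed = true;
            throw new RuntimeException("Injected failure for recovery test");
        }
        return value + 1;
    }
}
```

After the job completes, assert that the sink saw every input exactly once (or at least once, depending on the configured checkpointing guarantee).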
Testing event-time logic in integration tests
Supply a custom parallel source that emits watermarks alongside records:
EventTimeIntegrationTest.java
public class TimestampedSource extends RichParallelSourceFunction<Tuple2<String, Long>> {

    @Override
    public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
        ctx.collectWithTimestamp(Tuple2.of("a", 1000L), 1000L);
        ctx.emitWatermark(new Watermark(1000L));
        ctx.collectWithTimestamp(Tuple2.of("b", 2000L), 2000L);
        ctx.emitWatermark(new Watermark(2000L));
        // Signal end of stream
        ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
    }

    @Override
    public void cancel() {}
}
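Such a source can itself be unit-tested by handing its run() method a recording context. The sketch below uses stand-in Watermark and SourceContext types, reduced to the two calls the source makes, so it runs without Flink on the classpath; with Flink available you would implement SourceFunction.SourceContext instead.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's Watermark: just an event-time timestamp.
class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

// Stand-in for SourceFunction.SourceContext, reduced to the two methods
// the source above calls.
interface SourceContext<T> {
    void collectWithTimestamp(T element, long timestamp);
    void emitWatermark(Watermark mark);
}

// Records everything the source emits, in order, for assertions.
class RecordingContext<T> implements SourceContext<T> {
    final List<Object> emitted = new ArrayList<>();

    @Override
    public void collectWithTimestamp(T element, long timestamp) {
        emitted.add(element);
    }

    @Override
    public void emitWatermark(Watermark mark) {
        emitted.add(mark);
    }
}
```

A test then asserts that records and watermarks come out interleaved in the expected order, and that the final watermark is Long.MAX_VALUE.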