Flink supports testing at multiple levels:
  1. Unit tests for stateless functions — no Flink runtime required.
  2. Test harness tests for stateful operators and functions that use timers, keyed state, or watermarks.
  3. Integration tests using a local MiniCluster that runs an embedded Flink cluster inside the test JVM.

Unit testing stateless functions

Simple functions that contain no Flink state or timers can be tested as plain Java:
IncrementMapFunctionTest.java
public class IncrementMapFunction implements MapFunction<Long, Long> {
    @Override
    public Long map(Long value) {
        return value + 1;
    }
}

// Test
public class IncrementMapFunctionTest {
    @Test
    public void testIncrement() throws Exception {
        IncrementMapFunction fn = new IncrementMapFunction();
        assertEquals(3L, (long) fn.map(2L));
    }
}
For functions that use a Collector (e.g., FlatMapFunction, ProcessFunction), mock the collector with a test framework like Mockito:
FlatMapTest.java
public class IncrementFlatMapFunction implements FlatMapFunction<Long, Long> {
    @Override
    public void flatMap(Long value, Collector<Long> out) {
        out.collect(value + 1);
    }
}

// Test
public class IncrementFlatMapFunctionTest {
    @Test
    public void testIncrement() throws Exception {
        IncrementFlatMapFunction fn = new IncrementFlatMapFunction();
        Collector<Long> collector = mock(Collector.class);

        fn.flatMap(2L, collector);

        verify(collector, times(1)).collect(3L);
    }
}
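If you would rather not pull in a mocking framework, a hand-rolled collector that records emitted values works just as well. The sketch below uses a simplified stand-in for Flink's Collector interface so it can run standalone; in a real test you would implement org.apache.flink.util.Collector<T> directly:

```java
import java.util.ArrayList;
import java.util.List;

public class ListCollectorExample {

    // Simplified stand-in for org.apache.flink.util.Collector<T> (single method)
    interface Collector<T> {
        void collect(T value);
    }

    // A test collector that records everything it receives
    static class ListCollector<T> implements Collector<T> {
        final List<T> collected = new ArrayList<>();

        @Override
        public void collect(T value) {
            collected.add(value);
        }
    }

    // Same logic as IncrementFlatMapFunction, written against the stand-in interface
    static void flatMapIncrement(Long value, Collector<Long> out) {
        out.collect(value + 1);
    }
}
```

Asserting on a plain List often reads more directly than a chain of verify calls, and the collector can be reset and reused across test methods.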

Test harnesses for stateful operators

Functions that use managed state, timers, or event-time logic require a Flink runtime context to run. Flink provides test harnesses that wrap operators and simulate the runtime:
  • OneInputStreamOperatorTestHarness — for operators on a DataStream
  • KeyedOneInputStreamOperatorTestHarness — for operators on a KeyedStream
  • TwoInputStreamOperatorTestHarness — for operators on ConnectedStreams
  • KeyedTwoInputStreamOperatorTestHarness — for operators on keyed ConnectedStreams
Add the test dependency to your build:
pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <classifier>tests</classifier>
    <scope>test</scope>
</dependency>

Testing a stateful operator

StatefulFlatMapTest.java
import org.apache.flink.streaming.runtime.operators.StreamFlatMap;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

public class StatefulFlatMapTest {
    private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
    private StatefulFlatMapFunction function;

    @Before
    public void setup() throws Exception {
        function = new StatefulFlatMapFunction();

        // Wrap the function in its corresponding operator
        testHarness = new OneInputStreamOperatorTestHarness<>(
            new StreamFlatMap<>(function)
        );

        // Configure execution settings if needed
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);

        // open() is called here, which also calls open() on RichFunctions
        testHarness.open();
    }

    @Test
    public void testStateAccumulation() throws Exception {
        // Push elements with timestamps
        testHarness.processElement(2L, 100L);
        testHarness.processElement(3L, 200L);

        // Advance event time by emitting a watermark
        testHarness.processWatermark(150L);

        // Advance processing time
        testHarness.setProcessingTime(500L);

        // Inspect output
        List<Long> output = testHarness.extractOutputValues();
        assertThat(output, containsInAnyOrder(3L, 4L));
    }

    @After
    public void teardown() throws Exception {
        testHarness.close();
    }
}

Testing a keyed operator

For keyed operators, provide a KeySelector and the key’s TypeInformation:
KeyedOperatorTest.java
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.api.common.typeinfo.Types;

KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, Long>, Long> harness =
    new KeyedOneInputStreamOperatorTestHarness<>(
        new StreamFlatMap<>(new MyKeyedFlatMap()),
        tuple -> tuple.f0,  // key selector
        Types.STRING        // key type information
    );

harness.open();

harness.processElement(new Tuple2<>("user-1", 10L), 100L);
harness.processElement(new Tuple2<>("user-2", 20L), 100L);
harness.processElement(new Tuple2<>("user-1", 15L), 200L);

// Trigger event-time timers
harness.processWatermark(300L);

List<Long> results = harness.extractOutputValues();

What the test harness gives you

  • processElement(element, timestamp) — injects a record into the operator
  • processWatermark(timestamp) — advances event time and fires event-time timers
  • setProcessingTime(timestamp) — advances processing time and fires processing-time timers
  • getOutput() — returns all emitted records as a Queue
  • extractOutputValues() — returns output values as a List
  • getSideOutput(outputTag) — returns records emitted to a side output
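The time-handling methods above all follow the same rule: advancing a clock fires every registered timer whose timestamp the clock has reached, in timestamp order. A minimal sketch of that rule (this is not Flink's implementation, just an illustration of the semantics the harness simulates):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class ManualTimerService {
    // Timers ordered by timestamp; multiple callbacks may share a timestamp
    private final TreeMap<Long, List<Runnable>> timers = new TreeMap<>();

    // Register a callback to fire once time reaches `timestamp`
    public void registerTimer(long timestamp, Runnable callback) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(callback);
    }

    // Advancing time fires every timer with timestamp <= the new time, in order.
    // This mirrors what setProcessingTime() and processWatermark() do for
    // processing-time and event-time timers respectively.
    public void advanceTo(long timestamp) {
        while (!timers.isEmpty() && timers.firstKey() <= timestamp) {
            for (Runnable callback : timers.pollFirstEntry().getValue()) {
                callback.run();
            }
        }
    }
}
```

Understanding this model explains why a test must explicitly advance time: timers never fire on their own inside a harness.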

ProcessFunctionTestHarnesses

For ProcessFunction and its variants, Flink provides a factory that simplifies harness setup:
ProcessFunctionHarnessTest.java
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;

public class PassThroughProcessFunction extends ProcessFunction<Integer, Integer> {
    @Override
    public void processElement(Integer value, Context ctx, Collector<Integer> out) {
        out.collect(value);
    }
}

// Test
@Test
public void testPassThrough() throws Exception {
    PassThroughProcessFunction fn = new PassThroughProcessFunction();

    OneInputStreamOperatorTestHarness<Integer, Integer> harness =
        ProcessFunctionTestHarnesses.forProcessFunction(fn);

    harness.processElement(1, 10L);
    harness.processElement(2, 20L);

    assertEquals(Arrays.asList(1, 2), harness.extractOutputValues());
}
ProcessFunctionTestHarnesses also has factories for:
  • forKeyedProcessFunction(fn, keySelector, keyType) — for KeyedProcessFunction
  • forCoProcessFunction(fn) — for CoProcessFunction
  • forKeyedCoProcessFunction(fn, keySelector1, keySelector2, keyType) — for KeyedCoProcessFunction

Testing a KeyedProcessFunction with timers

KeyedProcessFunctionTimerTest.java
@Test
public void testEventTimeTimer() throws Exception {
    MyKeyedProcessFunction fn = new MyKeyedProcessFunction();

    KeyedOneInputStreamOperatorTestHarness<String, Event, Alert> harness =
        ProcessFunctionTestHarnesses.forKeyedProcessFunction(
            fn, Event::getUserId, Types.STRING
        );

    // No harness.open() needed: the factory already set up and opened the harness

    // Send an event — the function registers a timer 5 minutes from now
    harness.processElement(new Event("user-1", 100L), 100L);

    // Timer should not have fired yet
    assertTrue(harness.extractOutputValues().isEmpty());

    // Advance event time past the timer
    harness.processWatermark(100L + 300_001L);

    // Now the timer fires
    List<Alert> alerts = harness.extractOutputValues();
    assertEquals(1, alerts.size());
    assertEquals("user-1", alerts.get(0).getUserId());
}

Integration testing with MiniCluster

For end-to-end tests that run a full Flink pipeline, use MiniClusterWithClientResource as a JUnit @ClassRule. It starts an embedded Flink cluster inside the test JVM.
pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>
IntegrationTest.java
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;

public class PipelineIntegrationTest {

    @ClassRule
    public static MiniClusterWithClientResource flink = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(2)
            .build()
    );

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        CollectSink.values.clear();

        env.fromData(1L, 21L, 22L)
           .map(n -> n + 1)
           .addSink(new CollectSink());

        env.execute("increment-test");

        assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
    }

    // A sink that collects results in a static list (static because Flink serializes sinks)
    private static class CollectSink implements SinkFunction<Long> {
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, Context ctx) {
            values.add(value);
        }
    }
}

Integration testing tips

Use @ClassRule instead of @Rule so all test methods in the class share the same Flink cluster. Starting and stopping a cluster is slow — sharing it across tests saves significant time.
  • Always clear static collector state between tests (CollectSink.values.clear()).
  • Run tests with parallelism > 1 (env.setParallelism(2)) to catch concurrency bugs.
  • Make sources and sinks pluggable in your production code so you can inject test sources that produce deterministic data.
  • To test failure recovery, throw an exception from a test-only UDF after a specific number of records. Use env.enableCheckpointing(100) so a checkpoint is taken before the failure.
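For the failure-recovery tip, the test-only UDF can look like the sketch below. The class name and the reset() helper are illustrative, not Flink API; in a real job the class would also implement MapFunction<Long, Long>. The flag is static so it survives Flink's task restart (which creates a fresh mapper instance), making the injected failure happen only on the first attempt:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class FailOnceMapper {
    // Static so the flag survives a restart, which re-instantiates the mapper
    private static final AtomicBoolean hasFailed = new AtomicBoolean(false);

    private final int failAfter;
    private final AtomicInteger seen = new AtomicInteger();

    public FailOnceMapper(int failAfter) {
        this.failAfter = failAfter;
    }

    public Long map(Long value) {
        if (!hasFailed.get() && seen.incrementAndGet() > failAfter) {
            hasFailed.set(true);
            throw new RuntimeException("Injected failure for recovery test");
        }
        return value + 1;
    }

    // Visible for tests: reset between test methods, like CollectSink.values.clear()
    public static void reset() {
        hasFailed.set(false);
    }
}
```

With checkpointing enabled at a short interval, the job should fail once, restore from the last checkpoint, and finish on the retry; the test then asserts on exactly-once output.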

Testing event-time logic in integration tests

Supply a custom parallel source that emits watermarks alongside records:
EventTimeIntegrationTest.java
public class TimestampedSource extends RichParallelSourceFunction<Tuple2<String, Long>> {

    @Override
    public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
        ctx.collectWithTimestamp(Tuple2.of("a", 1000L), 1000L);
        ctx.emitWatermark(new Watermark(1000L));

        ctx.collectWithTimestamp(Tuple2.of("b", 2000L), 2000L);
        ctx.emitWatermark(new Watermark(2000L));

        // Signal end of stream
        ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
    }

    @Override
    public void cancel() {}
}
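The final Watermark(Long.MAX_VALUE) works because of the watermark contract: a watermark with timestamp t promises that no further records with timestamp at or below t will arrive, so a maximal watermark tells downstream operators that every event-time timer and window can safely fire. A one-line sketch of that predicate (semantics only, not Flink API):

```java
public class WatermarkContract {

    // A record is late if a watermark covering its timestamp was already emitted
    static boolean isLate(long recordTimestamp, long currentWatermark) {
        return recordTimestamp <= currentWatermark;
    }
}
```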