Side outputs let a single operator emit records to multiple output streams. The main output carries the primary results, while one or more side outputs carry secondary data that should flow separately, such as late events, rejected records, or debug traces. Side outputs are typed independently of the main output: a ProcessFunction<Integer, Integer> can emit String errors to a side output while emitting Integer results to the main output.

OutputTag

An OutputTag is the identifier for a side output stream. It carries the element type as a generic type parameter. Define it as an anonymous subclass so that Flink can recover the element type at runtime despite type erasure:
// OutputTag for String side output named "side-output"
OutputTag<String> sideOutputTag = new OutputTag<String>("side-output") {};

// OutputTag for a custom type
OutputTag<ErrorEvent> errorTag = new OutputTag<ErrorEvent>("errors") {};

// OutputTag for late sensor readings
OutputTag<SensorReading> lateReadingsTag = new OutputTag<SensorReading>("late-readings") {};
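Create the OutputTag as an anonymous subclass (new OutputTag<T>("name") {}), not with a plain new OutputTag<T>("name"). Because of type erasure, a plain instance does not retain T at runtime. The anonymous subclass records its generic superclass, OutputTag<T>, in its class metadata, and Flink reads that metadata to derive the element's type information.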
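The effect of the trailing {} can be reproduced with plain Java reflection. The sketch below is a minimal illustration of the mechanism and does not use Flink: TypeTag and capturedType are hypothetical names invented for this example, and Flink's own type extraction is more involved.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeCaptureDemo {

    /** Hypothetical stand-in for OutputTag; this is not a Flink class. */
    static class TypeTag<T> {
        final String id;

        TypeTag(String id) {
            this.id = id;
        }

        /** Returns the type argument recorded in the class metadata, or null if erasure removed it. */
        Type capturedType() {
            Type superType = getClass().getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                return ((ParameterizedType) superType).getActualTypeArguments()[0];
            }
            return null;
        }
    }

    public static void main(String[] args) {
        // Anonymous subclass: its generic superclass TypeTag<String> is stored in the class file.
        TypeTag<String> anonymous = new TypeTag<String>("side-output") {};
        // Plain instance: the runtime class is TypeTag itself, so String is erased.
        TypeTag<String> plain = new TypeTag<String>("side-output");

        System.out.println(anonymous.capturedType()); // class java.lang.String
        System.out.println(plain.capturedType());     // null
    }
}
```

The plain instance has nothing to inspect because its runtime class is just TypeTag, whose superclass is Object. That is the situation a tag created without the braces would leave Flink in.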

Emitting to a side output

Side outputs can be emitted from:
  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • KeyedCoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction
All of these expose a Context object whose output(OutputTag, value) method emits to the named side output:
SideOutputExample.java
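import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;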

DataStream<Integer> input = env.fromData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

final OutputTag<String> largeValuesTag = new OutputTag<String>("large-values") {};

SingleOutputStreamOperator<Integer> mainOutput = input.process(
    new ProcessFunction<Integer, Integer>() {
        @Override
        public void processElement(
                Integer value,
                Context ctx,
                Collector<Integer> out) throws Exception {

            if (value > 5) {
                // Emit to side output
                ctx.output(largeValuesTag, "Large value: " + value);
            } else {
                // Emit to main output
                out.collect(value);
            }
        }
    }
);

Retrieving a side output stream

Call getSideOutput(OutputTag) on the SingleOutputStreamOperator returned by process(...). Pass the same tag that the function emitted to; each call returns a DataStream typed by that tag:
// Main output: small values (1–5)
DataStream<Integer> smallValues = mainOutput;

// Side output: string descriptions of large values (6–10)
DataStream<String> largeValues = mainOutput.getSideOutput(largeValuesTag);

smallValues.print("small");
largeValues.print("large");
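To make the emit-and-retrieve contract concrete, here is a plain-Java model of tag-based routing applied to the same input (1 through 10). It is a sketch of the idea only, not Flink's implementation: Tag, Router, and run are hypothetical names, and the model keeps records in in-memory lists instead of streams.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SideOutputModel {

    /** Hypothetical typed tag; the type parameter ties emitted values to the retrieved list. */
    static final class Tag<T> {
        final String id;

        Tag(String id) {
            this.id = id;
        }
    }

    /** Collects main-output records of type M plus one list of records per tag. */
    static final class Router<M> {
        final List<M> main = new ArrayList<>();
        private final Map<Tag<?>, List<Object>> side = new HashMap<>();

        void collect(M value) {
            main.add(value);
        }

        <T> void output(Tag<T> tag, T value) {
            side.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
        }

        @SuppressWarnings("unchecked")
        <T> List<T> getSideOutput(Tag<T> tag) {
            // Safe in this model: output() only stores values of type T under a Tag<T>.
            return (List<T>) (List<?>) side.getOrDefault(tag, new ArrayList<>());
        }
    }

    /** Applies the same rule as the processElement example: values above 5 go to the side output. */
    static Router<Integer> run(Tag<String> largeValuesTag) {
        Router<Integer> router = new Router<>();
        for (int value = 1; value <= 10; value++) {
            if (value > 5) {
                router.output(largeValuesTag, "Large value: " + value);
            } else {
                router.collect(value);
            }
        }
        return router;
    }

    public static void main(String[] args) {
        Tag<String> largeValuesTag = new Tag<>("large-values");
        Router<Integer> router = run(largeValuesTag);
        System.out.println(router.main); // [1, 2, 3, 4, 5]
        System.out.println(router.getSideOutput(largeValuesTag).size()); // 5
    }
}
```

The main list holds Integer values while the tagged list holds String values, which mirrors how a side output is typed independently of the main output.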

Handling late events

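The most common production use of side outputs is capturing late window events. By default, an element that arrives after the watermark has passed the end of its window is silently dropped. Setting allowedLateness keeps the window state for that extra period so late elements can still update the result, but anything arriving after the end of the window plus the allowed lateness is still dropped. Route those elements to a side output instead with sideOutputLateData: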
LateEventHandling.java
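import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.util.OutputTag;
import java.time.Duration;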

DataStream<SensorReading> readings = env.fromSource(
    source,
    WatermarkStrategy
        .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((r, ts) -> r.getTimestamp()),
    "sensors"
);

OutputTag<SensorReading> lateReadings = new OutputTag<SensorReading>("late-readings") {};

SingleOutputStreamOperator<AggregatedReading> aggregated = readings
    .keyBy(SensorReading::getSensorId)
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))
    .allowedLateness(Duration.ofSeconds(30))
    .sideOutputLateData(lateReadings)
    .aggregate(new AverageAggregator());

// Main output: window aggregations, including updated results from elements within the allowed lateness
aggregated.sinkTo(resultSink);

// Side output: late events for auditing or reprocessing
aggregated.getSideOutput(lateReadings)
          .map(r -> "LATE: " + r)
          .sinkTo(auditSink);
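Which elements end up in the late side output depends on the watermark at the moment they arrive. The following plain-Java sketch works through that arithmetic for the one-minute window and 30-second allowed lateness used above. It assumes the rule that a window counts as expired once the watermark reaches the window's last timestamp plus the allowed lateness. LatenessModel, Fate, and classify are hypothetical names, and the model ignores window offsets.

```java
public class LatenessModel {

    enum Fate { ON_TIME, LATE_BUT_ALLOWED, SIDE_OUTPUT }

    /** Start of the tumbling window (no offset) that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Classifies an element by comparing the current watermark with its window's bounds. */
    static Fate classify(long timestampMs, long watermarkMs, long sizeMs, long allowedLatenessMs) {
        // Last millisecond that still belongs to the window.
        long maxTimestamp = windowStart(timestampMs, sizeMs) + sizeMs - 1;
        if (watermarkMs < maxTimestamp) {
            return Fate.ON_TIME; // window has not fired yet
        }
        if (watermarkMs < maxTimestamp + allowedLatenessMs) {
            return Fate.LATE_BUT_ALLOWED; // window fires again with the updated result
        }
        return Fate.SIDE_OUTPUT; // window state is gone; element goes to the late-data tag
    }

    public static void main(String[] args) {
        long size = 60_000;     // 1-minute tumbling window
        long lateness = 30_000; // 30 seconds of allowed lateness

        // An element stamped at 59 s belongs to the window [0 s, 60 s).
        System.out.println(classify(59_000, 50_000, size, lateness)); // ON_TIME
        System.out.println(classify(59_000, 70_000, size, lateness)); // LATE_BUT_ALLOWED
        System.out.println(classify(59_000, 95_000, size, lateness)); // SIDE_OUTPUT
    }
}
```

Note that the watermark itself trails the highest timestamp seen so far by the out-of-orderness bound (10 seconds in the example above), so an element has to be delayed by more than the bound plus the allowed lateness before it is routed to the side output.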

Multiple side outputs

A single ProcessFunction can emit to multiple side outputs:
MultiSideOutput.java
OutputTag<Transaction> fraudTag  = new OutputTag<Transaction>("fraud") {};
OutputTag<Transaction> reviewTag = new OutputTag<Transaction>("needs-review") {};

SingleOutputStreamOperator<Transaction> processed = transactions.process(
    new ProcessFunction<Transaction, Transaction>() {
        @Override
        public void processElement(Transaction tx, Context ctx, Collector<Transaction> out) {
            if (tx.getAmount() > 10_000) {
                ctx.output(fraudTag, tx);
            } else if (tx.getAmount() > 1_000) {
                ctx.output(reviewTag, tx);
            } else {
                out.collect(tx); // normal transaction → main output
            }
        }
    }
);

DataStream<Transaction> normalTx  = processed;
DataStream<Transaction> fraudTx   = processed.getSideOutput(fraudTag);
DataStream<Transaction> reviewTx  = processed.getSideOutput(reviewTag);
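The thresholds in the if/else chain are strict comparisons, so the boundary values matter. The sketch below restates the routing rule as a plain-Java function and checks the edges. TransactionRouting, Route, and route are hypothetical names, and the amount is assumed to be a double because the Transaction type is not shown here.

```java
public class TransactionRouting {

    enum Route { MAIN, NEEDS_REVIEW, FRAUD }

    /** Mirrors the if/else chain in processElement: the first matching branch wins. */
    static Route route(double amount) {
        if (amount > 10_000) {
            return Route.FRAUD;
        } else if (amount > 1_000) {
            return Route.NEEDS_REVIEW;
        }
        return Route.MAIN;
    }

    public static void main(String[] args) {
        System.out.println(route(500));       // MAIN
        System.out.println(route(1_000));     // MAIN (not strictly greater than 1,000)
        System.out.println(route(5_000));     // NEEDS_REVIEW
        System.out.println(route(10_000));    // NEEDS_REVIEW (not strictly greater than 10,000)
        System.out.println(route(10_000.01)); // FRAUD
    }
}
```

Because the branches are mutually exclusive, each transaction lands in exactly one of the three streams in this example.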

Side outputs in window functions

ProcessWindowFunction and ProcessAllWindowFunction also support side outputs through their Context:
WindowSideOutput.java
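import java.util.ArrayList;
import java.util.List;

// Declare the tag outside the function so the same tag can retrieve the side output later
final OutputTag<Event> anomalyTag = new OutputTag<Event>("anomalies") {};

SingleOutputStreamOperator<Result> results = windowedStream.process(
    new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
        @Override
        public void process(String key, Context ctx, Iterable<Event> events, Collector<Result> out) {
            List<Event> list = new ArrayList<>();
            for (Event e : events) {
                if (e.isAnomaly()) {
                    ctx.output(anomalyTag, e); // anomalies bypass the aggregate
                } else {
                    list.add(e);
                }
            }
            out.collect(aggregate(key, list));
        }
    }
);

// Side output: anomalous events that were excluded from the aggregate
DataStream<Event> anomalies = results.getSideOutput(anomalyTag);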

Performance considerations

Emitting to a side output costs about the same as emitting to the main output: the runtime looks up the outputs registered for the tag and forwards the record. Network serialization applies only when the side output is consumed by a downstream operator in a different task (which may be on a different TaskManager). For high-throughput side outputs, make sure you set appropriate parallelism on the downstream consumer.
