Side outputs let a single operator emit records to multiple output streams simultaneously. The main output stream carries the primary results, while one or more side outputs carry ancillary data — late events, rejected records, debug traces, or any secondary data that should flow separately.
Side outputs are typed independently of the main output. A ProcessFunction<Integer, Integer> can emit String errors to a side output while emitting Integer results to the main output.
OutputTag
An OutputTag identifies a side output stream and carries the element type as a generic type parameter. Define it as an anonymous subclass so that Flink can recover the element type at runtime despite Java's type erasure:
// OutputTag for String side output named "side-output"
OutputTag<String> sideOutputTag = new OutputTag<String>("side-output") {};
// OutputTag for a custom type
OutputTag<ErrorEvent> errorTag = new OutputTag<ErrorEvent>("errors") {};
// OutputTag for late sensor readings
OutputTag<SensorReading> lateReadingsTag = new OutputTag<SensorReading>("late-readings") {};
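Create the OutputTag as an anonymous subclass (new OutputTag<T>("name") {}), not as a plain new OutputTag<T>("name"). Java erases generic type arguments on ordinary instances, but an anonymous subclass records its parameterized superclass in the class metadata, and Flink reads the element type from there at runtime. If an anonymous subclass is not practical, the two-argument constructor new OutputTag<>("name", typeInfo) accepts an explicit TypeInformation<T> instead.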
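The effect of the trailing {} can be reproduced in plain Java without Flink. The sketch below is only an illustration of the general mechanism: TypedTag and TypeCaptureDemo are hypothetical names, not Flink classes, and Flink's own type extraction is more involved than this single reflection call.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical stand-in for a tag class; this is NOT Flink's OutputTag. */
class TypedTag<T> {
    private final String id;

    TypedTag(String id) {
        this.id = id;
    }

    String id() {
        return id;
    }

    /** Returns the type argument recorded in the class metadata, or null if it was erased. */
    Type capturedType() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            // Anonymous subclass: the superclass is recorded as TypedTag<String>
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        // Plain instance: the superclass is Object, so there is no type argument to read
        return null;
    }
}

class TypeCaptureDemo {
    public static void main(String[] args) {
        TypedTag<String> plain = new TypedTag<String>("plain");
        TypedTag<String> anonymous = new TypedTag<String>("anonymous") {};

        System.out.println(plain.id() + ": " + plain.capturedType());
        System.out.println(anonymous.id() + ": " + anonymous.capturedType());
    }
}
```

The plain instance reports no type argument, while the anonymous subclass reports java.lang.String. This is the information a framework needs in order to pick a serializer for the side output's elements.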
Emitting to a side output
Side outputs can be emitted from:
ProcessFunction
KeyedProcessFunction
CoProcessFunction
KeyedCoProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
All of these expose a Context object whose output(OutputTag, value) method emits to the named side output:
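import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;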
DataStream<Integer> input = env.fromData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
final OutputTag<String> largeValuesTag = new OutputTag<String>("large-values") {};
SingleOutputStreamOperator<Integer> mainOutput = input.process(
    new ProcessFunction<Integer, Integer>() {
        @Override
        public void processElement(
                Integer value,
                Context ctx,
                Collector<Integer> out) throws Exception {
            if (value > 5) {
                // Emit to side output
                ctx.output(largeValuesTag, "Large value: " + value);
            } else {
                // Emit to main output
                out.collect(value);
            }
        }
    }
);
Retrieving a side output stream
Call getSideOutput(OutputTag) on the SingleOutputStreamOperator returned by process(). Each call returns a DataStream whose element type matches the tag:
// Main output: small values (1–5)
DataStream<Integer> smallValues = mainOutput;
// Side output: string descriptions of large values (6–10)
DataStream<String> largeValues = mainOutput.getSideOutput(largeValuesTag);
smallValues.print("small");
largeValues.print("large");
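To see what each stream contains without starting a Flink job, the same branching can be reproduced in plain Java. This is only a model of the routing logic: SplitModel and its two lists are hypothetical stand-ins for the main output and the side output, and nothing here uses the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the example's routing; the lists stand in for the two streams. */
class SplitModel {
    final List<Integer> mainOutput = new ArrayList<>();
    final List<String> sideOutput = new ArrayList<>();

    /** Mirrors processElement: values above 5 go to the side output, the rest to the main output. */
    void processElement(int value) {
        if (value > 5) {
            sideOutput.add("Large value: " + value);
        } else {
            mainOutput.add(value);
        }
    }

    /** Feeds the inclusive range [from, to] through processElement. */
    static SplitModel run(int from, int to) {
        SplitModel model = new SplitModel();
        for (int v = from; v <= to; v++) {
            model.processElement(v);
        }
        return model;
    }

    public static void main(String[] args) {
        SplitModel model = run(1, 10);
        System.out.println("small: " + model.mainOutput);
        System.out.println("large: " + model.sideOutput);
    }
}
```

In this example every element lands in exactly one output, but that is a choice made by the if/else. A function may call both out.collect and ctx.output for the same element, or neither.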
Handling late events
The most common production use of side outputs is capturing late window events. By default, an element is silently dropped if it arrives after the watermark has passed the end of its window plus the allowed lateness (which is zero unless you set allowedLateness). Route such elements to a side output instead:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import java.time.Duration;
DataStream<SensorReading> readings = env.fromSource(
    source,
    WatermarkStrategy
        .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((r, ts) -> r.getTimestamp()),
    "sensors"
);
OutputTag<SensorReading> lateReadings = new OutputTag<SensorReading>("late-readings") {};
SingleOutputStreamOperator<AggregatedReading> aggregated = readings
    .keyBy(SensorReading::getSensorId)
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))
    .allowedLateness(Duration.ofSeconds(30))
    .sideOutputLateData(lateReadings)
    .aggregate(new AverageAggregator());
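// Main output: window aggregations, including updated results that are
// re-emitted when an element arrives within the allowed lateness
aggregated.sinkTo(resultSink);
// Side output: elements that arrived after the allowed lateness, for auditing or reprocessing
aggregated.getSideOutput(lateReadings)
    .map(r -> "LATE: " + r)
    .sinkTo(auditSink);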
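The rule that decides whether an element reaches the window or the late-data side output can be sketched in plain Java. This is a simplified model, assuming tumbling one-minute windows with no offset, non-negative millisecond timestamps, and the 30-second allowed lateness used above. LatenessModel, Outcome, and classify are hypothetical names and are not part of the Flink API.

```java
/** Simplified, hypothetical model of how a tumbling event-time window treats an arriving element. */
class LatenessModel {
    enum Outcome { ON_TIME, LATE_FIRING, SIDE_OUTPUT }

    static final long WINDOW_SIZE_MS = 60_000;      // one-minute tumbling windows
    static final long ALLOWED_LATENESS_MS = 30_000; // matches allowedLateness(30 s) above

    /**
     * Classifies an element by comparing the current watermark with the window it belongs to.
     * Assumes eventTimestamp >= 0 and windows aligned to multiples of WINDOW_SIZE_MS.
     */
    static Outcome classify(long eventTimestamp, long currentWatermark) {
        long windowStart = eventTimestamp - (eventTimestamp % WINDOW_SIZE_MS);
        long maxTimestamp = windowStart + WINDOW_SIZE_MS - 1; // last timestamp inside the window

        if (maxTimestamp + ALLOWED_LATENESS_MS <= currentWatermark) {
            // Window state has been cleaned up: the element goes to the side output
            // (or is dropped when no side output is configured)
            return Outcome.SIDE_OUTPUT;
        }
        if (maxTimestamp <= currentWatermark) {
            // Window already fired but is still retained: the element triggers an updated result
            return Outcome.LATE_FIRING;
        }
        return Outcome.ON_TIME;
    }

    public static void main(String[] args) {
        long eventTimestamp = 59_000; // belongs to window [0, 60_000)
        for (long watermark : new long[] {50_000, 70_000, 89_999}) {
            System.out.println("watermark=" + watermark + " -> " + classify(eventTimestamp, watermark));
        }
    }
}
```

For the element at 59,000 ms, the three watermarks fall before the window closes, inside the lateness interval, and exactly at its end, so the model returns ON_TIME, LATE_FIRING, and SIDE_OUTPUT respectively. Only the last case reaches the stream returned by getSideOutput(lateReadings).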
Multiple side outputs
A single ProcessFunction can emit to multiple side outputs:
OutputTag<Transaction> fraudTag = new OutputTag<Transaction>("fraud") {};
OutputTag<Transaction> reviewTag = new OutputTag<Transaction>("needs-review") {};
SingleOutputStreamOperator<Transaction> processed = transactions.process(
    new ProcessFunction<Transaction, Transaction>() {
        @Override
        public void processElement(Transaction tx, Context ctx, Collector<Transaction> out) {
            if (tx.getAmount() > 10_000) {
                ctx.output(fraudTag, tx);
            } else if (tx.getAmount() > 1_000) {
                ctx.output(reviewTag, tx);
            } else {
                out.collect(tx); // normal transaction → main output
            }
        }
    }
);
DataStream<Transaction> normalTx = processed;
DataStream<Transaction> fraudTx = processed.getSideOutput(fraudTag);
DataStream<Transaction> reviewTx = processed.getSideOutput(reviewTag);
Side outputs in window functions
ProcessWindowFunction and ProcessAllWindowFunction also support side outputs through their Context:
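// Declare the tag outside the function so the side output can be retrieved afterwards
final OutputTag<Event> anomalyTag = new OutputTag<Event>("anomalies") {};
SingleOutputStreamOperator<Result> results = windowedStream.process(
    new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
        @Override
        public void process(String key, Context ctx, Iterable<Event> events, Collector<Result> out) {
            List<Event> normal = new ArrayList<>();
            for (Event e : events) {
                if (e.isAnomaly()) {
                    ctx.output(anomalyTag, e); // anomalous event → side output
                } else {
                    normal.add(e);
                }
            }
            // aggregate(...) is assumed to be a helper defined elsewhere
            out.collect(aggregate(key, normal));
        }
    }
);
DataStream<Event> anomalies = results.getSideOutput(anomalyTag);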
Side outputs add little overhead within a task: emitting to one costs a tag lookup. Records are serialized only when the side output feeds a downstream operator that runs in a different task, possibly on a different TaskManager, just as for the main output. For high-throughput side outputs, set an appropriate parallelism on the downstream consumer so that it does not become a bottleneck.