A data sink consumes a DataStream and writes records to an external system: a file system, message queue, database, or any other destination.
Flink provides two sink APIs:
- Sink API (FLIP-143, recommended): Supports stateful, exactly-once sinks with a clear separation of writing, committing, and global coordination. Use stream.sinkTo(sink).
- Legacy SinkFunction API: The older SinkFunction / RichSinkFunction interfaces. Simpler but limited: no two-phase commit support, no participation in checkpointing unless you implement CheckpointedFunction separately.
Built-in sinks
print()
Prints every element's toString() value to stdout (or stderr with printToErr()). Useful during development.
stream.print();
// With a prefix to identify the output when multiple streams are printed
stream.print("output");
When parallelism is greater than 1, the output is prefixed with the task index:
1> hello
2> world
1> flink
FileSink
FileSink is the recommended way to write to any file system (local, HDFS, S3). It participates in Flink checkpointing to provide exactly-once semantics.
Row-format sink (one record per line):
import java.time.Duration;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

FileSink<String> sink = FileSink
    .forRowFormat(
        new Path("/output/results"),
        new SimpleStringEncoder<String>("UTF-8")
    )
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(1024))
            .build()
    )
    .build();

stream.sinkTo(sink);
Bulk-format sink (columnar formats like Parquet, ORC, Avro):
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

FileSink<MyRecord> sink = FileSink
    .forBulkFormat(
        new Path("s3://my-bucket/output"),
        AvroParquetWriters.forReflectRecord(MyRecord.class)
    )
    .build();

recordStream.sinkTo(sink);
With bulk formats, files are committed only on checkpoints. You must enable checkpointing for the sink to produce any output: env.enableCheckpointing(60000).
File naming and partitioning
By default, FileSink writes files into subdirectories named after the element's processing time, e.g. 2024-01-15--12. You can supply a custom BucketAssigner:
FileSink<LogEvent> sink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<LogEvent>())
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
    .build();
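As a plain-Java illustration (no Flink dependency) of how a time-based bucket assigner turns a record's processing timestamp into a bucket path, the sketch below applies a date pattern to an epoch-millisecond timestamp, the same way the "yyyy-MM-dd--HH" default and the "yyyy-MM-dd/HH" pattern above produce directory names. The class and method names here are hypothetical, not Flink API:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class BucketPathDemo {

    // Format an epoch-millisecond processing time into a bucket directory name.
    static String bucketId(long processingTimeMillis, String pattern) {
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(ZoneOffset.UTC)
                .format(Instant.ofEpochMilli(processingTimeMillis));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2024-01-15T12:30:00Z").toEpochMilli();
        System.out.println(bucketId(ts, "yyyy-MM-dd--HH")); // default-style bucket: 2024-01-15--12
        System.out.println(bucketId(ts, "yyyy-MM-dd/HH"));  // nested bucket: 2024-01-15/12
    }
}
```

Note that the real DateTimeBucketAssigner uses the processing-time clock of the subtask, so records arriving near a bucket boundary may land in either directory.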
Writing a custom sink (Sink API)
The Sink API breaks sink logic into three optional components:
| Component | Interface | When to implement |
|---|---|---|
| Writer | SinkWriter&lt;InputT&gt; | Always: converts input records to staged output |
| Committer | Committer&lt;CommT&gt; | For transactional sinks: commits staged data |
| Global committer | GlobalCommitter&lt;CommT, GlobalCommT&gt; | For sinks requiring a single coordinated commit |
Simple (stateless) custom sink
For sinks that do not need transactions, implement just the SinkWriter:
import java.io.IOException;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

public class MySimpleSink implements Sink<String> {

    @Override
    public SinkWriter<String> createWriter(InitContext context) {
        return new MySimpleWriter();
    }

    static class MySimpleWriter implements SinkWriter<String> {
        // ExternalClient is a placeholder for whatever client library your system provides
        private final ExternalClient client = new ExternalClient();

        @Override
        public void write(String element, Context context) throws IOException {
            client.send(element);
        }

        @Override
        public void flush(boolean endOfInput) throws IOException {
            client.flush();
        }

        @Override
        public void close() {
            client.close();
        }
    }
}
stream.sinkTo(new MySimpleSink());
Exactly-once sink with two-phase commit
Two-phase commit (2PC) works alongside Flink checkpointing to guarantee exactly-once delivery. The protocol has three phases:
- Pre-commit (checkpoint): The writer stages records into a pending transaction and snapshots the transaction handle into state.
- Commit (checkpoint complete): After the checkpoint succeeds, all writers commit their transactions.
- Abort: If the checkpoint fails, pending transactions are aborted.
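To make the protocol concrete before the Flink interfaces, here is a toy, Flink-free sketch of the three phases above: records are staged into a pending transaction, frozen at pre-commit, and only made durable on commit. All names are hypothetical and the "external system" is just an in-memory list:

```java
import java.util.ArrayList;
import java.util.List;

// Toy two-phase commit: staged records become durable only on commit.
public class TwoPhaseCommitDemo {

    static class Transaction {
        final List<String> staged = new ArrayList<>();
        boolean prepared = false;
    }

    final List<String> committed = new ArrayList<>(); // stands in for the external system
    Transaction current = new Transaction();

    void write(String record) {
        current.staged.add(record);
    }

    // Pre-commit (checkpoint): freeze the current transaction, start a new one.
    Transaction prepareCommit() {
        Transaction toCommit = current;
        toCommit.prepared = true;
        current = new Transaction();
        return toCommit;
    }

    // Commit (checkpoint complete): make the staged records durable.
    void commit(Transaction tx) {
        if (tx.prepared) {
            committed.addAll(tx.staged);
        }
    }

    // Abort (checkpoint failed): discard the staged records.
    void abort(Transaction tx) {
        tx.staged.clear();
    }

    public static void main(String[] args) {
        TwoPhaseCommitDemo sink = new TwoPhaseCommitDemo();
        sink.write("a");
        sink.write("b");
        Transaction tx = sink.prepareCommit(); // checkpoint barrier arrives
        sink.write("c");                       // lands in the next transaction
        sink.commit(tx);                       // checkpoint completed
        System.out.println(sink.committed);    // prints [a, b]
    }
}
```

Records written after the barrier ("c" above) stay out of the committed transaction, which is exactly the isolation the real sink needs between checkpoints.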
Flink's TwoPhaseCommittingSink interface provides this structure:
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;

public class ExactlyOnceDatabaseSink
        implements TwoPhaseCommittingSink<String, DatabaseTransaction> {

    @Override
    public PrecommittingSinkWriter<String, DatabaseTransaction> createWriter(InitContext ctx) {
        return new DatabaseWriter();
    }

    @Override
    public Committer<DatabaseTransaction> createCommitter() {
        return new DatabaseCommitter();
    }

    @Override
    public SimpleVersionedSerializer<DatabaseTransaction> getCommittableSerializer() {
        return new TransactionSerializer();
    }

    // --- Writer ---
    static class DatabaseWriter
            implements PrecommittingSinkWriter<String, DatabaseTransaction> {

        private DatabaseTransaction currentTx = new DatabaseTransaction();

        @Override
        public void write(String record, Context ctx) {
            currentTx.buffer(record);
        }

        @Override
        public Collection<DatabaseTransaction> prepareCommit() {
            DatabaseTransaction toCommit = currentTx;
            currentTx = new DatabaseTransaction();
            toCommit.prepare(); // write to staging area
            return Collections.singletonList(toCommit);
        }

        @Override
        public void flush(boolean endOfInput) {}

        @Override
        public void close() {
            currentTx.abort();
        }
    }

    // --- Committer ---
    static class DatabaseCommitter implements Committer<DatabaseTransaction> {

        @Override
        public void commit(Collection<CommitRequest<DatabaseTransaction>> requests)
                throws IOException {
            for (CommitRequest<DatabaseTransaction> req : requests) {
                try {
                    req.getCommittable().commit(); // a successful commit needs no signal
                } catch (Exception e) {
                    req.signalFailedWithKnownReason(e);
                }
            }
        }

        @Override
        public void close() {}
    }
}
Enabling checkpointing for exactly-once sinks
Two-phase commit only works when checkpointing is enabled:
env.enableCheckpointing(60_000); // checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
Legacy SinkFunction
For backward compatibility, you can still use the SinkFunction interface:
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class StdoutSink<T> implements SinkFunction<T> {
    @Override
    public void invoke(T value, Context context) {
        System.out.println(value);
    }
}

stream.addSink(new StdoutSink<>());
SinkFunction-based sinks do not participate in Flink's two-phase commit protocol unless they also implement CheckpointedFunction. For new sinks requiring exactly-once delivery, use the TwoPhaseCommittingSink interface.
Debugging note on write*() methods
DataStream.writeAsText(), writeAsCsv(), and similar methods exist only for quick debugging. They do not participate in checkpointing and have at-least-once semantics at best. Do not use them in production.
// Development only
stream.writeAsText("/tmp/debug-output");
// Production
stream.sinkTo(FileSink.forRowFormat(new Path("/output"), new SimpleStringEncoder<>()).build());