Skip to main content
A data sink consumes a DataStream and writes records to an external system — a file system, message queue, database, or any other destination. Flink provides two sink APIs:
  • Sink API (FLIP-143, recommended): Supports stateful, exactly-once sinks with a clear separation of writing, committing, and global coordination. Use stream.sinkTo(sink).
  • Legacy SinkFunction API: The older SinkFunction / RichSinkFunction interfaces. Simpler but limited — no two-phase commit support, no participation in checkpointing unless you implement CheckpointedFunction separately.

Built-in sinks

print()

Prints every element’s toString() value to stdout (or stderr with printToErr()). Useful during development.
stream.print();

// With a prefix to identify the output when multiple streams are printed
stream.print("output");
When parallelism is greater than 1, the output is prefixed with the task index:
1> hello
2> world
1> flink

FileSink

FileSink is the recommended way to write to any file system (local, HDFS, S3). It participates in Flink checkpointing to provide exactly-once semantics. Row-format sink (one record per line):
FileSinkRowFormat.java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;

FileSink<String> sink = FileSink
    .forRowFormat(
        new Path("/output/results"),
        new SimpleStringEncoder<String>("UTF-8")
    )
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(1024))
            .build()
    )
    .build();

stream.sinkTo(sink);
Bulk-format sink (columnar formats like Parquet, ORC, Avro):
FileSinkBulkFormat.java
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

FileSink<MyRecord> sink = FileSink
    .forBulkFormat(
        new Path("s3://my-bucket/output"),
        AvroParquetWriters.forReflectRecord(MyRecord.class)
    )
    .build();

recordStream.sinkTo(sink);
With bulk formats, files are committed only on checkpoints. You must enable checkpointing for the sink to produce any output: env.enableCheckpointing(60000).

File naming and partitioning

By default, FileSink writes files into subdirectories named after the element’s processing time, e.g. 2024-01-15--12. You can supply a custom BucketAssigner:
CustomBucketAssigner.java
FileSink<LogEvent> sink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<LogEvent>())
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
    .build();

Writing a custom sink (Sink API)

The Sink API breaks sink logic into three optional components:
ComponentInterfaceWhen to implement
WriterSinkWriter<InputT>Always — converts input records to staged output
CommitterCommitter<CommT>For transactional sinks — commits staged data
Global committerGlobalCommitter<CommT, GlobalCommT>For sinks requiring a single coordinated commit

Simple (stateless) custom sink

For sinks that do not need transactions, implement just the SinkWriter:
MySimpleSink.java
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

public class MySimpleSink implements Sink<String> {

    @Override
    public SinkWriter<String> createWriter(InitContext context) {
        return new MySimpleWriter();
    }

    static class MySimpleWriter implements SinkWriter<String> {

        private final ExternalClient client = new ExternalClient();

        @Override
        public void write(String element, Context context) throws IOException {
            client.send(element);
        }

        @Override
        public void flush(boolean endOfInput) throws IOException {
            client.flush();
        }

        @Override
        public void close() {
            client.close();
        }
    }
}

stream.sinkTo(new MySimpleSink());

Exactly-once sink with two-phase commit

Two-phase commit (2PC) works alongside Flink checkpointing to guarantee exactly-once delivery. The protocol has three phases:
  1. Pre-commit (checkpoint): The writer stages records into a pending transaction and snapshots the transaction handle into state.
  2. Commit (checkpoint complete): After the checkpoint succeeds, all writers commit their transactions.
  3. Abort: If the checkpoint fails, pending transactions are aborted.
Flink’s TwoPhaseCommittingSink interface provides this structure:
ExactlyOnceSink.java
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;

public class ExactlyOnceDatabaseSink
        implements TwoPhaseCommittingSink<String, DatabaseTransaction> {

    @Override
    public PrecommittingSinkWriter<String, DatabaseTransaction> createWriter(InitContext ctx) {
        return new DatabaseWriter();
    }

    @Override
    public Committer<DatabaseTransaction> createCommitter() {
        return new DatabaseCommitter();
    }

    @Override
    public SimpleVersionedSerializer<DatabaseTransaction> getCommittableSerializer() {
        return new TransactionSerializer();
    }

    // --- Writer ---
    static class DatabaseWriter implements PrecommittingSinkWriter<String, DatabaseTransaction> {
        private DatabaseTransaction currentTx = new DatabaseTransaction();

        @Override
        public void write(String record, Context ctx) {
            currentTx.buffer(record);
        }

        @Override
        public Collection<DatabaseTransaction> prepareCommit() {
            DatabaseTransaction toCommit = currentTx;
            currentTx = new DatabaseTransaction();
            toCommit.prepare(); // write to staging area
            return Collections.singletonList(toCommit);
        }

        @Override
        public void flush(boolean endOfInput) {}

        @Override
        public void close() {
            currentTx.abort();
        }
    }

    // --- Committer ---
    static class DatabaseCommitter implements Committer<DatabaseTransaction> {
        @Override
        public void commit(Collection<CommitRequest<DatabaseTransaction>> requests)
                throws IOException {
            for (CommitRequest<DatabaseTransaction> req : requests) {
                try {
                    req.getCommittable().commit();
                    req.signalAlreadyCommitted();
                } catch (Exception e) {
                    req.signalFailedWithKnownReason(e);
                }
            }
        }

        @Override
        public void close() {}
    }
}

Enabling checkpointing for exactly-once sinks

Two-phase commit only works when checkpointing is enabled:
env.enableCheckpointing(60_000); // checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

Legacy SinkFunction

For backward compatibility, you can still use the SinkFunction interface:
LegacySinkFunction.java
public class StdoutSink<T> implements SinkFunction<T> {
    @Override
    public void invoke(T value, Context context) {
        System.out.println(value);
    }
}

stream.addSink(new StdoutSink<>());
SinkFunction-based sinks do not participate in Flink’s two-phase commit protocol unless they also implement CheckpointedFunction. For new sinks requiring exactly-once delivery, use the TwoPhaseCommittingSink interface.

Debugging note on write*() methods

DataStream.writeAsText(), writeAsCsv(), and similar methods exist only for quick debugging. They do not participate in checkpointing and have at-least-once semantics at best. Do not use them in production.
// Development only
stream.writeAsText("/tmp/debug-output");

// Production
stream.sinkTo(FileSink.forRowFormat(new Path("/output"), new SimpleStringEncoder<>()).build());

Build docs developers (and LLMs) love