Skip to main content
The Filesystem connector provides a unified Source and Sink for BATCH and STREAMING execution modes. It reads or writes partitioned files on any filesystem supported by the Flink FileSystem abstraction, including POSIX, HDFS, S3, OSS, and ABFS. The connector provides exactly-once semantics for streaming execution when checkpointing is enabled.

Dependency

Add the following to your pom.xml:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-files</artifactId>
  <version>${flink.version}</version>
</dependency>

File Source

FileSource is built on the unified Source API. It splits work between two components:
  • SplitEnumerator: discovers files and assigns splits to readers
  • SourceReader: requests and reads assigned file splits

Bounded vs. unbounded streams

By default, FileSource operates in bounded mode—it enumerates all files once and then finishes. Call monitorContinuously(Duration) to switch to continuous streaming mode, where the enumerator periodically re-scans the paths for new files.
final FileSource<String> source = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path(inputPath))
    .build();

DataStreamSource<String> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "File Source"
);

Format types

FileSource accepts two kinds of format:

StreamFormat

StreamFormat reads the contents of a file record by record from an InputStream. It is the simplest format to implement and handles checkpointing automatically, but cannot apply optimizations like object reuse or batching. The built-in TextLineInputFormat is a StreamFormat that reads UTF-8 lines:
FileSource<String> source = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/logs"))
    .build();
You can implement SimpleStreamFormat for non-splittable formats:
private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
    private static final long serialVersionUID = 1L;

    @Override
    public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream)
            throws IOException {
        return new ArrayReader(stream);
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
}

FileSource<byte[]> source = FileSource
    .forRecordStreamFormat(new ArrayReaderFormat(), new Path("/data/binary"))
    .build();
For CSV files, use CsvReaderFormat which derives the schema from a POJO class:
CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
FileSource<SomePojo> source = FileSource
    .forRecordStreamFormat(csvFormat, Path.fromLocalFile(new File("/data/csv")))
    .build();
Add @JsonPropertyOrder({"field1", "field2", ...}) to your POJO class to ensure the field order matches the CSV column order.

BulkFormat

BulkFormat reads batches of records at a time. It is the lowest-level format interface and offers the most control over performance. Parquet and ORC use bulk formats. A SimpleStreamFormat can be wrapped in a StreamFormatAdapter to produce a BulkFormat:
BulkFormat<SomePojo, FileSourceSplit> bulkFormat =
    new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));

FileSource<SomePojo> source = FileSource
    .forBulkFileFormat(bulkFormat, new Path("/data/csv"))
    .build();

File Sink

FileSink writes incoming data into buckets (subdirectories) within a base output path. Data is organized into part files that are rolled according to configurable policies.
Checkpointing must be enabled when using FileSink in STREAMING mode. Part files can only be finalized on a successful checkpoint. With checkpointing disabled, part files remain in in-progress or pending state indefinitely and cannot be read by downstream systems.

Row-encoded formats

Row-encoded sinks serialize each record individually using an Encoder. Use FileSink.forRowFormat(basePath, encoder):
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

DataStream<String> input = ...;

final FileSink<String> sink = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(1024))
            .build())
    .build();

input.sinkTo(sink);
This rolls the in-progress part file when any of these conditions is met:
  • The file contains at least 15 minutes of data
  • No new records have arrived in the last 5 minutes
  • The file size has reached 1 GB

Bulk-encoded formats

Bulk-encoded sinks serialize batches of records using a BulkWriter.Factory. Use FileSink.forBulkFormat(basePath, writerFactory).
Bulk formats can only use rolling policies that extend CheckpointRollingPolicy, which rolls on every checkpoint. You may add additional size- or time-based conditions on top.
Flink ships five built-in BulkWriter factories:
  • ParquetWriterFactory
  • AvroWriterFactory
  • SequenceFileWriterFactory
  • CompressWriterFactory
  • OrcBulkWriterFactory

Writing Parquet

Add the dependency:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-parquet</artifactId>
  <version>${flink.version}</version>
</dependency>
Write Avro GenericRecord objects to Parquet:
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.avro.Schema;

Schema schema = ...;
DataStream<GenericRecord> input = ...;

final FileSink<GenericRecord> sink = FileSink
    .forBulkFormat(outputBasePath, AvroParquetWriters.forGenericRecord(schema))
    .build();

input.sinkTo(sink);
Write Protobuf messages to Parquet:
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

// ProtoRecord is a generated Protobuf Message class.
DataStream<ProtoRecord> input = ...;

final FileSink<ProtoRecord> sink = FileSink
    .forBulkFormat(outputBasePath, ParquetProtoWriters.forType(ProtoRecord.class))
    .build();

input.sinkTo(sink);

Writing Avro

Add the dependency:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>${flink.version}</version>
</dependency>
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.avro.Schema;

Schema schema = ...;
DataStream<GenericRecord> input = ...;

final FileSink<GenericRecord> sink = FileSink
    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
    .build();

input.sinkTo(sink);
To enable compression, provide a custom AvroBuilder:
AvroWriterFactory<?> factory = new AvroWriterFactory<>((AvroBuilder<Address>) out -> {
    Schema schema = ReflectData.get().getSchema(Address.class);
    DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);
    DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter);
    dataFileWriter.setCodec(CodecFactory.snappyCodec());
    dataFileWriter.create(schema, out);
    return dataFileWriter;
});

DataStream<Address> stream = ...;
stream.sinkTo(FileSink.forBulkFormat(outputBasePath, factory).build());

Writing ORC

Add the dependency:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-orc</artifactId>
  <version>${flink.version}</version>
</dependency>
Implement a Vectorizer to convert your type to ORC’s VectorizedRowBatch:
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

public class PersonVectorizer extends Vectorizer<Person> implements Serializable {
    public PersonVectorizer(String schema) {
        super(schema);
    }

    @Override
    public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
        BytesColumnVector nameColVector = (BytesColumnVector) batch.cols[0];
        LongColumnVector ageColVector = (LongColumnVector) batch.cols[1];
        int row = batch.size++;
        nameColVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
        ageColVector.vector[row] = element.getAge();
    }
}
Create the sink:
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;

String schema = "struct<_col0:string,_col1:int>";
DataStream<Person> input = ...;

final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(new PersonVectorizer(schema));

final FileSink<Person> sink = FileSink
    .forBulkFormat(outputBasePath, writerFactory)
    .build();

input.sinkTo(sink);

Bucket assignment

The BucketAssigner determines which subdirectory each record is written to. Both row and bulk format sinks use DateTimeBucketAssigner by default, which creates hourly buckets in the format yyyy-MM-dd--HH based on the system default timezone. Flink provides two built-in assigners:
AssignerDescription
DateTimeBucketAssignerDefault. Creates time-based subdirectories. Format and timezone are configurable.
BasePathBucketAssignerWrites all part files directly into the base path (single global bucket).
Specify a custom assigner with .withBucketAssigner(assigner):
FileSink<String> sink = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
    .withBucketAssigner(new BasePathBucketAssigner<>())
    .build();

Rolling policy

The RollingPolicy controls when an in-progress part file is closed and moved to the finished state. Only finished files are safe to read by downstream systems.
PolicyDescription
DefaultRollingPolicyRolls based on file size, open duration, or inactivity timeout. Used for row-encoded formats.
OnCheckpointRollingPolicyRolls on every checkpoint. Required for bulk-encoded formats.

Part file lifecycle

Part files transition through three states:
  1. In-progress: currently being written
  2. Pending: closed (rolled) but waiting for checkpoint commit
  3. Finished: committed on a successful checkpoint (streaming) or end of input (batch)
Only finished files are guaranteed to contain complete, non-reverted data.

File compaction

Since Flink 1.15, FileSink supports compacting pending files before committing them. This reduces the number of small files generated at high checkpoint frequencies:
FileSink<Integer> fileSink = FileSink
    .forRowFormat(new Path(path), new SimpleStringEncoder<Integer>())
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(1024)
            .enableCompactionOnCheckpoint(5)
            .build(),
        new RecordWiseFileCompactor<>(
            new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
    .build();
Once compaction is enabled, you must explicitly call .disableCompact() when building the sink if you want to turn it off later. Compaction increases the time between writing and visibility: approximately checkpoint interval + compaction time.

Important considerations

  • When using Hadoop versions earlier than 2.7, use OnCheckpointRollingPolicy. Earlier versions do not support the truncate() filesystem call that FileSink requires for recovery.
  • Upon normal job termination with a finite input stream, the last in-progress file is not finalized. This is expected behavior.
  • FileSink never overwrites committed data. Restoring from an old checkpoint that references a committed in-progress file will throw an exception.
  • FileSink supports HDFS, S3, OSS, ABFS, and local filesystems.
  • Use only the Hadoop-based S3 FileSystem implementation (s3a://), not the Presto-based one.
  • If you use Presto for checkpointing, use s3p:// for checkpoints and s3a:// for the sink path.
  • FileSink uses S3 Multipart Upload (MPU) for exactly-once semantics. If your bucket has an MPU abort lifecycle rule, set the timeout conservatively to avoid expiring uploads from long-running jobs.
  • The Committer operator always runs with parallelism 1, regardless of the Writer parallelism.
  • Pending files are committed after the entire input has been processed.
  • With High Availability enabled, a JobManager failure during commit may produce duplicate files.

Build docs developers (and LLMs) love