Read and write files on any Flink-supported filesystem using FileSource and FileSink with row and bulk encoding formats.
The FileSystem connector provides a unified Source and Sink for BATCH and STREAMING execution modes. It reads or writes partitioned files on any filesystem supported by the Flink FileSystem abstraction, including POSIX, HDFS, S3, OSS, and ABFS. The connector provides exactly-once semantics for streaming execution when checkpointing is enabled.
By default, FileSource operates in bounded mode—it enumerates all files once and then finishes. Call monitorContinuously(Duration) to switch to continuous streaming mode, where the enumerator periodically re-scans the paths for new files.
Bounded (batch)
Unbounded (streaming)
```java
final FileSource<String> source =
    FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(inputPath))
        .build();

DataStreamSource<String> stream = env.fromSource(
    source, WatermarkStrategy.noWatermarks(), "File Source");
```
```java
final FileSource<String> source =
    FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(inputPath))
        .monitorContinuously(Duration.ofSeconds(10))
        .build();

DataStreamSource<String> stream = env.fromSource(
    source, WatermarkStrategy.noWatermarks(), "File Source");
```
StreamFormat reads the contents of a file record by record from an InputStream. It is the simplest format to implement and handles checkpointing automatically, but cannot apply optimizations like object reuse or batching. The built-in TextLineInputFormat is a StreamFormat that reads UTF-8 lines:
```java
FileSource<String> source =
    FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/logs"))
        .build();
```
You can implement SimpleStreamFormat for non-splittable formats:
```java
private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
    private static final long serialVersionUID = 1L;

    @Override
    public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream)
            throws IOException {
        // ArrayReader is a user-defined StreamFormat.Reader that produces byte[] records.
        return new ArrayReader(stream);
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
}

FileSource<byte[]> source =
    FileSource.forRecordStreamFormat(new ArrayReaderFormat(), new Path("/data/binary"))
        .build();
```
For CSV files, use CsvReaderFormat, which derives the schema from a POJO class.
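As a sketch, reading a CSV file into a hypothetical SomePojo class might look like this (assumes the flink-csv dependency is on the classpath; the field names of SomePojo define the CSV columns):

```java
// SomePojo is a hypothetical POJO; its fields (optionally refined with Jackson
// CSV annotations) determine the derived CSV schema.
CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);

FileSource<SomePojo> source =
    FileSource.forRecordStreamFormat(csvFormat, new Path("/data/csv"))
        .build();
```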
BulkFormat reads batches of records at a time. It is the lowest-level format interface and offers the most control over performance. Parquet and ORC use bulk formats. A SimpleStreamFormat can be wrapped in a StreamFormatAdapter to produce a BulkFormat:
```java
BulkFormat<SomePojo, FileSourceSplit> bulkFormat =
    new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));

FileSource<SomePojo> source =
    FileSource.forBulkFileFormat(bulkFormat, new Path("/data/csv"))
        .build();
```
FileSink writes incoming data into buckets (subdirectories) within a base output path. Data is organized into part files that are rolled according to configurable policies.
Checkpointing must be enabled when using FileSink in STREAMING mode. Part files can only be finalized on a successful checkpoint. With checkpointing disabled, part files remain in in-progress or pending state indefinitely and cannot be read by downstream systems.
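Checkpointing is enabled on the StreamExecutionEnvironment; a minimal sketch (the 10-second interval is an arbitrary illustrative choice):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 10 seconds so pending part files are finalized regularly.
env.enableCheckpointing(10_000);
```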
Bulk-encoded sinks serialize batches of records using a BulkWriter.Factory. Use FileSink.forBulkFormat(basePath, writerFactory).
Bulk formats can only use rolling policies that extend CheckpointRollingPolicy, which rolls on every checkpoint. You may add additional size- or time-based conditions on top.
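As a sketch, a bulk-encoded sink writing Avro files might be assembled like this (assumes the flink-avro dependency; MyRecord is a hypothetical POJO):

```java
FileSink<MyRecord> sink = FileSink
    .forBulkFormat(new Path(outputPath), AvroWriters.forReflectRecord(MyRecord.class))
    // Bulk formats must use a CheckpointRollingPolicy; rolling on every
    // checkpoint is the default for forBulkFormat, shown here explicitly.
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();
```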
The BucketAssigner determines which subdirectory each record is written to. Both row and bulk format sinks use DateTimeBucketAssigner by default, which creates hourly buckets in the format yyyy-MM-dd--HH based on the system default timezone. Flink provides two built-in assigners:
| Assigner | Description |
| --- | --- |
| `DateTimeBucketAssigner` | Default. Creates time-based subdirectories. Format and timezone are configurable. |
| `BasePathBucketAssigner` | Writes all part files directly into the base path (single global bucket). |
Specify a custom assigner with .withBucketAssigner(assigner).
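For example, to bucket by day in UTC instead of the default hourly local-time buckets, a row-encoded sink could be configured like this (a sketch; the path and format string are illustrative):

```java
FileSink<String> sink = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
    // Daily buckets such as "2024-01-31", resolved in UTC rather than the
    // system default timezone.
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("UTC")))
    .build();
```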
The RollingPolicy controls when an in-progress part file is closed and moved to the finished state. Only finished files are safe to read by downstream systems.
| Policy | Description |
| --- | --- |
| `DefaultRollingPolicy` | Rolls based on file size, open duration, or inactivity timeout. Used for row-encoded formats. |
| `OnCheckpointRollingPolicy` | Rolls on every checkpoint. Required for bulk-encoded formats. |
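A row-encoded sink with a size- and time-based rolling policy might be configured as follows (a sketch; the thresholds are illustrative, not recommendations):

```java
FileSink<String> sink = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))   // roll at least every 15 minutes
            .withInactivityInterval(Duration.ofMinutes(5))  // roll after 5 minutes without writes
            .withMaxPartSize(MemorySize.ofMebiBytes(1024))  // roll when a part file reaches 1 GiB
            .build())
    .build();
```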
Since Flink 1.15, FileSink supports compacting pending files before committing them. This reduces the number of small files generated at high checkpoint frequencies:
```java
FileSink<Integer> fileSink = FileSink
    .forRowFormat(new Path(path), new SimpleStringEncoder<Integer>())
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(1024)
            .enableCompactionOnCheckpoint(5)
            .build(),
        new RecordWiseFileCompactor<>(
            new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
    .build();
```
Once compaction is enabled, you must explicitly call .disableCompact() when building the sink if you want to turn it off later. Compaction increases the time between writing and visibility: approximately checkpoint interval + compaction time.
When using Hadoop versions earlier than 2.7, use OnCheckpointRollingPolicy. Earlier versions do not support the truncate() filesystem call that FileSink requires for recovery.
Upon normal job termination with a finite input stream, the last in-progress file is not finalized. This is expected behavior.
FileSink never overwrites committed data. Restoring from an old checkpoint that references a committed in-progress file will throw an exception.
FileSink supports HDFS, S3, OSS, ABFS, and local filesystems.
S3-specific
Use only the Hadoop-based S3 FileSystem implementation (s3a://), not the Presto-based one.
If you use Presto for checkpointing, use s3p:// for checkpoints and s3a:// for the sink path.
FileSink uses S3 Multipart Upload (MPU) for exactly-once semantics. If your bucket has an MPU abort lifecycle rule, set its timeout longer than the maximum time a job may keep an upload in flight; otherwise S3 may expire uploads belonging to long-running jobs before they are committed.
Batch mode
The Committer operator always runs with parallelism 1, regardless of the Writer parallelism.
Pending files are committed after the entire input has been processed.
With High Availability enabled, a JobManager failure during commit may produce duplicate files.