The Filesystem SQL connector reads and writes files on any filesystem supported by the Flink FileSystem abstraction. It is bundled with Flink, so no additional connector dependency is required. You do, however, need to include a format dependency for the specific file format you want to read or write.

Basic table definition

CREATE TABLE my_table (
  id         INT,
  name       STRING,
  amount     DOUBLE,
  dt         STRING,
  `hour`     STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path'      = 'file:///data/my_table',
  'format'    = 'parquet'
)
The path option points to a directory, not a file. Flink manages part files inside that directory and you should not reference individual files directly.

Supported formats

Format         Notes
CSV            RFC-4180 compliant.
JSON           Newline-delimited JSON (one JSON object per line), not a standard JSON array file.
Avro           Supports compression via avro.codec.
Parquet        Compatible with Hive.
ORC            Compatible with Hive.
Debezium-JSON  CDC format for change events from Debezium.
Canal-JSON     CDC format for change events from Canal.
Raw            Single raw bytes column.
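Format-specific options are prefixed with the format name. As an illustration, a CSV table might tune the field delimiter and skip malformed rows (these two option names follow the standard Flink CSV format; verify them against the Flink version you run):

CREATE TABLE csv_input (
  id    INT,
  name  STRING
) WITH (
  'connector'               = 'filesystem',
  'path'                    = '/data/csv_input/',
  'format'                  = 'csv',
  'csv.field-delimiter'     = ';',     -- use semicolons instead of commas
  'csv.ignore-parse-errors' = 'true'   -- drop malformed rows instead of failing the job
)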

Partitioning

Flink infers partitions from the directory structure using Hive-style partition naming (key=value). Partitions do not need to be pre-registered in a catalog. Example directory layout for a table partitioned by dt and hour:
/data/my_table/
└── dt=2024-01-15/
    ├── hour=10/
    │   ├── part-0.parquet
    │   └── part-1.parquet
    └── hour=11/
        └── part-0.parquet
Use INSERT OVERWRITE to replace a specific partition without touching others:
INSERT OVERWRITE my_table PARTITION (dt='2024-01-15', `hour`='10')
SELECT id, name, amount FROM corrections WHERE dt = '2024-01-15' AND `hour` = '10';
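Omitting the PARTITION clause gives a dynamic partition overwrite: the partition values come from the query itself, and only the partitions that actually appear in the result are replaced. A sketch, assuming the corrections table carries the partition columns:

INSERT OVERWRITE my_table
SELECT id, name, amount, dt, `hour` FROM corrections;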

Source

By default the source is bounded—it scans the path once and finishes. Enable continuous watching to treat it as an unbounded source:
CREATE TABLE streaming_input (
  id     INT,
  value  STRING
) WITH (
  'connector'              = 'filesystem',
  'path'                   = 's3://my-bucket/input/',
  'format'                 = 'json',
  'source.monitor-interval' = '30 s'
)
source.monitor-interval accepts any Duration value (e.g. 10 s, 1 min). When set, the source periodically re-scans the directory for new files. Each file is tracked by its path and processed exactly once.

Metadata columns

The filesystem source exposes read-only metadata:
Key                     Type                       Description
file.path               STRING NOT NULL            Full path of the file.
file.name               STRING NOT NULL            File name (last path component).
file.size               BIGINT NOT NULL            File size in bytes.
file.modification-time  TIMESTAMP_LTZ(3) NOT NULL  Last modification time.
CREATE TABLE files_with_metadata (
  value        STRING,
  `file.path`  STRING NOT NULL METADATA,
  `file.size`  BIGINT NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path'      = '/data/input/',
  'format'    = 'json'
)
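Metadata columns are queried like ordinary columns. For example, a hypothetical query against the table above to find the largest input files:

SELECT `file.path`, `file.size`
FROM files_with_metadata
ORDER BY `file.size` DESC
LIMIT 10;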

Streaming sink

The filesystem connector supports streaming writes. For row-encoded formats (CSV, JSON), data is written continuously; for bulk-encoded formats (Parquet, ORC, Avro), files are finalized on each checkpoint.

Rolling policy options

Option                                 Default  Description
sink.rolling-policy.file-size          128MB    Roll the part file when it reaches this size.
sink.rolling-policy.rollover-interval  30 min   Roll the part file after it has been open for this duration.
sink.rolling-policy.check-interval     1 min    How often to check time-based rolling conditions.
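For example, to roll part files at 256 MB or after 15 minutes, whichever comes first (a sketch using the options above; hypothetical table and path):

CREATE TABLE rolled_sink (
  id   INT,
  val  STRING
) WITH (
  'connector'                             = 'filesystem',
  'path'                                  = '/data/rolled/',
  'format'                                = 'json',
  'sink.rolling-policy.file-size'         = '256MB',
  'sink.rolling-policy.rollover-interval' = '15 min'
)

Bulk-encoded formats such as Parquet and ORC additionally roll on every checkpoint, regardless of these settings.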

File compaction

Enable compaction to merge small part files generated at high checkpoint frequencies:
Option                Default  Description
auto-compaction       false    Enable automatic compaction of pending files after each checkpoint.
compaction.file-size  (none)   Target size for compacted files. Defaults to the rolling file size.
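A minimal sketch enabling compaction (hypothetical table and path; the two option names are the ones listed above):

CREATE TABLE compacted_sink (
  id   INT,
  val  STRING
) WITH (
  'connector'            = 'filesystem',
  'path'                 = '/data/compacted/',
  'format'               = 'parquet',
  'auto-compaction'      = 'true',
  'compaction.file-size' = '128MB'  -- merge small part files up to this target size
)

Compaction happens before files become visible, so it adds latency between a checkpoint and the data appearing downstream.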

Partition commit

After writing a partition, you can notify downstream systems (such as Hive Metastore) that the partition is complete. Configure the trigger and policy:
CREATE TABLE output_table (
  user_id    STRING,
  amount     DOUBLE,
  dt         STRING,
  `hour`     STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'                         = 'filesystem',
  'path'                              = '/data/output/',
  'format'                            = 'parquet',
  'sink.partition-commit.trigger'     = 'process-time',
  'sink.partition-commit.delay'       = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file'
)

Trigger types

Trigger         Description
process-time    Commits when system time passes partition creation time plus delay. No watermark required.
partition-time  Commits when the watermark passes the time extracted from partition values plus delay. Requires watermark generation and time-based partitioning.
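A partition-time trigger needs to know how to turn partition values into a timestamp, which is configured with a time-extractor pattern. A sketch for the dt/hour layout used in this page (check the option names against the Flink version you run):

CREATE TABLE hourly_sink (
  user_id  STRING,
  amount   DOUBLE,
  dt       STRING,
  `hour`   STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path'      = '/data/hourly/',
  'format'    = 'parquet',
  'sink.partition-commit.trigger'              = 'partition-time',
  'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
  'sink.partition-commit.delay'                = '1 h',
  'sink.partition-commit.policy.kind'          = 'success-file'
)

With this configuration, partition dt=2024-01-15/hour=10 commits once the watermark passes 2024-01-15 11:00:00 (the extracted time plus the one-hour delay).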

Commit policies

Policy        Description
success-file  Writes an empty _SUCCESS file in the partition directory.
metastore     Adds the partition to the Hive Metastore. Only works with Hive tables.
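Policies can be combined as a comma-separated list. For a Hive table you would typically register the partition in the metastore and also write the marker file:

'sink.partition-commit.policy.kind' = 'metastore,success-file'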

Full example: Kafka to filesystem

CREATE TABLE kafka_source (
  user_id    STRING,
  amount     DOUBLE,
  log_ts     TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'                       = 'json'
);

CREATE TABLE fs_sink (
  user_id STRING,
  amount  DOUBLE,
  dt      STRING,
  `hour`  STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'                         = 'filesystem',
  'path'                              = 's3://my-bucket/orders/',
  'format'                            = 'parquet',
  'sink.partition-commit.delay'       = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file'
);

-- Streaming insert from Kafka into partitioned Parquet files
INSERT INTO fs_sink
SELECT
  user_id,
  amount,
  DATE_FORMAT(log_ts, 'yyyy-MM-dd') AS dt,
  DATE_FORMAT(log_ts, 'HH')         AS `hour`
FROM kafka_source;

-- Batch query with partition pruning
SELECT * FROM fs_sink WHERE dt = '2024-01-15' AND `hour` = '12';

Sink parallelism

You can set the parallelism of the file-writing operator independently of its upstream operators:
CREATE TABLE parallel_sink (
  id INT,
  val STRING
) WITH (
  'connector'      = 'filesystem',
  'path'           = '/data/parallel/',
  'format'         = 'csv',
  'sink.parallelism' = '4'
)
Configuring sink.parallelism is only supported when the upstream changelog mode is INSERT-ONLY.
