The Filesystem SQL connector reads and writes files on any filesystem supported by the Flink FileSystem abstraction. It is bundled with Flink—no additional connector dependency is required. You do need to include a format dependency for the specific file format you want to read or write.
Basic table definition
```sql
CREATE TABLE my_table (
  id INT,
  name STRING,
  amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///data/my_table',
  'format' = 'parquet'
);
```
The path option points to a directory, not a file. Flink manages part files inside that directory and you should not reference individual files directly.
| Format | Notes |
|---|---|
| CSV | RFC-4180 |
| JSON | Newline-delimited JSON (one JSON object per line), not a standard JSON array file. |
| Avro | Supports compression via avro.codec. |
| Parquet | Compatible with Hive. |
| ORC | Compatible with Hive. |
| Debezium-JSON | CDC format for change events from Debezium. |
| Canal-JSON | CDC format for change events from Canal. |
| Raw | Single raw bytes column. |
Partitioning
Flink infers partitions from the directory structure using Hive-style partition naming (key=value). Partitions do not need to be pre-registered in a catalog.
Example directory layout for a table partitioned by dt and hour:
```
/data/my_table/
└── dt=2024-01-15/
    ├── hour=10/
    │   ├── part-0.parquet
    │   └── part-1.parquet
    └── hour=11/
        └── part-0.parquet
```
Use INSERT OVERWRITE to replace a specific partition without touching others:
```sql
INSERT OVERWRITE my_table PARTITION (dt='2024-01-15', `hour`='10')
SELECT id, name, amount FROM corrections
WHERE dt = '2024-01-15' AND `hour` = '10';
```
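INSERT OVERWRITE also works without a static PARTITION clause: partition values then come from the query itself, and only the partitions that appear in the result are replaced. A sketch, assuming the corrections table carries dt and hour columns:

```sql
-- Dynamic partition overwrite: only the partitions present in the
-- query result are replaced; all other partitions are left untouched.
INSERT OVERWRITE my_table
SELECT id, name, amount, dt, `hour` FROM corrections;
```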
Source
By default the source is bounded—it scans the path once and finishes. Enable continuous watching to treat it as an unbounded source:
```sql
CREATE TABLE streaming_input (
  id INT,
  `value` STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 's3://my-bucket/input/',
  'format' = 'json',
  'source.monitor-interval' = '30 s'
);
```
source.monitor-interval accepts any Duration value (e.g. 10 s, 1 min). When set, the source periodically re-scans the directory for new files. Each file is tracked by its path and processed exactly once.
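With monitoring enabled, a query over the table runs continuously instead of terminating after one scan. For example:

```sql
-- Unbounded read: emits rows from the existing files, then keeps
-- emitting rows as new files appear under the configured path.
SELECT id, `value` FROM streaming_input;
```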
The filesystem source exposes read-only metadata:
| Key | Type | Description |
|---|---|---|
| file.path | STRING NOT NULL | Full path of the file. |
| file.name | STRING NOT NULL | File name (last path component). |
| file.size | BIGINT NOT NULL | File size in bytes. |
| file.modification-time | TIMESTAMP_LTZ(3) NOT NULL | Last modification time of the file. |
```sql
CREATE TABLE files_with_metadata (
  `value` STRING,
  `file.path` STRING NOT NULL METADATA,
  `file.size` BIGINT NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = '/data/input/',
  'format' = 'json'
);
```
Streaming sink
The filesystem connector supports streaming writes. Row-encoded formats (CSV, JSON) roll part files according to the rolling policy alone; bulk-encoded formats (Parquet, ORC, Avro) additionally roll on every checkpoint, and the resulting files become visible once the checkpoint completes.
Rolling policy options
| Option | Default | Description |
|---|---|---|
| sink.rolling-policy.file-size | 128MB | Roll the part file when it reaches this size. |
| sink.rolling-policy.rollover-interval | 30 min | Roll the part file after it has been open for this duration. |
| sink.rolling-policy.check-interval | 1 min | How often to check the time-based rolling condition. |
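The rolling options go in the WITH clause like any other connector option. A sketch (table name, path, and thresholds are illustrative):

```sql
CREATE TABLE rolled_sink (
  id INT,
  msg STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '/data/rolled/',
  'format' = 'csv',
  -- roll at 64 MB or after 15 minutes, whichever comes first
  'sink.rolling-policy.file-size' = '64MB',
  'sink.rolling-policy.rollover-interval' = '15 min',
  'sink.rolling-policy.check-interval' = '1 min'
);
```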
File compaction
Enable compaction to merge small part files generated at high checkpoint frequencies:
| Option | Default | Description |
|---|---|---|
| auto-compaction | false | Enable automatic compaction of pending files after each checkpoint. |
| compaction.file-size | (none) | Target size for compacted files. Defaults to the rolling file size (sink.rolling-policy.file-size). |
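A sketch of a compacting sink (table name, path, and target size are illustrative):

```sql
CREATE TABLE compacted_sink (
  id INT,
  msg STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '/data/compacted/',
  'format' = 'parquet',
  -- merge small per-checkpoint files into files of roughly 128 MB
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB'
);
```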
Partition commit
After writing a partition, you can notify downstream systems (such as Hive Metastore) that the partition is complete. Configure the trigger and policy:
Process-time trigger
```sql
CREATE TABLE output_table (
  user_id STRING,
  amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = '/data/output/',
  'format' = 'parquet',
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file'
);
```
Partition-time trigger
```sql
CREATE TABLE output_table_wm (
  user_id STRING,
  amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = '/data/output_wm/',
  'format' = 'parquet',
  'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.watermark-time-zone' = 'UTC',
  'sink.partition-commit.policy.kind' = 'success-file'
);
```
Trigger types
| Trigger | Description |
|---|---|
| process-time | Commits when system time passes the partition creation time plus the delay. No watermark required. |
| partition-time | Commits when the watermark passes the time extracted from the partition values plus the delay. Requires watermark generation and time-based partitioning. |
Commit policies
| Policy | Description |
|---|---|
| success-file | Writes an empty _SUCCESS file into the partition directory. |
| metastore | Adds the partition to the Hive Metastore. Only works with Hive tables. |
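Policies can be combined as a comma-separated list. For a Hive table you would typically register the partition and write the marker file:

```sql
'sink.partition-commit.policy.kind' = 'metastore,success-file'
```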
Full example: Kafka to filesystem
```sql
CREATE TABLE kafka_source (
  user_id STRING,
  amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

CREATE TABLE fs_sink (
  user_id STRING,
  amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 's3://my-bucket/orders/',
  'format' = 'parquet',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file'
);

-- Streaming insert from Kafka into partitioned Parquet files
INSERT INTO fs_sink
SELECT
  user_id,
  amount,
  DATE_FORMAT(log_ts, 'yyyy-MM-dd') AS dt,
  DATE_FORMAT(log_ts, 'HH') AS `hour`
FROM kafka_source;

-- Batch query with partition pruning
SELECT * FROM fs_sink WHERE dt = '2024-01-15' AND `hour` = '12';
```
Sink parallelism
You can set the parallelism of the file-writing operator independently of its upstream operators:
```sql
CREATE TABLE parallel_sink (
  id INT,
  val STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '/data/parallel/',
  'format' = 'csv',
  'sink.parallelism' = '4'
);
```
Configuring sink.parallelism is only supported when the upstream changelog mode is INSERT-ONLY.