Apache Parquet is a columnar binary storage format that provides efficient compression and fast read performance, particularly for analytical queries that access only a subset of columns. Flink’s Parquet format is compatible with Apache Hive.
The Parquet format acts as a serialization schema when writing (sinks) and a deserialization schema when reading (sources).
Dependency
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-parquet</artifactId>
  <version>${flink.version}</version>
</dependency>
```
For SQL connectors, use the fat JAR:
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-parquet</artifactId>
  <version>${flink.version}</version>
</dependency>
```
Usage with Table API / SQL
Filesystem source and sink
```sql
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3),
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/user_behavior',
  'format' = 'parquet'
)
```
With Snappy compression:
```sql
CREATE TABLE compressed_events (
  id BIGINT,
  value STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = '/data/events/',
  'format' = 'parquet',
  'parquet.compression' = 'SNAPPY'
)
```
Streaming insert into partitioned Parquet table
```sql
INSERT INTO user_behavior
SELECT
  user_id, item_id, category_id, behavior, ts,
  DATE_FORMAT(ts, 'yyyy-MM-dd') AS dt
FROM kafka_source;
```
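When downstream consumers should only see complete partitions, the filesystem connector's partition-commit options can be combined with the Parquet format. A sketch, where the table name, path, and one-hour commit delay are illustrative:

```sql
CREATE TABLE user_behavior_committed (
  user_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3),
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/user_behavior_committed',
  'format' = 'parquet',
  -- derive the partition's time from the dt value
  'partition.time-extractor.timestamp-pattern' = '$dt 00:00:00',
  -- commit a partition once watermarks pass its time plus the delay
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  -- write a _SUCCESS file when the partition is committed
  'sink.partition-commit.policy.kind' = 'success-file'
)
```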
Bulk-encoded formats like Parquet can only roll part files on checkpoints, so each Flink checkpoint finalizes the in-progress files. Set your checkpoint interval according to your target file size and end-to-end latency requirements: shorter intervals mean fresher data but more, smaller files.
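The checkpoint interval can be set cluster-wide in flink-conf.yaml (or per job via StreamExecutionEnvironment.enableCheckpointing). A minimal sketch, where the one-minute interval is only an illustration:

```yaml
# flink-conf.yaml: files are finalized at most once per interval
execution.checkpointing.interval: 1min
```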
| Option | Required | Default | Description |
|---|---|---|---|
| format | Yes | (none) | Must be 'parquet'. |
| parquet.utc-timezone | No | false | Use UTC for epoch-to-LocalDateTime conversion. Set to true for Hive 3.x compatibility; leave false for Hive 0.x/1.x/2.x. |
| timestamp.time.unit | No | micros | Precision for int64 timestamps. Valid values: nanos, micros, millis. |
| write.int64.timestamp | No | false | Write timestamps as int64 with logical type instead of int96. Required for Spark compatibility. Note: timestamps are not converted between time zones. |
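For example, to write Spark-readable int64 timestamps, the two timestamp options from the table above can be set together. A sketch (table name and path are illustrative, and the option keys are used exactly as listed in the table):

```sql
CREATE TABLE spark_compatible_events (
  id BIGINT,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'filesystem',
  'path' = '/data/spark_compatible_events/',
  'format' = 'parquet',
  -- int64 instead of int96 so Spark can read the timestamps
  'write.int64.timestamp' = 'true',
  'timestamp.time.unit' = 'micros'
)
```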
The Parquet format also supports the Hadoop-level configuration options of ParquetOutputFormat. Pass them directly as format options under the parquet. prefix:
```sql
CREATE TABLE compressed_table (...) WITH (
  'connector' = 'filesystem',
  'path' = '/data/',
  'format' = 'parquet',
  'parquet.compression' = 'GZIP'
)
```
Supported compression codecs: UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD.
Data type mapping
Flink’s Parquet type mapping is compatible with Apache Hive by default. Spark requires write.int64.timestamp = true for timestamp compatibility.
| Flink Data Type | Parquet type | Parquet logical type |
|---|---|---|
| CHAR / VARCHAR / STRING | BINARY | UTF8 |
| BOOLEAN | BOOLEAN | — |
| BINARY / VARBINARY | BINARY | — |
| DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
| TINYINT | INT32 | INT_8 |
| SMALLINT | INT32 | INT_16 |
| INT | INT32 | — |
| BIGINT | INT64 | — |
| FLOAT | FLOAT | — |
| DOUBLE | DOUBLE | — |
| DATE | INT32 | DATE |
| TIME | INT32 | TIME_MILLIS |
| TIMESTAMP | INT96 (or INT64 with write.int64.timestamp) | — |
| ARRAY | — | LIST |
| MAP | — | MAP |
| MULTISET | — | MAP |
| ROW | — | STRUCT |
Parquet does not support nullable map keys. Flink maps both MAP and MULTISET to Parquet MAP, which requires non-null keys.
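The nested types from the lower part of the table can be declared directly in DDL; the non-null key restriction then applies to the MAP and MULTISET columns. A sketch with illustrative table and column names:

```sql
CREATE TABLE nested_events (
  id BIGINT,
  tags ARRAY<STRING>,                        -- written as Parquet LIST
  attributes MAP<STRING, STRING>,            -- written as Parquet MAP; keys must be non-null
  address ROW<city STRING, zip STRING>       -- written as Parquet STRUCT
) WITH (
  'connector' = 'filesystem',
  'path' = '/data/nested_events/',
  'format' = 'parquet'
)
```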
Usage with DataStream API
Reading Parquet files with FileSource
Use AvroParquetReaders to read Parquet files that contain Avro-encoded data:
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Schema schema = new Schema.Parser().parse(schemaString);

// AvroParquetReaders produces a StreamFormat, so use forRecordStreamFormat
FileSource<GenericRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(schema),
        new Path("/data/parquet/"))
    .build();

DataStreamSource<GenericRecord> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "Parquet Source");
```
Writing Parquet files with FileSink
```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

Schema schema = new Schema.Parser().parse(schemaString);
DataStream<GenericRecord> stream = ...;

FileSink<GenericRecord> sink = FileSink
    .forBulkFormat(
        new Path("/output/parquet/"),
        AvroParquetWriters.forGenericRecord(schema))
    .build();

stream.sinkTo(sink);
```
For POJO types:
```java
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

DataStream<User> userStream = ...;

FileSink<User> sink = FileSink
    .forBulkFormat(
        new org.apache.flink.core.fs.Path("/output/users/"),
        AvroParquetWriters.forReflectRecord(User.class))
    .build();

userStream.sinkTo(sink);
```
For Protobuf-generated classes:
```java
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

DataStream<UserProto> protoStream = ...;

FileSink<UserProto> sink = FileSink
    .forBulkFormat(
        new org.apache.flink.core.fs.Path("/output/proto_parquet/"),
        ParquetProtoWriters.forType(UserProto.class))
    .build();

protoStream.sinkTo(sink);
```