Apache Parquet is a columnar binary storage format that provides efficient compression and fast read performance, particularly for analytical queries that access subsets of columns. Flink’s Parquet format is compatible with Apache Hive. Within Flink, Parquet acts as a serialization schema for sinks and as a deserialization schema for sources.

Dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-parquet</artifactId>
  <version>${flink.version}</version>
</dependency>
For the SQL Client or pure-SQL jobs, use the bundled (fat) JAR instead:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-parquet</artifactId>
  <version>${flink.version}</version>
</dependency>

Usage with Table API / SQL

Filesystem source and sink

CREATE TABLE user_behavior (
  user_id     BIGINT,
  item_id     BIGINT,
  category_id BIGINT,
  behavior    STRING,
  ts          TIMESTAMP(3),
  dt          STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path'      = '/tmp/user_behavior',
  'format'    = 'parquet'
)
With Snappy compression:
CREATE TABLE compressed_events (
  id    BIGINT,
  value STRING,
  dt    STRING
) PARTITIONED BY (dt) WITH (
  'connector'           = 'filesystem',
  'path'                = '/data/events/',
  'format'              = 'parquet',
  'parquet.compression' = 'SNAPPY'
)
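Tables declared this way serve as sources as well as sinks. The following is a minimal Java sketch that reads one partition back in batch mode, assuming the user_behavior DDL above has been executed in the same TableEnvironment; the partition value is illustrative:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// Assumes the user_behavior table above has been registered in tableEnv.
// Partition pruning limits the scan to files under dt='2024-01-01'.
tableEnv.executeSql(
    "SELECT user_id, behavior FROM user_behavior WHERE dt = '2024-01-01'")
    .print();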

Streaming insert into partitioned Parquet table

INSERT INTO user_behavior
SELECT
  user_id, item_id, category_id, behavior, ts,
  DATE_FORMAT(ts, 'yyyy-MM-dd') AS dt
FROM kafka_source;
For bulk-encoded formats like Parquet, files are finalized on each Flink checkpoint. Set your checkpoint interval according to your target file size and latency requirements.
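Checkpointing must be enabled for these files to be committed at all; without checkpoints the sink leaves only in-progress files. A minimal sketch enabling checkpointing from Java (the one-minute interval is illustrative; in pure-SQL deployments the equivalent setting is execution.checkpointing.interval):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Parquet part files roll on every checkpoint: a longer interval produces
// fewer, larger files at the cost of higher end-to-end latency.
env.enableCheckpointing(60_000);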

Format options

| Option | Required | Default | Description |
| --- | --- | --- | --- |
| format | Yes | (none) | Must be 'parquet'. |
| parquet.utc-timezone | No | false | Use UTC for epoch-to-LocalDateTime conversion. Set to true for Hive 3.x compatibility; leave false for Hive 0.x/1.x/2.x. |
| timestamp.time.unit | No | micros | Precision for int64 timestamps. Valid values: nanos, micros, millis. |
| write.int64.timestamp | No | false | Write timestamps as int64 with a logical type instead of int96. Required for Spark compatibility. Note: timestamps are not converted between time zones. |
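For example, the two timestamp options can be combined to produce Spark-readable files. A minimal sketch; the table name, schema, and path are illustrative:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Writes TIMESTAMP columns as int64 milliseconds instead of int96,
// which Spark reads without extra configuration.
tableEnv.executeSql(
    "CREATE TABLE spark_compatible_events (" +
    "  id BIGINT," +
    "  ts TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector'             = 'filesystem'," +
    "  'path'                  = '/data/spark_events/'," +
    "  'format'                = 'parquet'," +
    "  'write.int64.timestamp' = 'true'," +
    "  'timestamp.time.unit'   = 'millis'" +
    ")");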
Parquet also supports the Hadoop-level configuration options of ParquetOutputFormat; pass them directly as format properties:
CREATE TABLE compressed_table (...) WITH (
  'connector'           = 'filesystem',
  'path'                = '/data/',
  'format'              = 'parquet',
  'parquet.compression' = 'GZIP'
)
Supported compression codecs: UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD.

Data type mapping

Flink’s Parquet type mapping is compatible with Apache Hive by default. Spark requires write.int64.timestamp = true for timestamp compatibility.
| Flink Data Type | Parquet type | Parquet logical type |
| --- | --- | --- |
| CHAR / VARCHAR / STRING | BINARY | UTF8 |
| BOOLEAN | BOOLEAN | |
| BINARY / VARBINARY | BINARY | |
| DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
| TINYINT | INT32 | INT_8 |
| SMALLINT | INT32 | INT_16 |
| INT | INT32 | |
| BIGINT | INT64 | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| DATE | INT32 | DATE |
| TIME | INT32 | TIME_MILLIS |
| TIMESTAMP | INT96 (or INT64 with write.int64.timestamp) | |
| ARRAY | | LIST |
| MAP | | MAP |
| MULTISET | | MAP |
| ROW | | STRUCT |
Parquet does not support nullable map keys. Flink maps both MAP and MULTISET to Parquet MAP, which requires non-null keys.

Usage with DataStream API

Reading Parquet files with FileSource

Use AvroParquetReaders to read Parquet files into Avro records:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.Schema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Schema schema = new Schema.Parser().parse(schemaString);

// AvroParquetReaders produces a StreamFormat, so the source is built with
// forRecordStreamFormat (not forBulkFileFormat, which expects a BulkFormat).
FileSource<GenericRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(schema),
        new org.apache.flink.core.fs.Path("/data/parquet/"))
    .build();

DataStreamSource<GenericRecord> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "Parquet Source"
);
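Besides forGenericRecord, AvroParquetReaders offers forSpecificRecord for Avro-generated classes and forReflectRecord for plain Java classes. A sketch that reads generated records and keeps watching the directory for new files; UserRecord and the ten-second interval are illustrative:
import java.time.Duration;

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;

// UserRecord stands in for an Avro-generated SpecificRecord class.
FileSource<UserRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forSpecificRecord(UserRecord.class),
        new org.apache.flink.core.fs.Path("/data/parquet/"))
    // Keep scanning for newly added files every 10 seconds instead of
    // reading the current contents once and terminating.
    .monitorContinuously(Duration.ofSeconds(10))
    .build();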

Writing Parquet files with FileSink

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.Schema;

Schema schema = new Schema.Parser().parse(schemaString);
DataStream<GenericRecord> stream = ...;

FileSink<GenericRecord> sink = FileSink
    .forBulkFormat(
        new org.apache.flink.core.fs.Path("/output/parquet/"),
        AvroParquetWriters.forGenericRecord(schema))
    .build();

stream.sinkTo(sink);
For POJO types:
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

DataStream<User> userStream = ...;

FileSink<User> sink = FileSink
    .forBulkFormat(
        new org.apache.flink.core.fs.Path("/output/users/"),
        AvroParquetWriters.forReflectRecord(User.class))
    .build();

userStream.sinkTo(sink);
For Protobuf-generated classes:
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

DataStream<UserProto> protoStream = ...;

FileSink<UserProto> sink = FileSink
    .forBulkFormat(
        new org.apache.flink.core.fs.Path("/output/proto_parquet/"),
        ParquetProtoWriters.forType(UserProto.class))
    .build();

protoStream.sinkTo(sink);
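Because Parquet is a bulk format, FileSink rolls part files on every checkpoint; the part-file names can still be customized with OutputFileConfig. A sketch reusing the Protobuf writer above, with illustrative prefix and suffix:
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

FileSink<UserProto> namedSink = FileSink
    .forBulkFormat(
        new org.apache.flink.core.fs.Path("/output/proto_parquet/"),
        ParquetProtoWriters.forType(UserProto.class))
    // Produces part files named like "events-<subtask>-<counter>.parquet".
    .withOutputFileConfig(
        OutputFileConfig.builder()
            .withPartPrefix("events")
            .withPartSuffix(".parquet")
            .build())
    .build();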
