A format defines how binary data is mapped to table columns—it is the serialization and deserialization layer that sits between a connector and the data it reads or writes. Most connectors require you to specify a format in the WITH clause.
CREATE TABLE my_table (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic'     = 'my_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'    = 'json'  -- <-- the format
)

How formats work

Formats implement either a DeserializationSchema (for reading), a SerializationSchema (for writing), or both. Flink discovers format factories via the same SPI mechanism as connectors: the format key in the WITH clause maps to a registered format factory. Formats that support both reading and writing are labeled:
  • Serialization Schema – can be used for sinks
  • Deserialization Schema – can be used for sources
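For example, a format such as CSV that implements both schemas can back a source and a sink: the same 'format' key selects the DeserializationSchema on read and the SerializationSchema on write. A sketch (topic, path, and table names are hypothetical):

```sql
-- Source: the csv format's DeserializationSchema decodes Kafka records into rows.
CREATE TABLE orders_src (
  order_id INT,
  amount   DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic'     = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'    = 'csv'
);

-- Sink: the same format's SerializationSchema encodes rows on write.
CREATE TABLE orders_sink (
  order_id INT,
  amount   DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path'      = '/tmp/orders',
  'format'    = 'csv'
);
```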

Available formats

Format         | Read | Write | Supported connectors
---------------|------|-------|---------------------
JSON           | Yes  | Yes   | Kafka, Upsert Kafka, Kinesis, Firehose, Filesystem, Elasticsearch
Avro           | Yes  | Yes   | Kafka, Upsert Kafka, Kinesis, Firehose, Filesystem
Parquet        | Yes  | Yes   | Filesystem
ORC            | Yes  | Yes   | Filesystem
CSV            | Yes  | Yes   | Kafka, Upsert Kafka, Kinesis, Firehose, Filesystem
Confluent Avro | Yes  | Yes   | Kafka, Upsert Kafka
Protobuf       | Yes  | Yes   | Kafka
Debezium CDC   | Yes  | Yes   | Kafka, Filesystem
Canal CDC      | Yes  | Yes   | Kafka, Filesystem
Maxwell CDC    | Yes  | Yes   | Kafka, Filesystem
OGG CDC        | Yes  | Yes   | Kafka, Filesystem
Raw            | Yes  | Yes   | Kafka, Upsert Kafka, Kinesis, Firehose, Filesystem

JSON

Human-readable text format. The schema is derived from the table DDL. Error handling and timestamp formats are configurable.
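Format-specific options are prefixed with the format name. A sketch using two of the JSON format's options (topic and table names are hypothetical):

```sql
CREATE TABLE events (
  user_id STRING,
  ts      TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic'     = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'    = 'json',
  -- options for the format are prefixed with its name:
  'json.ignore-parse-errors'       = 'true',     -- skip malformed records instead of failing
  'json.timestamp-format.standard' = 'ISO-8601'  -- parse timestamps as ISO-8601
);
```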

Avro

Compact binary format with schema evolution support. Schema is derived from the table schema.

Parquet

Columnar binary format. High compression and read performance. Compatible with Hive and Spark.
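Parquet is typically paired with the filesystem connector, often with partitioned output. A sketch (bucket path and column names are hypothetical):

```sql
CREATE TABLE hourly_metrics (
  metric  STRING,
  `value` DOUBLE,
  `hour`  STRING
) PARTITIONED BY (`hour`) WITH (
  'connector' = 'filesystem',
  'path'      = 's3://my-bucket/metrics',  -- hypothetical path
  'format'    = 'parquet'
);
```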

ORC

Columnar binary format. High compression and read performance. Compatible with Hive.

Formats and the DataStream API

Formats also integrate with the DataStream API through FileSource and FileSink:
  • StreamFormat: reads records from an input stream one at a time (e.g., TextLineInputFormat, CsvReaderFormat)
  • BulkFormat: reads batches of records at a time (e.g., Parquet and ORC readers)
  • BulkWriter.Factory: writes batches of records (e.g., AvroWriterFactory, ParquetWriterFactory, OrcBulkWriterFactory)
See the Filesystem Connector (DataStream) page for details on using these in a DataStream job.
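As a sketch of the StreamFormat path, a FileSource can be built around TextLineInputFormat to read a file line by line (the input path and job name are hypothetical):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextLines {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // TextLineInputFormat is a StreamFormat: it decodes one line per record.
    FileSource<String> source = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/tmp/input"))  // hypothetical path
        .build();

    DataStream<String> lines =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

    lines.print();
    env.execute("read-text-lines");
  }
}
```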

CDC formats

Flink supports several Change Data Capture (CDC) formats that represent database change events as Flink rows with a changelog mode:
Format        | Source system                                      | Changelog mode
--------------|----------------------------------------------------|------------------------
Debezium-JSON | MySQL, PostgreSQL, MongoDB, SQL Server, and others | Insert, Update, Delete
Canal-JSON    | MySQL (via Alibaba Canal)                          | Insert, Update, Delete
Maxwell-JSON  | MySQL (via Maxwell)                                | Insert, Update, Delete
OGG-JSON      | Oracle GoldenGate                                  | Insert, Update, Delete
CDC formats enable Flink to maintain stateful aggregations over mutable database tables, making them useful for building real-time analytical views.
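A sketch of reading a Debezium changelog from Kafka (the topic name follows Debezium's server.database.table convention but is hypothetical):

```sql
CREATE TABLE products (
  id   INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic'     = 'dbserver1.inventory.products',  -- hypothetical Debezium topic
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'    = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset'
);
```

Inserts, updates, and deletes in the source database then flow into Flink as a changelog stream, so downstream aggregations over this table stay consistent with the database.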

Resolving format dependency conflicts

If you package multiple formats in an uber-JAR, their META-INF/services files may conflict. See the Table / SQL Connectors Overview for instructions on using Maven Shade’s ServicesResourceTransformer to merge them correctly.
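In the maven-shade-plugin configuration, the merge is enabled by adding the ServicesResourceTransformer to the shade execution's transformers (a fragment, not a complete plugin configuration):

```xml
<!-- Inside the maven-shade-plugin <configuration>: merge META-INF/services
     entries from all shaded JARs instead of letting one overwrite another. -->
<transformers>
  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
```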
