The Apache Avro format reads and writes Avro-encoded data. The Avro schema is automatically derived from the Flink table schema; you do not define an Avro schema file separately. The format provides a serialization schema for sinks and a deserialization schema for sources.

Dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>${flink.version}</version>
</dependency>
For use with the SQL Client, use the fat JAR instead:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-avro</artifactId>
  <version>${flink.version}</version>
</dependency>

Usage with Kafka

CREATE TABLE user_behavior (
  user_id     BIGINT,
  item_id     BIGINT,
  category_id BIGINT,
  behavior    STRING,
  ts          TIMESTAMP(3)
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id'          = 'avro-consumer',
  'scan.startup.mode'            = 'earliest-offset',
  'format'                       = 'avro'
)

Usage with Filesystem

CREATE TABLE user_behavior_archive (
  user_id  BIGINT,
  behavior STRING,
  ts       TIMESTAMP(3),
  dt       STRING
) PARTITIONED BY (dt) WITH (
  'connector'  = 'filesystem',
  'path'       = '/data/archive/',
  'format'     = 'avro',
  'avro.codec' = 'snappy'
)
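A common pattern is to stream from the Kafka table into the partitioned archive. A hedged sketch, assuming both tables above exist in the same catalog; the dt partition value is derived here from the event timestamp:

```sql
-- Write Snappy-compressed Avro files, partitioned by day
INSERT INTO user_behavior_archive
SELECT user_id, behavior, ts, DATE_FORMAT(ts, 'yyyy-MM-dd') AS dt
FROM user_behavior;
```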

Format options

Option                         Required  Default  Description
format                         Yes       (none)   Must be 'avro'.
avro.encoding                  No        binary   Serialization encoding. 'binary' produces compact messages; 'json' produces human-readable messages.
avro.codec                     No        (none)   Compression codec. Only applies to the Filesystem connector. Valid values: null, deflate, snappy, bzip2, xz.
avro.timestamp_mapping.legacy  No        true     Use legacy timestamp mapping. Before Flink 1.19, both TIMESTAMP and TIMESTAMP_LTZ were incorrectly mapped to Avro TIMESTAMP. Set to false for correct behavior: TIMESTAMP maps to Avro LOCAL TIMESTAMP, TIMESTAMP_LTZ maps to Avro TIMESTAMP.
If you are upgrading from Flink 1.18 or earlier and your data contains timestamps, set avro.timestamp_mapping.legacy = 'false' to use correct timezone-aware mappings. Changing this option on existing data may require a schema migration.
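The option is set per table in the WITH clause. A sketch based on the Kafka table above, with the non-legacy mapping enabled (the table name here is illustrative):

```sql
CREATE TABLE user_behavior_correct_ts (
  user_id BIGINT,
  ts      TIMESTAMP(3)
) WITH (
  'connector'                     = 'kafka',
  'topic'                         = 'user_behavior',
  'properties.bootstrap.servers'  = 'localhost:9092',
  'format'                        = 'avro',
  'avro.timestamp_mapping.legacy' = 'false'
)
```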

Data type mapping

The Avro schema is always derived from the Flink table schema. The following table shows the mapping:
Flink SQL type              Avro type  Avro logical type
CHAR / VARCHAR / STRING     string     -
BOOLEAN                     boolean    -
BINARY / VARBINARY          bytes      -
DECIMAL                     fixed      decimal
TINYINT                     int        -
SMALLINT                    int        -
INT                         int        -
BIGINT                      long       -
FLOAT                       float      -
DOUBLE                      double     -
DATE                        int        date
TIME                        int        time-millis
TIMESTAMP                   long       timestamp-millis (legacy) or local-timestamp-millis
TIMESTAMP_LTZ               long       timestamp-millis
ARRAY                       array      -
MAP (string keys)           map        -
MULTISET (string elements)  map        -
ROW                         record     -
Nullable Flink types are mapped to Avro union(T, null) where T is the non-nullable Avro equivalent.
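As an illustration, a table with a nullable BIGINT column user_id and a non-nullable STRING column behavior would produce a derived record schema roughly like the following (a sketch; the record name, field order, and union member order depend on the converter):

```json
{
  "type": "record",
  "name": "record",
  "fields": [
    {"name": "user_id", "type": ["long", "null"]},
    {"name": "behavior", "type": "string"}
  ]
}
```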

Writing Avro with the DataStream API

To write Avro files from a DataStream job, use AvroWriters with FileSink:
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

Schema schema = new Schema.Parser().parse(schemaString);
DataStream<GenericRecord> stream = ...;

FileSink<GenericRecord> sink = FileSink
    .forBulkFormat(
        new org.apache.flink.core.fs.Path("/output/avro/"),
        AvroWriters.forGenericRecord(schema))
    .build();

stream.sinkTo(sink);
For POJO classes with reflection-based schemas:
import org.apache.flink.formats.avro.AvroWriters;

DataStream<User> userStream = ...;

FileSink<User> sink = FileSink
    .forBulkFormat(
        new org.apache.flink.core.fs.Path("/output/users/"),
        AvroWriters.forReflectRecord(User.class))
    .build();

userStream.sinkTo(sink);
To enable Snappy compression, use a custom AvroBuilder:
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.avro.AvroBuilder;
import org.apache.flink.formats.avro.AvroWriterFactory;

AvroWriterFactory<Address> factory = new AvroWriterFactory<>(
    (AvroBuilder<Address>) out -> {
        org.apache.avro.Schema schema = ReflectData.get().getSchema(Address.class);
        DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);
        DataFileWriter<Address> writer = new DataFileWriter<>(datumWriter);
        writer.setCodec(CodecFactory.snappyCodec());
        writer.create(schema, out);
        return writer;
    });

DataStream<Address> stream = ...;
stream.sinkTo(FileSink.forBulkFormat(outputPath, factory).build());
