The Apache Avro format reads and writes Avro-encoded data. The Avro schema is derived automatically from the Flink table schema, so you do not define an Avro schema file separately.
The format works as a serialization schema for sinks and as a deserialization schema for sources.
Dependency
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>${flink.version}</version>
</dependency>
For the SQL Client, use the fat JAR, which bundles Avro and its transitive dependencies:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-avro</artifactId>
  <version>${flink.version}</version>
</dependency>
Usage with Kafka
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'avro-consumer',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'avro'
)
Usage with Filesystem
CREATE TABLE user_behavior_archive (
  user_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3),
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = '/data/archive/',
  'format' = 'avro',
  'avro.codec' = 'snappy'
)
| Option | Required | Default | Description |
|---|---|---|---|
| format | Yes | — | Must be 'avro'. |
| avro.encoding | No | binary | Serialization encoding. binary produces compact messages; json produces human-readable messages. |
| avro.codec | No | (none) | Compression codec. Only applies to the Filesystem connector. Valid values: null, deflate, snappy, bzip2, xz. |
| avro.timestamp_mapping.legacy | No | true | Use legacy timestamp mapping. Before Flink 1.19, both TIMESTAMP and TIMESTAMP_LTZ were incorrectly mapped to Avro TIMESTAMP. Set to false for correct behavior: TIMESTAMP → Avro LOCAL TIMESTAMP, TIMESTAMP_LTZ → Avro TIMESTAMP. |
If you are upgrading from Flink 1.18 or earlier and your data contains timestamps, set avro.timestamp_mapping.legacy = 'false' to use correct timezone-aware mappings. Changing this option on existing data may require a schema migration.
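As a sketch, a table that opts into the corrected mapping sets the option in its WITH clause alongside the format (table, topic, and server names below are placeholders):

```sql
CREATE TABLE events (
  event_time TIMESTAMP(3),      -- maps to Avro local-timestamp-millis when legacy = false
  ingest_time TIMESTAMP_LTZ(3)  -- maps to Avro timestamp-millis when legacy = false
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro',
  'avro.timestamp_mapping.legacy' = 'false'
)
```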
Data type mapping
The Avro schema is always derived from the Flink table schema. The following table shows the mapping:
| Flink SQL type | Avro type | Avro logical type |
|---|---|---|
| CHAR / VARCHAR / STRING | string | — |
| BOOLEAN | boolean | — |
| BINARY / VARBINARY | bytes | — |
| DECIMAL | fixed | decimal |
| TINYINT | int | — |
| SMALLINT | int | — |
| INT | int | — |
| BIGINT | long | — |
| FLOAT | float | — |
| DOUBLE | double | — |
| DATE | int | date |
| TIME | int | time-millis |
| TIMESTAMP | long | timestamp-millis (legacy) or local-timestamp-millis |
| TIMESTAMP_LTZ | long | local-timestamp-millis (legacy) or timestamp-millis |
| ARRAY | array | — |
| MAP (string keys) | map | — |
| MULTISET (string elements) | map | — |
| ROW | record | — |
Nullable Flink types are mapped to Avro union(T, null) where T is the non-nullable Avro equivalent.
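As a rough illustration, a table with a non-nullable BIGINT column and a nullable STRING column would derive an Avro record schema along these lines (the record and field names here are illustrative, not the exact names Flink generates):

```json
{
  "type": "record",
  "name": "record",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": ["string", "null"]}
  ]
}
```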
Writing Avro with the DataStream API
To write Avro files from a DataStream job, use AvroWriters with FileSink:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;

// Parse the Avro schema that describes the records being written.
Schema schema = new Schema.Parser().parse(schemaString);
DataStream<GenericRecord> stream = ...;

FileSink<GenericRecord> sink = FileSink
    .forBulkFormat(
        new org.apache.flink.core.fs.Path("/output/avro/"),
        AvroWriters.forGenericRecord(schema))
    .build();

stream.sinkTo(sink);
For POJO classes with reflection-based schemas:
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<User> userStream = ...;

// The Avro schema is derived from the User class via reflection.
FileSink<User> sink = FileSink
    .forBulkFormat(
        new org.apache.flink.core.fs.Path("/output/users/"),
        AvroWriters.forReflectRecord(User.class))
    .build();

userStream.sinkTo(sink);
To enable Snappy compression, use a custom AvroBuilder:
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroBuilder;
import org.apache.flink.formats.avro.AvroWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;

AvroWriterFactory<Address> factory = new AvroWriterFactory<>(
    (AvroBuilder<Address>) out -> {
        // Derive the schema from the POJO via reflection, then build a
        // DataFileWriter with the desired compression codec.
        org.apache.avro.Schema schema = ReflectData.get().getSchema(Address.class);
        DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);
        DataFileWriter<Address> writer = new DataFileWriter<>(datumWriter);
        writer.setCodec(CodecFactory.snappyCodec());
        writer.create(schema, out);
        return writer;
    });

DataStream<Address> stream = ...;
Path outputPath = ...;

stream.sinkTo(FileSink.forBulkFormat(outputPath, factory).build());