Apache ORC is a columnar binary storage format designed for high-throughput reads and strong compression. Flink’s ORC format is compatible with Apache Hive, and acts as a serialization schema for sinks and a deserialization schema for sources.

Dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-orc</artifactId>
  <version>${flink.version}</version>
</dependency>
For SQL connectors, use the fat JAR:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-orc</artifactId>
  <version>${flink.version}</version>
</dependency>

Usage with Table API / SQL

Filesystem source and sink

CREATE TABLE user_behavior (
  user_id     BIGINT,
  item_id     BIGINT,
  category_id BIGINT,
  behavior    STRING,
  ts          TIMESTAMP(3),
  dt          STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path'      = '/tmp/user_behavior',
  'format'    = 'orc'
)
With Snappy compression:
CREATE TABLE compressed_logs (
  log_id   BIGINT,
  message  STRING,
  level    STRING,
  dt       STRING
) PARTITIONED BY (dt) WITH (
  'connector'    = 'filesystem',
  'path'         = '/data/logs/',
  'format'       = 'orc',
  'orc.compress' = 'SNAPPY'
)

Streaming insert

INSERT INTO user_behavior
SELECT
  user_id, item_id, category_id, behavior, ts,
  DATE_FORMAT(ts, 'yyyy-MM-dd') AS dt
FROM kafka_source;
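The dt partition value produced by DATE_FORMAT(ts, 'yyyy-MM-dd') above can be sketched in plain Java; the class and helper names here are illustrative, not part of any Flink API:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class PartitionValue {

    // Mirrors DATE_FORMAT(ts, 'yyyy-MM-dd') from the SQL above:
    // one partition directory per calendar day of the event timestamp.
    static String dt(LocalDateTime ts) {
        return ts.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2024, 3, 7, 12, 30, 0);
        System.out.println(dt(ts)); // 2024-03-07
    }
}
```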
ORC is a bulk-encoded format. Files are finalized on each Flink checkpoint. Tune your checkpoint interval to balance file size and latency.
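As a configuration sketch (the one-minute interval is illustrative, not a recommendation), checkpointing for such a streaming insert job can be enabled like this:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ORC part files are finalized on checkpoint completion, so this interval
// bounds both file granularity and end-to-end latency.
env.enableCheckpointing(60_000); // one checkpoint per minute
```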

Format options

Option    Required    Default    Description
format    Yes         (none)     Must be 'orc'.
The ORC format also supports any ORC table property passed directly as a format option:
ORC property                Common values                            Description
orc.compress                NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD       Compression codec. Default is ZLIB.
orc.compress.size           (integer)                                Compression chunk size in bytes.
orc.stripe.size             (integer)                                Size of ORC stripes in bytes.
orc.row.index.stride        (integer)                                Number of rows between row index entries.
orc.bloom.filter.columns    (comma-separated column names)           Columns for which to create Bloom filter indexes.
CREATE TABLE orc_table (
  id   BIGINT,
  name STRING,
  dt   STRING
) PARTITIONED BY (dt) WITH (
  'connector'    = 'filesystem',
  'path'         = '/data/orc/',
  'format'       = 'orc',
  'orc.compress' = 'ZSTD',
  'orc.bloom.filter.columns' = 'id'
)

Data type mapping

ORC format type mapping is compatible with Apache Hive.
Flink Data Type               ORC physical type    ORC logical type
CHAR                          bytes                CHAR
VARCHAR                       bytes                VARCHAR
STRING                        bytes                STRING
BOOLEAN                       long                 BOOLEAN
BYTES / BINARY / VARBINARY    bytes                BINARY
DECIMAL                       decimal              DECIMAL
TINYINT                       long                 BYTE
SMALLINT                      long                 SHORT
INT                           long                 INT
BIGINT                        long                 LONG
FLOAT                         double               FLOAT
DOUBLE                        double               DOUBLE
DATE                          long                 DATE
TIMESTAMP                     timestamp            TIMESTAMP
ARRAY                         -                    LIST
MAP                           -                    MAP
ROW                           -                    STRUCT

Usage with DataStream API

To write ORC files from a DataStream job, implement a Vectorizer that converts your type to ORC’s VectorizedRowBatch and use OrcBulkWriterFactory with FileSink:
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.flink.orc.vector.Vectorizer;

public class EventVectorizer extends Vectorizer<Event> implements Serializable {

    public EventVectorizer(String schema) {
        super(schema);
    }

    @Override
    public void vectorize(Event element, VectorizedRowBatch batch) throws IOException {
        BytesColumnVector idCol     = (BytesColumnVector) batch.cols[0];
        LongColumnVector  tsCol     = (LongColumnVector)  batch.cols[1];
        BytesColumnVector valueCol  = (BytesColumnVector) batch.cols[2];

        int row = batch.size++;
        idCol.setVal(row, element.getId().getBytes(StandardCharsets.UTF_8));
        tsCol.vector[row] = element.getTimestamp();
        valueCol.setVal(row, element.getValue().getBytes(StandardCharsets.UTF_8));
    }
}
Create the sink:
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;

String schema = "struct<id:string,ts:bigint,value:string>";
DataStream<Event> stream = ...;

OrcBulkWriterFactory<Event> writerFactory =
    new OrcBulkWriterFactory<>(new EventVectorizer(schema));

FileSink<Event> sink = FileSink
    .forBulkFormat(
        new org.apache.flink.core.fs.Path("/output/orc/"),
        writerFactory)
    .build();

stream.sinkTo(sink);
To configure Hadoop settings and ORC writer properties:
import org.apache.hadoop.conf.Configuration;
import java.util.Properties;

String schema = "struct<id:string,ts:bigint>";
Configuration hadoopConf = new Configuration();

Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "SNAPPY");
writerProps.setProperty("orc.stripe.size", "67108864"); // 64 MB

OrcBulkWriterFactory<Event> writerFactory =
    new OrcBulkWriterFactory<>(new EventVectorizer(schema), writerProps, hadoopConf);

Adding user metadata

You can attach key-value metadata to ORC files from within the vectorize method:
@Override
public void vectorize(Event element, VectorizedRowBatch batch) throws IOException {
    // ... write columns ...
    this.addUserMetadata("producer", ByteBuffer.wrap("flink-job".getBytes(StandardCharsets.UTF_8)));
    this.addUserMetadata("schema-version", ByteBuffer.wrap("v2".getBytes(StandardCharsets.UTF_8)));
}
