ORC (Optimized Row Columnar) is a columnar storage format designed for Hadoop workloads. Apache Arrow provides efficient readers and writers for ORC files with support for stripe-based reading and various compression algorithms.
Reading ORC Files
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/adapters/orc/adapter.h>
// Read entire ORC file as a Table
// Read an entire ORC file into a single arrow::Table.
//
// @param filename Path to the ORC file on the local filesystem.
// @return The fully materialized table, or an error Status if the file
//         cannot be opened or decoded.
arrow::Result<std::shared_ptr<arrow::Table>> ReadORCFile(
    const std::string& filename) {
  // Open the file for random-access reads.
  ARROW_ASSIGN_OR_RAISE(auto source, arrow::io::ReadableFile::Open(filename));
  // Wrap it in an ORC reader backed by the default memory pool.
  ARROW_ASSIGN_OR_RAISE(auto orc_reader,
                        arrow::adapters::orc::ORCFileReader::Open(
                            source, arrow::default_memory_pool()));
  // Materialize every column of every stripe.
  return orc_reader->Read();
}
// Read specific columns from ORC file
// Read only the named columns from an ORC file (column pruning).
//
// @param filename     Path to the ORC file.
// @param column_names Names of the columns to materialize.
// @return A table containing just the requested columns, or an error Status.
arrow::Result<std::shared_ptr<arrow::Table>> ReadORCColumns(
    const std::string& filename,
    const std::vector<std::string>& column_names) {
  ARROW_ASSIGN_OR_RAISE(auto source, arrow::io::ReadableFile::Open(filename));
  ARROW_ASSIGN_OR_RAISE(auto orc_reader,
                        arrow::adapters::orc::ORCFileReader::Open(
                            source, arrow::default_memory_pool()));
  // Passing a column list restricts decoding to those columns only.
  return orc_reader->Read(column_names);
}
// Read specific stripe from ORC file
// Read a single stripe of an ORC file as a RecordBatch.
//
// @param filename     Path to the ORC file.
// @param stripe_index Zero-based index of the stripe to read.
// @return The stripe's rows as one RecordBatch, or an error Status.
arrow::Result<std::shared_ptr<arrow::RecordBatch>> ReadORCStripe(
    const std::string& filename,
    int64_t stripe_index) {
  ARROW_ASSIGN_OR_RAISE(auto source, arrow::io::ReadableFile::Open(filename));
  ARROW_ASSIGN_OR_RAISE(auto orc_reader,
                        arrow::adapters::orc::ORCFileReader::Open(
                            source, arrow::default_memory_pool()));
  // Only the requested stripe is decoded; the rest of the file is skipped.
  return orc_reader->ReadStripe(stripe_index);
}
// Streaming read with RecordBatch iterator
// Stream an ORC file batch-by-batch instead of materializing the whole
// table, printing the row count of each batch.
//
// Fix: the previous loop condition (`ReadNext(&batch).ok() && batch != nullptr`)
// treated a failed ReadNext() exactly like end-of-stream and then returned
// Status::OK(), silently swallowing read errors. Errors are now propagated
// to the caller via ARROW_RETURN_NOT_OK.
//
// @param filename Path to the ORC file.
// @return OK on success, or the first error encountered while reading.
arrow::Status StreamORCFile(const std::string& filename) {
  ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open(filename));
  ARROW_ASSIGN_OR_RAISE(auto reader,
                        arrow::adapters::orc::ORCFileReader::Open(
                            input, arrow::default_memory_pool()));
  // Get a batch reader for the entire file; an empty include list selects
  // all columns.
  int64_t batch_size = 10000;
  std::vector<std::string> include_names = {};  // All columns
  ARROW_ASSIGN_OR_RAISE(
      auto batch_reader,
      reader->GetRecordBatchReader(batch_size, include_names));
  // Read batches until the reader signals end-of-stream with a null batch.
  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(batch_reader->ReadNext(&batch));
    if (batch == nullptr) break;  // end of stream
    std::cout << "Read batch with " << batch->num_rows() << " rows" << std::endl;
  }
  return arrow::Status::OK();
}
# --- Reading ORC files with pyarrow ---
import pyarrow as pa
import pyarrow.orc as orc
# Read entire ORC file as a Table
table = orc.read_table('data.orc')
print(f"Loaded table with {table.num_rows} rows")
# Read specific columns only (column pruning reduces I/O and memory)
table = orc.read_table('data.orc', columns=['col1', 'col2'])
# Read using ORCFile for more control (metadata access, per-stripe reads)
orc_file = orc.ORCFile('data.orc')
# Get file metadata
print(f"Number of rows: {orc_file.nrows}")
print(f"Number of stripes: {orc_file.nstripes}")
print(f"Schema: {orc_file.schema}")
# Read a specific stripe (stripes are zero-indexed)
stripe_data = orc_file.read_stripe(0)
print(f"Stripe 0 has {stripe_data.num_rows} rows")
# Read entire file
table = orc_file.read()
# Read with column selection
table = orc_file.read(columns=['col1', 'col2'])
# Read via an explicit pyarrow filesystem object
from pyarrow import fs
filesystem = fs.LocalFileSystem()
table = orc.read_table('data.orc', filesystem=filesystem)
Writing ORC Files
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/adapters/orc/adapter.h>
// Write Table to ORC file
// Write an arrow::Table to an ORC file using default writer options.
//
// @param filename Destination path; an existing file is overwritten.
// @param table    Table to serialize.
// @return OK on success, or the first error from open/write/close.
arrow::Status WriteORCFile(
    const std::string& filename,
    const std::shared_ptr<arrow::Table>& table) {
  // Open the destination stream.
  ARROW_ASSIGN_OR_RAISE(auto sink,
                        arrow::io::FileOutputStream::Open(filename));
  // Attach an ORC writer to it.
  ARROW_ASSIGN_OR_RAISE(auto orc_writer,
                        arrow::adapters::orc::ORCFileWriter::Open(sink.get()));
  // Serialize the table, then finalize the file footer.
  ARROW_RETURN_NOT_OK(orc_writer->Write(*table));
  return orc_writer->Close();
}
// Write with compression options
// Write an arrow::Table to an ORC file with explicit compression and
// layout options (Snappy, 64 KB compression blocks, 64 MB stripes,
// row-index entry every 10000 rows).
//
// @param filename Destination path; an existing file is overwritten.
// @param table    Table to serialize.
// @return OK on success, or the first error from open/write/close.
arrow::Status WriteORCWithOptions(
    const std::string& filename,
    const std::shared_ptr<arrow::Table>& table) {
  ARROW_ASSIGN_OR_RAISE(auto sink,
                        arrow::io::FileOutputStream::Open(filename));
  // Configure write options before creating the writer.
  arrow::adapters::orc::WriteOptions write_options;
  write_options.compression = arrow::Compression::SNAPPY;
  write_options.compression_block_size = 65536;
  write_options.stripe_size = 64 * 1024 * 1024;  // 64 MB
  write_options.row_index_stride = 10000;
  ARROW_ASSIGN_OR_RAISE(
      auto orc_writer,
      arrow::adapters::orc::ORCFileWriter::Open(sink.get(), write_options));
  ARROW_RETURN_NOT_OK(orc_writer->Write(*table));
  return orc_writer->Close();
}
# --- Writing ORC files with pyarrow ---
# Fix: the indentation of the `with`/`for` bodies below was lost (the example
# did not parse as Python); it has been restored. All calls are unchanged.
import pyarrow as pa
import pyarrow.orc as orc
# Create sample data
table = pa.table({
    'id': pa.array([1, 2, 3, 4, 5]),
    'name': pa.array(['Alice', 'Bob', 'Charlie', 'David', 'Eve']),
    'value': pa.array([10.5, 20.3, 30.1, 40.7, 50.2])
})
# Write table to ORC file (default options)
orc.write_table(table, 'output.orc')
# Write with compression
orc.write_table(
    table,
    'output_compressed.orc',
    compression='snappy'
)
# Write with custom options
orc.write_table(
    table,
    'output_custom.orc',
    file_version='0.12',
    batch_size=1024,
    stripe_size=64 * 1024 * 1024,  # 64 MB
    compression='zstd',
    compression_block_size=65536,
    compression_strategy='speed',
    row_index_stride=10000
)
# Incremental writing with ORCWriter (the context manager closes the file)
with orc.ORCWriter('incremental.orc') as writer:
    for i in range(10):
        # Write tables incrementally.
        # NOTE(review): create_table() is an undefined placeholder for a
        # user-supplied table factory — not part of pyarrow.
        batch_table = create_table()
        writer.write(batch_table)
# Write with dictionary encoding options
orc.write_table(
    table,
    'output_dict.orc',
    dictionary_key_size_threshold=0.5  # Use dict encoding
)
# Write with bloom filters
orc.write_table(
    table,
    'output_bloom.orc',
    bloom_filter_columns=['id', 'name'],
    bloom_filter_fpp=0.05  # False positive probability
)
ORC Stripes
ORC files are divided into stripes, which are horizontal partitions of the data:
# Inspect and read stripes individually.
# Fix: the loop body's indentation was lost (invalid Python); restored.
orc_file = orc.ORCFile('data.orc')
# Get number of stripes
num_stripes = orc_file.nstripes
print(f"File has {num_stripes} stripes")
# Read individual stripes (each read_stripe call decodes one stripe only)
for i in range(num_stripes):
    stripe = orc_file.read_stripe(i)
    print(f"Stripe {i}: {stripe.num_rows} rows")
# Read specific columns from a stripe
stripe = orc_file.read_stripe(0, columns=['col1', 'col2'])
Stripe Size: Stripes are typically 64-256 MB. Larger stripes improve compression and reduce metadata overhead, but increase memory usage during reads. Configure with stripe_size parameter when writing.
Configuration Options
Compression
ORC supports multiple compression algorithms:
- UNCOMPRESSED: No compression
- SNAPPY: Fast compression with moderate ratio (a common choice; check your pyarrow version's documented default before relying on it)
- ZLIB: Better compression, slower than Snappy
- LZ4: Very fast compression
- ZSTD: Excellent compression ratio, configurable levels
// C++: configure ORC write options tuned for compression ratio.
arrow::adapters::orc::WriteOptions options;
options.compression = arrow::Compression::ZSTD;
options.compression_block_size = 65536; // 64 KB blocks
// COMPRESSION favors ratio; the alternative strategy favors speed.
options.compression_strategy =
arrow::adapters::orc::CompressionStrategy::COMPRESSION;
# Python equivalent: zstd with 64 KB compression blocks, ratio-oriented.
orc.write_table(
table,
'output.orc',
compression='zstd',
compression_block_size=65536,
compression_strategy='compression' # or 'speed'
)
Stripe and Batch Size
# Control physical layout: stripe size, write batch size, and index stride.
orc.write_table(
table,
'output.orc',
stripe_size=128 * 1024 * 1024, # 128 MB stripes
batch_size=2048, # Rows per batch
row_index_stride=10000 # Rows per index entry
)
Stripe Size: Larger stripes (128-256 MB) improve compression and reduce overhead but require more memory. Default is 64 MB.
Dictionary Encoding
Dictionary encoding is effective for columns with repeated values:
# Enable dictionary encoding when the dictionary is small relative to the data.
orc.write_table(
table,
'output.orc',
dictionary_key_size_threshold=0.5 # 0-1 range, 0=disable, 1=always
)
Dictionary Threshold: Value between 0 (disable) and 1 (always enable):
- 0.0: Disable dictionary encoding
- 0.5: Use dictionary if it saves space (recommended)
- 1.0: Always use dictionary encoding
Bloom Filters
Bloom filters enable faster filtering on specific columns:
# Add bloom filters to specific columns
# Add bloom filters to specific columns (speeds up selective filters on them)
orc.write_table(
table,
'output.orc',
bloom_filter_columns=['user_id', 'category'],
bloom_filter_fpp=0.05 # 5% false positive rate
)
Bloom Filters: Enable bloom filters on columns frequently used in WHERE clauses. They add a small overhead to file size but can dramatically speed up queries by skipping irrelevant stripes.
// Inspect ORC file metadata from C++.
// NOTE(review): ValueOrDie() aborts the process on failure — acceptable in a
// short example, but production code should propagate arrow::Status instead.
auto reader = arrow::adapters::orc::ORCFileReader::Open(
input, arrow::default_memory_pool()).ValueOrDie();
// Get schema
auto schema = reader->ReadSchema().ValueOrDie();
std::cout << schema->ToString() << std::endl;
// Get file info
std::cout << "Number of rows: " << reader->NumberOfRows() << std::endl;
std::cout << "Number of stripes: " << reader->NumberOfStripes() << std::endl;
std::cout << "File version: " << reader->GetFileVersion() << std::endl;
std::cout << "Compression: " << reader->GetCompression().ValueOrDie() << std::endl;
// Get per-stripe layout: byte offset, byte length, and row count
for (int64_t i = 0; i < reader->NumberOfStripes(); ++i) {
auto stripe_info = reader->GetStripeInformation(i);
std::cout << "Stripe " << i << ": offset=" << stripe_info.offset
<< " length=" << stripe_info.length
<< " rows=" << stripe_info.num_rows << std::endl;
}
// Get user-supplied key/value metadata from the file footer
auto metadata = reader->ReadMetadata().ValueOrDie();
# Inspect ORC file metadata from Python.
# Fix: the `if`/`for` bodies at the end had lost their indentation (invalid
# Python); restored. All attribute accesses are unchanged.
orc_file = orc.ORCFile('data.orc')
# File metadata
print(f"Schema: {orc_file.schema}")
print(f"Number of rows: {orc_file.nrows}")
print(f"Number of stripes: {orc_file.nstripes}")
print(f"File version: {orc_file.file_version}")
print(f"Software version: {orc_file.software_version}")
print(f"Compression: {orc_file.compression}")
print(f"Compression size: {orc_file.compression_size}")
print(f"Writer: {orc_file.writer}")
print(f"Writer version: {orc_file.writer_version}")
# File statistics
print(f"Content length: {orc_file.content_length}")
print(f"Stripe statistics length: {orc_file.stripe_statistics_length}")
print(f"File footer length: {orc_file.file_footer_length}")
print(f"File length: {orc_file.file_length}")
# Row index info
print(f"Row index stride: {orc_file.row_index_stride}")
# Custom (user-supplied) metadata, if any was written
metadata = orc_file.metadata
if metadata:
    for key in metadata.keys():
        print(f"{key}: {metadata[key]}")
Stripe-based Reading: For large files, read individual stripes to reduce memory usage:

orc_file = orc.ORCFile('large_file.orc')
# Process a large file one stripe at a time to bound memory usage.
# Fix: loop-body indentation was lost (invalid Python); restored.
for i in range(orc_file.nstripes):
    stripe = orc_file.read_stripe(i)
    process_stripe(stripe)  # placeholder for user-supplied processing
Column Pruning: Read only the columns you need:

table = orc.read_table('data.orc', columns=['col1', 'col2'])
Compression Strategy: Choose compression based on your use case:
- Speed: Use compression='snappy' and compression_strategy='speed'
- Compression: Use compression='zstd' and compression_strategy='compression'
# Ratio-oriented write: zstd plus the 'compression' strategy.
orc.write_table(
table, 'output.orc',
compression='zstd',
compression_strategy='compression'
)
Bloom Filter Trade-off: Bloom filters add ~1-5% to file size but can skip entire stripes during filtering, leading to 10-100x speedups for selective queries.
Compatibility Considerations
ORC Versions:
- Version 0.11 (Hive 0.11 / ORC v0): Older version, maximum compatibility
- Version 0.12 (Hive 0.12 / ORC v1): Newer version with improved features (default)
Use version 0.12 unless you need compatibility with older Hive versions:

orc.write_table(table, 'output.orc', file_version='0.12')
Hadoop Ecosystem: ORC is widely used in the Hadoop ecosystem. Files written by Arrow are compatible with:
- Apache Hive
- Apache Spark
- Presto/Trino
- Apache Impala
Test compatibility if integrating with specific tools.
Common Use Cases
Converting Parquet to ORC
import pyarrow.parquet as pq
import pyarrow.orc as orc
# Read Parquet into an in-memory Arrow table
table = pq.read_table('data.parquet')
# Write the same table back out as ORC (format conversion, no row changes)
orc.write_table(table, 'data.orc', compression='snappy')
Partitioned Reading
# Read a specific stripe range from a large file
orc_file = orc.ORCFile('large_file.orc')
# Process stripes in parallel or sequentially
stripe_range = range(10, 20) # Read stripes 10-19
tables = [orc_file.read_stripe(i) for i in stripe_range]
# Combine stripe results into a single table
combined = pa.concat_tables(tables)
# Attach custom key/value metadata to the table's schema before writing.
# replace_schema_metadata returns a new table; the original is untouched.
table = table.replace_schema_metadata({
'author': 'Apache Arrow',
'created': '2024-01-01',
'version': '1.0'
})
orc.write_table(table, 'with_metadata.orc')
# Read metadata back from the written file
orc_file = orc.ORCFile('with_metadata.orc')
metadata = orc_file.metadata
print(metadata)