Parquet is a columnar storage format that provides efficient compression and encoding schemes. Apache Arrow provides high-performance readers and writers for Parquet files with seamless integration.
Reading Parquet Files
#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
// Load a complete Parquet file into a single in-memory Arrow Table.
arrow::Result<std::shared_ptr<arrow::Table>> ReadParquetFile(
    const std::string& filename) {
  // Open the file for random-access reading.
  ARROW_ASSIGN_OR_RAISE(auto input,
                        arrow::io::ReadableFile::Open(filename));
  // Build an Arrow-aware Parquet reader on top of the input stream.
  std::unique_ptr<parquet::arrow::FileReader> file_reader;
  ARROW_ASSIGN_OR_RAISE(
      file_reader,
      parquet::arrow::OpenFile(input, arrow::default_memory_pool()));
  // Materialize all row groups and columns as one Table.
  std::shared_ptr<arrow::Table> result;
  ARROW_RETURN_NOT_OK(file_reader->ReadTable(&result));
  return result;
}
// Read a projection (subset of columns, by index) from a Parquet file.
arrow::Result<std::shared_ptr<arrow::Table>> ReadParquetColumns(
    const std::string& filename,
    const std::vector<int>& column_indices) {
  // Open the source file for random access.
  ARROW_ASSIGN_OR_RAISE(auto input,
                        arrow::io::ReadableFile::Open(filename));
  // Wrap it in an Arrow-aware Parquet reader.
  ARROW_ASSIGN_OR_RAISE(
      auto file_reader,
      parquet::arrow::OpenFile(input, arrow::default_memory_pool()));
  // Decode only the requested columns; the rest are never read from disk.
  std::shared_ptr<arrow::Table> projection;
  ARROW_RETURN_NOT_OK(file_reader->ReadTable(column_indices, &projection));
  return projection;
}
// Read a Parquet file incrementally as a stream of RecordBatches.
//
// Fix: the original loop condition `ReadNext(&batch).ok() && batch != nullptr`
// silently swallowed read errors — a failed ReadNext looked identical to a
// normal end-of-stream and the function returned OK. Errors are now
// propagated to the caller via ARROW_RETURN_NOT_OK.
arrow::Status StreamParquetFile(const std::string& filename) {
  ARROW_ASSIGN_OR_RAISE(
      auto infile,
      arrow::io::ReadableFile::Open(filename));
  ARROW_ASSIGN_OR_RAISE(
      auto reader,
      parquet::arrow::OpenFile(infile, arrow::default_memory_pool()));
  // Enable multi-threaded column decoding.
  reader->set_use_threads(true);
  // Get a RecordBatch reader over all row groups.
  ARROW_ASSIGN_OR_RAISE(
      auto batch_reader,
      reader->GetRecordBatchReader());
  // Read until end-of-stream; ReadNext signals EOS with a null batch.
  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(batch_reader->ReadNext(&batch));
    if (batch == nullptr) {
      break;  // end of stream
    }
    // Process batch
    std::cout << "Read batch with " << batch->num_rows() << " rows" << std::endl;
  }
  return arrow::Status::OK();
}
import pyarrow as pa
import pyarrow.parquet as pq

# Read an entire Parquet file into an in-memory Table.
table = pq.read_table('data.parquet')
print(f"Loaded table with {table.num_rows} rows")

# Read specific columns only (column pruning: unselected columns are not decoded).
table = pq.read_table('data.parquet', columns=['col1', 'col2'])

# Row-group-level access: ParquetFile exposes the file's internal structure.
parquet_file = pq.ParquetFile('data.parquet')
first_row_group = parquet_file.read_row_group(0)

# Read multiple row groups at once.
table = parquet_file.read_row_groups([0, 1, 2])

# Streaming read with a RecordBatch iterator — bounds peak memory use
# regardless of total file size.
parquet_file = pq.ParquetFile('large_data.parquet')
for batch in parquet_file.iter_batches(batch_size=10000):
    print(f"Processing batch with {batch.num_rows} rows")

# Read with filters (predicate pushdown can skip non-matching row groups).
import pyarrow.compute as pc
table = pq.read_table(
    'data.parquet',
    filters=pc.field('year') >= 2020
)

# Read from multiple files as one logical dataset.
import pyarrow.dataset as ds
dataset = ds.dataset('data/', format='parquet')
table = dataset.to_table()
Writing Parquet Files
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/writer.h>
// Serialize an Arrow Table to a Parquet file on disk with default properties.
arrow::Status WriteParquetFile(
    const std::string& filename,
    const std::shared_ptr<arrow::Table>& table) {
  // Create (or truncate) the destination file.
  ARROW_ASSIGN_OR_RAISE(auto sink,
                        arrow::io::FileOutputStream::Open(filename));
  // chunk_size caps the number of rows placed in each row group.
  return parquet::arrow::WriteTable(
      *table,
      arrow::default_memory_pool(),
      sink,
      /*chunk_size=*/1024 * 1024);
}
// Write a Table to Parquet with explicit writer configuration.
arrow::Status WriteParquetWithOptions(
    const std::string& filename,
    const std::shared_ptr<arrow::Table>& table) {
  ARROW_ASSIGN_OR_RAISE(auto sink,
                        arrow::io::FileOutputStream::Open(filename));
  // Parquet-level properties: compression codec and format version.
  // Builder setters return Builder*, so the chain uses -> after the first call.
  std::shared_ptr<parquet::WriterProperties> props =
      parquet::WriterProperties::Builder()
          .compression(parquet::Compression::SNAPPY)
          ->version(parquet::ParquetVersion::PARQUET_2_6)
          ->build();
  // Arrow-level properties: store the original Arrow schema in the file
  // metadata so Arrow-specific types round-trip faithfully.
  auto arrow_props = parquet::ArrowWriterProperties::Builder()
                         .store_schema()
                         ->build();
  // Write the table using both property sets.
  return parquet::arrow::WriteTable(
      *table,
      arrow::default_memory_pool(),
      sink,
      /*chunk_size=*/1024 * 1024,
      props,
      arrow_props);
}
// Append data batch-by-batch instead of materializing one whole Table.
// CreateRecordBatch is a user-supplied generator defined elsewhere.
arrow::Status WriteParquetIncremental(
    const std::string& filename,
    const std::shared_ptr<arrow::Schema>& schema) {
  ARROW_ASSIGN_OR_RAISE(auto sink,
                        arrow::io::FileOutputStream::Open(filename));
  // Open a streaming writer bound to the schema.
  ARROW_ASSIGN_OR_RAISE(
      auto writer,
      parquet::arrow::FileWriter::Open(
          *schema,
          arrow::default_memory_pool(),
          sink));
  // Emit ten batches one at a time.
  for (int batch_index = 0; batch_index < 10; ++batch_index) {
    ARROW_ASSIGN_OR_RAISE(auto batch, CreateRecordBatch(schema));
    ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
  }
  // Close() finalizes the footer; without it the file is unreadable.
  return writer->Close();
}
import pyarrow as pa
import pyarrow.parquet as pq

# Create sample data
table = pa.table({
    'id': pa.array([1, 2, 3, 4, 5]),
    'name': pa.array(['Alice', 'Bob', 'Charlie', 'David', 'Eve']),
    'value': pa.array([10.5, 20.3, 30.1, 40.7, 50.2])
})

# Write table to a Parquet file with default settings.
pq.write_table(table, 'output.parquet')

# Write with an explicit compression codec.
pq.write_table(
    table,
    'output_compressed.parquet',
    compression='snappy'
)

# Write with custom properties.
pq.write_table(
    table,
    'output_custom.parquet',
    compression='gzip',
    compression_level=9,         # highest gzip level: smallest file, slowest write
    use_dictionary=True,         # dictionary-encode repeated values
    row_group_size=100000,       # rows per row group
    data_page_size=1024 * 1024,  # target bytes per data page
    version='2.6'                # Parquet format version
)

# Incremental writing with ParquetWriter (schema is fixed at open time).
# create_batch is a user-supplied generator — not defined in this snippet.
with pq.ParquetWriter('incremental.parquet', table.schema) as writer:
    for i in range(10):
        # Write batches incrementally
        batch = create_batch(table.schema)
        writer.write_batch(batch)

# Write to multiple files, partitioned by column value (Hive-style directories).
pq.write_to_dataset(
    table,
    root_path='partitioned_data',
    partition_cols=['name']
)

# Attach custom key/value metadata to the schema before writing.
table = table.replace_schema_metadata({
    'author': 'Apache Arrow',
    'created': '2024-01-01'
})
pq.write_table(table, 'with_metadata.parquet')
Configuration Options
Compression
Parquet supports multiple compression codecs:
- UNCOMPRESSED: No compression (fastest, largest files)
- SNAPPY: Good compression ratio with fast speed (default)
- GZIP: Better compression, slower than Snappy
- LZ4: Fast compression similar to Snappy
- ZSTD: Excellent compression ratio, configurable levels
- BROTLI: High compression ratio, slower
// Configure ZSTD compression on the Parquet writer properties.
parquet::WriterProperties::Builder builder;
builder.compression(parquet::Compression::ZSTD);
builder.compression_level(5); // ZSTD compression level (higher = smaller files, slower writes)
auto props = builder.build();
# Equivalent configuration from Python: ZSTD codec at level 5.
pq.write_table(
    table,
    'output.parquet',
    compression='zstd',
    compression_level=5
)
Row Groups and Page Size
Row groups and data pages affect both read and write performance:
# Tune row-group and page layout at write time.
pq.write_table(
    table,
    'output.parquet',
    row_group_size=1000000,      # Rows per row group
    data_page_size=1024 * 1024,  # 1MB data pages
    use_dictionary=True          # Enable dictionary encoding
)
Row Group Size: Larger row groups improve compression and reduce metadata overhead but increase memory usage during reads. A typical size is 128-256 MB or 100,000-1,000,000 rows.
Dictionary Encoding
Dictionary encoding is effective for columns with repeated values:
parquet::WriterProperties::Builder builder;
// Enable dictionary encoding for all columns
builder.enable_dictionary();
// Disable it for a specific column where values rarely repeat,
// since a large dictionary would add overhead without saving space.
builder.disable_dictionary("large_string_column");
auto props = builder.build();
# Dictionary encoding from Python, with a cap on dictionary page size.
pq.write_table(
    table,
    'output.parquet',
    use_dictionary=True,
    dictionary_pagesize_limit=1024 * 1024  # 1MB limit per dictionary page
)
// NOTE(review): ValueOrDie() aborts the process if opening fails — inside a
// Status-returning function, prefer ARROW_ASSIGN_OR_RAISE instead.
auto reader = parquet::arrow::OpenFile(infile, arrow::default_memory_pool()).ValueOrDie();
// Inspect file-level metadata without decoding any data pages.
auto metadata = reader->parquet_reader()->metadata();
std::cout << "Number of rows: " << metadata->num_rows() << std::endl;
std::cout << "Number of row groups: " << metadata->num_row_groups() << std::endl;
std::cout << "Number of columns: " << metadata->num_columns() << std::endl;
// Get the Arrow schema.
// NOTE(review): GetSchema returns a Status that is ignored here — check it
// in production code.
std::shared_ptr<arrow::Schema> schema;
reader->GetSchema(&schema);
std::cout << schema->ToString() << std::endl;
# Read metadata without loading data
parquet_file = pq.ParquetFile('data.parquet')

# File-level metadata
print(f"Number of rows: {parquet_file.metadata.num_rows}")
print(f"Number of row groups: {parquet_file.num_row_groups}")
print(f"Schema: {parquet_file.schema}")

# Per-row-group metadata
for i in range(parquet_file.num_row_groups):
    rg_metadata = parquet_file.metadata.row_group(i)
    print(f"Row group {i}: {rg_metadata.num_rows} rows")

# Column statistics (written by the writer; enable predicate pushdown on read)
for i in range(parquet_file.metadata.num_row_groups):
    for j in range(parquet_file.metadata.num_columns):
        col_meta = parquet_file.metadata.row_group(i).column(j)
        if col_meta.is_stats_set:
            print(f"Column {j}: min={col_meta.statistics.min}, max={col_meta.statistics.max}")
Multi-threaded Reading: Enable multi-threading to parallelize column decoding:
# Python
table = pq.read_table('data.parquet', use_threads=True)
# C++
reader->set_use_threads(true);
Column Pruning: Read only the columns you need to reduce I/O:
table = pq.read_table('data.parquet', columns=['col1', 'col2'])
Predicate Pushdown: Use filters to read only relevant row groups:
import pyarrow.compute as pc
table = pq.read_table(
    'data.parquet',
    filters=pc.field('year') == 2024
)
Memory Mapping: For files on local disk, memory mapping can improve read performance:
parquet_file = pq.ParquetFile('data.parquet', memory_map=True)
Compatibility Considerations
Parquet Versions:
- Version 1.0: Compatible with most readers but limited features
- Version 2.0: Modern features like improved encoding schemes
- Version 2.4+: Latest features including new compression types
Use version 2.6 for the newest features with broad support among modern tools (choose 1.0 only if you must support very old readers):
pq.write_table(table, 'output.parquet', version='2.6')
Schema Evolution: Parquet supports schema evolution, but be careful when:
- Adding/removing columns
- Changing data types
- Modifying nested structures
Always test compatibility with your downstream consumers.