Overview
The IPC format features:
- Zero-copy reads: Data can be memory-mapped or read without copying
- Schema preservation: Type information is included in the stream
- Streaming support: Process data incrementally as it arrives
- Compression: Optional per-buffer compression (LZ4 frame or ZSTD)
- Dictionary encoding: Efficient encoding for low-cardinality data
- Random access: File format supports seeking to specific batches
Format Types
Arrow provides two IPC format variants:
- Stream Format: Sequential access, ideal for pipes and network streams
- File Format: Random access with footer metadata, ideal for disk storage
- File Format: Random access with footer metadata, ideal for disk storage
Writing Streams
- C++
- Python
- Java
- Go
// Example: write an Arrow IPC stream to a file with the C++ API.
// NOTE(review): ARROW_ASSIGN_OR_RAISE / ARROW_RETURN_NOT_OK require this
// snippet to run inside a function returning arrow::Status.
#include <arrow/io/file.h>
#include <arrow/ipc/writer.h>
#include <arrow/record_batch.h>
using namespace arrow;
// Create schema
auto schema = arrow::schema({
field("id", int64()),
field("name", utf8()),
field("value", float64())
});
// Open output stream
ARROW_ASSIGN_OR_RAISE(auto output,
io::FileOutputStream::Open("data.arrow"));
// Create IPC stream writer
ARROW_ASSIGN_OR_RAISE(auto writer,
ipc::MakeStreamWriter(output, schema));
// Write record batches
std::shared_ptr<RecordBatch> batch;
// ... create batch ... (batch must be non-null before the write below)
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
// Write more batches as needed
// ...
// Close writer (writes EOS marker)
ARROW_RETURN_NOT_OK(writer->Close());
ARROW_RETURN_NOT_OK(output->Close());
import pyarrow as pa
import pyarrow.ipc as ipc

# Example: write an Arrow IPC stream to a file with the Python API.

# Create schema
schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('value', pa.float64())
])

# Open output stream
with pa.OSFile('data.arrow', 'wb') as sink:
    # Create IPC stream writer; leaving the `with` block closes the
    # writer, which writes the end-of-stream marker.
    with ipc.new_stream(sink, schema) as writer:
        # Write record batches
        batch = pa.record_batch([
            pa.array([1, 2, 3]),
            pa.array(['Alice', 'Bob', 'Charlie']),
            pa.array([10.5, 20.3, 30.1])
        ], schema=schema)
        writer.write_batch(batch)
        # Write more batches as needed
        # ...
// Example: write an Arrow IPC stream to a file with the Java API.
// NOTE(review): assumes the usual type imports (Schema, Field, ArrowType,
// FloatingPointPrecision, BufferAllocator, java.util.Arrays) are in scope.
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import java.io.FileOutputStream;
// Create allocator
BufferAllocator allocator = new RootAllocator();
// Create schema
Schema schema = new Schema(Arrays.asList(
Field.nullable("id", new ArrowType.Int(64, true)),
Field.nullable("name", new ArrowType.Utf8()),
Field.nullable("value", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
));
// Create vector schema root (reused buffer for all batches)
VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
// Open output stream and create writer; try-with-resources closes both.
try (FileOutputStream out = new FileOutputStream("data.arrow");
ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out.getChannel())) {
writer.start();
// Populate vectors and write batches
root.setRowCount(3);
BigIntVector idVector = (BigIntVector) root.getVector("id");
idVector.setSafe(0, 1);
idVector.setSafe(1, 2);
idVector.setSafe(2, 3);
// ... populate other vectors ...
writer.writeBatch();
// end() writes the end-of-stream marker
writer.end();
}
// Example: write an Arrow IPC stream to a file with the Go API.
// NOTE(review): the statements below must live inside a function
// (e.g. main); panic-on-error is for brevity only.
import (
"os"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/ipc"
"github.com/apache/arrow-go/v18/arrow/memory"
)
// Create schema
schema := arrow.NewSchema(
[]arrow.Field{
{Name: "id", Type: arrow.PrimitiveTypes.Int64},
{Name: "name", Type: arrow.BinaryTypes.String},
{Name: "value", Type: arrow.PrimitiveTypes.Float64},
},
nil,
)
// Open output file
f, err := os.Create("data.arrow")
if err != nil {
panic(err)
}
defer f.Close()
// Create writer; deferred Close runs before f.Close (LIFO) and
// writes the end-of-stream marker.
writer := ipc.NewWriter(f, ipc.WithSchema(schema))
defer writer.Close()
// Create record batch
mem := memory.NewGoAllocator()
builder := array.NewRecordBuilder(mem, schema)
defer builder.Release()
builder.Field(0).(*array.Int64Builder).AppendValues(
[]int64{1, 2, 3}, nil)
builder.Field(1).(*array.StringBuilder).AppendValues(
[]string{"Alice", "Bob", "Charlie"}, nil)
builder.Field(2).(*array.Float64Builder).AppendValues(
[]float64{10.5, 20.3, 30.1}, nil)
rec := builder.NewRecord()
defer rec.Release()
// Write record batch
err = writer.Write(rec)
if err != nil {
panic(err)
}
Reading Streams
- C++
- Python
- Java
// Example: read an Arrow IPC stream from a file with the C++ API.
#include <arrow/io/file.h>
#include <arrow/ipc/reader.h>
#include <iostream>  // added: required for std::cout

using namespace arrow;  // added: for unqualified io:: / ipc:: / RecordBatch

// Open input stream
ARROW_ASSIGN_OR_RAISE(auto input,
    io::ReadableFile::Open("data.arrow"));
// Create IPC stream reader
ARROW_ASSIGN_OR_RAISE(auto reader,
    ipc::RecordBatchStreamReader::Open(input));
// Read schema
auto schema = reader->schema();
std::cout << "Schema: " << schema->ToString() << std::endl;
// Read batches one by one; Next() yields nullptr at end of stream.
std::shared_ptr<RecordBatch> batch;
while (true) {
  ARROW_ASSIGN_OR_RAISE(batch, reader->Next());
  if (batch == nullptr) break;  // End of stream
  std::cout << "Batch with " << batch->num_rows()
            << " rows" << std::endl;
}
// Or read all batches into a table. A stream reader is one-shot,
// so the file is reopened with a fresh reader.
ARROW_ASSIGN_OR_RAISE(auto input2,
    io::ReadableFile::Open("data.arrow"));
ARROW_ASSIGN_OR_RAISE(auto reader2,
    ipc::RecordBatchStreamReader::Open(input2));
ARROW_ASSIGN_OR_RAISE(auto table, reader2->ToTable());
std::cout << "Total rows: " << table->num_rows() << std::endl;
import pyarrow as pa
import pyarrow.ipc as ipc

# Example: read an Arrow IPC stream from a file with the Python API.

# Open input stream
with pa.OSFile('data.arrow', 'rb') as source:
    # Create IPC stream reader
    with ipc.open_stream(source) as reader:
        # Read schema
        schema = reader.schema
        print(f"Schema: {schema}")
        # Read batches one by one
        for batch in reader:
            print(f"Batch with {len(batch)} rows")

# Or read all at once. Stream readers are one-shot and have no seek(),
# so reopen the source and create a fresh reader instead.
with pa.OSFile('data.arrow', 'rb') as source:
    with ipc.open_stream(source) as reader:
        table = reader.read_all()
        print(f"Total rows: {len(table)}")
// Example: read an Arrow IPC stream from a file with the Java API.
// NOTE(review): assumes RootAllocator/BufferAllocator/Schema/VectorSchemaRoot
// imports from the writing example are in scope.
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import java.io.FileInputStream;
BufferAllocator allocator = new RootAllocator();
try (FileInputStream in = new FileInputStream("data.arrow");
ArrowStreamReader reader = new ArrowStreamReader(in.getChannel(), allocator)) {
// Read schema
Schema schema = reader.getVectorSchemaRoot().getSchema();
System.out.println("Schema: " + schema);
// Read batches; loadNextBatch() returns false at end of stream.
while (reader.loadNextBatch()) {
// The same VectorSchemaRoot is refilled for each batch.
VectorSchemaRoot root = reader.getVectorSchemaRoot();
System.out.println("Batch with " + root.getRowCount() + " rows");
}
}
File Format (Random Access)
- C++
- Python
// Example: write and read the Arrow IPC *file* format (random access).
#include <arrow/io/file.h>
#include <arrow/ipc/writer.h>
#include <arrow/ipc/reader.h>
#include <iostream>  // added: required for std::cout

using namespace arrow;  // added: for unqualified io:: / ipc::

// Writing file format.
// NOTE(review): `schema` and `batch` are assumed to come from the
// stream-writing example above.
ARROW_ASSIGN_OR_RAISE(auto output,
    io::FileOutputStream::Open("data.feather"));
ARROW_ASSIGN_OR_RAISE(auto writer,
    ipc::MakeFileWriter(output, schema));
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
// Close() finalizes the footer that enables random access.
ARROW_RETURN_NOT_OK(writer->Close());
// Reading file format (random access)
ARROW_ASSIGN_OR_RAISE(auto input,
    io::ReadableFile::Open("data.feather"));
ARROW_ASSIGN_OR_RAISE(auto reader,
    ipc::RecordBatchFileReader::Open(input));
// Get metadata
int num_batches = reader->num_record_batches();
std::cout << "File contains " << num_batches
          << " record batches" << std::endl;
// Read a specific batch by 0-based index; the file must contain at
// least 4 batches for index 3 to be valid.
ARROW_ASSIGN_OR_RAISE(auto batch_3,
    reader->ReadRecordBatch(3));
// Read all batches
ARROW_ASSIGN_OR_RAISE(auto table,
    reader->ReadTable());
import pyarrow as pa
import pyarrow.ipc as ipc

# Writing file format (adds a footer that enables random access).
# NOTE: `schema` and `batch` come from the stream-writing example above.
with pa.OSFile('data.feather', 'wb') as sink:
    with ipc.new_file(sink, schema) as writer:
        writer.write_batch(batch)

# Reading file format (random access)
with pa.OSFile('data.feather', 'rb') as source:
    with ipc.open_file(source) as reader:
        # Get metadata
        num_batches = reader.num_record_batches
        print(f"File contains {num_batches} record batches")
        # Read a specific batch by 0-based index
        # (the file must contain at least 4 batches for index 3).
        batch_3 = reader.get_batch(3)
        # Read all batches
        table = reader.read_all()
Compression
- C++
- Python
#include <arrow/ipc/options.h>
#include <arrow/util/compression.h>  // added: declares util::Codec / Compression

// Configure compression.
// NOTE(review): `output`, `schema`, `batch` and `input` come from the
// stream examples above.
ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults();
// The IPC format supports LZ4_FRAME and ZSTD buffer compression.
// ValueOrDie() aborts on failure; prefer ARROW_ASSIGN_OR_RAISE in
// production code.
options.codec = util::Codec::Create(Compression::LZ4_FRAME).ValueOrDie();
// Create writer with compression
ARROW_ASSIGN_OR_RAISE(auto writer,
    ipc::MakeStreamWriter(output, schema, options));
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
ARROW_RETURN_NOT_OK(writer->Close());
// Reader automatically detects compression
ARROW_ASSIGN_OR_RAISE(auto reader,
    ipc::RecordBatchStreamReader::Open(input));
import pyarrow as pa
import pyarrow.ipc as ipc

# Configure compression. The IPC format supports only 'lz4' and 'zstd'
# buffer compression; other codecs (e.g. gzip) are not valid here.
# NOTE: `schema` and `batch` come from the stream-writing example above.
options = ipc.IpcWriteOptions(
    compression='lz4',  # or 'zstd'
)
# Create writer with compression
with pa.OSFile('data.arrow', 'wb') as sink:
    with ipc.new_stream(sink, schema, options=options) as writer:
        writer.write_batch(batch)

# Reader automatically detects compression
with pa.OSFile('data.arrow', 'rb') as source:
    with ipc.open_stream(source) as reader:
        table = reader.read_all()
Memory-Mapped Reading
- C++
- Python
#include <arrow/io/file.h>
#include <arrow/ipc/reader.h>  // added: declares ipc::RecordBatchFileReader

// Open file with memory mapping — batch buffers read below alias the
// mapped region instead of being copied into heap memory.
ARROW_ASSIGN_OR_RAISE(auto input,
    io::MemoryMappedFile::Open("data.feather", io::FileMode::READ));
ARROW_ASSIGN_OR_RAISE(auto reader,
    ipc::RecordBatchFileReader::Open(input));
// Read batches - data is memory-mapped, not copied
ARROW_ASSIGN_OR_RAISE(auto batch,
    reader->ReadRecordBatch(0));
// Access data directly from memory-mapped region
auto column = batch->column(0);
import pyarrow as pa
import pyarrow.ipc as ipc

# Open file with memory mapping — batch buffers reference the mapped
# region directly instead of being copied.
with pa.memory_map('data.feather', 'r') as source:
    with ipc.open_file(source) as reader:
        # Read batches - data is memory-mapped
        batch = reader.get_batch(0)
        # Access data directly from memory-mapped region
        column = batch.column(0)
Streaming Over Networks
- C++
- Python
#include <arrow/io/memory.h>
#include <arrow/ipc/writer.h>  // added: declares ipc::MakeStreamWriter
#include <arrow/ipc/reader.h>  // added: declares ipc::RecordBatchStreamReader

// Sender: serialize batches into an in-memory buffer.
// NOTE(review): `schema` and `batch` come from the stream-writing example.
ARROW_ASSIGN_OR_RAISE(auto buffer_stream,
    io::BufferOutputStream::Create());
ARROW_ASSIGN_OR_RAISE(auto writer,
    ipc::MakeStreamWriter(buffer_stream, schema));
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
ARROW_RETURN_NOT_OK(writer->Close());  // writes the EOS marker
ARROW_ASSIGN_OR_RAISE(auto buffer,
    buffer_stream->Finish());
// Send buffer->data() over network...
// Receiver: Read from buffer
ARROW_ASSIGN_OR_RAISE(auto buffer_reader,
    io::BufferReader::FromBuffer(buffer));
ARROW_ASSIGN_OR_RAISE(auto reader,
    ipc::RecordBatchStreamReader::Open(buffer_reader));
ARROW_ASSIGN_OR_RAISE(auto received_batch,
    reader->Next());
import pyarrow as pa
import pyarrow.ipc as ipc

# Sender: serialize batches into an in-memory buffer.
# NOTE: `schema` and `batch` come from the stream-writing example above.
sink = pa.BufferOutputStream()
with ipc.new_stream(sink, schema) as writer:
    writer.write_batch(batch)
buffer = sink.getvalue()
# Send buffer over network...

# Receiver: Read from buffer
with pa.BufferReader(buffer) as source:
    with ipc.open_stream(source) as reader:
        received_batch = reader.read_next_batch()
Dictionary Encoding
- C++
- Python
// Dictionary encoded arrays are automatically handled
// NOTE(review): ArrayFromJSON is a test helper (from arrow/testing);
// `writer` is assumed to come from the stream-writing example above.
auto dict_type = dictionary(int32(), utf8());
// This local shadows arrow::field() — fine within this snippet.
auto field = arrow::field("category", dict_type);
// Create dictionary array
auto indices = ArrayFromJSON(int32(), "[0, 1, 0, 2, 1]");
auto dictionary = ArrayFromJSON(utf8(), "[\"A\", \"B\", \"C\"]");
ARROW_ASSIGN_OR_RAISE(auto dict_array,
DictionaryArray::FromArrays(dict_type, indices, dictionary));
// Write - dictionary is written once, then referenced
auto schema = arrow::schema({field});
auto batch = RecordBatch::Make(schema, 5, {dict_array});
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
# Dictionary encoded arrays are automatically handled by the IPC writer:
# the dictionary is written once, then referenced by later batches.
# NOTE: `pa`, `ipc` and `sink` come from the earlier examples.
indices = pa.array([0, 1, 0, 2, 1])
dictionary = pa.array(['A', 'B', 'C'])
dict_array = pa.DictionaryArray.from_arrays(
    indices, dictionary
)
# Write - dictionary is written once, then referenced
schema = pa.schema([('category', dict_array.type)])
batch = pa.record_batch([dict_array], schema=schema)
with ipc.new_stream(sink, schema) as writer:
    writer.write_batch(batch)
Custom Metadata
- C++
- Python
// Add schema metadata
// NOTE(review): assumes `using namespace arrow;` plus `writer`, `batch`
// and `reader` from the stream examples above; std::cout needs <iostream>.
auto metadata = key_value_metadata({
{"source", "sensor_data"},
{"version", "1.0"},
{"timestamp", "2024-03-15T10:00:00Z"}
});
auto schema = arrow::schema(
{field("value", float64())},
metadata
);
// Add batch metadata (attached to this one record batch message)
auto batch_metadata = key_value_metadata({
{"batch_id", "123"},
{"partition", "2024-03"}
});
ARROW_RETURN_NOT_OK(
writer->WriteRecordBatch(*batch, batch_metadata));
// Read metadata (may be absent, hence the null check)
auto read_schema = reader->schema();
if (read_schema->metadata()) {
std::cout << read_schema->metadata()->ToString() << std::endl;
}
# Add schema metadata (keys and values are bytes)
metadata = {
    b'source': b'sensor_data',
    b'version': b'1.0',
    b'timestamp': b'2024-03-15T10:00:00Z'
}
schema = pa.schema(
    [('value', pa.float64())],
    metadata=metadata
)

# Add batch-level metadata, attached to this one record batch message.
batch_metadata = {
    b'batch_id': b'123',
    b'partition': b'2024-03'
}
# NOTE: `writer`, `batch` and `reader` come from the stream examples above.
writer.write_batch(batch, custom_metadata=batch_metadata)

# Read metadata (may be None, hence the check)
read_schema = reader.schema
if read_schema.metadata:
    print(read_schema.metadata)
Performance Tips
- Batch Size: Use 64K-1M rows per batch for optimal performance
- Memory Mapping: Use memory-mapped files for large datasets
- Compression: Enable compression for network I/O or storage
- Dictionary Encoding: Use for low-cardinality string columns
- Zero-Copy: Prefer file format for zero-copy reads
- Parallel I/O: Read multiple batches in parallel when possible
Related Topics
- Arrow Flight RPC - Network transport using IPC format
- Arrow Flight SQL - SQL over Flight
- C Data Interface - Zero-copy language integration