Dataset Basics
Creating Datasets
Datasets can be created from various sources:
- C++
- Python
#include <arrow/dataset/api.h>
#include <arrow/filesystem/api.h>
using namespace arrow::dataset;
// Create dataset from a directory of Parquet files.
// LocalFileSystem is constructed directly — its constructor does not
// return a Result, so there is nothing to ValueOrDie() on here.
auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
auto format = std::make_shared<ParquetFileFormat>();
// The factory discovers the files under the selector's base directory.
auto factory = FileSystemDatasetFactory::Make(
    fs,
    arrow::fs::FileSelector{"/path/to/data"},
    format,
    FileSystemFactoryOptions{}
).ValueOrDie();
// Finish() inspects the discovered files and builds the dataset.
auto dataset = factory->Finish().ValueOrDie();
// Get the unified dataset schema
auto schema = dataset->schema();
import pyarrow as pa
import pyarrow.dataset as ds

# Discover a dataset rooted at a directory of Parquet files,
# inferring hive-style partition columns from the file paths.
dataset = ds.dataset("/path/to/data", format="parquet", partitioning="hive")

# Inspect the unified schema
schema = dataset.schema
print(schema)
Partitioned Datasets
Datasets support partition discovery and pruning:
- C++
- Python
#include <arrow/dataset/partition.h>
// Hive-style partitioning (year=2023/month=01/).
// NOTE: HivePartitioning::MakeFactory() infers the partition schema from
// the discovered paths; to pin an explicit schema, construct the
// partitioning directly and assign it to the factory options.
auto partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
    arrow::schema({
        arrow::field("year", arrow::int32()),
        arrow::field("month", arrow::int32())
    })
);
FileSystemFactoryOptions options;
// options.partitioning accepts either a concrete Partitioning or a factory.
options.partitioning = partitioning;
auto factory = FileSystemDatasetFactory::Make(
    fs, selector, format, options
).ValueOrDie();
auto dataset = factory->Finish().ValueOrDie();
import pyarrow as pa  # required for pa.schema / pa.int32 below
import pyarrow.dataset as ds

# Hive-style partitioning (paths like year=2023/month=01/)
dataset = ds.dataset(
    "/path/to/data",
    format="parquet",
    partitioning=ds.HivePartitioning(
        pa.schema([
            ("year", pa.int32()),
            ("month", pa.int32())
        ])
    )
)

# Directory partitioning (paths like 2023/01/ — field order gives meaning)
dataset = ds.dataset(
    "/path/to/data",
    format="parquet",
    partitioning=ds.DirectoryPartitioning(
        pa.schema([
            ("year", pa.int32()),
            ("month", pa.int32())
        ])
    )
)
Scanning Datasets
Basic Scanning
- C++
- Python
#include <arrow/dataset/scanner.h>
// Begin building a scan over the dataset.
auto builder = dataset->NewScan().ValueOrDie();
// Scan configuration: projected columns, batch sizing, parallelism.
builder->Project({"col_a", "col_b", "col_c"});
builder->BatchSize(128 * 1024);  // 128Ki rows per batch
builder->UseThreads(true);
auto scanner = builder->Finish().ValueOrDie();
// Materialize the whole scan as a single table...
auto table = scanner->ToTable().ValueOrDie();
// ...or stream it as record batches.
auto batch_iterator = scanner->ScanBatches().ValueOrDie();
for (auto batch_result : batch_iterator) {
  auto tagged = batch_result.ValueOrDie();
  auto batch = tagged.record_batch;
  // Process batch
}
import pyarrow.dataset as ds

# Read the entire dataset into one table
table = dataset.to_table()

# Read only a subset of columns
table = dataset.to_table(columns=["col_a", "col_b", "col_c"])

# Stream the dataset as record batches
for batch in dataset.to_batches(batch_size=128 * 1024, use_threads=True):
    # Process batch
    print(batch)
Filtering During Scan
- C++
- Python
#include <arrow/compute/expression.h>
namespace cp = arrow::compute;
// Predicate: price > 100.0 AND status == "active"
auto predicate = cp::and_(
    cp::greater(cp::field_ref("price"), cp::literal(100.0)),
    cp::equal(cp::field_ref("status"), cp::literal("active")));
// Attach the predicate to a new scan over the dataset.
auto scanner_builder = dataset->NewScan().ValueOrDie();
scanner_builder->Filter(predicate);
auto scanner = scanner_builder->Finish().ValueOrDie();
auto filtered_table = scanner->ToTable().ValueOrDie();
import pyarrow.dataset as ds
import pyarrow.compute as pc

# Predicate: price > 100.0 AND status == "active"
# (named `expr` rather than `filter` to avoid shadowing the builtin)
expr = (pc.field("price") > 100.0) & (pc.field("status") == "active")

# Apply the predicate while scanning
table = dataset.to_table(filter=expr)

# Predicates on partition columns prune whole partitions automatically:
# only matching directories are ever read.
partition_expr = pc.field("year") == 2023
table = dataset.to_table(filter=partition_expr)
Multiple Formats
Parquet
- C++
- Python
#include <arrow/dataset/file_parquet.h>
// Parquet file format for the dataset.
auto format = std::make_shared<ParquetFileFormat>();
// Configure Parquet-specific options.
// arrow_reader_properties controls how Parquet is decoded into Arrow.
auto parquet_opts = std::make_shared<ParquetFragmentScanOptions>();
parquet_opts->arrow_reader_properties->set_batch_size(256 * 1024);
// Pre-buffering coalesces column-chunk reads — useful on
// high-latency filesystems (e.g. object stores).
parquet_opts->arrow_reader_properties->set_pre_buffer(true);
auto scanner_builder = dataset->NewScan().ValueOrDie();
// Attach the Parquet options to the scan being built.
scanner_builder->FragmentScanOptions(parquet_opts);
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Parquet dataset over a hive-partitioned directory.
# (ds.parquet_dataset() is for a `_metadata` summary file, not a
# directory, and has no `use_legacy_dataset` argument — use ds.dataset().)
dataset = ds.dataset(
    "/path/to/data",
    format="parquet",
    partitioning="hive"
)

# Configure Parquet-specific scan options
parquet_opts = ds.ParquetFragmentScanOptions(pre_buffer=True)
# batch_size is a scan option, passed alongside fragment_scan_options
table = dataset.to_table(
    fragment_scan_options=parquet_opts,
    batch_size=256 * 1024
)
CSV
- C++
- Python
#include <arrow/dataset/file_csv.h>
#include <arrow/csv/options.h>
auto format = std::make_shared<CsvFileFormat>();
// Configure CSV options
auto csv_opts = std::make_shared<CsvFragmentScanOptions>();
csv_opts->convert_options.include_columns = {"col_a", "col_b"};
csv_opts->convert_options.strings_can_be_null = true;
// Attach the options to the format so scans pick them up by default.
// (In the original snippet csv_opts was built but never used; they can
// also be passed per-scan via ScannerBuilder::FragmentScanOptions.)
format->default_fragment_scan_options = csv_opts;
auto factory = FileSystemDatasetFactory::Make(
    fs, selector, format, factory_options
).ValueOrDie();
import pyarrow.dataset as ds
import pyarrow.csv as csv

# CSV format with explicit parsing and conversion behaviour
csv_format = ds.CsvFileFormat(
    parse_options=csv.ParseOptions(delimiter=","),
    convert_options=csv.ConvertOptions(
        include_columns=["col_a", "col_b"],
        strings_can_be_null=True,
    ),
)

# Build the dataset using that format
dataset = ds.dataset("/path/to/data", format=csv_format)
IPC/Feather
- C++
- Python
#include <arrow/dataset/file_ipc.h>
// Dataset over Arrow IPC files.
auto format = std::make_shared<IpcFileFormat>();
auto factory =
    FileSystemDatasetFactory::Make(fs, selector, format, factory_options)
        .ValueOrDie();
auto dataset = factory->Finish().ValueOrDie();
import pyarrow.dataset as ds

# Dataset over Arrow IPC files
dataset = ds.dataset("/path/to/data", format="ipc")

# Equivalent, using the "feather" format name
dataset = ds.dataset("/path/to/data", format="feather")
Schema Evolution
Datasets handle schema evolution automatically:
- C++
- Python
// The dataset presents one unified schema across all of its files:
// columns missing from a file come back as nulls, and extra columns
// can simply be left out of the projection.
auto scanner_builder = dataset->NewScan().ValueOrDie();
scanner_builder->Project({"col_a", "col_b"});  // only the shared columns
auto scanner = scanner_builder->Finish().ValueOrDie();
auto table = scanner->ToTable().ValueOrDie();
# Files with differing schemas are unified into one dataset schema
dataset = ds.dataset("/path/to/data", format="parquet")

# Inspect the unified schema
print(dataset.schema)

# Projection fills columns absent from a given file with nulls
table = dataset.to_table(columns=["col_a", "col_b"])
Writing Datasets
- C++
- Python
#include <arrow/dataset/dataset_writer.h>
// Write a partitioned dataset (hive-style year=.../month=... directories).
// arrow::schema() takes Field pointers, so each field must be built with
// arrow::field() — the nested-brace form does not compile.
auto partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
    arrow::schema({arrow::field("year", arrow::int32()),
                   arrow::field("month", arrow::int32())})
);
FileSystemDatasetWriteOptions write_opts;
write_opts.file_write_options = format->DefaultWriteOptions();
write_opts.filesystem = fs;
write_opts.base_dir = "/output/path";
write_opts.partitioning = partitioning;
write_opts.basename_template = "part-{i}.parquet";  // {i} = file counter
auto scanner = /* ... */;  // any Scanner supplying the rows to write
// Write() returns a Status (not a Result), so check it rather than
// calling ValueOrDie().
auto status = arrow::dataset::FileSystemDataset::Write(write_opts, scanner);
if (!status.ok()) {
  // handle error
}
import pyarrow as pa          # required for pa.schema / pa.int32 below
import pyarrow.compute as pc  # required for the pc.field filter below
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Write a hive-partitioned Parquet dataset
ds.write_dataset(
    data=table,
    base_dir="/output/path",
    format="parquet",
    partitioning=ds.partitioning(
        pa.schema([("year", pa.int32()), ("month", pa.int32())]),
        flavor="hive"
    ),
    basename_template="part-{i}.parquet"
)

# Write from a scanner (applies filtering/projection while writing)
ds.write_dataset(
    data=dataset.scanner(filter=pc.field("year") == 2023),
    base_dir="/output/path",
    format="parquet"
)
Fragment-Level Operations
- C++
- Python
// Get fragments from dataset.
// A fragment is the unit of scanning (typically one file).
auto fragments = dataset->GetFragments().ValueOrDie();
for (auto fragment_result : fragments) {
auto fragment = fragment_result.ValueOrDie();
// Get the fragment's own (physical) schema, before unification
auto schema = fragment->ReadPhysicalSchema().ValueOrDie();
// Count rows (metadata only when the format allows it)
// NOTE(review): `scan_options` is not defined in this snippet — it must
// be a ScanOptions instance built elsewhere; confirm in the full example.
auto count = fragment->CountRows(
arrow::compute::literal(true),
scan_options
).ValueOrDie();
}
# Materialize the fragment list up front
fragments = list(dataset.get_fragments())

for fragment in fragments:
    # Where the fragment lives and which format it uses
    print(f"Path: {fragment.path}")
    print(f"Format: {fragment.format.type_name}")
    # Expression encoding this fragment's partition values
    print(f"Partition: {fragment.partition_expression}")
    # Row count (from metadata where the format supports it)
    count = fragment.count_rows()
    print(f"Rows: {count}")
Performance Tips
- Use partition pruning: Structure data with meaningful partitions
- Push down filters: Filter as early as possible to reduce I/O
- Project only needed columns: Avoid reading unnecessary data
- Configure batch size: Balance memory usage and processing efficiency
- Enable readahead: Use `batch_readahead` and `fragment_readahead`
- Use parallel scanning: Set `use_threads=True` for better performance
Next Steps
- Learn about Expressions and Filters for advanced filtering
- Explore Acero Query Engine for complex queries
- See Compute Functions for data transformations