Overview
The dataset API provides functionality to work efficiently with tabular, potentially larger-than-memory, multi-file datasets.
import pyarrow.dataset as ds
# Open a dataset from a directory
dataset = ds.dataset('path/to/data', format='parquet')
# Scan and filter the dataset
table = dataset.to_table(filter=ds.field('year') > 2020)
Dataset
Base class for all dataset types.
dataset()
Open a dataset from various sources.
ds.dataset(source, schema=None, format=None, filesystem=None, partitioning=None,
partition_base_dir=None, exclude_invalid_files=None, ignore_prefixes=None)
source
path, list of paths, or list of datasets
Path to a file/directory, list of file paths, or list of dataset objects. Can also be a RecordBatch, Table, or iterable of batches.
schema
pyarrow.Schema
default:"None"
Schema for the dataset. If not provided, it will be inferred.
format
str or FileFormat
default:"None"
File format: 'parquet', 'ipc', 'feather', 'csv', 'json', or 'orc'.
filesystem
FileSystem or URI str
default:"None"
Filesystem to use. If None, inferred from source.
partitioning
Partitioning, str, or list of str
default:"None"
Partitioning scheme. Can be 'hive' for Hive partitioning or a list of field names.
partition_base_dir
str
default:"None"
Base directory for partitioning discovery.
exclude_invalid_files
bool
default:"None"
If True, exclude invalid files during discovery.
ignore_prefixes
list of str
default:"None"
Ignore files whose basenames start with these prefixes during discovery (defaults to ['.', '_']).
A Dataset object (FileSystemDataset, UnionDataset, or InMemoryDataset).
Properties
schema
The dataset schema.
dataset = ds.dataset('data.parquet')
print(dataset.schema)
Methods
scanner()
Create a scanner for the dataset.
dataset.scanner(columns=None, filter=None, batch_size=None, use_threads=True)
columns
list of str
default:"None"
Column names to scan. If None, scan all columns.
filter
Expression
default:"None"
Filter expression to apply.
batch_size
int
default:"None"
Number of rows per batch.
use_threads
bool
default:"True"
Whether to use multiple threads.
A Scanner object for reading the dataset.
to_table()
Load the dataset as a Table.
dataset.to_table(columns=None, filter=None, batch_size=None, use_threads=True)
columns
list of str
default:"None"
Columns to load. If None, load all columns. The filter, batch_size, and use_threads parameters behave as in scanner().
The loaded data as a Table.
to_batches()
Iterate over record batches from the dataset.
for batch in dataset.to_batches(columns=['a', 'b'], filter=ds.field('c') > 5):
print(batch)
Iterator yielding record batches.
FileSystemDataset
A dataset backed by files on a filesystem.
import pyarrow as pa
import pyarrow.dataset as ds
# Open a Parquet dataset
dataset = ds.dataset('s3://bucket/data/', format='parquet')
# With partitioning
dataset = ds.dataset(
'/data/partitioned',
format='parquet',
partitioning=ds.partitioning(
pa.schema([('year', pa.int16()), ('month', pa.int8())])
)
)
Properties
files
List of files in the dataset.
dataset = ds.dataset('data/', format='parquet')
print(dataset.files)
File paths in the dataset.
format
File format of the dataset.
filesystem
Filesystem used by the dataset.
Scanner
A scanner for reading datasets with filters and projections.
import pyarrow.dataset as ds
dataset = ds.dataset('data.parquet')
scanner = dataset.scanner(
columns=['name', 'age'],
filter=ds.field('age') > 18
)
table = scanner.to_table()
from_dataset()
Create a scanner from a dataset.
Scanner.from_dataset(dataset, columns=None, filter=None, batch_size=None, use_threads=True)
from_batches()
Create a scanner from an iterable of batches.
Scanner.from_batches(batches, schema=None, use_threads=True)
A scanner for the batches.
Methods
to_table()
Read all data as a table.
The scanned data as a table.
to_batches()
Iterate over record batches.
for batch in scanner.to_batches():
print(batch)
to_reader()
Get a RecordBatchReader.
reader = scanner.to_reader()
for batch in reader:
print(batch)
A reader for the scanned data.
Partitioning
Partitioning schemes for organizing dataset files.
partitioning()
Create a partitioning scheme.
ds.partitioning(schema=None, field_names=None, flavor=None, dictionaries=None)
schema
pyarrow.Schema
default:"None"
Schema describing partition fields.
field_names
list of str
default:"None"
Field names for DirectoryPartitioning (types will be inferred).
flavor
str
default:"None"
Partitioning flavor: None (directory partitioning), 'hive', or 'filename'.
dictionaries
dict or 'infer'
default:"None"
Dictionary values for dictionary-encoded partition fields.
partitioning
Partitioning or PartitioningFactory
A Partitioning when the schema is fully specified, otherwise a PartitioningFactory that infers types during dataset discovery.
Examples
import pyarrow as pa
import pyarrow.dataset as ds
# Directory partitioning with schema
part = ds.partitioning(
pa.schema([('year', pa.int16()), ('month', pa.int8())])
)
# Directory partitioning with field names (types inferred)
part = ds.partitioning(field_names=['year', 'month'])
# Hive partitioning
part = ds.partitioning(
pa.schema([('year', pa.int16()), ('month', pa.int8())]),
flavor='hive'
)
# Hive partitioning with inference
part = ds.partitioning(flavor='hive')
Writing Datasets
write_dataset()
Write a dataset to files.
ds.write_dataset(data, base_dir, basename_template=None, format=None,
partitioning=None, partitioning_flavor=None, schema=None,
filesystem=None, file_options=None, use_threads=True,
max_partitions=1024, max_open_files=1024, max_rows_per_file=0,
min_rows_per_group=0, max_rows_per_group=1048576,
file_visitor=None, existing_data_behavior='error')
data
Dataset, Table, RecordBatch, or iterable
Data to write.
base_dir
str
Root directory to write the dataset.
basename_template
str
default:"None"
Template for file names; '{i}' is replaced with an auto-incremented integer. Defaults to 'part-{i}.' followed by the format's default extension.
format
str or FileFormat
default:"None"
File format: 'parquet', 'ipc', 'feather', or 'csv'.
partitioning
Partitioning or list of str
default:"None"
Partitioning scheme.
partitioning_flavor
str
default:"None"
Partitioning flavor when partitioning is a list of field names.
use_threads
bool
default:"True"
Use multiple threads for writing.
max_partitions
int
default:"1024"
Maximum number of partitions.
max_open_files
int
default:"1024"
Maximum number of files to keep open.
max_rows_per_file
int
default:"0"
Maximum rows per file (0 = unlimited).
min_rows_per_group
int
default:"0"
Minimum rows per row group (0 = no minimum).
max_rows_per_group
int
default:"1048576"
Maximum rows per row group.
existing_data_behavior
str
default:"'error'"
How to handle existing data: 'error', 'overwrite_or_ignore', or 'delete_matching'.
Example
import pyarrow as pa
import pyarrow.dataset as ds
table = pa.table({
'year': [2020, 2020, 2021, 2021],
'month': [1, 2, 1, 2],
'value': [10, 20, 30, 40]
})
# Write partitioned dataset
ds.write_dataset(
table,
'output_dir',
format='parquet',
partitioning=['year', 'month']
)
# Write with Hive partitioning
ds.write_dataset(
table,
'output_dir',
format='parquet',
partitioning=['year', 'month'],
partitioning_flavor='hive'
)
ParquetFileFormat
Parquet file format.
from pyarrow.dataset import ParquetFileFormat
format = ParquetFileFormat()
dataset = ds.dataset('data/', format=format)
IpcFileFormat
Arrow IPC (Feather V2) file format.
from pyarrow.dataset import IpcFileFormat
format = IpcFileFormat()
dataset = ds.dataset('data/', format=format)
CsvFileFormat
CSV file format.
from pyarrow.dataset import CsvFileFormat
format = CsvFileFormat()
dataset = ds.dataset('data/', format=format)
JsonFileFormat
JSON file format.
from pyarrow.dataset import JsonFileFormat
format = JsonFileFormat()
dataset = ds.dataset('data/', format=format)
Fragment
A Fragment represents a subset of a dataset.
Properties
physical_schema
The physical schema of the fragment.
The fragment’s physical schema.
Methods
to_table()
Read the fragment as a table.
fragment.to_table(schema=None, columns=None, filter=None, use_threads=True)
The fragment data as a table.
Expression Filters
Datasets support filtering using compute expressions.
import pyarrow.dataset as ds
# Simple filter
filter_expr = ds.field('age') > 18
# Combined filters
filter_expr = (ds.field('age') > 18) & (ds.field('country') == 'USA')
# Use with dataset
dataset = ds.dataset('data.parquet')
table = dataset.to_table(filter=filter_expr)
# Complex expressions
filter_expr = (
(ds.field('year') >= 2020) &
(ds.field('month').isin([1, 2, 3])) &
(ds.field('value') > 100)
)