Skip to main content

Overview

The dataset API provides functionality to efficiently work with tabular, potentially larger than memory and multi-file datasets.
import pyarrow.dataset as ds

# Open a dataset from a directory
dataset = ds.dataset('path/to/data', format='parquet')

# Scan and filter the dataset
table = dataset.to_table(filter=ds.field('year') > 2020)

Dataset

Base class for all dataset types.

dataset()

Open a dataset from various sources.
ds.dataset(source, schema=None, format=None, filesystem=None, partitioning=None, 
          partition_base_dir=None, exclude_invalid_files=None, ignore_prefixes=None)
source
path, list of paths, or list of datasets
Path to a file/directory, list of file paths, or list of dataset objects. Can also be RecordBatch, Table, or iterable of batches.
schema
pyarrow.Schema
default:"None"
Schema for the dataset. If not provided, it will be inferred.
format
str or FileFormat
default:"None"
File format: ‘parquet’, ‘ipc’, ‘feather’, ‘csv’, ‘json’, or ‘orc’.
filesystem
FileSystem or URI str
default:"None"
Filesystem to use. If None, inferred from source.
partitioning
Partitioning, str, or list of str
default:"None"
Partitioning scheme. Can be ‘hive’ for Hive partitioning or a list of field names.
partition_base_dir
str
default:"None"
Base directory for partitioning discovery.
exclude_invalid_files
bool
default:"False"
If True, exclude invalid files during discovery.
ignore_prefixes
list
default:"['.', '_']"
Ignore files with these prefixes during discovery.
dataset
Dataset
A Dataset object (FileSystemDataset, UnionDataset, or InMemoryDataset).

Properties

schema

The dataset schema.
dataset = ds.dataset('data.parquet')
print(dataset.schema)
schema
pyarrow.Schema
The dataset’s schema.

Methods

scanner()

Create a scanner for the dataset.
dataset.scanner(columns=None, filter=None, batch_size=None, use_threads=True)
columns
list of str
default:"None"
Column names to scan. If None, scan all columns.
filter
Expression
default:"None"
Filter expression to apply.
batch_size
int
default:"None"
Number of rows per batch.
use_threads
bool
default:"True"
Whether to use multiple threads.
scanner
Scanner
A Scanner object for reading the dataset.

to_table()

Load the dataset as a Table.
dataset.to_table(columns=None, filter=None, batch_size=None, use_threads=True)
columns
list of str
default:"None"
Columns to load.
filter
Expression
default:"None"
Filter to apply.
table
pyarrow.Table
The loaded table.

to_batches()

Iterate over record batches from the dataset.
for batch in dataset.to_batches(columns=['a', 'b'], filter=ds.field('c') > 5):
    print(batch)
iterator
iterator of RecordBatch
Iterator yielding record batches.

FileSystemDataset

A dataset backed by files on a filesystem.
import pyarrow as pa
import pyarrow.dataset as ds

# Open a Parquet dataset
dataset = ds.dataset('s3://bucket/data/', format='parquet')

# With partitioning
dataset = ds.dataset(
    '/data/partitioned',
    format='parquet',
    partitioning=ds.partitioning(
        pa.schema([('year', pa.int16()), ('month', pa.int8())])
    )
)

Properties

files

List of files in the dataset.
dataset = ds.dataset('data/', format='parquet')
print(dataset.files)
files
list of str
File paths in the dataset.

format

File format of the dataset.
format
FileFormat
The file format object.

filesystem

Filesystem used by the dataset.
filesystem
FileSystem
The filesystem object.

Scanner

A scanner for reading datasets with filters and projections.
import pyarrow.dataset as ds

dataset = ds.dataset('data.parquet')
scanner = dataset.scanner(
    columns=['name', 'age'],
    filter=ds.field('age') > 18
)
table = scanner.to_table()

from_dataset()

Create a scanner from a dataset.
Scanner.from_dataset(dataset, columns=None, filter=None, batch_size=None, use_threads=True)

from_batches()

Create a scanner from an iterable of batches.
Scanner.from_batches(batches, schema=None, use_threads=True)
batches
iterable of RecordBatch
Batches to scan.
schema
pyarrow.Schema
Schema of the batches.
scanner
Scanner
A scanner for the batches.

Methods

to_table()

Read all data as a table.
scanner.to_table()
table
pyarrow.Table
The scanned data as a table.

to_batches()

Iterate over record batches.
for batch in scanner.to_batches():
    print(batch)
iterator
iterator of RecordBatch
Iterator over batches.

to_reader()

Get a RecordBatchReader.
reader = scanner.to_reader()
for batch in reader:
    print(batch)
reader
RecordBatchReader
A reader for the scanned data.

Partitioning

Partitioning schemes for organizing dataset files.

partitioning()

Create a partitioning scheme.
ds.partitioning(schema=None, field_names=None, flavor=None, dictionaries=None)
schema
pyarrow.Schema
default:"None"
Schema describing partition fields.
field_names
list of str
default:"None"
Field names for DirectoryPartitioning (types will be inferred).
flavor
str
default:"None"
Partitioning flavor: None (directory), ‘hive’, or ‘filename’.
dictionaries
dict or 'infer'
default:"None"
Dictionary values for dictionary-encoded partition fields.
partitioning
Partitioning or PartitioningFactory
A partitioning scheme.

Examples

import pyarrow as pa
import pyarrow.dataset as ds

# Directory partitioning with schema
part = ds.partitioning(
    pa.schema([('year', pa.int16()), ('month', pa.int8())])
)

# Directory partitioning with field names (types inferred)
part = ds.partitioning(field_names=['year', 'month'])

# Hive partitioning
part = ds.partitioning(
    pa.schema([('year', pa.int16()), ('month', pa.int8())]),
    flavor='hive'
)

# Hive partitioning with inference
part = ds.partitioning(flavor='hive')

Writing Datasets

write_dataset()

Write a dataset to files.
ds.write_dataset(data, base_dir, basename_template=None, format=None, 
                partitioning=None, partitioning_flavor=None, schema=None,
                filesystem=None, file_options=None, use_threads=True,
                max_partitions=1024, max_open_files=1024, max_rows_per_file=0,
                min_rows_per_group=0, max_rows_per_group=1048576,
                file_visitor=None, existing_data_behavior='error')
data
Dataset, Table, RecordBatch, or iterable
Data to write.
base_dir
str
Root directory to write the dataset.
basename_template
str
default:"None"
Template for file names; must contain '{i}', which is replaced with an auto-incremented integer. Defaults to 'part-{i}.[ext]' where [ext] is the format's default extension.
format
str or FileFormat
default:"None"
File format: ‘parquet’, ‘ipc’, ‘feather’, or ‘csv’.
partitioning
Partitioning or list of str
default:"None"
Partitioning scheme.
partitioning_flavor
str
default:"None"
Partitioning flavor when partitioning is a list of field names.
use_threads
bool
default:"True"
Use multiple threads for writing.
max_partitions
int
default:"1024"
Maximum number of partitions.
max_open_files
int
default:"1024"
Maximum number of files to keep open.
max_rows_per_file
int
default:"0"
Maximum rows per file (0 = unlimited).
max_rows_per_group
int
default:"1048576"
Maximum rows per row group.
existing_data_behavior
str
default:"'error'"
How to handle existing data: ‘error’, ‘overwrite_or_ignore’, or ‘delete_matching’.

Example

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({
    'year': [2020, 2020, 2021, 2021],
    'month': [1, 2, 1, 2],
    'value': [10, 20, 30, 40]
})

# Write partitioned dataset
ds.write_dataset(
    table,
    'output_dir',
    format='parquet',
    partitioning=['year', 'month']
)

# Write with Hive partitioning
ds.write_dataset(
    table,
    'output_dir',
    format='parquet',
    partitioning=['year', 'month'],
    partitioning_flavor='hive'
)

File Formats

ParquetFileFormat

Parquet file format.
from pyarrow.dataset import ParquetFileFormat

format = ParquetFileFormat()
dataset = ds.dataset('data/', format=format)

IpcFileFormat

Arrow IPC (Feather V2) file format.
from pyarrow.dataset import IpcFileFormat

format = IpcFileFormat()
dataset = ds.dataset('data/', format=format)

CsvFileFormat

CSV file format.
from pyarrow.dataset import CsvFileFormat

format = CsvFileFormat()
dataset = ds.dataset('data/', format=format)

JsonFileFormat

JSON file format.
from pyarrow.dataset import JsonFileFormat

format = JsonFileFormat()
dataset = ds.dataset('data/', format=format)

Fragment

A Fragment represents a subset of a dataset.

Properties

physical_schema

The physical schema of the fragment.
schema
pyarrow.Schema
The fragment’s physical schema.

Methods

to_table()

Read the fragment as a table.
fragment.to_table(schema=None, columns=None, filter=None, use_threads=True)
table
pyarrow.Table
The fragment data as a table.

Expression Filters

Datasets support filtering using compute expressions.
import pyarrow.dataset as ds
import pyarrow.compute as pc

# Simple filter
filter_expr = ds.field('age') > 18

# Combined filters
filter_expr = (ds.field('age') > 18) & (ds.field('country') == 'USA')

# Use with dataset
dataset = ds.dataset('data.parquet')
table = dataset.to_table(filter=filter_expr)

# Complex expressions
filter_expr = (
    (ds.field('year') >= 2020) &
    (ds.field('month').isin([1, 2, 3])) &
    (ds.field('value') > 100)
)

Build docs developers (and LLMs) love