Skip to main content

Overview

The Dataset API provides functionality to efficiently work with tabular, potentially larger-than-memory, multi-file datasets. It offers:
  • Unified interface for different file formats (Parquet, CSV, JSON, etc.)
  • Directory-based partitioned datasets
  • Predicate pushdown for efficient filtering
  • Column projection for reading only needed data
  • Parallel reading across multiple files
import pyarrow.dataset as ds

# Open a dataset (lazy: discovers files and schema, reads no row data yet)
dataset = ds.dataset('path/to/data', format='parquet')

# Convert to table (materializes the full dataset in memory)
table = dataset.to_table()

Creating Datasets

import pyarrow.dataset as ds

# From a single Parquet file
dataset = ds.dataset('data.parquet', format='parquet')

# From CSV
dataset = ds.dataset('data.csv', format='csv')

# From JSON
dataset = ds.dataset('data.json', format='json')

# The schema is inferred from the file(s); .files lists the discovered paths
print(f"Schema: {dataset.schema}")
print(f"Files: {dataset.files}")

Partitioned Datasets

Partitioning organizes data into directories based on column values:

Directory Partitioning

import pyarrow as pa
import pyarrow.dataset as ds

# Example directory structure (bare values as directory names):
# data/
#   2020/
#     part-0.parquet
#   2021/
#     part-0.parquet
#   2022/
#     part-0.parquet

# Infer partitioning from directories; with field_names (no explicit schema)
# the partition field types are inferred during discovery
partitioning = ds.partitioning(
    field_names=['year']
)

dataset = ds.dataset(
    'data/',
    format='parquet',
    partitioning=partitioning
)

print(dataset.schema)  # Includes 'year' field

Hive Partitioning

import pyarrow as pa  # required below for pa.schema / pa.int32
import pyarrow.dataset as ds

# Example directory structure (Hive style: key=value path segments):
# data/
#   year=2020/month=01/
#     part-0.parquet
#   year=2020/month=02/
#     part-0.parquet
#   year=2021/month=01/
#     part-0.parquet

# Discover Hive partitioning
dataset = ds.dataset(
    'data/',
    format='parquet',
    partitioning='hive'  # Auto-discover from key=value format
)

# Or specify a schema explicitly, which fixes the partition field types
# instead of relying on type inference during discovery
partitioning = ds.partitioning(
    pa.schema([
        ('year', pa.int32()),
        ('month', pa.int32())
    ]),
    flavor='hive'
)

dataset = ds.dataset(
    'data/',
    format='parquet',
    partitioning=partitioning
)

Custom Partitioning

import pyarrow as pa
import pyarrow.dataset as ds

# Explicit schema for partitioning: fixes both the names and the types
# of the partition fields
schema = pa.schema([
    ('year', pa.int32()),
    ('month', pa.int32()),
    ('day', pa.int32())
])

# Default flavor is directory partitioning (e.g. data/2020/1/15/...)
partitioning = ds.partitioning(schema)

dataset = ds.dataset(
    'data/',
    format='parquet',
    partitioning=partitioning
)
Partitioning allows for efficient filtering - only relevant partitions are read when filtering by partition columns.

Reading Data

Basic Reading

import pyarrow.dataset as ds

dataset = ds.dataset('data/', format='parquet')

# Read entire dataset into memory
table = dataset.to_table()

# Read with column selection (only the listed columns are read)
table = dataset.to_table(columns=['name', 'age'])

# Read with a filter expression (pushed down to file readers where possible)
import pyarrow.compute as pc
table = dataset.to_table(
    filter=pc.field('age') > 25
)

Scanning

import pyarrow.dataset as ds
import pyarrow.compute as pc

dataset = ds.dataset('data/', format='parquet')

# Create a scanner for lazy evaluation: nothing is read until consumed
scanner = dataset.scanner(
    columns=['name', 'age', 'score'],
    filter=pc.field('age') > 25
)

# Schema of the projected (selected) columns
print(scanner.projected_schema)

# Read as table
table = scanner.to_table()

# Iterate over batches (memory efficient)
for batch in scanner.to_batches():
    print(f"Batch with {batch.num_rows} rows")
    # Process batch...

# Read as pandas: Scanner has no to_pandas() method, so materialize a
# Table first and convert that
df = scanner.to_table().to_pandas()

Filtering

import pyarrow.dataset as ds
import pyarrow.compute as pc

dataset = ds.dataset('data/', format='parquet')

# Single condition
table = dataset.to_table(
    filter=pc.field('age') > 25
)

# Equality
table = dataset.to_table(
    filter=pc.field('status') == 'active'
)

# Membership in a list of values
table = dataset.to_table(
    filter=pc.field('category').isin(['A', 'B', 'C'])
)
Filters are pushed down to file readers when possible, meaning only relevant data is read from disk.

Writing Datasets

Basic Writing

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({
    'year': [2020, 2022, 2021, 2022, 2019, 2021],
    'n_legs': [2, 2, 4, 4, 5, 100],
    'animal': ["Flamingo", "Parrot", "Dog", "Horse",
               "Brittle stars", "Centipede"]
})

# NOTE: write_dataset treats the path as a base *directory*, so this
# creates a directory named output.parquet/ containing part-0.parquet —
# not a single file (use pyarrow.parquet.write_table for that)
ds.write_dataset(
    table,
    'output.parquet',
    format='parquet'
)

# Write to a directory; basename_template controls file names, with {i}
# replaced by an increasing integer per written file
ds.write_dataset(
    table,
    'output_dir/',
    format='parquet',
    basename_template='part-{i}.parquet'
)

Partitioned Writing

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({
    'year': [2020, 2022, 2021, 2022, 2019, 2021],
    'month': [1, 3, 6, 11, 4, 8],
    'n_legs': [2, 2, 4, 4, 5, 100],
    'animal': ["Flamingo", "Parrot", "Dog", "Horse",
               "Brittle stars", "Centipede"]
})

# Partition by year: a list of column names uses directory partitioning
ds.write_dataset(
    table,
    'output/',
    format='parquet',
    partitioning=['year']
)

# Results in:
# output/
#   2019/
#     part-0.parquet
#   2020/
#     part-0.parquet
#   2021/
#     part-0.parquet
#   2022/
#     part-0.parquet

# Partition by multiple columns (nested directories: output/2020/1/...)
ds.write_dataset(
    table,
    'output/',
    format='parquet',
    partitioning=['year', 'month']
)

# With Hive-style partitioning (key=value directory names)
ds.write_dataset(
    table,
    'output/',
    format='parquet',
    partitioning=['year'],
    partitioning_flavor='hive'
)
# Results in: output/year=2020/, output/year=2021/, etc.
# Results in: output/year=2020/, output/year=2021/, etc.

Write Options

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq  # (unused in this snippet)

table = pa.table({'a': range(1000), 'b': range(1000)})

# Parquet-specific options, built from the format object
parquet_format = ds.ParquetFileFormat()
write_options = parquet_format.make_write_options(
    compression='snappy',
    use_dictionary=True
)

ds.write_dataset(
    table,
    'output/',
    format='parquet',
    file_options=write_options
)

# Control file sizes
ds.write_dataset(
    table,
    'output/',
    format='parquet',
    max_rows_per_file=100,  # Split into multiple files
    max_rows_per_group=50    # Row group size
)

Incremental Writes

import pyarrow as pa
import pyarrow.dataset as ds

# Some data to write (the original snippet referenced an undefined `table`)
table = pa.table({
    'year': [2020, 2021, 2022],
    'value': [1, 2, 3]
})

# Ignore existing files; files with colliding names are overwritten
ds.write_dataset(
    table,
    'output/',
    format='parquet',
    existing_data_behavior='overwrite_or_ignore'
)

# Delete matching partitions before writing
ds.write_dataset(
    table,
    'output/',
    format='parquet',
    partitioning=['year'],
    existing_data_behavior='delete_matching'
)

# Error if data exists (default)
ds.write_dataset(
    table,
    'output/',
    format='parquet',
    existing_data_behavior='error'
)

File Formats

Parquet Format

import pyarrow.dataset as ds

# With custom options; read_options accepts a ParquetReadOptions or a dict
parquet_format = ds.ParquetFileFormat(
    read_options={
        'dictionary_columns': ['category']
    }
)

dataset = ds.dataset(
    'data/',
    format=parquet_format
)

# Per-scan Parquet options (pre_buffer coalesces reads, useful on
# high-latency filesystems such as S3)
scanner = dataset.scanner(
    fragment_scan_options=ds.ParquetFragmentScanOptions(
        pre_buffer=True
    )
)

CSV Format

import pyarrow.dataset as ds
import pyarrow.csv as csv

# With custom CSV parsing options
csv_format = ds.CsvFileFormat(
    parse_options=csv.ParseOptions(
        delimiter='|',
        quote_char='"'
    ),
    read_options=csv.ReadOptions(
        # skip_rows=1 skips the header row, since column_names supplies
        # the names explicitly
        skip_rows=1,
        column_names=['id', 'name', 'value']
    )
)

dataset = ds.dataset(
    'data.csv',
    format=csv_format
)

JSON Format

import pyarrow.dataset as ds

# Line-delimited JSON; equivalent to format='json'
json_format = ds.JsonFileFormat()
dataset = ds.dataset(
    'data.json',
    format=json_format
)

Cloud Filesystems

import pyarrow.dataset as ds
from pyarrow import fs

# Configure S3 filesystem explicitly.
# NOTE(review): avoid hardcoding credentials in code; omitting access_key/
# secret_key lets the default AWS credential chain supply them.
s3 = fs.S3FileSystem(
    region='us-east-1',
    access_key='YOUR_ACCESS_KEY',
    secret_key='YOUR_SECRET_KEY'
)

# Read from S3 using the explicit filesystem
dataset = ds.dataset(
    's3://bucket-name/path/to/data/',
    format='parquet',
    filesystem=s3
)

# Or use a URI directly; the filesystem is inferred from the scheme
dataset = ds.dataset(
    's3://bucket-name/path/to/data/',
    format='parquet'
)

Union Datasets

Combine multiple datasets:
import pyarrow.dataset as ds

# Create individual datasets
ds1 = ds.dataset('old_data/', format='parquet')
ds2 = ds.dataset('new_data/', format='parquet')

# Combine into a union dataset by passing a list of datasets
union_dataset = ds.dataset([ds1, ds2])

# Schemas must be compatible (unifiable) across the child datasets
print(union_dataset.schema)
table = union_dataset.to_table()

Performance Optimization

Parallel Reading

import pyarrow.dataset as ds

dataset = ds.dataset('large_dataset/', format='parquet')

# Control parallelism
scanner = dataset.scanner(
    columns=['id', 'value'],
    use_threads=True  # Enable parallel reading across files/row groups
)

table = scanner.to_table()

Batch Processing

import pyarrow.dataset as ds

dataset = ds.dataset('large_dataset/', format='parquet')

# Process in batches to limit memory usage.
# batch_size is a scanner() option; Scanner.to_batches() takes no arguments.
scanner = dataset.scanner(batch_size=10000)

for batch in scanner.to_batches():
    # Process each batch (process_batch is a user-supplied function)
    result = process_batch(batch)

Fragment-Level Operations

import pyarrow.dataset as ds

dataset = ds.dataset('data/', format='parquet')

# Get fragments (one fragment per file)
for fragment in dataset.get_fragments():
    print(f"File: {fragment.path}")
    # The partition expression encodes the partition-column values
    # implied by this fragment's directory location
    print(f"Partition: {fragment.partition_expression}")
    
    # Read fragment separately
    table = fragment.to_table()

Practical Example

Complete example of reading, filtering, and processing a large dataset:
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.compute as pc

# Create sample partitioned dataset
table = pa.table({
    'year': [2020, 2021, 2022] * 100,
    'month': list(range(1, 13)) * 25,
    'value': range(300),
    'category': ['A', 'B', 'C'] * 100
})

ds.write_dataset(
    table,
    'sales_data/',
    format='parquet',
    partitioning=['year', 'month']
)

# Read with filtering and projection. The partition columns are encoded
# in the directory structure (not stored in the files), so the read must
# declare the same partitioning or 'year'/'month' would be missing from
# the schema and the filter below would fail.
dataset = ds.dataset(
    'sales_data/',
    format='parquet',
    partitioning=['year', 'month']
)

# Efficient query: only reads relevant partitions
result = dataset.to_table(
    columns=['value', 'category'],
    filter=(
        (pc.field('year') == 2022) &
        (pc.field('category').isin(['A', 'B']))
    )
)

print(f"Result: {result.num_rows} rows")

Next Steps

Build docs developers (and LLMs) love