Overview
The Dataset API provides functionality to efficiently work with tabular, potentially larger-than-memory, multi-file datasets. It offers:
- Unified interface for different file formats (Parquet, CSV, JSON, etc.)
- Directory-based partitioned datasets
- Predicate pushdown for efficient filtering
- Column projection for reading only needed data
- Parallel reading across multiple files
import pyarrow.dataset as ds
# Open a dataset
dataset = ds.dataset('path/to/data', format='parquet')
# Convert to table
table = dataset.to_table()
Creating Datasets
- Single File
- Directory
- Multiple Files
- In Memory
import pyarrow.dataset as ds
# From a single Parquet file
dataset = ds.dataset('data.parquet', format='parquet')
# From CSV
dataset = ds.dataset('data.csv', format='csv')
# From JSON
dataset = ds.dataset('data.json', format='json')
print(f"Schema: {dataset.schema}")
print(f"Files: {dataset.files}")
import pyarrow.dataset as ds
# Recursively discover all Parquet files
dataset = ds.dataset(
'data_directory/',
format='parquet'
)
# With explicit filesystem
from pyarrow import fs
local_fs = fs.LocalFileSystem()
dataset = ds.dataset(
'data_directory/',
format='parquet',
filesystem=local_fs
)
import pyarrow.dataset as ds
# From list of files
files = [
'part0.parquet',
'part1.parquet',
'part2.parquet'
]
dataset = ds.dataset(files, format='parquet')
# With wildcard patterns (use glob)
import glob
files = glob.glob('data/*.parquet')
dataset = ds.dataset(files, format='parquet')
import pyarrow as pa
import pyarrow.dataset as ds
# Create dataset from table
table = pa.table({
'a': [1, 2, 3],
'b': ['x', 'y', 'z']
})
dataset = ds.dataset(table)
# From record batches
batches = [
pa.record_batch([[1, 2]], names=['a']),
pa.record_batch([[3, 4]], names=['a'])
]
dataset = ds.dataset(batches)
Partitioned Datasets
Partitioning organizes data into directories based on column values.
Directory Partitioning
import pyarrow as pa
import pyarrow.dataset as ds
# Example directory structure:
# data/
# 2020/
# part-0.parquet
# 2021/
# part-0.parquet
# 2022/
# part-0.parquet
# Infer partitioning from directories
partitioning = ds.partitioning(
field_names=['year']
)
dataset = ds.dataset(
'data/',
format='parquet',
partitioning=partitioning
)
print(dataset.schema) # Includes 'year' field
Hive Partitioning
import pyarrow as pa
import pyarrow.dataset as ds
# Example directory structure:
# data/
# year=2020/month=01/
# part-0.parquet
# year=2020/month=02/
# part-0.parquet
# year=2021/month=01/
# part-0.parquet
# Discover Hive partitioning
dataset = ds.dataset(
'data/',
format='parquet',
partitioning='hive' # Auto-discover from key=value format
)
# Or specify schema
partitioning = ds.partitioning(
pa.schema([
('year', pa.int32()),
('month', pa.int32())
]),
flavor='hive'
)
dataset = ds.dataset(
'data/',
format='parquet',
partitioning=partitioning
)
Custom Partitioning
import pyarrow as pa
import pyarrow.dataset as ds
# Explicit schema for partitioning
schema = pa.schema([
('year', pa.int32()),
('month', pa.int32()),
('day', pa.int32())
])
partitioning = ds.partitioning(schema)
dataset = ds.dataset(
'data/',
format='parquet',
partitioning=partitioning
)
Partitioning enables efficient filtering: when you filter on partition columns, only the relevant partitions are read.
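For example, with the dataset and partitioning defined above, a filter on the year column prunes whole directories before any file is opened. A minimal sketch (the directory layout and year values are assumed for illustration):
import pyarrow.compute as pc

# Only fragments whose partition values can satisfy the filter are selected;
# directories for other years are never opened
fragments = list(dataset.get_fragments(filter=pc.field('year') == 2022))
print(f"Fragments to scan: {len(fragments)}")

# The same pruning happens when reading
table = dataset.to_table(filter=pc.field('year') == 2022)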
Reading Data
Basic Reading
import pyarrow.dataset as ds
dataset = ds.dataset('data/', format='parquet')
# Read entire dataset into memory
table = dataset.to_table()
# Read with column selection
table = dataset.to_table(columns=['name', 'age'])
# Read with filter
import pyarrow.compute as pc
table = dataset.to_table(
filter=pc.field('age') > 25
)
Scanning
import pyarrow.dataset as ds
import pyarrow.compute as pc
dataset = ds.dataset('data/', format='parquet')
# Create a scanner for lazy evaluation
scanner = dataset.scanner(
columns=['name', 'age', 'score'],
filter=pc.field('age') > 25
)
# Get schema
print(scanner.projected_schema)
# Read as table
table = scanner.to_table()
# Iterate over batches (memory efficient)
for batch in scanner.to_batches():
    print(f"Batch with {batch.num_rows} rows")
    # Process batch...
# Read as pandas (via a Table)
df = scanner.to_table().to_pandas()
Filtering
- Simple Filters
- Combined Filters
- String Filters
import pyarrow.dataset as ds
import pyarrow.compute as pc
dataset = ds.dataset('data/', format='parquet')
# Single condition
table = dataset.to_table(
filter=pc.field('age') > 25
)
# Equality
table = dataset.to_table(
filter=pc.field('status') == 'active'
)
# In list
table = dataset.to_table(
filter=pc.field('category').isin(['A', 'B', 'C'])
)
import pyarrow.dataset as ds
import pyarrow.compute as pc
dataset = ds.dataset('data/', format='parquet')
# AND condition
table = dataset.to_table(
filter=(
(pc.field('age') > 25) &
(pc.field('score') >= 80)
)
)
# OR condition
table = dataset.to_table(
filter=(
(pc.field('status') == 'active') |
(pc.field('priority') == 'high')
)
)
# Complex expression
table = dataset.to_table(
filter=(
((pc.field('age') >= 18) & (pc.field('age') <= 65)) &
(pc.field('country').isin(['US', 'UK', 'CA']))
)
)
import pyarrow.dataset as ds
import pyarrow.compute as pc
dataset = ds.dataset('data/', format='parquet')
# String matching
table = dataset.to_table(
filter=pc.match_substring(pc.field('name'), 'Smith')
)
# Starts with
table = dataset.to_table(
filter=pc.starts_with(pc.field('email'), 'admin')
)
# Case-insensitive match
table = dataset.to_table(
filter=pc.match_substring(
pc.utf8_lower(pc.field('name')),
'john'
)
)
Filters are pushed down to file readers when possible, meaning only relevant data is read from disk.
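A cheap way to observe pushdown is to count or preview matching rows without materializing the full dataset. A small sketch (the 'age' column is a placeholder):
import pyarrow.dataset as ds
import pyarrow.compute as pc

dataset = ds.dataset('data/', format='parquet')

# Counts matching rows using the pushed-down filter, without loading all columns
n_matching = dataset.count_rows(filter=pc.field('age') > 25)

# Reads only the first few matching rows
preview = dataset.head(5, filter=pc.field('age') > 25)
print(n_matching, preview.num_rows)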
Writing Datasets
Basic Writing
import pyarrow as pa
import pyarrow.dataset as ds
table = pa.table({
'year': [2020, 2022, 2021, 2022, 2019, 2021],
'n_legs': [2, 2, 4, 4, 5, 100],
'animal': ["Flamingo", "Parrot", "Dog", "Horse",
"Brittle stars", "Centipede"]
})
# Write to a directory (a single part-0.parquet file is created inside)
ds.write_dataset(
table,
'output/',
format='parquet'
)
# Write to a directory, customizing part file names with basename_template
ds.write_dataset(
table,
'output_dir/',
format='parquet',
basename_template='part-{i}.parquet'
)
Partitioned Writing
import pyarrow as pa
import pyarrow.dataset as ds
table = pa.table({
'year': [2020, 2022, 2021, 2022, 2019, 2021],
'month': [1, 3, 6, 11, 4, 8],
'n_legs': [2, 2, 4, 4, 5, 100],
'animal': ["Flamingo", "Parrot", "Dog", "Horse",
"Brittle stars", "Centipede"]
})
# Partition by year
ds.write_dataset(
table,
'output/',
format='parquet',
partitioning=['year']
)
# Results in:
# output/
# 2019/
# part-0.parquet
# 2020/
# part-0.parquet
# 2021/
# part-0.parquet
# 2022/
# part-0.parquet
# Partition by multiple columns
ds.write_dataset(
table,
'output/',
format='parquet',
partitioning=['year', 'month']
)
# With Hive-style partitioning
ds.write_dataset(
table,
'output/',
format='parquet',
partitioning=['year'],
partitioning_flavor='hive'
)
# Results in: output/year=2020/, output/year=2021/, etc.
Write Options
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
table = pa.table({'a': range(1000), 'b': range(1000)})
# Parquet-specific options
parquet_format = ds.ParquetFileFormat()
write_options = parquet_format.make_write_options(
compression='snappy',
use_dictionary=True
)
ds.write_dataset(
table,
'output/',
format='parquet',
file_options=write_options
)
# Control file sizes
ds.write_dataset(
table,
'output/',
format='parquet',
max_rows_per_file=100, # Split into multiple files
max_rows_per_group=50 # Row group size
)
Incremental Writes
import pyarrow.dataset as ds
# Keep existing data; only files with the same name are overwritten
ds.write_dataset(
table,
'output/',
format='parquet',
existing_data_behavior='overwrite_or_ignore'
)
# Delete matching partitions before writing
ds.write_dataset(
table,
'output/',
format='parquet',
partitioning=['year'],
existing_data_behavior='delete_matching'
)
# Error if data exists (default)
ds.write_dataset(
table,
'output/',
format='parquet',
existing_data_behavior='error'
)
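To append new files over time without clobbering earlier writes, one common pattern (a sketch, not the only option) is to combine 'overwrite_or_ignore' with a unique basename_template per write; new_table below stands in for the newly arrived rows:
import uuid
import pyarrow.dataset as ds

# Each write gets a unique file name prefix, so existing part files are kept
ds.write_dataset(
    new_table,
    'output/',
    format='parquet',
    basename_template=f'part-{uuid.uuid4().hex}-{{i}}.parquet',
    existing_data_behavior='overwrite_or_ignore'
)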
File Formats
Parquet Format
import pyarrow.dataset as ds
# With custom options
parquet_format = ds.ParquetFileFormat(
read_options={
'dictionary_columns': ['category']
}
)
dataset = ds.dataset(
'data/',
format=parquet_format
)
# Tune Parquet scanning (e.g. pre-buffer column chunk reads)
scanner = dataset.scanner(
fragment_scan_options=ds.ParquetFragmentScanOptions(
pre_buffer=True
)
)
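To work at row-group granularity, a Parquet fragment can be split into one fragment per row group, optionally pruning row groups whose statistics cannot match a filter. A minimal sketch reusing the dataset above (the 'category' column is assumed to exist):
import pyarrow.compute as pc

for fragment in dataset.get_fragments():
    # One fragment per row group; row groups whose statistics
    # contradict the filter are skipped
    for rg_fragment in fragment.split_by_row_group(filter=pc.field('category') == 'A'):
        rg_table = rg_fragment.to_table()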
CSV Format
import pyarrow.dataset as ds
import pyarrow.csv as csv
# With custom CSV parsing options
csv_format = ds.CsvFileFormat(
parse_options=csv.ParseOptions(
delimiter='|',
quote_char='"'
),
read_options=csv.ReadOptions(
skip_rows=1,
column_names=['id', 'name', 'value']
)
)
dataset = ds.dataset(
'data.csv',
format=csv_format
)
JSON Format
import pyarrow.dataset as ds
json_format = ds.JsonFileFormat()
dataset = ds.dataset(
'data.json',
format=json_format
)
Cloud Filesystems
- AWS S3
- Google Cloud Storage
- Azure Blob Storage
import pyarrow.dataset as ds
from pyarrow import fs
# Configure S3 filesystem
s3 = fs.S3FileSystem(
region='us-east-1',
access_key='YOUR_ACCESS_KEY',
secret_key='YOUR_SECRET_KEY'
)
# Read from S3 (the path is relative to the filesystem, without the 's3://' scheme)
dataset = ds.dataset(
'bucket-name/path/to/data/',
format='parquet',
filesystem=s3
)
# Or use URI directly
dataset = ds.dataset(
's3://bucket-name/path/to/data/',
format='parquet'
)
import pyarrow.dataset as ds
from pyarrow import fs
# Configure GCS filesystem
gcs = fs.GcsFileSystem(
anonymous=False,
project_id='your-project-id'
)
# Path is relative to the filesystem (no 'gs://' scheme)
dataset = ds.dataset(
'bucket-name/path/to/data/',
format='parquet',
filesystem=gcs
)
import pyarrow.dataset as ds
from pyarrow import fs
# Configure Azure filesystem
azure = fs.AzureFileSystem(
account_name='your-account',
account_key='YOUR_KEY'
)
# Path is relative to the filesystem (no 'az://' scheme)
dataset = ds.dataset(
'container/path/to/data/',
format='parquet',
filesystem=azure
)
Union Datasets
Combine multiple datasets:
import pyarrow.dataset as ds
# Create individual datasets
ds1 = ds.dataset('old_data/', format='parquet')
ds2 = ds.dataset('new_data/', format='parquet')
# Combine into union dataset
union_dataset = ds.dataset([ds1, ds2])
# Schemas must be compatible
print(union_dataset.schema)
table = union_dataset.to_table()
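If you are unsure the schemas line up, you can check before combining; pa.unify_schemas raises an error when the child schemas cannot be merged. A small sketch using the datasets above:
import pyarrow as pa

# Raises if ds1 and ds2 have incompatible schemas
unified = pa.unify_schemas([ds1.schema, ds2.schema])
print(unified)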
Performance Optimization
Parallel Reading
import pyarrow.dataset as ds
dataset = ds.dataset('large_dataset/', format='parquet')
# Control parallelism
scanner = dataset.scanner(
columns=['id', 'value'],
use_threads=True # Enable parallel reading
)
table = scanner.to_table()
Batch Processing
import pyarrow.dataset as ds
dataset = ds.dataset('large_dataset/', format='parquet')
# Process in batches to limit memory usage
scanner = dataset.scanner(batch_size=10000)
for batch in scanner.to_batches():
    # Process each batch
    result = process_batch(batch)
Fragment-Level Operations
import pyarrow.dataset as ds
dataset = ds.dataset('data/', format='parquet')
# Get fragments (individual files)
for fragment in dataset.get_fragments():
    print(f"File: {fragment.path}")
    print(f"Partition: {fragment.partition_expression}")
    # Read fragment separately
    table = fragment.to_table()
Practical Example
Complete example of reading, filtering, and processing a large dataset:
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.compute as pc
# Create sample partitioned dataset
table = pa.table({
'year': [2020, 2021, 2022] * 100,
'month': list(range(1, 13)) * 25,
'value': range(300),
'category': ['A', 'B', 'C'] * 100
})
ds.write_dataset(
table,
'sales_data/',
format='parquet',
partitioning=['year', 'month']
)
# Read with filtering and projection
dataset = ds.dataset('sales_data/', format='parquet')
# Efficient query: only reads relevant partitions
result = dataset.to_table(
columns=['value', 'category'],
filter=(
(pc.field('year') == 2022) &
(pc.field('category').isin(['A', 'B']))
)
)
print(f"Result: {result.num_rows} rows")
Next Steps
- Parquet Files - Deep dive into Parquet format
- Compute Functions - Process dataset results
- Tables and Arrays - Work with dataset outputs