Arrow Datasets enable analysis of multi-file and larger-than-memory data using familiar dplyr syntax. The Dataset interface provides lazy evaluation, automatic partitioning, and efficient query execution without loading all data into memory.
What are Datasets?
A Dataset is Arrow’s abstraction for working with data stored in multiple files or data that’s too large to fit in memory. Unlike Tables that hold data in memory, Datasets store metadata about where the data is located and only load necessary portions when computing results.
Example: NYC Taxi Data
The NYC taxi trip dataset is a common example for demonstrating large-scale data processing:
Full dataset: ~1.7 billion rows, 24 columns, ~70GB
Structure: 158 Parquet files (one per month from 2009-2022)
File size: 400-500MB each
Available on: S3 and Google Cloud Storage
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
# Connect to cloud storage
bucket <- s3_bucket("arrow-datasets/nyc-taxi")
# Or use the tiny version (1/1000th of the data, ~70MB)
tiny_bucket <- s3_bucket("arrow-datasets/nyc-taxi-tiny")
# Copy to local filesystem
copy_files(from = tiny_bucket, to = "nyc-taxi")
Opening Datasets
Create a Dataset object that points to your data directory:
ds <- open_dataset("nyc-taxi")
ds
#> FileSystemDataset with 158 Parquet files
#> vendor_name: string
#> pickup_datetime: timestamp[ms]
#> dropoff_datetime: timestamp[ms]
#> passenger_count: int64
#> trip_distance: double
#> ...
#> year: int32
#> month: int32
When opening a Dataset, Arrow:
Scans the directory to find relevant files
Parses file paths for Hive-style partitioning
Reads file headers to construct a schema
Does not load actual data values
Opening a Dataset is fast regardless of data size because only metadata is read, not the data itself.
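You can confirm this yourself. A minimal sketch, assuming the `nyc-taxi` directory from above: `$files` lists the discovered files and `nrow()` is answered from Parquet metadata, so neither touches the row data.

```r
library(arrow)

ds <- open_dataset("nyc-taxi")

# Files the directory scan discovered; no row data is read
head(ds$files)

# Row count comes from Parquet file metadata, so this is also cheap
nrow(ds)
```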
Datasets support multiple file formats:
# Parquet (default)
ds <- open_dataset("data/parquet")
# CSV files
ds <- open_csv_dataset("data/csv")
# TSV files
ds <- open_tsv_dataset("data/tsv")
# Feather/Arrow IPC files
ds <- open_dataset("data/arrow", format = "feather")
# Custom delimited files
ds <- open_delim_dataset("data/txt", delim = "|")
CSV Parsing Options
ds <- open_csv_dataset(
  "data/csv",
  delim = ",",
  quote = "\"",
  skip = 1,  # skip the header row; column names come from the schema
  schema = schema(col1 = utf8(), col2 = int32(), col3 = float64())
)
Partitioning
Partitioning splits data into multiple files based on column values, enabling:
Faster queries (skip irrelevant files)
Parallel processing
Organized storage
Hive-Style Partitioning
Hive-style partitioning uses key=value directory names:
nyc-taxi/
year=2009/
month=1/part-0.parquet
month=2/part-0.parquet
year=2010/
month=1/part-0.parquet
# Automatically detects partitioning
ds <- open_dataset("nyc-taxi")
# Partition columns appear in the schema
ds$schema
#> ...
#> year: int32
#> month: int32
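Because `year` and `month` are now ordinary columns, filtering on them lets Arrow skip whole directories. A sketch, assuming the taxi Dataset above:

```r
library(arrow)
library(dplyr)

ds <- open_dataset("nyc-taxi")

# Only the files under year=2015/month=6/ need to be opened
ds |>
  filter(year == 2015, month == 6) |>
  count() |>
  collect()
```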
Directory Partitioning
For non-self-describing paths, specify partition column names:
nyc-taxi/
  2009/01/part-0.parquet
  2009/02/part-0.parquet
  2010/01/part-0.parquet
# Specify partition columns
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
Custom Partition Schema
Control data types of partition columns:
ds <- open_dataset(
  "nyc-taxi",
  partitioning = schema(year = int16(), month = utf8())
)
Querying Datasets
Query Datasets with dplyr syntax:
result <- ds |>
  filter(total_amount > 100, year == 2015) |>
  select(tip_amount, total_amount, passenger_count) |>
  mutate(tip_pct = 100 * tip_amount / total_amount) |>
  group_by(passenger_count) |>
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  ) |>
  arrange(desc(median_tip_pct)) |>
  collect()
result
result
#> # A tibble: 10 × 3
#> passenger_count median_tip_pct n
#> <int> <dbl> <int>
#> 1 1 16.6 143087
#> 2 5 16.7 5806
#> 3 6 16.7 3338
Why Is It Fast?
Lazy Evaluation: the query is optimized before execution
Predicate Pushdown: filters are applied while files are read
Partition Pruning: entire files are skipped based on partition values
Column Projection: only the requested columns are read
Parallel Processing: multiple files are processed simultaneously
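One way to see some of these optimizations at work is `show_exec_plan()` (available in recent arrow versions), which prints the query plan without executing it. A sketch, assuming the taxi Dataset:

```r
library(arrow)
library(dplyr)

ds <- open_dataset("nyc-taxi")

ds |>
  filter(total_amount > 100, year == 2015) |>
  select(tip_amount, total_amount) |>
  show_exec_plan()
# The printed plan shows the filter expression and the projected
# columns pushed down into the dataset scan node
```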
Query Without collect()
Inspect the query without executing:
query <- ds |>
  filter(total_amount > 100, year == 2015) |>
  select(tip_amount, total_amount, passenger_count) |>
  mutate(tip_pct = 100 * tip_amount / total_amount)
query
#> FileSystemDataset (query)
#> tip_amount: double
#> total_amount: double
#> passenger_count: int64
#> tip_pct: double
#>
#> See $.data for the source Arrow object
This returns instantly and shows the schema of the eventual result.
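Because evaluation is lazy, you can also materialize just a preview before committing to a full `collect()`. A sketch building on the `query` object above:

```r
# Pull only the first few rows of the result into R
query |>
  head(10) |>
  collect()

# Or keep the result in Arrow memory (a Table) rather than a tibble
tbl <- query |> compute()
```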
Writing Datasets
Transform and repartition data efficiently:
Basic Writing
# Write dataset from data frame
write_dataset(starwars, "output/starwars" )
# Write dataset from existing dataset
ds <- open_dataset( "input/csv" , format = "csv" )
write_dataset(ds, "output/parquet" , format = "parquet" )
Partitioned Writing
Partition data while writing:
# Create sample data (2024 is a leap year, so 366 days)
set.seed(1234)
data <- data.frame(
  date = seq.Date(as.Date("2024-01-01"), as.Date("2024-12-31"), by = "day"),
  value = rnorm(366),
  category = sample(c("A", "B", "C"), 366, replace = TRUE)
)
# Partition by category
data |>
  group_by(category) |>
  write_dataset("output/data", format = "parquet")
list.files("output/data", recursive = TRUE)
#> [1] "category=A/part-0.parquet"
#> [2] "category=B/part-0.parquet"
#> [3] "category=C/part-0.parquet"
Multi-Column Partitioning
# year() and month() come from lubridate here, since data is a plain data frame
data |>
  mutate(year = lubridate::year(date), month = lubridate::month(date)) |>
  group_by(year, month, category) |>
  write_dataset("output/data")
# Creates the hierarchy: year=2024/month=1/category=A/part-0.parquet
Non-Hive Partitioning
Use bare values instead of key=value:
data |>
  group_by(category) |>
  write_dataset("output/data", hive_style = FALSE)
list.files("output/data", recursive = TRUE)
#> [1] "A/part-0.parquet"
#> [2] "B/part-0.parquet"
#> [3] "C/part-0.parquet"
# Convert CSV to Parquet, keeping only specific data
open_csv_dataset("input/data.csv") |>
  filter(year >= 2020, !is.na(value)) |>
  select(-vendor_id) |>  # Drop unnecessary columns
  group_by(year, month) |>
  write_dataset("output/parquet", format = "parquet")
Batch Processing (Experimental)
Process datasets in batches when you need to apply R functions:
# Sample a large dataset
sampled <- ds |>
filter (year == 2015 ) |>
select(tip_amount, total_amount, passenger_count) |>
map_batches( ~ as_record_batch(sample_frac( as.data.frame (.), 1e-4 ))) |>
mutate( tip_pct = tip_amount / total_amount) |>
collect()
str (sampled)
#> tibble [10,918 × 4]
Batch Aggregation Example
# Fit a model on the sample
model <- lm(tip_pct ~ total_amount + passenger_count, data = sampled)
# Compute MSE on the full dataset using batches
mse <- ds |>
  filter(year == 2015) |>
  select(tip_amount, total_amount, passenger_count) |>
  mutate(tip_pct = tip_amount / total_amount) |>
  map_batches(function(batch) {
    df <- as.data.frame(batch)
    df |>
      mutate(pred = predict(model, newdata = df)) |>
      filter(!is.nan(tip_pct)) |>
      summarize(
        sse_partial = sum((pred - tip_pct)^2),
        n_partial = n()
      ) |>
      as_record_batch()
  }) |>
  summarize(mse = sum(sse_partial) / sum(n_partial)) |>
  pull(mse)
mse
#> [1] 0.1304284
map_batches() is experimental and not recommended for production use. Each batch is pulled into R as a data frame, so your function runs outside Arrow's engine and large batches can be slow and memory-intensive.
Advanced Dataset Options
Multiple File Paths
Open specific files instead of directories:
files <- c(
  "data/file1.parquet",
  "data/file2.parquet",
  "data/file3.parquet"
)
ds <- open_dataset(files)
Explicit Schema
Force specific column types:
ds <- open_dataset(
  "data",
  schema = schema(
    id = int64(),
    name = utf8(),
    value = float64(),
    timestamp = timestamp(unit = "ms", timezone = "UTC")
  )
)
Useful when:
Files have inconsistent types (e.g., int32 vs int64)
You want to ensure specific types
Reading data with non-standard types
Multi-Source Datasets
Combine data from different locations:
# Different directories
ds1 <- open_dataset("data/2023")
ds2 <- open_dataset("data/2024")
ds_combined <- open_dataset(list(ds1, ds2))
# Different formats
ds_parquet <- open_dataset("data/parquet")
ds_csv <- open_csv_dataset("data/csv")
ds_all <- open_dataset(list(ds_parquet, ds_csv))
# Cloud and local
ds_s3 <- open_dataset(s3_bucket("my-bucket/data"))
ds_local <- open_dataset("local/data")
ds_hybrid <- open_dataset(list(ds_s3, ds_local))
Partitioning has both benefits and costs:
Benefits
Faster queries: entire files can be skipped based on filters
Parallel processing: multiple files are processed simultaneously
Organized storage: data is laid out logically on disk
Costs
More files: each file carries its own metadata overhead
Directory traversal: more directories must be scanned
Small files: many small files are inefficient to read
Best Practices
File Size Guidelines:
Avoid files smaller than 20MB
Avoid files larger than 2GB
Target 100MB-500MB per file
Partition Guidelines:
Avoid more than 10,000 distinct partitions
Partition on commonly filtered columns
Consider cardinality (unique values)
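write_dataset() has arguments that help follow these guidelines. A hedged sketch using `max_rows_per_file` and `max_open_files` (check that your arrow version supports them; the row cap here is illustrative):

```r
library(arrow)
library(dplyr)

# Cap file size by row count so no single file grows too large,
# and bound how many files are held open at once while writing
mtcars |>
  group_by(cyl) |>
  write_dataset(
    tempfile(),
    max_rows_per_file = 1e6,
    max_open_files = 512
  )
```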
Good Partitioning Example
# Good: date-based partitioning with reasonable cardinality
data |>
  mutate(year = lubridate::year(date), month = lubridate::month(date)) |>
  group_by(year, month) |>  # ~12 partitions per year
  write_dataset("output/data")
Bad Partitioning Example
# Bad: far too fine-grained partitioning
data |>
  mutate(date_str = as.character(date)) |>
  group_by(date_str, hour, user_id) |>  # could create millions of files!
  write_dataset("output/data")
Real-World Example: ETL Pipeline
Complete data processing pipeline:
library(arrow)
library(dplyr)
# 1. Open raw CSV data
raw_ds <- open_csv_dataset("raw_data/csv/")
# 2. Clean and transform
processed_ds <- raw_ds |>
  filter(
    !is.na(amount),
    amount > 0,
    date >= as.Date("2020-01-01")
  ) |>
  mutate(
    year = year(date),
    month = month(date),
    log_amount = log1p(amount)
  ) |>
  select(-internal_id, -temp_field)  # Drop unnecessary columns
# 3. Write partitioned Parquet files
processed_ds |>
  group_by(year, month) |>
  write_dataset(
    "processed_data/parquet",
    format = "parquet",
    compression = "zstd"
  )
# 4. Query the processed data
results <- open_dataset("processed_data/parquet") |>
  filter(year == 2023) |>
  group_by(category) |>
  summarize(
    total_amount = sum(amount),
    avg_amount = mean(amount),
    n = n()
  ) |>
  arrange(desc(total_amount)) |>
  collect()
print(results)
Cloud Storage Integration
Amazon S3
# Configure credentials (before opening the dataset)
Sys.setenv(
  AWS_ACCESS_KEY_ID = "your-key",
  AWS_SECRET_ACCESS_KEY = "your-secret",
  AWS_REGION = "us-east-1"
)
# Open S3 dataset
ds <- open_dataset("s3://my-bucket/data/")
# Write to S3
write_dataset(data, "s3://my-bucket/output/", format = "parquet")
Google Cloud Storage
# Open GCS dataset
ds <- open_dataset("gs://my-bucket/data/")
# Write to GCS
write_dataset(data, "gs://my-bucket/output/", format = "parquet")
Limitations and Considerations
No Transaction Support
Datasets don’t provide ACID guarantees:
Concurrent reads are safe
Concurrent writes may cause corruption
No atomic operations
Killing a write process may leave partial files
Mitigation strategies:
# Use a unique basename template for each writer
write_dataset(data, "output", basename_template = "data_{i}.parquet")
# Write to a temporary location, then move
write_dataset(data, "output/temp")
file.rename("output/temp", "output/final")
Unsupported Operations
Datasets raise errors for unsupported dplyr operations:
# This fails
ds |>
  mutate(complex_calc = some_r_function(column)) |>
  collect()
#> Error: Expression some_r_function(column) not supported
# Solution: collect() first
ds |>
  select(relevant_columns) |>
  filter(reduce_data) |>
  collect() |>  # Now in R
  mutate(complex_calc = some_r_function(column))
Next Steps
Data Wrangling: learn more about dplyr integration
Reading Files: work with individual files
Additional Resources