Arrow Datasets enable analysis of multi-file and larger-than-memory data using familiar dplyr syntax. The Dataset interface provides lazy evaluation, automatic partitioning, and efficient query execution without loading all data into memory.

What are Datasets?

A Dataset is Arrow’s abstraction for working with data stored in multiple files or data that’s too large to fit in memory. Unlike Tables that hold data in memory, Datasets store metadata about where the data is located and only load necessary portions when computing results.

Example: NYC Taxi Data

The NYC taxi trip dataset is a common example for demonstrating large-scale data processing:
  • Full dataset: ~1.7 billion rows, 24 columns, ~70GB
  • Structure: 158 Parquet files (one per month from 2009-2022)
  • File size: 400-500MB each
  • Available: S3 and Google Cloud Storage
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Connect to cloud storage
bucket <- s3_bucket("arrow-datasets/nyc-taxi")

# Or use the tiny version (1/1000th of data, ~70MB)
tiny_bucket <- s3_bucket("arrow-datasets/nyc-taxi-tiny")

# Copy to local filesystem
copy_files(from = tiny_bucket, to = "nyc-taxi")

Opening Datasets

Create a Dataset object that points to your data directory:
ds <- open_dataset("nyc-taxi")
ds
#> FileSystemDataset with 158 Parquet files
#> vendor_name: string
#> pickup_datetime: timestamp[ms]
#> dropoff_datetime: timestamp[ms]
#> passenger_count: int64
#> trip_distance: double
#> ...
#> year: int32
#> month: int32
When opening a Dataset, Arrow:
  1. Scans the directory to find relevant files
  2. Parses file paths for Hive-style partitioning
  3. Reads file headers to construct a schema
  4. Does not load actual data values
Opening a Dataset is fast regardless of data size because only metadata is read, not the data itself.
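You can see this directly by timing the open against an operation that must touch data. A sketch, assuming the local `nyc-taxi` copy created above:

```r
# Opening reads only metadata, so it returns almost immediately,
# even though the underlying files hold ~1.7 billion rows
system.time(ds <- open_dataset("nyc-taxi"))

# Counting rows that match a filter must actually scan the
# total_amount column across files, so it takes noticeably longer
system.time(ds |> filter(total_amount > 100) |> count() |> collect())
```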

File Formats

Datasets support multiple file formats:
# Parquet (default)
ds <- open_dataset("data/parquet")

# CSV files
ds <- open_csv_dataset("data/csv")

# TSV files
ds <- open_tsv_dataset("data/tsv")

# Feather/Arrow IPC files
ds <- open_dataset("data/arrow", format = "feather")

# Custom delimited files
ds <- open_delim_dataset("data/txt", delim = "|")

CSV Parsing Options

ds <- open_csv_dataset(
  "data/csv",
  delim = ",",
  quote = "\"",
  skip = 1,  # skip the header row; column names come from the schema
  schema = schema(col1 = utf8(), col2 = int32(), col3 = float64())
)

Partitioning

Partitioning splits data into multiple files based on column values, enabling:
  • Faster queries (skip irrelevant files)
  • Parallel processing
  • Organized storage

Hive-Style Partitioning

Hive-style partitioning uses key=value directory names:
nyc-taxi/
  year=2009/
    month=1/part-0.parquet
    month=2/part-0.parquet
  year=2010/
    month=1/part-0.parquet
# Automatically detects partitioning
ds <- open_dataset("nyc-taxi")

# Partition columns appear in the schema
ds$schema
#> ...
#> year: int32
#> month: int32

Directory Partitioning

For non-self-describing paths, specify partition column names:
nyc-taxi/
  2009/01/part-0.parquet
  2009/02/part-0.parquet
  2010/01/part-0.parquet
# Specify partition columns
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))

Custom Partition Schema

Control data types of partition columns:
ds <- open_dataset(
  "nyc-taxi",
  partitioning = schema(year = int16(), month = utf8())
)

Querying Datasets

Query Datasets with dplyr syntax:
result <- ds |>
  filter(total_amount > 100, year == 2015) |>
  select(tip_amount, total_amount, passenger_count) |>
  mutate(tip_pct = 100 * tip_amount / total_amount) |>
  group_by(passenger_count) |>
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  ) |>
  arrange(desc(median_tip_pct)) |>
  collect()

result
#> # A tibble: 10 × 3
#>    passenger_count median_tip_pct      n
#>              <int>          <dbl>  <int>
#>  1               1           16.6 143087
#>  2               5           16.7   5806
#>  3               6           16.7   3338

Why Is It Fast?

  1. Lazy Evaluation: Query is optimized before execution
  2. Predicate Pushdown: Filters applied during file reading
  3. Partition Pruning: Entire files are skipped based on partitions
  4. Column Projection: Only requested columns are read
  5. Parallel Processing: Multiple files processed simultaneously
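Partition pruning (point 3) is easy to observe: a filter on a partition column restricts which files are opened at all. A sketch using the dataset above:

```r
# Because `year` and `month` are partition columns, only files under
# year=2015/month=1/ are ever opened for this query
ds |>
  filter(year == 2015, month == 1) |>
  count() |>
  collect()
```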

Query Without collect()

Inspect the query without executing:
query <- ds |>
  filter(total_amount > 100, year == 2015) |>
  select(tip_amount, total_amount, passenger_count) |>
  mutate(tip_pct = 100 * tip_amount / total_amount)

query
#> FileSystemDataset (query)
#> tip_amount: double
#> total_amount: double
#> passenger_count: int64
#> tip_pct: double
#>
#> See $.data for the source Arrow object
This returns instantly and shows the schema of the eventual result.
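When you do want to execute the query, there are two options: collect() pulls the result into R as a tibble, while compute() evaluates the query but keeps the result in Arrow memory as a Table, which is useful as an intermediate step when further Arrow operations follow. A sketch:

```r
# Evaluate and keep the result in Arrow memory (an Arrow Table)
tbl <- query |> compute()

# Evaluate and pull the result into R (a tibble)
df <- query |> collect()
```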

Writing Datasets

Transform and repartition data efficiently:

Basic Writing

# Write dataset from data frame
write_dataset(starwars, "output/starwars")

# Write dataset from existing dataset
ds <- open_dataset("input/csv", format = "csv")
write_dataset(ds, "output/parquet", format = "parquet")

Partitioned Writing

Partition data while writing:
# Create sample data
set.seed(1234)
data <- data.frame(
  date = seq.Date(as.Date("2024-01-01"), as.Date("2024-12-31"), by = "day"),
  value = rnorm(365),
  category = sample(c("A", "B", "C"), 365, replace = TRUE)
)

# Partition by category
data |>
  group_by(category) |>
  write_dataset("output/data", format = "parquet")

list.files("output/data", recursive = TRUE)
#> [1] "category=A/part-0.parquet"
#> [2] "category=B/part-0.parquet"
#> [3] "category=C/part-0.parquet"
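write_dataset() also accepts arguments that control output file sizes and how existing files are handled. A sketch using the sample data above (availability of these arguments depends on your arrow version):

```r
data |>
  group_by(category) |>
  write_dataset(
    "output/data",
    format = "parquet",
    max_rows_per_file = 1e6,              # split output files at 1M rows
    existing_data_behavior = "overwrite"  # replace files with matching names
  )
```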

Multi-Column Partitioning

data |>
  mutate(year = year(date), month = month(date)) |>
  group_by(year, month, category) |>
  write_dataset("output/data")

# Creates hierarchy: year=2024/month=1/category=A/part-0.parquet

Non-Hive Partitioning

Use bare values instead of key=value:
data |>
  group_by(category) |>
  write_dataset("output/data", hive_style = FALSE)

list.files("output/data", recursive = TRUE)
#> [1] "A/part-0.parquet"
#> [2] "B/part-0.parquet"
#> [3] "C/part-0.parquet"

Format Conversion with Filtering

# Convert CSV to Parquet, keeping only specific data
open_csv_dataset("input/data.csv") |>
  filter(year >= 2020, !is.na(value)) |>
  select(-vendor_id) |>  # Drop unnecessary columns
  group_by(year, month) |>
  write_dataset("output/parquet", format = "parquet")

Batch Processing (Experimental)

Process datasets in batches when you need to apply R functions:
# Sample a large dataset
sampled <- ds |>
  filter(year == 2015) |>
  select(tip_amount, total_amount, passenger_count) |>
  map_batches(~ as_record_batch(sample_frac(as.data.frame(.), 1e-4))) |>
  mutate(tip_pct = tip_amount / total_amount) |>
  collect()

str(sampled)
#> tibble [10,918 × 4]

Batch Aggregation Example

# Fit model on sample
model <- lm(tip_pct ~ total_amount + passenger_count, data = sampled)

# Compute MSE on full dataset using batches
mse <- ds |>
  filter(year == 2015) |>
  select(tip_amount, total_amount, passenger_count) |>
  mutate(tip_pct = tip_amount / total_amount) |>
  map_batches(function(batch) {
    batch |>
      as.data.frame() |>
      mutate(pred = predict(model, newdata = .)) |>
      filter(!is.nan(tip_pct)) |>
      summarize(
        sse_partial = sum((pred - tip_pct)^2),
        n_partial = n()
      ) |>
      as_record_batch()
  }) |>
  summarize(mse = sum(sse_partial) / sum(n_partial)) |>
  pull(mse)

mse
#> [1] 0.1304284
map_batches() is experimental and not recommended for production use. Because the function you supply runs in R on each batch, it can be slow, and large batches can still be memory-intensive.

Advanced Dataset Options

Multiple File Paths

Open specific files instead of directories:
files <- c(
  "data/file1.parquet",
  "data/file2.parquet",
  "data/file3.parquet"
)

ds <- open_dataset(files)

Explicit Schema

Force specific column types:
ds <- open_dataset(
  "data",
  schema = schema(
    id = int64(),
    name = utf8(),
    value = float64(),
    timestamp = timestamp(unit = "ms", timezone = "UTC")
  )
)
Useful when:
  • Files have inconsistent types (e.g., int32 vs int64)
  • You want to ensure specific types
  • Reading data with non-standard types

Multi-Source Datasets

Combine data from different locations by passing a list of Dataset objects to open_dataset():
# Different directories
ds1 <- open_dataset("data/2023")
ds2 <- open_dataset("data/2024")
ds_combined <- open_dataset(list(ds1, ds2))

# Different formats
ds_parquet <- open_dataset("data/parquet")
ds_csv <- open_csv_dataset("data/csv")
ds_all <- open_dataset(list(ds_parquet, ds_csv))

# Cloud and local
ds_s3 <- open_dataset(s3_bucket("my-bucket/data"))
ds_local <- open_dataset("local/data")
ds_hybrid <- open_dataset(list(ds_s3, ds_local))

Partitioning Performance

Partitioning has both benefits and costs:

Benefits

  • Faster queries: Skip entire files based on filters
  • Parallel processing: Process multiple files simultaneously
  • Organized storage: Logical data organization

Costs

  • More files: Each file has metadata overhead
  • Directory traversal: More directories to scan
  • Small files: Can be inefficient

Best Practices

File Size Guidelines:
  • Avoid files smaller than 20MB
  • Avoid files larger than 2GB
  • Target 100MB-500MB per file
Partition Guidelines:
  • Avoid more than 10,000 distinct partitions
  • Partition on commonly filtered columns
  • Consider cardinality (unique values)
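Cardinality is easy to check up front. A sketch using the sample data frame from the writing examples above:

```r
# Number of output partitions each candidate column would create
data |>
  summarise(
    n_category = n_distinct(category),  # 3: fine to partition on
    n_date = n_distinct(date)           # 365: many small files
  )
```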

Good Partitioning Example

# Good: Date-based partitioning with reasonable cardinality
data |>
  mutate(year = year(date), month = month(date)) |>
  group_by(year, month) |>  # ~12 partitions per year
  write_dataset("output/data")

Bad Partitioning Example

# Bad: Too fine partitioning
data |>
  mutate(date_str = as.character(date)) |>
  group_by(date_str, hour, user_id) |>  # Could create millions of files!
  write_dataset("output/data")

Real-World Example: ETL Pipeline

Complete data processing pipeline:
library(arrow)
library(dplyr)

# 1. Open raw CSV data
raw_ds <- open_csv_dataset("raw_data/csv/")

# 2. Clean and transform
processed_ds <- raw_ds |>
  filter(
    !is.na(amount),
    amount > 0,
    date >= as.Date("2020-01-01")
  ) |>
  mutate(
    year = year(date),
    month = month(date),
    log_amount = log1p(amount)
  ) |>
  select(-internal_id, -temp_field)  # Drop unnecessary columns

# 3. Write partitioned Parquet files
processed_ds |>
  group_by(year, month) |>
  write_dataset(
    "processed_data/parquet",
    format = "parquet",
    compression = "zstd"
  )

# 4. Query the processed data
results <- open_dataset("processed_data/parquet") |>
  filter(year == 2023) |>
  group_by(category) |>
  summarize(
    total_amount = sum(amount),
    avg_amount = mean(amount),
    n = n()
  ) |>
  arrange(desc(total_amount)) |>
  collect()

print(results)

Cloud Storage Integration

Amazon S3

# Configure credentials (set before opening the dataset)
Sys.setenv(
  AWS_ACCESS_KEY_ID = "your-key",
  AWS_SECRET_ACCESS_KEY = "your-secret",
  AWS_REGION = "us-east-1"
)

# Open S3 dataset
ds <- open_dataset("s3://my-bucket/data/")

# Write to S3
write_dataset(data, "s3://my-bucket/output/", format = "parquet")

Google Cloud Storage

# Open GCS dataset
ds <- open_dataset("gs://my-bucket/data/")

# Write to GCS
write_dataset(data, "gs://my-bucket/output/", format = "parquet")

Limitations and Considerations

No Transaction Support

Datasets don’t provide ACID guarantees:
  • Concurrent reads are safe
  • Concurrent writes may cause corruption
  • No atomic operations
  • Killing a write process may leave partial files
Mitigation strategies:
# Use unique basename for each writer
write_dataset(data, "output", basename_template = "data_{i}.parquet")

# Write to temporary location, then move
write_dataset(data, "output/temp")
file.rename("output/temp", "output/final")

Unsupported Operations

Datasets raise errors for unsupported dplyr operations:
# This fails
ds |>
  mutate(complex_calc = some_r_function(column)) |>
  collect()
#> Error: Expression some_r_function(column) not supported

# Solution: collect() first
ds |>
  select(relevant_columns) |>
  filter(reduce_data) |>
  collect() |>  # Now in R
  mutate(complex_calc = some_r_function(column))

Next Steps

Data Wrangling

Learn more about dplyr integration

Reading Files

Work with individual files
