
Introduction

Choosing the right data format significantly impacts your ML pipeline performance. This guide covers format comparison, benchmarking, and optimization strategies.

Format Comparison

Different formats offer tradeoffs between speed, size, and compatibility.

Common Formats

CSV

Pros: Human-readable, universal support
Cons: Slow, large file size, no type preservation
Use case: Small datasets, data interchange

Parquet

Pros: Columnar, compressed, fast queries
Cons: Not human-readable
Use case: Large datasets, analytics, production

Feather

Pros: Fast read/write, language-agnostic
Cons: Limited compression
Use case: Intermediate data, caching

HDF5

Pros: Hierarchical, append mode, large arrays
Cons: Complex API, portability issues
Use case: Scientific data, time series

Pandas Format Benchmarking

Performance Comparison

Based on the Python for Scientific Computing guide, the key findings on Pandas format performance are:
  • Parquet: Best overall performance for read/write
  • Feather: Fastest read times, good write performance
  • HDF5: Good for append operations
  • CSV: Slowest, but most portable

Benchmark Code

Create your own benchmarks:
import time
import pandas as pd
import numpy as np

# Generate sample data
df = pd.DataFrame({
    'id': range(1000000),
    'value': np.random.randn(1000000),
    'category': np.random.choice(['A', 'B', 'C'], 1000000),
    'timestamp': pd.date_range('2020-01-01', periods=1000000, freq='1s')
})

# Benchmark writes
formats = {
    'csv': lambda: df.to_csv('data.csv', index=False),
    'parquet': lambda: df.to_parquet('data.parquet'),
    'feather': lambda: df.to_feather('data.feather'),
    'hdf5': lambda: df.to_hdf('data.h5', key='df', mode='w')
}

for name, write_func in formats.items():
    start = time.time()
    write_func()
    write_time = time.time() - start
    print(f"{name} write: {write_time:.2f}s")

# Benchmark reads
readers = {
    'csv': lambda: pd.read_csv('data.csv'),
    'parquet': lambda: pd.read_parquet('data.parquet'),
    'feather': lambda: pd.read_feather('data.feather'),
    'hdf5': lambda: pd.read_hdf('data.h5', key='df')
}

for name, read_func in readers.items():
    start = time.time()
    df_loaded = read_func()
    read_time = time.time() - start
    print(f"{name} read: {read_time:.2f}s")
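
File size is part of the tradeoff as well. A quick check over the files written by the benchmark above (paths assumed from that snippet) could look like this:
import os

for path in ['data.csv', 'data.parquet', 'data.feather', 'data.h5']:
    size_mb = os.path.getsize(path) / 1e6
    print(f"{path}: {size_mb:.1f} MB")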

Inference Performance

Processing large datasets efficiently requires parallelization.

Single Worker Baseline

processing/inference_example.py
import numpy as np
from sklearn.dummy import DummyClassifier
from tqdm import tqdm

def run_inference(
    model: DummyClassifier, 
    x_test: np.ndarray, 
    batch_size: int = 2048
) -> np.ndarray:
    y_pred = []
    
    for i in tqdm(range(0, x_test.shape[0], batch_size)):
        x_batch = x_test[i : i + batch_size]
        y_batch = model.predict(x_batch)
        y_pred.append(y_batch)
    
    return np.concatenate(y_pred)
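
A minimal way to exercise the baseline; the model and data setup here are illustrative, not taken from the original script:
# Hypothetical setup: a trivial model and random test data
x_train = np.random.randn(1_000, 20)
y_train = np.random.randint(0, 2, size=1_000)
x_test = np.random.randn(100_000, 20)

model = DummyClassifier(strategy="most_frequent")
model.fit(x_train, y_train)

y_pred = run_inference(model, x_test, batch_size=2048)
print(y_pred.shape)  # (100000,)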

ProcessPoolExecutor

Parallelize with multiple processes:
processing/inference_example.py
import concurrent.futures
from concurrent.futures import wait

def run_inference_process_pool(
    model: DummyClassifier, 
    x_test: np.ndarray, 
    max_workers: int = 16
) -> np.ndarray:
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=max_workers
    ) as executor:
        chunk_size = len(x_test) // max_workers
        
        # Split into chunks
        chunks = [
            x_test[i : i + chunk_size]
            for i in range(0, len(x_test), chunk_size)
        ]
        
        # Submit chunks for inference
        futures = [
            executor.submit(run_inference, model=model, x_test=chunk)
            for chunk in chunks
        ]
        
        # Wait and collect results
        wait(futures)
        y_pred = [future.result() for future in futures]
    
    return np.concatenate(y_pred)
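
ThreadPoolExecutor

The benchmark results below also include a thread-based run. Its code is not shown in the original listing; a minimal sketch, assuming the same chunking strategy as above, might look like this:
from concurrent.futures import ThreadPoolExecutor

def run_inference_thread_pool(
    model: DummyClassifier,
    x_test: np.ndarray,
    max_workers: int = 16
) -> np.ndarray:
    chunk_size = len(x_test) // max_workers
    chunks = [
        x_test[i : i + chunk_size]
        for i in range(0, len(x_test), chunk_size)
    ]
    # Threads share memory, so chunks are never pickled or copied
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(model.predict, chunks))
    return np.concatenate(results)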

Ray for Distributed Processing

processing/inference_example.py
import ray

@ray.remote
def run_inference_ray(
    model: DummyClassifier, 
    x_test: np.ndarray, 
    batch_size: int = 2048
) -> np.ndarray:
    y_pred = []
    for i in range(0, x_test.shape[0], batch_size):
        x_batch = x_test[i : i + batch_size]
        y_batch = model.predict(x_batch)
        y_pred.append(y_batch)
    return np.concatenate(y_pred)

def run_inference_ray_main(
    model: DummyClassifier, 
    x_test: np.ndarray, 
    max_workers: int = 16
) -> np.ndarray:
    chunk_size = len(x_test) // max_workers
    chunks = [
        x_test[i : i + chunk_size]
        for i in range(0, len(x_test), chunk_size)
    ]
    
    # Run inference on chunks
    futures = [
        run_inference_ray.remote(model, chunk) 
        for chunk in chunks
    ]
    
    # Collect results
    y_pred = ray.get(futures)
    return np.concatenate(y_pred)
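
One common refinement, not part of the original example: place the model in the Ray object store once with ray.put, so it is not re-serialized for every task. Ray resolves ObjectRefs passed as task arguments automatically:
# Serialize the model into the object store a single time
model_ref = ray.put(model)
futures = [run_inference_ray.remote(model_ref, chunk) for chunk in chunks]
y_pred = np.concatenate(ray.get(futures))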

Benchmark Results

Running experiments with 10M samples:
python processing/inference_example.py run-single-worker --inference-size 10000000
python processing/inference_example.py run-pool --inference-size 10000000
python processing/inference_example.py run-ray --inference-size 10000000
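
These commands imply a small CLI wrapper in processing/inference_example.py. The original wiring is not shown; one way to get subcommand names like run-single-worker would be a Typer app (the _make_data helper and the DummyClassifier setup here are assumptions for illustration):
import typer

app = typer.Typer()

def _make_data(inference_size: int):
    # Illustrative data/model setup; the original script may differ
    x_test = np.random.randn(inference_size, 10)
    model = DummyClassifier(strategy="most_frequent")
    model.fit(x_test[:1_000], np.random.randint(0, 2, size=1_000))
    return model, x_test

@app.command()
def run_single_worker(inference_size: int = 10_000_000):
    model, x_test = _make_data(inference_size)
    run_inference(model, x_test)

@app.command()
def run_pool(inference_size: int = 10_000_000):
    model, x_test = _make_data(inference_size)
    run_inference_process_pool(model, x_test)

@app.command()
def run_ray(inference_size: int = 10_000_000):
    model, x_test = _make_data(inference_size)
    run_inference_ray_main(model, x_test)

if __name__ == "__main__":
    app()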

Performance Comparison

Approach                         | Time (seconds) | Speedup
Single worker                    | 12.64          | 1.0x
ThreadPoolExecutor (16 workers)  | 0.85           | 14.9x
ProcessPoolExecutor (16 workers) | 4.03           | 3.1x
Ray (16 workers)                 | 2.19           | 5.8x

Key Insights:
  • ThreadPoolExecutor is fastest here: NumPy and scikit-learn release the GIL during prediction, and threads share memory, so chunks are never copied
  • ProcessPoolExecutor pays for process startup and for pickling chunks between processes
  • Ray provides good balance with distributed capabilities
  • Choose based on your workload: I/O-bound vs CPU-bound

Optimization Strategies

Use Parquet or Feather for:
  • Selective column reading
  • Better compression ratios
  • Faster aggregations
  • Type preservation
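For example, Parquet lets you read only the columns you need (using the data.parquet file written in the benchmark above):
# Load two columns instead of the whole file
df_subset = pd.read_parquet('data.parquet', columns=['id', 'value'])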
Enable compression for:
  • Network transfers
  • Storage cost reduction
  • I/O-bound workloads
Common algorithms:
  • Snappy: Fast, moderate compression
  • Gzip: Slower, high compression
  • LZ4: Very fast, light compression
  • Zstd: Balanced speed and ratio
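As a sketch, the codec can be chosen when writing Parquet (zstd and lz4 support depends on your pyarrow version):
df.to_parquet('data_snappy.parquet', compression='snappy')  # fast, moderate compression
df.to_parquet('data_gzip.parquet', compression='gzip')      # slower, high compression
df.to_parquet('data_zstd.parquet', compression='zstd')      # balanced speed and ratio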
Process data in chunks:
for chunk in pd.read_csv('large.csv', chunksize=10000):
    process(chunk)
Benefits:
  • Lower memory usage
  • Parallelization opportunities
  • Early termination possible
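For instance, an aggregation over a file that does not fit in memory can accumulate per-chunk results. The process(chunk) call above is a placeholder; this sketch assumes a 'value' column in large.csv:
total = 0.0
row_count = 0
for chunk in pd.read_csv('large.csv', chunksize=10000):
    total += chunk['value'].sum()
    row_count += len(chunk)
print(total / row_count)  # mean of 'value' without loading the full file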
Use memory-mapped arrays for:
  • Very large arrays
  • Random access patterns
  • Shared memory across processes
arr = np.memmap('data.dat', dtype='float32', mode='r', shape=(1000000, 100))
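Note that mode='r' assumes data.dat already exists with that dtype and shape; creating it first might look like this:
# Create and fill the file once (mode='w+'), then reopen it read-only elsewhere
arr = np.memmap('data.dat', dtype='float32', mode='w+', shape=(1000000, 100))
arr[:] = 0.0   # fill with real data here
arr.flush()    # push changes to disk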

Format Selection Guide

Recommendation for small datasets: CSV or Feather
  • CSV for shareability
  • Feather for speed
  • Format choice is less critical at this scale
