
Overview

The data ingestion stage is the entry point of the NBA preprocessing pipeline. The DataIngestor class handles loading data from multiple sources (CSV files, paths, or in-memory DataFrames) and provides streaming capabilities for large datasets.

DataIngestor Class

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/ingestion/loader.py:19

Initialization

class DataIngestor:
    def __init__(self, random_seed: int = 42)

Parameters:
  • random_seed (int, default 42): Random seed for reproducibility across data loading operations

Core Methods

load()

Loads data from various source types into a pandas DataFrame.
def load(self, source: str | Path | pd.DataFrame) -> pd.DataFrame

Parameters:
  • source (str | Path | pd.DataFrame, required): Data source; can be a file path (string), a Path object, or an existing DataFrame

Returns: A copy of the loaded DataFrame.

Example:
from pipeline.ingestion import DataIngestor

ingestor = DataIngestor(random_seed=42)

# Load from CSV file
df = ingestor.load('data/nba_players.csv')

# Load from Path object
from pathlib import Path
df = ingestor.load(Path('data/nba_players.csv'))

# Load from existing DataFrame
import pandas as pd
existing_df = pd.read_csv('data/nba_players.csv')
df = ingestor.load(existing_df)  # Returns a copy
When loading from a DataFrame, the method returns a copy to prevent unintended mutations of the original data.
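A minimal sketch of those copy semantics in plain pandas (no pipeline import needed; the plain `.copy()` call stands in for what `load()` does with a DataFrame source):

```python
import pandas as pd

# Stand-in for ingestor.load(existing_df): a plain copy, per the note above
original = pd.DataFrame({"player": ["LeBron", "Curry"], "pts": [27.1, 24.6]})
loaded = original.copy()

loaded.loc[0, "pts"] = 0.0  # mutate the returned frame

print(original.loc[0, "pts"])  # 27.1: the source frame is untouched
```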

stream_chunks()

Streams data in configurable chunk sizes for memory-efficient processing of large datasets.
def stream_chunks(
    self,
    source: str | Path | pd.DataFrame,
    chunk_size: int
) -> Iterator[pd.DataFrame]

Parameters:
  • source (str | Path | pd.DataFrame, required): Data source to stream from
  • chunk_size (int, required): Number of rows per chunk

Returns: An iterator yielding DataFrame chunks.

Example:
ingestor = DataIngestor(random_seed=42)

# Stream large dataset in chunks of 1000 rows
for chunk in ingestor.stream_chunks('large_dataset.csv', chunk_size=1000):
    print(f"Processing chunk with {len(chunk)} rows")
    # Process each chunk independently
    process_chunk(chunk)
Data Flow:
  1. DataFrame source: Splits the DataFrame using iloc with the specified chunk size
  2. File source: Uses pandas read_csv with chunksize parameter for efficient streaming
  3. Each chunk is returned as a copy to ensure isolation
Chunks are processed sequentially. Ensure your chunk size balances memory usage with processing efficiency.
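The DataFrame branch of this flow can be sketched with a hypothetical helper (`iter_df_chunks` is not part of the pipeline; it just mirrors the iloc-splitting and copy behavior described above):

```python
from typing import Iterator

import pandas as pd


def iter_df_chunks(df: pd.DataFrame, chunk_size: int) -> Iterator[pd.DataFrame]:
    """Yield successive row slices of df, each as an independent copy."""
    for start in range(0, len(df), chunk_size):
        yield df.iloc[start:start + chunk_size].copy()


df = pd.DataFrame({"x": range(10)})
sizes = [len(chunk) for chunk in iter_df_chunks(df, chunk_size=4)]
print(sizes)  # [4, 4, 2]
```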

fingerprint()

Generates a cryptographic fingerprint of the dataset for versioning and reproducibility tracking.
def fingerprint(
    self,
    source: str | Path | pd.DataFrame
) -> DatasetFingerprint

Parameters:
  • source (str | Path | pd.DataFrame, required): Data source to fingerprint

Returns: A DatasetFingerprint object containing:
  • path (str): Source path or '<in-memory>' for DataFrames
  • sha256 (str): SHA-256 hash of the CSV representation
  • rows (int): Number of rows in the dataset
  • columns (int): Number of columns in the dataset

Example:
ingestor = DataIngestor(random_seed=42)
fp = ingestor.fingerprint('data/nba_players.csv')

print(f"Dataset: {fp.path}")
print(f"SHA-256: {fp.sha256}")
print(f"Dimensions: {fp.rows} rows × {fp.columns} columns")
Use Cases:
  • Version Control: Track dataset changes across pipeline runs
  • Reproducibility: Verify that the same input data is used
  • Data Integrity: Detect accidental modifications or corruption
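The hashing idea behind these use cases can be sketched in plain pandas. `csv_sha256` is a hypothetical stand-in, and the exact `to_csv` options used by the real `fingerprint()` are an assumption here:

```python
import hashlib

import pandas as pd


def csv_sha256(df: pd.DataFrame) -> str:
    # CSV-encode the frame and hash the bytes; the real fingerprint()
    # may use different CSV options, so hashes are only comparable
    # when produced the same way
    return hashlib.sha256(df.to_csv(index=False).encode("utf-8")).hexdigest()


df = pd.DataFrame({"player": ["LeBron", "Curry"], "pts": [27.1, 24.6]})
corrupted = df.copy()
corrupted.loc[0, "pts"] = 0.0  # simulate accidental modification

same = csv_sha256(df) == csv_sha256(df.copy())     # identical data, same hash
changed = csv_sha256(df) != csv_sha256(corrupted)  # change detected
print(same, changed)  # True True
```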

DatasetFingerprint

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/ingestion/loader.py:11
@dataclass(frozen=True)
class DatasetFingerprint:
    path: str
    sha256: str
    rows: int
    columns: int

Immutable dataclass that represents a dataset's unique identity.

Fields:
  • path (str): File path or '<in-memory>' for DataFrames
  • sha256 (str): SHA-256 hash of the CSV-encoded dataset
  • rows (int): Total number of rows
  • columns (int): Total number of columns

Data Flow

The ingestion stage follows this flow: a source (CSV file path, Path object, or in-memory DataFrame) enters load() or stream_chunks(), which produces either a full DataFrame copy (batch) or an iterator of chunk copies (streaming); fingerprint() can be applied to the same source at any point for versioning.

Integration with Pipeline

The ingestion stage integrates with the streaming engine.

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:36
class RealTimePipelineRunner:
    def __init__(self, config: PipelineConfig):
        self.ingestor = DataIngestor(config.random_seed)
Batch Processing:
def run_batch(self, source: str | Path | pd.DataFrame) -> dict:
    df = self.ingestor.load(source)
    # Continue with preprocessing...
Streaming Processing:
def run_streaming(self, source: str | Path | pd.DataFrame) -> dict:
    # Stream data in configurable chunks
    for chunk in self._iter_source_chunks(source, chunk_size):
        # Process each chunk through the pipeline
        X_chunk, y_chunk = self._process_stream_chunk(chunk, state)
The ingestion stage is source-agnostic: the same API works for files, paths, and DataFrames.

Performance Considerations

Memory Efficiency

  • Batch mode: Loads entire dataset into memory
  • Streaming mode: Processes data in chunks, keeping only one chunk in memory at a time

When to Use Each Mode

| Mode | Best For | Memory Usage |
|---|---|---|
| Batch (load) | Small to medium datasets (<1GB) | High |
| Streaming (stream_chunks) | Large datasets or memory-constrained environments | Low |
For datasets larger than available RAM, always use streaming mode with appropriate chunk sizes (typically 1000-10000 rows).
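One way to land inside that range is to estimate rows per chunk from a small sample's memory footprint. `estimate_chunk_size` is a hypothetical helper, not part of the pipeline, and the 64 MB target is an arbitrary illustration:

```python
import pandas as pd


def estimate_chunk_size(sample: pd.DataFrame,
                        target_bytes: int = 64 * 1024 * 1024) -> int:
    """Rows per chunk so one chunk stays near target_bytes in memory."""
    bytes_per_row = sample.memory_usage(deep=True).sum() / max(len(sample), 1)
    return max(1, int(target_bytes // bytes_per_row))


# In practice the sample might come from pd.read_csv(path, nrows=1000)
sample = pd.DataFrame({"player": ["player_name"] * 1000,
                       "pts": [27.1] * 1000})
chunk_size = estimate_chunk_size(sample)
print(chunk_size >= 1)  # True
```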

Next Steps

Preprocessing

Clean and transform the ingested data

Streaming Engine

Learn about real-time pipeline execution
