Skip to main content

Overview

The DataIngestor class handles data loading from multiple sources (CSV files, paths, or DataFrames) and provides dataset fingerprinting for reproducibility tracking.

Class Definition

class DataIngestor:
    def __init__(self, random_seed: int = 42)
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/ingestion/loader.py:19

Constructor

random_seed
int
default: 42
Random seed for reproducible operations (currently unused but reserved for future randomized sampling)

Methods

load

def load(self, source: str | Path | pd.DataFrame) -> pd.DataFrame
Loads data from various source types into a pandas DataFrame.
source
str | Path | pd.DataFrame
required
Data source to load:
  • str or Path: Path to CSV file
  • pd.DataFrame: Returns a copy of the DataFrame
return
pd.DataFrame
Loaded DataFrame with all columns and rows from the source
Example:
from pipeline.ingestion import DataIngestor
import pandas as pd
from pathlib import Path

ingestor = DataIngestor(random_seed=42)

# Load from CSV file (string path)
df1 = ingestor.load('data/nba_salaries.csv')
print(f"Loaded {len(df1)} rows from CSV")

# Load from Path object
df2 = ingestor.load(Path('data/nba_salaries.csv'))
print(f"Loaded {len(df2)} rows from Path")

# Load from existing DataFrame (returns copy)
existing_df = pd.DataFrame({'salary': [1000000, 2000000]})
df3 = ingestor.load(existing_df)
print(f"Loaded {len(df3)} rows from DataFrame")
assert df3 is not existing_df  # Confirms it's a copy

stream_chunks

def stream_chunks(
    self, 
    source: str | Path | pd.DataFrame, 
    chunk_size: int
) -> Iterator[pd.DataFrame]
Creates an iterator that yields data in chunks for streaming processing.
source
str | Path | pd.DataFrame
required
Data source to stream:
  • str or Path: Path to CSV file (uses pandas chunksize)
  • pd.DataFrame: Splits DataFrame into chunks
chunk_size
int
required
Number of rows per chunk
return
Iterator[pd.DataFrame]
Iterator yielding DataFrame chunks, each containing up to chunk_size rows
Example:
from pipeline.ingestion import DataIngestor
import pandas as pd

ingestor = DataIngestor()

# Stream from CSV file
for i, chunk in enumerate(ingestor.stream_chunks('nba_data.csv', chunk_size=100)):
    print(f"Chunk {i}: {len(chunk)} rows")
    # Process chunk...

# Stream from DataFrame
df = pd.DataFrame({'salary': range(1000)})
chunks = list(ingestor.stream_chunks(df, chunk_size=250))
print(f"Split into {len(chunks)} chunks")  # 4 chunks
print(f"Last chunk size: {len(chunks[-1])}")  # 250 rows

fingerprint

def fingerprint(self, source: str | Path | pd.DataFrame) -> DatasetFingerprint
Generates a cryptographic fingerprint of the dataset for reproducibility verification.
source
str | Path | pd.DataFrame
required
Data source to fingerprint
return
DatasetFingerprint
Dataclass containing:
  • path (str): Source path or “<in-memory>” for DataFrames
  • sha256 (str): SHA-256 hash of CSV representation
  • rows (int): Number of rows
  • columns (int): Number of columns
Example:
from pipeline.ingestion import DataIngestor
import pandas as pd

ingestor = DataIngestor()

# Fingerprint CSV file
fp1 = ingestor.fingerprint('nba_salaries.csv')
print(f"Path: {fp1.path}")
print(f"SHA256: {fp1.sha256}")
print(f"Shape: {fp1.rows} rows × {fp1.columns} columns")

# Fingerprint DataFrame
df = pd.DataFrame({
    'name': ['Player A', 'Player B'],
    'salary': [1000000, 2000000]
})
fp2 = ingestor.fingerprint(df)
print(f"Path: {fp2.path}")  # <in-memory>
print(f"SHA256: {fp2.sha256}")
print(f"Shape: {fp2.rows}x{fp2.columns}")  # 2x2

# Verify data integrity
fp3 = ingestor.fingerprint('nba_salaries.csv')
if fp1.sha256 == fp3.sha256:
    print("✓ Dataset unchanged")
else:
    print("✗ Dataset modified!")

DatasetFingerprint

@dataclass(frozen=True)
class DatasetFingerprint:
    path: str
    sha256: str
    rows: int
    columns: int
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/ingestion/loader.py:12 Immutable dataclass representing a dataset’s unique fingerprint.
path
str
Source path or “<in-memory>” for DataFrames
sha256
str
SHA-256 cryptographic hash of the CSV representation
rows
int
Number of rows in the dataset
columns
int
Number of columns in the dataset
Example:
from dataclasses import asdict
from pipeline.ingestion import DataIngestor
import json

ingestor = DataIngestor()
fp = ingestor.fingerprint('nba_data.csv')

# Convert to dict for serialization
fp_dict = asdict(fp)
print(json.dumps(fp_dict, indent=2))
# {
#   "path": "nba_data.csv",
#   "sha256": "a1b2c3d4...",
#   "rows": 500,
#   "columns": 15
# }

# Access fields
print(f"Dataset has {fp.rows:,} rows")
print(f"Fingerprint: {fp.sha256[:16]}...")  # First 16 chars

Usage Patterns

Batch Loading

from pipeline.ingestion import DataIngestor
from pipeline.config import PipelineConfig

config = PipelineConfig()
ingestor = DataIngestor(config.random_seed)

# Load entire dataset
df = ingestor.load('large_dataset.csv')
print(f"Loaded {len(df):,} rows into memory")

# Generate fingerprint for provenance
fp = ingestor.fingerprint(df)
print(f"Dataset fingerprint: {fp.sha256}")

Streaming Processing

from pipeline.ingestion import DataIngestor

ingestor = DataIngestor()
processed_rows = 0

for chunk in ingestor.stream_chunks('huge_dataset.csv', chunk_size=1000):
    # Process each chunk independently
    processed = process_chunk(chunk)  # Your processing function
    processed_rows += len(processed)
    print(f"Processed {processed_rows:,} rows so far...")

print(f"Total processed: {processed_rows:,} rows")

Data Verification

from dataclasses import asdict
from pipeline.ingestion import DataIngestor
import json
from pathlib import Path

ingestor = DataIngestor()

# Save fingerprint for later verification
fp_original = ingestor.fingerprint('dataset_v1.csv')
metadata = {
    'fingerprint': asdict(fp_original),
    'version': '1.0'
}

with open('metadata.json', 'w') as f:
    json.dump(metadata, f, indent=2)

# Later: verify dataset hasn't changed
with open('metadata.json') as f:
    saved_metadata = json.load(f)

fp_current = ingestor.fingerprint('dataset_v1.csv')
if fp_current.sha256 == saved_metadata['fingerprint']['sha256']:
    print("✓ Dataset integrity verified")
else:
    print("✗ Warning: Dataset has been modified!")
    print(f"  Expected: {saved_metadata['fingerprint']['sha256']}")
    print(f"  Got: {fp_current.sha256}")

Notes

  • All methods return copies of DataFrames to prevent unintended mutations
  • The load() method uses pandas’ default CSV parsing with no special options
  • Fingerprinting converts the DataFrame to CSV format before hashing for consistency
  • The SHA-256 hash ensures cryptographic-strength verification of data integrity
  • stream_chunks() returns an iterator for memory-efficient processing of large files
  • Chunk copies are created to prevent modifications from affecting subsequent chunks

Build docs developers (and LLMs) love