Overview
The DataIngestor class handles data loading from multiple source types (CSV file paths given as strings or Path objects, or in-memory DataFrames) and provides dataset fingerprinting for reproducibility tracking.
Class Definition
class DataIngestor:
    def __init__(self, random_seed: int = 42)
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/ingestion/loader.py:19
Constructor
random_seed (int, default 42): Random seed for reproducible operations (currently unused but reserved for future randomized sampling)
Methods
load
def load(self, source: str | Path | pd.DataFrame) -> pd.DataFrame
Loads data from various source types into a pandas DataFrame.
source (str | Path | pd.DataFrame, required): Data source to load.
- str or Path: path to a CSV file
- pd.DataFrame: returns a copy of the DataFrame
Returns: Loaded DataFrame with all columns and rows from the source.
Example:
from pipeline.ingestion import DataIngestor
import pandas as pd
from pathlib import Path
ingestor = DataIngestor(random_seed=42)
# Load from CSV file (string path)
df1 = ingestor.load('data/nba_salaries.csv')
print(f"Loaded {len(df1)} rows from CSV")
# Load from Path object
df2 = ingestor.load(Path('data/nba_salaries.csv'))
print(f"Loaded {len(df2)} rows from Path")
# Load from existing DataFrame (returns copy)
existing_df = pd.DataFrame({'salary': [1000000, 2000000]})
df3 = ingestor.load(existing_df)
print(f"Loaded {len(df3)} rows from DataFrame")
assert df3 is not existing_df # Confirms it's a copy
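For orientation, a minimal sketch of the dispatch logic such a loader might use (an illustration based on the behavior described above, not the actual implementation in loader.py; the helper name is hypothetical):
import pandas as pd
from pathlib import Path

def _load(source):
    # Hypothetical sketch: copy in-memory DataFrames, read CSVs from str/Path
    if isinstance(source, pd.DataFrame):
        return source.copy()  # return a copy, never the caller's object
    return pd.read_csv(Path(source))  # str or Path pointing to a CSV file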
stream_chunks
def stream_chunks(
    self,
    source: str | Path | pd.DataFrame,
    chunk_size: int
) -> Iterator[pd.DataFrame]
Creates an iterator that yields data in chunks for streaming processing.
source (str | Path | pd.DataFrame, required): Data source to stream.
- str or Path: path to a CSV file (uses pandas chunksize)
- pd.DataFrame: splits the DataFrame into chunks
chunk_size (int, required): Maximum number of rows per chunk.
Returns: Iterator yielding DataFrame chunks, each containing up to chunk_size rows.
Example:
from pipeline.ingestion import DataIngestor
import pandas as pd
ingestor = DataIngestor()
# Stream from CSV file
for i, chunk in enumerate(ingestor.stream_chunks('nba_data.csv', chunk_size=100)):
    print(f"Chunk {i}: {len(chunk)} rows")
    # Process chunk...
# Stream from DataFrame
df = pd.DataFrame({'salary': range(1000)})
chunks = list(ingestor.stream_chunks(df, chunk_size=250))
print(f"Split into {len(chunks)} chunks") # 4 chunks
print(f"Last chunk size: {len(chunks[-1])}") # 250 rows
fingerprint
def fingerprint(self, source: str | Path | pd.DataFrame) -> DatasetFingerprint
Generates a cryptographic fingerprint of the dataset for reproducibility verification.
source (str | Path | pd.DataFrame, required): Data source to fingerprint.
Returns: DatasetFingerprint dataclass containing:
- path (str): source path, or "<in-memory>" for DataFrames
- sha256 (str): SHA-256 hash of the CSV representation
- rows (int): number of rows
- columns (int): number of columns
Example:
from pipeline.ingestion import DataIngestor
import pandas as pd
ingestor = DataIngestor()
# Fingerprint CSV file
fp1 = ingestor.fingerprint('nba_salaries.csv')
print(f"Path: {fp1.path}")
print(f"SHA256: {fp1.sha256}")
print(f"Shape: {fp1.rows} rows × {fp1.columns} columns")
# Fingerprint DataFrame
df = pd.DataFrame({
    'name': ['Player A', 'Player B'],
    'salary': [1000000, 2000000]
})
fp2 = ingestor.fingerprint(df)
print(f"Path: {fp2.path}") # <in-memory>
print(f"SHA256: {fp2.sha256}")
print(f"Shape: {fp2.rows}x{fp2.columns}") # 2x2
# Verify data integrity
fp3 = ingestor.fingerprint('nba_salaries.csv')
if fp1.sha256 == fp3.sha256:
    print("✓ Dataset unchanged")
else:
    print("✗ Dataset modified!")
DatasetFingerprint
@dataclass(frozen=True)
class DatasetFingerprint:
    path: str
    sha256: str
    rows: int
    columns: int
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/ingestion/loader.py:12
Immutable dataclass representing a dataset’s unique fingerprint.
- path: source path, or "<in-memory>" for DataFrames
- sha256: SHA-256 cryptographic hash of the CSV representation
- rows: number of rows in the dataset
- columns: number of columns in the dataset
Example:
from dataclasses import asdict
from pipeline.ingestion import DataIngestor
import json
ingestor = DataIngestor()
fp = ingestor.fingerprint('nba_data.csv')
# Convert to dict for serialization
fp_dict = asdict(fp)
print(json.dumps(fp_dict, indent=2))
# {
#   "path": "nba_data.csv",
#   "sha256": "a1b2c3d4...",
#   "rows": 500,
#   "columns": 15
# }
# Access fields
print(f"Dataset has {fp.rows:,} rows")
print(f"Fingerprint: {fp.sha256[:16]}...") # First 16 chars
Usage Patterns
Batch Loading
from pipeline.ingestion import DataIngestor
from pipeline.config import PipelineConfig
config = PipelineConfig()
ingestor = DataIngestor(config.random_seed)
# Load entire dataset
df = ingestor.load('large_dataset.csv')
print(f"Loaded {len(df):,} rows into memory")
# Generate fingerprint for provenance
fp = ingestor.fingerprint(df)
print(f"Dataset fingerprint: {fp.sha256}")
Streaming Processing
from pipeline.ingestion import DataIngestor
ingestor = DataIngestor()
processed_rows = 0
for chunk in ingestor.stream_chunks('huge_dataset.csv', chunk_size=1000):
    # Process each chunk independently
    processed = process_chunk(chunk)  # Your processing function
    processed_rows += len(processed)
    print(f"Processed {processed_rows:,} rows so far...")
print(f"Total processed: {processed_rows:,} rows")
Data Verification
from dataclasses import asdict
from pipeline.ingestion import DataIngestor
import json
ingestor = DataIngestor()
# Save fingerprint for later verification
fp_original = ingestor.fingerprint('dataset_v1.csv')
metadata = {
    'fingerprint': asdict(fp_original),
    'version': '1.0'
}
with open('metadata.json', 'w') as f:
    json.dump(metadata, f, indent=2)
# Later: verify dataset hasn't changed
with open('metadata.json') as f:
    saved_metadata = json.load(f)
fp_current = ingestor.fingerprint('dataset_v1.csv')
if fp_current.sha256 == saved_metadata['fingerprint']['sha256']:
    print("✓ Dataset integrity verified")
else:
    print("✗ Warning: Dataset has been modified!")
    print(f"  Expected: {saved_metadata['fingerprint']['sha256']}")
    print(f"  Got: {fp_current.sha256}")
Notes
- All methods return copies of DataFrames to prevent unintended mutations (see the sketch after this list)
- The load() method uses pandas’ default CSV parsing with no special options
- Fingerprinting converts the DataFrame to CSV format before hashing for consistency
- The SHA-256 hash ensures cryptographic-strength verification of data integrity
- stream_chunks() returns an iterator for memory-efficient processing of large files
- Chunk copies are created to prevent modifications from affecting subsequent chunks
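A quick usage sketch of the copy semantics described above (assuming the documented behavior; the column values and chunk size are illustrative):
import pandas as pd
from pipeline.ingestion import DataIngestor

ingestor = DataIngestor()
df = pd.DataFrame({'salary': range(10)})
first, second = list(ingestor.stream_chunks(df, chunk_size=5))
first['salary'] = 0  # mutate the first chunk in place
assert df['salary'].sum() == sum(range(10))  # the original DataFrame is untouched
assert second['salary'].tolist() == list(range(5, 10))  # later chunks are unaffected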