Overview

The pipeline generates comprehensive artifacts for reproducibility, debugging, and analysis. All outputs are organized under the configured output_dir (default: artifacts_edge or artifacts_server).

Directory Structure

output_dir/
├── reports/
│   ├── pipeline_report.json          # Complete run report
│   ├── streaming_chunks.jsonl        # Per-chunk metrics (streaming mode)
│   └── constraint_experiment_log.jsonl
├── benchmarks/
│   ├── constraint_experiment.csv     # Resource constraint sweep results
│   ├── significance_tests.csv        # Statistical significance tests
│   ├── streaming_chunks.csv          # Chunk metrics (CSV format)
│   ├── latency_vs_data_size.csv
│   ├── throughput_vs_memory.csv
│   ├── resource_vs_accuracy.csv
│   ├── latency_vs_accuracy.png       # Visualization plots
│   ├── memory_vs_accuracy.png
│   └── latency_memory_accuracy.png
├── profiles/
│   └── operator_profile.csv          # Per-operator timing breakdown
├── metadata/
│   └── run_manifest.json             # Reproducibility manifest
└── intermediate/                     # Spilled chunks (if enabled)
    ├── stream_chunk_1_X.csv
    ├── stream_chunk_1_y.csv
    └── ...

Core Reports

pipeline_report.json

Complete execution summary containing all metrics, benchmarks, and quality reports.

Location: reports/pipeline_report.json

Key sections:
{
  "dataset_fingerprint": {
    "sha256": "abc123...",
    "row_count": 1340,
    "column_count": 20,
    "timestamp": "2026-03-04T10:30:00"
  },
  "reproducibility": {
    "random_seed": 42,
    "python_version": "3.10.0",
    "platform": "Linux-5.15.0",
    "config": { /* full configuration */ },
    "dependencies": {
      "numpy": "1.24.0",
      "pandas": "2.0.0",
      "matplotlib": "3.7.0"
    }
  },
  "batch": { /* batch mode results */ },
  "streaming": { /* streaming mode results */ },
  "benchmark": { /* statistical benchmarks */ },
  "constraint_experiment": { /* constraint sweep */ },
  "quality": { /* data quality metrics */ },
  "scaling": {
    "n_jobs": 4,
    "parallel_enabled": true
  }
}
Use cases:
  • Verify reproducibility via dataset_fingerprint.sha256
  • Compare runs across different configurations
  • Extract model metrics for reporting
  • Debug pipeline failures
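To illustrate the first use case, a minimal sketch for comparing the dataset fingerprints of two reports (the helper function and the inline payloads are illustrative; in practice load each report with json.load):

```python
import json

def fingerprints_match(report_a: dict, report_b: dict) -> bool:
    """Compare the dataset_fingerprint.sha256 fields of two pipeline reports."""
    return (report_a["dataset_fingerprint"]["sha256"]
            == report_b["dataset_fingerprint"]["sha256"])

# Illustrative payloads; in practice:
#   report = json.load(open('artifacts_server/reports/pipeline_report.json'))
run1 = {"dataset_fingerprint": {"sha256": "abc123"}}
run2 = {"dataset_fingerprint": {"sha256": "abc123"}}
print(fingerprints_match(run1, run2))  # True when both runs saw identical data
```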

streaming_chunks.jsonl

Newline-delimited JSON with one detailed metrics record per streaming chunk.

Location: reports/streaming_chunks.jsonl

Example record:
{
  "chunk_id": 1,
  "rows": 64,
  "latency_s": 0.042,
  "throughput_rows_s": 1523.8,
  "batch_size": 96,
  "chunk_size": 64,
  "memory_before_mb": 45.2,
  "memory_after_mb": 52.8,
  "memory_exceeded": false,
  "retries": 0,
  "spill_paths": {
    "X": "intermediate/stream_chunk_1_X.csv",
    "y": "intermediate/stream_chunk_1_y.csv"
  },
  "operator_profile_s": {
    "preprocess_s": 0.008,
    "feature_engineering_s": 0.021,
    "feature_selection_s": 0.006,
    "encode_scale_s": 0.007
  },
  "input_bytes": 8192,
  "estimated_input_bandwidth_mb_s": 185.6
}
Use cases:
  • Identify memory bottlenecks (look for memory_exceeded: true)
  • Analyze adaptive resizing behavior (check retries)
  • Profile per-chunk performance
  • Validate spill-to-disk strategy
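The first two use cases can be scripted against the JSONL file directly. A sketch (the flag_memory_pressure helper and the inline sample records are illustrative; in practice iterate over the open file):

```python
import json

def flag_memory_pressure(jsonl_lines):
    """Return the chunk_ids of records with memory_exceeded or retries > 0."""
    flagged = []
    for line in jsonl_lines:
        rec = json.loads(line)
        if rec.get("memory_exceeded") or rec.get("retries", 0) > 0:
            flagged.append(rec["chunk_id"])
    return flagged

# Sample records mirroring the schema above; in practice:
#   flag_memory_pressure(open('reports/streaming_chunks.jsonl'))
sample = [
    '{"chunk_id": 1, "memory_exceeded": false, "retries": 0}',
    '{"chunk_id": 2, "memory_exceeded": true, "retries": 1}',
]
print(flag_memory_pressure(sample))  # [2]
```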

run_manifest.json

Reproducibility manifest for validating that two runs are identical.

Location: metadata/run_manifest.json

Contents: identical to the reproducibility section of pipeline_report.json.

Use case: quickly verify configuration without parsing the full report.
# Compare two runs
diff artifacts_1/metadata/run_manifest.json \
     artifacts_2/metadata/run_manifest.json

Benchmark Artifacts

constraint_experiment.csv

Results from sweeping resource constraints (chunk size, memory, compute).

Location: benchmarks/constraint_experiment.csv

Columns:

Column                    Type   Description
chunk_size                int    Rows per chunk
memory_limit_mb           int    Maximum memory constraint (MB)
compute_limit             float  CPU utilization cap (0.0-1.0)
preprocessing_latency_s   float  Total preprocessing time (s)
peak_memory_mb            float  Peak memory usage (MB)
training_time_s           float  Model training time (s)
model_accuracy_r2         float  Model R² score
model_rmse                float  Model RMSE
Generated by: engine.py:389-413 (run_constraint_experiment)

Example analysis:
import pandas as pd

df = pd.read_csv('benchmarks/constraint_experiment.csv')

# Find best accuracy configuration
best = df.loc[df['model_accuracy_r2'].idxmax()]
print(f"Best R²: {best['model_accuracy_r2']:.3f}")
print(f"Configuration: chunk={best['chunk_size']}, memory={best['memory_limit_mb']}MB")

# Find lowest latency configuration
fastest = df.loc[df['preprocessing_latency_s'].idxmin()]
print(f"Fastest: {fastest['preprocessing_latency_s']:.3f}s")

significance_tests.csv

Statistical significance tests comparing batch vs. streaming modes.

Location: benchmarks/significance_tests.csv

Columns:

Column                        Type   Description
latency_pvalue                float  Permutation-test p-value for latency
throughput_pvalue             float  Permutation-test p-value for throughput
latency_mean_delta_s          float  Mean latency difference (streaming - batch)
throughput_mean_delta_rows_s  float  Mean throughput difference (streaming - batch)
Interpretation:
  • p-value < 0.05: Statistically significant difference
  • p-value >= 0.05: No significant difference (modes are comparable)
Generated by: engine.py:134-147 (_permutation_pvalue)
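The interpretation rule above can be applied programmatically. A sketch (ALPHA and the interpret helper are illustrative, not part of the pipeline):

```python
ALPHA = 0.05  # conventional significance threshold

def interpret(pvalue: float, metric: str) -> str:
    """Map a permutation-test p-value to the interpretation rule above."""
    if pvalue < ALPHA:
        return f"{metric}: significant difference (p={pvalue:.3f})"
    return f"{metric}: no significant difference (p={pvalue:.3f})"

# Hypothetical values standing in for a row of significance_tests.csv
print(interpret(0.012, "latency"))     # latency: significant difference (p=0.012)
print(interpret(0.410, "throughput"))  # throughput: no significant difference (p=0.410)
```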

Visualization Plots

Three PNG plots visualizing trade-offs between resources and accuracy.

latency_vs_accuracy.png

X-axis: Preprocessing latency (seconds)
Y-axis: Model accuracy (R²)
Color: Compute constraint (viridis colormap)
Use case: Identify configurations with optimal latency-accuracy trade-off

memory_vs_accuracy.png

X-axis: Peak memory usage (MB)
Y-axis: Model accuracy (R²)
Color: Memory limit (plasma colormap)
Use case: Select memory-efficient configurations without sacrificing accuracy

latency_memory_accuracy.png

X-axis: Peak memory (MB)
Y-axis: Latency (seconds)
Color: Model accuracy (coolwarm colormap)
Use case: Three-way trade-off analysis for deployment decisions

Generated by: engine.py:511-555 (_plot_experiment_results)
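Because the plots are derived from constraint_experiment.csv, they can be regenerated offline if the PNGs are deleted. A sketch of the first plot (the function name, figure settings, and demo rows are assumptions; the actual styling in _plot_experiment_results may differ):

```python
import pandas as pd
import matplotlib
matplotlib.use("Agg")  # headless backend; no display required
import matplotlib.pyplot as plt

def plot_latency_vs_accuracy(df: pd.DataFrame, out_path: str) -> None:
    """Recreate a latency-vs-accuracy scatter colored by compute constraint."""
    fig, ax = plt.subplots()
    sc = ax.scatter(df["preprocessing_latency_s"], df["model_accuracy_r2"],
                    c=df["compute_limit"], cmap="viridis")
    fig.colorbar(sc, label="Compute constraint")
    ax.set_xlabel("Preprocessing latency (s)")
    ax.set_ylabel("Model accuracy (R²)")
    fig.savefig(out_path, dpi=150)
    plt.close(fig)

# Hypothetical rows; in practice: pd.read_csv('benchmarks/constraint_experiment.csv')
demo = pd.DataFrame({
    "preprocessing_latency_s": [0.8, 1.4, 2.1],
    "model_accuracy_r2": [0.61, 0.68, 0.70],
    "compute_limit": [0.25, 0.5, 1.0],
})
plot_latency_vs_accuracy(demo, "latency_vs_accuracy_regen.png")
```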

Profiling Artifacts

operator_profile.csv

Per-operator timing breakdown for each streaming chunk.

Location: profiles/operator_profile.csv

Columns:

Column                          Type   Description
chunk_id                        int    Chunk identifier
preprocess_s                    float  Data cleaning time
feature_engineering_s           float  Feature derivation time
feature_selection_s             float  Multicollinearity removal time
encode_scale_s                  float  Encoding and scaling time
estimated_input_bandwidth_mb_s  float  Input I/O bandwidth (MB/s)
input_bytes                     int    Raw input size (bytes)
Generated by: engine.py:481-492 (extracted from chunk_metrics)

Example analysis:
import pandas as pd
import matplotlib.pyplot as plt

df = pd.read_csv('profiles/operator_profile.csv')

# Plot operator timing distribution
operators = ['preprocess_s', 'feature_engineering_s', 'feature_selection_s', 'encode_scale_s']
df[operators].boxplot()
plt.ylabel('Time (seconds)')
plt.title('Operator Timing Distribution')
plt.savefig('operator_timing_boxplot.png')
Use cases:
  • Identify bottleneck operators
  • Validate operator-level optimizations
  • Estimate per-stage latency for SLA planning
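For the first use case, a minimal sketch that names the bottleneck operator (the slowest_operator helper and the inline demo rows are illustrative; in practice load profiles/operator_profile.csv):

```python
import pandas as pd

OPERATORS = ["preprocess_s", "feature_engineering_s",
             "feature_selection_s", "encode_scale_s"]

def slowest_operator(df: pd.DataFrame) -> str:
    """Return the operator column with the highest mean per-chunk time."""
    return df[OPERATORS].mean().idxmax()

# Hypothetical rows mirroring operator_profile.csv
df = pd.DataFrame({
    "preprocess_s": [0.008, 0.009],
    "feature_engineering_s": [0.021, 0.024],
    "feature_selection_s": [0.006, 0.005],
    "encode_scale_s": [0.007, 0.007],
})
print(slowest_operator(df))  # feature_engineering_s
```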

Intermediate Artifacts

Spilled Chunks

When spill_to_disk: true, intermediate feature matrices are persisted to CSV.

Location: intermediate/

Files:
  • stream_chunk_{id}_X.csv: Feature matrix (X)
  • stream_chunk_{id}_y.csv: Target variable (y)
Format:
  • X: CSV with feature columns (encoded, scaled)
  • y: Single-column CSV with header salary
Generated by: engine.py:260-266
if self.config.spill_to_disk:
    x_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_X.csv'
    y_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_y.csv'
    X_chunk.to_csv(x_path, index=False)
    y_chunk.to_frame('salary').to_csv(y_path, index=False)
Use cases:
  • Resume interrupted runs (manual reassembly)
  • Debug feature engineering issues
  • Validate encoding consistency across chunks
Spilled chunks can accumulate quickly: a 1340-row dataset with chunk_size: 64 generates ~21 chunk pairs (~5-10 MB total). Clean the intermediate/ directory periodically.
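For the manual-reassembly use case, a sketch that concatenates spilled chunk files back into full X and y (the reassemble_spilled helper is an assumption, built only on the stream_chunk_{id}_X.csv / _y.csv naming and the salary header documented above):

```python
import pandas as pd
from pathlib import Path

def reassemble_spilled(intermediate_dir: str):
    """Concatenate spilled chunk CSVs back into full X and y, in chunk order."""
    root = Path(intermediate_dir)

    def chunk_id(p: Path) -> int:
        # 'stream_chunk_7_X' -> 7
        return int(p.stem.split("_")[2])

    x_paths = sorted(root.glob("stream_chunk_*_X.csv"), key=chunk_id)
    y_paths = sorted(root.glob("stream_chunk_*_y.csv"), key=chunk_id)
    X = pd.concat((pd.read_csv(p) for p in x_paths), ignore_index=True)
    y = pd.concat((pd.read_csv(p) for p in y_paths), ignore_index=True)["salary"]
    return X, y
```

Usage: X, y = reassemble_spilled('artifacts_edge/intermediate') recovers the full feature matrix and target for inspection or retraining.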

Artifact Reference Table

Artifact                     Format  Size (typical)  Purpose                   Generated by
pipeline_report.json         JSON    50-200 KB       Complete run summary      engine.py:473-476
streaming_chunks.jsonl       JSONL   10-50 KB        Per-chunk metrics         engine.py:493-495
run_manifest.json            JSON    1-5 KB          Reproducibility check     engine.py:478-479
constraint_experiment.csv    CSV     1-5 KB          Resource sweep results    engine.py:502-503
significance_tests.csv       CSV     <1 KB           Statistical tests         engine.py:500
operator_profile.csv         CSV     5-20 KB         Operator timing           engine.py:481-492
latency_vs_accuracy.png      PNG     50-100 KB       Latency trade-off plot    engine.py:512-525
memory_vs_accuracy.png       PNG     50-100 KB       Memory trade-off plot     engine.py:527-540
latency_memory_accuracy.png  PNG     50-100 KB       Three-way trade-off plot  engine.py:542-555
stream_chunk_*_X.csv         CSV     1-5 KB each     Spilled features          engine.py:264
stream_chunk_*_y.csv         CSV     <1 KB each      Spilled targets           engine.py:265

Artifact Lifecycle

Creation

All artifacts are written at the end of run_all() execution:
# From engine.py:454
self._write_artifacts(report)

Retention

Recommendations:
  • Keep indefinitely:
    • pipeline_report.json (small, high value)
    • run_manifest.json (reproducibility)
    • constraint_experiment.csv (comparative analysis)
  • Archive after 30 days:
    • streaming_chunks.jsonl (detailed, but large)
    • operator_profile.csv (profiling data)
    • Visualization PNGs (regenerable from CSVs)
  • Delete after run:
    • intermediate/ directory (temporary spill files)

Cleanup

# Clean intermediate files only
rm -rf artifacts_*/intermediate/

# Archive old runs
tar -czf archive_$(date +%Y%m%d).tar.gz artifacts_*
rm -rf artifacts_*

# Selective cleanup (keep reports, remove profiles)
find artifacts_* -name "operator_profile.csv" -delete
find artifacts_* -name "streaming_chunks.jsonl" -delete

Working with Artifacts

Python Analysis

import json
import pandas as pd

# Load main report
with open('artifacts_server/reports/pipeline_report.json') as f:
    report = json.load(f)

print(f"Dataset: {report['dataset_fingerprint']['row_count']} rows")
print(f"Batch R²: {report['batch']['model']['r2']:.3f}")
print(f"Streaming R²: {report['streaming']['model']['r2']:.3f}")

# Load constraint experiments
exp = pd.read_csv('artifacts_server/benchmarks/constraint_experiment.csv')
print(exp.describe())

# Load streaming chunks
chunks = pd.read_csv('artifacts_server/benchmarks/streaming_chunks.csv')
print(f"Total chunks: {len(chunks)}")
print(f"Memory exceeded: {chunks['memory_exceeded'].sum()} times")

Command-Line Analysis

# Extract key metrics
jq '.batch.model.r2, .streaming.model.r2' \
  artifacts_server/reports/pipeline_report.json

# Check reproducibility
jq '.reproducibility.random_seed, .dataset_fingerprint.sha256' \
  artifacts_server/metadata/run_manifest.json

# Summarize constraint experiments (csvstat from csvkit)
csvstat artifacts_server/benchmarks/constraint_experiment.csv

# Count memory exceeded events
grep -c '"memory_exceeded": true' \
  artifacts_server/reports/streaming_chunks.jsonl

Best Practices

Avoid overwriting artifacts from previous runs:
python run_pipeline.py \
  --input data.csv \
  --output-dir artifacts_$(date +%Y%m%d_%H%M%S)
Ensure reproducibility by comparing SHA-256 hashes:
jq '.dataset_fingerprint.sha256' artifacts_*/reports/pipeline_report.json | sort -u
# Should output single hash if runs are identical
Configure external storage for long-term retention:
aws s3 sync artifacts_server/ s3://my-bucket/nba-pipeline/$(date +%Y%m%d)/
If spill_to_disk is enabled, clean up after successful runs:
python run_pipeline.py --config configs/pipeline.edge.template.json
rm -rf artifacts_edge/intermediate/
