Overview
The run_full_pipeline.py script is the master orchestrator that executes the entire ChartsMaze EDL (Extract, Data, Load) pipeline. It runs 20+ scripts in strict sequential order to produce the final compressed output: all_stocks_fundamental_analysis.json.gz.
Location: run_full_pipeline.py
Pipeline Architecture
The pipeline follows a strict six-phase execution model, detailed phase by phase under Execution Flow below.
Configuration Options
FETCH_OHLCV controls OHLCV historical data fetching:
True: always fetch (an incremental update takes 2-5 min if data already exists, ~30 min on the first run)
False: skip entirely (the ADR, RVOL, ATH, and % from ATH fields will be 0)
A second flag controls standalone data fetching:
True: fetch indices and ETF data
False: skip this optional data (faster execution)
CLEANUP_INTERMEDIATE auto-deletes intermediate files after a successful pipeline run:
True: keep only all_stocks_fundamental_analysis.json.gz + ohlcv_data/
False: retain all intermediate JSON files for debugging
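In code, these options are plain module-level booleans near the top of the script. FETCH_OHLCV and CLEANUP_INTERMEDIATE appear verbatim in the snippets on this page; FETCH_STANDALONE_DATA is an illustrative name for the indices/ETF flag (the real name may differ):

```python
# Module-level configuration flags for run_full_pipeline.py.
# FETCH_STANDALONE_DATA is a hypothetical name; the other two appear below.
FETCH_OHLCV = True            # OHLCV history: 2-5 min incremental, ~30 min first run
FETCH_STANDALONE_DATA = True  # Indices and ETF data (optional, skippable for speed)
CLEANUP_INTERMEDIATE = True   # Delete intermediate JSON files after a successful run
```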
Execution Flow
Phase 1: Core Data (Foundation)
Critical Dependency: fetch_dhan_data.py MUST succeed. It produces master_isin_map.json, which ALL other scripts depend on. The pipeline aborts if this fails.
```python
# Phase 1 scripts
results["fetch_dhan_data.py"] = run_script("fetch_dhan_data.py", "Phase 1")
if not results["fetch_dhan_data.py"]:
    print("🛑 CRITICAL: fetch_dhan_data.py failed. Cannot continue.")
    return
results["fetch_fundamental_data.py"] = run_script("fetch_fundamental_data.py", "Phase 1")
```
Outputs:
dhan_data_response.json - Raw market data for ~5000 stocks
master_isin_map.json - Symbol → ISIN → Name → Sid → FnoFlag mapping
fundamental_data.json - Fundamental metrics (P&L, balance sheet, ratios)
nse_equity_list.csv - Listing dates from NSE
Phase 2: Data Enrichment (Parallel Fetching)
All Phase 2 scripts depend on master_isin_map.json but are independent of each other:
```python
phase2_scripts = [
    "fetch_company_filings.py",          # Company filings (quarterly results, announcements)
    "fetch_new_announcements.py",        # Live announcements feed
    "fetch_advanced_indicators.py",      # Technical indicators (SMA, EMA, RSI, Pivots)
    "fetch_market_news.py",              # Last 50 news items per stock
    "fetch_corporate_actions.py",        # Dividend, bonus, splits, results calendar
    "fetch_surveillance_lists.py",       # NSE ASM/GSM lists
    "fetch_circuit_stocks.py",           # Upper/lower circuit breakers
    "fetch_bulk_block_deals.py",         # Bulk/block deals
    "fetch_incremental_price_bands.py",  # Circuit limit revisions
    "fetch_complete_price_bands.py",     # Current circuit limits
    "fetch_all_indices.py",              # Index constituents
]
```
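Because these scripts are mutually independent, the orchestrator can dispatch them with a simple loop; a minimal sketch (run_phase2 is an illustrative helper, not the script's actual structure):

```python
def run_phase2(phase2_scripts, run_script):
    """Run each independent Phase 2 fetcher in turn.

    Illustrative sketch: failures are recorded but never abort the
    pipeline, since Phase 2 scripts are non-critical.
    """
    results = {}
    for script in phase2_scripts:
        results[script] = run_script(script, "Phase 2")
    return results
```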
Outputs:
company_filings/*.json - Per-stock regulatory filings
market_news/*.json - Per-stock news feed with sentiment
all_company_announcements.json - Consolidated announcements
advanced_indicator_data.json - Technical indicators
upcoming_corporate_actions.json - Future corporate actions
nse_asm_list.json, nse_gsm_list.json - Surveillance lists
Circuit and deal data files
Phase 2.5: OHLCV Data (Smart Incremental)
```python
if FETCH_OHLCV:
    results["fetch_all_ohlcv.py"] = run_script("fetch_all_ohlcv.py", "Phase 2.5")
    results["fetch_indices_ohlcv.py"] = run_script("fetch_indices_ohlcv.py", "Phase 2.5")
```
Outputs:
ohlcv_data/*.csv - Historical OHLCV data per stock (incremental updates)
The OHLCV scripts use smart incremental logic: they fetch only the dates that are missing, making subsequent runs 10-15x faster.
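A minimal sketch of that incremental logic, under the assumption of one CSV per stock with an ISO `date` column (the internals of fetch_all_ohlcv.py may differ; missing_date_range is a hypothetical helper):

```python
import csv
import datetime
import os


def missing_date_range(csv_path, today=None):
    """Return the (start, end) date range still to fetch, or None if up to date.

    Hypothetical sketch of the incremental logic: if no CSV exists yet,
    request full history; otherwise resume from the day after the last
    stored date.
    """
    today = today or datetime.date.today()
    if not os.path.exists(csv_path):
        # First run: fetch full history (here, five calendar years back).
        return datetime.date(today.year - 5, 1, 1), today
    with open(csv_path, newline="") as f:
        last = None
        for row in csv.DictReader(f):
            last = row["date"]  # Rows are assumed to be in date order.
    if last is None:
        return datetime.date(today.year - 5, 1, 1), today
    last_date = datetime.date.fromisoformat(last)
    if last_date >= today:
        return None  # Already up to date: nothing to fetch.
    return last_date + datetime.timedelta(days=1), today
```

This is why subsequent runs take minutes rather than the 30+ minutes of a full first-time fetch.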
Phase 3: Base Analysis
Critical Step: This script builds the master JSON structure. All Phase 4 scripts modify this file in place.
```python
results["bulk_market_analyzer.py"] = run_script("bulk_market_analyzer.py", "Phase 3")
if not results["bulk_market_analyzer.py"]:
    print("🛑 CRITICAL: bulk_market_analyzer.py failed.")
    return
```
Output:
all_stocks_fundamental_analysis.json - Base master JSON with ~60 fields per stock
Phase 4: Enrichment (Order Matters!)
STRICT EXECUTION ORDER REQUIRED: Each script modifies the master JSON in place. Running out of order will cause data corruption or missing fields.
```python
# 4a. Advanced metrics (ADR, RVOL, ATH) - needs ohlcv_data/
results["advanced_metrics_processor.py"] = run_script("advanced_metrics_processor.py", "Phase 4")

# 4b. Earnings performance - needs company_filings/ + ohlcv_data/
results["process_earnings_performance.py"] = run_script("process_earnings_performance.py", "Phase 4")

# 4c. F&O data (lot size, next expiry)
results["enrich_fno_data.py"] = run_script("enrich_fno_data.py", "Phase 4")

# 4d. Market breadth & relative strength
results["process_market_breadth.py"] = run_script("process_market_breadth.py", "Phase 4")

# 4e. Historical market breadth
results["process_historical_market_breadth.py"] = run_script("process_historical_market_breadth.py", "Phase 4")

# 4f. Corporate events + news feed (MUST BE LAST)
results["add_corporate_events.py"] = run_script("add_corporate_events.py", "Phase 4")
```
Fields Added:
Volume/turnover metrics, ADR, RVOL, ATH data
Earnings performance metrics
F&O eligibility, lot sizes, expiry dates
Event markers, announcements, news feed
Phase 5: Compression
```python
raw_size, gz_size = compress_output()
```
Compression Logic:
```python
files_to_compress = {
    "all_stocks_fundamental_analysis.json": "all_stocks_fundamental_analysis.json.gz",
    "sector_analytics.json": "sector_analytics.json.gz",
    "market_breadth.csv": "market_breadth.json.gz",
}

for raw_path, gz_path in files_to_compress.items():
    with open(raw_path, "rb") as f_in:
        data = f_in.read()
    with gzip.open(gz_path, "wb", compresslevel=9) as f_out:
        f_out.write(data)
```
Typical Compression:
Raw JSON: ~15-25 MB
Compressed: ~2-3 MB (85-90% reduction)
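Downstream consumers never need to decompress manually; gzip.open reads the archive transparently. A small helper (illustrative, not part of the pipeline):

```python
import gzip
import json


def load_master(path="all_stocks_fundamental_analysis.json.gz"):
    """Load the compressed master JSON produced by Phase 5."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        return json.load(f)
```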
Phase 6: Cleanup
```python
if CLEANUP_INTERMEDIATE:
    cleanup_intermediate()
```
Files Deleted:
```python
INTERMEDIATE_FILES = [
    "master_isin_map.json",
    "dhan_data_response.json",
    "fundamental_data.json",
    "advanced_indicator_data.json",
    "all_company_announcements.json",
    "upcoming_corporate_actions.json",
    "history_corporate_actions.json",
    "nse_asm_list.json",
    "nse_gsm_list.json",
    "bulk_block_deals.json",
    "upper_circuit_stocks.json",
    "lower_circuit_stocks.json",
    "incremental_price_bands.json",
    "complete_price_bands.json",
    "nse_equity_list.csv",
    "all_stocks_fundamental_analysis.json",  # Raw JSON removed after the .gz is written
]

INTERMEDIATE_DIRS = [
    "company_filings",
    "market_news",
]
```
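A plausible shape for cleanup_intermediate(), assuming the two lists above; the real function also reports the space freed (as seen in the sample output), which this sketch omits:

```python
import os
import shutil


def cleanup_intermediate(files, dirs, base_dir="."):
    """Delete intermediate files and directories; silently skip anything missing.

    Illustrative sketch of the cleanup step, not the script's actual code.
    Returns the number of entries removed.
    """
    removed = 0
    for name in files:
        path = os.path.join(base_dir, name)
        if os.path.exists(path):
            os.remove(path)
            removed += 1
    for name in dirs:
        path = os.path.join(base_dir, name)
        if os.path.isdir(path):
            shutil.rmtree(path)  # Remove the directory tree and its contents
            removed += 1
    return removed
```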
Script Runner Function
```python
def run_script(script_name, phase_label):
    """Run a Python script and return success/failure."""
    script_path = os.path.join(BASE_DIR, script_name)
    if not os.path.exists(script_path):
        print(f"  ⚠️ SKIP: {script_name} not found.")
        return False
    print(f"  ▶ Running {script_name}...")
    start = time.time()
    try:
        result = subprocess.run(
            [sys.executable, script_path],
            cwd=BASE_DIR,
            text=True,
            timeout=1800,  # 30-minute timeout
        )
        elapsed = time.time() - start
        if result.returncode == 0:
            print(f"  ✅ {script_name} ({elapsed:.1f}s)")
            return True
        else:
            print(f"  ❌ {script_name} FAILED ({elapsed:.1f}s)")
            return False  # Caller aborts only for critical scripts; others continue
    except subprocess.TimeoutExpired:
        print(f"  ⏰ {script_name} TIMED OUT (>30 min)")
        return False
    except Exception as e:
        print(f"  ❌ {script_name} EXCEPTION: {e}")
        return False
```
Usage
Run Full Pipeline
```shell
python run_full_pipeline.py
```
Expected Output
```
════════════════════════════════════════════════════════════
EDL PIPELINE - FULL DATA REFRESH
════════════════════════════════════════════════════════════
📦 PHASE 1: Core Data (Foundation)
────────────────────────────────────
  ▶ Running fetch_dhan_data.py...
  ✅ fetch_dhan_data.py (3.2s)
  ▶ Running fetch_fundamental_data.py...
  ✅ fetch_fundamental_data.py (142.8s)
📡 PHASE 2: Data Enrichment (Fetching)
────────────────────────────────────
  ▶ Running fetch_company_filings.py...
  ✅ fetch_company_filings.py (89.3s)
  ...
📊 PHASE 2.5: OHLCV History (Smart Incremental)
────────────────────────────────────
  ▶ Running fetch_all_ohlcv.py...
  ✅ fetch_all_ohlcv.py (167.4s)
🔬 PHASE 3: Base Analysis (Building Master JSON)
────────────────────────────────────
  ▶ Running bulk_market_analyzer.py...
  ✅ bulk_market_analyzer.py (8.7s)
✨ PHASE 4: Enrichment (Injecting into Master JSON)
────────────────────────────────────
  ▶ Running advanced_metrics_processor.py...
  ✅ advanced_metrics_processor.py (12.4s)
  ...
📦 PHASE 5: Compression (.json → .json.gz)
────────────────────────────────────
📦 Compressed: 18.3 MB → 2.1 MB (89% reduction)
🧹 CLEANUP: Removing intermediate files...
────────────────────────────────────
🗑️ Cleaned: 13 files + 2 dirs (45.7 MB freed)
════════════════════════════════════════════════════════════
PIPELINE COMPLETE
════════════════════════════════════════════════════════════
Total Time: 524.3s (8.7 min)
Successful: 18/18
Failed: 0/18
📄 Output: all_stocks_fundamental_analysis.json.gz (2.1 MB)
📦 Compression: 18.3 MB → 2.1 MB (89% smaller)
🧹 Only .json.gz + ohlcv_data/ remain. All intermediate data purged.
════════════════════════════════════════════════════════════
```
Performance by Phase

| Phase | Scripts | Avg Time | Bottleneck |
| --- | --- | --- | --- |
| Phase 1 | 2 | ~2-3 min | Fundamental data batching |
| Phase 2 | 11 | ~3-5 min | Company filings (multi-threaded) |
| Phase 2.5 | 2 | ~2-30 min | OHLCV (incremental vs. full fetch) |
| Phase 3 | 1 | ~8-10 sec | Analysis calculations |
| Phase 4 | 6 | ~1-2 min | CSV processing |
| Phase 5 | 1 | ~2-3 sec | Compression |
| Total | 23 | 8-15 min | OHLCV (first run: 30-40 min) |
Error Handling
Critical Failures (Pipeline Aborts)
fetch_dhan_data.py fails: no master_isin_map.json → cannot proceed
bulk_market_analyzer.py fails: no master JSON → Phase 4 has nothing to enrich
Non-Critical Failures (Pipeline Continues)
Any Phase 2 enrichment script fails → missing data but pipeline continues
Phase 4 scripts fail → some fields missing/zeroed but output still generated
Timeout Configuration
Each script has a 30-minute timeout:
```python
timeout = 1800  # 30 minutes, in seconds
```
Dependencies
```python
import subprocess
import sys
import os
import time
import shutil
import glob
import gzip
import json
```
Final Output Structure
```json
{
  "Symbol": "RELIANCE",
  "Name": "Reliance Industries Ltd",
  "Market Cap(Cr.)": 1850234.5,
  "Stock Price(₹)": 2745.30,
  "ROE(%)": 12.5,
  "ROCE(%)": 14.2,
  "P/E": 28.3,
  "30 Days Average Rupee Volume(Cr.)": 1234.5,
  "RVOL": 1.23,
  "% from ATH": -12.4,
  "F&O": "Yes",
  "Lot Size": 250,
  "Next Expiry": "2026-03-26",
  "Event Markers": "💸: Dividend (15-Mar) | 📊: Results Recently Out",
  "Recent Announcements": [ ... ],
  "News Feed": [ ... ]
}
```
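Since every enriched field lives on the same flat record, downstream filtering is straightforward. For example (field names as in the sample record above; fno_large_caps is an illustrative helper):

```python
def fno_large_caps(records, min_mcap_cr=50000):
    """Pick F&O-eligible stocks above a market-cap threshold.

    Illustrative consumer of the final output; field names follow the
    sample record shown above.
    """
    return [
        r["Symbol"]
        for r in records
        if r.get("F&O") == "Yes" and r.get("Market Cap(Cr.)", 0) >= min_mcap_cr
    ]
```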
Next Steps
Phase 1: Core Data - learn about foundation data fetching
Phase 2: Enrichment - explore the data enrichment scripts
Phase 3: Analysis - understand the base analysis logic
Phase 4: Injection - deep dive into enrichment injection