Overview

The run_full_pipeline.py script is the master orchestrator for the entire ChartsMaze EDL (Extract, Data, Load) pipeline. It runs 20+ scripts in strict sequential order to produce the final compressed output: all_stocks_fundamental_analysis.json.gz.

Location: run_full_pipeline.py

Pipeline Architecture

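The pipeline follows a strict 6-phase execution model, with an OHLCV sub-phase (2.5) between Phases 2 and 3:
  • Phase 1: Core Data (Foundation)
  • Phase 2: Data Enrichment
  • Phase 2.5: OHLCV Data (runs only when FETCH_OHLCV is True)
  • Phase 3: Base Analysis
  • Phase 4: Enrichment
  • Phase 5: Compression
  • Phase 6: Cleanup (runs only when CLEANUP_INTERMEDIATE is True)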

Configuration Options

FETCH_OHLCV (boolean, default: True)
Controls OHLCV historical data fetching:
  • True: Always fetch (incremental update: 2-5 min if data exists, ~30 min first time)
  • False: Skip entirely (ADR, RVOL, ATH, % from ATH fields will be 0)

FETCH_OPTIONAL (boolean, default: False)
Controls standalone data fetching:
  • True: Fetch indices and ETF data
  • False: Skip optional data (faster execution)

CLEANUP_INTERMEDIATE (boolean, default: True)
Auto-delete intermediate files after successful pipeline completion:
  • True: Only keeps all_stocks_fundamental_analysis.json.gz + ohlcv_data/
  • False: Retains all intermediate JSON files for debugging
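
A minimal sketch of how these flags might gate the optional phases. Only the flag names and defaults come from the options above; the helper planned_phases is hypothetical and not part of the pipeline.

```python
# Flag names and defaults are taken from the options above.
FETCH_OHLCV = True
FETCH_OPTIONAL = False  # Not used in this sketch; it gates individual fetches.
CLEANUP_INTERMEDIATE = True


def planned_phases(fetch_ohlcv=FETCH_OHLCV, cleanup=CLEANUP_INTERMEDIATE):
    """Hypothetical helper: list the phases a run would execute."""
    phases = ["Phase 1", "Phase 2"]
    if fetch_ohlcv:
        phases.append("Phase 2.5")
    phases += ["Phase 3", "Phase 4", "Phase 5"]
    if cleanup:
        phases.append("Phase 6")
    return phases


print(planned_phases())
print(planned_phases(fetch_ohlcv=False, cleanup=False))
```

With both flags off, the run reduces to Phases 1 through 5 without the OHLCV sub-phase, which is the fastest configuration and the one that keeps every intermediate file for debugging.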

Execution Flow

Phase 1: Core Data (Foundation)

Critical Dependency: fetch_dhan_data.py MUST succeed. It produces master_isin_map.json which ALL other scripts depend on. Pipeline aborts if this fails.
# Phase 1 Scripts
results["fetch_dhan_data.py"] = run_script("fetch_dhan_data.py", "Phase 1")

if not results["fetch_dhan_data.py"]:
    print("🛑 CRITICAL: fetch_dhan_data.py failed. Cannot continue.")
    return

results["fetch_fundamental_data.py"] = run_script("fetch_fundamental_data.py", "Phase 1")
Outputs:
  • dhan_data_response.json - Raw market data for ~5000 stocks
  • master_isin_map.json - Symbol → ISIN → Name → Sid → FnoFlag mapping
  • fundamental_data.json - Fundamental metrics (P&L, balance sheet, ratios)
  • nse_equity_list.csv - Listing dates from NSE

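Phase 2: Data Enrichment (Fetching)

All Phase 2 scripts depend on master_isin_map.json but are independent of each other, so a failure in one does not block the rest. The orchestrator still runs them one at a time, in the order listed: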
phase2_scripts = [
    "fetch_company_filings.py",      # Company filings (quarterly results, announcements)
    "fetch_new_announcements.py",     # Live announcements feed
    "fetch_advanced_indicators.py",   # Technical indicators (SMA, EMA, RSI, Pivots)
    "fetch_market_news.py",           # Last 50 news items per stock
    "fetch_corporate_actions.py",     # Dividend, bonus, splits, results calendar
    "fetch_surveillance_lists.py",    # NSE ASM/GSM lists
    "fetch_circuit_stocks.py",        # Upper/lower circuit breakers
    "fetch_bulk_block_deals.py",      # Bulk/block deals
    "fetch_incremental_price_bands.py", # Circuit limit revisions
    "fetch_complete_price_bands.py",  # Current circuit limits
    "fetch_all_indices.py"            # Index constituents
]
Outputs:
  • company_filings/*.json - Per-stock regulatory filings
  • market_news/*.json - Per-stock news feed with sentiment
  • all_company_announcements.json - Consolidated announcements
  • advanced_indicator_data.json - Technical indicators
  • upcoming_corporate_actions.json - Future corporate actions
  • nse_asm_list.json, nse_gsm_list.json - Surveillance lists
  • Circuit and deal data files

Phase 2.5: OHLCV Data (Smart Incremental)

if FETCH_OHLCV:
    results["fetch_all_ohlcv.py"] = run_script("fetch_all_ohlcv.py", "Phase 2.5")
    results["fetch_indices_ohlcv.py"] = run_script("fetch_indices_ohlcv.py", "Phase 2.5")
Outputs:
  • ohlcv_data/*.csv - Historical OHLCV data per stock (incremental updates)
OHLCV scripts use smart incremental logic: only fetch missing dates, making subsequent runs 10-15x faster.
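
The incremental idea can be sketched as follows. This assumes each CSV in ohlcv_data/ has a Date column in ISO format, which is an assumption since the real column names are not shown here. The helper missing_range is hypothetical.

```python
import csv
import io
from datetime import date, timedelta


def missing_range(csv_text, today, full_history_start):
    """Return the (start, end) dates still to fetch, or None if up to date."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    if not rows:
        # First run: nothing cached, so fetch the whole history.
        return full_history_start, today
    last_cached = max(date.fromisoformat(row["Date"]) for row in rows)
    start = last_cached + timedelta(days=1)
    return (start, today) if start <= today else None


# Assumed CSV layout, for illustration only.
cached = (
    "Date,Open,High,Low,Close,Volume\n"
    "2026-01-05,10,11,9,10.5,1000\n"
    "2026-01-06,10.5,12,10,11.8,1500\n"
)
print(missing_range(cached, date(2026, 1, 9), date(2016, 1, 1)))
```

Only the three days after the last cached row are requested here, instead of a decade of history, which is where the speed-up on later runs comes from.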

Phase 3: Base Analysis

Critical Step: bulk_market_analyzer.py builds the master JSON structure. All Phase 4 scripts modify this file in place.
results["bulk_market_analyzer.py"] = run_script("bulk_market_analyzer.py", "Phase 3")

if not results["bulk_market_analyzer.py"]:
    print("🛑 CRITICAL: bulk_market_analyzer.py failed.")
    return
Output:
  • all_stocks_fundamental_analysis.json - Base master JSON with ~60 fields per stock

Phase 4: Enrichment (Order Matters!)

STRICT EXECUTION ORDER REQUIRED: Each script modifies the master JSON in-place. Running out of order will cause data corruption or missing fields.
# 4a. Advanced Metrics (ADR, RVOL, ATH) - needs ohlcv_data/
results["advanced_metrics_processor.py"] = run_script("advanced_metrics_processor.py", "Phase 4")

# 4b. Earnings Performance - needs company_filings/ + ohlcv_data/
results["process_earnings_performance.py"] = run_script("process_earnings_performance.py", "Phase 4")

# 4c. F&O Data (Lot Size, Next Expiry)
results["enrich_fno_data.py"] = run_script("enrich_fno_data.py", "Phase 4")

# 4d. Market Breadth & Relative Strength
results["process_market_breadth.py"] = run_script("process_market_breadth.py", "Phase 4")

# 4e. Historical Market Breadth
results["process_historical_market_breadth.py"] = run_script("process_historical_market_breadth.py", "Phase 4")

# 4f. Corporate Events + News Feed (MUST BE LAST)
results["add_corporate_events.py"] = run_script("add_corporate_events.py", "Phase 4")
Fields Added:
  • Volume/turnover metrics, ADR, RVOL, ATH data
  • Earnings performance metrics
  • F&O eligibility, lot sizes, expiry dates
  • Event markers, announcements, news feed
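
In-place modification means each Phase 4 script reads the master JSON, adds its fields to every stock record, and writes the file back. A minimal sketch, assuming the master file is a JSON list of per-stock objects keyed by "Symbol"; the helper enrich_in_place, the lot-size table, and the 0 fallback for non-F&O stocks are all hypothetical.

```python
import json
import os
import tempfile


def enrich_in_place(path, lot_sizes):
    """Add F&O fields to every record, then atomically replace the file."""
    with open(path, encoding="utf-8") as f:
        stocks = json.load(f)
    for stock in stocks:
        lot = lot_sizes.get(stock["Symbol"])
        stock["F&O"] = "Yes" if lot else "No"
        stock["Lot Size"] = lot or 0
    # Write to a temp file first so a crash cannot leave a half-written JSON.
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(stocks, f, ensure_ascii=False)
    os.replace(tmp_path, path)


workdir = tempfile.mkdtemp()
master = os.path.join(workdir, "all_stocks_fundamental_analysis.json")
with open(master, "w", encoding="utf-8") as f:
    json.dump([{"Symbol": "RELIANCE"}, {"Symbol": "XYZ"}], f)

enrich_in_place(master, {"RELIANCE": 250})
with open(master, encoding="utf-8") as f:
    enriched = json.load(f)
print(enriched)
```

Because every script rewrites the same file, a script that runs too early cannot see fields a later script would have added, which is why the order above is fixed.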

Phase 5: Compression

raw_size, gz_size = compress_output()
Compression Logic:
files_to_compress = {
    "all_stocks_fundamental_analysis.json": "all_stocks_fundamental_analysis.json.gz",
    "sector_analytics.json": "sector_analytics.json.gz",
    "market_breadth.csv": "market_breadth.json.gz"
}

with gzip.open(gz_path, "wb", compresslevel=9) as f_out:
    f_out.write(data)
Typical Compression:
  • Raw JSON: ~15-25 MB
  • Compressed: ~2-3 MB (85-90% reduction)
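
A self-contained sketch of what a compress_output-style helper could look like. The function name compress_files, its parameters, and the choice to skip missing source files are assumptions; only the gzip.open call with compresslevel=9 mirrors the excerpt above.

```python
import gzip
import os
import tempfile


def compress_files(base_dir, files_to_compress):
    """Gzip each existing source file; return total (raw_bytes, gz_bytes)."""
    raw_total = gz_total = 0
    for src_name, gz_name in files_to_compress.items():
        src_path = os.path.join(base_dir, src_name)
        if not os.path.exists(src_path):
            continue  # Assumed behavior: optional outputs may be absent.
        with open(src_path, "rb") as f_in:
            data = f_in.read()
        gz_path = os.path.join(base_dir, gz_name)
        with gzip.open(gz_path, "wb", compresslevel=9) as f_out:
            f_out.write(data)
        raw_total += len(data)
        gz_total += os.path.getsize(gz_path)
    return raw_total, gz_total


out_dir = tempfile.mkdtemp()
with open(os.path.join(out_dir, "all_stocks_fundamental_analysis.json"), "w") as f:
    f.write('[{"Symbol": "RELIANCE", "RVOL": 1.23}]' * 2000)

raw_size, gz_size = compress_files(
    out_dir,
    {"all_stocks_fundamental_analysis.json": "all_stocks_fundamental_analysis.json.gz"},
)
print(f"{raw_size} B -> {gz_size} B ({100 * (1 - gz_size / raw_size):.0f}% reduction)")
```

The demo input is highly repetitive, so its ratio is better than the 85-90% quoted for real data.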

Phase 6: Cleanup

if CLEANUP_INTERMEDIATE:
    cleanup_intermediate()
Files Deleted:
INTERMEDIATE_FILES = [
    "master_isin_map.json",
    "dhan_data_response.json",
    "fundamental_data.json",
    "advanced_indicator_data.json",
    "all_company_announcements.json",
    "upcoming_corporate_actions.json",
    "history_corporate_actions.json",
    "nse_asm_list.json",
    "nse_gsm_list.json",
    "bulk_block_deals.json",
    "upper_circuit_stocks.json",
    "lower_circuit_stocks.json",
    "incremental_price_bands.json",
    "complete_price_bands.json",
    "nse_equity_list.csv",
    "all_stocks_fundamental_analysis.json"  # Raw JSON cleaned after .gz
]

INTERMEDIATE_DIRS = [
    "company_filings",
    "market_news"
]
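
A sketch of how a cleanup_intermediate-style helper might walk these two lists. The function name cleanup and its return value are hypothetical. Under this assumption only paths that actually exist are counted, so the totals in the summary line can be lower than the length of the lists.

```python
import os
import shutil
import tempfile


def cleanup(base_dir, files, dirs):
    """Delete listed files/dirs if present; return (files, dirs, bytes freed)."""
    n_files = n_dirs = freed = 0
    for name in files:
        path = os.path.join(base_dir, name)
        if os.path.isfile(path):
            freed += os.path.getsize(path)
            os.remove(path)
            n_files += 1
    for name in dirs:
        path = os.path.join(base_dir, name)
        if os.path.isdir(path):
            for root, _, filenames in os.walk(path):
                freed += sum(os.path.getsize(os.path.join(root, fn)) for fn in filenames)
            shutil.rmtree(path)
            n_dirs += 1
    return n_files, n_dirs, freed


scratch = tempfile.mkdtemp()
with open(os.path.join(scratch, "master_isin_map.json"), "w") as f:
    f.write("{}")
os.makedirs(os.path.join(scratch, "market_news"))
with open(os.path.join(scratch, "market_news", "RELIANCE.json"), "w") as f:
    f.write("[]")

summary = cleanup(
    scratch,
    ["master_isin_map.json", "nse_gsm_list.json"],
    ["market_news", "company_filings"],
)
print(summary)
```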

Script Runner Function

def run_script(script_name, phase_label):
    """Run a Python script and return success/failure."""
    script_path = os.path.join(BASE_DIR, script_name)
    
    if not os.path.exists(script_path):
        print(f"  ⚠️  SKIP: {script_name} not found.")
        return False
    
    print(f"  ▶ Running {script_name}...")
    start = time.time()
    
    try:
        result = subprocess.run(
            [sys.executable, script_path],
            cwd=BASE_DIR,
            text=True,
            timeout=1800  # 30 minute timeout
        )
        elapsed = time.time() - start
        
        if result.returncode == 0:
            print(f"  ✅ {script_name} ({elapsed:.1f}s)")
            return True
        else:
            print(f"  ❌ {script_name} FAILED ({elapsed:.1f}s)")
            return False  # Record the failure; the caller aborts only for critical scripts
            
    except subprocess.TimeoutExpired:
        print(f"  ⏰ {script_name} TIMED OUT (>30 min)")
        return False
    except Exception as e:
        print(f"  ❌ {script_name} EXCEPTION: {e}")
        return False

Usage

Run Full Pipeline

python run_full_pipeline.py

Expected Output

════════════════════════════════════════════════════════════
  EDL PIPELINE - FULL DATA REFRESH
════════════════════════════════════════════════════════════

📦 PHASE 1: Core Data (Foundation)
────────────────────────────────────
  ▶ Running fetch_dhan_data.py...
  ✅ fetch_dhan_data.py (3.2s)
  ▶ Running fetch_fundamental_data.py...
  ✅ fetch_fundamental_data.py (142.8s)

📡 PHASE 2: Data Enrichment (Fetching)
────────────────────────────────────
  ▶ Running fetch_company_filings.py...
  ✅ fetch_company_filings.py (89.3s)
  ...

📊 PHASE 2.5: OHLCV History (Smart Incremental)
────────────────────────────────────
  ▶ Running fetch_all_ohlcv.py...
  ✅ fetch_all_ohlcv.py (167.4s)

🔬 PHASE 3: Base Analysis (Building Master JSON)
────────────────────────────────────
  ▶ Running bulk_market_analyzer.py...
  ✅ bulk_market_analyzer.py (8.7s)

✨ PHASE 4: Enrichment (Injecting into Master JSON)
────────────────────────────────────
  ▶ Running advanced_metrics_processor.py...
  ✅ advanced_metrics_processor.py (12.4s)
  ...

📦 PHASE 5: Compression (.json → .json.gz)
────────────────────────────────────
  📦 Compressed: 18.3 MB → 2.1 MB (89% reduction)

🧹 CLEANUP: Removing intermediate files...
────────────────────────────────────
  🗑️  Cleaned: 13 files + 2 dirs (45.7 MB freed)

════════════════════════════════════════════════════════════
  PIPELINE COMPLETE
════════════════════════════════════════════════════════════
  Total Time:  524.3s (8.7 min)
  Successful:  18/18
  Failed:      0/18

  📄 Output: all_stocks_fundamental_analysis.json.gz (2.1 MB)
  📦 Compression: 18.3 MB → 2.1 MB (89% smaller)
  🧹 Only .json.gz + ohlcv_data/ remain. All intermediate data purged.
════════════════════════════════════════════════════════════
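
The closing summary can be derived from the results dictionary that the run_script calls fill in. A minimal sketch, where summarize is a hypothetical helper and the sample results are made up for illustration:

```python
def summarize(results, elapsed_s):
    """Build the three summary lines from a {script_name: bool} mapping."""
    total = len(results)
    ok = sum(1 for passed in results.values() if passed)
    return [
        f"  Total Time:  {elapsed_s:.1f}s ({elapsed_s / 60:.1f} min)",
        f"  Successful:  {ok}/{total}",
        f"  Failed:      {total - ok}/{total}",
    ]


# Illustrative data only, not real pipeline output.
sample_results = {"fetch_dhan_data.py": True, "fetch_market_news.py": False}
for line in summarize(sample_results, 524.3):
    print(line)
```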

Performance Metrics

Phase       Scripts   Avg Time     Bottleneck
Phase 1     2         ~2-3 min     Fundamental data batching
Phase 2     11        ~3-5 min     Company filings (multi-threaded)
Phase 2.5   2         ~2-30 min    OHLCV (incremental vs. full fetch)
Phase 3     1         ~8-10 sec    Analysis calculations
Phase 4     6         ~1-2 min     CSV processing
Phase 5     1         ~2-3 sec     Compression
Total       23        8-15 min     OHLCV (first run: 30-40 min)

Error Handling

Critical Failures (Pipeline Aborts)

  1. fetch_dhan_data.py fails: No master_isin_map.json → cannot proceed
  2. bulk_market_analyzer.py fails: No master JSON → Phase 4 has nothing to enrich

Non-Critical Failures (Pipeline Continues)

  • Any Phase 2 enrichment script fails → missing data but pipeline continues
  • Phase 4 scripts fail → some fields missing/zeroed but output still generated
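
The split between critical and non-critical failures can be sketched as a small driver loop. The CRITICAL_SCRIPTS set and run_in_order are hypothetical names; the real script uses explicit checks after each critical call, as shown in the Phase 1 and Phase 3 excerpts.

```python
CRITICAL_SCRIPTS = {"fetch_dhan_data.py", "bulk_market_analyzer.py"}


def run_in_order(scripts, runner):
    """Run scripts in order; stop at the first critical failure."""
    results = {}
    for name in scripts:
        results[name] = runner(name)
        if not results[name] and name in CRITICAL_SCRIPTS:
            break  # Later scripts would have nothing to work with.
    return results


order = [
    "fetch_dhan_data.py",
    "fetch_market_news.py",
    "bulk_market_analyzer.py",
    "enrich_fno_data.py",
]

# Non-critical failure: every script still runs.
print(run_in_order(order, lambda name: name != "fetch_market_news.py"))
# Critical failure: the run stops after bulk_market_analyzer.py.
print(run_in_order(order, lambda name: name != "bulk_market_analyzer.py"))
```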

Timeout Configuration

Each script has a 30-minute timeout:
timeout=1800  # 30 minutes in seconds
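
To see how the timeout behaves, the sketch below runs short Python snippets in a child process with a much smaller limit. The helper run_with_timeout is hypothetical; the subprocess.run and TimeoutExpired behavior is standard library.

```python
import subprocess
import sys


def run_with_timeout(code, timeout_s):
    """Run a Python snippet in a child process; return 'ok', 'failed' or 'timeout'."""
    try:
        result = subprocess.run([sys.executable, "-c", code], timeout=timeout_s)
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child and waits for it before re-raising.
        return "timeout"
    return "ok" if result.returncode == 0 else "failed"


print(run_with_timeout("print('done')", 30))
print(run_with_timeout("import time; time.sleep(10)", 1))
print(run_with_timeout("raise SystemExit(3)", 30))
```

A timeout and a non-zero exit code are reported through different paths, which is why run_script handles TimeoutExpired separately from the returncode check.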

Dependencies

import subprocess
import sys
import os
import time
import shutil
import glob
import gzip
import json

Final Output Structure

{
  "Symbol": "RELIANCE",
  "Name": "Reliance Industries Ltd",
  "Market Cap(Cr.)": 1850234.5,
  "Stock Price(₹)": 2745.30,
  "ROE(%)": 12.5,
  "ROCE(%)": 14.2,
  "P/E": 28.3,
  "30 Days Average Rupee Volume(Cr.)": 1234.5,
  "RVOL": 1.23,
  "% from ATH": -12.4,
  "F&O": "Yes",
  "Lot Size": 250,
  "Next Expiry": "2026-03-26",
  "Event Markers": "💸: Dividend (15-Mar) | 📊: Results Recently Out",
  "Recent Announcements": [...],
  "News Feed": [...]
}
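
A minimal sketch of consuming the compressed output, assuming the file holds a JSON list of records shaped like the one above (the top-level structure is an assumption). The block writes a tiny sample file first so it runs on its own; the filter thresholds are arbitrary.

```python
import gzip
import json
import os
import tempfile

# Made-up sample records using field names from the structure above.
sample = [
    {"Symbol": "RELIANCE", "RVOL": 1.23, "% from ATH": -12.4, "F&O": "Yes"},
    {"Symbol": "XYZ", "RVOL": 0.4, "% from ATH": -55.0, "F&O": "No"},
]
gz_file = os.path.join(tempfile.mkdtemp(), "all_stocks_fundamental_analysis.json.gz")
with gzip.open(gz_file, "wt", encoding="utf-8") as f:
    json.dump(sample, f, ensure_ascii=False)

# Reading side: text mode lets json.load consume the stream directly.
with gzip.open(gz_file, "rt", encoding="utf-8") as f:
    stocks = json.load(f)

near_highs = [s["Symbol"] for s in stocks if s["% from ATH"] > -15 and s["RVOL"] > 1]
print(near_highs)
```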

