
Overview

Phase 4 consists of 6 scripts that modify the master JSON in-place, adding advanced metrics and enrichment data. Each script loads all_stocks_fundamental_analysis.json, updates specific fields, and saves it back.
STRICT EXECUTION ORDER REQUIRED: Scripts must run in the exact order specified. Running out of order will cause data corruption or missing fields.

Execution Order


1. advanced_metrics_processor.py

Purpose: Adds volume, turnover, ADR, RVOL, ATH, and gap metrics using OHLCV historical data.
Dependencies:
  • all_stocks_fundamental_analysis.json (Phase 3)
  • ohlcv_data/*.csv (Phase 2.5)
  • complete_price_bands.json (Phase 2)
Fields Added:
| Field | Type | Description |
| --- | --- | --- |
| 30 Days Average Rupee Volume(Cr.) | number | Average daily rupee volume over the last 30 trading days |
| RVOL | number | Relative volume: today's volume / 20-day average volume |
| Daily Rupee Turnover 20(Cr.) | number | 20-day moving average of daily turnover |
| 200 Days EMA Volume | number | 200-day exponential moving average of volume |
| 5 Days MA ADR(%) | number | 5-day moving average of Average Daily Range (High minus Low as % of Low) |
| % from ATH | number | Percentage distance from the All-Time High |
| Gap Up % | number | Gap between today's open and yesterday's close, as a percentage |
| Circuit Limit | string | Current circuit limit band (e.g., "5%", "2%", "10%") |

Core Processing Logic

import os
import pandas as pd

def process_symbol_csv(csv_path):
    sym = os.path.basename(csv_path).replace(".csv", "")
    df = pd.read_csv(csv_path)
    
    if df.empty or len(df) < 5:
        return sym, None

    # Ensure numeric
    for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    
    df = df.dropna()
    latest = df.iloc[-1]
    prev = df.iloc[-2] if len(df) > 1 else latest

    # 1. ATH (All-Time High)
    ath = df['High'].max()
    pct_from_ath = ((ath - latest['Close']) / ath) * 100 if ath > 0 else 0
    
    # 2. Gap Up % and Day Range %
    gap_up_pct = ((latest['Open'] - prev['Close']) / prev['Close']) * 100 if prev['Close'] > 0 else 0
    day_range_pct = ((latest['High'] - latest['Low']) / latest['Low']) * 100 if latest['Low'] > 0 else 0
    
    # 3. ADR (Average Daily Range)
    df['Daily_Range_Pct'] = ((df['High'] - df['Low']) / df['Low']) * 100
    adr_5 = df['Daily_Range_Pct'].tail(5).mean()
    adr_14 = df['Daily_Range_Pct'].tail(14).mean()
    adr_20 = df['Daily_Range_Pct'].tail(20).mean()
    adr_30 = df['Daily_Range_Pct'].tail(30).mean()

    # 4. Returns & Low Benchmarks
    price_6m_ago = df['Close'].iloc[-126] if len(df) >= 126 else df['Close'].iloc[0]
    returns_6m = ((latest['Close'] - price_6m_ago) / price_6m_ago) * 100
    
    low_52w = df['Low'].tail(252).min()
    pct_from_52w_low = ((latest['Close'] - low_52w) / low_52w) * 100 if low_52w > 0 else 0

    # 5. Volume Metrics
    df['Turnover_Cr'] = (df['Close'] * df['Volume']) / 10000000 
    avg_rupee_vol_30 = df['Turnover_Cr'].tail(30).mean()
    
    # Average of the prior 20 sessions, excluding today (tail(21), drop last row)
    avg_vol_20 = df['Volume'].tail(21).iloc[:-1].mean()
    rvol = latest['Volume'] / avg_vol_20 if avg_vol_20 > 0 else 0
    
    df['EMA_Vol_200'] = calculate_ema(df['Volume'], 200)
    ema_vol_200_latest = df['EMA_Vol_200'].iloc[-1]
    
    ema_vol_200_52w_high = df['EMA_Vol_200'].tail(252).max()
    pct_from_ema_200_52w_high = ((ema_vol_200_latest - ema_vol_200_52w_high) / ema_vol_200_52w_high) * 100

    # 6. Turnover Moving Averages
    turnover_20 = df['Turnover_Cr'].tail(20).mean()
    turnover_50 = df['Turnover_Cr'].tail(50).mean()
    turnover_100 = df['Turnover_Cr'].tail(100).mean()

    return sym, {
        "30 Days Average Rupee Volume(Cr.)": round(avg_rupee_vol_30, 2),
        "RVOL": round(rvol, 2),
        "Daily Rupee Turnover 20(Cr.)": round(turnover_20, 2),
        "Daily Rupee Turnover 50(Cr.)": round(turnover_50, 2),
        "Daily Rupee Turnover 100(Cr.)": round(turnover_100, 2),
        "200 Days EMA Volume": round(ema_vol_200_latest, 0),
        "% from 52W High 200 Days EMA Volume": round(pct_from_ema_200_52w_high, 2),
        "5 Days MA ADR(%)": round(adr_5, 2),
        "14 Days MA ADR(%)": round(adr_14, 2),
        "20 Days MA ADR(%)": round(adr_20, 2),
        "30 Days MA ADR(%)": round(adr_30, 2),
        "% from ATH": round(pct_from_ath, 2),
        "Gap Up %": round(gap_up_pct, 2),
        "Day Range(%)": round(day_range_pct, 2),
        "6 Month Returns(%)": round(returns_6m, 2),
        "% from 52W Low": round(pct_from_52w_low, 2)
    }
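`calculate_ema` is not shown in the snippet above; a plausible pandas-based implementation (an assumption, not necessarily the script's exact helper) is:

```python
import pandas as pd

def calculate_ema(series, span):
    """Exponential moving average with the standard span-based smoothing factor."""
    return pd.Series(series).ewm(span=span, adjust=False).mean()
```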

Hybrid Fix for 1-Day Lag

# Use Live LTP from master data to eliminate lag
live_ltp = pd.to_numeric(stock.get("Ltp"), errors='coerce')
if pd.notnull(live_ltp) and live_ltp > 0:
    ath = metrics.get("ATH_Value", 0)
    if ath > 0:
        metrics["% from ATH"] = round(((ath - live_ltp) / ath) * 100, 2)

2. process_earnings_performance.py

Purpose: Calculates post-earnings returns using smart benchmarking that accounts for market hours.
Dependencies:
  • all_stocks_fundamental_analysis.json
  • company_filings/*.json (Phase 2)
  • ohlcv_data/*.csv (Phase 2.5)
Fields Added:
| Field | Type | Description |
| --- | --- | --- |
| Quarterly Results Date | string | Latest earnings announcement date (YYYY-MM-DD) |
| Returns since Earnings(%) | number | Price return from the earnings announcement to the current price |
| Max Returns since Earnings(%) | number | Maximum price gain achieved since earnings (captures the best exit point) |

Smart Benchmarking Logic

import pandas as pd

def calculate_earnings_metrics(csv_path, earnings_news_date):
    """Calculate returns using smart after-hours detection."""
    
    if not earnings_news_date:
        return 0.0, 0.0
    
    # Parse date and time: "2026-01-27 20:17:25"
    date_part = earnings_news_date.split(" ")[0]
    time_part = earnings_news_date.split(" ")[1] if " " in earnings_news_date else "00:00:00"
    
    target_date = pd.to_datetime(date_part)
    hour = int(time_part.split(":")[0])
    minute = int(time_part.split(":")[1])
    
    df = pd.read_csv(csv_path)
    df['Date'] = pd.to_datetime(df['Date'])
    
    latest_price = df.iloc[-1]['Close']

    # Determine if news hit after market hours
    # Indian market closes at 15:30
    is_after_hours = (hour > 15) or (hour == 15 and minute >= 30)
    
    if is_after_hours:
        # Benchmark: close of announcement day
        pre_news_df = df[df['Date'] <= target_date]
        post_news_df = df[df['Date'] > target_date]
    else:
        # Benchmark: close of day BEFORE announcement
        pre_news_df = df[df['Date'] < target_date]
        post_news_df = df[df['Date'] >= target_date]
    
    if pre_news_df.empty or post_news_df.empty:
        return 0.0, 0.0
    
    base_price = pre_news_df.iloc[-1]['Close']
    
    # 1. Returns since Earnings
    returns_since = ((latest_price - base_price) / base_price) * 100
    
    # 2. Max Returns since Earnings
    max_high = post_news_df['High'].max()
    max_returns = ((max_high - base_price) / base_price) * 100
    
    return round(returns_since, 2), round(max_returns, 2)

Example Scenarios

Scenario: Results announced at 20:17 (after market close)
Logic:
  • Market closed at 15:30 on Jan 27
  • Announcement at 20:17 on Jan 27
  • Benchmark: Jan 27 close
  • First reactive session: Jan 28
Calculation:
base_price = df[df['Date'] == '2026-01-27']['Close']
returns = (current_price - base_price) / base_price * 100
Scenario: Results announced at 12:30 (during market hours)
Logic:
  • Market open when announced
  • Benchmark: previous day close (Jan 26)
  • First reactive session: Jan 27 (same day)
Calculation:
base_price = df[df['Date'] == '2026-01-26']['Close']
returns = (current_price - base_price) / base_price * 100
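The after-hours cut-off driving both scenarios can be isolated into a small predicate (`is_after_hours` here is an illustrative refactor, not a function in the script):

```python
def is_after_hours(hour, minute):
    """True when a timestamp falls at or after the 15:30 NSE cash-market close."""
    return (hour > 15) or (hour == 15 and minute >= 30)
```

For the scenarios above, 20:17 is after hours (benchmark: same-day close) and 12:30 is not (benchmark: previous day's close).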

3. enrich_fno_data.py

Purpose: Adds F&O eligibility, lot sizes, and next expiry dates.
Dependencies:
  • all_stocks_fundamental_analysis.json
  • master_isin_map.json (for FnoFlag)
  • Dhan F&O APIs
Fields Added:
| Field | Type | Description |
| --- | --- | --- |
| F&O | string | F&O eligibility: "Yes" or "No" |
| Lot Size | number or string | Current month lot size (e.g., 250) or "N/A" |
| Next Expiry | string | Next expiry date in YYYY-MM-DD format, or "N/A" |

BuildId Dynamic Fetching

import re
import requests

# Shared request headers; the User-Agent value here is illustrative
headers = {"User-Agent": "Mozilla/5.0"}

def get_build_id():
    """Dynamically fetch Next.js buildId from Dhan page."""
    url = "https://dhan.co/nse-fno-lot-size/"
    response = requests.get(url, headers=headers, timeout=10)
    match = re.search(r'"buildId":"([^"]+)"', response.text)
    return match.group(1) if match else None

Lot Size Fetching

def fetch_lot_sizes(build_id):
    """Fetch current F&O lot sizes."""
    lot_map = {}
    url = f"https://dhan.co/_next/data/{build_id}/nse-fno-lot-size.json"
    
    r = requests.get(url, headers=headers, timeout=15)
    if r.status_code == 200:
        data = r.json()
        instruments = data.get("pageProps", {}).get("listData", [])
        for item in instruments:
            sym = item.get("sym")
            fo_contracts = item.get("fo_dt", [])
            if sym and fo_contracts:
                # First contract = current month lot size
                lot_map[sym] = fo_contracts[0].get("ls")
    
    return lot_map

Next Expiry Fetching

def fetch_next_expiry(build_id):
    """Fetch F&O expiry calendar."""
    expiry_map = {}
    url = f"https://dhan.co/_next/data/{build_id}/fno-expiry-calendar.json"
    
    r = requests.get(url, headers=headers, timeout=15)
    data = r.json()
    expiry_raw = data.get("pageProps", {}).get("expiryData", {}).get("data", [])

    from datetime import datetime
    today = datetime.now().strftime("%Y-%m-%d")

    for exchange_data in expiry_raw:
        for exp_group in exchange_data.get("exps", []):
            for item in exp_group.get("explst", []):
                sym = item.get("symbolName")
                exp_date = item.get("expdate")
                if sym and exp_date and exp_date >= today:
                    # Keep nearest future expiry
                    if sym not in expiry_map or exp_date < expiry_map[sym]:
                        expiry_map[sym] = exp_date
    
    return expiry_map
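Applying the two maps to a stock record might look like the following; `apply_fno_fields` is a hypothetical helper, with field names taken from the table above:

```python
def apply_fno_fields(stock, lot_map, expiry_map):
    """Merge F&O eligibility, lot size, and next expiry into one stock record."""
    sym = stock.get("Symbol")
    if sym in lot_map:
        stock["F&O"] = "Yes"
        stock["Lot Size"] = lot_map[sym]
        stock["Next Expiry"] = expiry_map.get(sym, "N/A")
    else:
        stock["F&O"] = "No"
        stock["Lot Size"] = "N/A"
        stock["Next Expiry"] = "N/A"
    return stock
```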

4. process_market_breadth.py

Purpose: Adds market breadth metrics and relative strength ratings.
Fields Added:
  • Sector-wise advance/decline ratios
  • Market breadth percentages
  • Relative strength rating (RSR)
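The script's exact formulas are not documented here; one common definition of a sector advance/decline ratio, stated as an assumption, is:

```python
def advance_decline_ratio(pct_changes):
    """Advancers divided by decliners for one sector's daily % changes."""
    advances = sum(1 for c in pct_changes if c > 0)
    declines = sum(1 for c in pct_changes if c < 0)
    return advances / declines if declines else float(advances)
```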

5. process_historical_market_breadth.py

Purpose: Generates historical market breadth time-series data for charts.
Output: market_breadth.json.gz (separate file, not injected into the master JSON)

6. add_corporate_events.py (MUST BE LAST)

Purpose: Adds event markers, recent announcements, and news feed. Must run last because it aggregates data from all prior enrichment.
Dependencies:
  • all_stocks_fundamental_analysis.json
  • upcoming_corporate_actions.json
  • company_filings/*.json
  • all_company_announcements.json
  • nse_asm_list.json, nse_gsm_list.json
  • bulk_block_deals.json
  • incremental_price_bands.json
  • market_news/*.json
Fields Added:
| Field | Type | Description |
| --- | --- | --- |
| Event Markers | string | Pipe-separated list of event icons with labels (e.g., "💸: Dividend (15-Mar) \| 📊: Results Recently Out") |
| Recent Announcements | array | Top 5 regulatory announcements with Date, Headline, URL |
| News Feed | array | Top 5 market news items with Title, Sentiment, Date |

Event Icon Mapping

def add_event(sym, event_str):
    if sym not in refined_map:
        refined_map[sym] = []
    if event_str not in refined_map[sym]:
        refined_map[sym].append(event_str)

# Surveillance
if "LTASM" in stage:
    add_event(sym, "⭐: LTASM")
elif "STASM" in stage:
    add_event(sym, "⭐: STASM")

# Corporate Actions
if "QUARTERLY" in etype and within_14_days:
    add_event(sym, f"⏰: Results ({date_str})")
elif "DIVIDEND" in etype:
    add_event(sym, f"💸: Dividend ({date_str})")
elif "BONUS" in etype:
    add_event(sym, f"🎁: Bonus ({date_str})")
elif "SPLIT" in etype:
    add_event(sym, f"✂️: Split ({date_str})")
elif "RIGHTS" in etype:
    add_event(sym, f"📈: Rights ({date_str})")

# Circuit Revisions
if t_band < f_band:
    add_event(sym, "#: -ve Circuit Limit Revision")
elif t_band > f_band:
    add_event(sym, "#: +ve Circuit Limit Revision")

# Deals
if "BLOCK" in deal_type or "BULK" in deal_type:
    add_event(sym, "📦: Block Deal")

# Insider Trading
if is_insider_trade(filing):
    add_event(sym, "🔑: Insider Trading")

# Recent Results
if "results are out" in event.lower():
    add_event(sym, "📊: Results Recently Out")

Insider Trading Detection

def detect_insider_trading(filing_item):
    desc = (filing_item.get("descriptor") or "").lower()
    caption = (filing_item.get("caption") or "").lower()
    cat = (filing_item.get("cat") or "").lower()
    body = (filing_item.get("news_body") or "").lower()
    
    full_text = f"{desc} {caption} {cat} {body}"
    
    # Keywords indicating insider trading
    trade_keywords = [
        "regulation 7(2)", "reg 7(2)", "inter-se transfer", 
        "form c", "continual disclosure"
    ]
    
    if any(k in full_text for k in trade_keywords):
        return True
    
    if ("insider trading" in full_text or "sebi (pit)" in full_text):
        # Exclude trading window closures
        if "trading window" not in full_text and "closure" not in full_text:
            return True
    
    return False

Announcement Deduplication

# Merge company_filings and all_company_announcements
news_map[sym] = []

# Add from filings (top 5)
for filing in filings[:5]:
    headline = filing.get("caption") or filing.get("descriptor")
    news_map[sym].append({
        "Date": filing.get("news_date"),
        "Headline": headline,
        "URL": filing.get("file_url") or "N/A"
    })

# Add from announcements (if not duplicate)
for ann in announcements:
    event_text = ann.get("Event")
    exists = any(event_text.lower() in item["Headline"].lower() 
                 for item in news_map[sym])
    
    if not exists:
        news_map[sym].insert(0, {
            "Date": ann.get("Date"),
            "Headline": event_text,
            "URL": "N/A"
        })

# Keep only top 5
news_map[sym] = news_map[sym][:5]

Final Update

for stock in master_data:
    sym = stock.get("Symbol")
    
    # 1. Event Markers
    events = refined_map.get(sym, [])
    stock["Event Markers"] = " | ".join(events) if events else "N/A"

    # 2. Recent Announcements (Regulatory)
    stock["Recent Announcements"] = news_map.get(sym, [])[:5]
    
    # 3. News Feed (Market/Media with Sentiment)
    stock["News Feed"] = news_feed_map.get(sym, [])

Final Output Example

{
  "Symbol": "RELIANCE",
  "Name": "Reliance Industries Ltd",
  
  // ... Base fields from Phase 3 ...
  
  // Phase 4 Additions
  "30 Days Average Rupee Volume(Cr.)": 1234.5,
  "RVOL": 1.23,
  "% from ATH": -12.4,
  "Gap Up %": 0.8,
  "Circuit Limit": "10%",
  
  "Quarterly Results Date": "2026-02-15",
  "Returns since Earnings(%)": 5.6,
  "Max Returns since Earnings(%)": 8.9,
  
  "F&O": "Yes",
  "Lot Size": 250,
  "Next Expiry": "2026-03-26",
  
  "Event Markers": "💸: Dividend (15-Mar) | 📊: Results Recently Out",
  
  "Recent Announcements": [
    {
      "Date": "2026-02-15 18:30:00",
      "Headline": "Financial Results - Q3 FY25",
      "URL": "https://..."
    }
  ],
  
  "News Feed": [
    {
      "Title": "Reliance Q3 Results Beat Estimates",
      "Sentiment": "positive",
      "Date": 1709123456789
    }
  ]
}

Performance Summary

| Script | Dependencies | Avg Time | Fields Added |
| --- | --- | --- | --- |
| advanced_metrics_processor.py | ohlcv_data/ | 10-15s | 15 metrics |
| process_earnings_performance.py | filings/, ohlcv_data/ | 8-12s | 3 fields |
| enrich_fno_data.py | APIs | 5-8s | 3 fields |
| process_market_breadth.py | ohlcv_data/ | 15-20s | 5 fields |
| process_historical_market_breadth.py | ohlcv_data/ | 20-30s | Separate file |
| add_corporate_events.py | All Phase 2 outputs | 10-15s | 3 complex fields |
| Total Phase 4 | | 1-2 min | ~30 fields |

Next Steps

Back to Master Runner

Return to pipeline overview and execution guide
