Overview
Phase 4 consists of 6 scripts that modify the master JSON in-place, adding advanced metrics and enrichment data. Each script loads all_stocks_fundamental_analysis.json, updates specific fields, and saves it back.
STRICT EXECUTION ORDER REQUIRED: Scripts must run in the exact order specified. Running out of order will cause data corruption or missing fields.
Execution Order
1. advanced_metrics_processor.py
Purpose: Adds volume, turnover, ADR, RVOL, ATH, and gap metrics using OHLCV historical data.
Dependencies:
all_stocks_fundamental_analysis.json (Phase 3)
ohlcv_data/*.csv (Phase 2.5)
complete_price_bands.json (Phase 2)
Fields Added:
30 Days Average Rupee Volume(Cr.)
Average daily rupee volume over last 30 trading days
Relative volume: Today’s volume / 20-day average volume
Daily Rupee Turnover 20(Cr.)
20-day moving average of daily turnover
200-day exponential moving average of volume
5-day moving average of Average Daily Range (High-Low as % of Low)
Percentage distance from All-Time High
Gap between today’s open and yesterday’s close as percentage
Current circuit limit band (e.g., “5%”, “2%”, “10%”)
Core Processing Logic
def process_symbol_csv(csv_path):
    """Compute volume, turnover, ADR, ATH, gap and return metrics for one symbol.

    Args:
        csv_path: Path to an OHLCV CSV named "<SYMBOL>.csv" with
            Open/High/Low/Close/Volume columns.

    Returns:
        Tuple (symbol, metrics_dict), or (symbol, None) when there is not
        enough usable history.
    """
    sym = os.path.basename(csv_path).replace(".csv", "")
    df = pd.read_csv(csv_path)
    if df.empty or len(df) < 5:
        return sym, None
    # Ensure numeric columns; rows with unparseable values are dropped.
    for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    df = df.dropna()
    if df.empty:
        # Coercion may have removed every row; avoid IndexError below.
        return sym, None
    latest = df.iloc[-1]
    prev = df.iloc[-2] if len(df) > 1 else latest
    # 1. ATH (All-Time High) and % distance below it.
    ath = df['High'].max()
    pct_from_ath = ((ath - latest['Close']) / ath) * 100 if ath > 0 else 0
    # 2. Gap between today's open and yesterday's close; today's range.
    gap_up_pct = ((latest['Open'] - prev['Close']) / prev['Close']) * 100 if prev['Close'] > 0 else 0
    day_range_pct = ((latest['High'] - latest['Low']) / latest['Low']) * 100 if latest['Low'] > 0 else 0
    # 3. ADR (Average Daily Range, High-Low as % of Low) moving averages.
    df['Daily_Range_Pct'] = ((df['High'] - df['Low']) / df['Low']) * 100
    adr_5 = df['Daily_Range_Pct'].tail(5).mean()
    adr_14 = df['Daily_Range_Pct'].tail(14).mean()
    adr_20 = df['Daily_Range_Pct'].tail(20).mean()
    adr_30 = df['Daily_Range_Pct'].tail(30).mean()
    # 4. 6-month return (~126 trading days) and distance above the 52W low.
    price_6m_ago = df['Close'].iloc[-126] if len(df) >= 126 else df['Close'].iloc[0]
    returns_6m = ((latest['Close'] - price_6m_ago) / price_6m_ago) * 100
    low_52w = df['Low'].tail(252).min()
    pct_from_52w_low = ((latest['Close'] - low_52w) / low_52w) * 100 if low_52w > 0 else 0
    # 5. Volume metrics. Turnover in crores of rupees (1 Cr = 1e7).
    df['Turnover_Cr'] = (df['Close'] * df['Volume']) / 10000000
    avg_rupee_vol_30 = df['Turnover_Cr'].tail(30).mean()
    # RVOL benchmark excludes today: last 21 rows minus the latest one.
    avg_vol_20 = df['Volume'].tail(21).iloc[:-1].mean()
    rvol = latest['Volume'] / avg_vol_20 if avg_vol_20 > 0 else 0
    df['EMA_Vol_200'] = calculate_ema(df['Volume'], 200)
    ema_vol_200_latest = df['EMA_Vol_200'].iloc[-1]
    ema_vol_200_52w_high = df['EMA_Vol_200'].tail(252).max()
    # Guard against a zero 52-week high (e.g. all-zero volume history).
    pct_from_ema_200_52w_high = (
        ((ema_vol_200_latest - ema_vol_200_52w_high) / ema_vol_200_52w_high) * 100
        if ema_vol_200_52w_high > 0 else 0
    )
    # 6. Turnover moving averages.
    turnover_20 = df['Turnover_Cr'].tail(20).mean()
    turnover_50 = df['Turnover_Cr'].tail(50).mean()
    turnover_100 = df['Turnover_Cr'].tail(100).mean()
    # Keys match the master JSON field names exactly.
    return sym, {
        "30 Days Average Rupee Volume(Cr.)": round(avg_rupee_vol_30, 2),
        "RVOL": round(rvol, 2),
        "Daily Rupee Turnover 20(Cr.)": round(turnover_20, 2),
        "Daily Rupee Turnover 50(Cr.)": round(turnover_50, 2),
        "Daily Rupee Turnover 100(Cr.)": round(turnover_100, 2),
        "200 Days EMA Volume": round(ema_vol_200_latest, 0),
        "% from 52W High 200 Days EMA Volume": round(pct_from_ema_200_52w_high, 2),
        "5 Days MA ADR(%)": round(adr_5, 2),
        "14 Days MA ADR(%)": round(adr_14, 2),
        "20 Days MA ADR(%)": round(adr_20, 2),
        "30 Days MA ADR(%)": round(adr_30, 2),
        "% from ATH": round(pct_from_ath, 2),
        "Gap Up %": round(gap_up_pct, 2),
        "Day Range(%)": round(day_range_pct, 2),
        "6 Month Returns(%)": round(returns_6m, 2),
        "% from 52W Low": round(pct_from_52w_low, 2)
    }
Hybrid Fix for 1-Day Lag
# Use the live LTP from the master data instead of the (possibly 1-day
# stale) last CSV close, so the ATH distance reflects the current price.
live_ltp = pd.to_numeric(stock.get("Ltp"), errors='coerce')
if pd.notnull(live_ltp) and live_ltp > 0:
    ath = metrics.get("ATH_Value", 0)
    if ath > 0:
        # Key restored to the master JSON field name "% from ATH".
        metrics["% from ATH"] = round(((ath - live_ltp) / ath) * 100, 2)
2. process_earnings_performance.py
Purpose: Calculates post-earnings returns using smart benchmarking that accounts for market hours.
Dependencies:
all_stocks_fundamental_analysis.json
company_filings/*.json (Phase 2)
ohlcv_data/*.csv (Phase 2.5)
Fields Added:
Quarterly Results Date: Latest earnings announcement date (YYYY-MM-DD)
Returns since Earnings(%)
Price return from earnings announcement to current price
Max Returns since Earnings(%)
Maximum price gain achieved since earnings (captures best exit point)
Smart Benchmarking Logic
def calculate_earnings_metrics(csv_path, earnings_news_date):
    """Calculate post-earnings returns using smart after-hours benchmarking.

    An announcement after the 15:30 market close is benchmarked against the
    announcement day's own close; an intraday announcement against the
    previous day's close.

    Args:
        csv_path: Path to the symbol's OHLCV CSV (needs Date/Close/High).
        earnings_news_date: Timestamp string "YYYY-MM-DD HH:MM:SS"
            (time part optional; midnight is assumed when absent).

    Returns:
        Tuple (returns_since_pct, max_returns_pct), each rounded to 2
        decimals; (0.0, 0.0) when no benchmark can be established.
    """
    if not earnings_news_date:
        return 0.0, 0.0
    # Parse date and time: "2026-01-27 20:17:25".
    date_part = earnings_news_date.split(" ")[0]
    time_part = earnings_news_date.split(" ")[1] if " " in earnings_news_date else "00:00:00"
    target_date = pd.to_datetime(date_part)
    hour = int(time_part.split(":")[0])
    minute = int(time_part.split(":")[1])
    df = pd.read_csv(csv_path)
    if df.empty:
        # No price history: nothing to benchmark against.
        return 0.0, 0.0
    df['Date'] = pd.to_datetime(df['Date'])
    latest_price = df.iloc[-1]['Close']
    # Determine if the news hit after market hours.
    # Indian market closes at 15:30.
    is_after_hours = (hour > 15) or (hour == 15 and minute >= 30)
    if is_after_hours:
        # Benchmark: close of the announcement day itself.
        pre_news_df = df[df['Date'] <= target_date]
        post_news_df = df[df['Date'] > target_date]
    else:
        # Benchmark: close of the day BEFORE the announcement.
        pre_news_df = df[df['Date'] < target_date]
        post_news_df = df[df['Date'] >= target_date]
    if pre_news_df.empty or post_news_df.empty:
        return 0.0, 0.0
    base_price = pre_news_df.iloc[-1]['Close']
    # 1. Return from benchmark close to the latest close.
    returns_since = ((latest_price - base_price) / base_price) * 100
    # 2. Best achievable return since the announcement (highest high).
    max_high = post_news_df['High'].max()
    max_returns = ((max_high - base_price) / base_price) * 100
    return round(returns_since, 2), round(max_returns, 2)
Example Scenarios
After-Hours Announcement (Most Common)
Scenario: Results announced at 20:17 (after market close). Logic:
Market closed at 15:30 on Jan 27
Announcement at 20:17 on Jan 27
Benchmark: Jan 27 close
First reactive session: Jan 28
Calculation: base_price = df[df[ 'Date' ] == '2026-01-27' ][ 'Close' ]
returns = (current_price - base_price) / base_price * 100
Intraday Announcement (Rare)
Scenario: Results announced at 12:30 (during market hours). Logic:
Market open when announced
Benchmark: Previous day close (Jan 26)
First reactive session: Jan 27 (same day)
Calculation: base_price = df[df[ 'Date' ] == '2026-01-26' ][ 'Close' ]
returns = (current_price - base_price) / base_price * 100
3. enrich_fno_data.py
Purpose: Adds F&O eligibility, lot sizes, and next expiry dates.
Dependencies:
all_stocks_fundamental_analysis.json
master_isin_map.json (for FnoFlag)
Dhan F&O APIs
Fields Added:
F&O eligibility: “Yes” or “No”
Current month lot size (e.g., 250) or “N/A”
Next expiry date in YYYY-MM-DD format or “N/A”
BuildId Dynamic Fetching
def get_build_id():
    """Dynamically fetch the Next.js buildId from the Dhan lot-size page.

    Returns:
        The buildId string, or None when the page markup has changed and
        the pattern no longer matches.
    """
    url = "https://dhan.co/nse-fno-lot-size/"
    response = requests.get(url, headers=headers, timeout=10)
    # The buildId is embedded in the page's Next.js bootstrap JSON.
    match = re.search(r'"buildId":"([^"]+)"', response.text)
    return match.group(1) if match else None
Lot Size Fetching
def fetch_lot_sizes(build_id):
    """Fetch current-month F&O lot sizes keyed by symbol.

    Args:
        build_id: Next.js buildId obtained from get_build_id().

    Returns:
        Dict mapping symbol -> current-month lot size; empty on HTTP error.
    """
    lot_map = {}
    url = f"https://dhan.co/_next/data/{build_id}/nse-fno-lot-size.json"
    r = requests.get(url, headers=headers, timeout=15)
    if r.status_code == 200:
        data = r.json()
        instruments = data.get("pageProps", {}).get("listData", [])
        for item in instruments:
            sym = item.get("sym")
            fo_contracts = item.get("fo_dt", [])
            if sym and fo_contracts:
                # First contract in the list is the current month.
                lot_map[sym] = fo_contracts[0].get("ls")
    return lot_map
Next Expiry Fetching
def fetch_next_expiry(build_id):
    """Fetch the nearest future F&O expiry date per symbol.

    Args:
        build_id: Next.js buildId obtained from get_build_id().

    Returns:
        Dict mapping symbol -> nearest future expiry date (YYYY-MM-DD);
        empty on HTTP error.
    """
    expiry_map = {}
    url = f"https://dhan.co/_next/data/{build_id}/fno-expiry-calendar.json"
    r = requests.get(url, headers=headers, timeout=15)
    if r.status_code != 200:
        # Fail soft on HTTP errors, mirroring fetch_lot_sizes.
        return expiry_map
    data = r.json()
    expiry_raw = data.get("pageProps", {}).get("expiryData", {}).get("data", [])
    from datetime import datetime
    today = datetime.now().strftime("%Y-%m-%d")
    for exchange_data in expiry_raw:
        for exp_group in exchange_data.get("exps", []):
            for item in exp_group.get("explst", []):
                sym = item.get("symbolName")
                exp_date = item.get("expdate")
                # ISO-format dates compare correctly as plain strings.
                if sym and exp_date and exp_date >= today:
                    # Keep the nearest future expiry per symbol.
                    if sym not in expiry_map or exp_date < expiry_map[sym]:
                        expiry_map[sym] = exp_date
    return expiry_map
4. process_market_breadth.py
Purpose: Adds market breadth metrics and relative strength ratings.
Fields Added:
Sector-wise advance/decline ratios
Market breadth percentages
Relative strength rating (RSR)
5. process_historical_market_breadth.py
Purpose: Generates historical market breadth time-series data for charts.
Output: market_breadth.json.gz (separate file, not injected into master JSON)
6. add_corporate_events.py (MUST BE LAST)
Purpose: Adds event markers, recent announcements, and news feed. Must run last because it aggregates data from all prior enrichment.
Dependencies:
all_stocks_fundamental_analysis.json
upcoming_corporate_actions.json
company_filings/*.json
all_company_announcements.json
nse_asm_list.json, nse_gsm_list.json
bulk_block_deals.json
incremental_price_bands.json
market_news/*.json
Fields Added:
Pipe-separated list of event icons with labels (e.g., “💸: Dividend (15-Mar) | 📊: Results Recently Out”)
Top 5 regulatory announcements with Date, Headline, URL
Top 5 market news items with Title, Sentiment, Date
Event Icon Mapping
def add_event(sym, event_str):
    """Register an event marker for *sym*, skipping duplicates."""
    events = refined_map.setdefault(sym, [])
    if event_str not in events:
        events.append(event_str)
# Surveillance stage markers (long/short-term Additional Surveillance Measure).
if "LTASM" in stage:
    add_event(sym, "⭐: LTASM")
elif "STASM" in stage:
    add_event(sym, "⭐: STASM")
# Corporate actions: icon plus the scheduled date.
if "QUARTERLY" in etype and within_14_days:
    add_event(sym, f"⏰: Results ({date_str})")
elif "DIVIDEND" in etype:
    add_event(sym, f"💸: Dividend ({date_str})")
elif "BONUS" in etype:
    add_event(sym, f"🎁: Bonus ({date_str})")
elif "SPLIT" in etype:
    add_event(sym, f"✂️: Split ({date_str})")
elif "RIGHTS" in etype:
    add_event(sym, f"📈: Rights ({date_str})")
# Circuit-limit band revisions (t_band vs f_band — presumably
# today's band vs former band; TODO confirm in source script).
if t_band < f_band:
    add_event(sym, "#: -ve Circuit Limit Revision")
elif t_band > f_band:
    add_event(sym, "#: +ve Circuit Limit Revision")
# Bulk/block deals.
if "BLOCK" in deal_type or "BULK" in deal_type:
    add_event(sym, "📦: Block Deal")
# Insider trading filings.
if is_insider_trade(filing):
    add_event(sym, "🔑: Insider Trading")
# Recently announced results.
if "results are out" in event.lower():
    add_event(sym, "📊: Results Recently Out")
Insider Trading Detection
def detect_insider_trading ( filing_item ):
desc = (item.get( "descriptor" ) or "" ).lower()
caption = (item.get( "caption" ) or "" ).lower()
cat = (item.get( "cat" ) or "" ).lower()
body = (item.get( "news_body" ) or "" ).lower()
full_text = f " { desc } { caption } { cat } { body } "
# Keywords indicating insider trading
trade_keywords = [
"regulation 7(2)" , "reg 7(2)" , "inter-se transfer" ,
"form c" , "continual disclosure"
]
if any (k in full_text for k in trade_keywords):
return True
if ( "insider trading" in full_text or "sebi (pit)" in full_text):
# Exclude trading window closures
if "trading window" not in full_text and "closure" not in full_text:
return True
return False
Announcement Deduplication
# Merge company_filings and all_company_announcements into one feed.
news_map[sym] = []
# Regulatory filings first (top 5).
for filing in filings[:5]:
    headline = filing.get("caption") or filing.get("descriptor")
    news_map[sym].append({
        "Date": filing.get("news_date"),
        "Headline": headline,
        "URL": filing.get("file_url") or "N/A"
    })
# Prepend announcements not already covered by a filing headline.
for ann in announcements:
    event_text = ann.get("Event")
    # Case-insensitive substring match against existing headlines.
    # NOTE(review): assumes event_text and every Headline are non-None —
    # TODO confirm upstream guarantees this.
    exists = any(event_text.lower() in item["Headline"].lower()
                 for item in news_map[sym])
    if not exists:
        news_map[sym].insert(0, {
            "Date": ann.get("Date"),
            "Headline": event_text,
            "URL": "N/A"
        })
# Keep only the top 5 entries.
news_map[sym] = news_map[sym][:5]
Final Update
# Inject the aggregated Phase 4 event data into every master JSON record.
for stock in master_data:
    sym = stock.get("Symbol")
    # 1. Event markers rendered as a pipe-separated string.
    events = refined_map.get(sym, [])
    stock["Event Markers"] = " | ".join(events) if events else "N/A"
    # 2. Top 5 regulatory announcements.
    stock["Recent Announcements"] = news_map.get(sym, [])[:5]
    # 3. Market/media news feed with sentiment.
    stock["News Feed"] = news_feed_map.get(sym, [])
Final Output Example
{
"Symbol" : "RELIANCE" ,
"Name" : "Reliance Industries Ltd" ,
// ... Base fields from Phase 3 ...
// Phase 4 Additions
"30 Days Average Rupee Volume(Cr.)" : 1234.5 ,
"RVOL" : 1.23 ,
"% from ATH" : -12.4 ,
"Gap Up %" : 0.8 ,
"Circuit Limit" : "10%" ,
"Quarterly Results Date" : "2026-02-15" ,
"Returns since Earnings(%)" : 5.6 ,
"Max Returns since Earnings(%)" : 8.9 ,
"F&O" : "Yes" ,
"Lot Size" : 250 ,
"Next Expiry" : "2026-03-26" ,
"Event Markers" : "💸: Dividend (15-Mar) | 📊: Results Recently Out" ,
"Recent Announcements" : [
{
"Date" : "2026-02-15 18:30:00" ,
"Headline" : "Financial Results - Q3 FY25" ,
"URL" : "https://..."
}
],
"News Feed" : [
{
"Title" : "Reliance Q3 Results Beat Estimates" ,
"Sentiment" : "positive" ,
"Date" : 1709123456789
}
]
}
| Script | Dependencies | Avg Time | Fields Added |
| --- | --- | --- | --- |
| advanced_metrics_processor.py | ohlcv_data/ | 10-15s | 15 metrics |
| process_earnings_performance.py | filings/, ohlcv_data/ | 8-12s | 3 fields |
| enrich_fno_data.py | APIs | 5-8s | 3 fields |
| process_market_breadth.py | ohlcv_data/ | 15-20s | 5 fields |
| process_historical_market_breadth.py | ohlcv_data/ | 20-30s | Separate file |
| add_corporate_events.py | All Phase 2 outputs | 10-15s | 3 complex fields |
| **Total Phase 4** | - | 1-2 min | ~30 fields |
Next Steps
Back to Master Runner Return to pipeline overview and execution guide