The EDL Pipeline includes robust error handling to ensure data integrity and graceful degradation when individual scripts fail.

Error Handling Philosophy

The pipeline follows a fail-forward approach:
  1. Critical failures (Phase 1) halt the pipeline
  2. Non-critical failures (Phase 2+) log errors but continue execution
  3. Enrichment failures (Phase 4) skip problematic stocks but complete the run
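The three rules above can be sketched as a small driver loop. This is only an illustration of the idea, not the code in run_full_pipeline.py: the `run_phases` function, the `(phase, name, runner)` tuples, and the `CRITICAL_PHASES` set are assumptions made for the example.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical: phases whose failure should halt the whole run.
CRITICAL_PHASES = {"Phase 1"}

Step = Tuple[str, str, Callable[[], bool]]  # (phase, script name, runner)


def run_phases(steps: List[Step]) -> Dict[str, bool]:
    """Run steps in order; stop on a critical failure, otherwise keep going."""
    results: Dict[str, bool] = {}
    for phase, name, runner in steps:
        try:
            ok = bool(runner())
        except Exception as exc:  # a crashing step counts as a failure
            print(f"{name} raised {exc!r}")
            ok = False
        results[name] = ok
        if not ok and phase in CRITICAL_PHASES:
            print(f"CRITICAL: {name} failed in {phase}; halting.")
            break
        if not ok:
            print(f"{name} failed in {phase}; continuing.")
    return results
```

Recording the real outcome per script, even for non-critical steps, keeps the final summary honest while still letting the run finish.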

Critical vs Non-Critical Failures

Critical Failures (Pipeline Stops)

Phase 1: Core Data
  • fetch_dhan_data.py failure → No master_isin_map.json → STOP
  • bulk_market_analyzer.py failure → No base JSON → STOP
# From run_full_pipeline.py (lines 207-212)
results["fetch_dhan_data.py"] = run_script("fetch_dhan_data.py", "Phase 1")

if not results["fetch_dhan_data.py"]:
    print("\n🛑 CRITICAL: fetch_dhan_data.py failed. Cannot continue.")
    print("   This script produces master_isin_map.json which ALL other scripts need.")
    return

Non-Critical Failures (Pipeline Continues)

Phase 2: Enrichment
  • Individual enrichment scripts can fail without stopping the pipeline
  • Example: fetch_market_news.py fails → News fields will be empty, but pipeline completes
# From run_full_pipeline.py (lines 123-126)
if result.returncode == 0:
    print(f"  ✅ {script_name} ({elapsed:.1f}s)")
    return True
else:
    print(f"  ❌ {script_name} FAILED ({elapsed:.1f}s)")
    return True  # Continuing on enrichment errors to finish the job

Error Types and Solutions

1. Network Errors

Symptoms: requests.exceptions.ConnectionError, ReadTimeout, HTTPError
Causes:
  • API endpoint temporarily down
  • Network connectivity issues
  • Rate limiting by Dhan/NSE servers
Solutions:
# Example from fetch_fundamental_data.py
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry  # import urllib3 directly rather than via the requests.packages alias

# Add retry logic
session = requests.Session()
retry = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)

try:
    response = session.post(url, json=payload, headers=headers, timeout=30)
    response.raise_for_status()
except requests.exceptions.RequestException as e:
    print(f"Network error: {e}")
    # Continue with next stock or retry
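Rate limiting is listed as a cause above, and rate-limited responses often carry a Retry-After header. The helper below is a rough sketch of how a wait time could be chosen; `retry_delay`, its parameters, and the 60-second cap are assumptions for illustration, and it only handles a Retry-After value given in whole seconds (not the HTTP-date form).

```python
import random
from typing import Mapping, Optional


def retry_delay(status_code: int, headers: Mapping[str, str], attempt: int,
                base: float = 1.0, cap: float = 60.0) -> Optional[float]:
    """Return seconds to wait before retrying, or None if not retryable."""
    if status_code == 429:
        retry_after = headers.get("Retry-After", "")
        if retry_after.isdigit():  # header given as whole seconds
            return min(float(retry_after), cap)
    if status_code == 429 or 500 <= status_code < 600:
        # Exponential backoff plus a little jitter to avoid synchronized retries.
        return min(base * (2 ** attempt), cap) + random.uniform(0, 0.5)
    return None  # other client errors such as 404 are not worth retrying
```

A caller would sleep for the returned value and retry, or give up on the stock when the result is None.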

2. Timeout Errors

Symptoms: Script hangs or subprocess.TimeoutExpired
Causes:
  • API response delay
  • Large data transfer
  • System resource constraints
Pipeline timeout: 1800 seconds (30 minutes) per script
# From run_full_pipeline.py (lines 112-130)
try:
    result = subprocess.run(
        [sys.executable, script_path],
        cwd=BASE_DIR,
        text=True,
        timeout=1800  # 30 minutes
    )
except subprocess.TimeoutExpired:
    print(f"  ⏰ {script_name} TIMED OUT (>30 min)")
    return False
Fix: Increase timeout for slow scripts
# For fetch_all_ohlcv.py, extend to 3600 seconds (1 hour)
timeout=3600
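Rather than raising the timeout for every script, one option is a per-script override table. The sketch below assumes hypothetical names (`SCRIPT_TIMEOUTS`, `timeout_for`, `run_with_timeout`) that do not appear in run_full_pipeline.py; only the 1800 and 3600 second values come from the text above.

```python
import subprocess
import sys
from typing import List

DEFAULT_TIMEOUT = 1800  # 30 minutes, the pipeline default

# Hypothetical per-script overrides, in seconds.
SCRIPT_TIMEOUTS = {"fetch_all_ohlcv.py": 3600}


def timeout_for(script_name: str) -> int:
    """Look up the timeout for a script, falling back to the default."""
    return SCRIPT_TIMEOUTS.get(script_name, DEFAULT_TIMEOUT)


def run_with_timeout(cmd: List[str], timeout: float) -> bool:
    """Return True on exit code 0, False on a non-zero exit or a timeout."""
    try:
        result = subprocess.run(cmd, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        print(f"TIMED OUT after {timeout}s: {' '.join(cmd)}")
        return False
    return result.returncode == 0
```

For example, `run_with_timeout([sys.executable, "fetch_all_ohlcv.py"], timeout_for("fetch_all_ohlcv.py"))` would allow that one script an hour while leaving the others at 30 minutes.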

3. Data Quality Errors

Symptoms: Missing fields, None values, type mismatches
Example:
# From bulk_market_analyzer.py (lines 5-9)
def get_float(value_str):
    try:
        return float(value_str)
    except (ValueError, TypeError):
        return 0.0  # Safe fallback
Best Practices:
  • Use defensive .get() instead of direct key access
  • Provide sensible defaults (0 for numbers, "" for strings, [] for arrays)
  • Validate critical fields before processing
# Good: safe field access with a default
pe = stock.get('P/E') or 0  # also covers an explicit None value
if pe > 0:
    print(f"P/E: {pe}")  # process P/E here

# Bad: direct access (raises KeyError if the key is missing)
pe = stock['P/E']  # may crash
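The third best practice, validating critical fields before processing, might look like the sketch below. The `missing_fields` helper and the `REQUIRED_FIELDS` tuple are hypothetical; which fields actually count as critical depends on the script.

```python
from typing import Any, Dict, Iterable, List

# Hypothetical set of fields a record must carry before it is processed.
REQUIRED_FIELDS = ("Symbol", "ISIN")


def missing_fields(stock: Dict[str, Any],
                   required: Iterable[str] = REQUIRED_FIELDS) -> List[str]:
    """Return required keys that are absent, None, or empty strings."""
    return [key for key in required if stock.get(key) in (None, "")]
```

A record with a non-empty result can be logged and skipped instead of failing deep inside the processing code. Note that a numeric 0 counts as present, so zero-valued metrics are not rejected.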

4. File I/O Errors

Symptoms: FileNotFoundError, PermissionError, OSError
Common Causes:
  • Missing input files (e.g., master_isin_map.json not created)
  • Disk space exhausted
  • Permission issues on output directory
Prevention:
import os

# Check input file exists before processing
if not os.path.exists(INPUT_FILE):
    print(f"Error: {INPUT_FILE} not found. Run fetch_dhan_data.py first.")
    return

# Create output directory if missing (no error if it already exists)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Check disk space before writing large files (os.statvfs is Unix-only)
stat = os.statvfs(BASE_DIR)
free_space_mb = (stat.f_bavail * stat.f_frsize) / (1024 * 1024)
if free_space_mb < 500:  # Less than 500 MB free
    print(f"Warning: Low disk space ({free_space_mb:.1f} MB)")
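Because os.statvfs is only available on Unix, a portable variant of the disk-space check can use shutil.disk_usage from the standard library. The helper names and the 500 MB default below are illustrative assumptions that mirror the threshold used above.

```python
import shutil


def free_space_mb(path: str = ".") -> float:
    """Free space on the filesystem holding `path`, in MB."""
    return shutil.disk_usage(path).free / (1024 * 1024)


def has_enough_space(path: str = ".", min_free_mb: float = 500) -> bool:
    """True when at least `min_free_mb` MB are free at `path`."""
    return free_space_mb(path) >= min_free_mb
```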

5. JSON Parsing Errors

Symptoms: json.decoder.JSONDecodeError
Causes:
  • Malformed API response
  • Incomplete file write (crashed mid-write)
  • Encoding issues
Handling:
import json

try:
    with open('data.json', 'r') as f:
        data = json.load(f)
except json.JSONDecodeError as e:
    print(f"JSON parsing error: {e}")
    print(f"Line {e.lineno}, Column {e.colno}")
    # Option 1: Skip file
    data = []
    # Option 2: Attempt manual repair
    # Option 3: Re-fetch data
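Incomplete writes are one of the causes listed above, and they can largely be avoided by writing to a temporary file and renaming it into place. The `write_json_atomic` helper below is a sketch of that pattern and is not taken from the pipeline scripts.

```python
import json
import os
import tempfile
from typing import Any


def write_json_atomic(path: str, data: Any) -> None:
    """Write JSON to a temp file in the same directory, then swap it in."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before the rename
        os.replace(tmp_path, path)  # atomic when both paths share a filesystem
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)  # do not leave a half-written temp file behind
        raise
```

With this pattern a crash mid-write leaves the previous version of the file untouched, so readers see either the old complete file or the new complete file.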

Multi-threaded Error Handling

ThreadPoolExecutor Patterns

Many scripts use threading for parallel API calls:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

import requests

def fetch_with_retry(item, max_retries=3):
    """Fetch with exponential backoff retry."""
    for attempt in range(max_retries):
        try:
            # API call
            response = requests.post(url, json=payload, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                print(f"Failed after {max_retries} attempts: {e}")
                return None
            # Exponential backoff: 1s, then 2s (the final attempt returns without sleeping)
            time.sleep(2 ** attempt)
    return None

# Execute with thread pool
success_count = 0
error_count = 0
with ThreadPoolExecutor(max_workers=20) as executor:
    future_to_stock = {
        executor.submit(fetch_with_retry, item): item["Symbol"] 
        for item in stock_list
    }
    
    for future in as_completed(future_to_stock):
        symbol = future_to_stock[future]
        try:
            result = future.result()
            if result:
                success_count += 1
            else:
                error_count += 1
        except Exception as e:
            print(f"Unexpected error for {symbol}: {e}")
            error_count += 1
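Counting errors is useful, but keeping the list of failed symbols makes a targeted retry possible. The sketch below is a self-contained variation on the pattern above; `fetch_all` and its `fetch` callback are hypothetical names, and the callback stands in for whatever per-stock API call a script makes.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, Iterable, List, Optional, Tuple


def fetch_all(symbols: Iterable[str],
              fetch: Callable[[str], Optional[dict]],
              max_workers: int = 20) -> Tuple[Dict[str, dict], List[str]]:
    """Run `fetch` per symbol; return (results, sorted failed symbols)."""
    results: Dict[str, dict] = {}
    failed: List[str] = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fetch, s): s for s in symbols}
        for future in as_completed(futures):
            symbol = futures[future]
            try:
                data = future.result()
            except Exception as exc:  # one bad stock must not kill the batch
                print(f"Unexpected error for {symbol}: {exc}")
                data = None
            if data is None:
                failed.append(symbol)
            else:
                results[symbol] = data
    return results, sorted(failed)
```

The returned `failed` list can be passed back into `fetch_all` for a second pass, or written to disk for a later partial re-run.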

Logging and Diagnostics

Enable Detailed Logging

Add logging to scripts for better debugging:
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

# Use in scripts
logger.info(f"Processing {symbol}")
logger.warning(f"Missing field 'P/E' for {symbol}")
logger.error(f"API call failed for {symbol}: {error}")

Check Pipeline Logs

# View real-time logs
tail -f pipeline.log

# Search for errors
grep ERROR pipeline.log

# Count failures by type
grep "Network error" pipeline.log | wc -l
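The same counts can be produced in Python when the result needs to feed a report or an alert. This sketch assumes log lines follow the `%(asctime)s - %(levelname)s - %(message)s` format configured above; `count_levels` is a hypothetical helper.

```python
from collections import Counter
from typing import Iterable


def count_levels(lines: Iterable[str]) -> Counter:
    """Count records per level for lines shaped 'asctime - LEVEL - message'."""
    counts: Counter = Counter()
    for line in lines:
        # Split at most twice so a ' - ' inside the message is left alone.
        parts = line.rstrip("\n").split(" - ", 2)
        if len(parts) == 3:
            counts[parts[1]] += 1
    return counts
```

Usage could be as simple as opening pipeline.log and passing the file object to `count_levels`, since a file iterates line by line.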

Recovery Strategies

Partial Re-runs

If a Phase 2 script fails, you can re-run just that script:
# Re-fetch company filings without re-running full pipeline
python3 fetch_company_filings.py

# Then re-run enrichment phases
python3 advanced_metrics_processor.py
python3 add_corporate_events.py

Checkpoint-based Recovery

Modify scripts to skip already-processed items:
# Example: Skip stocks with existing files
output_path = f"{OUTPUT_DIR}/{symbol}_news.json"
if os.path.exists(output_path) and os.path.getsize(output_path) > 10:
    print(f"Skipping {symbol} (already exists)")
    return "skipped"

# Fetch and save
# ...
Enable checkpoint mode:
# In fetch_company_filings.py (line 12)
FORCE_UPDATE = False  # Skip existing files
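The skip check and the flag can be combined into one small predicate. The `should_skip` function below is a sketch, not code from fetch_company_filings.py; the 10-byte minimum mirrors the size check shown earlier.

```python
import os

FORCE_UPDATE = False  # mirrors the flag above; True re-fetches everything


def should_skip(output_path: str,
                force_update: bool = FORCE_UPDATE,
                min_bytes: int = 10) -> bool:
    """Skip when a non-trivial output file exists and updates are not forced."""
    if force_update:
        return False
    return os.path.exists(output_path) and os.path.getsize(output_path) > min_bytes
```

The size threshold keeps empty or near-empty files, which are typical leftovers of a failed fetch, from being treated as completed work.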

Common Troubleshooting Scenarios

Scenario 1: “master_isin_map.json not found”

Cause: fetch_dhan_data.py failed or didn’t run
Solution:
python3 fetch_dhan_data.py
# Check output
ls -lh master_isin_map.json

Scenario 2: “Empty fundamental_data.json”

Cause: API endpoint changed or rate limited
Solution:
  1. Check API endpoint in fetch_fundamental_data.py
  2. Test API call manually with curl
  3. Add delays between requests
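For step 3, a minimal way to add delays is a throttle that enforces a minimum gap between calls. The `Throttle` class below is a hypothetical sketch; the injectable `clock` and `sleep` arguments exist only to make it testable, and it is not thread-safe, so each worker thread would need its own instance or a lock around `wait()`.

```python
import time
from typing import Callable, Optional


class Throttle:
    """Enforce a minimum interval between successive calls to wait()."""

    def __init__(self, min_interval: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self.min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._last: Optional[float] = None

    def wait(self) -> float:
        """Sleep if the last call was too recent; return seconds slept."""
        now = self._clock()
        delay = 0.0
        if self._last is not None:
            delay = max(0.0, self.min_interval - (now - self._last))
        if delay > 0:
            self._sleep(delay)
        self._last = self._clock()
        return delay
```

Calling `throttle.wait()` immediately before each request, with for example `Throttle(0.5)`, would cap a single thread at roughly two requests per second.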

Scenario 3: “Compression failed”

Cause: Disk full or corrupted JSON file
Solution:
# Check disk space
df -h

# Validate JSON syntax
jq . all_stocks_fundamental_analysis.json > /dev/null

# Manual compression
gzip -9 all_stocks_fundamental_analysis.json

Scenario 4: “OHLCV data missing dates”

Cause: Market holiday or API gap
Solution: The OHLCV fetcher auto-fills gaps; verify the date range:
import pandas as pd
df = pd.read_csv('ohlcv_data/RELIANCE.csv')
print(df['Date'].min(), df['Date'].max())
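Beyond the first and last date, it can help to list which weekdays are absent in between. The sketch below uses only the standard library and assumes the Date column holds ISO-formatted strings (YYYY-MM-DD), which is not confirmed by the source; `missing_weekdays` is a hypothetical helper.

```python
from datetime import date, timedelta
from typing import Iterable, List


def missing_weekdays(dates: Iterable[str]) -> List[str]:
    """Return Mon-Fri dates absent between the first and last ISO date given."""
    seen = {date.fromisoformat(d) for d in dates}
    if not seen:
        return []
    day, last = min(seen), max(seen)
    missing = []
    while day <= last:
        if day.weekday() < 5 and day not in seen:  # 0-4 are Monday-Friday
            missing.append(day.isoformat())
        day += timedelta(days=1)
    return missing
```

Market holidays will show up in the result as well, so the output needs to be compared against the exchange holiday calendar before treating a date as a real gap.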

Monitoring and Alerts

Email Alerts on Failure

import smtplib
from email.mime.text import MIMEText

def send_alert(subject, body):
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = 'alerts@example.com'  # placeholder sender address
    msg['To'] = 'ops@example.com'  # placeholder recipient address
    
    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        server.login('user', 'password')
        server.send_message(msg)

# In run_full_pipeline.py
if failed > 0:
    send_alert(
        'Pipeline Failures Detected',
        f'{failed} scripts failed. Check logs.'
    )
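Hardcoded credentials like the ones above are best kept out of the script. The sketch below builds the alert message separately from sending it and reads credentials from environment variables; `build_alert`, `smtp_credentials`, and the variable names `PIPELINE_SMTP_USER` and `PIPELINE_SMTP_PASSWORD` are assumptions for illustration.

```python
import os
from email.message import EmailMessage
from typing import Iterable, Tuple


def build_alert(failed_scripts: Iterable[str], sender: str, recipient: str) -> EmailMessage:
    """Build an alert email listing every failed script."""
    failed = list(failed_scripts)
    msg = EmailMessage()
    msg["Subject"] = f"Pipeline failures detected ({len(failed)})"
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content("Failed scripts:\n" + "\n".join(f"- {name}" for name in failed))
    return msg


def smtp_credentials() -> Tuple[str, str]:
    """Read credentials from hypothetical environment variables."""
    return (os.environ.get("PIPELINE_SMTP_USER", ""),
            os.environ.get("PIPELINE_SMTP_PASSWORD", ""))
```

The resulting message can be passed to `server.send_message(msg)` exactly as in the example above, with `server.login(*smtp_credentials())` replacing the literal user and password.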

Health Check Script

#!/bin/bash
# check_pipeline_health.sh

FILE="all_stocks_fundamental_analysis.json.gz"
MAX_AGE_HOURS=36

if [ ! -f "$FILE" ]; then
    echo "ERROR: Output file missing"
    exit 1
fi

AGE=$(( $(date +%s) - $(stat -c %Y "$FILE") ))
AGE_HOURS=$(( AGE / 3600 ))

if [ $AGE_HOURS -gt $MAX_AGE_HOURS ]; then
    echo "WARNING: Data is $AGE_HOURS hours old"
    exit 1
fi

echo "OK: Data is fresh ($AGE_HOURS hours old)"
exit 0
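The shell script relies on `stat -c %Y`, which is the GNU form of the command. Where that is not available, the same check can be sketched in Python; `check_freshness` is a hypothetical helper that returns the exit code and message instead of exiting directly.

```python
import os
import time
from typing import Tuple


def check_freshness(path: str, max_age_hours: float = 36) -> Tuple[int, str]:
    """Return (exit_code, message): 0 when the file exists and is fresh, else 1."""
    if not os.path.isfile(path):
        return 1, "ERROR: Output file missing"
    age_hours = (time.time() - os.path.getmtime(path)) / 3600
    if age_hours > max_age_hours:
        return 1, f"WARNING: Data is {age_hours:.0f} hours old"
    return 0, f"OK: Data is fresh ({age_hours:.0f} hours old)"
```

A wrapper script could print the message and call `sys.exit(code)` so that cron or a monitoring tool sees the same exit codes as the shell version.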

Next Steps

Performance Tuning

Optimize threading, batching, and timeouts

Incremental Updates

Set up automated daily updates with monitoring
