The EDL Pipeline is designed for high-throughput data fetching. This guide covers performance optimization techniques that reduce execution time from ~34 minutes (first full run) to 3-6 minutes (incremental daily updates).

Performance Overview

Execution Time Breakdown

| Configuration | Time | Bottleneck |
| --- | --- | --- |
| Full (first run) | ~34 min | OHLCV fetch (25-35 min) |
| Daily update | ~6-9 min | Fundamental data (2-3 min) |
| F&O only (207 stocks) | ~3 min | Reduced dataset |
| Without OHLCV | ~4 min | Network I/O |
The OHLCV fetcher has smart incremental updates: first run takes 25-35 min, but daily updates take only 2-5 min.

Threading Optimization

Current Thread Counts

Each script uses optimized thread counts based on API rate limits:
| Script | Threads | Rationale |
| --- | --- | --- |
| fetch_company_filings.py | 20 | Dual endpoints, moderate load |
| fetch_new_announcements.py | 40 | Simple endpoint, high throughput |
| fetch_advanced_indicators.py | 50 | Fast API, CPU-bound parsing |
| fetch_market_news.py | 15 | Heavy response payload |
| fetch_all_ohlcv.py | 15 | Large data transfer |
| fetch_indices_ohlcv.py | 60 | Lightweight index data |

Increasing Thread Counts

For faster execution on high-bandwidth connections:
# Example: fetch_company_filings.py (line 11)
MAX_THREADS = 20  # Default

# Increase to 40 for 2x speedup (requires testing for rate limits)
MAX_THREADS = 40
Testing methodology:
  1. Increase threads by 10 (e.g., 20 → 30)
  2. Run script and monitor error rate
  3. If error rate < 5%, increase further
  4. If error rate > 10%, reduce threads
Excessive threading can trigger rate limiting (HTTP 429 errors) and actually slow down execution due to retries.
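The testing methodology above can be sketched as a small tuning loop. tune_thread_count and its run_batch callback are hypothetical names, not existing pipeline helpers; run_batch(threads) is assumed to run one test batch at the given thread count and return its observed error rate:

```python
# Hypothetical helper automating the steps above: increase the thread count
# while the observed error rate stays low, back off when it spikes.
def tune_thread_count(run_batch, start=20, step=10, cap=60):
    """run_batch(threads) runs one test batch and returns its error rate (0.0-1.0)."""
    threads = start
    while threads <= cap:
        error_rate = run_batch(threads)
        if error_rate > 0.10:           # >10% errors: back off and stop
            return max(start, threads - step)
        if error_rate >= 0.05:          # 5-10% errors: hold at the current count
            return threads
        threads += step                 # <5% errors: try more threads
    return cap
```

Each candidate count should run a full representative batch, since rate limiting often only appears under sustained load.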

Optimal Thread Count Formula

import multiprocessing

# Conservative: 2x CPU cores
MAX_THREADS = multiprocessing.cpu_count() * 2

# Aggressive: 4x CPU cores (for I/O-bound tasks)
MAX_THREADS = multiprocessing.cpu_count() * 4

# Dynamic based on dataset size
total_stocks = len(stock_list)
MAX_THREADS = min(total_stocks // 10, 50)  # Cap at 50

Batching Strategies

Fundamental Data Batching

The fundamental data fetcher uses batching to reduce API calls:
# From fetch_fundamental_data.py
BATCH_SIZE = 100  # Fetch 100 ISINs per API call

# Increase batch size for faster execution (if API supports it)
BATCH_SIZE = 200  # 2x fewer API calls
Trade-offs:
  • Larger batches = fewer API calls, but higher timeout risk
  • Smaller batches = more API calls, but better error isolation
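The error-isolation trade-off can be sketched with a split-on-failure helper: when a large batch fails, split it in half and retry, so one bad item cannot sink the whole call. fetch_with_split and fetch_batch are hypothetical names; fetch_batch stands in for whatever API call the fetcher makes:

```python
# Hypothetical sketch of batch error isolation via recursive splitting.
def fetch_with_split(isins, fetch_batch, min_size=1):
    """fetch_batch(batch) returns results for the batch, or raises on failure."""
    try:
        return fetch_batch(isins)
    except Exception:
        if len(isins) <= min_size:
            return []                   # give up on a single bad item
        mid = len(isins) // 2
        return (fetch_with_split(isins[:mid], fetch_batch, min_size)
                + fetch_with_split(isins[mid:], fetch_batch, min_size))
```

This keeps large batches on the happy path while paying the extra API calls only when a batch actually fails.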

Dynamic Batching

def get_optimal_batch_size(total_items, max_threads):
    """Calculate batch size to minimize API calls while maintaining parallelism."""
    if total_items < 500:
        size = 50
    elif total_items < 2000:
        size = 100
    else:
        size = 200
    # Keep at least one batch per worker so all threads stay busy
    return min(size, max(1, total_items // max_threads))

BATCH_SIZE = get_optimal_batch_size(len(stock_list), MAX_THREADS)

Timeout Tuning

Per-Request Timeouts

Default timeout is 10-30 seconds depending on endpoint:
# Fast endpoints (indicators, announcements)
response = requests.post(url, json=payload, headers=headers, timeout=10)

# Slow endpoints (fundamental data, OHLCV)
response = requests.post(url, json=payload, headers=headers, timeout=30)
Optimization: Reduce timeouts for fast networks
# Fast network: reduce to 5 seconds
timeout=5

# Slow/unreliable network: increase to 60 seconds
timeout=60
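One way to combine both settings is to escalate the timeout on retry: start tight so fast networks stay fast, and allow more time only after a timeout. call_with_escalating_timeout and do_request are illustrative names, not pipeline code:

```python
# Hypothetical sketch: retry a request with progressively longer timeouts.
def call_with_escalating_timeout(do_request, timeouts=(5, 15, 60)):
    """do_request(timeout) performs one attempt; raises TimeoutError on timeout."""
    last_error = None
    for timeout in timeouts:
        try:
            return do_request(timeout)
        except TimeoutError as exc:
            last_error = exc            # retry the same request with more time
    raise last_error
```

With requests, do_request would wrap requests.post(..., timeout=timeout) and the except clause would catch requests.Timeout instead of the built-in TimeoutError.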

Script-level Timeouts

The master runner has a 30-minute timeout per script:
# run_full_pipeline.py (line 117)
result = subprocess.run(
    [sys.executable, script_path],
    cwd=BASE_DIR,
    text=True,
    timeout=1800  # 30 minutes
)
OHLCV optimization: Increase for first runs
# For fetch_all_ohlcv.py on first run
if script_name == "fetch_all_ohlcv.py":
    timeout = 3600  # 1 hour for first-time full history
else:
    timeout = 1800
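When the limit is hit, subprocess.run raises subprocess.TimeoutExpired; the runner should log that and move on rather than crash. A sketch (run_one_script is a hypothetical wrapper, and it omits the runner's cwd=BASE_DIR argument for self-containment):

```python
import subprocess
import sys

# Sketch: run one pipeline script under a timeout, turning an overrun
# into a logged failure instead of an unhandled exception.
def run_one_script(script_path, timeout=1800):
    try:
        result = subprocess.run(
            [sys.executable, script_path],
            text=True,
            timeout=timeout,
        )
        return result.returncode == 0
    except subprocess.TimeoutExpired:
        print(f"TIMEOUT: {script_path} exceeded {timeout}s")
        return False
```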

Caching Strategies

Skip Already-Fetched Data

Many scripts support FORCE_UPDATE flags:
# fetch_company_filings.py (line 12)
FORCE_UPDATE = True  # Always re-fetch

# Change to False for incremental updates
FORCE_UPDATE = False  # Skip existing files
Effect: 20x speedup for incremental runs (10 min → 30 sec)
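The skip logic behind a FORCE_UPDATE flag can be sketched as follows. needs_fetch, the output directory, and the per-symbol filename scheme are illustrative assumptions, not the script's actual layout:

```python
import os

# Sketch of the incremental-skip pattern: only re-fetch a symbol when its
# output file is missing, unless a full refresh is forced.
FORCE_UPDATE = False

def needs_fetch(symbol, out_dir="filings"):
    path = os.path.join(out_dir, f"{symbol}.json")
    return FORCE_UPDATE or not os.path.exists(path)
```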

OHLCV Smart Caching

The OHLCV fetcher automatically caches:
# Pseudocode from fetch_all_ohlcv.py
if csv_exists:
    last_date = read_last_date_from_csv()
    fetch_from = last_date + 1 day
else:
    fetch_from = 1976-10-31  # Full history
Performance:
  • First run: 25-35 min (full history)
  • Daily update: 2-5 min (only new dates)
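The pseudocode above might look like this in runnable form, assuming each per-stock CSV has a header row and an ISO-formatted date column (both assumptions are illustrative, not the fetcher's actual schema):

```python
import csv
import datetime
import os

FULL_HISTORY_START = datetime.date(1976, 10, 31)

# Sketch of the incremental-fetch decision: resume the day after the last
# row already on disk, or fetch full history when no CSV exists yet.
def get_fetch_from(csv_path):
    if not os.path.exists(csv_path):
        return FULL_HISTORY_START
    with open(csv_path, newline="") as f:
        rows = list(csv.DictReader(f))
    if not rows:
        return FULL_HISTORY_START
    last = max(datetime.date.fromisoformat(r["date"]) for r in rows)
    return last + datetime.timedelta(days=1)
```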

Intermediate File Caching

Disable cleanup during development:
# run_full_pipeline.py (line 71)
CLEANUP_INTERMEDIATE = False  # Keep all files
Then selectively re-run failed scripts:
# Re-run only enrichment without re-fetching base data
python3 advanced_metrics_processor.py
python3 add_corporate_events.py

Network Optimization

Connection Pooling

Reuse HTTP connections to reduce overhead:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Create session with connection pooling
session = requests.Session()
adapter = HTTPAdapter(
    pool_connections=20,  # Number of connection pools
    pool_maxsize=50,      # Connections per pool
    max_retries=Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[500, 502, 503, 504]
    )
)
session.mount('http://', adapter)
session.mount('https://', adapter)

# Use session instead of requests.post()
response = session.post(url, json=payload, headers=headers, timeout=10)
Speedup: ~15% faster due to TCP connection reuse
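When the pooled session is shared across a thread pool, a common refinement is one session per worker thread, which avoids contention on a single session object. A sketch (get_session is a hypothetical helper, not pipeline code):

```python
import threading

import requests
from requests.adapters import HTTPAdapter

# One Session per worker thread, created lazily on first use in each thread.
_local = threading.local()

def get_session():
    if not hasattr(_local, "session"):
        session = requests.Session()
        adapter = HTTPAdapter(pool_connections=20, pool_maxsize=50)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        _local.session = session
    return _local.session
```

Worker functions then call get_session().post(...) instead of sharing a module-level session.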

DNS Caching

import socket
import functools

# Keep a handle to the real resolver; the cached function must call it,
# not the patched socket.getaddrinfo, to avoid infinite recursion
_orig_getaddrinfo = socket.getaddrinfo

# Cache DNS lookups
@functools.lru_cache(maxsize=128)
def cached_getaddrinfo(*args, **kwargs):
    return _orig_getaddrinfo(*args, **kwargs)

socket.getaddrinfo = cached_getaddrinfo

Disk I/O Optimization

Use SSDs

OHLCV data writes ~2,775 CSV files:
  • HDD: ~10 min write time
  • SSD: ~2 min write time

Compress on the Fly

Reduce disk writes by compressing immediately:
import gzip
import json

# Instead of writing JSON then compressing
with open('data.json', 'w') as f:
    json.dump(data, f)
# Then: gzip data.json

# Write compressed directly
with gzip.open('data.json.gz', 'wt') as f:
    json.dump(data, f)
Speedup: ~30% faster final output generation

Memory Optimization

Stream Processing for Large Files

For fundamental data (35 MB JSON):
import json
import ijson  # third-party: pip install ijson

# Instead of loading entire file
with open('fundamental_data.json', 'r') as f:
    data = json.load(f)  # 35 MB into memory

# Stream process
with open('fundamental_data.json', 'rb') as f:
    for item in ijson.items(f, 'item'):
        process(item)  # Process one at a time
Benefit: Constant memory usage vs. 35 MB spike

Garbage Collection Tuning

import gc

# Disable GC during bulk processing
gc.disable()
try:
    # Process 2,775 stocks
    for stock in stock_list:
        process(stock)
finally:
    # Re-enable and force collection even if processing raises
    gc.enable()
    gc.collect()
Speedup: ~10% for CPU-bound processing loops

Dataset Reduction

Filter to Liquid Stocks Only

Reduce from 2,775 to 500 stocks:
# fetch_dhan_data.py payload
"params": [
    {"field": "OgInst", "op": "", "val": "ES"},
    {"field": "Exch", "op": "", "val": "NSE"},
    {"field": "Mcap", "op": "GT", "val": "5000"},  # > ₹5,000 Cr.
    {"field": "volume", "op": "GT", "val": "500000"}  # > 500K volume
]
Effect: ~60% faster pipeline (9 min → 3-4 min)
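If server-side filtering is unavailable, the same reduction can be approximated client-side. A sketch whose field names mirror the payload above but are otherwise illustrative:

```python
# Hypothetical client-side equivalent of the market-cap/volume filter:
# keep only stocks above the liquidity thresholds.
def filter_liquid(stocks, min_mcap=5000, min_volume=500_000):
    """stocks is a list of dicts with Mcap (in Cr.) and volume fields."""
    return [s for s in stocks
            if s.get("Mcap", 0) > min_mcap and s.get("volume", 0) > min_volume]
```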

Skip Optional Data

# run_full_pipeline.py (line 67)
FETCH_OPTIONAL = False  # Skip indices, ETFs
Speedup: ~1 min saved

Parallel Script Execution

Run independent Phase 2 scripts in parallel:
import subprocess
import sys
import concurrent.futures

phase2_scripts = [
    "fetch_company_filings.py",
    "fetch_new_announcements.py",
    "fetch_market_news.py",
    "fetch_corporate_actions.py",
]

def run_script(script):
    # Use the same interpreter as the runner, matching run_full_pipeline.py
    return subprocess.run([sys.executable, script], cwd=BASE_DIR)

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(run_script, s) for s in phase2_scripts]
    concurrent.futures.wait(futures)
Speedup: Phase 2 from 5 min → 2 min (if CPU/network allows)
Parallel execution may overwhelm the API with concurrent requests. Monitor error rates.

Benchmarking

Measure Script Performance

# Time individual scripts
time python3 fetch_company_filings.py

# Profile Python code
python3 -m cProfile -o profile.stats fetch_company_filings.py
python3 -m pstats profile.stats
# (pstats) sort time
# (pstats) stats 10
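The interactive pstats session can also be scripted. The sketch below profiles a small stand-in workload rather than fetch_company_filings.py itself:

```python
import cProfile
import pstats

# Profile a toy workload into a stats file, then print the hot spots.
cProfile.run("sum(i * i for i in range(100_000))", "profile.stats")

stats = pstats.Stats("profile.stats")
stats.sort_stats("cumulative").print_stats(10)  # top 10 by cumulative time
```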

Full Pipeline Benchmark

# Add to run_full_pipeline.py
import time

phase_times = {}

start = time.time()
# ... run Phase 1 ...
phase_times['Phase 1'] = time.time() - start

start = time.time()
# ... run Phase 2 ...
phase_times['Phase 2'] = time.time() - start

# Print breakdown
for phase, duration in phase_times.items():
    print(f"{phase}: {duration:.1f}s ({duration/60:.1f} min)")

Performance Checklist

  1. Enable OHLCV smart caching: Set FETCH_OHLCV = True and ensure incremental updates are working (check for existing CSV files).
  2. Optimize thread counts: Test with +10 threads per script, monitor error rates, and adjust accordingly.
  3. Use SSD storage: Move the ohlcv_data/ directory to an SSD for ~5x faster writes.
  4. Filter the dataset: Add market cap/volume filters to reduce from 2,775 to 500-1,000 stocks.
  5. Skip optional scripts: Set FETCH_OPTIONAL = False unless you need indices/ETF data.
  6. Enable connection pooling: Implement session-based requests with connection pooling in high-frequency scripts.

Expected Results

| Optimization | Time Saved | Effort |
| --- | --- | --- |
| OHLCV incremental (after first run) | -25 min | Automatic |
| Thread count +50% | -2 min | Easy |
| Dataset filter (500 stocks) | -4 min | Medium |
| Skip optional data | -1 min | Easy |
| Connection pooling | -1 min | Medium |
| SSD storage | -3 min | Hardware |
| Total | -36 min | Mixed |
Best case: First run 34 min → Daily update 3 min (91% faster)

Next Steps

  • Error Handling: Handle failures from aggressive threading/batching.
  • Incremental Updates: Set up automated daily updates to leverage caching.
