The EDL Pipeline is designed for high-throughput data fetching. This guide covers performance optimization techniques that reduce execution time from ~34 minutes (first full run) to 3-6 minutes (daily updates).
Execution Time Breakdown
| Configuration | Time | Bottleneck |
|---|---|---|
| Full (first run) | ~34 min | OHLCV fetch (25-35 min) |
| Daily update | ~6-9 min | Fundamental data (2-3 min) |
| F&O only (207 stocks) | ~3 min | Reduced dataset |
| Without OHLCV | ~4 min | Network I/O |
The OHLCV fetcher has smart incremental updates: first run takes 25-35 min, but daily updates take only 2-5 min.
Threading Optimization
Current Thread Counts
Each script uses optimized thread counts based on API rate limits:
| Script | Threads | Rationale |
|---|---|---|
| fetch_company_filings.py | 20 | Dual endpoints, moderate load |
| fetch_new_announcements.py | 40 | Simple endpoint, high throughput |
| fetch_advanced_indicators.py | 50 | Fast API, CPU-bound parsing |
| fetch_market_news.py | 15 | Heavy response payload |
| fetch_all_ohlcv.py | 15 | Large data transfer |
| fetch_indices_ohlcv.py | 60 | Lightweight index data |
Increasing Thread Counts
For faster execution on high-bandwidth connections:
```python
# Example: fetch_company_filings.py (line 11)
MAX_THREADS = 20  # Default

# Increase to 40 for up to 2x speedup (test for rate limits first)
MAX_THREADS = 40
```
Testing methodology:
1. Increase threads in steps of 10 (e.g., 20 → 30).
2. Run the script and monitor the error rate.
3. If the error rate is below 5%, increase further.
4. If the error rate exceeds 10%, reduce the thread count.
Excessive threading can trigger rate limiting (HTTP 429 errors) and actually slow down execution due to retries.
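The manual tuning loop above can be automated. A minimal sketch, assuming a hypothetical `run_batch(threads)` callable that performs one run at the given thread count and returns the observed error rate:

```python
def tune_thread_count(run_batch, start=20, step=10, max_threads=80):
    """Raise the thread count until the error rate degrades.

    run_batch(threads) is a hypothetical hook: it executes one run at the
    given thread count and returns the fraction of failed requests (0.0-1.0).
    """
    threads = start
    while threads <= max_threads:
        error_rate = run_batch(threads)
        if error_rate < 0.05:      # < 5%: room to grow
            threads += step
        elif error_rate > 0.10:    # > 10%: back off and stop
            return max(start, threads - step)
        else:                      # 5-10%: acceptable, keep it
            return threads
    return max_threads
```

Against a simulated API that starts rate-limiting beyond 40 threads, the loop backs off and settles at 40.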
```python
import multiprocessing

# Conservative: 2x CPU cores
MAX_THREADS = multiprocessing.cpu_count() * 2

# Aggressive: 4x CPU cores (for I/O-bound tasks)
MAX_THREADS = multiprocessing.cpu_count() * 4

# Dynamic, based on dataset size
total_stocks = len(stock_list)
MAX_THREADS = min(total_stocks // 10, 50)  # Cap at 50
```
Batching Strategies
Fundamental Data Batching
The fundamental data fetcher uses batching to reduce API calls:
```python
# From fetch_fundamental_data.py
BATCH_SIZE = 100  # Fetch 100 ISINs per API call

# Increase batch size for faster execution (if the API supports it)
BATCH_SIZE = 200  # 2x fewer API calls
```
Trade-offs:
- Larger batches: fewer API calls, but higher timeout risk
- Smaller batches: more API calls, but better error isolation
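One way to capture both sides of the trade-off is to start with large batches and split a batch into smaller pieces when it fails, isolating errors without giving up throughput. A sketch; `fetch_batch` and the failure mode are placeholders, not the pipeline's actual API:

```python
def chunked(items, size):
    """Split a list into batches of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]

def fetch_with_fallback(items, fetch_batch, batch_size=200, min_size=50):
    """Fetch in large batches; on failure, retry the batch in smaller pieces."""
    results = []
    for batch in chunked(items, batch_size):
        try:
            results.extend(fetch_batch(batch))
        except Exception:
            if batch_size // 2 >= min_size:
                # Halve the batch size to isolate the failing items
                results.extend(fetch_with_fallback(
                    batch, fetch_batch, batch_size // 2, min_size))
            # else: log and skip the stubborn batch
    return results
```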
Dynamic Batching
```python
def get_optimal_batch_size(total_items, max_threads):
    """Calculate batch size to minimize API calls while maintaining parallelism."""
    if total_items < 500:
        size = 50
    elif total_items < 2000:
        size = 100
    else:
        size = 200
    # Keep at least one batch per worker thread so all threads stay busy
    return min(size, max(1, total_items // max_threads))

BATCH_SIZE = get_optimal_batch_size(len(stock_list), MAX_THREADS)
```
Timeout Tuning
Per-Request Timeouts
The default timeout is 10-30 seconds, depending on the endpoint:
```python
# Fast endpoints (indicators, announcements)
response = requests.post(url, json=payload, headers=headers, timeout=10)

# Slow endpoints (fundamental data, OHLCV)
response = requests.post(url, json=payload, headers=headers, timeout=30)
```
Optimization: reduce timeouts on fast networks, raise them on unreliable ones.
```python
# Fast network: reduce to 5 seconds
timeout = 5

# Slow/unreliable network: increase to 60 seconds
timeout = 60
```
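If you log per-request latencies, the timeout can be derived from data instead of guessed. A sketch under that assumption, keyed to roughly 3x the observed 95th-percentile latency:

```python
import statistics

def choose_timeout(latencies, multiplier=3.0, floor=5.0, ceiling=60.0):
    """Pick a request timeout (seconds) from observed latencies.

    Uses ~3x the 95th-percentile latency, clamped to a sane range.
    """
    p95 = statistics.quantiles(latencies, n=20)[-1]  # ~95th percentile
    return min(max(p95 * multiplier, floor), ceiling)

# Hypothetical latencies sampled from a fast endpoint (seconds)
timeout = choose_timeout([0.8, 1.1, 0.9, 1.4, 2.0, 1.2, 0.7, 1.0, 1.3, 0.9])
```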
Script-level Timeouts
The master runner has a 30-minute timeout per script:
```python
# run_full_pipeline.py (line 117)
result = subprocess.run(
    [sys.executable, script_path],
    cwd=BASE_DIR,
    text=True,
    timeout=1800,  # 30 minutes
)
```
OHLCV optimization: increase the timeout for first runs.
```python
# For fetch_all_ohlcv.py on first run
if script_name == "fetch_all_ohlcv.py":
    timeout = 3600  # 1 hour for first-time full history
else:
    timeout = 1800
```
Caching Strategies
Skip Already-Fetched Data
Many scripts support FORCE_UPDATE flags:
```python
# fetch_company_filings.py (line 12)
FORCE_UPDATE = True   # Always re-fetch

# Change to False for incremental updates
FORCE_UPDATE = False  # Skip existing files
```
Effect: ~20x speedup for incremental runs (10 min → 30 sec)
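A minimal sketch of how such a flag is typically honored (the actual check in fetch_company_filings.py may differ):

```python
import os

FORCE_UPDATE = False  # Module-level flag, as in the fetch scripts

def should_fetch(output_path, force=FORCE_UPDATE):
    """Re-fetch when forced; otherwise skip files that already exist."""
    return force or not os.path.exists(output_path)
```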
OHLCV Smart Caching
The OHLCV fetcher automatically caches:
```python
# Pseudocode from fetch_all_ohlcv.py
from datetime import date, timedelta

if csv_exists:
    last_date = read_last_date_from_csv()
    fetch_from = last_date + timedelta(days=1)
else:
    fetch_from = date(1976, 10, 31)  # Full history
```
Performance:
- First run: 25-35 min (full history)
- Daily update: 2-5 min (only new dates)
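The helper that finds the resume point might look like this; the function name is paraphrased from the pseudocode, and the CSV layout (an ISO-format `date` column) is an assumption:

```python
import csv
from datetime import date, timedelta

FULL_HISTORY_START = date(1976, 10, 31)

def get_fetch_start(csv_path):
    """Resume from the day after the last stored date, else fetch full history."""
    try:
        with open(csv_path, newline='') as f:
            rows = list(csv.DictReader(f))
    except FileNotFoundError:
        return FULL_HISTORY_START
    if not rows:
        return FULL_HISTORY_START
    last = max(date.fromisoformat(row['date']) for row in rows)
    return last + timedelta(days=1)
```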
Disable cleanup during development:
```python
# run_full_pipeline.py (line 71)
CLEANUP_INTERMEDIATE = False  # Keep all files
```
Then selectively re-run failed scripts:
```bash
# Re-run only enrichment without re-fetching base data
python3 advanced_metrics_processor.py
python3 add_corporate_events.py
```
Network Optimization
Connection Pooling
Reuse HTTP connections to reduce overhead:
```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry  # requests.packages.urllib3 is deprecated

# Create a session with connection pooling
session = requests.Session()
adapter = HTTPAdapter(
    pool_connections=20,  # Number of connection pools
    pool_maxsize=50,      # Connections per pool
    max_retries=Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[500, 502, 503, 504],
    ),
)
session.mount('http://', adapter)
session.mount('https://', adapter)

# Use the session instead of requests.post()
response = session.post(url, json=payload, headers=headers, timeout=10)
```
Speedup: ~15% faster due to TCP connection reuse
DNS Caching
```python
import functools
import socket

# Keep a reference to the original; without it, the patched function
# would call itself and recurse infinitely on every cache miss
_orig_getaddrinfo = socket.getaddrinfo

@functools.lru_cache(maxsize=128)
def cached_getaddrinfo(*args, **kwargs):
    return _orig_getaddrinfo(*args, **kwargs)

socket.getaddrinfo = cached_getaddrinfo
```
Disk I/O Optimization
Use SSDs
OHLCV data writes ~2,775 CSV files:
- HDD: ~10 min write time
- SSD: ~2 min write time
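To measure your own disk, a quick benchmark that mimics the write pattern (file and row counts are scaled down here; the payload is illustrative, not the pipeline's real data):

```python
import csv
import tempfile
import time
from pathlib import Path

def benchmark_csv_writes(n_files=100, rows_per_file=250):
    """Write n_files small CSVs to a temp dir and return elapsed seconds."""
    header = ['date', 'open', 'high', 'low', 'close', 'volume']
    row = ['2024-01-01', 1.0, 2.0, 0.5, 1.5, 1000]
    with tempfile.TemporaryDirectory() as tmp:
        start = time.perf_counter()
        for i in range(n_files):
            with open(Path(tmp) / f"stock_{i}.csv", 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(header)
                writer.writerows([row] * rows_per_file)
        return time.perf_counter() - start
```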
Compress on the Fly
Reduce disk writes by compressing immediately:
```python
import gzip
import json

# Instead of writing JSON and then compressing it:
#   with open('data.json', 'w') as f:
#       json.dump(data, f)
#   # then: gzip data.json

# Write compressed output directly
with gzip.open('data.json.gz', 'wt') as f:
    json.dump(data, f)
```
Speedup: ~30% faster final output generation
Memory Optimization
Stream Processing for Large Files
For fundamental data (35 MB JSON):
```python
import ijson

# Instead of loading the entire file:
#   with open('fundamental_data.json', 'r') as f:
#       data = json.load(f)  # 35 MB into memory

# Stream-process one record at a time
with open('fundamental_data.json', 'rb') as f:
    for item in ijson.items(f, 'item'):
        process(item)
```
Benefit: constant memory usage vs. a 35 MB spike
Garbage Collection Tuning
```python
import gc

# Disable GC during bulk processing
gc.disable()

# Process 2,775 stocks
for stock in stock_list:
    process(stock)

# Re-enable and force a collection
gc.enable()
gc.collect()
```
Speedup: ~10% for CPU-bound processing loops
Dataset Reduction
Filter to Liquid Stocks Only
Reduce from 2,775 to 500 stocks:
```python
# fetch_dhan_data.py payload
"params": [
    {"field": "OgInst", "op": "", "val": "ES"},
    {"field": "Exch", "op": "", "val": "NSE"},
    {"field": "Mcap", "op": "GT", "val": "5000"},     # > ₹5,000 Cr.
    {"field": "volume", "op": "GT", "val": "500000"}  # > 500K volume
]
```
Effect: ~60% faster pipeline (9 min → 3-4 min)
Skip Optional Data
```python
# run_full_pipeline.py (line 67)
FETCH_OPTIONAL = False  # Skip indices, ETFs
```
Speedup: ~1 min saved
Parallel Script Execution
Run independent Phase 2 scripts in parallel:
```python
import subprocess
import concurrent.futures

phase2_scripts = [
    "fetch_company_filings.py",
    "fetch_new_announcements.py",
    "fetch_market_news.py",
    "fetch_corporate_actions.py",
]

def run_script(script):
    return subprocess.run(["python3", script], cwd=BASE_DIR)

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(run_script, s) for s in phase2_scripts]
    concurrent.futures.wait(futures)
```
Speedup: Phase 2 drops from 5 min → 2 min (if CPU/network allows)
Parallel execution may overwhelm the API with concurrent requests. Monitor error rates.
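The parallel runner shown here also ignores exit codes; a variant that surfaces failed scripts (a sketch using the same hypothetical script list, with `sys.executable` as elsewhere in this guide):

```python
import concurrent.futures
import subprocess
import sys

def run_scripts_parallel(scripts, max_workers=4):
    """Run scripts in parallel and return the list of those that failed."""
    def run(script):
        result = subprocess.run([sys.executable, script])
        return script, result.returncode

    failed = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as ex:
        for script, code in ex.map(run, scripts):
            if code != 0:
                failed.append(script)
    return failed
```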
Benchmarking
```bash
# Time individual scripts
time python3 fetch_company_filings.py

# Profile Python code
python3 -m cProfile -o profile.stats fetch_company_filings.py
python3 -m pstats profile.stats
# (pstats) sort time
# (pstats) stats 10
```
Full Pipeline Benchmark
```python
# Add to run_full_pipeline.py
import time

phase_times = {}

start = time.time()
# ... run Phase 1 ...
phase_times['Phase 1'] = time.time() - start

start = time.time()
# ... run Phase 2 ...
phase_times['Phase 2'] = time.time() - start

# Print the breakdown
for phase, duration in phase_times.items():
    print(f"{phase}: {duration:.1f} s ({duration / 60:.1f} min)")
```
Optimization Checklist
1. Enable OHLCV smart caching: set FETCH_OHLCV = True and ensure incremental updates are working (check for existing CSV files).
2. Optimize thread counts: test with +10 threads per script, monitor error rates, and adjust accordingly.
3. Use SSD storage: move the ohlcv_data/ directory to an SSD for ~5x faster writes.
4. Filter the dataset: add market cap/volume filters to reduce from 2,775 to 500-1,000 stocks.
5. Skip optional scripts: set FETCH_OPTIONAL = False unless you need indices/ETF data.
6. Enable connection pooling: use session-based requests with connection pooling in high-frequency scripts.
Expected Results
| Optimization | Time Saved | Effort |
|---|---|---|
| OHLCV incremental (after first run) | -25 min | Automatic |
| Thread count +50% | -2 min | Easy |
| Dataset filter (500 stocks) | -4 min | Medium |
| Skip optional data | -1 min | Easy |
| Connection pooling | -1 min | Medium |
| SSD storage | -3 min | Hardware |
| Total | -36 min | Mixed |

Best case: first run 34 min → daily update 3 min (91% faster)
Next Steps
- Error Handling: handle failures from aggressive threading/batching
- Incremental Updates: set up automated daily updates to leverage caching