Overview
The GTM Research Engine achieves 3-5x performance improvements over sequential execution using async worker pools, semaphore-based rate limiting, and parallel domain analysis. Every stage of the pipeline is optimized for concurrent execution.
Architecture
Async Foundation
The entire pipeline is built on Python’s asyncio for non-blocking I/O:
# From pipeline.py:136
async def run(self) -> Tuple[List[CompanyResearchResult], int]:
    """
    Execute research pipeline with batch LLM analysis.
    """
    tasks: List[asyncio.Task[Tuple[str, SourceResult]]] = []
    for domain in self.company_domains:
        for strategy in self.strategies:
            tasks.append(
                asyncio.create_task(
                    self._execute_one(domain, strategy, self.search_depth)
                )
            )
Three-Tier Parallelization
Source-Level Parallelism - Multiple sources query simultaneously
Domain-Level Parallelism - All domains researched concurrently
Strategy-Level Parallelism - Multiple strategies execute per source
Semaphore Pools
Each data source has its own semaphore pool to control concurrency:
# From pipeline.py:44-48
self.source_pools: Dict[str, asyncio.Semaphore] = {
    "google_search": asyncio.Semaphore(max_parallel_searches),
    "jobs_search": asyncio.Semaphore(max_parallel_searches),
    "news_search": asyncio.Semaphore(max_parallel_searches),
}
self.default_pool = asyncio.Semaphore(max(max_parallel_searches // 4, 2))
How Semaphores Work
Semaphores limit concurrent operations to prevent overwhelming APIs:
# From pipeline.py:94-101
source_pool = self.source_pools.get(strategy.channel, self.default_pool)
async with source_pool:
    try:
        result = await source.fetch(
            domain=domain, query=query, search_depth=search_depth
        )
        # ... handle result
Example: With max_parallel_searches=20:
Up to 20 Google searches run simultaneously
Up to 20 news searches run simultaneously
Up to 20 job searches run simultaneously
Total: Up to 60 concurrent API calls (20 per source)
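The capping behavior is easy to verify with a small self-contained script (the counter and sleep stand in for real API calls; names are illustrative):

```python
import asyncio

async def fetch_with_limit(pool: asyncio.Semaphore, counter: dict) -> None:
    # Acquire one of the semaphore's permits; with 5 permits,
    # at most 5 tasks can be inside this block at once
    async with pool:
        counter["active"] += 1
        counter["peak"] = max(counter["peak"], counter["active"])
        await asyncio.sleep(0.01)  # simulated API latency
        counter["active"] -= 1

async def main() -> int:
    pool = asyncio.Semaphore(5)  # cap at 5 concurrent "searches"
    counter = {"active": 0, "peak": 0}
    # Launch 20 tasks; the semaphore serializes them into waves of 5
    await asyncio.gather(*(fetch_with_limit(pool, counter) for _ in range(20)))
    return counter["peak"]

peak = asyncio.run(main())
print(peak)  # 5 — concurrency never exceeds the semaphore's permit count
```

Twenty tasks are launched at once, but the observed peak concurrency equals the semaphore's permit count, exactly the guarantee the source pools rely on.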
Adjusting Concurrency
{
  "max_parallel_searches": 10  // Conservative
}
vs.
{
  "max_parallel_searches": 50  // Aggressive
}
Higher concurrency increases speed but may hit API rate limits. Start with 20 and adjust based on your API tier.
Task Creation & Execution
Creating All Tasks Upfront
Tasks are created for every domain × strategy combination:
# From pipeline.py:145-152
tasks: List[asyncio.Task[Tuple[str, SourceResult]]] = []
for domain in self.company_domains:
    for strategy in self.strategies:
        tasks.append(
            asyncio.create_task(
                self._execute_one(domain, strategy, self.search_depth)
            )
        )
Example calculation:
10 domains
9 strategies per domain
= 90 concurrent tasks
All 90 tasks execute in parallel, limited only by semaphore pools.
Processing Results as They Complete
Results are processed immediately using asyncio.as_completed():
# From pipeline.py:158-164
for coro in asyncio.as_completed(tasks):
    domain, res = await coro
    completed_count += 1
    if res.ok and res.evidences:
        domain_to_evidence[domain].extend(res.evidences)
This approach:
✅ Processes fastest queries first
✅ Doesn’t wait for slowest queries
✅ Enables real-time streaming
✅ Maximizes throughput
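A minimal, self-contained illustration of this completion-order behavior (the domain names and delays are made up):

```python
import asyncio

async def query(domain: str, delay: float) -> tuple[str, float]:
    await asyncio.sleep(delay)  # simulated network latency
    return domain, delay

async def main() -> list[str]:
    tasks = [
        asyncio.create_task(query("slow.com", 0.05)),
        asyncio.create_task(query("fast.com", 0.01)),
        asyncio.create_task(query("medium.com", 0.03)),
    ]
    order = []
    # Results arrive as they finish, not in submission order
    for coro in asyncio.as_completed(tasks):
        domain, _ = await coro
        order.append(domain)
    return order

order = asyncio.run(main())
print(order)  # ['fast.com', 'medium.com', 'slow.com']
```

Even though `slow.com` was submitted first, `fast.com` is processed first; with `asyncio.gather` you would instead wait for the full batch before seeing any result.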
Parallel Domain Analysis
After evidence collection, each domain is analyzed in parallel:
# From pipeline.py:166-173
domain_tasks = [
    asyncio.create_task(self.analyze_domain(domain, evidences))
    for domain, evidences in domain_to_evidence.items()
]
results: List[CompanyResearchResult] = []
for coro in asyncio.as_completed(domain_tasks):
    result = await coro
    results.append(result)
CPU-Intensive Operations
LLM analysis and TF-IDF processing run in thread pools to avoid blocking:
# From pipeline.py:125
await asyncio.to_thread(redis_client.clear_cache, domain)
# From jobs_search.py:84-85
# Run CPU-intensive work in thread pool
return await asyncio.to_thread(_compute_similarities)
asyncio.to_thread() runs blocking operations in a separate thread, keeping the event loop free for I/O.
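A small standalone example of the same pattern (the summation is just a stand-in for real CPU-bound work like TF-IDF):

```python
import asyncio

def blocking_work(n: int) -> int:
    # CPU-bound loop that would stall the event loop if run directly
    return sum(i * i for i in range(n))

async def main() -> int:
    # Offload to the default thread pool; other coroutines keep running
    # while the computation happens off the event loop thread
    return await asyncio.to_thread(blocking_work, 1000)

result = asyncio.run(main())
```

The await suspends only the calling coroutine; the event loop stays free to service concurrent I/O while the worker thread computes.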
Circuit Breakers
Circuit breakers prevent cascading failures when sources are down:
# From pipeline.py:59-65
self.breakers: Dict[str, CircuitBreaker] = {
    channel: CircuitBreaker(
        failure_threshold=settings.circuit_breaker_failures,
        reset_timeout_seconds=settings.circuit_breaker_reset_seconds,
    )
    for channel in self.sources.keys()
}
How Circuit Breakers Work
# From pipeline.py:83-92
breaker = self.breakers[strategy.channel]
if not breaker.allow_request():
    return domain, SourceResult(
        channel=strategy.channel,
        domain=domain,
        query=query,
        evidences=[],
        ok=False,
        error="circuit open",
    )
If a source fails repeatedly, the breaker opens and stops sending requests until the timeout expires.
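The CircuitBreaker class itself isn't shown in this document; a minimal sketch consistent with the interface used above (allow_request, record_success, record_failure, and the two constructor parameters) might look like this:

```python
import time

class CircuitBreaker:
    """Sketch: opens after N consecutive failures, half-opens after a timeout."""

    def __init__(self, failure_threshold: int = 5, reset_timeout_seconds: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout_seconds = reset_timeout_seconds
        self.failure_count = 0
        self.opened_at = None  # None means the circuit is closed

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True  # closed: traffic flows normally
        if time.monotonic() - self.opened_at >= self.reset_timeout_seconds:
            self.opened_at = None  # half-open: let a trial request through
            return True
        return False  # open: fail fast without calling the source

    def record_success(self) -> None:
        self.failure_count = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.opened_at = time.monotonic()

# Three consecutive failures trip a breaker with threshold 3
breaker = CircuitBreaker(failure_threshold=3, reset_timeout_seconds=60.0)
for _ in range(3):
    breaker.record_failure()
blocked = not breaker.allow_request()
```

This is the classic closed → open → half-open state machine; the project's actual implementation may differ in detail.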
Performance Metrics
The pipeline tracks detailed performance metrics:
# From pipeline.py:25-30
self.metrics = RunMetrics(start_time=time.perf_counter())
# Track successful queries
self.metrics.record_query(1)
# Track failures
self.metrics.record_failure(1)
Returned in response:
{
  "search_performance": {
    "queries_per_second": 12.5,
    "failed_requests": 2
  },
  "processing_time_ms": 3420
}
Streaming Optimization
The streaming pipeline uses optimized event frequency:
# From pipeline.py:256-278
progress_updates = [25, 50, 75, 100]  # Only send at milestones
for coro in asyncio.as_completed(tasks):
    domain, res = await coro
    completed_count += 1
    if res.ok and res.evidences:
        domain_to_evidence[domain].extend(res.evidences)
    progress_percent = int((completed_count / total_planned) * 100)
    if progress_percent in progress_updates:
        progress_updates.remove(progress_percent)
        yield orjson.dumps({
            "type": "evidence_progress",
            "progress": progress_percent,
            "completed": completed_count,
            "total": total_planned,
            "domains_with_evidence": len(domain_to_evidence),
            "timestamp": time.time(),
        }).decode()
This reduces event noise from potentially 100+ events to just 4 progress updates.
Rate Limiting
Rate limiting is applied at multiple levels:
1. Source-Level Semaphores
Control concurrent requests per source (covered above).
2. Decorator-Based Rate Limiting
API-specific rate limiting via decorators:
# From google_search.py:44
@rate_limited("tavily")
async def fetch(
    self, domain: str, query: str, search_depth: str
) -> SourceResult:
    # ... API call

# From news_search.py:42
@rate_limited("newsapi")
async def fetch(
    self, domain: str, query: str, search_depth: str
) -> SourceResult:
    # ... API call
3. LLM Rate Limiting
# From query_generation.py:176
@rate_limited("gemini")
async def _generate_with_llm(
    self, research_goal: str, search_depth: str
) -> List[QueryStrategy]:
    # ... Gemini API call
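The body of rate_limited isn't shown here; a minimal sketch of such a decorator, inferred from its usage (provider names and per-provider limits below are illustrative, not the project's real values), could be:

```python
import asyncio
import functools
import time

# Hypothetical per-provider limits in requests per second
RATE_LIMITS = {"tavily": 10.0, "newsapi": 1.0, "gemini": 1.0}
_last_call: dict = {}
_locks: dict = {}

def rate_limited(provider: str):
    """Enforce a minimum interval between calls tagged with the same provider."""
    min_interval = 1.0 / RATE_LIMITS[provider]

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            lock = _locks.setdefault(provider, asyncio.Lock())
            async with lock:  # serialize the spacing check per provider
                now = time.monotonic()
                wait = _last_call.get(provider, 0.0) + min_interval - now
                if wait > 0:
                    await asyncio.sleep(wait)
                _last_call[provider] = time.monotonic()
            return await func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limited("tavily")
async def fetch(query: str) -> str:
    return f"results for {query}"  # stand-in for the real API call

response = asyncio.run(fetch("example.com"))
```

This spacing-based approach complements the semaphore pools: semaphores cap how many calls are in flight, while the decorator caps how frequently new calls start.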
Sequential vs. Parallel
Sequential execution:
10 domains × 9 strategies × 1.5s per query = 135 seconds
Parallel execution (max_parallel_searches=20):
90 total queries / 20 concurrent = ~5 batches
5 batches × 1.5s per batch = ~7.5 seconds
Speedup: 18x faster!
Realistic Example
Actual performance with network latency and LLM analysis:
{
  "total_companies": 10,
  "search_strategies_generated": 9,
  "total_searches_executed": 90,
  "processing_time_ms": 3420,  // ~3.4 seconds total
  "search_performance": {
    "queries_per_second": 26.3,
    "failed_requests": 3
  }
}
Best Practices
Start Conservative
Begin with max_parallel_searches=20 and monitor:
API rate limit errors
Success/failure ratio
Overall throughput
Increase gradually if performance is stable.
Handle Failures Gracefully
The pipeline continues even if individual queries fail:
# From pipeline.py:106-108
if result.ok and result.evidences:
    self.metrics.record_query(1)
    breaker.record_success()
else:
    self.metrics.record_failure(1)
    breaker.record_failure()
Check failed_requests in metrics to assess data quality.
Optimize for Your Use Case
Different scenarios need different settings:
High volume, low latency:
{
  "search_depth": "quick",
  "max_parallel_searches": 50
}
High quality, comprehensive:
{
  "search_depth": "comprehensive",
  "max_parallel_searches": 15
}
Monitor Thread Pool Usage
CPU-intensive operations use thread pools:
# From jobs_search.py:45-85
def _compute_similarities():
    # CPU-intensive TF-IDF computation
    vectorizer = self._get_tfidf_model()
    tfidf_matrix = vectorizer.fit_transform(all_texts)
    similarities = cosine_similarity(search_vector, job_vectors)[0]
    return matches
return await asyncio.to_thread(_compute_similarities)
The default thread pool has limited workers. For heavy CPU workloads, consider increasing pool size.
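One way to do that (a sketch, not the project's actual configuration) is to install a larger default executor, which asyncio.to_thread uses under the hood:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def cpu_task(x: int) -> int:
    return x * x  # stand-in for CPU-bound work

async def main() -> list:
    loop = asyncio.get_running_loop()
    # Replace the default executor with a bigger pool; asyncio.to_thread
    # submits its work to this executor
    loop.set_default_executor(ThreadPoolExecutor(max_workers=16))
    results = await asyncio.gather(
        *(asyncio.to_thread(cpu_task, i) for i in range(8))
    )
    return list(results)

squares = asyncio.run(main())
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49]
```

Note that for pure-Python CPU work the GIL still limits parallelism; more workers mainly help when the workload releases the GIL (as NumPy/scikit-learn routines often do) or mixes in blocking I/O.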
Use Streaming for Long Operations
For batches with 50+ domains, use streaming to get incremental results:
POST /api/research/batch/stream
This provides visibility into progress and early results while processing continues.
Concurrency Limits
Per-Source Limits
max_parallel_searches: int # Per-source concurrency
max_parallel_searches=20 → Up to 60 total concurrent requests (20 × 3 sources)
max_parallel_searches=50 → Up to 150 total concurrent requests (50 × 3 sources)
API Provider Limits
Check your API tier limits:
Tavily : Varies by plan (typically 100-1000/min)
NewsAPI : 1,000 requests/day (free tier)
Greenhouse : No auth required, but respect fair use
Gemini : Varies by plan (typically 60 RPM free tier)
Set max_parallel_searches lower than your strictest API limit to avoid errors.
Check Metrics
Analyze the performance metrics:
{
  "processing_time_ms": 15000,  // 15 seconds - slower than expected
  "search_performance": {
    "queries_per_second": 3.2,  // Low QPS indicates bottleneck
    "failed_requests": 45  // High failures suggest rate limiting
  }
}
Diagnosis: High failures + low QPS → Reduce max_parallel_searches
Enable Timing Logs
The routes include timing logs:
# From routes.py:27-35
start_time = time.time()
query_generator = QueryGenerator()
strategies = await query_generator.generate_strategies(
    research_goal=payload.research_goal,
    search_depth=payload.search_depth,
)
end_time = time.time()
print(f"Query generation time: {end_time - start_time} seconds")
Monitor these logs to identify slow stages.
Advanced: Custom Semaphore Configuration
For advanced use cases, you can implement custom semaphore configurations:
# Custom configuration per source
self.source_pools = {
    "google_search": asyncio.Semaphore(50),  # High-tier Tavily account
    "news_search": asyncio.Semaphore(10),    # Free tier NewsAPI
    "jobs_search": asyncio.Semaphore(30),    # No auth, but be respectful
}
This allows fine-tuning based on specific API limitations.
Example: Scaling to 1000 Domains
Optimal configuration for large batches:
{
  "research_goal": "Find SaaS companies",
  "company_domains": ["... 1000 domains ..."],
  "search_depth": "quick",  // Minimize per-domain cost
  "max_parallel_searches": 30,  // Balance speed vs. rate limits
  "confidence_threshold": 0.75
}
Expected performance:
1000 domains × 6 strategies = 6000 total queries
With 30 concurrent per source = ~90 concurrent total
At ~1.5s per query = ~100-150s total time (~2 minutes)
Vs. sequential: ~2.5 hours!
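These figures follow from a quick back-of-the-envelope calculation:

```python
domains = 1000
strategies = 6
per_source_concurrency = 30   # max_parallel_searches
sources = 3                   # google_search, news_search, jobs_search
seconds_per_query = 1.5       # assumed average latency

total_queries = domains * strategies                    # 6000 queries
concurrent = per_source_concurrency * sources           # 90 in flight
batches = total_queries / concurrent                    # ~67 waves
parallel_seconds = batches * seconds_per_query          # ~100 s
sequential_seconds = total_queries * seconds_per_query  # 9000 s = 2.5 h
```

Real runs land closer to 100-150 s because query latency varies and LLM analysis adds a tail, but the order of magnitude holds.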
Real-Time Streaming: See how parallel processing enables live progress updates
Multi-Source Research: Learn about the data sources being queried in parallel