
Overview

The GTM Research Engine achieves 3-5x performance improvements over sequential execution using async worker pools, semaphore-based rate limiting, and parallel domain analysis. Every stage of the pipeline is optimized for concurrent execution.

Architecture

Async Foundation

The entire pipeline is built on Python’s asyncio for non-blocking I/O:
```python
# From pipeline.py:136
async def run(self) -> Tuple[List[CompanyResearchResult], int]:
    """
    Execute research pipeline with batch LLM analysis.
    """
    tasks: List[asyncio.Task[Tuple[str, SourceResult]]] = []
    for domain in self.company_domains:
        for strategy in self.strategies:
            tasks.append(
                asyncio.create_task(
                    self._execute_one(domain, strategy, self.search_depth)
                )
            )
```

Three-Tier Parallelization

  1. Source-Level Parallelism - Multiple sources query simultaneously
  2. Domain-Level Parallelism - All domains researched concurrently
  3. Strategy-Level Parallelism - Multiple strategies execute per source

Semaphore Pools

Each data source has its own semaphore pool to control concurrency:
```python
# From pipeline.py:44-48
self.source_pools: Dict[str, asyncio.Semaphore] = {
    "google_search": asyncio.Semaphore(max_parallel_searches),
    "jobs_search": asyncio.Semaphore(max_parallel_searches),
    "news_search": asyncio.Semaphore(max_parallel_searches),
}
self.default_pool = asyncio.Semaphore(max(max_parallel_searches // 4, 2))
```

How Semaphores Work

Semaphores limit concurrent operations to prevent overwhelming APIs:
```python
# From pipeline.py:94-101
source_pool = self.source_pools.get(strategy.channel, self.default_pool)

async with source_pool:
    try:
        result = await source.fetch(
            domain=domain, query=query, search_depth=search_depth
        )
        # ... handle result
```

Example: With max_parallel_searches=20:
  • Up to 20 Google searches run simultaneously
  • Up to 20 news searches run simultaneously
  • Up to 20 job searches run simultaneously
  • Total: Up to 60 concurrent API calls (20 per source)
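The limiting behavior is easy to verify in isolation. This standalone sketch (illustrative, not from the codebase) launches ten workers behind a semaphore of three and records the peak number running at once:

```python
import asyncio

async def main(limit: int = 3, jobs: int = 10) -> int:
    pool = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def worker() -> None:
        nonlocal running, peak
        async with pool:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulated API call
            running -= 1

    await asyncio.gather(*(worker() for _ in range(jobs)))
    return peak

peak = asyncio.run(main())  # 10 workers queued, but peak concurrency is 3
```

All ten workers are scheduled immediately, but the semaphore guarantees that no more than three hold a permit at any instant.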

Adjusting Concurrency

```json
{
  "max_parallel_searches": 10  // Conservative
}
```

vs.

```json
{
  "max_parallel_searches": 50  // Aggressive
}
```
Higher concurrency increases speed but may hit API rate limits. Start with 20 and adjust based on your API tier.

Task Creation & Execution

Creating All Tasks Upfront

Tasks are created for every domain × strategy combination:
```python
# From pipeline.py:145-152
tasks: List[asyncio.Task[Tuple[str, SourceResult]]] = []
for domain in self.company_domains:
    for strategy in self.strategies:
        tasks.append(
            asyncio.create_task(
                self._execute_one(domain, strategy, self.search_depth)
            )
        )
```
Example calculation:
  • 10 domains
  • 9 strategies per domain
  • = 90 concurrent tasks
All 90 tasks are scheduled concurrently; the semaphore pools cap how many run at any moment.

Processing Results as They Complete

Results are processed immediately using asyncio.as_completed():
```python
# From pipeline.py:158-164
for coro in asyncio.as_completed(tasks):
    domain, res = await coro
    completed_count += 1

    if res.ok and res.evidences:
        domain_to_evidence[domain].extend(res.evidences)
```
This approach:
  • ✅ Processes fastest queries first
  • ✅ Doesn’t wait for slowest queries
  • ✅ Enables real-time streaming
  • ✅ Maximizes throughput
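A standalone illustration (not from the codebase) of the completion-order behavior, using three simulated queries with different latencies:

```python
import asyncio

async def query(domain: str, latency: float) -> str:
    await asyncio.sleep(latency)  # simulated network call
    return domain

async def main() -> list[str]:
    tasks = [
        asyncio.create_task(query("slow.com", 0.09)),
        asyncio.create_task(query("fast.com", 0.01)),
        asyncio.create_task(query("medium.com", 0.05)),
    ]
    finished = []
    for coro in asyncio.as_completed(tasks):
        finished.append(await coro)  # completion order, not submission order
    return finished

order = asyncio.run(main())  # ['fast.com', 'medium.com', 'slow.com']
```

Even though slow.com was submitted first, fast.com is processed first, so downstream work never stalls behind the slowest request.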

Parallel Domain Analysis

After evidence collection, each domain is analyzed in parallel:
```python
# From pipeline.py:166-173
domain_tasks = [
    asyncio.create_task(self.analyze_domain(domain, evidences))
    for domain, evidences in domain_to_evidence.items()
]
results: List[CompanyResearchResult] = []
for coro in asyncio.as_completed(domain_tasks):
    result = await coro
    results.append(result)
```

CPU-Intensive Operations

LLM analysis and TF-IDF processing run in thread pools to avoid blocking:
```python
# From pipeline.py:125
await asyncio.to_thread(redis_client.clear_cache, domain)
```

```python
# From jobs_search.py:84-85
# Run CPU-intensive work in thread pool
return await asyncio.to_thread(_compute_similarities)
```

asyncio.to_thread() runs blocking operations in a separate thread, keeping the event loop free for I/O.
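A minimal, self-contained example of the same pattern (illustrative only): a blocking, CPU-bound function is handed to asyncio.to_thread() so the event loop stays responsive while it runs.

```python
import asyncio
import hashlib

def cpu_heavy(data: bytes) -> str:
    # Blocking CPU-bound work: repeated hashing
    digest = data
    for _ in range(10_000):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()

async def main() -> str:
    # Offloaded to a worker thread; the event loop keeps servicing I/O
    return await asyncio.to_thread(cpu_heavy, b"example.com")

result = asyncio.run(main())  # 64-character hex digest
```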

Circuit Breakers

Circuit breakers prevent cascading failures when sources are down:
```python
# From pipeline.py:59-65
self.breakers: Dict[str, CircuitBreaker] = {
    channel: CircuitBreaker(
        failure_threshold=settings.circuit_breaker_failures,
        reset_timeout_seconds=settings.circuit_breaker_reset_seconds,
    )
    for channel in self.sources.keys()
}
```

How Circuit Breakers Work

```python
# From pipeline.py:83-92
breaker = self.breakers[strategy.channel]
if not breaker.allow_request():
    return domain, SourceResult(
        channel=strategy.channel,
        domain=domain,
        query=query,
        evidences=[],
        ok=False,
        error="circuit open"
    )
```
If a source fails repeatedly, the breaker opens and stops sending requests until the timeout expires.
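The CircuitBreaker class itself is not shown in these excerpts. A minimal sketch consistent with the calls used above (allow_request, record_success, record_failure) might look like this — the real implementation may differ:

```python
import time

class CircuitBreaker:
    """Minimal sketch of a circuit breaker; illustrative only."""

    def __init__(self, failure_threshold: int = 3,
                 reset_timeout_seconds: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout_seconds = reset_timeout_seconds
        self.failures = 0
        self.opened_at: float | None = None

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True
        # Half-open: allow a probe request once the reset timeout expires
        return time.monotonic() - self.opened_at >= self.reset_timeout_seconds

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()

breaker = CircuitBreaker(failure_threshold=2, reset_timeout_seconds=60)
breaker.record_failure()
breaker.record_failure()  # threshold reached -> breaker opens
blocked = not breaker.allow_request()
```

After two consecutive failures the breaker opens and rejects requests until the 60-second reset window elapses, at which point a single probe is allowed through.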

Performance Metrics

The pipeline tracks detailed performance metrics:
```python
# From pipeline.py:25-30
self.metrics = RunMetrics(start_time=time.perf_counter())

# Track successful queries
self.metrics.record_query(1)

# Track failures
self.metrics.record_failure(1)
```

Returned in response:

```json
{
  "search_performance": {
    "queries_per_second": 12.5,
    "failed_requests": 2
  },
  "processing_time_ms": 3420
}
```

Streaming Optimization

The streaming pipeline uses optimized event frequency:
```python
# From pipeline.py:256-278
progress_updates = [25, 50, 75, 100]  # Only send at milestones

for coro in asyncio.as_completed(tasks):
    domain, res = await coro
    completed_count += 1

    if res.ok and res.evidences:
        domain_to_evidence[domain].extend(res.evidences)

    progress_percent = int((completed_count / total_planned) * 100)
    if progress_percent in progress_updates:
        progress_updates.remove(progress_percent)

        yield orjson.dumps({
            "type": "evidence_progress",
            "progress": progress_percent,
            "completed": completed_count,
            "total": total_planned,
            "domains_with_evidence": len(domain_to_evidence),
            "timestamp": time.time()
        }).decode()
```
This reduces event noise from potentially 100+ events to just 4 progress updates.
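The milestone filtering can be exercised on its own. This sketch (an illustrative re-implementation, not from the codebase) reports which completion counts actually emit an event for a 90-task run:

```python
def milestone_events(total: int, milestones: list[int]) -> list[int]:
    """Return the completion counts at which a progress event fires,
    mirroring the milestone-filtering logic above."""
    pending = list(milestones)
    fired = []
    for completed in range(1, total + 1):
        percent = int((completed / total) * 100)
        if percent in pending:
            pending.remove(percent)
            fired.append(completed)
    return fired

events = milestone_events(90, [25, 50, 75, 100])
# With 90 tasks, events fire at completions 23, 45, 68, and 90
```

One caveat: because int() truncates, small totals can skip a milestone entirely (with 7 tasks the percentages are 14, 28, 42, 57, 71, 85, 100, so 25, 50, and 75 never match). Comparing against the next pending milestone with >= is more robust if small batches matter.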

Rate Limiting

Rate limiting is applied at multiple levels:

1. Source-Level Semaphores

Control concurrent requests per source (covered above).

2. Decorator-Based Rate Limiting

API-specific rate limiting via decorators:
```python
# From google_search.py:44
@rate_limited("tavily")
async def fetch(
    self, domain: str, query: str, search_depth: str
) -> SourceResult:
    # ... API call
```

```python
# From news_search.py:42
@rate_limited("newsapi")
async def fetch(
    self, domain: str, query: str, search_depth: str
) -> SourceResult:
    # ... API call
```

3. LLM Rate Limiting

```python
# From query_generation.py:176
@rate_limited("gemini")
async def _generate_with_llm(
    self, research_goal: str, search_depth: str
) -> List[QueryStrategy]:
    # ... Gemini API call
```

Real-World Performance

Sequential vs. Parallel

Sequential execution:

```text
10 domains × 9 strategies × 1.5s per query = 135 seconds
```

Parallel execution (max_parallel_searches=20):

```text
90 total queries / 20 concurrent = ~5 batches
5 batches × 1.5s per batch = ~7.5 seconds
```

Speedup: 18x faster!
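The batch arithmetic above can be captured in a small helper (illustrative, not from the codebase):

```python
import math

def estimate_runtime(domains: int, strategies: int, concurrency: int,
                     seconds_per_query: float = 1.5) -> float:
    """Back-of-envelope batch model: queries fill concurrency
    slots in waves, each wave taking one query's latency."""
    total_queries = domains * strategies
    batches = math.ceil(total_queries / concurrency)
    return batches * seconds_per_query

sequential = estimate_runtime(10, 9, concurrency=1)   # 135.0 seconds
parallel = estimate_runtime(10, 9, concurrency=20)    # 7.5 seconds
```

This is a simplification — in practice the 20-per-source pools overlap across three sources, so real runs can finish even faster than the single-pool model suggests.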

Realistic Example

Actual performance with network latency and LLM analysis:
```json
{
  "total_companies": 10,
  "search_strategies_generated": 9,
  "total_searches_executed": 90,
  "processing_time_ms": 3420,  // ~3.4 seconds total
  "search_performance": {
    "queries_per_second": 26.3,
    "failed_requests": 3
  }
}
```

Best Practices

Begin with max_parallel_searches=20 and monitor:
  • API rate limit errors
  • Success/failure ratio
  • Overall throughput
Increase gradually if performance is stable.
The pipeline continues even if individual queries fail:
```python
# From pipeline.py:106-108
if result.ok and result.evidences:
    self.metrics.record_query(1)
    breaker.record_success()
else:
    self.metrics.record_failure(1)
    breaker.record_failure()
```
Check failed_requests in metrics to assess data quality.
Different scenarios need different settings.

High volume, low latency:

```json
{
  "search_depth": "quick",
  "max_parallel_searches": 50
}
```

High quality, comprehensive:

```json
{
  "search_depth": "comprehensive",
  "max_parallel_searches": 15
}
```
CPU-intensive operations use thread pools:
```python
# From jobs_search.py:45-85
def _compute_similarities():
    # CPU-intensive TF-IDF computation
    vectorizer = self._get_tfidf_model()
    tfidf_matrix = vectorizer.fit_transform(all_texts)
    similarities = cosine_similarity(search_vector, job_vectors)[0]
    return matches

return await asyncio.to_thread(_compute_similarities)
```
The default thread pool has limited workers. For heavy CPU workloads, consider increasing pool size.
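If profiling shows to_thread() calls queuing behind one another, the event loop's default executor can be replaced with a larger ThreadPoolExecutor. This is a sketch of the standard asyncio mechanism, not something the codebase is shown doing:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def main() -> int:
    loop = asyncio.get_running_loop()
    # CPython's default executor caps workers at min(32, cpu_count + 4);
    # swap in a larger pool for heavy offloaded workloads.
    loop.set_default_executor(ThreadPoolExecutor(max_workers=16))
    # asyncio.to_thread() routes through the default executor
    return await asyncio.to_thread(sum, range(1000))

total = asyncio.run(main())
```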
For batches with 50+ domains, use streaming to get incremental results:
POST /api/research/batch/stream
This provides visibility into progress and early results while processing continues.

Concurrency Limits

Per-Source Limits

```python
max_parallel_searches: int  # Per-source concurrency
```
  • max_parallel_searches=20 → Up to 60 total concurrent requests (20 × 3 sources)
  • max_parallel_searches=50 → Up to 150 total concurrent requests (50 × 3 sources)

API Provider Limits

Check your API tier limits:
  • Tavily: Varies by plan (typically 100-1000/min)
  • NewsAPI: 1,000 requests/day (free tier)
  • Greenhouse: No auth required, but respect fair use
  • Gemini: Varies by plan (typically 60 RPM free tier)
Set max_parallel_searches lower than your strictest API limit to avoid errors.
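One rough way to derive a starting value from provider limits — a sizing heuristic assumed for illustration, not a function the codebase provides:

```python
def safe_parallel_searches(provider_rpm: dict[str, int],
                           avg_query_seconds: float = 1.5) -> int:
    """Size per-source concurrency from the strictest provider's
    requests-per-minute limit, via Little's law:
    concurrency ≈ throughput × latency."""
    strictest_rps = min(provider_rpm.values()) / 60.0
    return max(1, int(strictest_rps * avg_query_seconds))

# Hypothetical plan limits in requests per minute
limit = safe_parallel_searches({"tavily": 600, "newsapi": 1000})
# 600 rpm -> 10 rps; 10 rps × 1.5s latency -> concurrency of 15
```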

Debugging Performance

Check Metrics

Analyze the performance metrics:
```json
{
  "processing_time_ms": 15000,  // 15 seconds - slower than expected
  "search_performance": {
    "queries_per_second": 3.2,  // Low QPS indicates bottleneck
    "failed_requests": 45  // High failures suggest rate limiting
  }
}
```
Diagnosis: High failures + low QPS → Reduce max_parallel_searches

Enable Timing Logs

The routes include timing logs:
```python
# From routes.py:27-35
start_time = time.time()
query_generator = QueryGenerator()
strategies = await query_generator.generate_strategies(
    research_goal=payload.research_goal,
    search_depth=payload.search_depth,
)
end_time = time.time()
print(f"Query generation time: {end_time - start_time} seconds")
```
Monitor these logs to identify slow stages.

Advanced: Custom Semaphore Configuration

For advanced use cases, you can implement custom semaphore configurations:
```python
# Custom configuration per source
self.source_pools = {
    "google_search": asyncio.Semaphore(50),  # High-tier Tavily account
    "news_search": asyncio.Semaphore(10),    # Free tier NewsAPI
    "jobs_search": asyncio.Semaphore(30),    # No auth, but be respectful
}
```
This allows fine-tuning based on specific API limitations.

Example: Scaling to 1000 Domains

Optimal configuration for large batches:
```json
{
  "research_goal": "Find SaaS companies",
  "company_domains": ["... 1000 domains ..."],
  "search_depth": "quick",  // Minimize per-domain cost
  "max_parallel_searches": 30,  // Balance speed vs. rate limits
  "confidence_threshold": 0.75
}
```
Expected performance:
  • 1000 domains × 6 strategies = 6000 total queries
  • With 30 concurrent per source = ~90 concurrent total
  • At ~1.5s per query = ~100-150s total time (~2 minutes)
vs. sequential execution: ~2.5 hours!

Real-Time Streaming

See how parallel processing enables live progress updates

Multi-Source Research

Learn about the data sources being queried in parallel
