The GTM Research Engine is built for high-performance parallel data collection. This guide covers optimization strategies based on the actual implementation.

Architecture Overview

The engine uses three core performance components:

Token Bucket Rate Limiter

Smooth API request distribution with burst capacity

Redis Deduplication

Prevent duplicate evidence across sources

Circuit Breaker

Automatic source failure protection

Parallel Processing Configuration

Source Pool Concurrency

The pipeline creates separate semaphore pools for each data source to maximize parallelization:
backend/app/services/pipeline.py
# Source Pools for rate limiting
self.source_pools: Dict[str, asyncio.Semaphore] = {
    "google_search": asyncio.Semaphore(max_parallel_searches),   
    "jobs_search": asyncio.Semaphore(max_parallel_searches),
    "news_search": asyncio.Semaphore(max_parallel_searches),
}
self.default_pool = asyncio.Semaphore(max(max_parallel_searches // 4, 2))
Performance Impact: Each source pool runs up to max_parallel_searches queries simultaneously. With 3 sources and max_parallel_searches=20, the engine can process up to 60 concurrent requests.
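As a standalone sketch of the pool pattern, the snippet below bounds per-source concurrency with the same semaphore layout (fake_search is a placeholder coroutine standing in for the real source fetchers):

```python
import asyncio

async def fake_search(pool: asyncio.Semaphore, source: str, query: str) -> str:
    # Placeholder for a real source fetch; sleeps to simulate network I/O.
    async with pool:  # at most N in-flight queries per source
        await asyncio.sleep(0.01)
        return f"{source}:{query}"

async def run_queries() -> list:
    max_parallel_searches = 20
    pools = {
        "google_search": asyncio.Semaphore(max_parallel_searches),
        "jobs_search": asyncio.Semaphore(max_parallel_searches),
        "news_search": asyncio.Semaphore(max_parallel_searches),
    }
    # All sources fan out in parallel; each pool caps only its own source.
    tasks = [
        fake_search(pools[source], source, f"q{i}")
        for source in pools
        for i in range(5)
    ]
    return await asyncio.gather(*tasks)

results = asyncio.run(run_queries())
```

Because each source has its own semaphore, a slow or saturated source never starves the others.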

Optimizing Concurrency Levels

1. Start with Conservative Values

Begin with max_parallel_searches: 10-20 to avoid rate limits:
{
  "max_parallel_searches": 15,
  "search_depth": "standard"
}
2. Monitor API Rate Limits

Check your API quotas:
  • Tavily: 500 RPM (requests per minute)
  • Gemini: 2000 RPM
  • NewsAPI: 300 RPM (free tier: 1,000/day)
3. Scale Up Gradually

Increase concurrency based on your API limits:
backend/app/core/config.py
return Settings(
    max_parallel_searches=30,  # Increased from 20
    tavily_rpm=500,
    gemini_rpm=2000,
    newsapi_rpm=300,
)
Rate Limit Formula: Ensure (max_parallel_searches * number_of_sources) < API_RPM. Example: 20 searches * 3 sources = 60 req/min, comfortably under Tavily’s 500 RPM limit.
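The formula is simple enough to encode as a preflight check (within_rate_limit below is an illustrative helper, not part of the engine):

```python
def within_rate_limit(max_parallel_searches: int, num_sources: int, api_rpm: int) -> bool:
    """Worst case: every pooled slot fires one request per minute at the same API."""
    return max_parallel_searches * num_sources < api_rpm

# 20 searches * 3 sources = 60 req/min, safe against Tavily's 500 RPM
safe = within_rate_limit(20, 3, 500)
```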

Rate Limiting Strategy

Token Bucket Algorithm

The engine uses a token bucket rate limiter for smooth request distribution:
backend/app/decorators/api_rate_limiter.py
class TokenBucketRateLimiter:
    def __init__(self, tokens_per_second: int, bucket_capacity: Optional[int] = None):
        self.tokens_per_second = tokens_per_second
        self.bucket_capacity = bucket_capacity or tokens_per_second
        self.tokens = self.bucket_capacity  # Start with full bucket
        self.last_refill_time = time.time()

    def _refill_tokens(self) -> None:
        # Refill proportionally to elapsed time, capped at bucket capacity
        now = time.time()
        elapsed = now - self.last_refill_time
        self.tokens = min(self.bucket_capacity, self.tokens + elapsed * self.tokens_per_second)
        self.last_refill_time = now

    def try_consume_token(self) -> bool:
        self._refill_tokens()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
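To see the burst behavior concretely, here is a condensed, self-contained re-declaration of the limiter with a short demo (the refill logic is an assumption based on the standard token bucket algorithm, not copied from the engine):

```python
import time
from typing import Optional

class TokenBucketRateLimiter:
    """Condensed re-declaration of the limiter above, so this snippet runs standalone."""
    def __init__(self, tokens_per_second: float, bucket_capacity: Optional[float] = None):
        self.tokens_per_second = tokens_per_second
        self.bucket_capacity = bucket_capacity or tokens_per_second
        self.tokens = self.bucket_capacity  # start with a full bucket
        self.last_refill_time = time.time()

    def _refill_tokens(self) -> None:
        # Refill proportionally to elapsed time, capped at bucket capacity
        now = time.time()
        self.tokens = min(self.bucket_capacity,
                          self.tokens + (now - self.last_refill_time) * self.tokens_per_second)
        self.last_refill_time = now

    def try_consume_token(self) -> bool:
        self._refill_tokens()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# Capacity 2 allows a burst of two immediate requests; the third is rejected
# until the bucket refills at 5 tokens/second.
limiter = TokenBucketRateLimiter(tokens_per_second=5, bucket_capacity=2)
burst = [limiter.try_consume_token() for _ in range(3)]
```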

Configuring API-Specific Rate Limits

class RateLimitConfig:
    def __init__(self):
        settings = get_settings()
        
        # Convert RPM to RPS (requests per second)
        self.tavily_rate_limit = settings.tavily_rpm / 60.0      # 8.33 RPS
        self.gemini_rate_limit = settings.gemini_rpm / 60.0      # 33.33 RPS
        self.newsapi_rate_limit = settings.newsapi_rpm / 60.0    # 5 RPS
        
        # Retry settings
        self.max_retry_attempts = 3
        self.initial_delay = 1.0
        self.max_delay = 10.0
        self.backoff_factor = 2.0
Burst Capacity: The token bucket allows bursts up to bucket_capacity tokens, then refills at tokens_per_second. This handles spiky traffic gracefully.
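The retry settings in RateLimitConfig pair naturally with an exponential-backoff wrapper. The call_with_backoff helper below is an illustrative sketch using those defaults, not the engine's actual retry decorator:

```python
import time

def call_with_backoff(func, max_retry_attempts=3, initial_delay=1.0,
                      max_delay=10.0, backoff_factor=2.0):
    """Retry `func` with exponential backoff; parameters mirror RateLimitConfig."""
    delay = initial_delay
    for attempt in range(max_retry_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_retry_attempts - 1:
                raise  # out of attempts: surface the error
            time.sleep(min(delay, max_delay))  # wait, capped at max_delay
            delay *= backoff_factor            # 1s -> 2s -> 4s ...
```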

Redis Caching Strategy

Evidence Deduplication

Redis prevents duplicate evidence from being processed across multiple sources:
backend/app/db/redis_cache.py
class RedisCache:
    def is_evidence_cached(self, domain: str, evidence: Evidence) -> bool:
        if self.client.sismember(f"urls:{domain}", evidence.url):
            return True
        if self.client.sismember(f"titles:{domain}", evidence.title):
            return True
        if self.client.sismember(f"snippets:{domain}", evidence.snippet):
            return True
        return False
    
    def add_evidence_to_cache(self, domain: str, evidence: Evidence) -> None:
        self.client.sadd(f"urls:{domain}", evidence.url)
        self.client.sadd(f"titles:{domain}", evidence.title)
        self.client.sadd(f"snippets:{domain}", evidence.snippet)

Cache Performance Benefits

  1. URL Deduplication: Same URL found by multiple sources
  2. Title Matching: Articles with identical titles from different outlets
  3. Content Similarity: Duplicate snippets across sources
Performance Gain: Redis set membership checks are O(1), making deduplication extremely fast even with thousands of evidence items.
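The dedup flow can be exercised without a Redis server by swapping the Redis sets for plain Python sets. InMemoryDedupCache and the Evidence dataclass below are illustrative stand-ins, not engine code:

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass(frozen=True)
class Evidence:
    url: str
    title: str
    snippet: str

class InMemoryDedupCache:
    """Plain-set stand-in for the Redis-backed cache above."""
    def __init__(self):
        self._sets = defaultdict(set)

    def is_evidence_cached(self, domain: str, e: Evidence) -> bool:
        return (e.url in self._sets[f"urls:{domain}"]
                or e.title in self._sets[f"titles:{domain}"]
                or e.snippet in self._sets[f"snippets:{domain}"])

    def add_evidence_to_cache(self, domain: str, e: Evidence) -> None:
        self._sets[f"urls:{domain}"].add(e.url)
        self._sets[f"titles:{domain}"].add(e.title)
        self._sets[f"snippets:{domain}"].add(e.snippet)

cache = InMemoryDedupCache()
items = [
    Evidence("https://a.com/1", "Funding round", "Raised $10M"),
    Evidence("https://a.com/1", "Funding round", "Raised $10M"),      # duplicate URL
    Evidence("https://b.com/2", "Funding round", "Different snippet"), # duplicate title
]
unique = []
for e in items:
    if not cache.is_evidence_cached("acme.com", e):
        cache.add_evidence_to_cache("acme.com", e)
        unique.append(e)
```

Only the first item survives: the second is caught by the URL set, the third by the title set, matching the three dedup layers listed above.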
The pipeline clears cache per domain before analysis:
backend/app/services/pipeline.py
async def analyze_domain(self, domain: str, evidences: List[Evidence]):
    await asyncio.to_thread(redis_client.clear_cache, domain)
    
    if evidences:
        analysis_result = await self.signal_extractor.analyze_company(
            self.research_goal, evidences
        )
        return self._build_company_result(domain, evidences, analysis_result)
This ensures fresh data for each research run.

Circuit Breaker Protection

Automatic Failure Handling

Circuit breakers protect against cascading failures when API sources become unreliable:
backend/app/core/circuit_breaker.py
class CircuitBreaker:
    def allow_request(self) -> bool:
        if self._state == CircuitState.CLOSED:
            return True
        if self._state == CircuitState.OPEN:
            if (time.monotonic() - self._opened_at) >= self.reset_timeout_seconds:
                self._state = CircuitState.HALF_OPEN
                return True
            return False
        return True

    def record_failure(self) -> None:
        self._failure_count += 1
        if self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN
            self._opened_at = time.monotonic()

Circuit Breaker States

1. CLOSED (Normal Operation)

All requests proceed normally. Failures are counted.
2. OPEN (Source Failed)

After failure_threshold failures, circuit opens and blocks all requests:
breaker = CircuitBreaker(
    failure_threshold=5,
    reset_timeout_seconds=30.0
)
3. HALF_OPEN (Testing Recovery)

After reset_timeout_seconds, allows one request to test if source recovered.
  • Success: Circuit closes, normal operation resumes
  • Failure: Circuit re-opens for another timeout period
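The full state cycle can be demonstrated with a condensed, self-contained version of the breaker. record_success here is an assumption that mirrors the half-open recovery described above (reset the count and close the circuit):

```python
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Condensed version of the breaker above, enough to demo the transitions."""
    def __init__(self, failure_threshold=5, reset_timeout_seconds=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout_seconds = reset_timeout_seconds
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._opened_at = 0.0

    def allow_request(self) -> bool:
        if self._state == CircuitState.OPEN:
            if (time.monotonic() - self._opened_at) >= self.reset_timeout_seconds:
                self._state = CircuitState.HALF_OPEN  # allow one probe request
                return True
            return False
        return True

    def record_failure(self) -> None:
        self._failure_count += 1
        if self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN
            self._opened_at = time.monotonic()

    def record_success(self) -> None:
        self._failure_count = 0
        self._state = CircuitState.CLOSED

breaker = CircuitBreaker(failure_threshold=2, reset_timeout_seconds=0.05)
breaker.record_failure()
breaker.record_failure()             # threshold hit: circuit opens
blocked = not breaker.allow_request()
time.sleep(0.06)                     # wait past the reset timeout
half_open = breaker.allow_request()  # one probe allowed in HALF_OPEN
breaker.record_success()             # probe succeeded: circuit closes
```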

Configuring Circuit Breakers

backend/app/core/config.py
@lru_cache(maxsize=1)
def get_settings() -> Settings:
    return Settings(
        circuit_breaker_failures=5,         # Open after 5 consecutive failures
        circuit_breaker_reset_seconds=30.0, # Test recovery after 30 seconds
        max_parallel_searches=20,
    )
If a circuit breaker opens during a research run, that source will return:
SourceResult(
    ok=False,
    error="circuit open"
)
The pipeline continues with remaining sources.

Metrics and Monitoring

Pipeline Metrics

Track performance with the RunMetrics class:
backend/app/core/metrics.py
@dataclass
class RunMetrics:
    start_time: float
    total_queries_executed: int = 0
    failed_requests: int = 0

    def to_dict(self) -> dict:
        elapsed = max(0.0001, time.perf_counter() - self.start_time)
        qps = self.total_queries_executed / elapsed
        return {
            "queries_per_second": round(qps, 2),
            "failed_requests": self.failed_requests,
        }
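A typical usage pattern looks like the following (RunMetrics is re-declared so the snippet runs standalone; the counter increments stand in for the pipeline's bookkeeping):

```python
import time
from dataclasses import dataclass

@dataclass
class RunMetrics:
    """Mirrors the metrics class above."""
    start_time: float
    total_queries_executed: int = 0
    failed_requests: int = 0

    def to_dict(self) -> dict:
        elapsed = max(0.0001, time.perf_counter() - self.start_time)
        return {
            "queries_per_second": round(self.total_queries_executed / elapsed, 2),
            "failed_requests": self.failed_requests,
        }

metrics = RunMetrics(start_time=time.perf_counter())
for _ in range(50):
    metrics.total_queries_executed += 1  # incremented per completed query
metrics.failed_requests += 1             # incremented per failed request
snapshot = metrics.to_dict()
```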

Performance Benchmarks

Standard Configuration

  • Domains: 10
  • Parallel Searches: 20
  • Search Depth: standard
  • Processing Time: ~3-5 seconds
  • Queries/Second: ~40-60 QPS

High-Performance Configuration

  • Domains: 50
  • Parallel Searches: 30
  • Search Depth: comprehensive
  • Processing Time: ~8-12 seconds
  • Queries/Second: ~100-150 QPS

Optimization Checklist

  • Set appropriate max_parallel_searches for your API limits
  • Configure tavily_rpm, gemini_rpm, newsapi_rpm in settings
  • Enable retry logic with exponential backoff
  • Monitor rate limit errors in logs
  • Ensure Redis is running on localhost:6379
  • Monitor cache hit rates
  • Clear cache between research runs if needed
  • Consider Redis persistence for production
  • Set circuit_breaker_failures based on source reliability
  • Configure circuit_breaker_reset_seconds for quick recovery
  • Monitor breaker state changes in metrics
  • Implement alerting for open circuits
  • Use separate source pools for independent scaling
  • Leverage async/await for I/O-bound operations
  • Run CPU-intensive tasks (TF-IDF) in thread pools
  • Monitor asyncio task completion rates

Advanced Performance Tuning

TF-IDF Job Matching Optimization

The jobs search source uses scikit-learn for semantic matching:
backend/app/sources/jobs_search.py
self._tfidf_vectorizer = TfidfVectorizer(
    max_features=2000,           # More features for richer content
    stop_words='english',
    ngram_range=(1, 3),          # Include trigrams for better phrase matching
    lowercase=True,
    min_df=1,                    # Include rare terms (good for specialized roles)
    max_df=0.95,                 # Remove very common terms
)
CPU-Intensive Operations: TF-IDF computations run in thread pools to avoid blocking the event loop:
return await asyncio.to_thread(_compute_similarities)
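The same offloading pattern works for any blocking computation. The sketch below substitutes a generic CPU-bound function for the scikit-learn similarity step, so it runs without extra dependencies:

```python
import asyncio

def _compute_similarities() -> list:
    # Stand-in for the TF-IDF cosine-similarity computation; any
    # CPU-bound work benefits from the same offloading pattern.
    return [i * 0.001 for i in range(1000)]

async def score_jobs() -> list:
    # asyncio.to_thread runs the blocking call in a worker thread,
    # keeping the event loop free to service the other sources.
    return await asyncio.to_thread(_compute_similarities)

scores = asyncio.run(score_jobs())
```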

Search Depth Optimization

Balance result quality vs. API usage:
Search Depth     Results per Source   Total API Calls (10 domains, 5 strategies)
quick            2                    100 calls
standard         3                    150 calls
comprehensive    5                    250 calls
The search_depth parameter controls both Tavily’s search quality and the number of results returned per query.
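The totals in the table follow from multiplying domains, strategies, and results per source. The helper below is a hypothetical illustration of that arithmetic, not engine code:

```python
def total_api_calls(num_domains: int, num_strategies: int, results_per_source: int) -> int:
    """Reproduce the table: each domain/strategy pair scales with search depth."""
    return num_domains * num_strategies * results_per_source

depth_results = {"quick": 2, "standard": 3, "comprehensive": 5}
calls = {depth: total_api_calls(10, 5, r) for depth, r in depth_results.items()}
```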

Troubleshooting Performance Issues

Slow Research Runs

1. Check API Response Times

Add timing logs to identify slow sources:
start = time.time()
result = await source.fetch(domain, query, search_depth)
logger.info(f"{source.channel_name} took {time.time() - start:.2f}s")
2. Verify Redis Connection

Test cache performance:
redis-cli ping
redis-cli --latency
3. Monitor Circuit Breaker State

Check if sources are failing:
for channel, breaker in self.breakers.items():
    logger.info(f"{channel}: {breaker.state}")

High API Costs

Reduce costs by optimizing search strategies:
  1. Use quick search depth for initial exploration
  2. Reduce max_parallel_searches to slow request rate
  3. Filter domains before running expensive comprehensive searches
  4. Enable caching to avoid redundant API calls

Next Steps

Custom Sources

Build custom data sources for proprietary APIs

Troubleshooting

Debug common issues and errors
