The GTM Research Engine processes research requests through a multi-stage pipeline that collects, analyzes, and aggregates data from multiple sources. This page traces the data flow from user input to final results, phase by phase.

High-Level Data Flow

Phase 1: Request Processing

User Input Structure

The frontend collects research parameters from the user:
interface SearchQuery {
  research_goal: string;                        // "Find fintech companies using AI"
  company_domains: string[];                    // ["stripe.com", "paypal.com"]
  search_depth: "quick" | "standard" | "comprehensive";
  max_parallel_searches: number;                // 1-20
  confidence_threshold: number;                 // 0.0-1.0
}

Request Validation

FastAPI validates incoming requests using Pydantic models:
from typing import List, Literal

from pydantic import BaseModel, Field

class BatchResearchRequest(BaseModel):
    research_goal: str = Field(..., description="The high-level research objective")
    company_domains: List[str] = Field(..., description="Company domains to analyze")
    search_depth: Literal["quick", "standard", "comprehensive"] = Field(...)
    max_parallel_searches: int = Field(...)
    confidence_threshold: float = Field(..., ge=0.0, le=1.0)
All requests are validated at the API boundary, ensuring type safety and preventing invalid data from entering the pipeline.

Phase 2: AI-Powered Query Generation

The system uses AI to generate intelligent search strategies:
  1. Goal Analysis: The AI analyzes the research goal to understand what to search for.
  2. Strategy Generation: Multiple search strategies are generated based on search depth:
       • Quick: 2-3 strategies
       • Standard: 4-6 strategies
       • Comprehensive: 8-10 strategies
  3. Query Template Creation: Each strategy defines query templates that will be customized per company.
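
As a rough illustration of how search depth affects workload, the sketch below multiplies the strategy ranges listed above by the number of company domains. The names `STRATEGY_RANGES` and `estimate_search_count` are hypothetical and not part of the engine.

```python
from typing import Dict, Tuple

# Strategy-count ranges per search depth, copied from the list above.
STRATEGY_RANGES: Dict[str, Tuple[int, int]] = {
    "quick": (2, 3),
    "standard": (4, 6),
    "comprehensive": (8, 10),
}


def estimate_search_count(num_domains: int, search_depth: str) -> Tuple[int, int]:
    """Return the (min, max) number of searches a request would trigger.

    Each strategy is executed once per company domain.
    """
    low, high = STRATEGY_RANGES[search_depth]
    return num_domains * low, num_domains * high


print(estimate_search_count(3, "standard"))  # (12, 18)
```

A "comprehensive" request over the same three domains would therefore issue between 24 and 30 searches.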

Query Strategy Model

class QueryStrategy:
    channel: str           # "google_search", "news_search", "jobs_search"
    query_template: str    # "{domain} AI fraud detection"
    
    def build_query(self, domain: str) -> str:
        """Build domain-specific query from template."""
        return self.query_template.format(domain=domain)
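
A minimal usage sketch follows. It assumes `QueryStrategy` can be constructed with keyword arguments, which is modeled here with a dataclass; the engine's actual base class is not shown on this page.

```python
from dataclasses import dataclass


@dataclass
class QueryStrategy:
    channel: str          # e.g. "google_search"
    query_template: str   # e.g. "{domain} AI fraud detection"

    def build_query(self, domain: str) -> str:
        """Build a domain-specific query from the template."""
        return self.query_template.format(domain=domain)


strategy = QueryStrategy(
    channel="google_search",
    query_template="{domain} AI fraud detection",
)
queries = [strategy.build_query(d) for d in ["stripe.com", "paypal.com"]]
print(queries)  # ['stripe.com AI fraud detection', 'paypal.com AI fraud detection']
```

Because `str.format` treats braces as placeholders, a template that needs literal braces would have to escape them as `{{` and `}}`.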

Example Generated Strategies

For research goal: “Find fintech companies using AI for fraud detection”
Strategy  Channel        Query Template
1         google_search  {domain} artificial intelligence fraud detection
2         news_search    {domain} machine learning financial fraud
3         jobs_search    {domain} hiring fraud detection engineer
4         google_search  {domain} AI powered fraud prevention
Query generation typically takes 0.5-1.5 seconds and happens once per research request, regardless of the number of companies.

Phase 3: Parallel Execution Engine

The research pipeline executes queries in parallel, with a separate concurrency limit for each data source:

Concurrency Control

The pipeline uses semaphores to control parallel execution:
# From pipeline.py:44-49
self.source_pools: Dict[str, asyncio.Semaphore] = {
    "google_search": asyncio.Semaphore(max_parallel_searches),
    "jobs_search": asyncio.Semaphore(max_parallel_searches),
    "news_search": asyncio.Semaphore(max_parallel_searches),
}
Setting max_parallel_searches too high (>20) may trigger rate limits on external APIs. Values in the 5-10 range provide good performance without hitting limits.
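
The effect of per-channel semaphores can be seen in the following self-contained sketch. The `run_limited` and `fake_search` helpers are hypothetical stand-ins, and the sleep replaces a real API call.

```python
import asyncio
from typing import Dict, List


async def run_limited(channel_limits: Dict[str, int], jobs: List[str]) -> Dict[str, int]:
    """Run one fake search per entry in `jobs` (a list of channel names) and
    return the peak number of concurrent searches observed per channel."""
    pools = {name: asyncio.Semaphore(limit) for name, limit in channel_limits.items()}
    active = {name: 0 for name in channel_limits}
    peak = {name: 0 for name in channel_limits}

    async def fake_search(channel: str) -> None:
        async with pools[channel]:  # wait here if the channel is at its limit
            active[channel] += 1
            peak[channel] = max(peak[channel], active[channel])
            await asyncio.sleep(0.01)  # stand-in for the external API call
            active[channel] -= 1

    await asyncio.gather(*(fake_search(channel) for channel in jobs))
    return peak


limits = {"google_search": 2, "news_search": 2}
jobs = ["google_search"] * 5 + ["news_search"] * 5
peaks = asyncio.run(run_limited(limits, jobs))
print(peaks)  # {'google_search': 2, 'news_search': 2}
```

Because each channel has its own semaphore, total concurrency across all channels can reach the sum of the per-channel limits (4 in this sketch).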

Task Execution Flow

For a request with 3 companies and 4 strategies:
  1. Total tasks created: 3 × 4 = 12 tasks
  2. Parallel execution: Up to max_parallel_searches tasks per channel run simultaneously
  3. Completion order: Tasks complete as they finish (non-deterministic)
  4. Aggregation: Results are collected per domain
# From pipeline.py:145-163
tasks: List[asyncio.Task[Tuple[str, SourceResult]]] = []
for domain in self.company_domains:
    for strategy in self.strategies:
        tasks.append(
            asyncio.create_task(
                self._execute_one(domain, strategy, self.search_depth)
            )
        )

# Execute all tasks and aggregate by domain
domain_to_evidence: Dict[str, List[Evidence]] = defaultdict(list)
for coro in asyncio.as_completed(tasks):
    domain, res = await coro
    if res.ok and res.evidences:
        domain_to_evidence[domain].extend(res.evidences)
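
The fan-out and per-domain aggregation above can be tried in isolation with the sketch below. The `execute_one` function is a hypothetical stand-in for `_execute_one`, evidence is reduced to plain strings, and `example.com` is a placeholder domain.

```python
import asyncio
from collections import defaultdict
from typing import Dict, List, Tuple


async def execute_one(domain: str, template: str) -> Tuple[str, List[str]]:
    """Hypothetical stand-in for `_execute_one`: returns one fake evidence string."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return domain, [template.format(domain=domain)]


async def run_all(domains: List[str], templates: List[str]) -> Dict[str, List[str]]:
    # One task per (domain, strategy) pair.
    tasks = [
        asyncio.create_task(execute_one(domain, template))
        for domain in domains
        for template in templates
    ]
    by_domain: Dict[str, List[str]] = defaultdict(list)
    # Results arrive in completion order, so group them by the returned domain.
    for finished in asyncio.as_completed(tasks):
        domain, evidences = await finished
        by_domain[domain].extend(evidences)
    return dict(by_domain)


domains = ["stripe.com", "paypal.com", "example.com"]
templates = ["{domain} a", "{domain} b", "{domain} c", "{domain} d"]
grouped = asyncio.run(run_all(domains, templates))
print({domain: len(items) for domain, items in grouped.items()})
```

Returning the domain alongside each result is what makes aggregation possible, since completion order carries no information about which company a result belongs to.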

Phase 4: Evidence Collection

Source Result Structure

Each data source returns structured evidence:
class SourceResult:
    channel: str              # Source identifier
    domain: str               # Company domain
    query: str                # Executed query
    evidences: List[Evidence] # Found evidence
    ok: bool                  # Success indicator
    error: Optional[str]      # Error message if failed

Evidence Model

class Evidence(BaseModel):
    url: str          # Source URL
    title: str        # Page/article title
    snippet: str      # Relevant text excerpt
    source_name: str  # "google_search", "news_api", etc.

Deduplication

Evidence is deduplicated based on URL and title:
def _evidence_key(self, evidence: Evidence) -> str:
    """Generate unique key for evidence deduplication."""
    import hashlib
    content = f"{evidence.url}|{evidence.title}".lower().strip()
    return hashlib.sha1(content.encode()).hexdigest()
Duplicates are identified by a SHA-1 hash of the lowercased URL and title. If duplicates are found, the evidence with the higher score is retained.
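
A minimal sketch of applying this key to a list of evidence is shown below. The `Evidence` model on this page has no score field, so this version simply keeps the first item seen for each key. That choice is an assumption made for illustration, and `deduplicate` is a hypothetical helper.

```python
import hashlib
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Evidence:
    url: str
    title: str
    snippet: str
    source_name: str


def evidence_key(evidence: Evidence) -> str:
    """Same keying scheme as `_evidence_key`: SHA-1 of the lowercased 'url|title'."""
    content = f"{evidence.url}|{evidence.title}".lower().strip()
    return hashlib.sha1(content.encode()).hexdigest()


def deduplicate(evidences: List[Evidence]) -> List[Evidence]:
    """Keep the first evidence seen for each key (assumption: no score available)."""
    unique: Dict[str, Evidence] = {}
    for evidence in evidences:
        unique.setdefault(evidence_key(evidence), evidence)
    return list(unique.values())


items = [
    Evidence("https://example.com/a", "Fraud ML", "first copy", "google_search"),
    Evidence("https://EXAMPLE.com/a", "fraud ml", "same page, different case", "news_search"),
    Evidence("https://example.com/b", "Hiring", "another page", "jobs_search"),
]
unique_items = deduplicate(items)
print(len(unique_items))  # 2
```

Lowercasing makes the comparison case-insensitive, but URLs that differ in other ways (for example a trailing slash or tracking parameters) still produce distinct keys.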

Phase 5: AI Analysis

Once all evidence is collected for a company, AI analysis extracts insights:

Analysis Process

  1. Batch Analysis: All evidence for a company is analyzed in a single AI call for efficiency.
  2. Technology Extraction: Extract specific technologies mentioned (TensorFlow, Python, etc.).
  3. Signal Detection: Identify signals indicating the company matches the research goal.
  4. Confidence Scoring: Calculate a confidence score (0.0-1.0) based on evidence quality.

Analysis Implementation

# From pipeline.py:124-133
async def analyze_domain(self, domain: str, evidences: List[Evidence]):
    await asyncio.to_thread(redis_client.clear_cache, domain)
    
    if evidences:
        analysis_result = await self.signal_extractor.analyze_company(
            self.research_goal, evidences
        )
        return self._build_company_result(domain, evidences, analysis_result)
    else:
        return self._build_empty_result(domain)

Analysis Result Structure

analysis_result = {
    "technologies": ["TensorFlow", "Python", "Kubernetes"],
    "goal_match_signals": [
        "Uses machine learning for fraud detection",
        "AI-powered transaction monitoring"
    ],
    "confidence_score": 0.87,
    "reasoning": "Strong evidence of AI usage in fraud detection..."
}

Phase 6: Result Aggregation

Results are aggregated into a structured response:

Company Research Result

interface CompanyResearchResult {
  domain: string;                // "stripe.com"
  confidence_score: number;      // 0.87
  evidence_sources: number;      // 3 (google, news, jobs)
  findings: Findings;
}

interface Findings {
  technologies: string[];        // ["TensorFlow", "Python"]
  evidence: Evidence[];          // All collected evidence
  signals_found: number;         // 2 signals detected
}

Batch Research Response

interface BatchResearchResponse {
  research_id: string;                  // Unique ID for this research
  total_companies: number;              // Number of companies researched
  search_strategies_generated: number;  // Strategies created by AI
  total_searches_executed: number;      // Total API calls made
  processing_time_ms: number;           // Total processing time
  results: CompanyResearchResult[];     // Per-company results
  search_performance: SearchPerformance;
}

Example Response

{
  "research_id": "550e8400-e29b-41d4-a716-446655440000",
  "total_companies": 2,
  "search_strategies_generated": 4,
  "total_searches_executed": 8,
  "processing_time_ms": 14016,
  "results": [
    {
      "domain": "stripe.com",
      "confidence_score": 0.87,
      "evidence_sources": 3,
      "findings": {
        "technologies": ["TensorFlow", "Python", "Machine Learning"],
        "evidence": [...],
        "signals_found": 2
      }
    }
  ],
  "search_performance": {
    "queries_per_second": 5.2,
    "failed_requests": 0
  }
}

Streaming Data Flow

For real-time updates, the /research/batch/stream endpoint provides progressive results:

Streaming Event Types

Event Structure

Each SSE event is JSON-formatted:
{
  "type": "domain_analyzed",
  "domain": "stripe.com",
  "confidence": 0.87,
  "evidence_count": 12,
  "technologies": ["TensorFlow", "Python"],
  "progress": 1,
  "total": 2,
  "timestamp": 1678901234.567
}
Streaming is implemented using Server-Sent Events (SSE), allowing the frontend to display real-time progress without polling.
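
For reference, an SSE message on the wire is a `data:` line followed by a blank line. The sketch below shows one way to serialize and parse such an event. It assumes the engine sends only a `data:` field with no `event:` or `id:` fields, and `format_sse` and `parse_sse` are hypothetical helpers.

```python
import json
from typing import Any, Dict

DATA_PREFIX = "data: "


def format_sse(event: Dict[str, Any]) -> str:
    """Serialize one event as an SSE message: a data line plus a blank line."""
    return f"{DATA_PREFIX}{json.dumps(event)}\n\n"


def parse_sse(message: str) -> Dict[str, Any]:
    """Inverse of `format_sse` for single-line data messages."""
    payload = message.strip()[len(DATA_PREFIX):]
    return json.loads(payload)


event = {"type": "domain_analyzed", "domain": "stripe.com", "progress": 1, "total": 2}
message = format_sse(event)
print(repr(message))
print(parse_sse(message) == event)  # True
```

The blank line is what tells the client that the event is complete, so it must not be omitted.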

Error Handling in Data Flow

Circuit Breaker Pattern

When a data source fails repeatedly, its circuit breaker opens and further requests to that channel return an error result immediately:
breaker = self.breakers[strategy.channel]
if not breaker.allow_request():
    return SourceResult(
        channel=strategy.channel,
        domain=domain,
        query=query,
        evidences=[],
        ok=False,
        error="circuit open"
    )
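
Only `allow_request()` appears in the snippet above. The sketch below is one possible shape for such a breaker, under stated assumptions: the failure threshold, the reset timeout, and the `record_success` and `record_failure` methods are hypothetical and may differ from the engine's implementation.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Hypothetical breaker: opens after `max_failures` consecutive failures and
    allows requests again once `reset_timeout` seconds have passed."""

    def __init__(
        self,
        max_failures: int = 3,
        reset_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None  # None while the circuit is closed

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True
        if self._clock() - self._opened_at >= self.reset_timeout:
            # Timeout elapsed: close the circuit and start counting again.
            self._opened_at = None
            self._failures = 0
            return True
        return False

    def record_success(self) -> None:
        self._failures = 0

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.max_failures:
            self._opened_at = self._clock()


# Usage with a fake clock so the timeout can be exercised without waiting.
now = [0.0]
breaker = CircuitBreaker(max_failures=2, reset_timeout=10.0, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()
blocked = breaker.allow_request()
now[0] = 10.0
recovered = breaker.allow_request()
print(blocked, recovered)  # False True
```

Injecting the clock keeps the class testable, and the default `time.monotonic` is unaffected by system clock changes.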

Graceful Degradation

  • Partial Results: If some sources fail, results are still returned from successful sources.
  • Empty Results: Companies with no evidence get empty results with 0.0 confidence.
  • Error Tracking: Failed requests are tracked in search_performance.failed_requests.
  • Retry Logic: Circuit breakers automatically reset after a timeout period.

Performance Optimization

Caching Strategy

Redis caching reduces redundant API calls. Cached entries for a domain are cleared before that domain is analyzed:
# Cache is cleared per domain before analysis
await asyncio.to_thread(redis_client.clear_cache, domain)

Batch Processing

AI analysis is batched per company:
  • Before: One AI call per evidence item (inefficient)
  • After: One AI call per company with all evidence (efficient)
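
To illustrate the batched approach, the sketch below folds all evidence for one company into a single prompt string. The prompt layout and the `build_batch_prompt` helper are hypothetical; the engine's actual prompt is not shown on this page.

```python
from typing import Dict, List


def build_batch_prompt(research_goal: str, evidences: List[Dict[str, str]]) -> str:
    """Combine every evidence item for one company into a single prompt."""
    lines = [f"Research goal: {research_goal}", "Evidence:"]
    for index, item in enumerate(evidences, start=1):
        lines.append(f"{index}. {item['title']} ({item['url']}): {item['snippet']}")
    return "\n".join(lines)


evidences = [
    {"title": "Fraud ML", "url": "https://example.com/a", "snippet": "Uses ML models"},
    {"title": "Hiring", "url": "https://example.com/b", "snippet": "Fraud engineer role"},
]
prompt = build_batch_prompt("Find fintech companies using AI", evidences)
print(prompt)
```

With this layout, a company with N evidence items costs one AI call instead of N.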

Async Optimization

All I/O operations are non-blocking:
# Parallel domain analysis
domain_tasks = [
    asyncio.create_task(self.analyze_domain(domain, evidences))
    for domain, evidences in domain_to_evidence.items()
]

results: List[CompanyResearchResult] = []
for coro in asyncio.as_completed(domain_tasks):
    result = await coro
    results.append(result)

Monitoring and Metrics

The system tracks performance metrics throughout the data flow:
class RunMetrics:
    start_time: float
    
    def record_query(self, count: int):
        """Record successful queries."""
    
    def record_failure(self, count: int):
        """Record failed queries."""
    
    def to_dict(self) -> Dict:
        """Export metrics for response."""
        return {
            "queries_per_second": self.queries_per_second,
            "failed_requests": self.failed_requests
        }
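
The class above only outlines the interface. A runnable sketch under stated assumptions might look like the following: the counter names, the default `count` of 1, the injectable clock, and the choice to compute throughput from successful queries only are all hypothetical.

```python
import time
from typing import Callable, Dict


class RunMetrics:
    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self.start_time = clock()
        self.successful_queries = 0
        self.failed_requests = 0

    def record_query(self, count: int = 1) -> None:
        """Record successful queries."""
        self.successful_queries += count

    def record_failure(self, count: int = 1) -> None:
        """Record failed queries."""
        self.failed_requests += count

    @property
    def queries_per_second(self) -> float:
        elapsed = self._clock() - self.start_time
        return self.successful_queries / elapsed if elapsed > 0 else 0.0

    def to_dict(self) -> Dict[str, float]:
        """Export metrics for the response."""
        return {
            "queries_per_second": round(self.queries_per_second, 2),
            "failed_requests": self.failed_requests,
        }


# Usage with a fake clock: 8 successful queries over 4 seconds.
now = [100.0]
metrics = RunMetrics(clock=lambda: now[0])
metrics.record_query(8)
metrics.record_failure(1)
now[0] = 104.0
summary = metrics.to_dict()
print(summary)  # {'queries_per_second': 2.0, 'failed_requests': 1}
```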

Next Steps

  • Backend Architecture: Deep dive into backend implementation
  • Frontend Architecture: Learn about frontend components
  • API Reference: Explore API endpoints in detail
  • Development Guide: Set up your development environment
