The GTM Research Engine processes research requests through a sophisticated pipeline that collects, analyzes, and aggregates data from multiple sources. This page explains the complete data flow from user input to final results.
High-Level Data Flow
Phase 1: Request Processing
The frontend collects research parameters from the user:
```typescript
interface SearchQuery {
  research_goal: string;                                 // "Find fintech companies using AI"
  company_domains: string[];                             // ["stripe.com", "paypal.com"]
  search_depth: "quick" | "standard" | "comprehensive";
  max_parallel_searches: number;                         // 1-20
  confidence_threshold: number;                          // 0.0-1.0
}
```
Request Validation
FastAPI validates incoming requests using Pydantic models:
```python
class BatchResearchRequest(BaseModel):
    research_goal: str = Field(..., description="The high-level research objective")
    company_domains: List[str] = Field(..., description="Company domains to analyze")
    search_depth: Literal["quick", "standard", "comprehensive"] = Field(...)
    max_parallel_searches: int = Field(...)
    confidence_threshold: float = Field(..., ge=0.0, le=1.0)
```
All requests are validated at the API boundary, ensuring type safety and preventing invalid data from entering the pipeline.
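To make the boundary checks concrete, here is a minimal plain-Python stand-in for the model above (not the actual Pydantic class) that enforces the same `search_depth` and `confidence_threshold` constraints:

```python
from dataclasses import dataclass
from typing import List

VALID_DEPTHS = {"quick", "standard", "comprehensive"}

@dataclass
class BatchResearchRequest:
    """Stand-in mirroring the Pydantic constraints (Literal choices, ge/le bounds)."""
    research_goal: str
    company_domains: List[str]
    search_depth: str
    max_parallel_searches: int
    confidence_threshold: float

    def __post_init__(self) -> None:
        # Reject values Pydantic would reject at the API boundary
        if self.search_depth not in VALID_DEPTHS:
            raise ValueError(f"search_depth must be one of {VALID_DEPTHS}")
        if not 0.0 <= self.confidence_threshold <= 1.0:
            raise ValueError("confidence_threshold must be within [0.0, 1.0]")
```

In the real service, a request with `confidence_threshold: 1.5` would be rejected with an HTTP 422 before it ever reaches the pipeline.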
Phase 2: AI-Powered Query Generation
The system uses AI to generate intelligent search strategies:
Goal Analysis
The AI analyzes the research goal to understand what to search for
Strategy Generation
Multiple search strategies are generated based on search depth:
- Quick: 2-3 strategies
- Standard: 4-6 strategies
- Comprehensive: 8-10 strategies
Query Template Creation
Each strategy defines query templates that will be customized per company
Query Strategy Model
```python
class QueryStrategy:
    channel: str         # "google_search", "news_search", "jobs_search"
    query_template: str  # "{domain} AI fraud detection"

    def build_query(self, domain: str) -> str:
        """Build domain-specific query from template."""
        return self.query_template.format(domain=domain)
```
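Template expansion is plain string formatting. A self-contained sketch (redeclaring QueryStrategy as a dataclass for brevity; construction details are an assumption):

```python
from dataclasses import dataclass

@dataclass
class QueryStrategy:
    channel: str
    query_template: str

    def build_query(self, domain: str) -> str:
        # The {domain} placeholder is filled per company
        return self.query_template.format(domain=domain)

strategy = QueryStrategy("google_search", "{domain} AI fraud detection")
print(strategy.build_query("stripe.com"))  # stripe.com AI fraud detection
```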
Example Generated Strategies
For research goal: “Find fintech companies using AI for fraud detection”
| Strategy | Channel | Query Template |
| --- | --- | --- |
| 1 | google_search | {domain} artificial intelligence fraud detection |
| 2 | news_search | {domain} machine learning financial fraud |
| 3 | jobs_search | {domain} hiring fraud detection engineer |
| 4 | google_search | {domain} AI powered fraud prevention |
Query generation typically takes 0.5-1.5 seconds and happens once per research request, regardless of the number of companies.
Phase 3: Parallel Execution Engine
The research pipeline executes queries in parallel with sophisticated concurrency control:
Concurrency Control
The pipeline uses semaphores to control parallel execution:
```python
# From pipeline.py:44-49
self.source_pools: Dict[str, asyncio.Semaphore] = {
    "google_search": asyncio.Semaphore(max_parallel_searches),
    "jobs_search": asyncio.Semaphore(max_parallel_searches),
    "news_search": asyncio.Semaphore(max_parallel_searches),
}
```
Setting max_parallel_searches too high (>20) may trigger rate limits on external APIs. Values in the 5-10 range provide good throughput without hitting those limits.
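The effect of a per-channel pool can be seen in a runnable sketch: however many tasks are created, at most max_parallel_searches of them hold a slot at once. The `bounded_search` coroutine and the `asyncio.sleep` stand-in for the real API call are illustrative assumptions:

```python
import asyncio

async def bounded_search(pool: asyncio.Semaphore, channel: str, query: str) -> str:
    # Wait for a free slot in this channel's pool before "calling" the API
    async with pool:
        await asyncio.sleep(0.01)  # stand-in for the external request
        return f"{channel}:{query}"

async def main() -> list:
    max_parallel_searches = 5
    pools = {ch: asyncio.Semaphore(max_parallel_searches)
             for ch in ("google_search", "jobs_search", "news_search")}
    # 12 tasks share one 5-slot pool: they run in waves of at most 5
    tasks = [bounded_search(pools["google_search"], "google_search", f"q{i}")
             for i in range(12)]
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
print(len(results))  # 12
```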
Task Execution Flow
For a request with 3 companies and 4 strategies:
- Total tasks created: 3 × 4 = 12 tasks
- Parallel execution: up to max_parallel_searches tasks run simultaneously
- Completion order: tasks complete as they finish (non-deterministic)
- Aggregation: results are collected per domain
```python
# From pipeline.py:145-163
tasks: List[asyncio.Task[Tuple[str, SourceResult]]] = []
for domain in self.company_domains:
    for strategy in self.strategies:
        tasks.append(
            asyncio.create_task(
                self._execute_one(domain, strategy, self.search_depth)
            )
        )

# Execute all tasks and aggregate by domain
domain_to_evidence: Dict[str, List[Evidence]] = defaultdict(list)
for coro in asyncio.as_completed(tasks):
    domain, res = await coro
    if res.ok and res.evidences:
        domain_to_evidence[domain].extend(res.evidences)
```
Phase 4: Evidence Collection
Source Result Structure
Each data source returns structured evidence:
```python
class SourceResult:
    channel: str               # Source identifier
    domain: str                # Company domain
    query: str                 # Executed query
    evidences: List[Evidence]  # Found evidence
    ok: bool                   # Success indicator
    error: Optional[str]       # Error message if failed
```
Evidence Model
```python
class Evidence(BaseModel):
    url: str          # Source URL
    title: str        # Page/article title
    snippet: str      # Relevant text excerpt
    source_name: str  # "google_search", "news_api", etc.
```
Deduplication
Evidence is deduplicated based on URL and title:
```python
def _evidence_key(self, evidence: Evidence) -> str:
    """Generate unique key for evidence deduplication."""
    import hashlib
    content = f"{evidence.url}|{evidence.title}".lower().strip()
    return hashlib.sha1(content.encode()).hexdigest()
```
Duplicates are identified using SHA-1 hashing of normalized URL and title. If duplicates are found, the evidence with the higher score is retained.
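The keep-the-higher-score rule can be sketched end to end. The `score` field on each evidence dict is an assumption (the page only says the higher-scored duplicate wins; the actual field name is not shown):

```python
import hashlib
from typing import Dict, List

def evidence_key(url: str, title: str) -> str:
    # Same normalization as the pipeline: lowercase, trimmed "url|title"
    return hashlib.sha1(f"{url}|{title}".lower().strip().encode()).hexdigest()

def dedupe(evidences: List[dict]) -> List[dict]:
    """Keep the highest-scoring copy of each (url, title) pair."""
    best: Dict[str, dict] = {}
    for ev in evidences:
        key = evidence_key(ev["url"], ev["title"])
        if key not in best or ev["score"] > best[key]["score"]:
            best[key] = ev
    return list(best.values())

raw = [
    {"url": "https://a.com/x", "title": "AI Fraud", "score": 0.6},
    {"url": "https://a.com/x", "title": "ai fraud", "score": 0.9},  # duplicate after lowercasing
    {"url": "https://b.com/y", "title": "ML News", "score": 0.5},
]
print(len(dedupe(raw)))  # 2
```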
Phase 5: AI Analysis
Once all evidence is collected for a company, AI analysis extracts insights:
Analysis Process
Batch Analysis
All evidence for a company is analyzed in a single AI call for efficiency
Technology Extraction
Extract specific technologies mentioned (TensorFlow, Python, etc.)
Signal Detection
Identify signals indicating the company matches the research goal
Confidence Scoring
Calculate a confidence score (0.0-1.0) based on evidence quality
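The batching idea behind step 1 can be sketched as a prompt builder: all evidence for one company is folded into a single request instead of one call per item. The prompt wording and evidence field names here are illustrative assumptions; the real prompt lives inside `signal_extractor.analyze_company`:

```python
from typing import List

def build_batch_prompt(research_goal: str, evidences: List[dict]) -> str:
    """Hypothetical prompt builder: one AI call covers all evidence for a company."""
    lines = [f"Research goal: {research_goal}", "Evidence:"]
    for i, ev in enumerate(evidences, 1):
        # Each evidence item contributes one numbered line to the prompt
        lines.append(f"{i}. [{ev['source_name']}] {ev['title']} - {ev['snippet']}")
    lines.append("Return technologies, goal-match signals, and a 0.0-1.0 confidence score.")
    return "\n".join(lines)

evidences = [
    {"source_name": "google_search", "title": "Stripe Radar", "snippet": "ML-based fraud scoring"},
    {"source_name": "news_search", "title": "Stripe AI news", "snippet": "expands fraud models"},
]
prompt = build_batch_prompt("Find fintech companies using AI for fraud detection", evidences)
```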
Analysis Implementation
```python
# From pipeline.py:124-133
async def analyze_domain(self, domain: str, evidences: List[Evidence]):
    await asyncio.to_thread(redis_client.clear_cache, domain)
    if evidences:
        analysis_result = await self.signal_extractor.analyze_company(
            self.research_goal, evidences
        )
        return self._build_company_result(domain, evidences, analysis_result)
    else:
        return self._build_empty_result(domain)
```
Analysis Result Structure
```python
analysis_result = {
    "technologies": ["TensorFlow", "Python", "Kubernetes"],
    "goal_match_signals": [
        "Uses machine learning for fraud detection",
        "AI-powered transaction monitoring",
    ],
    "confidence_score": 0.87,
    "reasoning": "Strong evidence of AI usage in fraud detection...",
}
```
Phase 6: Result Aggregation
Results are aggregated into a structured response:
Company Research Result
```typescript
interface CompanyResearchResult {
  domain: string;            // "stripe.com"
  confidence_score: number;  // 0.87
  evidence_sources: number;  // 3 (google, news, jobs)
  findings: Findings;
}

interface Findings {
  technologies: string[];    // ["TensorFlow", "Python"]
  evidence: Evidence[];      // All collected evidence
  signals_found: number;     // 2 signals detected
}
```
Batch Research Response
```typescript
interface BatchResearchResponse {
  research_id: string;                   // Unique ID for this research
  total_companies: number;               // Number of companies researched
  search_strategies_generated: number;   // Strategies created by AI
  total_searches_executed: number;       // Total API calls made
  processing_time_ms: number;            // Total processing time
  results: CompanyResearchResult[];      // Per-company results
  search_performance: SearchPerformance;
}
```
Example Response
```json
{
  "research_id": "550e8400-e29b-41d4-a716-446655440000",
  "total_companies": 2,
  "search_strategies_generated": 4,
  "total_searches_executed": 8,
  "processing_time_ms": 14016,
  "results": [
    {
      "domain": "stripe.com",
      "confidence_score": 0.87,
      "evidence_sources": 3,
      "findings": {
        "technologies": ["TensorFlow", "Python", "Machine Learning"],
        "evidence": [ ... ],
        "signals_found": 2
      }
    }
  ],
  "search_performance": {
    "queries_per_second": 5.2,
    "failed_requests": 0
  }
}
```
Streaming Data Flow
For real-time updates, the /research/batch/stream endpoint provides progressive results:
Streaming Event Types
Event Structure
Each SSE event is JSON-formatted:
```json
{
  "type": "domain_analyzed",
  "domain": "stripe.com",
  "confidence": 0.87,
  "evidence_count": 12,
  "technologies": ["TensorFlow", "Python"],
  "progress": 1,
  "total": 2,
  "timestamp": 1678901234.567
}
```
Streaming is implemented using Server-Sent Events (SSE), allowing the frontend to display real-time progress without polling.
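On the wire, each event is a `data:` line followed by a blank line, per the SSE format. A minimal sketch of the framing (the payload fields match the example above; the helper name is an assumption):

```python
import json

def sse_event(payload: dict) -> str:
    """Format one Server-Sent Events frame: a data: line plus a blank line."""
    return f"data: {json.dumps(payload)}\n\n"

event = sse_event({
    "type": "domain_analyzed",
    "domain": "stripe.com",
    "confidence": 0.87,
    "progress": 1,
    "total": 2,
})
print(event.startswith("data: "))  # True
```

In FastAPI, a generator yielding such frames can be wrapped in a streaming response with the `text/event-stream` media type.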
Error Handling in Data Flow
Circuit Breaker Pattern
When a data source fails repeatedly:
```python
breaker = self.breakers[strategy.channel]
if not breaker.allow_request():
    return SourceResult(
        channel=strategy.channel,
        domain=domain,
        query=query,
        evidences=[],
        ok=False,
        error="circuit open",
    )
```
Graceful Degradation
Partial Results If some sources fail, results are still returned from successful sources
Empty Results Companies with no evidence get empty results with 0.0 confidence
Error Tracking Failed requests are tracked in search_performance.failed_requests
Retry Logic Circuit breakers automatically reset after timeout period
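A minimal breaker with that behavior might look like the following. This is a sketch assuming a consecutive-failure threshold and a time-based reset; the project's actual breaker internals are not shown on this page:

```python
import time

class CircuitBreaker:
    """Open after `threshold` consecutive failures; allow a retry after `reset_timeout` seconds."""

    def __init__(self, threshold: int = 3, reset_timeout: float = 30.0):
        self.threshold = threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        if self.failures < self.threshold:
            return True  # circuit closed: requests flow normally
        # Circuit open: only allow a probe once the timeout has elapsed
        return (time.monotonic() - self.opened_at) >= self.reset_timeout

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()

    def record_success(self) -> None:
        self.failures = 0  # any success closes the circuit again

breaker = CircuitBreaker(threshold=2, reset_timeout=60.0)
breaker.record_failure()
breaker.record_failure()
print(breaker.allow_request())  # False: circuit is open
```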
Caching Strategy
Redis caching reduces redundant API calls:
```python
# Cache is cleared per domain before analysis
await asyncio.to_thread(redis_client.clear_cache, domain)
```
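The surrounding cache-aside pattern can be illustrated with an in-memory stand-in for the Redis client (the key scheme `evidence:{domain}` and the `fetch_evidence` helper are assumptions; only `clear_cache` appears in the source):

```python
from typing import Callable, Dict, List

class InMemoryCache:
    """Dict-backed stand-in for the Redis client, to show the pattern."""

    def __init__(self) -> None:
        self._store: Dict[str, list] = {}

    def get(self, key: str):
        return self._store.get(key)

    def set(self, key: str, value: list) -> None:
        self._store[key] = value

    def clear_cache(self, domain: str) -> None:
        self._store.pop(f"evidence:{domain}", None)

def fetch_evidence(cache: InMemoryCache, domain: str,
                   search: Callable[[str], List[dict]]) -> List[dict]:
    key = f"evidence:{domain}"
    cached = cache.get(key)
    if cached is not None:   # cache hit: skip the external API call
        return cached
    result = search(domain)  # cache miss: call the API, then store
    cache.set(key, result)
    return result
```

Clearing the domain's key before re-analysis (as the pipeline does) forces the next fetch to bypass any stale entry.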
Batch Processing
AI analysis is batched per company:
- Before: one AI call per evidence item (inefficient)
- After: one AI call per company with all evidence (efficient)
Async Optimization
All I/O operations are non-blocking:
```python
# Parallel domain analysis
domain_tasks = [
    asyncio.create_task(self.analyze_domain(domain, evidences))
    for domain, evidences in domain_to_evidence.items()
]

results: List[CompanyResearchResult] = []
for coro in asyncio.as_completed(domain_tasks):
    result = await coro
    results.append(result)
```
Monitoring and Metrics
The system tracks performance metrics throughout the data flow:
```python
class RunMetrics:
    start_time: float

    def record_query(self, count: int):
        """Record successful queries."""

    def record_failure(self, count: int):
        """Record failed queries."""

    def to_dict(self) -> Dict:
        """Export metrics for response."""
        return {
            "queries_per_second": self.queries_per_second,
            "failed_requests": self.failed_requests,
        }
```
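The stubbed methods can be filled in minimally. A working sketch under the assumption that queries_per_second is successful queries divided by elapsed wall-clock time (the actual formula is not shown on this page):

```python
import time
from typing import Dict

class RunMetrics:
    def __init__(self) -> None:
        self.start_time = time.monotonic()
        self.successful = 0
        self.failed_requests = 0

    def record_query(self, count: int = 1) -> None:
        """Record successful queries."""
        self.successful += count

    def record_failure(self, count: int = 1) -> None:
        """Record failed queries."""
        self.failed_requests += count

    @property
    def queries_per_second(self) -> float:
        # Guard against division by zero on very fast runs
        elapsed = max(time.monotonic() - self.start_time, 1e-9)
        return self.successful / elapsed

    def to_dict(self) -> Dict:
        """Export metrics for the response."""
        return {
            "queries_per_second": round(self.queries_per_second, 2),
            "failed_requests": self.failed_requests,
        }
```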
Next Steps
- Backend Architecture: deep dive into the backend implementation
- Frontend Architecture: learn about the frontend components
- API Reference: explore API endpoints in detail
- Development Guide: set up your development environment