The backend is a FastAPI-powered research engine built on Python 3.11+ with async/await patterns throughout. It orchestrates multi-source data collection, AI-powered analysis, and real-time result streaming.

Backend Components

Directory Structure

The backend follows a clean modular architecture:
backend/app/
├── api/                    # API layer
│   ├── __init__.py
│   └── routes.py           # Endpoint definitions
├── core/                   # Core utilities
│   ├── __init__.py
│   ├── circuit_breaker.py  # Circuit breaker implementation
│   ├── config.py           # Application configuration
│   └── metrics.py          # Performance tracking
├── db/                     # Database layer
│   ├── __init__.py
│   └── redis_cache.py      # Redis caching
├── decorators/             # Reusable decorators
│   ├── __init__.py
│   └── api_rate_limiter.py
├── models/                 # Pydantic models
│   ├── __init__.py
│   ├── request.py          # Request schemas
│   ├── response.py         # Response schemas
│   └── search.py           # Search-related models
├── services/               # Business logic
│   ├── __init__.py
│   ├── pipeline.py         # Research orchestration
│   ├── extractor.py        # AI-powered extraction
│   └── enhanced_streaming_aggregator.py
├── sources/                # Data source integrations
│   ├── __init__.py
│   ├── base.py             # Base source interface
│   ├── google_search.py
│   ├── news_search.py
│   └── jobs_search.py
├── streaming/              # SSE streaming
│   └── stream_generator.py
└── server.py               # FastAPI application

FastAPI Server Setup

The main application is configured in server.py:
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.api import api_router

app = FastAPI(title="GTM Research Engine", version="0.1.0")

# NOTE: allow_origins=["*"] combined with allow_credentials=True is rejected
# by browsers; list explicit origins in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(api_router)
The server uses ORJSON for JSON serialization, which is typically 2-3x faster than the standard library json module when encoding large response payloads.
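ORJSON is usually wired in as the application's default response class; a minimal sketch of that wiring (the exact configuration in server.py may differ):

```python
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse

# Every endpoint serializes through orjson unless it sets its own
# response_class, so individual routes don't need to opt in.
app = FastAPI(
    title="GTM Research Engine",
    version="0.1.0",
    default_response_class=ORJSONResponse,
)
```

Endpoints that need different behavior can still override response_class per route, as the batch endpoint below does explicitly.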

API Endpoints

The backend exposes two primary research endpoints defined in api/routes.py:

1. Batch Research Endpoint

@router.post(
    path="/research/batch",
    response_model=BatchResearchResponse,
    response_class=ORJSONResponse,
)
async def research_batch(
    payload: BatchResearchRequest
) -> BatchResearchResponse:
    run_id = str(uuid.uuid4())
    metrics = RunMetrics(start_time=time.perf_counter())
    
    # Generate search strategies
    query_generator = QueryGenerator()
    strategies = await query_generator.generate_strategies(
        research_goal=payload.research_goal,
        search_depth=payload.search_depth,
    )
    
    # Execute research pipeline
    pipeline = ResearchPipeline(
        run_id=run_id,
        research_goal=payload.research_goal,
        search_depth=payload.search_depth,
        company_domains=payload.company_domains,
        strategies=strategies,
        max_parallel_searches=payload.max_parallel_searches,
        confidence_threshold=payload.confidence_threshold,
        metrics=metrics,
    )
    
    results, total_searches_executed = await pipeline.run()
    
    return BatchResearchResponse(
        research_id=run_id,
        total_companies=len(payload.company_domains),
        search_strategies_generated=len(strategies),
        total_searches_executed=total_searches_executed,
        processing_time_ms=int((time.perf_counter() - metrics.start_time) * 1000),
        results=results,
        search_performance=metrics.to_dict(),
    )

2. Streaming Research Endpoint

@router.post("/research/batch/stream")
async def research_batch_stream(
    payload: BatchResearchRequest
) -> StreamingResponse:
    """
    Enhanced streaming research with proper SSE implementation.
    
    Features:
    - Proper SSE format with event types and IDs
    - Connection management with heartbeats
    - Error handling and recovery
    - Optimized event frequency
    - Real-time evidence processing
    """
    run_id = str(uuid.uuid4())
    metrics = RunMetrics(start_time=time.perf_counter())
    
    # Generate strategies and create pipeline
    query_generator = QueryGenerator()
    strategies = await query_generator.generate_strategies(
        research_goal=payload.research_goal,
        search_depth=payload.search_depth,
    )
    
    pipeline = ResearchPipeline(...)
    
    # Use enhanced stream generator
    stream_generator = ResearchStreamGenerator(heartbeat_interval=30)
    generator = stream_generator.generate_stream(pipeline)
    
    return StreamingResponse(
        generator,
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        }
    )
The streaming endpoint keeps connections open for 10-30 seconds. Ensure your reverse proxy (nginx, etc.) is configured with appropriate timeout settings.

Research Pipeline

The core orchestration logic is in services/pipeline.py. This is the heart of the backend.

Pipeline Initialization

class ResearchPipeline:
    def __init__(
        self,
        run_id: str,
        research_goal: str,
        search_depth: str,
        company_domains: List[str],
        strategies: List[QueryStrategy],
        max_parallel_searches: int,
        confidence_threshold: float,
        metrics: RunMetrics,
    ) -> None:
        settings = get_settings()
        
        # Source Pools for rate limiting
        self.source_pools: Dict[str, asyncio.Semaphore] = {
            "google_search": asyncio.Semaphore(max_parallel_searches),
            "jobs_search": asyncio.Semaphore(max_parallel_searches),
            "news_search": asyncio.Semaphore(max_parallel_searches),
        }
        
        # Data sources
        self.sources: Dict[str, BaseSource] = {
            "google_search": GoogleSearchSource(),
            "jobs_search": JobsSearchSource(),
            "news_search": NewsSearchSource(),
        }
        
        # Circuit breakers per source
        self.breakers: Dict[str, CircuitBreaker] = {
            channel: CircuitBreaker(
                failure_threshold=settings.circuit_breaker_failures,
                reset_timeout_seconds=settings.circuit_breaker_reset_seconds,
            )
            for channel in self.sources.keys()
        }

Pipeline Execution Flow

Step 1: Task Creation

Create async tasks for all company × strategy combinations:
tasks: List[asyncio.Task] = []
for domain in self.company_domains:
    for strategy in self.strategies:
        tasks.append(
            asyncio.create_task(
                self._execute_one(domain, strategy, self.search_depth)
            )
        )
Step 2: Parallel Execution

Execute tasks with semaphore-based concurrency control:
async with source_pool:
    try:
        result = await source.fetch(
            domain=domain, query=query, search_depth=search_depth
        )
        if result.ok:
            breaker.record_success()
        else:
            breaker.record_failure()
    except Exception as exc:
        breaker.record_failure()
Step 3: Evidence Collection

Aggregate evidence by domain as tasks complete:
domain_to_evidence: Dict[str, List[Evidence]] = defaultdict(list)
for coro in asyncio.as_completed(tasks):
    domain, res = await coro
    if res.ok and res.evidences:
        domain_to_evidence[domain].extend(res.evidences)
Step 4: AI Analysis

Analyze collected evidence per domain in parallel:
domain_tasks = [
    asyncio.create_task(self.analyze_domain(domain, evidences))
    for domain, evidences in domain_to_evidence.items()
]

results: List[CompanyResearchResult] = []
for coro in asyncio.as_completed(domain_tasks):
    result = await coro
    results.append(result)
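Put together, the four steps reduce to a fan-out/fan-in pattern. A stripped-down, runnable sketch with a stand-in for _execute_one (the names here are illustrative, not the real pipeline API):

```python
import asyncio
from collections import defaultdict

async def fake_execute_one(domain: str, strategy: str) -> tuple[str, list[str]]:
    """Stand-in for ResearchPipeline._execute_one: returns (domain, evidences)."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return domain, [f"{strategy} evidence for {domain}"]

async def run(domains: list[str], strategies: list[str]) -> dict[str, list[str]]:
    # Fan out: one task per company × strategy combination.
    tasks = [
        asyncio.create_task(fake_execute_one(d, s))
        for d in domains
        for s in strategies
    ]
    # Fan in: aggregate evidence by domain as tasks complete.
    evidence: dict[str, list[str]] = defaultdict(list)
    for coro in asyncio.as_completed(tasks):
        domain, evidences = await coro
        evidence[domain].extend(evidences)
    return dict(evidence)

collected = asyncio.run(run(["acme.com"], ["news_search", "jobs_search"]))
```

Because asyncio.as_completed yields results in completion order, slow sources never block aggregation of faster ones.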

Single Search Execution

Each individual search is executed with full error handling:
async def _execute_one(
    self, domain: str, strategy: QueryStrategy, search_depth: str
) -> Tuple[str, SourceResult]:
    query = strategy.build_query(domain)
    
    source = self.sources.get(strategy.channel)
    if source is None:
        return domain, SourceResult(
            channel=strategy.channel,
            domain=domain,
            query=query,
            evidences=[],
            ok=False,
            error="unknown channel"
        )
    
    # Check circuit breaker
    breaker = self.breakers[strategy.channel]
    if not breaker.allow_request():
        return domain, SourceResult(
            channel=strategy.channel,
            domain=domain,
            query=query,
            evidences=[],
            ok=False,
            error="circuit open"
        )
    
    # Execute with semaphore
    source_pool = self.source_pools.get(strategy.channel)
    async with source_pool:
        try:
            result = await source.fetch(
                domain=domain, query=query, search_depth=search_depth
            )
            
            if result.ok and result.evidences:
                self.metrics.record_query(1)
                breaker.record_success()
            else:
                self.metrics.record_failure(1)
                breaker.record_failure()
            
            return domain, result
        except Exception as exc:
            self.metrics.record_failure(1)
            breaker.record_failure()
            return domain, SourceResult(
                channel=strategy.channel,
                domain=domain,
                query=query,
                evidences=[],
                ok=False,
                error=str(exc)
            )

Data Sources

All data sources implement the BaseSource interface:
from abc import ABC, abstractmethod

class BaseSource(ABC):
    @abstractmethod
    async def fetch(
        self, domain: str, query: str, search_depth: str
    ) -> SourceResult:
        """Fetch evidence from the data source."""
        pass

Source Implementations

Google Search

Uses Google Search API for web results

News Search

Queries news APIs for recent articles

Jobs Search

Searches job boards for hiring signals
Each source handles:
  • Query execution
  • Response parsing
  • Evidence extraction
  • Error handling
  • Rate limiting

AI-Powered Extraction

The Extractor service in services/extractor.py uses Gemini 2.5 Flash for intelligent analysis:
class Extractor:
    async def analyze_company(
        self, research_goal: str, evidences: List[Evidence]
    ) -> Dict:
        """
        Analyze all evidence for a company in a single AI call.
        
        Returns:
            {
                "technologies": ["TensorFlow", "Python"],
                "goal_match_signals": ["Uses AI for fraud detection"],
                "confidence_score": 0.87,
                "reasoning": "Strong evidence of AI usage..."
            }
        """
        prompt = self._build_analysis_prompt(research_goal, evidences)
        response = await self._call_gemini(prompt)
        return self._parse_response(response)
Batch analysis (one AI call per company) is significantly more efficient than per-evidence analysis, reducing API calls by 80-90%.
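The batching works by folding every evidence item into one prompt. A hypothetical sketch of what _build_analysis_prompt might do (the actual prompt text is not shown in this document):

```python
def build_analysis_prompt(research_goal: str, evidences: list[dict]) -> str:
    """Hypothetical: combine all evidence for one company into a single
    prompt so the model is called once per company, not once per item."""
    lines = [f"Research goal: {research_goal}", "Evidence:"]
    for i, ev in enumerate(evidences, 1):
        lines.append(f"{i}. [{ev['source_name']}] {ev['title']}: {ev['snippet']}")
    lines.append(
        "Return JSON with technologies, goal_match_signals, "
        "confidence_score, and reasoning."
    )
    return "\n".join(lines)

prompt = build_analysis_prompt(
    "Find companies using AI",
    [{"source_name": "news", "title": "Acme ships ML",
      "snippet": "Acme uses TensorFlow."}],
)
```

Numbering the evidence items also lets the model's reasoning field cite specific items back to the caller.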

Infrastructure Components

Circuit Breaker

Implemented in core/circuit_breaker.py:
class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        reset_timeout_seconds: float = 30.0
    ):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout_seconds
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.state = "closed"  # closed, open, half-open
    
    def allow_request(self) -> bool:
        """Check if request should be allowed."""
        if self.state == "closed":
            return True
        
        if self.state == "open":
            if time.time() - self.last_failure_time >= self.reset_timeout:
                self.state = "half-open"
                return True
            return False
        
        # half-open: let probe requests through to test recovery
        return True
    
    def record_success(self):
        """Record successful request."""
        self.failure_count = 0
        self.state = "closed"
    
    def record_failure(self):
        """Record failed request."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
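The state machine can be exercised end to end. The class below is a condensed copy of the listing above, with a short reset timeout so the open → half-open transition is observable:

```python
import time
from typing import Optional

class CircuitBreaker:
    # Condensed copy of core/circuit_breaker.py, for demonstration only.
    def __init__(self, failure_threshold: int = 5,
                 reset_timeout_seconds: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout_seconds
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.state = "closed"

    def allow_request(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure_time >= self.reset_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # half-open: let probes through

    def record_success(self):
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"

breaker = CircuitBreaker(failure_threshold=2, reset_timeout_seconds=0.01)
breaker.record_failure()
breaker.record_failure()              # threshold reached -> open
open_state = breaker.state
blocked = breaker.allow_request()     # rejected while open
time.sleep(0.02)                      # let the reset timeout elapse
half_open = breaker.allow_request()   # timeout elapsed -> half-open probe
breaker.record_success()              # probe succeeded -> closed again
final_state = breaker.state
```

A failed probe in the half-open state would immediately re-open the breaker, since failure_count is already at the threshold.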

Redis Caching

Caching layer in db/redis_cache.py:
class RedisCache:
    def __init__(self):
        # Host/port hardcoded for local development; in production these
        # should come from Settings (core/config.py).
        self.client = redis.Redis(
            host='localhost',
            port=6379,
            decode_responses=True
        )
    
    def get(self, key: str) -> Optional[str]:
        """Get cached value."""
        return self.client.get(key)
    
    def set(self, key: str, value: str, ttl: int = 3600):
        """Set cached value with TTL."""
        self.client.setex(key, ttl, value)
    
    def clear_cache(self, pattern: str):
        """Clear cache entries matching pattern."""
        for key in self.client.scan_iter(match=pattern):
            self.client.delete(key)
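A typical use is a cache-aside wrapper around source fetches. A sketch using an in-memory stand-in with the same get/set interface (the key scheme here is illustrative, not the backend's actual one):

```python
import hashlib
import json
from typing import Callable, Optional

class DictCache:
    """In-memory stand-in exposing RedisCache's get/set interface."""
    def __init__(self) -> None:
        self._store: dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._store.get(key)

    def set(self, key: str, value: str, ttl: int = 3600) -> None:
        self._store[key] = value  # TTL ignored in the stand-in

def cache_key(channel: str, domain: str, query: str) -> str:
    # One stable key per (channel, domain, query) triple.
    raw = json.dumps([channel, domain, query])
    return "search:" + hashlib.sha256(raw.encode()).hexdigest()

def fetch_or_cached(cache: DictCache, channel: str, domain: str,
                    query: str, fetch: Callable[[], dict]) -> dict:
    key = cache_key(channel, domain, query)
    hit = cache.get(key)
    if hit is not None:
        return json.loads(hit)          # cache hit: skip the network call
    result = fetch()
    cache.set(key, json.dumps(result))  # cache miss: store for next time
    return result

cache = DictCache()
fetch_calls = 0

def expensive_fetch() -> dict:
    global fetch_calls
    fetch_calls += 1
    return {"evidences": ["hit"]}

first = fetch_or_cached(cache, "news_search", "acme.com", "acme news", expensive_fetch)
second = fetch_or_cached(cache, "news_search", "acme.com", "acme news", expensive_fetch)
```

With the real RedisCache the TTL bounds staleness, so repeated research runs against the same domains within an hour reuse cached evidence.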

Metrics Tracking

Performance metrics in core/metrics.py:
class RunMetrics:
    def __init__(self, start_time: float):
        self.start_time = start_time
        self.total_queries = 0
        self.failed_queries = 0
    
    def record_query(self, count: int):
        self.total_queries += count
    
    def record_failure(self, count: int):
        self.failed_queries += count
    
    def to_dict(self) -> Dict:
        elapsed = time.perf_counter() - self.start_time
        qps = round(self.total_queries / elapsed, 2) if elapsed > 0 else 0.0
        return {
            "queries_per_second": qps,
            "failed_requests": self.failed_queries
        }

Configuration

Application settings defined in core/config.py:
@dataclass(frozen=True)
class Settings:
    max_parallel_searches: int = 20
    circuit_breaker_failures: int = 5
    circuit_breaker_reset_seconds: float = 30.0
    
    # API Rate Limits
    tavily_rpm: int = 500
    gemini_rpm: int = 2000
    newsapi_rpm: int = 300

@lru_cache(maxsize=1)
def get_settings() -> Settings:
    return Settings()
The @lru_cache decorator ensures settings are loaded only once and cached for the application lifetime.
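Condensed, the effect is easy to verify: repeated calls return the very same object:

```python
from dataclasses import dataclass
from functools import lru_cache

@dataclass(frozen=True)
class Settings:
    max_parallel_searches: int = 20

@lru_cache(maxsize=1)
def get_settings() -> Settings:
    # Constructed on the first call only; later calls hit the cache.
    return Settings()

a = get_settings()
b = get_settings()
```

The frozen=True flag makes the cached instance immutable (and hashable), so no caller can mutate shared configuration at runtime.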

Pydantic Models

Request Models

# models/request.py
class BatchResearchRequest(BaseModel):
    research_goal: str = Field(..., description="The high-level research objective")
    company_domains: List[str] = Field(..., description="Company domains to analyze")
    search_depth: Literal["quick", "standard", "comprehensive"] = Field(...)
    max_parallel_searches: int = Field(...)
    confidence_threshold: float = Field(..., ge=0.0, le=1.0)

Response Models

# models/response.py
class Evidence(BaseModel):
    url: str
    title: str
    snippet: str
    source_name: str

class Findings(BaseModel):
    technologies: List[str] = Field(default_factory=list)
    evidence: List[Evidence] = Field(default_factory=list)
    signals_found: int = 0

class CompanyResearchResult(BaseModel):
    domain: str
    confidence_score: float
    evidence_sources: int
    findings: Findings

class BatchResearchResponse(BaseModel):
    research_id: str
    total_companies: int
    search_strategies_generated: int
    total_searches_executed: int
    processing_time_ms: int
    results: List[CompanyResearchResult]
    search_performance: SearchPerformance

Running the Backend

Development Mode

cd backend
uvicorn app.server:app --reload --port 8000

Production Mode

uvicorn app.server:app --host 0.0.0.0 --port 8000 --workers 4

Docker

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY app/ ./app/

CMD ["uvicorn", "app.server:app", "--host", "0.0.0.0", "--port", "8000"]

Next Steps

  • Frontend Architecture: learn about the React frontend
  • Data Flow: understand end-to-end data flow
  • API Reference: explore API endpoints
  • Development Guide: set up the development environment
