Deep dive into the FastAPI-powered backend research engine, including pipeline orchestration, data sources, and AI integration
The backend is a FastAPI-powered research engine built on Python 3.11+ with async/await patterns throughout. It orchestrates multi-source data collection, AI-powered analysis, and real-time result streaming.
```python
@router.post("/research/batch/stream")
async def research_batch_stream(payload: BatchResearchRequest) -> StreamingResponse:
    """
    Enhanced streaming research with proper SSE implementation.

    Features:
    - Proper SSE format with event types and IDs
    - Connection management with heartbeats
    - Error handling and recovery
    - Optimized event frequency
    - Real-time evidence processing
    """
    run_id = str(uuid.uuid4())
    metrics = RunMetrics(start_time=time.perf_counter())

    # Generate strategies and create pipeline
    query_generator = QueryGenerator()
    strategies = await query_generator.generate_strategies(
        research_goal=payload.research_goal,
        search_depth=payload.search_depth,
    )
    pipeline = ResearchPipeline(...)

    # Use enhanced stream generator
    stream_generator = ResearchStreamGenerator(heartbeat_interval=30)
    generator = stream_generator.generate_stream(pipeline)

    return StreamingResponse(
        generator,
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
```
The streaming endpoint keeps connections open for 10-30 seconds. Ensure your reverse proxy (nginx, etc.) is configured with appropriate timeout settings.
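As a rough illustration of the SSE wire format the endpoint emits, a minimal generator might look like the sketch below. The event name `company_result`, the payload shape, and the heartbeat comment are illustrative assumptions, not taken from the actual implementation.

```python
import asyncio
import json
from typing import AsyncIterator


def sse_event(event: str, data: dict, event_id: int) -> str:
    """Format one Server-Sent Event: id line, event type, JSON data, blank line."""
    return f"id: {event_id}\nevent: {event}\ndata: {json.dumps(data)}\n\n"


async def demo_stream() -> AsyncIterator[str]:
    """Toy stream: a heartbeat comment followed by one result event."""
    # SSE comment lines (starting with ':') keep idle connections alive
    # without triggering client-side event handlers
    yield ": heartbeat\n\n"
    yield sse_event("company_result", {"domain": "example.com"}, event_id=1)


async def collect() -> list[str]:
    return [chunk async for chunk in demo_stream()]
```

A generator of this shape is what `StreamingResponse` consumes; the real `ResearchStreamGenerator` additionally interleaves heartbeats on a timer and handles error recovery.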
1
Task Creation
Create async tasks for all company × strategy combinations
```python
tasks: List[asyncio.Task] = []
for domain in self.company_domains:
    for strategy in self.strategies:
        tasks.append(
            asyncio.create_task(
                self._execute_one(domain, strategy, self.search_depth)
            )
        )
```
2
Parallel Execution
Execute tasks with semaphore-based concurrency control
```python
async with source_pool:
    try:
        result = await source.fetch(
            domain=domain,
            query=query,
            search_depth=search_depth,
        )
        if result.ok:
            breaker.record_success()
        else:
            breaker.record_failure()
    except Exception:
        breaker.record_failure()
```
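The `source_pool` and `breaker` objects above aren't defined in this section. A minimal sketch of what they might look like follows; the failure threshold, the `is_open` property, and the pool size of 8 are assumptions for illustration, not the actual implementation.

```python
import asyncio


class CircuitBreaker:
    """Tracks consecutive failures for one source and opens past a threshold."""

    def __init__(self, failure_threshold: int = 5) -> None:
        self.failure_threshold = failure_threshold  # assumed default
        self.consecutive_failures = 0

    def record_success(self) -> None:
        # Any success resets the failure streak
        self.consecutive_failures = 0

    def record_failure(self) -> None:
        self.consecutive_failures += 1

    @property
    def is_open(self) -> bool:
        # When open, the pipeline would skip this source entirely
        return self.consecutive_failures >= self.failure_threshold


# Semaphore-based pool: caps concurrent fetches against a single source
source_pool = asyncio.Semaphore(8)
```

The semaphore provides the concurrency control mentioned above (`async with source_pool:` blocks when all permits are taken), while the breaker prevents a repeatedly failing source from consuming those permits.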
3
Evidence Collection
Aggregate evidence by domain as tasks complete
```python
domain_to_evidence: Dict[str, List[Evidence]] = defaultdict(list)
for coro in asyncio.as_completed(tasks):
    domain, res = await coro
    if res.ok and res.evidences:
        domain_to_evidence[domain].extend(res.evidences)
```
4
AI Analysis
Analyze collected evidence per domain in parallel
```python
domain_tasks = [
    asyncio.create_task(self.analyze_domain(domain, evidences))
    for domain, evidences in domain_to_evidence.items()
]
results: List[CompanyResearchResult] = []
for coro in asyncio.as_completed(domain_tasks):
    result = await coro
    results.append(result)
```
The Extractor service in services/extractor.py uses Gemini 2.5 Flash for intelligent analysis:
```python
class Extractor:
    async def analyze_company(
        self,
        research_goal: str,
        evidences: List[Evidence],
    ) -> Dict:
        """
        Analyze all evidence for a company in a single AI call.

        Returns:
            {
                "technologies": ["TensorFlow", "Python"],
                "goal_match_signals": ["Uses AI for fraud detection"],
                "confidence_score": 0.87,
                "reasoning": "Strong evidence of AI usage..."
            }
        """
        prompt = self._build_analysis_prompt(research_goal, evidences)
        response = await self._call_gemini(prompt)
        return self._parse_response(response)
```
Batch analysis (one AI call per company) is significantly more efficient than per-evidence analysis, reducing API calls by 80-90%.
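To make the savings concrete (the company and evidence counts below are illustrative, not measured figures): batching collapses one call per evidence item into one call per company, so the saving depends only on how many evidence items each company has.

```python
def api_call_reduction(companies: int, evidences_per_company: int) -> float:
    """Fraction of AI calls saved by batching one call per company
    instead of one call per evidence item."""
    per_evidence_calls = companies * evidences_per_company
    batched_calls = companies  # a single analysis call per company
    return 1 - batched_calls / per_evidence_calls


# e.g. 10 companies × 8 evidence items: 80 calls vs 10 → 87.5% fewer calls
```

With 5-10 evidence items per company, the reduction works out to 80-90%, matching the range quoted above.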