
Overview

The GTM Research Engine provides real-time streaming of research progress using Server-Sent Events (SSE). Watch as evidence is collected, domains are analyzed, and confidence scores are calculated, all streamed live to your client.

Streaming Architecture

The streaming system uses FastAPI’s StreamingResponse with proper SSE formatting:
# From routes.py:108-118
return StreamingResponse(
    generator, 
    media_type="text/event-stream",
    headers={
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Headers": "Cache-Control",
        "X-Accel-Buffering": "no",  # Disable nginx buffering
    }
)
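The generator passed to StreamingResponse is an async generator that yields SSE-formatted strings, so FastAPI flushes each frame to the client as soon as it is produced. A minimal sketch of that shape (the research_events producer here is illustrative, not taken from the codebase):

```python
import asyncio
import json

async def research_events():
    # Stand-in producer: emits event dicts as the pipeline progresses
    for i in range(3):
        await asyncio.sleep(0)  # placeholder for real async work
        yield {"type": "evidence_progress", "progress": (i + 1) * 25}

async def sse_stream():
    # Wrap each event in an SSE "data:" frame; a blank line terminates the frame
    async for event in research_events():
        yield f"data: {json.dumps(event)}\n\n"

async def collect():
    return [frame async for frame in sse_stream()]

frames = asyncio.run(collect())
print(frames[0], end="")  # data: {"type": "evidence_progress", "progress": 25}
```

Because the generator is consumed lazily, no event is buffered server-side longer than it takes to serialize it.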

Streaming Endpoint

POST /api/research/batch/stream
Content-Type: application/json
{
  "research_goal": "Find fintech companies using AI for fraud detection",
  "company_domains": ["stripe.com", "paypal.com", "adyen.com"],
  "search_depth": "standard",
  "max_parallel_searches": 20,
  "confidence_threshold": 0.7
}

Event Types

The pipeline emits different event types throughout the research process:

1. Pipeline Start

Fired when research begins:
# From pipeline.py:235-241
yield orjson.dumps({
    "type": "pipeline_start",
    "message": "Starting evidence collection",
    "domains": self.company_domains,
    "total_strategies": len(self.strategies),
    "timestamp": time.time()
}).decode()
Example event:
{
  "type": "pipeline_start",
  "message": "Starting evidence collection",
  "domains": ["stripe.com", "paypal.com"],
  "total_strategies": 9,
  "timestamp": 1709856234.123
}

2. Evidence Progress

Sent at 25%, 50%, 75%, and 100% completion milestones:
# From pipeline.py:256-278
progress_updates = [25, 50, 75, 100]

for coro in asyncio.as_completed(tasks):
    domain, res = await coro
    completed_count += 1
    
    if res.ok and res.evidences:
        domain_to_evidence[domain].extend(res.evidences)
    
    progress_percent = int((completed_count / total_planned) * 100)
    if progress_percent in progress_updates:
        progress_updates.remove(progress_percent)
        
        yield orjson.dumps({
            "type": "evidence_progress",
            "progress": progress_percent,
            "completed": completed_count,
            "total": total_planned,
            "domains_with_evidence": len(domain_to_evidence),
            "timestamp": time.time()
        }).decode()
Example event:
{
  "type": "evidence_progress",
  "progress": 50,
  "completed": 9,
  "total": 18,
  "domains_with_evidence": 2,
  "timestamp": 1709856235.456
}

3. Evidence Complete

Fired when all evidence collection finishes:
# From pipeline.py:281-287
yield orjson.dumps({
    "type": "evidence_complete",
    "message": "Evidence collection finished",
    "total_evidence": sum(len(evidences) for evidences in domain_to_evidence.values()),
    "domains_with_evidence": len(domain_to_evidence),
    "timestamp": time.time()
}).decode()
Example event:
{
  "type": "evidence_complete",
  "message": "Evidence collection finished",
  "total_evidence": 24,
  "domains_with_evidence": 2,
  "timestamp": 1709856236.789
}

4. Analysis Start

Marks the beginning of LLM analysis:
# From pipeline.py:290-295
yield orjson.dumps({
    "type": "analysis_start", 
    "message": "Starting domain analysis",
    "domains_to_analyze": len(domain_to_evidence),
    "timestamp": time.time()
}).decode()

5. Domain Analyzed

Streamed as each domain completes analysis:
# From pipeline.py:327-343
for task_coro in asyncio.as_completed(analysis_tasks):
    domain, result = await task_coro
    results.append(result)
    domains_analyzed += 1
    
    yield orjson.dumps({
        "type": "domain_analyzed",
        "domain": domain,
        "confidence": result.confidence_score,
        "evidence_count": len(result.findings.evidence),
        "technologies": result.findings.technologies,
        "progress": domains_analyzed,
        "total": total_domains,
        "timestamp": time.time()
    }).decode()
Example event:
{
  "type": "domain_analyzed",
  "domain": "stripe.com",
  "confidence": 0.92,
  "evidence_count": 12,
  "technologies": ["TensorFlow", "Python", "Kubernetes"],
  "progress": 1,
  "total": 2,
  "timestamp": 1709856238.123
}

6. Pipeline Complete

Final event with complete results:
# From pipeline.py:353-368
yield orjson.dumps({
    "type": "pipeline_complete",
    "message": "Research pipeline completed",
    "summary": {
        "total_domains": len(self.company_domains),
        "domains_analyzed": len(results),
        "high_confidence_matches": len(high_confidence_results),
        "avg_confidence": round(sum(r.confidence_score for r in results) / len(results), 2),
        "total_evidence": sum(len(r.findings.evidence) for r in results),
        "processing_time": round(time.time() - start_time, 2)
    },
    "results": [r.model_dump() for r in results],
    "high_confidence_results": [r.model_dump() for r in high_confidence_results],
    "metrics": self.metrics.to_dict(),
    "timestamp": time.time()
}).decode()

Client Implementation

JavaScript/TypeScript

// The browser's native EventSource API only supports GET requests, so a
// POST streaming endpoint must be read with fetch() and a stream reader.
const response = await fetch('/api/research/batch/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    research_goal: "Find AI companies",
    company_domains: ["openai.com", "anthropic.com"],
    search_depth: "standard",
    max_parallel_searches: 20,
    confidence_threshold: 0.7
  })
});

const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
let buffer = '';

while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  buffer += value;

  // SSE events are separated by a blank line
  const events = buffer.split('\n\n');
  buffer = events.pop();  // keep any partial event for the next chunk

  for (const chunk of events) {
    const line = chunk.split('\n').find((l) => l.startsWith('data: '));
    if (!line) continue;
    const data = JSON.parse(line.slice(6));

    // Handle different event types
    switch (data.type) {
      case 'pipeline_start':
        console.log(`🚀 Starting research on ${data.domains.length} domains`);
        break;

      case 'evidence_progress':
        console.log(`📊 Progress: ${data.progress}% (${data.completed}/${data.total})`);
        updateProgressBar(data.progress);
        break;

      case 'domain_analyzed':
        console.log(`✅ ${data.domain}: ${data.confidence} confidence`);
        displayResult(data);
        break;

      case 'pipeline_complete':
        console.log('🎉 Research complete!');
        console.log(data.summary);
        break;
    }
  }
}

Python

import httpx
import json

url = "http://localhost:8000/api/research/batch/stream"
payload = {
    "research_goal": "Find AI companies",
    "company_domains": ["openai.com"],
    "search_depth": "standard",
    "max_parallel_searches": 20,
    "confidence_threshold": 0.7
}

async with httpx.AsyncClient(timeout=300.0) as client:
    async with client.stream(
        "POST", 
        url, 
        json=payload,
        headers={"Accept": "text/event-stream"}
    ) as response:
        async for line in response.aiter_lines():
            if line.startswith("data: "):
                event_data = json.loads(line[6:])
                
                if event_data["type"] == "domain_analyzed":
                    print(f"✅ {event_data['domain']}: {event_data['confidence']}")
                    
                elif event_data["type"] == "pipeline_complete":
                    print("Research complete!")
                    print(f"Results: {len(event_data['results'])}")
                    break
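If the server interleaves heartbeats as SSE comment lines (lines beginning with `:`), the client should skip them while parsing. A small helper, assuming that comment-line framing:

```python
import json

def parse_sse_line(line: str):
    """Decode a `data:` line; return None for comments, heartbeats, and blanks."""
    if not line or line.startswith(":"):
        return None  # heartbeat comment or frame separator
    if line.startswith("data: "):
        return json.loads(line[len("data: "):])
    return None  # other SSE fields (event:, id:, retry:) are unused here
```

Dropping non-data lines early keeps the event-handling code free of framing concerns.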

cURL

curl -N -X POST http://localhost:8000/api/research/batch/stream \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{
    "research_goal": "Find AI companies",
    "company_domains": ["openai.com"],
    "search_depth": "standard",
    "max_parallel_searches": 20,
    "confidence_threshold": 0.7
  }'
The -N flag disables buffering so you see events in real time.

Optimized Event Frequency

The streaming system balances real-time updates with performance:

Evidence Collection

Only sends updates at key milestones (25%, 50%, 75%, 100%) instead of every query:
# From pipeline.py:256-257
progress_updates = [25, 50, 75, 100]  # Send updates at these percentages
This reduces event noise from potentially hundreds of queries to at most four progress events. Because the check requires the integer percentage to land exactly on a milestone, some milestones may be skipped depending on the total query count.
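The exact-match behavior is easy to replay in isolation; this sketch mirrors the milestone check and shows which milestones fire for a given total:

```python
def fired_milestones(total: int, milestones=(25, 50, 75, 100)):
    # Replays the pipeline's check: the int-truncated percent must
    # equal a remaining milestone exactly for an event to be emitted
    remaining = list(milestones)
    fired = []
    for completed in range(1, total + 1):
        percent = int((completed / total) * 100)
        if percent in remaining:
            remaining.remove(percent)
            fired.append(percent)
    return fired

print(fired_milestones(20))  # [25, 50, 75, 100]
print(fired_milestones(18))  # [50, 100]; 25% and 75% never land exactly
```

With 20 queries every milestone lands on an exact percentage, while with 18 queries the truncated percentages jump from 22 to 27 and from 72 to 77.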

Domain Analysis

Streams every completed domain since analysis is slower and users want immediate feedback:
# From pipeline.py:333-342
yield orjson.dumps({
    "type": "domain_analyzed",
    "domain": domain,
    "confidence": result.confidence_score,
    "evidence_count": len(result.findings.evidence),
    "technologies": result.findings.technologies,
    "progress": domains_analyzed,
    "total": total_domains,
    "timestamp": time.time()
}).decode()

Heartbeat & Connection Management

The ResearchStreamGenerator manages connection health:
# From routes.py:104
stream_generator = ResearchStreamGenerator(heartbeat_interval=30)
Heartbeats prevent connection timeouts during long-running operations.
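The internals of ResearchStreamGenerator aren't shown here; one common pattern is to race each pipeline event against a timeout and emit an SSE comment whenever the pipeline is quiet. A hedged sketch of that idea (the queue-based shape and comment-line framing are assumptions, not taken from the codebase):

```python
import asyncio

async def with_heartbeats(events, interval: float = 30.0):
    """Yield frames from `events`, emitting ': heartbeat' comments in idle gaps.

    A queue decouples the producer from the timeout so a slow pipeline step
    is never cancelled mid-await.
    """
    queue = asyncio.Queue()
    done = object()  # sentinel marking end of stream

    async def pump():
        async for frame in events:
            await queue.put(frame)
        await queue.put(done)

    producer = asyncio.create_task(pump())
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=interval)
            except asyncio.TimeoutError:
                yield ": heartbeat\n\n"  # SSE comment; clients ignore it
                continue
            if item is done:
                return
            yield item
    finally:
        producer.cancel()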

Async Domain Processing

Domains are analyzed in parallel for maximum performance:
# From pipeline.py:317-320
analysis_tasks = []
for domain, evidences in domain_to_evidence.items():
    task = asyncio.create_task(analyze_with_domain(domain, evidences))
    analysis_tasks.append(task)
Results stream as they complete:
# From pipeline.py:327-328
for task_coro in asyncio.as_completed(analysis_tasks):
    domain, result = await task_coro
This means faster domains appear in the stream first, even if slower domains are still processing.
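The effect of asyncio.as_completed is easy to see in isolation: results arrive in completion order, not submission order.

```python
import asyncio

async def analyze(domain: str, delay: float):
    await asyncio.sleep(delay)  # stand-in for per-domain LLM latency
    return domain

async def main():
    tasks = [
        asyncio.create_task(analyze("slow.com", 0.05)),
        asyncio.create_task(analyze("fast.com", 0.01)),
    ]
    order = []
    for coro in asyncio.as_completed(tasks):
        order.append(await coro)  # fastest result arrives first
    return order

print(asyncio.run(main()))  # ['fast.com', 'slow.com']
```

A plain `for task in tasks: await task` would instead hold back the fast result until the slow task ahead of it finished.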

Error Handling

If the stream cannot be started at all, the endpoint fails fast with a 500:
# From routes.py:120-121
except Exception as e:
    raise HTTPException(status_code=500, detail=f"Failed to start streaming: {str(e)}")
Once streaming has begun, individual source failures don't break the stream; the pipeline continues with available data.

Performance Characteristics

Memory Efficiency

Events are yielded one at a time, not buffered:
yield orjson.dumps({...}).decode()  # Yields immediately

Latency

Typical event timing:
  • pipeline_start: Immediate (< 100ms)
  • evidence_progress: Every 25% (~1-2s intervals for standard depth)
  • evidence_complete: After all sources finish (~3-5s)
  • domain_analyzed: As each LLM analysis completes (~1-2s per domain)
  • pipeline_complete: After all analysis (~5-10s total)

Best Practices

Implement reconnection logic for production:
let retries = 0;
const maxRetries = 3;

async function connectStream(payload) {
  try {
    const response = await fetch('/api/research/batch/stream', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload)
    });
    // ... read and handle events from response.body
    retries = 0;  // reset once a connection succeeds
  } catch (error) {
    if (retries < maxRetries) {
      retries++;
      setTimeout(() => connectStream(payload), 1000 * retries);  // linear backoff
    }
  }
}
Streaming can take minutes for large batches:
async with httpx.AsyncClient(timeout=300.0) as client:
    async with client.stream("POST", url, json=payload) as response:
        # ... handle events
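On the Python side, transient connection failures can be retried with linear backoff before giving up. A sketch of a generic retry wrapper (the `stream_once` callable is whatever performs the httpx streaming shown above):

```python
import asyncio

async def with_retries(stream_once, retry_on=(ConnectionError,),
                       max_retries: int = 3, base_delay: float = 1.0):
    """Await `stream_once()`, retrying on the given exception types with a
    linearly increasing delay; re-raises after the final attempt."""
    for attempt in range(max_retries + 1):
        try:
            return await stream_once()
        except retry_on:
            if attempt == max_retries:
                raise
            await asyncio.sleep(base_delay * (attempt + 1))
```

With httpx, pass retry_on=(httpx.TransportError,) so only network-level failures are retried, not HTTP error responses.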
Always validate event structure before acting on it:
function safeHandle(raw) {
  try {
    const data = JSON.parse(raw);
    if (data.type && data.timestamp) {
      handleEvent(data);
    }
  } catch (error) {
    console.error('Failed to parse event:', error);
  }
}
Always stop reading when the final event arrives:
if (data.type === 'pipeline_complete') {
  handleComplete(data);
  break;  // exit the read loop and release the connection
}

Example: Complete Event Flow

For a request with 2 domains and 9 strategies (18 total queries), only the 50% and 100% milestones land on an exact integer percentage, so the 25% and 75% progress events are skipped:
// Event 1 (t=0ms)
{"type": "pipeline_start", "domains": [...], "total_strategies": 9}

// Event 2 (t=1000ms, 50% complete)
{"type": "evidence_progress", "progress": 50, "completed": 9, "total": 18}

// Event 3 (t=2000ms, 100% complete)
{"type": "evidence_progress", "progress": 100, "completed": 18, "total": 18}

// Event 4 (t=2100ms)
{"type": "evidence_complete", "total_evidence": 24, "domains_with_evidence": 2}

// Event 5 (t=2200ms)
{"type": "analysis_start", "domains_to_analyze": 2}

// Event 6 (t=3500ms, first domain done)
{"type": "domain_analyzed", "domain": "stripe.com", "confidence": 0.92, ...}

// Event 7 (t=4000ms, second domain done)
{"type": "domain_analyzed", "domain": "paypal.com", "confidence": 0.78, ...}

// Event 8 (t=4100ms)
{"type": "pipeline_complete", "summary": {...}, "results": [...]}
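A defensive client can sanity-check this ordering before trusting the results. A small validator over the sequence of event types (the ordering rules are read off the flow above):

```python
def validate_event_order(types):
    """True if the stream opens with pipeline_start, closes with
    pipeline_complete, and evidence collection finishes before analysis."""
    if not types or types[0] != "pipeline_start" or types[-1] != "pipeline_complete":
        return False
    if "evidence_complete" in types and "analysis_start" in types:
        return types.index("evidence_complete") < types.index("analysis_start")
    return True

flow = ["pipeline_start", "evidence_progress", "evidence_progress",
        "evidence_complete", "analysis_start", "domain_analyzed",
        "domain_analyzed", "pipeline_complete"]
print(validate_event_order(flow))  # True
```

Rejecting out-of-order streams early makes partial-failure bugs surface as a clear client-side error rather than silently wrong results.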

Parallel Processing

Learn how async processing enables real-time streaming

AI-Powered Analysis

Understand what happens during domain analysis
