Overview
The GTM Research Engine provides real-time streaming of research progress using Server-Sent Events (SSE). Watch as evidence is collected, domains are analyzed, and confidence scores are calculated — all streamed live to your client.
Streaming Architecture
The streaming system uses FastAPI’s StreamingResponse with proper SSE formatting:
# From routes.py:108-118
return StreamingResponse(
generator,
media_type = "text/event-stream" ,
headers = {
"Cache-Control" : "no-cache" ,
"Connection" : "keep-alive" ,
"Access-Control-Allow-Origin" : "*" ,
"Access-Control-Allow-Headers" : "Cache-Control" ,
"X-Accel-Buffering" : "no" , # Disable nginx buffering
}
)
Streaming Endpoint
POST /api/research/batch/stream
Content-Type : application/json
{
"research_goal" : "Find fintech companies using AI for fraud detection" ,
"company_domains" : [ "stripe.com" , "paypal.com" , "adyen.com" ],
"search_depth" : "standard" ,
"max_parallel_searches" : 20 ,
"confidence_threshold" : 0.7
}
Event Types
The pipeline emits different event types throughout the research process:
1. Pipeline Start
Fired when research begins:
# From pipeline.py:235-241
yield orjson.dumps({
"type" : "pipeline_start" ,
"message" : "Starting evidence collection" ,
"domains" : self .company_domains,
"total_strategies" : len ( self .strategies),
"timestamp" : time.time()
}).decode()
Example event:
{
"type" : "pipeline_start" ,
"message" : "Starting evidence collection" ,
"domains" : [ "stripe.com" , "paypal.com" ],
"total_strategies" : 9 ,
"timestamp" : 1709856234.123
}
2. Evidence Progress
Sent when the completed-query count lands exactly on a 25%, 50%, 75%, or 100% milestone (see the note under "Optimized Event Frequency": milestones that never align with a whole completed count are skipped):
# From pipeline.py:256-278
progress_updates = [ 25 , 50 , 75 , 100 ]
for coro in asyncio.as_completed(tasks):
domain, res = await coro
completed_count += 1
if res.ok and res.evidences:
domain_to_evidence[domain].extend(res.evidences)
progress_percent = int ((completed_count / total_planned) * 100 )
if progress_percent in progress_updates:
progress_updates.remove(progress_percent)
yield orjson.dumps({
"type" : "evidence_progress" ,
"progress" : progress_percent,
"completed" : completed_count,
"total" : total_planned,
"domains_with_evidence" : len (domain_to_evidence),
"timestamp" : time.time()
}).decode()
Example event:
{
"type" : "evidence_progress" ,
"progress" : 50 ,
"completed" : 9 ,
"total" : 18 ,
"domains_with_evidence" : 2 ,
"timestamp" : 1709856235.456
}
3. Evidence Complete
Fired when all evidence collection finishes:
# From pipeline.py:281-287
yield orjson.dumps({
"type" : "evidence_complete" ,
"message" : "Evidence collection finished" ,
"total_evidence" : sum ( len (evidences) for evidences in domain_to_evidence.values()),
"domains_with_evidence" : len (domain_to_evidence),
"timestamp" : time.time()
}).decode()
Example event:
{
"type" : "evidence_complete" ,
"message" : "Evidence collection finished" ,
"total_evidence" : 24 ,
"domains_with_evidence" : 2 ,
"timestamp" : 1709856236.789
}
4. Analysis Start
Marks the beginning of LLM analysis:
# From pipeline.py:290-295
yield orjson.dumps({
"type" : "analysis_start" ,
"message" : "Starting domain analysis" ,
"domains_to_analyze" : len (domain_to_evidence),
"timestamp" : time.time()
}).decode()
5. Domain Analyzed
Streamed as each domain completes analysis:
# From pipeline.py:327-343
for task_coro in asyncio.as_completed(analysis_tasks):
domain, result = await task_coro
results.append(result)
domains_analyzed += 1
yield orjson.dumps({
"type" : "domain_analyzed" ,
"domain" : domain,
"confidence" : result.confidence_score,
"evidence_count" : len (result.findings.evidence),
"technologies" : result.findings.technologies,
"progress" : domains_analyzed,
"total" : total_domains,
"timestamp" : time.time()
}).decode()
Example event:
{
"type" : "domain_analyzed" ,
"domain" : "stripe.com" ,
"confidence" : 0.92 ,
"evidence_count" : 12 ,
"technologies" : [ "TensorFlow" , "Python" , "Kubernetes" ],
"progress" : 1 ,
"total" : 2 ,
"timestamp" : 1709856238.123
}
6. Pipeline Complete
Final event with complete results:
# From pipeline.py:353-368
yield orjson.dumps({
"type" : "pipeline_complete" ,
"message" : "Research pipeline completed" ,
"summary" : {
"total_domains" : len ( self .company_domains),
"domains_analyzed" : len (results),
"high_confidence_matches" : len (high_confidence_results),
"avg_confidence" : round ( sum (r.confidence_score for r in results) / len (results), 2 ),
"total_evidence" : sum ( len (r.findings.evidence) for r in results),
"processing_time" : round (time.time() - start_time, 2 )
},
"results" : [r.model_dump() for r in results],
"high_confidence_results" : [r.model_dump() for r in high_confidence_results],
"metrics" : self .metrics.to_dict(),
"timestamp" : time.time()
}).decode()
Client Implementation
JavaScript/TypeScript
Note: the native browser EventSource API only supports GET requests and cannot send custom headers or a JSON body. The example below assumes an SSE client library that supports POST (such as @microsoft/fetch-event-source); alternatively, issue the request with fetch() and read the response body as a stream.
const eventSource = new EventSource (
'/api/research/batch/stream' ,
{
method: 'POST' ,
headers: { 'Content-Type' : 'application/json' },
body: JSON . stringify ({
research_goal: "Find AI companies" ,
company_domains: [ "openai.com" , "anthropic.com" ],
search_depth: "standard" ,
max_parallel_searches: 20 ,
confidence_threshold: 0.7
})
}
);
// Handle different event types
eventSource . addEventListener ( 'message' , ( event ) => {
const data = JSON . parse ( event . data );
switch ( data . type ) {
case 'pipeline_start' :
console . log ( `🚀 Starting research on ${ data . domains . length } domains` );
break ;
case 'evidence_progress' :
console . log ( `📊 Progress: ${ data . progress } % ( ${ data . completed } / ${ data . total } )` );
updateProgressBar ( data . progress );
break ;
case 'domain_analyzed' :
console . log ( `✅ ${ data . domain } : ${ data . confidence } confidence` );
displayResult ( data );
break ;
case 'pipeline_complete' :
console . log ( '🎉 Research complete!' );
console . log ( data . summary );
eventSource . close ();
break ;
}
});
eventSource . addEventListener ( 'error' , ( error ) => {
console . error ( 'Stream error:' , error );
eventSource . close ();
});
Python
import httpx
import json
url = "http://localhost:8000/api/research/batch/stream"
payload = {
"research_goal" : "Find AI companies" ,
"company_domains" : [ "openai.com" ],
"search_depth" : "standard" ,
"max_parallel_searches" : 20 ,
"confidence_threshold" : 0.7
}
async with httpx.AsyncClient( timeout = 300.0 ) as client:
async with client.stream(
"POST" ,
url,
json = payload,
headers = { "Accept" : "text/event-stream" }
) as response:
async for line in response.aiter_lines():
if line.startswith( "data: " ):
event_data = json.loads(line[ 6 :])
if event_data[ "type" ] == "domain_analyzed" :
print ( f "✅ { event_data[ 'domain' ] } : { event_data[ 'confidence' ] } " )
elif event_data[ "type" ] == "pipeline_complete" :
print ( "Research complete!" )
print ( f "Results: { len (event_data[ 'results' ]) } " )
break
cURL
curl -N -X POST http://localhost:8000/api/research/batch/stream \
-H "Content-Type: application/json" \
-H "Accept: text/event-stream" \
-d '{
"research_goal": "Find AI companies",
"company_domains": ["openai.com"],
"search_depth": "standard",
"max_parallel_searches": 20,
"confidence_threshold": 0.7
}'
The -N flag disables buffering so you see events in real-time.
Optimized Event Frequency
The streaming system balances real-time updates with performance:
Evidence Collection
Only sends updates at key milestones (25%, 50%, 75%, 100%) instead of every query:
# From pipeline.py:256-257
progress_updates = [ 25 , 50 , 75 , 100 ] # Send updates at these percentages
This reduces event noise from potentially hundreds of queries to at most 4 progress events. Note that the percentage is computed with integer truncation and an event fires only when it lands exactly on a listed milestone, so totals that do not divide evenly (e.g. 18 queries) will skip some milestones — expect between 1 and 4 progress events per run.
Domain Analysis
Streams every completed domain since analysis is slower and users want immediate feedback:
# From pipeline.py:333-342
yield orjson.dumps({
"type" : "domain_analyzed" ,
"domain" : domain,
"confidence" : result.confidence_score,
"evidence_count" : len (result.findings.evidence),
"technologies" : result.findings.technologies,
"progress" : domains_analyzed,
"total" : total_domains,
"timestamp" : time.time()
}).decode()
Heartbeat & Connection Management
The ResearchStreamGenerator manages connection health:
# From routes.py:104
stream_generator = ResearchStreamGenerator( heartbeat_interval = 30 )
Heartbeats prevent connection timeouts during long-running operations.
Async Domain Processing
Domains are analyzed in parallel for maximum performance:
# From pipeline.py:317-320
analysis_tasks = []
for domain, evidences in domain_to_evidence.items():
task = asyncio.create_task(analyze_with_domain(domain, evidences))
analysis_tasks.append(task)
Results stream as they complete:
# From pipeline.py:327-328
for task_coro in asyncio.as_completed(analysis_tasks):
domain, result = await task_coro
This means faster domains appear in the stream first, even if slower domains are still processing.
Error Handling
The streaming endpoint handles failures gracefully:
# From routes.py:120-121
except Exception as e:
raise HTTPException( status_code = 500 , detail = f "Failed to start streaming: { str (e) } " )
Individual source failures don’t break the stream - the pipeline continues with available data.
Memory Efficiency
Events are yielded one at a time, not buffered:
yield orjson.dumps({ ... }).decode() # Yields immediately
Latency
Typical event timing:
pipeline_start: Immediate (< 100ms)
evidence_progress: Every 25% (~1-2s intervals for standard depth)
evidence_complete: After all sources finish (~3-5s)
domain_analyzed: As each LLM analysis completes (~1-2s per domain)
pipeline_complete: After all analysis (~5-10s total)
Best Practices
Implement reconnection logic for production (note: a plain EventSource reconnect issues a GET with no body, so for the POST streaming endpoint you must re-send the original POST request when reconnecting): let retries = 0 ;
const maxRetries = 3 ;
function connectStream () {
const eventSource = new EventSource ( '/api/research/batch/stream' );
eventSource . onerror = () => {
eventSource . close ();
if ( retries < maxRetries ) {
retries ++ ;
setTimeout ( connectStream , 1000 * retries );
}
};
}
Streaming can take minutes for large batches: async with httpx.AsyncClient( timeout = 300.0 ) as client:
async with client.stream( "POST" , url, json = payload) as response:
# ... handle events
Always validate event structure: eventSource . addEventListener ( 'message' , ( event ) => {
try {
const data = JSON . parse ( event . data );
if ( data . type && data . timestamp ) {
handleEvent ( data );
}
} catch ( error ) {
console . error ( 'Failed to parse event:' , error );
}
});
Close Connections Properly
Always close EventSource when done: eventSource . addEventListener ( 'message' , ( event ) => {
const data = JSON . parse ( event . data );
if ( data . type === 'pipeline_complete' ) {
eventSource . close (); // Important!
}
});
Example: Complete Event Flow
For a request with 2 domains and 9 strategies (18 total queries). Note: with 18 queries only the 50% and 100% milestones land exactly on a whole completed count, so the 25% and 75% events shown below (Events 2 and 4) are illustrative and would not fire for this particular total:
// Event 1 (t=0ms)
{ "type" : "pipeline_start" , "domains" : [ ... ], "total_strategies" : 9 }
// Event 2 (t=500ms, 25% complete)
{ "type" : "evidence_progress" , "progress" : 25 , "completed" : 4 , "total" : 18 }
// Event 3 (t=1000ms, 50% complete)
{ "type" : "evidence_progress" , "progress" : 50 , "completed" : 9 , "total" : 18 }
// Event 4 (t=1500ms, 75% complete)
{ "type" : "evidence_progress" , "progress" : 75 , "completed" : 13 , "total" : 18 }
// Event 5 (t=2000ms, 100% complete)
{ "type" : "evidence_progress" , "progress" : 100 , "completed" : 18 , "total" : 18 }
// Event 6 (t=2100ms)
{ "type" : "evidence_complete" , "total_evidence" : 24 , "domains_with_evidence" : 2 }
// Event 7 (t=2200ms)
{ "type" : "analysis_start" , "domains_to_analyze" : 2 }
// Event 8 (t=3500ms, first domain done)
{ "type" : "domain_analyzed" , "domain" : "stripe.com" , "confidence" : 0.92 , ... }
// Event 9 (t=4000ms, second domain done)
{ "type" : "domain_analyzed" , "domain" : "paypal.com" , "confidence" : 0.78 , ... }
// Event 10 (t=4100ms)
{ "type" : "pipeline_complete" , "summary" : { ... }, "results" : [ ... ]}
Parallel Processing Learn how async processing enables real-time streaming
AI-Powered Analysis Understand what happens during domain analysis