Streaming Research

Overview

The streaming research endpoint provides real-time progress updates using Server-Sent Events (SSE). This is ideal for:
  • Interactive dashboards with live progress bars
  • Real-time UI updates during long-running research
  • Monitoring research pipeline execution
  • Early access to partial results
For simple request/response workflows, use the Batch Research endpoint instead.

Endpoint

POST /research/batch/stream

Request Body

The request body is identical to that of the Batch Research endpoint:
  • research_goal (string, required): The high-level research objective.
  • company_domains (array, required): List of company domains to analyze.
  • search_depth (enum, required): Controls research breadth: quick, standard, or comprehensive.
  • max_parallel_searches (integer, required): Maximum concurrent searches (5-50).
  • confidence_threshold (float, required): Minimum confidence score (0.0-1.0).
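The numeric bounds above can be checked client-side before sending a request. A minimal sketch, assuming the constraints listed here; the function name and error messages are illustrative, not part of the API:

```python
VALID_DEPTHS = {"quick", "standard", "comprehensive"}

def validate_payload(payload: dict) -> list[str]:
    """Return a list of validation problems; an empty list means the payload looks valid."""
    problems = []
    if not payload.get("research_goal"):
        problems.append("research_goal is required")
    if not payload.get("company_domains"):
        problems.append("company_domains must be a non-empty array")
    if payload.get("search_depth") not in VALID_DEPTHS:
        problems.append("search_depth must be quick, standard, or comprehensive")
    mps = payload.get("max_parallel_searches")
    if not isinstance(mps, int) or not 5 <= mps <= 50:
        problems.append("max_parallel_searches must be an integer in [5, 50]")
    ct = payload.get("confidence_threshold")
    if not isinstance(ct, (int, float)) or not 0.0 <= ct <= 1.0:
        problems.append("confidence_threshold must be a float in [0.0, 1.0]")
    return problems
```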

Response Format

The endpoint returns a stream of Server-Sent Events with the following headers:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Access-Control-Allow-Origin: *
X-Accel-Buffering: no
Each event follows the SSE protocol:
id: {event_id}
event: {event_type}
data: {json_payload}
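The three-field frame format above can be parsed in a few lines. A minimal sketch of a frame parser; field handling is simplified, and a production client should follow the full SSE specification (comment lines, multi-line data fields, and the retry field):

```python
import json

def parse_sse_frame(frame: str) -> dict:
    """Parse one SSE frame (newline-separated field lines) into a dict."""
    event = {}
    for line in frame.strip().split("\n"):
        if line.startswith("id: "):
            event["id"] = line[4:]
        elif line.startswith("event: "):
            event["event"] = line[7:]
        elif line.startswith("data: "):
            event["data"] = json.loads(line[6:])
    return event
```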

Event Types

The stream emits various event types throughout the research pipeline:

Connection Events

connected
event
Initial connection established. Always the first event.
{
  "message": "Research pipeline connected",
  "timestamp": 1709856000.123
}
heartbeat
event
Periodic heartbeat to keep connection alive (every 30 seconds).
{
  "timestamp": 1709856030.456,
  "message": "Connection alive"
}
disconnected
event
Connection closed (normal or abnormal).
{
  "message": "Client disconnected",
  "timestamp": 1709856100.789
}

Pipeline Events

pipeline_start
event
Research pipeline has started evidence collection.
{
  "type": "pipeline_start",
  "message": "Starting evidence collection",
  "domains": ["stripe.com", "paypal.com"],
  "total_strategies": 12,
  "timestamp": 1709856001.234
}
evidence_progress
event
Evidence collection progress update (sent at 25%, 50%, 75%, 100%).
{
  "type": "evidence_progress",
  "progress": 50,
  "completed": 12,
  "total": 24,
  "domains_with_evidence": 2,
  "timestamp": 1709856015.567
}
evidence_complete
event
Evidence collection finished for all companies.
{
  "type": "evidence_complete",
  "message": "Evidence collection finished",
  "total_evidence": 47,
  "domains_with_evidence": 2,
  "timestamp": 1709856025.890
}

Analysis Events

analysis_start
event
Domain analysis phase has started.
{
  "type": "analysis_start",
  "message": "Starting domain analysis",
  "domains_to_analyze": 2,
  "timestamp": 1709856026.123
}
domain_analyzed
event
A single domain has been analyzed. Emitted for each company as analysis completes.
{
  "type": "domain_analyzed",
  "domain": "stripe.com",
  "confidence": 0.92,
  "evidence_count": 23,
  "technologies": ["tensorflow", "python", "kubernetes"],
  "progress": 1,
  "total": 2,
  "timestamp": 1709856032.456
}
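Because each domain_analyzed event carries its own confidence score, a client can keep running statistics without waiting for pipeline_complete. A sketch under that assumption; the helper and its default threshold (mirroring the confidence_threshold request field) are illustrative:

```python
def summarize_domains(events: list[dict], threshold: float = 0.7) -> dict:
    """Aggregate domain_analyzed payloads into a running summary."""
    analyzed = [e for e in events if e.get("type") == "domain_analyzed"]
    if not analyzed:
        return {"domains_analyzed": 0, "avg_confidence": 0.0, "high_confidence_matches": 0}
    confidences = [e["confidence"] for e in analyzed]
    return {
        "domains_analyzed": len(analyzed),
        # The pipeline_complete summary reports this value rounded (e.g. 0.88)
        "avg_confidence": sum(confidences) / len(confidences),
        "high_confidence_matches": sum(1 for c in confidences if c >= threshold),
    }
```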

Completion Events

pipeline_complete
event
Research pipeline completed successfully. Contains full results.
{
  "type": "pipeline_complete",
  "message": "Research pipeline completed",
  "summary": {
    "total_domains": 2,
    "domains_analyzed": 2,
    "high_confidence_matches": 2,
    "avg_confidence": 0.88,
    "total_evidence": 47,
    "processing_time": 35.2
  },
  "results": [...],
  "high_confidence_results": [...],
  "metrics": {
    "queries_per_second": 18.5,
    "failed_requests": 2
  },
  "timestamp": 1709856040.789
}
completed
event
Stream has ended successfully.
{
  "message": "Research pipeline completed successfully",
  "total_events": 12,
  "timestamp": 1709856041.012
}

Error Events

error
event
An error occurred during processing.
{
  "message": "Failed to parse event: Invalid JSON",
  "recoverable": true,
  "timestamp": 1709856020.345
}
stream_error
event
A fatal error occurred and the stream is ending.
{
  "message": "Stream ended due to error",
  "error": "API key invalid",
  "timestamp": 1709856021.678
}
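The recoverable flag on error events, together with the terminal nature of stream_error, suggests a simple client policy: log recoverable errors and keep reading, stop on anything fatal. A sketch of that policy; the decision function is illustrative, not part of the API:

```python
def should_continue(event_type: str, payload: dict) -> bool:
    """Decide whether to keep consuming the stream after receiving an event."""
    if event_type == "stream_error":
        return False  # fatal: the server is ending the stream
    if event_type == "error":
        return bool(payload.get("recoverable", False))
    return True  # non-error events never terminate the stream
```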

Example Request

curl -N -X POST http://localhost:8000/research/batch/stream \
  -H "Content-Type: application/json" \
  -d '{
    "research_goal": "Find fintech companies using AI for fraud detection",
    "company_domains": ["stripe.com", "paypal.com"],
    "search_depth": "standard",
    "max_parallel_searches": 20,
    "confidence_threshold": 0.7
  }'

Example Event Stream

Here’s what a complete event stream looks like:
id: 0
retry: 1000
event: connected
data: {"message":"Research pipeline connected","timestamp":1709856000.123}

event: pipeline_start
data: {"type":"pipeline_start","message":"Starting evidence collection","domains":["stripe.com","paypal.com"],"total_strategies":12,"timestamp":1709856001.234}

event: evidence_progress
data: {"type":"evidence_progress","progress":25,"completed":6,"total":24,"domains_with_evidence":2,"timestamp":1709856008.567}

event: evidence_progress
data: {"type":"evidence_progress","progress":50,"completed":12,"total":24,"domains_with_evidence":2,"timestamp":1709856015.890}

event: evidence_progress
data: {"type":"evidence_progress","progress":75,"completed":18,"total":24,"domains_with_evidence":2,"timestamp":1709856022.123}

event: evidence_complete
data: {"type":"evidence_complete","message":"Evidence collection finished","total_evidence":47,"domains_with_evidence":2,"timestamp":1709856025.456}

event: analysis_start
data: {"type":"analysis_start","message":"Starting domain analysis","domains_to_analyze":2,"timestamp":1709856026.789}

event: domain_analyzed
data: {"type":"domain_analyzed","domain":"stripe.com","confidence":0.92,"evidence_count":23,"technologies":["tensorflow","python","kubernetes"],"progress":1,"total":2,"timestamp":1709856032.012}

event: domain_analyzed
data: {"type":"domain_analyzed","domain":"paypal.com","confidence":0.85,"evidence_count":24,"technologies":["deep-learning","java","scala"],"progress":2,"total":2,"timestamp":1709856038.345}

event: pipeline_complete
data: {"type":"pipeline_complete","message":"Research pipeline completed","summary":{"total_domains":2,"domains_analyzed":2,"high_confidence_matches":2,"avg_confidence":0.88,"total_evidence":47,"processing_time":35.2},"results":[...],"timestamp":1709856040.678}

event: completed
data: {"message":"Research pipeline completed successfully","total_events":12,"timestamp":1709856041.901}

Integration Examples

React Progress Dashboard

React
import { useState, useEffect } from 'react';

function ResearchDashboard() {
  const [progress, setProgress] = useState(0);
  const [results, setResults] = useState([]);
  const [status, setStatus] = useState('idle');

  const startResearch = async () => {
    setStatus('running');
    
    const response = await fetch('http://localhost:8000/research/batch/stream', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        research_goal: 'Find fintech companies using AI',
        company_domains: ['stripe.com', 'paypal.com'],
        search_depth: 'standard',
        max_parallel_searches: 20,
        confidence_threshold: 0.7,
      }),
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      // stream: true keeps multi-byte characters split across chunks intact
      const chunk = decoder.decode(value, { stream: true });
      // Note: an event can also straddle a chunk boundary; buffer partial frames in production
      const events = chunk.split('\n\n').filter(e => e.trim());

      for (const event of events) {
        const lines = event.split('\n');
        let eventType, eventData;

        for (const line of lines) {
          if (line.startsWith('event: ')) eventType = line.slice(7);
          if (line.startsWith('data: ')) eventData = JSON.parse(line.slice(6));
        }

        if (eventType === 'evidence_progress') {
          setProgress(eventData.progress);
        } else if (eventType === 'domain_analyzed') {
          setResults(prev => [...prev, eventData]);
        } else if (eventType === 'pipeline_complete') {
          setStatus('complete');
        }
      }
    }
  };

  return (
    <div>
      <button onClick={startResearch} disabled={status === 'running'}>
        Start Research
      </button>
      
      {status === 'running' && (
        <div>
          <div className="progress-bar">
            <div style={{ width: `${progress}%` }} />
          </div>
          <p>Progress: {progress}%</p>
        </div>
      )}
      
      <div>
        {results.map(result => (
          <div key={result.domain}>
            <h3>{result.domain}</h3>
            <p>Confidence: {result.confidence}</p>
            <p>Technologies: {result.technologies.join(', ')}</p>
          </div>
        ))}
      </div>
    </div>
  );
}

Python Real-Time Processing

import requests
import json
from typing import Generator

def stream_research(payload: dict) -> Generator[dict, None, None]:
    """Stream research events and yield parsed data."""
    url = 'http://localhost:8000/research/batch/stream'
    response = requests.post(url, json=payload, stream=True)
    
    event_type = None
    
    for line in response.iter_lines():
        if not line:
            continue
            
        decoded = line.decode('utf-8')
        
        if decoded.startswith('event: '):
            event_type = decoded[7:]
        elif decoded.startswith('data: '):
            data = json.loads(decoded[6:])
            yield {'type': event_type, 'data': data}

# Usage
payload = {
    'research_goal': 'Find AI startups',
    'company_domains': ['company1.com', 'company2.com'],
    'search_depth': 'standard',
    'max_parallel_searches': 20,
    'confidence_threshold': 0.7,
}

for event in stream_research(payload):
    if event['type'] == 'domain_analyzed':
        domain = event['data']['domain']
        confidence = event['data']['confidence']
        print(f"✓ {domain} - {confidence*100}% match")
    elif event['type'] == 'pipeline_complete':
        summary = event['data']['summary']
        print(f"\nComplete: {summary['high_confidence_matches']} matches found")
        break

Error Handling

Always implement proper error handling for connection failures and stream interruptions. Note that the browser-native EventSource API only issues GET requests; the snippet below illustrates the event-handling pattern, which you can apply to a fetch-based stream (as in the React example above) when calling this POST endpoint.
const eventSource = new EventSource(url);

eventSource.addEventListener('error', (event) => {
  if (event.data) {
    const error = JSON.parse(event.data);
    console.error('Stream error:', error.message);
    
    if (!error.recoverable) {
      eventSource.close();
      showErrorToUser('Research failed. Please try again.');
    }
  } else {
    // Connection error
    console.error('Connection lost');
    eventSource.close();
  }
});

// Reconnection logic
let reconnectAttempts = 0;
const maxReconnects = 3;

eventSource.addEventListener('disconnected', () => {
  if (reconnectAttempts < maxReconnects) {
    reconnectAttempts++;
    console.log(`Reconnecting (${reconnectAttempts}/${maxReconnects})...`);
    setTimeout(() => startStream(), 2000 * reconnectAttempts);
  }
});

Performance Considerations

Client-Side Buffering

For high-throughput streams, implement client-side buffering:
const eventBuffer = [];
const BATCH_SIZE = 10;

eventSource.addEventListener('domain_analyzed', (event) => {
  eventBuffer.push(JSON.parse(event.data));
  
  if (eventBuffer.length >= BATCH_SIZE) {
    updateUI(eventBuffer);
    eventBuffer.length = 0;
  }
});

Connection Keep-Alive

The server sends heartbeats every 30 seconds. Configure client timeouts accordingly:
import requests

response = requests.post(
    url,
    json=payload,
    stream=True,
    timeout=(5, 60)  # (connect timeout, read timeout)
)

Next Steps

Batch Research

Learn about the synchronous batch endpoint

API Overview

Review API fundamentals and architecture

Build docs developers (and LLMs) love