Research Orchestrator

Overview

The ResearchOrchestrator manages concurrent research agents that gather intelligence from social platforms, search engines, and OSINT sources.

Architecture

Two-Phase Strategy

The orchestrator runs research in two phases:

Phase 1: Static Agents + Exa

All agents run in parallel with a shared timeout:
backend/agents/orchestrator.py
# Phase 1: Launch Exa + all static agents in parallel
# Build the Exa enrichment request from the incoming research request.
exa_req = EnrichmentRequest(name=request.person_name, company=request.company)
exa_task = asyncio.create_task(self._exa.enrich_person(exa_req), name="exa")

# Map agent name -> task so results and errors can be attributed later.
tasks: dict[str, asyncio.Task] = {"exa": exa_task}
for agent in self._static_agents:
    task = asyncio.create_task(agent.run(request), name=agent.agent_name)
    tasks[agent.agent_name] = task

# Wait for all with the overall timeout
# asyncio.wait (default return_when=ALL_COMPLETED) returns early only on
# timeout; unfinished tasks land in `pending` rather than raising here.
done, pending = await asyncio.wait(
    tasks.values(),
    timeout=timeout,
)

# Cancel any stragglers
# NOTE(review): cancel() only requests cancellation; the pending tasks are
# not awaited here, so their CancelledError must be absorbed elsewhere.
for task in pending:
    task.cancel()

Phase 2: Dynamic Scrapers

After Phase 1 completes, spawn URL scrapers for high-value URLs discovered by Exa:
backend/agents/orchestrator.py
# Phase 2: Dynamic scrapers for high-value URLs from Exa
# Budget whatever is left of the overall timeout for the scraper phase.
remaining_time = timeout - (time.monotonic() - start)
# Only spawn scrapers when Exa succeeded and more than 15s remain — with
# less than that a fresh browser scrape is unlikely to finish in time.
if exa_result and exa_result.success and remaining_time > 15:
    dynamic_results = await self._run_dynamic_scrapers(
        request, exa_result.hits, remaining_time,
    )
    # Merge scraper output into the aggregate result set.
    for name, result in dynamic_results.items():
        agent_results[name] = result
        if result.status == AgentStatus.SUCCESS:
            all_profiles.extend(result.profiles)
            all_snippets.extend(result.snippets)

ResearchOrchestrator Class

backend/agents/orchestrator.py
class ResearchOrchestrator:
    """Coordinate two research phases: parallel static agents, then dynamic URL scrapers.

    When AgentMail is configured, a shared InboxPool of pre-warmed email
    addresses is created so agents can bypass login walls without paying
    on-demand API latency per request.
    """

    def __init__(self, settings: Settings, *, pool_size: int = 10):
        self._settings = settings
        self._exa = ExaEnrichmentClient(settings)

        # Shared inbox pool — stays None unless an AgentMail key is configured.
        self._inbox_pool: InboxPool | None = None
        if settings.agentmail_api_key:
            client = AgentMailClient(api_key=settings.agentmail_api_key)
            self._inbox_pool = InboxPool(client, pool_size=pool_size)
            logger.info("orchestrator: inbox pool created, pool_size={}", pool_size)

        # Every static agent receives the same settings and shared inbox pool.
        agent_classes = (
            LinkedInAgent,
            TwitterAgent,
            InstagramAgent,
            GoogleAgent,
            OsintAgent,
            SocialAgent,
        )
        self._static_agents: list[BaseBrowserAgent] = [
            cls(settings, inbox_pool=self._inbox_pool) for cls in agent_classes
        ]
        # The dark-web agent joins the roster only when it is configured.
        if (darkweb := DarkwebAgent(settings, inbox_pool=self._inbox_pool)).configured:
            self._static_agents.append(darkweb)

Research Execution

Main Entry Point

backend/agents/orchestrator.py
@traced("orchestrator.research_person")
async def research_person(self, request: ResearchRequest) -> OrchestratorResult:
    """Run Exa + static agents in parallel, then spawn dynamic scrapers for discovered URLs."""
    # Wall-clock anchor used for per-agent durations and the phase-2 budget.
    start = time.monotonic()
    # Caller may override the overall budget; fall back to 90 seconds.
    timeout = request.timeout_seconds or 90.0

    logger.info(
        "orchestrator starting research for person={} agents={} timeout={}s",
        request.person_name,
        self.agent_names,
        timeout,
    )

Result Collection

backend/agents/orchestrator.py
# Collect results from static agents
agent_results: dict[str, AgentResult] = {}
all_profiles: list[SocialProfile] = []
all_snippets: list[str] = []

# Handle Exa result
# Only inspect the Exa task when it actually finished; exception() on a
# normally-completed task returns None, so this guards the result() call.
exa_result = None
if exa_task in done and not exa_task.exception():
    exa_result = exa_task.result()
    if exa_result.success:
        # Snippets are truncated to 200 chars to keep downstream prompts bounded.
        for hit in exa_result.hits:
            all_snippets.append(f"[Exa] {hit.title}: {hit.snippet[:200]}" if hit.snippet else f"[Exa] {hit.title}")
        # Record Exa as a pseudo-agent so it appears alongside browser agents.
        agent_results["exa"] = AgentResult(
            agent_name="exa",
            status=AgentStatus.SUCCESS,
            snippets=[h.snippet or h.title for h in exa_result.hits],
            urls_found=[h.url for h in exa_result.hits],
            duration_seconds=time.monotonic() - start,
            completed_at=datetime.now(UTC),
        )

Error Handling

backend/agents/orchestrator.py
# Classify each static-agent task: cancelled by the overall timeout,
# failed with an unhandled exception, or completed with its own result.
for agent_name, task in tasks.items():
    if agent_name == "exa":
        # Exa is handled separately during result collection.
        continue
    if task in pending:
        # Task never finished before the overall timeout and was cancelled.
        elapsed_agent = time.monotonic() - start
        logger.warning(
            "orchestrator agent={} cancelled (overall timeout) elapsed={:.1f}s",
            agent_name, elapsed_agent,
        )
        result = AgentResult(
            agent_name=agent_name,
            status=AgentStatus.TIMEOUT,
            error="Cancelled by orchestrator timeout",
            duration_seconds=elapsed_agent,
            completed_at=datetime.now(UTC),
        )
    elif task.exception():
        # Task finished but raised; surface the exception as a FAILED result.
        exc = task.exception()
        logger.error(
            "orchestrator agent={} raised: {}",
            agent_name, str(exc),
        )
        result = AgentResult(
            agent_name=agent_name,
            status=AgentStatus.FAILED,
            error=f"Unhandled exception: {exc}",
            completed_at=datetime.now(UTC),
        )
    else:
        # Normal completion: the agent produced its own AgentResult.
        result = task.result()

    agent_results[agent_name] = result

Dynamic URL Scrapers

Scraper Selection

Only scrape URLs not already covered by static agents:
backend/agents/orchestrator.py
# Domains already handled by dedicated static agents; Exa hits on these
# hosts are skipped so dynamic scrapers never duplicate static coverage.
# Matching is an exact comparison against the lowercased URL netloc, so
# both bare and "www."-prefixed forms must be listed explicitly.
COVERED_DOMAINS = frozenset({
    "linkedin.com", "www.linkedin.com",
    # Fix: "www.twitter.com" was missing while every other platform lists
    # its www variant, so www.twitter.com hits spawned redundant scrapers.
    "twitter.com", "www.twitter.com",
    "x.com", "www.x.com",
    "instagram.com", "www.instagram.com",
    "google.com", "www.google.com",
    "youtube.com", "www.youtube.com",
    "facebook.com", "www.facebook.com",
})

# Upper bound on concurrently spawned URL scrapers (prevents browser pool exhaustion).
MAX_DYNAMIC_SCRAPERS = 3

Scraper Spawning

backend/agents/orchestrator.py
async def _run_dynamic_scrapers(
    self,
    request: ResearchRequest,
    hits: list,
    timeout: float,
) -> dict[str, AgentResult]:
    """Spawn URL scrapers for high-value URLs not covered by static agents.

    Args:
        request: Original research request, forwarded to each scraper.
        hits: Exa search hits; only their ``url`` attribute is read here.
        timeout: Remaining time budget (seconds) for the scraper phase.

    Returns:
        Mapping of scraper agent name to its AgentResult.
    """
    urls_to_scrape: list[str] = []

    # Filter hits down to URLs worth a dedicated scraper.
    for hit in hits:
        if not hit.url:
            continue
        domain = urlparse(hit.url).netloc.lower()
        if domain in COVERED_DOMAINS:
            # A static agent already covers this platform.
            continue
        # Skip generic pages (Wikipedia, news sites have low per-person value)
        if any(skip in domain for skip in ("wikipedia.org", "wikidata.org")):
            continue
        urls_to_scrape.append(hit.url)

    if not urls_to_scrape:
        return {}

    # Cap fan-out so the browser pool is not exhausted.
    urls_to_scrape = urls_to_scrape[:MAX_DYNAMIC_SCRAPERS]
    logger.info(
        "orchestrator spawning {} dynamic scrapers: {}",
        len(urls_to_scrape),
        urls_to_scrape,
    )

    # One UrlScraperAgent per URL, all launched concurrently.
    scraper_tasks: dict[str, asyncio.Task] = {}
    for url in urls_to_scrape:
        scraper = UrlScraperAgent(
            self._settings, url=url, source="exa", inbox_pool=self._inbox_pool,
        )
        task = asyncio.create_task(
            scraper.run(request),
            name=scraper.agent_name,
        )
        scraper_tasks[scraper.agent_name] = task

Agent Inbox Pool

The orchestrator maintains a pool of temporary email addresses for login-wall bypass:
backend/agents/orchestrator.py
# Create shared inbox pool if AgentMail is configured
# Pool stays None when no API key is set; agents then run without
# pre-warmed inboxes.
self._inbox_pool: InboxPool | None = None
if settings.agentmail_api_key:
    mail_client = AgentMailClient(api_key=settings.agentmail_api_key)
    self._inbox_pool = InboxPool(mail_client, pool_size=pool_size)
    logger.info("orchestrator: inbox pool created, pool_size={}", pool_size)

Pool Cleanup

backend/agents/orchestrator.py
# Clean up inbox pool (delete temporary inboxes to free concurrent slots)
# NOTE(review): release_all() is invoked synchronously here — confirm it is
# not a coroutine that needs awaiting.
if self._inbox_pool:
    self._inbox_pool.release_all()

API Integration

Start Research Endpoint

backend/main.py
@app.post("/api/agents/research", response_model=AgentStartResponse)
async def start_research(req: AgentStartRequest) -> AgentStartResponse:
    """Spawn Browser Use sessions/tasks per source type. Returns immediately."""
    spawned: list[AgentInfo] = []

    for key in req.sources:
        config = SOURCE_CONFIGS.get(key)
        if not config:
            logger.warning("Unknown source type: {}", key)
            continue

        try:
            # One browser session per source, opened at the source's landing page.
            session = await bu_client.create_session(start_url=config["start_url"])
            # Substitute the target's name into the per-source prompt template.
            task = await bu_client.create_task(
                session_id=session["id"],
                task=config["prompt"].replace("{name}", req.person_name),
                start_url=config["start_url"],
            )
        except BrowserUseError as exc:
            # Best-effort: one failed source should not block the others.
            logger.error("Failed to create agent for {}: {}", key, exc)
            continue

        spawned.append(AgentInfo(
            source_tp=config["tp"],
            source_nm=config["nm"],
            session_id=session["id"],
            task_id=task["id"],
            live_url=session.get("liveUrl"),
            session_status="running",
        ))

    return AgentStartResponse(person_id=req.person_id, agents=spawned)

Session Status Endpoint

backend/main.py
@app.get("/api/agents/sessions/{session_id}", response_model=SessionStatusResponse)
async def get_session_status(session_id: str) -> SessionStatusResponse:
    """Proxy Browser Use session + task status for frontend polling.

    Returns a degraded ``failed`` response instead of raising when the
    upstream Browser Use call fails, so the frontend poll loop keeps going.
    """
    try:
        session = await bu_client.get_session(session_id)
    except BrowserUseError as e:
        logger.error("Failed to get session {}: {}", session_id, e)
        return SessionStatusResponse(session_id=session_id, session_status="failed")

    session_status = _map_bu_status(session.get("status"))
    live_url = session.get("liveUrl")
    share_url = session.get("publicShareUrl") or _share_url_cache.get(session_id)

    # FIX: task_info was referenced in the response below without ever being
    # assigned, raising NameError on every successful path. Default to None;
    # TODO: populate from Browser Use task polling when that is wired in.
    task_info = None

    # On first completed fetch, create public share for replay
    if session_status == "completed" and not share_url and session_id not in _share_url_cache:
        try:
            share_data = await bu_client.make_session_public(session_id)
            share_url = share_data.get("shareUrl")
            if share_url:
                _share_url_cache[session_id] = share_url
        except BrowserUseError:
            # Best-effort: a replay link is a nice-to-have, not fatal.
            logger.warning("Could not create public share for session {}", session_id)

    return SessionStatusResponse(
        session_id=session_id,
        session_status=session_status,
        live_url=live_url,
        share_url=share_url,
        task=task_info,
    )

Source Configuration

Agent prompts are configured per source:
backend/main.py
# Per-source agent configuration:
#   tp        - source category code reported to the frontend
#   nm        - human-readable source name
#   prompt    - task template; "{name}" is replaced with the target's name
#   start_url - landing page the browser session opens on
SOURCE_CONFIGS: dict[str, dict[str, str]] = {
    "linkedin": {
        "tp": "SOCIAL",
        "nm": "LinkedIn Profile",
        "prompt": (
            "Search LinkedIn for '{name}'. Navigate to their profile. "
            "Extract: current role, company, work history (last 3 positions), "
            "education, notable connections, and recent posts."
        ),
        "start_url": "https://linkedin.com",
    },
    "twitter": {
        "tp": "SOCIAL",
        "nm": "Twitter/X Activity",
        "prompt": (
            "Search Twitter/X for '{name}'. Find their profile. "
            "Extract: bio, follower count, recent tweets (last 10), "
            "and accounts they interact with most."
        ),
        "start_url": "https://twitter.com",
    },
    # ... additional sources
}

Observability

The orchestrator is instrumented with Laminar tracing:
backend/agents/orchestrator.py
# @traced wraps the coroutine in a tracing span named "orchestrator.research_person".
@traced("orchestrator.research_person")
async def research_person(self, request: ResearchRequest) -> OrchestratorResult:
    # ... implementation

Performance Tuning

Timeout

Default: 90s (configurable per request)

Max Scrapers

3 dynamic scrapers max (prevents browser pool exhaustion)

Inbox Pool

10 pre-warmed emails (reduces login latency)

Concurrent Agents

Six static agents run in parallel (seven when the dark-web agent is configured)

Next Steps

Synthesis Engine

See how research results become intelligence reports

Browser Use Client

Learn about the Browser Use integration

Build docs developers (and LLMs) love