Overview
The ResearchOrchestrator manages concurrent research agents that gather intelligence from social platforms, search engines, and OSINT sources.

Architecture
Two-Phase Strategy
The orchestrator runs research in two phases.

Phase 1: Static Agents + Exa
All agents run in parallel with a shared timeout (backend/agents/orchestrator.py):
# Phase 1: Launch Exa + all static agents in parallel
exa_req = EnrichmentRequest(name=request.person_name, company=request.company)
exa_task = asyncio.create_task(self._exa.enrich_person(exa_req), name="exa")
# Keyed by agent name so results can be attributed after asyncio.wait.
tasks: dict[str, asyncio.Task] = {"exa": exa_task}
for agent in self._static_agents:
    task = asyncio.create_task(agent.run(request), name=agent.agent_name)
    tasks[agent.agent_name] = task
# Wait for all with the overall timeout
done, pending = await asyncio.wait(
    tasks.values(),
    timeout=timeout,
)
# Cancel any stragglers
for task in pending:
    task.cancel()
Phase 2: Dynamic Scrapers
After Phase 1 completes, spawn URL scrapers for high-value URLs discovered by Exa (backend/agents/orchestrator.py):
# Phase 2: Dynamic scrapers for high-value URLs from Exa
remaining_time = timeout - (time.monotonic() - start)
# Only spawn scrapers when Exa succeeded and at least 15s of budget remain.
if exa_result and exa_result.success and remaining_time > 15:
    dynamic_results = await self._run_dynamic_scrapers(
        request, exa_result.hits, remaining_time,
    )
    # Merge scraper results; only successful ones contribute profiles/snippets.
    for name, result in dynamic_results.items():
        agent_results[name] = result
        if result.status == AgentStatus.SUCCESS:
            all_profiles.extend(result.profiles)
            all_snippets.extend(result.snippets)
ResearchOrchestrator Class
backend/agents/orchestrator.py
class ResearchOrchestrator:
    """Two-phase orchestrator: static agents + dynamic URL scrapers.

    Creates a shared InboxPool so all agents get pre-warmed email
    addresses for login-wall bypass without on-demand API latency.
    """

    def __init__(self, settings: Settings, *, pool_size: int = 10):
        # pool_size: number of pre-warmed AgentMail inboxes to keep available.
        self._settings = settings
        self._exa = ExaEnrichmentClient(settings)
        # Create shared inbox pool if AgentMail is configured
        self._inbox_pool: InboxPool | None = None
        if settings.agentmail_api_key:
            mail_client = AgentMailClient(api_key=settings.agentmail_api_key)
            self._inbox_pool = InboxPool(mail_client, pool_size=pool_size)
            logger.info("orchestrator: inbox pool created, pool_size={}", pool_size)
        # Static agents always run in Phase 1; each shares the inbox pool
        # (which may be None when AgentMail is not configured).
        self._static_agents: list[BaseBrowserAgent] = [
            LinkedInAgent(settings, inbox_pool=self._inbox_pool),
            TwitterAgent(settings, inbox_pool=self._inbox_pool),
            InstagramAgent(settings, inbox_pool=self._inbox_pool),
            GoogleAgent(settings, inbox_pool=self._inbox_pool),
            OsintAgent(settings, inbox_pool=self._inbox_pool),
            SocialAgent(settings, inbox_pool=self._inbox_pool),
        ]
        # Darkweb agent is opt-in: only added when its credentials are configured.
        darkweb = DarkwebAgent(settings, inbox_pool=self._inbox_pool)
        if darkweb.configured:
            self._static_agents.append(darkweb)
Research Execution
Main Entry Point
backend/agents/orchestrator.py
@traced("orchestrator.research_person")
async def research_person(self, request: ResearchRequest) -> OrchestratorResult:
    """Run Exa + static agents in parallel, then spawn dynamic scrapers for discovered URLs."""
    start = time.monotonic()
    # Overall budget for both phases; defaults to 90s when unset on the request.
    timeout = request.timeout_seconds or 90.0
    logger.info(
        "orchestrator starting research for person={} agents={} timeout={}s",
        request.person_name,
        self.agent_names,
        timeout,
    )
Result Collection
backend/agents/orchestrator.py
# Collect results from static agents
agent_results: dict[str, AgentResult] = {}
all_profiles: list[SocialProfile] = []
all_snippets: list[str] = []

# Handle Exa result
# Exa is only usable when its task finished (in `done`) without raising.
exa_result = None
if exa_task in done and not exa_task.exception():
    exa_result = exa_task.result()
    if exa_result.success:
        # NOTE(review): nesting below is reconstructed from a flattened excerpt —
        # the SUCCESS AgentResult appears to be recorded only on exa_result.success;
        # confirm against the full source.
        for hit in exa_result.hits:
            # Truncate snippets to 200 chars to keep the context compact.
            all_snippets.append(f"[Exa] {hit.title}: {hit.snippet[:200]}" if hit.snippet else f"[Exa] {hit.title}")
        agent_results["exa"] = AgentResult(
            agent_name="exa",
            status=AgentStatus.SUCCESS,
            snippets=[h.snippet or h.title for h in exa_result.hits],
            urls_found=[h.url for h in exa_result.hits],
            duration_seconds=time.monotonic() - start,
            completed_at=datetime.now(UTC),
        )
Error Handling
backend/agents/orchestrator.py
# Classify each static agent's outcome: timed out, raised, or completed.
for agent_name, task in tasks.items():
    if agent_name == "exa":
        # Exa was already handled above.
        continue
    if task in pending:
        # Task was cancelled by the overall-timeout straggler sweep.
        elapsed_agent = time.monotonic() - start
        logger.warning(
            "orchestrator agent={} cancelled (overall timeout) elapsed={:.1f}s",
            agent_name, elapsed_agent,
        )
        result = AgentResult(
            agent_name=agent_name,
            status=AgentStatus.TIMEOUT,
            error="Cancelled by orchestrator timeout",
            duration_seconds=elapsed_agent,
            completed_at=datetime.now(UTC),
        )
    elif task.exception():
        # NOTE(review): Task.exception() *raises* CancelledError if a done task
        # was cancelled rather than returning it — confirm no done task here can
        # be in the cancelled state.
        exc = task.exception()
        logger.error(
            "orchestrator agent={} raised: {}",
            agent_name, str(exc),
        )
        result = AgentResult(
            agent_name=agent_name,
            status=AgentStatus.FAILED,
            error=f"Unhandled exception: {exc}",
            completed_at=datetime.now(UTC),
        )
    else:
        # Normal completion: take the agent's own AgentResult as-is.
        result = task.result()
    agent_results[agent_name] = result
Dynamic URL Scrapers
Scraper Selection
Only scrape URLs not already covered by static agents (backend/agents/orchestrator.py):
# Domains already handled by a dedicated static agent in Phase 1 —
# dynamic scrapers skip these to avoid duplicate work.
COVERED_DOMAINS = frozenset({
    "linkedin.com", "www.linkedin.com",
    "twitter.com", "x.com", "www.x.com",
    "instagram.com", "www.instagram.com",
    "google.com", "www.google.com",
    "youtube.com", "www.youtube.com",
    "facebook.com", "www.facebook.com",
})

# Cap on concurrently spawned dynamic scrapers (limits browser pool usage).
MAX_DYNAMIC_SCRAPERS = 3
Scraper Spawning
backend/agents/orchestrator.py
async def _run_dynamic_scrapers(
    self,
    request: ResearchRequest,
    hits: list,
    timeout: float,
) -> dict[str, AgentResult]:
    """Spawn URL scrapers for high-value URLs not covered by static agents."""
    # Filter Exa hits down to URLs worth a dedicated scraper.
    urls_to_scrape: list[str] = []
    for hit in hits:
        if not hit.url:
            continue
        domain = urlparse(hit.url).netloc.lower()
        if domain in COVERED_DOMAINS:
            # A static agent already covers this platform.
            continue
        # Skip generic pages (Wikipedia, news sites have low per-person value)
        if any(skip in domain for skip in ("wikipedia.org", "wikidata.org")):
            continue
        urls_to_scrape.append(hit.url)
    if not urls_to_scrape:
        return {}
    # Keep only the first MAX_DYNAMIC_SCRAPERS candidates (Exa order).
    urls_to_scrape = urls_to_scrape[:MAX_DYNAMIC_SCRAPERS]
    logger.info(
        "orchestrator spawning {} dynamic scrapers: {}",
        len(urls_to_scrape),
        urls_to_scrape,
    )
    # One UrlScraperAgent task per URL, named for result attribution.
    scraper_tasks: dict[str, asyncio.Task] = {}
    for url in urls_to_scrape:
        scraper = UrlScraperAgent(
            self._settings, url=url, source="exa", inbox_pool=self._inbox_pool,
        )
        task = asyncio.create_task(
            scraper.run(request),
            name=scraper.agent_name,
        )
        scraper_tasks[scraper.agent_name] = task
    # NOTE(review): excerpt ends here — the await/collection of scraper_tasks
    # (and use of `timeout`) is omitted from this snippet; see full source.
Agent Inbox Pool
The orchestrator maintains a pool of temporary email addresses for login-wall bypass (backend/agents/orchestrator.py):
# Create shared inbox pool if AgentMail is configured
# (excerpt from __init__: absent API key leaves the pool as None and
# agents fall back to running without pre-warmed inboxes)
self._inbox_pool: InboxPool | None = None
if settings.agentmail_api_key:
    mail_client = AgentMailClient(api_key=settings.agentmail_api_key)
    self._inbox_pool = InboxPool(mail_client, pool_size=pool_size)
    logger.info("orchestrator: inbox pool created, pool_size={}", pool_size)
Pool Cleanup
backend/agents/orchestrator.py
# Clean up inbox pool (delete temporary inboxes to free concurrent slots)
# NOTE(review): release_all() is invoked synchronously — confirm it is not a
# coroutine that would need to be awaited here.
if self._inbox_pool:
    self._inbox_pool.release_all()
API Integration
Start Research Endpoint
backend/main.py
@app.post("/api/agents/research", response_model=AgentStartResponse)
async def start_research(req: AgentStartRequest) -> AgentStartResponse:
    """Spawn Browser Use sessions/tasks per source type. Returns immediately.

    Unknown source keys and Browser Use API failures are logged and skipped,
    so the response contains only the agents that launched successfully.
    """
    agents: list[AgentInfo] = []
    for source_key in req.sources:
        config = SOURCE_CONFIGS.get(source_key)
        if config is None:
            # Not a configured source — skip rather than fail the whole request.
            logger.warning("Unknown source type: {}", source_key)
            continue
        try:
            # One Browser Use session per source, opened at the source's page.
            bu_session = await bu_client.create_session(start_url=config["start_url"])
            sid = bu_session["id"]
            # Substitute the person's name into the prompt template.
            filled_prompt = config["prompt"].replace("{name}", req.person_name)
            bu_task = await bu_client.create_task(
                session_id=sid,
                task=filled_prompt,
                start_url=config["start_url"],
            )
            agents.append(
                AgentInfo(
                    source_tp=config["tp"],
                    source_nm=config["nm"],
                    session_id=sid,
                    task_id=bu_task["id"],
                    live_url=bu_session.get("liveUrl"),
                    session_status="running",
                )
            )
        except BrowserUseError as e:
            # Best-effort launch: a failing source does not abort the others.
            logger.error("Failed to create agent for {}: {}", source_key, e)
            continue
    return AgentStartResponse(person_id=req.person_id, agents=agents)
Session Status Endpoint
backend/main.py
@app.get("/api/agents/sessions/{session_id}", response_model=SessionStatusResponse)
async def get_session_status(session_id: str) -> SessionStatusResponse:
    """Proxy Browser Use session + task status for frontend polling."""
    try:
        session = await bu_client.get_session(session_id)
    except BrowserUseError as e:
        logger.error("Failed to get session {}: {}", session_id, e)
        # Degrade gracefully: report "failed" to the poller instead of a 5xx.
        return SessionStatusResponse(session_id=session_id, session_status="failed")
    session_status = _map_bu_status(session.get("status"))
    live_url = session.get("liveUrl")
    # Prefer Browser Use's own share URL; fall back to our local cache.
    share_url = session.get("publicShareUrl") or _share_url_cache.get(session_id)
    # On first completed fetch, create public share for replay
    if session_status == "completed" and not share_url and session_id not in _share_url_cache:
        try:
            share_data = await bu_client.make_session_public(session_id)
            share_url = share_data.get("shareUrl")
            if share_url:
                _share_url_cache[session_id] = share_url
        except BrowserUseError:
            # Sharing is best-effort; the status response still succeeds.
            logger.warning("Could not create public share for session {}", session_id)
    # NOTE(review): `task_info` is not defined anywhere in this excerpt — the
    # task-status fetch appears to have been omitted; verify against full source.
    return SessionStatusResponse(
        session_id=session_id,
        session_status=session_status,
        live_url=live_url,
        share_url=share_url,
        task=task_info,
    )
Source Configuration
Agent prompts are configured per source (backend/main.py):
# Per-source agent configuration:
#   "tp"        — source-type label used in AgentInfo.source_tp
#   "nm"        — human-readable source name used in AgentInfo.source_nm
#   "prompt"    — task template; "{name}" is replaced with the person's name
#   "start_url" — first page the Browser Use session opens
SOURCE_CONFIGS: dict[str, dict[str, str]] = {
    "linkedin": {
        "tp": "SOCIAL",
        "nm": "LinkedIn Profile",
        "prompt": (
            "Search LinkedIn for '{name}'. Navigate to their profile. "
            "Extract: current role, company, work history (last 3 positions), "
            "education, notable connections, and recent posts."
        ),
        "start_url": "https://linkedin.com",
    },
    "twitter": {
        "tp": "SOCIAL",
        "nm": "Twitter/X Activity",
        "prompt": (
            "Search Twitter/X for '{name}'. Find their profile. "
            "Extract: bio, follower count, recent tweets (last 10), "
            "and accounts they interact with most."
        ),
        "start_url": "https://twitter.com",
    },
    # ... additional sources
}
Observability
The orchestrator is instrumented with Laminar tracing (backend/agents/orchestrator.py):
@traced("orchestrator.research_person")  # Laminar span wraps the full research run
async def research_person(self, request: ResearchRequest) -> OrchestratorResult:
    # ... implementation
Performance Tuning
Timeout
Default: 90s (configurable per request)
Max Scrapers
3 dynamic scrapers max (prevents browser pool exhaustion)
Inbox Pool
10 pre-warmed emails (reduces login latency)
Concurrent Agents
6+ static agents run in parallel
Next Steps
Synthesis Engine
See how research results become intelligence reports
Browser Use Client
Learn about the Browser Use integration