
Overview

The agent swarm orchestrator coordinates parallel research across multiple platforms and data sources. Once a person is identified, the system launches 6+ specialized Browser Use agents simultaneously to gather comprehensive intelligence.

Location: backend/agents/orchestrator.py

Two-Phase Architecture

Identified Person (name from face search)

  ├─▶ Phase 1: Static Agents + API Enrichment (parallel)
  │   ├─▶ Exa API (200ms) → LinkedIn URL, company, web mentions
  │   ├─▶ LinkedIn Agent (2-5s) → Profile, work history, skills
  │   ├─▶ Twitter Agent (2-5s) → Bio, tweets, followers
  │   ├─▶ Instagram Agent (20-40s) → Posts, followers
  │   ├─▶ Google Agent (3-8s) → News, articles, mentions
  │   ├─▶ OSINT Agent (10-20s) → Username enumeration across 400+ sites
  │   ├─▶ Social Agent (5-10s) → Additional social profiles
  │   └─▶ Darkweb Agent (30-60s, optional) → Breach/leak data

  └─▶ Phase 2: Dynamic URL Scrapers (parallel)
      └─▶ Spawned for high-value URLs from Exa/Google
          └─▶ Skips domains already covered by static agents
          └─▶ Limited to 3 concurrent scrapers

  All results stream to Convex as they complete
  Frontend updates in real-time (live corkboard effect)

Implementation

Research Orchestrator

from backend/agents/orchestrator.py

class ResearchOrchestrator:
    """Two-phase orchestrator: static agents + dynamic URL scrapers."""
    
    def __init__(self, settings: Settings, pool_size: int = 10):
        self._settings = settings
        self._exa = ExaEnrichmentClient(settings)
        
        # Create shared inbox pool for login-wall bypass
        self._inbox_pool = None
        if settings.agentmail_api_key:
            mail_client = AgentMailClient(api_key=settings.agentmail_api_key)
            self._inbox_pool = InboxPool(mail_client, pool_size=pool_size)
        
        # Static agents (always run)
        self._static_agents = [
            LinkedInAgent(settings, inbox_pool=self._inbox_pool),
            TwitterAgent(settings, inbox_pool=self._inbox_pool),
            InstagramAgent(settings, inbox_pool=self._inbox_pool),
            GoogleAgent(settings, inbox_pool=self._inbox_pool),
            OsintAgent(settings, inbox_pool=self._inbox_pool),
            SocialAgent(settings, inbox_pool=self._inbox_pool),
        ]
        
        # Optional agents
        darkweb = DarkwebAgent(settings, inbox_pool=self._inbox_pool)
        if darkweb.configured:
            self._static_agents.append(darkweb)

Phase 1: Static Agents + Exa

Step 1: Launch All Agents in Parallel

Create async tasks for Exa + all static agents
@traced("orchestrator.research_person")
async def research_person(self, request: ResearchRequest) -> OrchestratorResult:
    start = time.monotonic()
    timeout = request.timeout_seconds or 90.0
    
    # Phase 1: Launch Exa + all static agents
    exa_req = EnrichmentRequest(name=request.person_name, company=request.company)
    exa_task = asyncio.create_task(self._exa.enrich_person(exa_req), name="exa")
    
    tasks = {"exa": exa_task}
    for agent in self._static_agents:
        task = asyncio.create_task(agent.run(request), name=agent.agent_name)
        tasks[agent.agent_name] = task

Step 2: Wait with Timeout

Wait for all tasks with a 90-second timeout
done, pending = await asyncio.wait(
    tasks.values(),
    timeout=timeout,
)

# Cancel stragglers
for task in pending:
    task.cancel()

Step 3: Collect Results

Gather results from completed agents
agent_results = {}
all_profiles = []
all_snippets = []

for agent_name, task in tasks.items():
    if task in pending:
        # Timeout
        result = AgentResult(
            agent_name=agent_name,
            status=AgentStatus.TIMEOUT,
            error="Cancelled by orchestrator timeout",
        )
    elif task.exception():
        # Exception
        result = AgentResult(
            agent_name=agent_name,
            status=AgentStatus.FAILED,
            error=str(task.exception()),
        )
    else:
        # Success
        result = task.result()
    
    agent_results[agent_name] = result
    if result.status == AgentStatus.SUCCESS:
        all_profiles.extend(result.profiles)
        all_snippets.extend(result.snippets)

Phase 2: Dynamic URL Scrapers

After static agents complete, spawn specialized scrapers for high-value URLs discovered by Exa/Google.
async def _run_dynamic_scrapers(
    self,
    request: ResearchRequest,
    hits: list,
    timeout: float,
) -> dict[str, AgentResult]:
    """Spawn URL scrapers for high-value URLs not covered by static agents."""
    
    # Filter URLs to scrape
    urls_to_scrape = []
    for hit in hits:
        if not hit.url:
            continue
        
        domain = urlparse(hit.url).netloc.lower()
        
        # Skip domains already covered by static agents
        if domain in COVERED_DOMAINS:
            continue
        
        # Skip low-value generic pages
        if any(skip in domain for skip in ("wikipedia.org", "wikidata.org")):
            continue
        
        urls_to_scrape.append(hit.url)
    
    if not urls_to_scrape:
        return {}
    
    # Limit to MAX_DYNAMIC_SCRAPERS (avoid burning too many cloud browsers)
    urls_to_scrape = urls_to_scrape[:MAX_DYNAMIC_SCRAPERS]
    
    # Spawn scrapers
    scraper_tasks = {}
    for url in urls_to_scrape:
        scraper = UrlScraperAgent(
            self._settings, url=url, source="exa", inbox_pool=self._inbox_pool
        )
        task = asyncio.create_task(scraper.run(request), name=scraper.agent_name)
        scraper_tasks[scraper.agent_name] = task
    
    # Wait with remaining timeout
    done, pending = await asyncio.wait(
        scraper_tasks.values(),
        timeout=timeout,
    )
    
    # Collect results (same pattern as Phase 1)
    ...
Domains Skipped in Phase 2:
COVERED_DOMAINS = frozenset({
    "linkedin.com", "www.linkedin.com",
    "twitter.com", "x.com", "www.x.com",
    "instagram.com", "www.instagram.com",
    "google.com", "www.google.com",
    "youtube.com", "www.youtube.com",
    "facebook.com", "www.facebook.com",
})

Static Agents

LinkedIn Agent

Location: backend/agents/linkedin_agent.py
Strategy: Voyager API interception (fastest) → Browser scraping fallback
Speed: 2-5 seconds per profile
class LinkedInAgent(BaseBrowserAgent):
    agent_name = "linkedin"
    
    async def run(self, request: ResearchRequest) -> AgentResult:
        if exa_context.get("linkedin_url"):
            # Direct URL provided by Exa
            task = f"Go to {linkedin_url} and extract profile data"
        else:
            # Search by name + company
            task = f"""
            1. Go to https://linkedin.com/search/results/people/?keywords={name}
            2. Find profile matching "{name}" at "{company}"
            3. Extract: headline, about, experience (last 3 roles), education, skills
            """
        
        agent = Agent(task=task, llm="gemini-2.0-flash", browser=self._browser)
        result = await agent.run()
        
        return AgentResult(
            agent_name=self.agent_name,
            status=AgentStatus.SUCCESS,
            profiles=[self._parse_linkedin_profile(result)],
            snippets=[f"LinkedIn: {result.summary}"]
        )
Extracted Data:
  • Full name, headline, location
  • Current company and title
  • About/summary section
  • Experience (last 3 roles with companies and dates)
  • Education
  • Skills (top 5)
  • Connection/follower count
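The fields above can be sketched as a simple container. This is an illustrative shape only; the actual model class and field names in the codebase are assumptions:

```python
from dataclasses import dataclass, field

@dataclass
class LinkedInProfileData:
    """Illustrative container for the LinkedIn fields listed above.
    The real project model is not shown in these docs."""
    full_name: str
    headline: str = ""
    location: str = ""
    company: str = ""
    title: str = ""
    about: str = ""
    experience: list[dict] = field(default_factory=list)  # last 3 roles
    education: list[str] = field(default_factory=list)
    skills: list[str] = field(default_factory=list)       # top 5
    connections: int = 0
```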

Twitter/X Agent

Location: backend/agents/twitter_agent.py
Strategy: twscrape GraphQL (fastest) → Browser scraping fallback
Speed: 2-5 seconds per profile
class TwitterAgent(BaseBrowserAgent):
    agent_name = "twitter"
    
    async def run(self, request: ResearchRequest) -> AgentResult:
        try:
            # Try twscrape first (reverse-engineered GraphQL)
            return await self._twscrape_search(request.person_name)
        except Exception:
            # Fallback to Browser Use
            return await self._browser_search(request.person_name)
    
    async def _twscrape_search(self, name: str) -> AgentResult:
        from twscrape import API
        api = API()
        
        # Search for user
        users = await api.search_users(name, limit=5)
        if not users:
            return AgentResult(agent_name=self.agent_name, status=AgentStatus.NO_RESULTS)
        
        # Pick best match (verified > follower count > name similarity)
        best = users[0]
        
        # Get recent tweets
        tweets = await api.user_tweets(best.id, limit=5)
        
        return AgentResult(
            agent_name=self.agent_name,
            status=AgentStatus.SUCCESS,
            profiles=[SocialProfile(
                platform="twitter",
                url=f"https://x.com/{best.username}",
                username=best.username,
                bio=best.rawDescription,
                followers=best.followersCount,
            )],
            snippets=[t.rawContent for t in tweets[:3]]
        )
Extracted Data:
  • Username, display name, bio
  • Follower/following counts
  • Verification status
  • Location, website
  • Recent tweets (last 5)
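The `best = users[0]` shortcut in the code above glosses over the ranking heuristic described in its comment (verified > follower count > name similarity). A minimal sketch of that heuristic, assuming twscrape user objects expose `verified`, `followersCount`, and `displayname` attributes:

```python
from difflib import SequenceMatcher

def pick_best_match(users, name: str):
    """Rank candidate accounts: verified first, then follower count,
    then name similarity. A sketch of the heuristic, not the project's
    actual implementation; attribute names are assumptions."""
    def score(user):
        similarity = SequenceMatcher(
            None, user.displayname.lower(), name.lower()
        ).ratio()
        return (bool(user.verified), user.followersCount, similarity)
    return max(users, key=score)
```

Python compares the score tuples element by element, so verification always dominates follower count, which in turn dominates name similarity.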

Instagram Agent

Location: backend/agents/instagram_agent.py
Strategy: Browser scraping only (no API available)
Speed: 20-40 seconds per profile
class InstagramAgent(BaseBrowserAgent):
    agent_name = "instagram"
    
    async def run(self, request: ResearchRequest) -> AgentResult:
        # Instagram has no public API — must use browser
        agent = Agent(
            task=f"""
            1. Go to https://instagram.com
            2. Search for "{request.person_name}"
            3. Click on the most relevant profile
            4. Extract: username, bio, follower count, post count
            5. Get recent post captions (last 3)
            """,
            llm="gemini-2.0-flash",
            browser=self._browser,
        )
        result = await agent.run()
        return self._parse_result(result)
Why so slow?
  • No API access
  • Must navigate full web UI
  • Often requires bypassing login walls
  • Rate-limited by Instagram’s anti-bot measures

Google Agent

Location: backend/agents/google_agent.py
Strategy: Browser-based web search + snippet extraction
Speed: 3-8 seconds
class GoogleAgent(BaseBrowserAgent):
    agent_name = "google"
    
    async def run(self, request: ResearchRequest) -> AgentResult:
        agent = Agent(
            task=f"""
            1. Search Google for "{request.person_name}"
            2. Extract top 10 result URLs, titles, snippets
            3. Focus on: news articles, company pages, press releases
            4. Return as structured JSON
            """,
            llm="gemini-2.0-flash",
            browser=self._browser,
        )
        result = await agent.run()
        return AgentResult(
            agent_name=self.agent_name,
            status=AgentStatus.SUCCESS,
            urls_found=[r["url"] for r in result.data],
            snippets=[r["snippet"] for r in result.data],
        )

OSINT Agent

Location: backend/agents/osint_agent.py
Strategy: Sherlock username enumeration across 400+ sites
Speed: 10-20 seconds
class OsintAgent(BaseBrowserAgent):
    agent_name = "osint"
    
    async def run(self, request: ResearchRequest) -> AgentResult:
        # Extract likely username from name
        username = self._name_to_username(request.person_name)
        
        # Run Sherlock
        from sherlock import sherlock
        results = await sherlock.check_username(username)
        
        profiles = []
        for site, data in results.items():
            if data["status"] == "Claimed":
                profiles.append(SocialProfile(
                    platform=site,
                    url=data["url"],
                    username=username,
                ))
        
        return AgentResult(
            agent_name=self.agent_name,
            status=AgentStatus.SUCCESS,
            profiles=profiles[:20],  # Top 20 matches
            snippets=[f"Found on: {p.platform}" for p in profiles[:5]]
        )
Username Enumeration:
  • Converts “Jane Smith” → “janesmith”, “jane.smith”, “jane_smith”
  • Checks 400+ sites in parallel
  • Returns only confirmed accounts
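The conversion step can be sketched from the "Jane Smith" examples above (a guess at `_name_to_username`'s behavior, not the actual helper):

```python
def name_to_usernames(full_name: str) -> list[str]:
    """Generate candidate usernames from a person's name, mirroring
    the examples above. A sketch of the assumed helper."""
    parts = full_name.lower().split()
    if len(parts) < 2:
        return parts  # single-word names yield one candidate
    first, last = parts[0], parts[-1]
    return [f"{first}{last}", f"{first}.{last}", f"{first}_{last}"]
```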

Exa API Integration

Location: backend/enrichment/exa_client.py

Why Exa?
  • 200ms average response time
  • Structured person/company search
  • Returns LinkedIn URLs directly
  • Provides context for other agents
class ExaEnrichmentClient:
    async def enrich_person(self, request: EnrichmentRequest) -> EnrichmentResult:
        query = f"{request.name}"
        if request.company:
            query += f" {request.company}"
        query += " LinkedIn profile"
        
        results = self._exa.search_and_contents(
            query=query,
            type="neural",
            num_results=5,
            text=True,
            highlights=True,
        )
        
        # Extract LinkedIn URL
        linkedin_url = None
        for r in results.results:
            if "linkedin.com/in/" in r.url:
                linkedin_url = r.url
                break
        
        return EnrichmentResult(
            linkedin_url=linkedin_url,
            hits=[Hit(url=r.url, title=r.title, snippet=r.text[:200]) 
                  for r in results.results],
            success=True,
        )
Exa provides:
  • Direct LinkedIn profile URLs (skips search step for LinkedIn agent)
  • Company websites
  • News articles mentioning the person
  • Social media profile URLs
  • Context for Phase 2 dynamic scrapers
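The LinkedIn agent above reads `exa_context.get("linkedin_url")`, but the hand-off from Exa to the static agents is not shown. One plausible shape for that context dict (field names are assumptions, not the project's actual mechanism):

```python
def build_exa_context(enrichment) -> dict:
    """Fold an EnrichmentResult into a context dict that static agents
    can read. A sketch; the real hand-off is not shown in these docs."""
    return {
        "linkedin_url": enrichment.linkedin_url,
        "hits": [{"url": h.url, "title": h.title} for h in enrichment.hits],
    }
```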

Inbox Pool for Login Walls

Location: backend/agents/agentmail_client.py

Many sites require email verification to view content. The inbox pool pre-creates temporary email addresses using AgentMail.
class InboxPool:
    """Pre-warmed pool of temporary email inboxes for login-wall bypass."""
    
    def __init__(self, client: AgentMailClient, pool_size: int = 10):
        self._client = client
        self._pool_size = pool_size
        self._inboxes: list[dict] = []
        self._lock = asyncio.Lock()
    
    async def get_inbox(self) -> dict:
        """Get or create a temporary inbox."""
        async with self._lock:
            if not self._inboxes:
                # Create new inbox
                inbox = await self._client.create_inbox()
                self._inboxes.append(inbox)
            return self._inboxes.pop(0)
    
    def release(self, inbox: dict):
        """Return inbox to pool for reuse."""
        self._inboxes.append(inbox)
    
    def release_all(self):
        """Delete all temporary inboxes (cleanup after research complete)."""
        for inbox in self._inboxes:
            asyncio.create_task(self._client.delete_inbox(inbox["id"]))
        self._inboxes.clear()
Usage in agents:
if self._inbox_pool:
    inbox = await self._inbox_pool.get_inbox()
    # Use inbox["email"] for site registration
    # Check inbox["messages"] for verification links
    self._inbox_pool.release(inbox)

Result Streaming

Results stream to Convex as agents complete, enabling real-time frontend updates.
from backend/pipeline.py

async def stream_research(
    self,
    person_name: str,
    person_id: str | None = None,
) -> AsyncGenerator[AgentResult, None]:
    """Stream research results as an async generator for SSE endpoints."""
    request = ResearchRequest(person_name=person_name)
    
    async for result in self._deep_researcher.research(request):
        # Push to Convex as intel fragment
        if person_id:
            content = " | ".join(result.snippets[:3])
            await self._db.store_intel_fragment(
                person_id=person_id,
                source=result.agent_name,
                content=content[:1000],
                urls=result.urls_found[:10],
                confidence=result.confidence,
            )
        yield result
Convex Integration:
// frontend/convex/intel.ts
export const create = mutation({
  args: {
    personId: v.id("persons"),
    source: v.string(),
    content: v.string(),
  },
  handler: async (ctx, { personId, source, content }) => {
    await ctx.db.insert("intelFragments", {
      personId,
      source,
      dataType: "profile",
      content,
      verified: false,
      timestamp: Date.now(),
    });
    
    // Update activity log for live feed
    await ctx.db.insert("activityLog", {
      type: "research",
      message: `${source} agent completed research`,
      personId,
      agentName: source,
      timestamp: Date.now(),
    });
  },
});

Performance Characteristics

Phase 1 Timeline (90-second timeout):
Agent             Typical Duration   Success Rate
Exa API           200ms              95%
LinkedIn Agent    2-5s               80%
Twitter Agent     2-5s               70%
Instagram Agent   20-40s             60%
Google Agent      3-8s               95%
OSINT Agent       10-20s             85%
Social Agent      5-10s              75%
Darkweb Agent     30-60s             40%
Phase 2 Timeline (remaining time):
  • Dynamic scrapers: 10-30s each
  • Limited to 3 concurrent scrapers
  • Only spawned if Phase 1 leaves >15s remaining
Total Research Time: 45-90 seconds per person
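The ">15s remaining" gate for Phase 2 can be sketched as a budget check against the monotonic clock (the threshold constant name is an assumption):

```python
import time

PHASE2_MIN_REMAINING = 15.0  # assumed name for the >15s gate described above

def remaining_budget(start: float, total_timeout: float) -> float:
    """Seconds left in the overall research budget."""
    return max(0.0, total_timeout - (time.monotonic() - start))

def should_run_phase2(start: float, total_timeout: float = 90.0) -> bool:
    """Spawn dynamic scrapers only if Phase 1 left enough time."""
    return remaining_budget(start, total_timeout) > PHASE2_MIN_REMAINING
```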

Error Handling

Graceful Degradation

Agent failures are expected and handled gracefully:
  • Timeouts → Status: TIMEOUT, no error thrown
  • Exceptions → Status: FAILED, logged but pipeline continues
  • No results → Status: NO_RESULTS, treated as success
  • Partial results → Always acceptable
# All agents run through gather with return_exceptions=True, so one
# failure never aborts the rest of the swarm
results = await asyncio.gather(*tasks.values(), return_exceptions=True)

for agent_name, result in zip(tasks.keys(), results):
    if isinstance(result, Exception):
        # Log and continue
        logger.error("Agent {} failed: {}", agent_name, result)
    else:
        # Process result normally
        agent_results[agent_name] = result

Observability

Laminar Tracing:
orchestrator.research_person
├── exa.enrich_person
├── linkedin_agent.run
│   ├── browser_use.navigate
│   └── gemini.parse_profile
├── twitter_agent.run
│   └── twscrape.search_users
├── instagram_agent.run
│   ├── browser_use.navigate
│   └── browser_use.extract_data
├── google_agent.run
├── osint_agent.run
│   └── sherlock.check_username
└── dynamic_scrapers
    ├── url_scraper_1.run
    └── url_scraper_2.run
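A minimal stand-in for the `@traced` decorator used in the orchestrator code can clarify what a span wrapper does. Illustrative only: the project's decorator is assumed to emit Laminar spans, while this sketch merely logs durations:

```python
import functools
import logging
import time

def traced(span_name: str):
    """Time an async function and log the span name on completion.
    A sketch of the decorator pattern, not the project's Laminar
    integration."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            start = time.monotonic()
            try:
                return await fn(*args, **kwargs)
            finally:
                logging.getLogger("tracing").info(
                    "%s took %.2fs", span_name, time.monotonic() - start
                )
        return wrapper
    return decorator
```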

Next Steps

Real-Time Streaming

How agent results stream to the frontend in real-time

Architecture

Full system architecture and component integration
