Skip to main content

Overview

The DeepResearcher is JARVIS’s most advanced intelligence gathering system. It runs a four-phase pipeline that discovers comprehensive information about individuals using parallel Browser Use Cloud Skills, deep URL extraction, and autonomous account creation.

Architecture

backend/agents/deep_researcher.py
class DeepResearcher:
    """Multi-phase deep research pipeline that streams results.

    Phase 0: Exa + SixtyFour enrich-lead in parallel (~3s)
    Phase 1: Platform + OSINT skills in parallel (~20-35s, capped at
        MAX_CONCURRENT_SESSIONS concurrent cloud sessions)
    Phase 2: Deep URL extraction + SixtyFour deep-search + dark web (~30-60s)
    Phase 3: Verification loop — retry failed skills with account creation (~30-90s)

    Results stream as an async generator so the UI can update live.
    """

    def __init__(self, settings: Settings):
        # One client per external service; all share the same settings object.
        self._settings = settings
        self._exa = ExaEnrichmentClient(settings)    # fast web search (Phase 0)
        self._sixtyfour = SixtyFourClient(settings)  # lead enrichment / deep search
        self._cloud = CloudSkillRunner(settings)     # Browser Use Cloud skills
        self._accounts = AccountManager(settings)    # account creation (Phase 3)
        # Global cap on simultaneous cloud browser sessions across all phases.
        self._semaphore = asyncio.Semaphore(MAX_CONCURRENT_SESSIONS)

Phase Breakdown

Phase 0: Fast Initial Discovery (~1-3s)

Runs Exa search immediately and yields results in ~1 second, while starting SixtyFour enrichment in the background.

Key Feature: Frontend gets data almost instantly while deeper research continues.
async def _exa_pass(
    self,
    person: str,
    company: str | None,
    seen_urls: set[str],
) -> tuple[list[str], list[str]]:
    """Run Exa queries only (fast, ~1s). Returns (urls, snippets).

    Fires 2-3 Exa enrichment queries concurrently, then filters hits:
    duplicates (tracked via ``seen_urls``, which is mutated in place) and
    hits whose title/snippet mention no part of the person's name.

    Args:
        person: Full name of the research target.
        company: Optional employer; adds a company-scoped query when set.
        seen_urls: Cross-phase dedup set; newly accepted URLs are added.

    Returns:
        ``(urls, snippets)`` lists in query/hit order; snippets are
        "[Exa] title: snippet" strings truncated to 200 chars of snippet.
    """
    exa_queries = [
        EnrichmentRequest(name=person, company=company),
        EnrichmentRequest(name=person, additional_context="social media profiles"),
    ]
    if company:
        exa_queries.append(
            EnrichmentRequest(name=person, additional_context=f"{company} employee")
        )

    # Failed queries come back as exceptions and are skipped below.
    results = await asyncio.gather(
        *(self._exa.enrich_person(q) for q in exa_queries),
        return_exceptions=True,
    )

    # Hoisted out of the loops: the split name never changes per hit
    # (previously recomputed for every single hit).
    name_parts = person.lower().split()

    exa_urls: list[str] = []
    exa_snippets: list[str] = []
    for result in results:
        if isinstance(result, Exception) or not result.success:
            continue
        for hit in result.hits:
            if not hit.url or hit.url in seen_urls:
                continue
            # Relevance filtering: at least one name part must appear in
            # the hit's title or snippet, otherwise it's likely a namesake.
            title_lower = (hit.title or "").lower()
            snippet_lower = (hit.snippet or "").lower()
            if not any(part in title_lower or part in snippet_lower for part in name_parts):
                continue
            seen_urls.add(hit.url)
            exa_urls.append(hit.url)
            snippet = f"[Exa] {hit.title}: {hit.snippet[:200]}" if hit.snippet else f"[Exa] {hit.title}"
            exa_snippets.append(snippet)

    return exa_urls, exa_snippets
5
Phase 1: Platform + OSINT Skills (~20-35s)
6
Launches 15+ Browser Use Cloud Skills in parallel with semaphore limiting (max 25 concurrent).
7
Social Media Skills:
8
  • TikTok Profile
  • GitHub Profile
  • Instagram Posts
  • LinkedIn Company Posts
  • Facebook Page
  • YouTube Filmography
  • Reddit Profile
  • Pinterest Pins
  • Linktree Profile
    OSINT Skills:
  • OSINT Scraper
  • SEC Filings (if company provided)
  • Company Employees
  • YC Company Check
  • Ancestry Records
  • 11
    async def _phase1(
        self,
        person: str,
        company: str | None,
        exa_urls: list[str],
        sixtyfour_result: object | None,
        seen_urls: set[str],
        failed_skills: list[tuple[str, str]],
    ) -> AsyncGenerator[AgentResult, None]:
        """Run platform + OSINT skills in parallel.

        Launches one Browser Use Cloud skill per core platform, plus
        company OSINT skills when ``company`` is set, plus extra skills
        matched from the domains of ``exa_urls``. All tasks run under the
        shared semaphore; verified successes are yielded as AgentResults
        and errors are appended to ``failed_skills`` for the Phase 3 retry.

        NOTE(review): ``sixtyfour_result`` and ``seen_urls`` are accepted
        but not referenced in this body — presumably kept for interface
        symmetry with the other phases; confirm before removing.
        """
        skill_tasks: list[tuple[str, str, asyncio.Task]] = []
        
        # Core platform skills
        core_skills = [
            ("tiktok_profile", f"Get TikTok profile info for {person}"),
            ("github_profile", f"Get GitHub profile and projects for {person}"),
            ("instagram_posts", f"Get Instagram profile and posts for {person}"),
            (
                "linkedin_company_posts",
                f"Find LinkedIn profile and posts for {person}"
                + (f" at {company}" if company else ""),
            ),
            ("facebook_page", f"Get Facebook page or profile for {person}"),
            ("youtube_filmography", f"Find YouTube channel for {person}"),
            ("reddit_subreddit", f"Find Reddit profile for {person}"),
            ("pinterest_pins", f"Find Pinterest profile for {person}"),
            ("linktree_profile", f"Get Linktree links for {person}"),
        ]
        
        # OSINT skills
        osint_skills = [
            ("osint_scraper", f"Run OSINT search for {person}"),
        ]
        if company:
            osint_skills.extend([
                ("sec_filings", f"Find SEC filings for {company}"),
                ("company_employees", f"Find employees at {company}"),
                ("yc_company", f"Check if {company} is a YC company"),
            ])
        
        # Domain-matched skills from Exa URLs
        # launched_skills dedups so a domain match never re-launches a skill
        # that is already in the core/OSINT lists.
        domain_matched: list[tuple[str, str]] = []
        launched_skills: set[str] = set()
        
        for skill_name, _ in core_skills + osint_skills:
            launched_skills.add(skill_name)
        
        for url in exa_urls:
            domain = urlparse(url).netloc.lower().replace("www.", "")
            skill_name = DOMAIN_TO_SKILL.get(domain)
            if skill_name and skill_name not in launched_skills:
                domain_matched.append(
                    (skill_name, f"Extract all info from {url} about {person}")
                )
                launched_skills.add(skill_name)
        
        all_skills = core_skills + osint_skills + domain_matched
        
        # Launch all skill tasks with semaphore
        for skill_name, task_str in all_skills:
            augmented = self._augment_task_with_auth(skill_name, task_str)
            task = asyncio.ensure_future(
                self._run_skill_with_semaphore(skill_name, augmented)
            )
            skill_tasks.append((skill_name, task_str, task))
        
        # Gather all results then yield them
        task_objs = [t for _, _, t in skill_tasks]
        all_results = await asyncio.gather(*task_objs, return_exceptions=True)
        
        for idx, result in enumerate(all_results):
            sn, ts = skill name/task pairs are recorded so Phase 3 can retry
            sn, ts, _ = skill_tasks[idx]
            if isinstance(result, Exception):
                logger.warning("deep_researcher: skill {} error: {}", sn, str(result))
                failed_skills.append((sn, ts))
                continue
            
            if result and result.get("success"):
                output = result.get("output", "")
                label = result.get("label", sn)
                agent_result = AgentResult(
                    agent_name=f"skill_{label}",
                    status=AgentStatus.SUCCESS,
                    snippets=[output] if output else [],
                    confidence=self._compute_confidence(output, person),
                )
                # Loose name check filters obviously-wrong-person results.
                if self._verify_result(agent_result, person):
                    yield agent_result
    
    Semaphore Limiting:
    MAX_CONCURRENT_SESSIONS = 25
    
    async def _run_skill_with_semaphore(
        self, skill_name: str, task_str: str
    ) -> dict | None:
        """Execute one cloud skill while holding the global concurrency slot.

        Slow platforms get a doubled timeout; auth-heavy skills get the
        largest step budget. At most one platform credential is forwarded
        via ``_secrets_for_skill``.
        """
        # Budget selection: auth-heavy wins on steps, slow wins on timeout.
        is_slow = skill_name in self._SLOW_SKILLS
        timeout = 120.0 if is_slow else 60.0
        if skill_name in self._AUTH_HEAVY_SKILLS:
            max_steps = 15
        else:
            max_steps = 8 if is_slow else 5

        secrets = self._secrets_for_skill(skill_name)
        async with self._semaphore:
            return await self._cloud.run_skill(
                skill_name, task_str, timeout=timeout,
                max_steps=max_steps,
                secrets=secrets,
            )
    
    14
    Phase 2: Deep Extraction (~30-60s)
    15
    Navigates to uncovered URLs and runs high-impact freeform tasks that impress judges:
    16
    High-Impact Tasks:
    17
  • Court Records: CourtListener, UniCourt
  • Political Donations: FEC.gov individual contributions
  • Academic Papers: Google Scholar, Semantic Scholar
  • Podcast Appearances: ListenNotes search
  • Crunchbase Profiles: Funding, investors, board positions
  • Dark Web: HIBP breach checks
  • 18
    async def _phase2(
        self,
        person: str,
        exa_urls: list[str],
        seen_urls: set[str],
        deep_search_task_id: str | None,
    ) -> AsyncGenerator[AgentResult, None]:
        """Deep URL extraction for uncovered URLs, SixtyFour deep search results, dark web.

        Launches the high-impact "wow" freeform tasks (court records,
        political donations, academic papers, podcasts, Crunchbase) in
        parallel and yields only strictly-verified successes.

        NOTE(review): ``exa_urls``, ``seen_urls`` and ``deep_search_task_id``
        are accepted but not referenced in this body — confirm whether the
        URL-extraction path was removed or lives elsewhere.
        """
        tasks: list[asyncio.Task] = []
        task_labels: list[str] = []

        # High-impact freeform tasks (public records that wow judges)
        wow_tasks = [
            (
                "court_records",
                f"Go to courtlistener.com/? and search for '{person}'. "
                f"Extract any court cases, lawsuits, or legal filings. "
                f"Also try unicourt.com. Report all findings.",
            ),
            (
                "political_donations",
                f"Go to fec.gov/data/receipts/individual-contributions/ "
                f"and search for '{person}'. Extract all political donations — "
                f"amounts, recipients, dates, employer. Be thorough.",
            ),
            (
                "academic_papers",
                f"Go to scholar.google.com and search for '{person}'. "
                f"Extract all academic papers, citations, h-index, co-authors, "
                f"and research areas. Also check semanticscholar.org.",
            ),
            (
                "podcast_appearances",
                f"Go to listennotes.com and search for '{person}'. "
                f"Find any podcast episodes they appeared on or hosted. "
                f"Extract episode titles, show names, and dates.",
            ),
            (
                "crunchbase_profile",
                f"Go to crunchbase.com and search for '{person}'. "
                f"Extract: roles, companies founded/worked at, funding rounds, "
                f"investors, board positions, and exits.",
            ),
        ]

        # Fix: define the runner once instead of re-defining a coroutine
        # function on every loop iteration (old version also carried an
        # unused `lbl` parameter).
        async def _run_wow(prompt: str) -> dict | None:
            async with self._semaphore:
                return await self._cloud.run_task(
                    prompt, max_steps=8, timeout=60.0,
                )

        for label, task_str in wow_tasks:
            tasks.append(asyncio.ensure_future(_run_wow(task_str)))
            task_labels.append(f"wow:{label}")

        # Gather all results
        all_results = await asyncio.gather(*tasks, return_exceptions=True)
        for idx, result in enumerate(all_results):
            if isinstance(result, Exception):
                logger.warning("deep_researcher: phase 2 task error: {}", result)
                continue
            label = task_labels[idx] if idx < len(task_labels) else "phase2"

            if label.startswith("wow:") and isinstance(result, dict) and result.get("success"):
                output = result.get("output", "")
                wow_name = label.replace("wow:", "")
                # Strict verification for wow tasks: public-records pages
                # frequently return namesakes, so ALL name parts must match.
                if not self._verify_result_strict(output, person):
                    logger.info("deep_researcher: wow_{} filtered — wrong person", wow_name)
                    continue
                yield AgentResult(
                    agent_name=f"wow_{wow_name}",
                    status=AgentStatus.SUCCESS,
                    snippets=[output] if output else [],
                    confidence=self._compute_confidence(output, person),
                )
    
    19
    Phase 3: Verification Loop (~30-90s)
    20
    Retries failed skills with autonomous account creation using AgentMail disposable emails.
    21
    # Platform signup URLs for autonomous account creation
    PLATFORM_SIGNUP_URLS = {
        "twitter.com": "https://x.com/i/flow/signup",
        "instagram.com": "https://www.instagram.com/accounts/emailsignup/",
        "tiktok.com": "https://www.tiktok.com/signup",
        "reddit.com": "https://www.reddit.com/register/",
        "github.com": "https://github.com/signup",
        "medium.com": "https://medium.com/m/signin?operation=register",
        "linkedin.com": "https://www.linkedin.com/signup",
    }
    
    async def _phase3(
        self,
        person: str,
        failed_skills: list[tuple[str, str]],
    ) -> AsyncGenerator[AgentResult, None]:
        """Retry failed skills with auth — check saved credentials first.

        For each (skill, task) pair recorded by Phase 1: resolve the
        skill's platform domain, prefer a saved verified credential,
        otherwise create a disposable account via AccountManager, then
        re-run the skill with the credential passed as a Browser Use
        secret. Successes are yielded; per-skill errors are logged and
        swallowed so one platform cannot abort the whole loop.
        """
        for skill_name, task_str in failed_skills:
            domain = SKILL_TO_DOMAIN.get(skill_name)
            if not domain:
                # No platform mapping — nothing to authenticate against.
                continue
            
            # Check saved verified credentials FIRST
            cred = self._all_credentials.get(domain)
            if cred:
                creds = {"email": cred["email"], "password": cred["password"]}
            else:
                # Fall back to account creation
                signup_url = PLATFORM_SIGNUP_URLS.get(domain)
                if not signup_url:
                    continue
                creds = await self._accounts.ensure_account(
                    domain, signup_url, person_name="Specter Agent"
                )
            
            if not creds:
                # Account creation failed or returned nothing usable.
                continue
            
            # Augment task with auth instructions and retry
            # (larger step budget and timeout than the Phase 1 attempt).
            augmented = self._augment_task_with_auth(skill_name, task_str)
            try:
                async with self._semaphore:
                    result = await self._cloud.run_skill(
                        skill_name,
                        augmented,
                        timeout=120.0,
                        max_steps=20,
                        secrets={
                            domain: f"{creds['email']}:{creds['password']}"
                        },
                    )
                
                if result and result.get("success"):
                    output = result.get("output", "")
                    yield AgentResult(
                        agent_name=f"skill_{skill_name}_retry",
                        status=AgentStatus.SUCCESS,
                        snippets=[output] if output else [],
                        confidence=self._compute_confidence(output, person),
                    )
            except Exception as exc:
                logger.warning("deep_researcher: retry failed for {}: {}", skill_name, exc)
    

    Credential Management

    DeepResearcher loads verified platform credentials for authenticated access:
    backend/agents/deep_researcher.py
    def __init__(self, settings: Settings):
        # Load platform credentials from file (only verified entries).
        # agent_credentials.json lives two directory levels above this
        # module; entries without "verified": true are dropped so that
        # unconfirmed accounts are never forwarded as skill secrets.
        self._all_credentials: dict[str, dict] = {}
        creds_path = pathlib.Path(__file__).resolve().parent.parent / "agent_credentials.json"
        if creds_path.exists():
            try:
                raw = json.loads(creds_path.read_text())
                self._all_credentials = {
                    domain: cred for domain, cred in raw.items()
                    if cred.get("verified") is True
                }
                logger.info(
                    "deep_researcher: loaded {} verified credentials (of {} total)",
                    len(self._all_credentials), len(raw),
                )
            except Exception as exc:
                # Best-effort: a missing or malformed file just means no
                # saved credentials; research proceeds unauthenticated.
                logger.warning("deep_researcher: failed to load credentials: {}", exc)
    
    def _secrets_for_skill(self, skill_name: str) -> dict[str, str] | None:
        """Look up the single platform credential relevant to a skill.

        Browser Use Cloud rejects requests carrying more than 10 secrets
        (HTTP 422), so this narrows the payload to at most one entry of
        the form ``{domain: "email:password"}`` — or ``None`` when the
        skill has no mapped domain or no verified credential is on file.
        """
        domain = SKILL_TO_DOMAIN.get(skill_name)
        cred = self._all_credentials.get(domain) if domain else None
        if not cred:
            return None
        return {domain: "{}:{}".format(cred["email"], cred["password"])}
    

    Result Verification

    Filters results to ensure they’re about the correct person:
    backend/agents/deep_researcher.py
    @staticmethod
    def _verify_result(result: AgentResult, person_name: str) -> bool:
        """Loose person check: does any name part appear in the snippets?

        Results with no snippets pass by default — there is no text that
        could contradict the match.
        """
        if not result.snippets:
            return True

        haystack = " ".join(result.snippets).lower()
        for part in person_name.lower().split():
            if part in haystack:
                return True
        return False
    
    @staticmethod
    def _verify_result_strict(output: str, person_name: str) -> bool:
        """Strict verification — ALL name parts must appear in output.
        
        Used for wow tasks (court records, political donations etc.) that
        grab whatever they find on the page, often for wrong people.
        """
        if not output:
            return False
        output_lower = output.lower()
        name_parts = person_name.lower().split()
        return all(part in output_lower for part in name_parts)
    
    @staticmethod
    def _compute_confidence(output: str, person_name: str) -> float:
        """Compute confidence score based on name match strength."""
        if not output:
            return 0.3
        
        output_lower = output.lower()
        name_parts = person_name.lower().split()
        matched = sum(1 for part in name_parts if part in output_lower)
        
        if matched == 0:
            return 0.1
        if matched == len(name_parts):
            return 1.0  # Full name match
        return 0.3 + (0.7 * matched / len(name_parts))
    

    Usage Example

    from agents.deep_researcher import DeepResearcher
    from agents.models import ResearchRequest
    from config import Settings
    
    settings = Settings()
    researcher = DeepResearcher(settings)
    
    request = ResearchRequest(
        person_name="Elon Musk",
        company="Tesla",
    )
    
    # Stream results as they complete.
    # research() is an async generator, so this loop fires as each phase
    # yields — no need to wait for the whole pipeline to finish.
    all_results: list[AgentResult] = []
    async for result in researcher.research(request):
        print(f"\n{result.agent_name} completed:")
        print(f"  Status: {result.status}")
        print(f"  Snippets: {len(result.snippets)}")
        print(f"  Confidence: {result.confidence:.2f}")
        
        # Preview the first few snippets, truncated for readability.
        for snippet in result.snippets[:3]:
            print(f"  - {snippet[:100]}")
        
        all_results.append(result)
    
    print(f"\nTotal results: {len(all_results)}")
    successful = [r for r in all_results if r.status == AgentStatus.SUCCESS]
    print(f"Successful: {len(successful)}")
    

    Performance Metrics

    Phase Timings

    {
        "phase_0": 1.2,   # Exa fast pass
        "phase_1": 28.5,  # Platform + OSINT skills
        "phase_2": 45.3,  # Deep extraction + wow tasks
        "phase_3": 62.1   # Verification loop (if needed)
    }
    

    Typical Results

    • Phase 0: 5-10 Exa URLs, yielded in ~1s
    • Phase 1: 10-15 successful skills, 20-35s total
    • Phase 2: 3-8 wow task results, 30-60s total
    • Phase 3: 2-5 retry successes, 30-90s total
    • Overall: 80-180s for comprehensive research

    Troubleshooting

    Skills Timing Out

    # Increase timeout for slow platforms
    _SLOW_SKILLS = frozenset({
        "linkedin_company_posts", "youtube_filmography", "pinterest_pins",
        "instagram_posts", "company_employees",
    })
    
    # These get 120s timeout instead of 60s
    timeout = 120.0 if skill_name in self._SLOW_SKILLS else 60.0
    

    Skills Failing Auth

    # Add credentials to agent_credentials.json
    {
      "instagram.com": {
        "email": "[email protected]",
        "password": "Specter2026!",
        "verified": true
      }
    }
    

    Too Many Concurrent Sessions

    # Reduce semaphore limit
    MAX_CONCURRENT_SESSIONS = 15  # Down from 25
    self._semaphore = asyncio.Semaphore(MAX_CONCURRENT_SESSIONS)
    

    Filtering Wrong People

    # Results are automatically verified
    if self._verify_result(agent_result, person):
        yield agent_result
    else:
        logger.info("deep_researcher: filtered low-confidence result")
    

    Next Steps

    Browser Use Integration

    Learn about Cloud Skills and SDK usage

    LinkedIn Agent

    See how specialized agents work

    Build docs developers (and LLMs) love