Overview
The DeepResearcher is JARVIS’s most advanced intelligence-gathering system. It runs a four-phase pipeline that discovers comprehensive information about individuals using parallel Browser Use Cloud Skills, deep URL extraction, and autonomous account creation.

Architecture
backend/agents/deep_researcher.py
Phase Breakdown
Runs Exa search immediately and yields results in ~1 second, while starting SixtyFour enrichment in the background.
async def _exa_pass(
    self,
    person: str,
    company: str | None,
    seen_urls: set[str],
) -> tuple[list[str], list[str]]:
    """Run Exa queries only (fast, ~1s). Returns (urls, snippets).

    Fires two or three EnrichmentRequests concurrently, then filters hits:
    duplicates (tracked via ``seen_urls``, which is mutated in place) and
    hits whose title/snippet contain no token of the person's name are
    dropped. Query failures are skipped silently (best-effort).
    """
    exa_queries = [
        EnrichmentRequest(name=person, company=company),
        EnrichmentRequest(name=person, additional_context="social media profiles"),
    ]
    if company:
        exa_queries.append(
            EnrichmentRequest(name=person, additional_context=f"{company} employee")
        )
    results = await asyncio.gather(
        *(self._exa.enrich_person(q) for q in exa_queries),
        return_exceptions=True,
    )
    exa_urls: list[str] = []
    exa_snippets: list[str] = []
    # Hoisted out of the loops: the name tokens are invariant per call.
    name_parts = person.lower().split()
    for result in results:
        if isinstance(result, Exception) or not result.success:
            continue
        for hit in result.hits:
            if not hit.url or hit.url in seen_urls:
                continue
            # Relevance filtering: require at least one name token in the
            # title or snippet to avoid hits about unrelated people.
            title_lower = (hit.title or "").lower()
            snippet_lower = (hit.snippet or "").lower()
            if not any(
                part in title_lower or part in snippet_lower
                for part in name_parts
            ):
                continue
            seen_urls.add(hit.url)
            exa_urls.append(hit.url)
            snippet = (
                f"[Exa] {hit.title}: {hit.snippet[:200]}"
                if hit.snippet
                else f"[Exa] {hit.title}"
            )
            exa_snippets.append(snippet)
    return exa_urls, exa_snippets
async def _phase1(
    self,
    person: str,
    company: str | None,
    exa_urls: list[str],
    sixtyfour_result: object | None,
    seen_urls: set[str],
    failed_skills: list[tuple[str, str]],
) -> AsyncGenerator[AgentResult, None]:
    """Run platform + OSINT skills in parallel.

    Launches one Browser Use Cloud skill per core platform, company OSINT
    skills when a company is known, and extra skills matched to the domains
    of Exa-discovered URLs. All skills run concurrently (bounded by the
    semaphore inside _run_skill_with_semaphore); verified successes are
    yielded as AgentResults and failures are appended to ``failed_skills``
    (mutated in place) so a later retry phase can re-run them with auth.

    NOTE(review): ``sixtyfour_result`` and ``seen_urls`` are unused in this
    body — presumably consumed by code outside this view; confirm.
    """
    skill_tasks: list[tuple[str, str, asyncio.Task]] = []
    # Core platform skills
    core_skills = [
        ("tiktok_profile", f"Get TikTok profile info for {person}"),
        ("github_profile", f"Get GitHub profile and projects for {person}"),
        ("instagram_posts", f"Get Instagram profile and posts for {person}"),
        (
            "linkedin_company_posts",
            f"Find LinkedIn profile and posts for {person}"
            + (f" at {company}" if company else ""),
        ),
        ("facebook_page", f"Get Facebook page or profile for {person}"),
        ("youtube_filmography", f"Find YouTube channel for {person}"),
        ("reddit_subreddit", f"Find Reddit profile for {person}"),
        ("pinterest_pins", f"Find Pinterest profile for {person}"),
        ("linktree_profile", f"Get Linktree links for {person}"),
    ]
    # OSINT skills
    osint_skills = [
        ("osint_scraper", f"Run OSINT search for {person}"),
    ]
    if company:
        osint_skills.extend([
            ("sec_filings", f"Find SEC filings for {company}"),
            ("company_employees", f"Find employees at {company}"),
            ("yc_company", f"Check if {company} is a YC company"),
        ])
    # Domain-matched skills from Exa URLs — only launch skills not already
    # covered by the core/OSINT lists, and launch each skill at most once.
    launched_skills: set[str] = {
        skill_name for skill_name, _ in core_skills + osint_skills
    }
    domain_matched: list[tuple[str, str]] = []
    for url in exa_urls:
        # removeprefix (not replace) so "www." is only stripped from the
        # front of the host — replace("www.", "") would mangle e.g.
        # "wwww.example.com".
        domain = urlparse(url).netloc.lower().removeprefix("www.")
        skill_name = DOMAIN_TO_SKILL.get(domain)
        if skill_name and skill_name not in launched_skills:
            domain_matched.append(
                (skill_name, f"Extract all info from {url} about {person}")
            )
            launched_skills.add(skill_name)
    all_skills = core_skills + osint_skills + domain_matched
    # Launch all skill tasks with semaphore
    for skill_name, task_str in all_skills:
        augmented = self._augment_task_with_auth(skill_name, task_str)
        task = asyncio.ensure_future(
            self._run_skill_with_semaphore(skill_name, augmented)
        )
        skill_tasks.append((skill_name, task_str, task))
    # Gather all results then yield them
    all_results = await asyncio.gather(
        *(t for _, _, t in skill_tasks), return_exceptions=True
    )
    for (sn, ts, _), result in zip(skill_tasks, all_results):
        if isinstance(result, Exception):
            logger.warning("deep_researcher: skill {} error: {}", sn, str(result))
            failed_skills.append((sn, ts))
            continue
        if result and result.get("success"):
            output = result.get("output", "")
            label = result.get("label", sn)
            agent_result = AgentResult(
                agent_name=f"skill_{label}",
                status=AgentStatus.SUCCESS,
                snippets=[output] if output else [],
                confidence=self._compute_confidence(output, person),
            )
            # Drop results that fail person verification (wrong person).
            if self._verify_result(agent_result, person):
                yield agent_result
# Upper bound on simultaneous cloud browser sessions — presumably enforced
# by the semaphore awaited in _run_skill_with_semaphore; confirm where the
# semaphore is constructed.
MAX_CONCURRENT_SESSIONS = 25
async def _run_skill_with_semaphore(
    self, skill_name: str, task_str: str
) -> dict | None:
    """Run a skill task, respecting concurrency limit.

    Budgets scale with the skill's class: auth-heavy skills get the largest
    step budget, slow skills get more steps and a longer timeout, and
    everything else runs lean.
    """
    is_slow = skill_name in self._SLOW_SKILLS
    run_timeout = 120.0 if is_slow else 60.0
    if skill_name in self._AUTH_HEAVY_SKILLS:
        step_budget = 15
    else:
        step_budget = 8 if is_slow else 5
    skill_secrets = self._secrets_for_skill(skill_name)
    # The semaphore bounds how many cloud sessions run at once.
    async with self._semaphore:
        return await self._cloud.run_skill(
            skill_name,
            task_str,
            timeout=run_timeout,
            max_steps=step_budget,
            secrets=skill_secrets,
        )
async def _phase2(
    self,
    person: str,
    exa_urls: list[str],
    seen_urls: set[str],
    deep_search_task_id: str | None,
) -> AsyncGenerator[AgentResult, None]:
    """Deep URL extraction for uncovered URLs, SixtyFour deep search results, dark web.

    Runs the high-impact "wow" public-record tasks concurrently and yields
    each success that passes strict person verification.

    NOTE(review): ``exa_urls``, ``seen_urls`` and ``deep_search_task_id``
    are unused in this body — presumably consumed by code outside this
    view; confirm.
    """
    tasks: list[asyncio.Task] = []
    task_labels: list[str] = []
    # High-impact freeform tasks (public records that wow judges)
    wow_tasks = [
        (
            "court_records",
            f"Go to courtlistener.com/? and search for '{person}'. "
            f"Extract any court cases, lawsuits, or legal filings. "
            f"Also try unicourt.com. Report all findings.",
        ),
        (
            "political_donations",
            f"Go to fec.gov/data/receipts/individual-contributions/ "
            f"and search for '{person}'. Extract all political donations — "
            f"amounts, recipients, dates, employer. Be thorough.",
        ),
        (
            "academic_papers",
            f"Go to scholar.google.com and search for '{person}'. "
            f"Extract all academic papers, citations, h-index, co-authors, "
            f"and research areas. Also check semanticscholar.org.",
        ),
        (
            "podcast_appearances",
            f"Go to listennotes.com and search for '{person}'. "
            f"Find any podcast episodes they appeared on or hosted. "
            f"Extract episode titles, show names, and dates.",
        ),
        (
            "crunchbase_profile",
            f"Go to crunchbase.com and search for '{person}'. "
            f"Extract: roles, companies founded/worked at, funding rounds, "
            f"investors, board positions, and exits.",
        ),
    ]

    # Defined once, outside the loop — the original re-created this closure
    # per iteration and passed it an unused label argument.
    async def _run_wow(prompt: str) -> dict | None:
        # Semaphore keeps the cloud session count under the global cap.
        async with self._semaphore:
            return await self._cloud.run_task(
                prompt, max_steps=8, timeout=60.0,
            )

    for label, task_str in wow_tasks:
        tasks.append(asyncio.ensure_future(_run_wow(task_str)))
        task_labels.append(f"wow:{label}")
    # Gather all results
    all_results = await asyncio.gather(*tasks, return_exceptions=True)
    for idx, result in enumerate(all_results):
        if isinstance(result, Exception):
            logger.warning("deep_researcher: phase 2 task error: {}", result)
            continue
        label = task_labels[idx] if idx < len(task_labels) else "phase2"
        if label.startswith("wow:") and isinstance(result, dict) and result.get("success"):
            output = result.get("output", "")
            wow_name = label.replace("wow:", "")
            # Strict verification for wow tasks
            if not self._verify_result_strict(output, person):
                logger.info("deep_researcher: wow_{} filtered — wrong person", wow_name)
                continue
            yield AgentResult(
                agent_name=f"wow_{wow_name}",
                status=AgentStatus.SUCCESS,
                snippets=[output] if output else [],
                confidence=self._compute_confidence(output, person),
            )
# Platform signup URLs for autonomous account creation.
# Keyed by bare domain (no "www."); looked up in _phase3 as the fallback
# when no saved credentials exist for a failed skill's platform.
PLATFORM_SIGNUP_URLS = {
    "twitter.com": "https://x.com/i/flow/signup",
    "instagram.com": "https://www.instagram.com/accounts/emailsignup/",
    "tiktok.com": "https://www.tiktok.com/signup",
    "reddit.com": "https://www.reddit.com/register/",
    "github.com": "https://github.com/signup",
    "medium.com": "https://medium.com/m/signin?operation=register",
    "linkedin.com": "https://www.linkedin.com/signup",
}
async def _phase3(
    self,
    person: str,
    failed_skills: list[tuple[str, str]],
) -> AsyncGenerator[AgentResult, None]:
    """Retry failed skills with auth — check saved credentials first.

    For each failed skill that maps to a known platform domain, resolve
    credentials (saved verified ones first, then autonomous account
    creation as a fallback) and re-run the skill with a larger step and
    time budget. Successful retries are yielded; errors are logged and
    the loop moves on.
    """
    for skill_name, task_str in failed_skills:
        domain = SKILL_TO_DOMAIN.get(skill_name)
        if not domain:
            continue
        # Prefer saved verified credentials over creating a new account.
        saved = self._all_credentials.get(domain)
        if saved:
            creds = {"email": saved["email"], "password": saved["password"]}
        else:
            # No saved credentials — attempt autonomous account creation.
            signup_url = PLATFORM_SIGNUP_URLS.get(domain)
            if not signup_url:
                continue
            creds = await self._accounts.ensure_account(
                domain, signup_url, person_name="Specter Agent"
            )
            if not creds:
                continue
        # Augment task with auth instructions and retry with bigger budgets.
        augmented = self._augment_task_with_auth(skill_name, task_str)
        try:
            async with self._semaphore:
                result = await self._cloud.run_skill(
                    skill_name,
                    augmented,
                    timeout=120.0,
                    max_steps=20,
                    secrets={domain: f"{creds['email']}:{creds['password']}"},
                )
            if result and result.get("success"):
                output = result.get("output", "")
                yield AgentResult(
                    agent_name=f"skill_{skill_name}_retry",
                    status=AgentStatus.SUCCESS,
                    snippets=[output] if output else [],
                    confidence=self._compute_confidence(output, person),
                )
        except Exception as exc:
            logger.warning("deep_researcher: retry failed for {}: {}", skill_name, exc)
Credential Management
DeepResearcher loads verified platform credentials for authenticated access: see backend/agents/deep_researcher.py
Result Verification
Filters results to ensure they’re about the correct person: see backend/agents/deep_researcher.py
Usage Example
Performance Metrics
Phase Timings
Typical Results
- Phase 0: 5-10 Exa URLs, yielded in ~1s
- Phase 1: 10-15 successful skills, 20-35s total
- Phase 2: 3-8 wow task results, 30-60s total
- Phase 3: 2-5 retry successes, 30-90s total
- Overall: 80-180s for comprehensive research
Troubleshooting
Skills Timing Out
Skills Failing Auth
Too Many Concurrent Sessions
Filtering Wrong People
Next Steps
Browser Use Integration
Learn about Cloud Skills and SDK usage
LinkedIn Agent
See how specialized agents work