Performance Overview

JARVIS is designed for real-time intelligence gathering, requiring careful optimization across multiple layers:
  • Capture Pipeline - Fast frame extraction and face detection
  • Agent Swarm - Parallel execution with timeout management
  • LLM Calls - Efficient model selection and prompt engineering
  • Real-time Updates - Streaming results via Convex

Architecture Optimizations

Two-Tier Research Strategy

JARVIS uses a two-tier approach to balance speed and depth:
# TIER 1: Fast API enrichment (< 1 second)
exa_context = await ExaEnrichment(self.exa).search_person(
    name=self.identity.get("name"),
    context=self.identity.get("context", {})
)
# Returns: LinkedIn URL, company, web mentions

# TIER 2: Deep browser research (parallel, 10-60 seconds)
agents = self._create_agents(exa_context)
results = await asyncio.gather(
    *[self._run_agent(agent) for agent in agents],
    return_exceptions=True
)
Why this works:
  • Tier 1 provides context for Tier 2 agents (faster searches)
  • Results stream to frontend as they arrive (perceived performance)
  • Tier 1 completes in less than 1s, giving immediate feedback
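The hand-off between tiers can be sketched as follows. This is a minimal illustration, not the real implementation: `tier1_enrich`, `tier2_agent`, and the hint's field names are stand-ins for the Exa call and browser agents described above.

```python
import asyncio

async def tier1_enrich(name: str) -> dict:
    """Fast API lookup (stands in for the Exa call; result shape assumed)."""
    await asyncio.sleep(0)  # network round-trip elided
    return {"linkedin_url": "https://linkedin.com/in/example", "company": "Acme"}

async def tier2_agent(name: str, hint: dict) -> dict:
    """Deep research agent seeded with the Tier 1 hint."""
    # A known profile URL lets the agent navigate directly instead of searching.
    target = hint.get("linkedin_url") or f"search: {name}"
    await asyncio.sleep(0)  # browser automation elided
    return {"source": target}

async def research(name: str) -> list[dict]:
    hint = await tier1_enrich(name)       # Tier 1: < 1s, shown to the user first
    agents = [tier2_agent(name, hint) for _ in range(3)]
    return await asyncio.gather(*agents)  # Tier 2: agents run in parallel
```

Because every Tier 2 agent receives the Tier 1 hint, none of them has to rediscover basic identity context on its own.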

Parallel Agent Execution

Agents run in parallel with asyncio.gather:
backend/agents/orchestrator.py
class SwarmOrchestrator:
    async def execute(self) -> list:
        """Run all agents in parallel, streaming results."""
        agents = [
            LinkedInAgent(name, context),
            TwitterAgent(name, context),
            InstagramAgent(name, context),
        ]
        
        # All agents run simultaneously
        results = await asyncio.gather(
            *[self._run_agent(agent) for agent in agents],
            return_exceptions=True  # Don't fail if one agent fails
        )
        
        return results
Performance gain:
  • Sequential: 60s (20s × 3 agents)
  • Parallel: 20s (longest agent)
  • 3x speedup in this example
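The arithmetic above is observable with a toy benchmark, where `asyncio.sleep` stands in for each agent's research time:

```python
import asyncio
import time

async def agent(delay: float) -> None:
    await asyncio.sleep(delay)  # stands in for one agent's research time

async def run_sequential(delays: list[float]) -> float:
    start = time.monotonic()
    for d in delays:
        await agent(d)
    return time.monotonic() - start

async def run_parallel(delays: list[float]) -> float:
    start = time.monotonic()
    await asyncio.gather(*(agent(d) for d in delays))
    return time.monotonic() - start

# Three "agents" of 0.2s each: ~0.6s sequentially, ~0.2s in parallel.
seq = asyncio.run(run_sequential([0.2, 0.2, 0.2]))
par = asyncio.run(run_parallel([0.2, 0.2, 0.2]))
```

The speedup only holds while the agents are I/O-bound (waiting on networks and browsers), which is exactly the case here.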

Timeout Management

Prevent slow operations from blocking:
try:
    results = await asyncio.wait_for(
        asyncio.gather(*[agent.run() for agent in agents]),
        timeout=180  # 3 minutes max
    )
except asyncio.TimeoutError:
    logger.warning("Agent swarm timed out, using partial results")
    results = []  # Partial results already streamed to Convex

LLM Performance

Model Selection

JARVIS uses different models for different tasks:
| Task | Model | Reason |
| --- | --- | --- |
| Vision (face ID) | GPT-4 Vision | Best accuracy for face recognition |
| Report synthesis | Gemini 2.0 Flash | 25x cheaper, 2x faster than GPT-4 |
| Agent prompts | Gemini 2.0 Flash | Fast, cheap, good for structured tasks |
Cost comparison (1,000 tokens):
  • GPT-4: $0.03
  • GPT-4 Vision: $0.01
  • Gemini 2.0 Flash: $0.001
Speed comparison (average latency):
  • GPT-4 Vision: ~2s
  • Gemini 2.0 Flash: ~0.8s
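Using the per-1,000-token prices listed above, a small helper makes the cost gap concrete (the price table below just restates those numbers; model keys are illustrative):

```python
# Per-1,000-token prices from the comparison above.
PRICE_PER_1K = {
    "gpt-4": 0.03,
    "gpt-4-vision": 0.01,
    "gemini-2.0-flash": 0.001,
}

def estimate_cost(model: str, tokens: int) -> float:
    """Approximate request cost in dollars for a given token count."""
    return PRICE_PER_1K[model] * tokens / 1000
```

At these rates, a 4,000-token synthesis costs about $0.12 on GPT-4 versus $0.004 on Gemini 2.0 Flash.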

Prompt Engineering

Optimize prompts for speed and accuracy:
# Bad: Verbose, slow
prompt = """
Please carefully analyze the provided image and extract all relevant 
information about the person's identity. Look for names, URLs, context 
clues, and any other identifying information. Be thorough and accurate.
Return your analysis in a well-structured JSON format...
"""

# Good: Concise, fast
prompt = """
Extract from this PimEyes results page:
1. Person's name
2. URLs shown in results
3. Context (company, title, location)
4. Confidence (high/medium/low)

Return ONLY valid JSON:
{"name": "...", "urls": [...], "context": {...}, "confidence": "..."}
"""
Impact:
  • Shorter prompts = fewer input tokens = faster, cheaper
  • Clear structure = more consistent output = fewer retries

Response Streaming

For long responses, use streaming to reduce perceived latency:
import openai

async def stream_synthesis(person_id: str):
    """Stream synthesis results to frontend as they generate."""
    # acreate is the async variant of create in the pre-1.0 openai SDK
    response = await openai.ChatCompletion.acreate(
        model="gpt-4",
        messages=[...],
        stream=True,
    )
    
    async for chunk in response:
        delta = chunk.choices[0].delta.content
        if delta:
            # Stream to frontend via Convex
            await convex.mutation("persons:appendDossier", {
                "personId": person_id,
                "chunk": delta,
            })

Database Performance

Convex Real-time Subscriptions

Convex pushes real-time updates to subscribed clients with minimal overhead:
frontend/app/page.tsx
import { useQuery } from "convex/react";
import { api } from "../convex/_generated/api";

export default function Corkboard() {
  // Automatically updates when data changes
  const persons = useQuery(api.persons.list);
  
  return (
    <div>
      {persons?.map(person => (
        <PersonCard key={person._id} person={person} />
      ))}
    </div>
  );
}
Performance benefits:
  • No polling overhead
  • Delta-only updates (only changed data sent)
  • Automatic reconnection handling

MongoDB Optimization

Optimize MongoDB queries:
# Bad: Fetches all fields
person = await db.persons.find_one({"_id": person_id})

# Good: Project only needed fields
person = await db.persons.find_one(
    {"_id": person_id},
    projection={"name": 1, "photoUrl": 1, "confidence": 1}
)

# Bad: N+1 query problem
for person_id in person_ids:
    person = await db.persons.find_one({"_id": person_id})

# Good: Batch query
persons = await db.persons.find(
    {"_id": {"$in": person_ids}}
).to_list(length=100)

Capture Pipeline Performance

Frame Extraction

Optimize video frame extraction:
backend/capture/frame_extractor.py
import subprocess
import tempfile
from pathlib import Path

def extract_frames(video_bytes: bytes, content_type: str, fps: int = 1) -> list[bytes]:
    """Extract frames from video using ffmpeg."""
    if not content_type.startswith("video/"):
        # Image: no extraction needed
        return [video_bytes]
    
    # Use ffmpeg for fast frame extraction
    with tempfile.TemporaryDirectory() as tmpdir:
        input_path = Path(tmpdir) / "input.mp4"
        input_path.write_bytes(video_bytes)
        
        output_pattern = Path(tmpdir) / "frame_%03d.jpg"
        subprocess.run([
            "ffmpeg", "-i", str(input_path),
            "-vf", f"fps={fps}",  # Extract 1 frame per second
            "-q:v", "2",  # Quality: 2-5 is good (lower = better)
            str(output_pattern)
        ], check=True, capture_output=True)
        
        frames = []
        for frame_path in sorted(Path(tmpdir).glob("frame_*.jpg")):
            frames.append(frame_path.read_bytes())
        return frames
Performance tips:
  • Lower FPS = fewer frames = faster processing
  • Quality 2-5 balances size and accuracy
  • Use tmpdir for automatic cleanup

Face Detection

MediaPipe is optimized for speed:
backend/identification/detector.py
import mediapipe as mp

class MediaPipeFaceDetector:
    def __init__(self):
        self.detector = mp.solutions.face_detection.FaceDetection(
            min_detection_confidence=0.7,  # Lower = more detections, more false positives
            model_selection=0,  # 0 = optimized for speed, 1 = accuracy
        )
    
    async def detect_faces(self, request: FaceDetectionRequest) -> FaceDetectionResult:
        # MediaPipe is fast: 5-10ms per frame
        results = self.detector.process(image_rgb)
        # ...
Benchmark (100x100 image):
  • MediaPipe: ~5-10ms
  • MTCNN: ~100ms
  • dlib: ~50-100ms

Browser Automation Performance

Headless Browsers

Always use headless mode for speed:
from browser_use import Browser, BrowserConfig

browser = Browser(config=BrowserConfig(
    headless=True,  # 2-3x faster than headed
    disable_images=True,  # Skip loading images
    disable_css=False,  # Keep CSS for layout
))

Persistent Sessions

Reuse browser sessions when possible:
class LinkedInAgent:
    # Class-level browser pool shared across requests
    _browser_pool = []
    
    async def run(self) -> dict:
        # Reuse a warm browser if one is available
        if self._browser_pool:
            browser = self._browser_pool.pop()
        else:
            browser = Browser(config=BrowserConfig(headless=True))
        
        try:
            # _run_task wraps the agent's actual browsing work
            result = await self._run_task(browser)
            return result
        finally:
            # Return the browser to the pool for reuse
            self._browser_pool.append(browser)
Impact:
  • Cold start: ~2-3s (launch browser)
  • Warm start: ~0.5s (reuse browser)
  • 4-6x speedup for multiple requests

Caching Strategies

Cache Person Lookups

import hashlib

class PersonCache:
    def __init__(self):
        self._cache: dict[str, dict | None] = {}
    
    async def get_person(self, person_id: str) -> dict | None:
        """Cached person lookup (lru_cache can't cache coroutines)."""
        if person_id not in self._cache:
            self._cache[person_id] = await db.persons.find_one({"_id": person_id})
        return self._cache[person_id]
    
    def cache_key(self, name: str, photo_hash: str) -> str:
        """Generate cache key for face lookup."""
        return hashlib.sha256(f"{name}:{photo_hash}".encode()).hexdigest()

Cache Exa Results

import time

class ExaEnrichment:
    def __init__(self, exa_client, cache_ttl: int = 3600):
        self.exa = exa_client
        self.cache = {}  # In production: use Redis
        self.cache_ttl = cache_ttl
    
    async def search_person(self, name: str, context: dict) -> dict:
        cache_key = f"exa:{name}"
        
        # Check cache
        if cache_key in self.cache:
            cached_at, result = self.cache[cache_key]
            if time.time() - cached_at < self.cache_ttl:
                return result
        
        # Fetch from API
        result = await self._fetch_from_api(name, context)
        
        # Store in cache
        self.cache[cache_key] = (time.time(), result)
        return result

Monitoring Performance

Track Metrics

Use Laminar traces to identify bottlenecks:
from observability.laminar import traced

@traced(
    name="capture_pipeline",
    tags=["performance"],
)
async def process_capture(capture_id: str, data: bytes, content_type: str) -> Result:
    # Automatically logs duration
    result = await pipeline.process(capture_id, data, content_type)
    return result
View in Laminar dashboard:
  • Sort by duration to find slowest operations
  • Filter by the performance tag
  • Compare across different inputs

Performance Benchmarks

Typical JARVIS performance:
| Operation | Target | Typical |
| --- | --- | --- |
| Frame extraction (30s video) | < 2s | 0.8s |
| Face detection (per frame) | < 50ms | 10ms |
| PimEyes search | < 5s | 3s |
| Vision LLM extraction | < 2s | 1.2s |
| Tier 1 enrichment (Exa) | < 1s | 0.3s |
| Agent (LinkedIn) | < 10s | 5s |
| Agent (Twitter) | < 10s | 3s |
| Report synthesis | < 5s | 2s |
| Total (1 person) | < 30s | 15s |

Production Optimizations

Connection Pooling

Reuse connections to external services:
import httpx

class APIClient:
    def __init__(self):
        # Connection pool automatically reuses connections
        self.client = httpx.AsyncClient(
            limits=httpx.Limits(
                max_connections=100,
                max_keepalive_connections=20,
            ),
            timeout=httpx.Timeout(30.0),
        )
    
    async def close(self):
        await self.client.aclose()

Worker Processes

Scale horizontally with multiple workers:
# Development: 1 worker
uvicorn main:app --reload

# Production: 4 workers (1 per CPU core)
uvicorn main:app --workers 4 --host 0.0.0.0 --port 8000

Background Tasks

Offload slow operations to background tasks:
from fastapi import BackgroundTasks

@app.post("/api/capture")
async def capture(
    file: UploadFile,
    background_tasks: BackgroundTasks,
):
    """Upload returns immediately, processing happens in background."""
    capture_id = generate_id()
    data = await file.read()
    
    # Return immediately
    background_tasks.add_task(
        process_capture_pipeline,
        capture_id=capture_id,
        data=data,
        content_type=file.content_type,
    )
    
    return {"capture_id": capture_id, "status": "processing"}

Best Practices

Always measure performance before optimizing:
from observability.laminar import traced

@traced(name="operation", tags=["performance"])
async def operation():
    # Laminar will show duration in dashboard
    pass
Use the Laminar dashboard to identify actual bottlenecks.
Focus on operations that block user experience:
  1. Capture → Face detection (user waits)
  2. Face detection → Identity (user waits)
  3. Identity → Tier 1 enrichment (user sees first results)
  4. Background: Tier 2 agents, synthesis
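That priority order can be expressed directly in the pipeline: await the stages the user is waiting on, then hand the slow deep research to a background task. A minimal sketch, with stub coroutines standing in for the real stages (all names here are illustrative):

```python
import asyncio

# Stubs standing in for the real pipeline stages.
async def detect_faces(data: bytes) -> list: return ["face"]
async def identify(faces: list) -> dict: return {"name": "Jane Doe"}
async def tier1_enrich(identity: dict) -> dict: return {"company": "Acme"}
async def run_tier2_and_synthesis(identity: dict, hint: dict) -> None:
    await asyncio.sleep(0)

async def pipeline(data: bytes) -> dict:
    faces = await detect_faces(data)      # 1. user waits
    identity = await identify(faces)      # 2. user waits
    hint = await tier1_enrich(identity)   # 3. first visible results

    # 4. Background: don't block the response on the slow deep research.
    asyncio.create_task(run_tier2_and_synthesis(identity, hint))
    return {"identity": identity, "hint": hint}
```

The user-facing response returns after step 3; Tier 2 results stream in afterwards.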
Don’t wait for all agents to complete:
async def stream_results():
    async for result in agent_results():
        # Stream to frontend immediately
        await convex.mutation("intel:create", result)
Don’t let slow operations block the pipeline:
try:
    result = await asyncio.wait_for(
        slow_operation(),
        timeout=5.0  # 5 second max
    )
except asyncio.TimeoutError:
    logger.warning("Operation timed out, skipping")
    result = None
Choose the right model for each task:
  • Fast, cheap tasks: Gemini 2.0 Flash
  • Vision tasks: GPT-4 Vision
  • Complex reasoning: GPT-4
  • Don’t use GPT-4 for everything

Troubleshooting

Slow Pipeline

  1. Check Laminar traces to identify bottleneck
  2. Verify network latency to APIs
  3. Check if hitting rate limits
  4. Profile with cProfile for CPU-bound operations:
    python -m cProfile -o profile.stats main.py
    python -m pstats profile.stats
    

High Memory Usage

  1. Check for memory leaks with objgraph:
    import objgraph
    objgraph.show_most_common_types(limit=20)
    
  2. Limit concurrent operations:
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent
    
    async def limited_operation():
        async with semaphore:
            await operation()
    

Rate Limiting

If hitting API rate limits:
  1. Add exponential backoff:
    import asyncio
    
    async def retry_with_backoff(fn, max_retries=3):
        for i in range(max_retries):
            try:
                return await fn()
            except RateLimitError:
                wait = 2 ** i  # 1s, 2s, 4s
                await asyncio.sleep(wait)
        raise
    
  2. Use account pools (PimEyes)
  3. Cache results aggressively

Next Steps

  • Observability - Monitor performance with Laminar
  • Testing - Write performance tests
  • Architecture - Understand system design
  • Deployment - Deploy for production
