Overview

The capture pipeline is the entry point for all visual input. It processes images or videos, detects faces, generates embeddings, and orchestrates the downstream identification and enrichment workflow. Location: backend/pipeline.py

Pipeline Architecture

Image/Video Upload

  ├─▶ Frame Extraction (FFmpeg)
  │     └─▶ 1 FPS for videos, single frame for images

  ├─▶ Face Detection (MediaPipe)
  │     └─▶ Bounding boxes + confidence scores

  ├─▶ Face Cropping (PIL)
  │     └─▶ Isolated face images for search

  ├─▶ Embedding Generation (ArcFace)
  │     └─▶ 512-dimensional face vectors

  ├─▶ Face Search (PimEyes + Reverse Image)
  │     └─▶ Person name + profile URLs

  ├─▶ Person Record Creation (MongoDB + Convex)
  │     └─▶ Real-time corkboard updates

  └─▶ Enrichment Pipeline (Exa + Browser Agents)
        └─▶ Parallel research → Synthesis → Complete dossier

Implementation Details

Core Pipeline Class

From backend/pipeline.py:

class CapturePipeline:
    def __init__(
        self,
        detector: FaceDetector,
        embedder: ArcFaceEmbedder,
        db: DatabaseGateway,
        face_searcher: FaceSearchManager | None,
        exa_client: ExaEnrichmentClient | None,
        orchestrator: ResearchOrchestrator | None,
        synthesis_engine: AnthropicSynthesisEngine | GeminiSynthesisEngine | None,
        supermemory: SuperMemoryClient | None,
    ):
        self._detector = detector
        self._embedder = embedder
        self._db = db
        self._face_searcher = face_searcher
        self._exa = exa_client
        self._orchestrator = orchestrator
        self._synthesis = synthesis_engine
        self._supermemory = supermemory

Main Processing Flow

Step 1: Frame Extraction

Extract individual frames from uploaded media (videos → 1 FPS, images → single frame)
frames = extract_frames(data, content_type)
# Uses FFmpeg for videos, PIL for images
Performance: ~100ms for a typical 30-second video at 1 FPS
Step 2: Face Detection

Detect all faces in each frame using MediaPipe
for frame_bytes in frames:
    detection_result = await self._detector.detect_faces(
        FaceDetectionRequest(image_data=frame_bytes)
    )
    
    for face in detection_result.faces:
        ...  # Process each detected face (crop, embed, identify)
Performance: 5-10ms per frame, 95%+ accuracy
Step 3: Face Cropping

Extract face region from frame for downstream processing
cropped_bytes = self._crop_face(
    frame_bytes, 
    face.bbox,
    detection_result.frame_width,
    detection_result.frame_height,
)
Bounding Box Format:
@dataclass
class BoundingBox:
    x: float       # Normalized 0-1
    y: float       # Normalized 0-1
    width: float   # Normalized 0-1
    height: float  # Normalized 0-1
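Converting these normalized coordinates to pixel space is the core of the cropping step. A minimal sketch of that logic (the standalone `crop_face` name and the JPEG re-encoding are assumptions; the real helper is `_crop_face` on the pipeline class):

```python
import io
from dataclasses import dataclass

from PIL import Image


@dataclass
class BoundingBox:
    # Repeated here for self-containment; matches the dataclass above.
    x: float       # Normalized 0-1
    y: float       # Normalized 0-1
    width: float   # Normalized 0-1
    height: float  # Normalized 0-1


def crop_face(frame_bytes: bytes, bbox: BoundingBox,
              frame_width: int, frame_height: int) -> bytes:
    """Scale a normalized bbox to pixel coordinates, clamp to the frame, and crop."""
    left = max(0, int(bbox.x * frame_width))
    top = max(0, int(bbox.y * frame_height))
    right = min(frame_width, int((bbox.x + bbox.width) * frame_width))
    bottom = min(frame_height, int((bbox.y + bbox.height) * frame_height))

    image = Image.open(io.BytesIO(frame_bytes))
    cropped = image.crop((left, top, right, bottom))

    # Re-encode the isolated face region for the downstream search step
    buf = io.BytesIO()
    cropped.save(buf, format="JPEG")
    return buf.getvalue()
```

Clamping to the frame guards against detectors that occasionally emit boxes slightly outside the 0-1 range.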
Step 4: Embedding Generation

Generate 512-dimensional face embedding using ArcFace
embedding = self._embedder.embed(face, frame_bytes)
# Returns list[float] of length 512
Purpose: Embeddings enable semantic face similarity matching beyond pixel comparison
Step 5: Face Identification

Search for the person using face image and embedding
resolved_name = await self._identify_face(embedding, face_image)
# Uses PimEyes → reverse image search waterfall
See Identification for details
Step 6: Person Record Creation

Store person in both MongoDB and Convex
person_id = f"person_{uuid4().hex[:12]}"
person_data = {
    "capture_id": capture_id,
    "frame_index": frame_idx,
    "bbox": face.bbox.model_dump(),
    "confidence": face.confidence,
    "embedding": embedding,
    "status": "identified" if resolved_name else "detected",
    "name": resolved_name,
}
await self._db.store_person(person_id, person_data)
MongoDB: Raw image via GridFS, embeddings for future matching
Convex: Real-time updates for frontend corkboard
Step 7: Parallel Enrichment

Launch enrichment for all identified persons (error-isolated)
enrichment_tasks = [
    self._enrich_person(pid, name)
    for pid, (name, _img) in person_identities.items()
]
results = await asyncio.gather(*enrichment_tasks, return_exceptions=True)
Error Isolation: Each person’s enrichment runs independently. Failures never crash the pipeline.

Frame Extraction

Location: backend/capture/frame_extractor.py

Video Processing

def extract_frames(data: bytes, content_type: str) -> list[bytes]:
    if content_type.startswith("video/"):
        return _extract_video_frames(data)
    else:
        return [data]  # Single frame for images

def _extract_video_frames(video_bytes: bytes, fps: int = 1) -> list[bytes]:
    with tempfile.TemporaryDirectory() as tmpdir:
        input_path = Path(tmpdir) / "input.mp4"
        input_path.write_bytes(video_bytes)
        
        output_pattern = Path(tmpdir) / "frame_%03d.jpg"
        subprocess.run([
            "ffmpeg", "-i", str(input_path),
            "-vf", f"fps={fps}",
            "-q:v", "2",  # High quality JPEG
            str(output_pattern)
        ], check=True, capture_output=True)
        
        frames = []
        for frame_path in sorted(Path(tmpdir).glob("frame_*.jpg")):
            frames.append(frame_path.read_bytes())
        return frames
Why 1 FPS?
  • Reduces processing time ~30x for typical 30 FPS video
  • Faces don’t change significantly frame-to-frame
  • Sufficient for identification in most scenarios
  • Can be increased if needed (configurable)

Face Detection

Location: backend/identification/detector.py

MediaPipe Integration

import mediapipe as mp

class FaceDetector:
    def __init__(self, min_confidence: float = 0.7):
        self._detector = mp.solutions.face_detection.FaceDetection(
            min_detection_confidence=min_confidence
        )
    
    async def detect_faces(self, request: FaceDetectionRequest) -> FaceDetectionResult:
        # Convert bytes to an RGB numpy array (handles grayscale/RGBA inputs)
        image = Image.open(io.BytesIO(request.image_data)).convert("RGB")
        rgb_array = np.array(image)
        
        # Detect faces
        results = self._detector.process(rgb_array)
        
        if not results.detections:
            return FaceDetectionResult(
                faces=[],
                frame_width=image.width,
                frame_height=image.height,
                success=True,
            )
        
        faces = []
        for detection in results.detections:
            bbox = detection.location_data.relative_bounding_box
            faces.append(FaceInfo(
                bbox=BoundingBox(
                    x=bbox.xmin,
                    y=bbox.ymin,
                    width=bbox.width,
                    height=bbox.height,
                ),
                confidence=detection.score[0],
            ))
        
        return FaceDetectionResult(
            faces=faces,
            frame_width=image.width,
            frame_height=image.height,
            success=True,
        )
Why MediaPipe over alternatives?
Option        Speed      Accuracy   Dependencies
MediaPipe     5-10ms     95%+       pip install mediapipe
dlib          50-100ms   98%        Complex C++ compilation
MTCNN         100ms      97%        TensorFlow dependency
OpenCV Haar   2-5ms      80%        Lower accuracy
MediaPipe provides the best balance of speed, accuracy, and ease of installation for hackathon development.

Face Embedding Generation

Location: backend/identification/embedder.py

ArcFace Integration

from insightface.app import FaceAnalysis

class ArcFaceEmbedder:
    def __init__(self, model_name: str = "buffalo_l"):
        self._app = FaceAnalysis(name=model_name)
        self._app.prepare(ctx_id=0, det_size=(640, 640))
    
    def embed(self, face: FaceInfo, image_data: bytes) -> list[float]:
        image = Image.open(io.BytesIO(image_data))
        rgb_array = np.array(image)
        
        # Get face embeddings
        faces = self._app.get(rgb_array)
        
        if not faces:
            # Fallback: return zero embedding
            return [0.0] * 512
        
        # Return first face's embedding
        return faces[0].embedding.tolist()
ArcFace Characteristics:
  • 512-dimensional embeddings
  • State-of-the-art face recognition accuracy
  • Cosine similarity for matching (ranges −1 to 1; higher = more similar)
  • Pre-trained on millions of faces
Embedding Use Cases:
  1. De-duplication: Detect same person across multiple frames
  2. Similarity search: Find similar faces in database
  3. Clustering: Group photos by person
  4. Verification: Confirm identity matches
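All four use cases reduce to cosine similarity over the 512-dimensional vectors. A minimal sketch (the 0.6 threshold is illustrative, not a value from the source):

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two embeddings (-1 to 1; higher = more similar)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # zero embeddings (the embedder's fallback) never match
    return dot / (norm_a * norm_b)


def is_same_person(a: list[float], b: list[float], threshold: float = 0.6) -> bool:
    """De-duplication check: treat faces as the same person above a threshold."""
    return cosine_similarity(a, b) >= threshold
```

Because the embedder returns a zero vector when no face is found, the zero-norm guard ensures those fallback embeddings never spuriously match anyone.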

Enrichment Orchestration

Two-Tier Enrichment

@traced("pipeline.enrich_person")
async def _enrich_person(self, person_id: str, person_name: str) -> bool:
    # Check SuperMemory cache first
    cached_dossier = await self._check_supermemory_cache(person_name)
    if cached_dossier:
        await self._db.update_person(person_id, {
            "status": "enriched",
            "dossier": cached_dossier,
            "source": "supermemory_cache",
        })
        return True
    
    # Fan out Exa + browser research in parallel
    exa_result, browser_result = await asyncio.gather(
        self._run_exa(person_name),
        self._run_browser_research(person_name),
        return_exceptions=True,
    )
    
    # Merge into synthesis request
    synthesis_request = self._merge_to_synthesis_request(
        person_name, exa_result, browser_result
    )
    
    # Synthesize with primary engine, fallback on failure
    synthesis_result = await self._synthesis.synthesize(synthesis_request)
    if not synthesis_result.success and self._synthesis_fallback:
        synthesis_result = await self._synthesis_fallback.synthesize(synthesis_request)
    
    # Store dossier and detect connections (update_data and dossier_dict
    # are assembled from synthesis_result; assembly elided here)
    await self._db.update_person(person_id, update_data)
    await self._store_supermemory_cache(person_name, update_data, dossier_dict)
    await self._detect_and_store_connections(person_id, synthesis_result.dossier)
    return synthesis_result.success

Error Isolation

Critical Design Pattern: Enrichment for each person runs independently with return_exceptions=True. This ensures:
  • One person’s failure never crashes others
  • Pipeline always completes for all detected faces
  • Partial results are acceptable (demo requirement)
enrichment_tasks = [
    self._enrich_person(pid, name)
    for pid, (name, _img) in person_identities.items()
]
results = await asyncio.gather(*enrichment_tasks, return_exceptions=True)

for pid, result in zip(person_identities.keys(), results):
    if isinstance(result, Exception):
        logger.error("Enrichment crashed for person={}: {}", pid, result)
        # Continue processing others
    elif result:
        persons_enriched += 1

SuperMemory Caching

Purpose: Cache completed dossiers to avoid re-running expensive enrichment for previously researched people.
async def _check_supermemory_cache(self, person_name: str) -> dict | None:
    if not self._supermemory:
        return None
    try:
        return await self._supermemory.search_person(person_name)
    except Exception as exc:
        logger.error("SuperMemory cache check failed: {}", exc)
        return None

async def _store_supermemory_cache(
    self, person_name: str, update_data: dict, dossier_dict: dict | None
) -> None:
    if not self._supermemory:
        return
    cache_payload = {
        "summary": update_data.get("summary", ""),
        "occupation": update_data.get("occupation", ""),
        "organization": update_data.get("organization", ""),
    }
    if dossier_dict:
        cache_payload["dossier"] = dossier_dict
    await self._supermemory.store_dossier(person_name, cache_payload)
Cache Hit Rate:
  • First lookup: Full enrichment (45-130 seconds)
  • Subsequent lookups: Instant (less than 100ms)
  • Especially valuable for repeat demo subjects

Pipeline Result

@dataclass(frozen=True)
class PipelineResult:
    capture_id: str
    total_frames: int = 0
    faces_detected: int = 0
    persons_created: list[str] = field(default_factory=list)
    persons_enriched: int = 0
    success: bool = True
    error: str | None = None
Example Output:
{
  "capture_id": "cap_a7f9e2c1",
  "total_frames": 30,
  "faces_detected": 2,
  "persons_created": ["person_d4e7f8a2", "person_c9b3e1f5"],
  "persons_enriched": 2,
  "success": true,
  "error": null
}

Performance Characteristics

Typical Timeline (single person from 30-second video):
Stage                 Duration  Notes
Frame extraction      100ms     FFmpeg at 1 FPS
Face detection        300ms     30 frames × 10ms
Embedding generation  150ms     30 frames × 5ms
Face search           10-30s    PimEyes + reverse search
Research (parallel)   20-60s    All agents run concurrently
Synthesis             5-10s     LLM aggregation
Total                 45-130s   End-to-end
Scaling:
  • Multiple faces: Processed sequentially (not parallelized within a frame)
  • Multiple frames: Processed sequentially (detection is CPU-bound)
  • Multiple people: Enrichment parallelized (I/O-bound, benefits from concurrency)
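The enrichment scaling claim is the standard asyncio one: N I/O-bound tasks gathered concurrently take roughly the slowest task's time, not the sum. A self-contained sketch with simulated 0.05 s lookups standing in for network-bound enrichment:

```python
import asyncio
import time


async def fake_enrich(person: str, delay: float = 0.05) -> str:
    # Stand-in for an I/O-bound enrichment call (network-bound in the real pipeline)
    await asyncio.sleep(delay)
    return f"{person}: enriched"


async def run_all() -> float:
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_enrich(p) for p in ["a", "b", "c", "d"]))
    assert len(results) == 4
    return time.perf_counter() - start


# Four 0.05 s tasks overlap, so total wall time is ~0.05 s rather than ~0.2 s
elapsed = asyncio.run(run_all())
```

CPU-bound stages (detection, embedding) get no such benefit from asyncio, which is why they remain sequential.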

Observability

Laminar Tracing:
@traced("pipeline.process")
async def process(self, capture_id: str, data: bytes, ...) -> PipelineResult:
    # Entire pipeline traced
    ...

@traced("pipeline.identify_face")
async def _identify_face(self, embedding: list[float], image_data: bytes):
    # Face search traced separately
    ...

@traced("pipeline.enrich_person")
async def _enrich_person(self, person_id: str, person_name: str) -> bool:
    # Enrichment traced per person
    ...
Trace Hierarchy:
pipeline.process
├── pipeline.identify_face (per face)
│   ├── pimeyes.search
│   └── reverse_search.search
├── pipeline.enrich_person (per person)
│   ├── exa.enrich
│   ├── orchestrator.research_person
│   │   ├── linkedin_agent.run
│   │   ├── twitter_agent.run
│   │   └── ...
│   └── synthesis.synthesize
└── connections.detect

Next Steps

Identification

How PimEyes and reverse image search identify people from faces

Agent Swarm

Research orchestration with parallel agent execution
