Overview
The capture pipeline is the entry point for all visual input. It processes images or videos, detects faces, generates embeddings, and orchestrates the downstream identification and enrichment workflow.
Location: backend/pipeline.py
Pipeline Architecture
```
Image/Video Upload
  │
  ├─▶ Frame Extraction (FFmpeg)
  │     └─▶ 1 FPS for videos, single frame for images
  │
  ├─▶ Face Detection (MediaPipe)
  │     └─▶ Bounding boxes + confidence scores
  │
  ├─▶ Face Cropping (PIL)
  │     └─▶ Isolated face images for search
  │
  ├─▶ Embedding Generation (ArcFace)
  │     └─▶ 512-dimensional face vectors
  │
  ├─▶ Face Search (PimEyes + Reverse Image)
  │     └─▶ Person name + profile URLs
  │
  ├─▶ Person Record Creation (MongoDB + Convex)
  │     └─▶ Real-time corkboard updates
  │
  └─▶ Enrichment Pipeline (Exa + Browser Agents)
        └─▶ Parallel research → Synthesis → Complete dossier
```
Implementation Details
Core Pipeline Class
```python
# from backend/pipeline.py
class CapturePipeline:
    def __init__(
        self,
        detector: FaceDetector,
        embedder: ArcFaceEmbedder,
        db: DatabaseGateway,
        face_searcher: FaceSearchManager | None,
        exa_client: ExaEnrichmentClient | None,
        orchestrator: ResearchOrchestrator | None,
        synthesis_engine: AnthropicSynthesisEngine | GeminiSynthesisEngine | None,
        supermemory: SuperMemoryClient | None,
    ):
        self._detector = detector
        self._embedder = embedder
        self._db = db
        self._face_searcher = face_searcher
        self._exa = exa_client
        self._orchestrator = orchestrator
        self._synthesis = synthesis_engine
        self._supermemory = supermemory
```
Main Processing Flow
Frame Extraction
Extract individual frames from uploaded media (videos → 1 FPS, images → single frame):

```python
frames = extract_frames(data, content_type)
# Uses FFmpeg for videos, PIL for images
```

Performance: ~100ms for a typical 30-second video at 1 FPS
Face Detection
Detect all faces in each frame using MediaPipe:

```python
for frame_bytes in frames:
    detection_result = await self._detector.detect_faces(
        FaceDetectionRequest(image_data=frame_bytes)
    )
    for face in detection_result.faces:
        ...  # Process each detected face
```
Performance: 5-10ms per frame, 95%+ accuracy
Face Cropping
Extract the face region from the frame for downstream processing:

```python
cropped_bytes = self._crop_face(
    frame_bytes,
    face.bbox,
    detection_result.frame_width,
    detection_result.frame_height,
)
```

Bounding Box Format:

```python
@dataclass
class BoundingBox:
    x: float       # Normalized 0-1
    y: float       # Normalized 0-1
    width: float   # Normalized 0-1
    height: float  # Normalized 0-1
```
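Because the bounding box is normalized, it must be scaled to pixel coordinates before cropping. A minimal sketch of that scaling step (the real logic lives inside `_crop_face` in `backend/pipeline.py`; the clamping here is an illustrative assumption to keep edge detections croppable):

```python
def bbox_to_pixels(
    x: float, y: float, width: float, height: float,
    frame_width: int, frame_height: int,
) -> tuple[int, int, int, int]:
    """Convert a normalized bbox to a (left, top, right, bottom) pixel box,
    clamped to the frame boundaries."""
    left = max(0, int(x * frame_width))
    top = max(0, int(y * frame_height))
    right = min(frame_width, int((x + width) * frame_width))
    bottom = min(frame_height, int((y + height) * frame_height))
    return left, top, right, bottom

# A face filling the center half of a 640x480 frame:
print(bbox_to_pixels(0.25, 0.25, 0.5, 0.5, 640, 480))  # (160, 120, 480, 360)
```

The resulting pixel box can be passed directly to `PIL.Image.crop`.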
Embedding Generation
Generate a 512-dimensional face embedding using ArcFace:

```python
embedding = self._embedder.embed(face, frame_bytes)
# Returns list[float] of length 512
```
Purpose: Embeddings enable semantic face similarity matching beyond pixel comparison
Face Identification
Search for the person using the face image and embedding:

```python
resolved_name = await self._identify_face(embedding, face_image)
# Uses PimEyes → reverse image search waterfall
```

See Identification for details.
Person Record Creation
Store the person in both MongoDB and Convex:

```python
person_id = f"person_{uuid4().hex[:12]}"
person_data = {
    "capture_id": capture_id,
    "frame_index": frame_idx,
    "bbox": face.bbox.model_dump(),
    "confidence": face.confidence,
    "embedding": embedding,
    "status": "identified" if resolved_name else "detected",
    "name": resolved_name,
}
await self._db.store_person(person_id, person_data)
```

MongoDB: Raw image via GridFS, embeddings for future matching
Convex: Real-time updates for frontend corkboard
Parallel Enrichment
Launch enrichment for all identified persons (error-isolated):

```python
enrichment_tasks = [
    self._enrich_person(pid, name)
    for pid, (name, _img) in person_identities.items()
]
results = await asyncio.gather(*enrichment_tasks, return_exceptions=True)
```
Error Isolation: Each person’s enrichment runs independently. Failures never crash the pipeline.
Location: backend/capture/frame_extractor.py
Video Processing
```python
import subprocess
import tempfile
from pathlib import Path

def extract_frames(data: bytes, content_type: str) -> list[bytes]:
    if content_type.startswith("video/"):
        return _extract_video_frames(data)
    else:
        return [data]  # Single frame for images

def _extract_video_frames(video_bytes: bytes, fps: int = 1) -> list[bytes]:
    with tempfile.TemporaryDirectory() as tmpdir:
        input_path = Path(tmpdir) / "input.mp4"
        input_path.write_bytes(video_bytes)
        output_pattern = Path(tmpdir) / "frame_%03d.jpg"
        subprocess.run([
            "ffmpeg", "-i", str(input_path),
            "-vf", f"fps={fps}",
            "-q:v", "2",  # High-quality JPEG
            str(output_pattern),
        ], check=True, capture_output=True)
        frames = []
        for frame_path in sorted(Path(tmpdir).glob("frame_*.jpg")):
            frames.append(frame_path.read_bytes())
        return frames
```
Why 1 FPS?
Reduces processing time 30x for 30-second videos
Faces don’t change significantly frame-to-frame
Sufficient for identification in most scenarios
Can be increased if needed (configurable)
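The arithmetic behind the 30x claim is worth making explicit. A back-of-envelope sketch, assuming a 30 FPS source clip and the ~10ms-per-frame detection figure quoted below:

```python
# Assumed numbers: 30 FPS source, 30-second clip, ~10 ms detection per frame.
native_fps, sample_fps, duration_s, ms_per_frame = 30, 1, 30, 10

frames_native = native_fps * duration_s    # 900 frames at full rate
frames_sampled = sample_fps * duration_s   # 30 frames at 1 FPS

speedup = frames_native // frames_sampled
detection_ms = frames_sampled * ms_per_frame

print(speedup)        # 30x fewer frames to detect
print(detection_ms)   # ~300 ms of total detection work
```

At full frame rate, detection alone would take ~9 seconds per clip; at 1 FPS it drops to ~300ms.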
Face Detection
Location: backend/identification/detector.py
```python
import io

import mediapipe as mp
import numpy as np
from PIL import Image

class FaceDetector:
    def __init__(self, min_confidence: float = 0.7):
        self._detector = mp.solutions.face_detection.FaceDetection(
            min_detection_confidence=min_confidence
        )

    async def detect_faces(self, request: FaceDetectionRequest) -> FaceDetectionResult:
        # Convert bytes to an RGB numpy array
        image = Image.open(io.BytesIO(request.image_data))
        rgb_array = np.array(image)

        # Detect faces
        results = self._detector.process(rgb_array)
        if not results.detections:
            return FaceDetectionResult(faces=[], success=True)

        faces = []
        for detection in results.detections:
            bbox = detection.location_data.relative_bounding_box
            faces.append(FaceInfo(
                bbox=BoundingBox(
                    x=bbox.xmin,
                    y=bbox.ymin,
                    width=bbox.width,
                    height=bbox.height,
                ),
                confidence=detection.score[0],
            ))
        return FaceDetectionResult(
            faces=faces,
            frame_width=image.width,
            frame_height=image.height,
            success=True,
        )
```
Why MediaPipe over alternatives?
| Option | Speed | Accuracy | Dependencies |
|---|---|---|---|
| MediaPipe ✅ | 5-10ms | 95%+ | `pip install mediapipe` |
| dlib | 50-100ms | 98% | Complex C++ compilation |
| MTCNN | 100ms | 97% | TensorFlow dependency |
| OpenCV Haar | 2-5ms | 80% | Lower accuracy |
MediaPipe provides the best balance of speed, accuracy, and ease of installation for hackathon development.
Face Embedding Generation
Location: backend/identification/embedder.py
ArcFace Integration
```python
import io

import numpy as np
from PIL import Image
from insightface.app import FaceAnalysis

class ArcFaceEmbedder:
    def __init__(self, model_name: str = "buffalo_l"):
        self._app = FaceAnalysis(name=model_name)
        self._app.prepare(ctx_id=0, det_size=(640, 640))

    def embed(self, face: FaceInfo, image_data: bytes) -> list[float]:
        image = Image.open(io.BytesIO(image_data))
        rgb_array = np.array(image)

        # Get face embeddings
        faces = self._app.get(rgb_array)
        if not faces:
            # Fallback: return a zero embedding
            return [0.0] * 512

        # Return the first face's embedding
        return faces[0].embedding.tolist()
```
ArcFace Characteristics:
512-dimensional embeddings
State-of-the-art face recognition accuracy
Cosine similarity for matching (values 0-1)
Pre-trained on millions of faces
Embedding Use Cases:
De-duplication: Detect same person across multiple frames
Similarity search: Find similar faces in database
Clustering: Group photos by person
Verification: Confirm identity matches
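All four use cases reduce to comparing embeddings with cosine similarity. A self-contained sketch of the de-duplication check; the 0.6 threshold is an illustrative assumption, not a value from the codebase:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return dot / norm if norm else 0.0

def is_same_person(emb_a: list[float], emb_b: list[float],
                   threshold: float = 0.6) -> bool:
    # Threshold is an illustrative value; tune it against real data.
    return cosine_similarity(emb_a, emb_b) >= threshold

print(is_same_person([1.0, 0.0], [1.0, 0.0]))  # True  (identical direction)
print(is_same_person([1.0, 0.0], [0.0, 1.0]))  # False (orthogonal)
```

The same comparison, applied pairwise across frames, lets the pipeline collapse repeated detections of one person into a single record.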
Enrichment Orchestration
Two-Tier Enrichment
```python
@traced("pipeline.enrich_person")
async def _enrich_person(self, person_id: str, person_name: str) -> bool:
    # Check the SuperMemory cache first
    cached_dossier = await self._check_supermemory_cache(person_name)
    if cached_dossier:
        await self._db.update_person(person_id, {
            "status": "enriched",
            "dossier": cached_dossier,
            "source": "supermemory_cache",
        })
        return True

    # Fan out Exa + browser research in parallel
    exa_result, browser_result = await asyncio.gather(
        self._run_exa(person_name),
        self._run_browser_research(person_name),
        return_exceptions=True,
    )

    # Merge into a synthesis request
    synthesis_request = self._merge_to_synthesis_request(
        person_name, exa_result, browser_result
    )

    # Synthesize with the primary engine, falling back on failure
    synthesis_result = await self._synthesis.synthesize(synthesis_request)
    if not synthesis_result.success and self._synthesis_fallback:
        synthesis_result = await self._synthesis_fallback.synthesize(synthesis_request)

    # Store the dossier and detect connections
    await self._db.update_person(person_id, update_data)
    await self._store_supermemory_cache(person_name, update_data, dossier_dict)
    await self._detect_and_store_connections(person_id, synthesis_result.dossier)
```
Error Isolation
Critical Design Pattern: Enrichment for each person runs independently with `return_exceptions=True`. This ensures:
One person’s failure never crashes others
Pipeline always completes for all detected faces
Partial results are acceptable (demo requirement)
```python
enrichment_tasks = [
    self._enrich_person(pid, name)
    for pid, (name, _img) in person_identities.items()
]
results = await asyncio.gather(*enrichment_tasks, return_exceptions=True)

for pid, result in zip(pids, results):
    if isinstance(result, Exception):
        logger.error("Enrichment crashed for person={}: {}", pid, result)
        # Continue processing the others
    elif result:
        persons_enriched += 1
```
SuperMemory Caching
Purpose: Cache completed dossiers to avoid re-running expensive enrichment for previously researched people.
```python
async def _check_supermemory_cache(self, person_name: str) -> dict | None:
    if not self._supermemory:
        return None
    try:
        return await self._supermemory.search_person(person_name)
    except Exception as exc:
        logger.error("SuperMemory cache check failed: {}", exc)
        return None

async def _store_supermemory_cache(
    self, person_name: str, update_data: dict, dossier_dict: dict | None
) -> None:
    if not self._supermemory:
        return
    cache_payload = {
        "summary": update_data.get("summary", ""),
        "occupation": update_data.get("occupation", ""),
        "organization": update_data.get("organization", ""),
    }
    if dossier_dict:
        cache_payload["dossier"] = dossier_dict
    await self._supermemory.store_dossier(person_name, cache_payload)
```
Cache Hit Rate:
First lookup: Full enrichment (45-130 seconds)
Subsequent lookups: Instant (less than 100ms)
Especially valuable for repeat demo subjects
Pipeline Result
```python
@dataclass(frozen=True)
class PipelineResult:
    capture_id: str
    total_frames: int = 0
    faces_detected: int = 0
    persons_created: list[str] = field(default_factory=list)
    persons_enriched: int = 0
    success: bool = True
    error: str | None = None
```
Example Output:
```json
{
  "capture_id": "cap_a7f9e2c1",
  "total_frames": 30,
  "faces_detected": 2,
  "persons_created": ["person_d4e7f8a2", "person_c9b3e1f5"],
  "persons_enriched": 2,
  "success": true,
  "error": null
}
```
Typical Timeline (single person from 30-second video):
| Stage | Duration | Notes |
|---|---|---|
| Frame extraction | 100ms | FFmpeg at 1 FPS |
| Face detection | 300ms | 30 frames × 10ms |
| Embedding generation | 150ms | 30 frames × 5ms |
| Face search | 10-30s | PimEyes + reverse search |
| Research (parallel) | 20-60s | All agents run concurrently |
| Synthesis | 5-10s | LLM aggregation |
| Total | 45-130s | End-to-end |
Scaling:
Multiple faces: Processed sequentially (not parallelized within a frame)
Multiple frames: Processed sequentially (detection is CPU-bound)
Multiple people: Enrichment parallelized (I/O-bound, benefits from concurrency)
Observability
Laminar Tracing:
```python
@traced("pipeline.process")
async def process(self, capture_id: str, data: bytes, ...) -> PipelineResult:
    # Entire pipeline traced
    ...

@traced("pipeline.identify_face")
async def _identify_face(self, embedding: list[float], image_data: bytes):
    # Face search traced separately
    ...

@traced("pipeline.enrich_person")
async def _enrich_person(self, person_id: str, person_name: str) -> bool:
    # Enrichment traced per person
    ...
```
Trace Hierarchy:
```
pipeline.process
├── pipeline.identify_face (per face)
│   ├── pimeyes.search
│   └── reverse_search.search
├── pipeline.enrich_person (per person)
│   ├── exa.enrich
│   ├── orchestrator.research_person
│   │   ├── linkedin_agent.run
│   │   ├── twitter_agent.run
│   │   └── ...
│   └── synthesis.synthesize
└── connections.detect
```
Next Steps
Identification: How PimEyes and reverse image search identify people from faces
Agent Swarm: Research orchestration with parallel agent execution