Overview
The CaptureService handles incoming media uploads (images, videos, frames) and routes them through the face detection and identification pipeline.
Service Architecture
backend/capture/service.py
class CaptureService:
"""Process incoming media through the face detection pipeline."""
def __init__(self, pipeline: CapturePipeline | None = None) -> None:
self._pipeline = pipeline
@property
def pipeline(self) -> CapturePipeline | None:
return self._pipeline
@pipeline.setter
def pipeline(self, value: CapturePipeline) -> None:
self._pipeline = value
Upload Processing
Enqueue Upload
The main entry point for media uploads generates a unique capture ID and processes the file:
backend/capture/service.py
async def enqueue_upload(
self,
file: UploadFile,
source: str = "manual_upload",
person_name: str | None = None,
) -> CaptureQueuedResponse | dict:
capture_id = f"cap_{uuid4().hex[:12]}"
filename = file.filename or "upload.bin"
content_type = file.content_type or "application/octet-stream"
if self._pipeline is None:
logger.warning("No pipeline configured, returning queued response only")
return CaptureQueuedResponse(
capture_id=capture_id,
filename=filename,
content_type=content_type,
status="queued",
source=source,
)
data = await file.read()
result: PipelineResult = await self._pipeline.process(
capture_id=capture_id,
data=data,
content_type=content_type,
source=source,
person_name=person_name,
)
return {
"capture_id": capture_id,
"filename": filename,
"content_type": content_type,
"status": "processed" if result.success else "error",
"source": source,
"total_frames": result.total_frames,
"faces_detected": result.faces_detected,
"persons_created": result.persons_created,
"error": result.error,
}
API Endpoints
POST /api/capture
Upload an image or video for processing:
@app.post("/api/capture")
async def capture(
file: UploadFile = upload_file,
source: str = "manual_upload",
person_name: str | None = None,
):
return await capture_service.enqueue_upload(
file=file, source=source, person_name=person_name,
)
Image or video file to process
source
string
default:"manual_upload"
Source identifier (e.g., “manual_upload”, “telegram”, “webhook”)
Optional known person name to assist identification
Response:
{
"capture_id": "cap_a1b2c3d4e5f6",
"filename": "photo.jpg",
"content_type": "image/jpeg",
"status": "processed",
"source": "manual_upload",
"total_frames": 1,
"faces_detected": 2,
"persons_created": ["person_123", "person_456"],
"error": null
}
POST /api/capture/frame
Process a single frame with base64 encoding (used by real-time video streams):
@app.post("/api/capture/frame", response_model=FrameProcessedResponse)
async def capture_frame(submission: FrameSubmission) -> FrameProcessedResponse:
result = await frame_handler.process_frame(
frame_b64=submission.frame,
timestamp=submission.timestamp,
source=submission.source,
target=submission.target,
)
return FrameProcessedResponse(**result)
Base64-encoded image data
Frame timestamp in milliseconds
Source identifier (e.g., “webcam”, “glasses”)
Optional target tracking ID
POST /api/capture/identify
Identify a person by name and image URL:
@app.post("/api/capture/identify", response_model=IdentifyResponse)
async def identify(body: IdentifyRequest) -> IdentifyResponse:
"""Identify a person by name + image URL. Downloads the image, runs the full pipeline."""
capture_id = f"identify_{uuid4().hex[:12]}"
async with httpx.AsyncClient(timeout=30.0) as client:
try:
resp = await client.get(body.image_url)
resp.raise_for_status()
except httpx.HTTPStatusError as exc:
raise HTTPException(
status_code=400,
detail=f"Failed to download image: HTTP {exc.response.status_code}",
) from exc
except httpx.RequestError as exc:
raise HTTPException(
status_code=400,
detail=f"Failed to download image: {exc}",
) from exc
content_type = resp.headers.get("content-type", "image/jpeg").split(";")[0]
image_data = resp.content
result = await pipeline.process(
capture_id=capture_id,
data=image_data,
content_type=content_type,
source="api_identify",
person_name=body.name,
)
return IdentifyResponse(
capture_id=result.capture_id,
total_frames=result.total_frames,
faces_detected=result.faces_detected,
persons_created=list(result.persons_created),
persons_enriched=result.persons_enriched,
success=result.success,
error=result.error,
)
Person’s name to identify
URL of the image to process
Pipeline Integration
The CaptureService delegates to the CapturePipeline for the actual processing:
pipeline = CapturePipeline(
detector=detector,
embedder=embedder,
db=db_gateway,
face_searcher=face_searcher,
exa_client=exa_client,
orchestrator=orchestrator,
synthesis_engine=synthesis_engine,
synthesis_fallback=synthesis_fallback,
supermemory=supermemory_client,
)
capture_service = CaptureService(pipeline=pipeline)
Content Type Support
The service handles multiple content types:
- Images:
image/jpeg, image/png, image/webp
- Videos:
video/mp4, video/webm, video/quicktime
- Raw data:
application/octet-stream (auto-detected)
Error Handling
The service implements graceful degradation:
if self._pipeline is None:
logger.warning("No pipeline configured, returning queued response only")
return CaptureQueuedResponse(
capture_id=capture_id,
status="queued",
# ... other fields
)
If the pipeline is not configured, uploads return a “queued” status but won’t be processed.
Capture IDs follow a consistent pattern:
- Manual uploads:
cap_{12_hex_chars}
- Identify endpoint:
identify_{12_hex_chars}
- Webhook uploads:
webhook_{12_hex_chars}
capture_id = f"cap_{uuid4().hex[:12]}"
# Example: "cap_a1b2c3d4e5f6"
Next Steps
Identification Pipeline
See how faces are detected and identified
Frame Handler
Learn about real-time video frame processing