
Overview

Scribe follows a modular, domain-driven architecture with clear separation between API routes, business logic, and data models.

Directory Tree

pythonserver/
├── 📄 main.py                      # FastAPI application entry point
├── 📄 celery_config.py             # Celery task queue configuration
├── 📄 alembic.ini                  # Database migration config
├── 📄 pytest.ini                   # Test configuration
├── 📄 requirements.txt             # Python dependencies
├── 📄 Makefile                     # Development commands
├── 📄 .env.example                 # Environment variables template
├── 📄 README.md                    # Project documentation
├── 📄 CLAUDE.md                    # AI assistant context

├── 📁 api/                         # FastAPI routes and dependencies
│   ├── dependencies.py             # Auth dependencies (JWT validation)
│   └── routes/
│       ├── user.py                 # User management endpoints
│       ├── email.py                # Email generation endpoints
│       ├── template.py             # Template endpoints
│       └── queue.py                # Queue management endpoints

├── 📁 models/                      # SQLAlchemy ORM models
│   ├── user.py                     # User model (Supabase auth.users)
│   ├── email.py                    # Email model (generated emails)
│   ├── template.py                 # Template model
│   └── queue_item.py               # Queue item model (batch processing)

├── 📁 schemas/                     # Pydantic schemas for validation
│   ├── auth.py                     # Authentication schemas
│   ├── pipeline.py                 # Email generation schemas
│   ├── template.py                 # Template schemas
│   └── queue.py                    # Queue request/response schemas

├── 📁 database/                    # Database configuration
│   ├── base.py                     # SQLAlchemy Base and engine
│   ├── session.py                  # Session management
│   ├── dependencies.py             # Database dependencies
│   └── utils.py                    # Health checks, connection validation

├── 📁 config/                      # Application configuration
│   └── settings.py                 # Pydantic Settings (environment variables)

├── 📁 pipeline/                    # 4-step email generation pipeline
│   ├── __init__.py
│   ├── core/                       # Pipeline infrastructure
│   │   ├── runner.py               # BasePipelineStep, PipelineRunner
│   │   └── exceptions.py           # Custom exceptions
│   ├── models/                     # Pipeline data models
│   │   └── core.py                 # PipelineData, StepResult, TemplateType
│   ├── steps/                      # Pipeline step implementations
│   │   ├── template_parser/        # Step 1: Parse template
│   │   │   ├── main.py             # Template analysis
│   │   │   └── tests/
│   │   │       ├── test_template_parser.py
│   │   │       └── test_template_parser_logging.py
│   │   ├── web_scraper/            # Step 2: Web scraping
│   │   │   ├── main.py             # Google Search + Playwright
│   │   │   └── tests/
│   │   │       └── test_full_pipeline.py
│   │   ├── arxiv_helper/           # Step 3: Academic papers
│   │   │   ├── main.py             # ArXiv paper fetching
│   │   │   └── tests/
│   │   └── email_composer/         # Step 4: Email generation
│   │       ├── main.py             # Final email composition + DB write
│   │       └── tests/
│   │           ├── test_email_composer.py
│   │           ├── test_db_insert.py
│   │           └── test_db_performance.py
│   └── tests/
│       └── test_full_pipeline_request.py

├── 📁 tasks/                       # Celery background tasks
│   └── email_tasks.py              # Email generation task orchestration

├── 📁 services/                    # External service integrations
│   ├── supabase.py                 # Supabase client singleton
│   ├── template_generator.py       # AI template generation from resume
│   └── prompts/                    # LLM prompts

├── 📁 utils/                       # Shared utilities
│   ├── uuid_helpers.py             # UUID validation and conversion
│   └── validators.py               # Common validators

├── 📁 observability/               # Monitoring and logging
│   └── logfire_config.py           # Logfire initialization

├── 📁 alembic/                     # Database migrations
│   ├── env.py                      # Alembic environment
│   ├── script.py.mako              # Migration template
│   └── versions/                   # Migration files
│       ├── 001_initial.py
│       ├── 002_add_queue_items.py
│       └── ...

├── 📁 tests/                       # Integration tests
│   └── integration/
│       ├── test_email_api.py       # Email API tests
│       ├── test_templates_api.py   # Template API tests
│       └── test_infrastructure.py  # Health checks, database

├── 📁 scripts/                     # Utility scripts
│   └── ...                         # Deployment, data migration scripts

├── 📁 docs/                        # Documentation
│   ├── QUICKSTART.MD
│   ├── ARCHITECTURE.MD
│   ├── PIPELINE.MD
│   ├── DEVELOPMENT.MD
│   └── API_REFERENCE.MD

└── 📁 prompts/                     # LLM system prompts
    └── ...                         # Prompt templates

Core Components

Entry Point

main.py

Purpose: FastAPI application initialization and configuration
Key Responsibilities:
  • Create FastAPI app instance
  • Configure CORS middleware
  • Register API routers
  • Initialize Logfire observability
  • Mount static files (if any)
  • Health check endpoint
Example:
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from api.routes import user, email, queue, template
from config.settings import settings
import logfire

# Initialize Logfire
logfire.configure()

# Create app
app = FastAPI(
    title="Scribe API",
    version="1.0.0",
    description="AI-powered cold email generation"
)

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register routes
app.include_router(user.router)
app.include_router(email.router)
app.include_router(queue.router)
app.include_router(template.router)
celery_config.py

Purpose: Celery task queue configuration
Key Configuration:
  • Redis broker URL
  • Result backend
  • Task serialization (JSON)
  • Task routing (email_default queue)
  • Timezone settings
Example:
from celery import Celery

celery_app = Celery(
    "scribe",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
    include=['tasks.email_tasks']
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

API Layer

api/routes/

Purpose: FastAPI route handlers (controllers)
Structure:
  • user.py: User initialization and profile management
  • email.py: Email generation and retrieval
  • queue.py: Batch submission, status polling, cancellation
  • template.py: AI template generation from resumes
Pattern:
from fastapi import APIRouter, Depends
from api.dependencies import get_current_user
from models.user import User
from schemas.pipeline import GenerateEmailRequest, GenerateEmailResponse
from tasks.email_tasks import generate_email_task

router = APIRouter(prefix="/api/email", tags=["Email"])

@router.post("/generate")
async def generate_email(
    request: GenerateEmailRequest,
    current_user: User = Depends(get_current_user)
) -> GenerateEmailResponse:
    # Dispatch Celery task
    task = generate_email_task.apply_async(
        kwargs={"user_id": str(current_user.id), ...}
    )
    return GenerateEmailResponse(task_id=task.id)
api/dependencies.py

Purpose: Reusable dependencies for authentication
Key Functions:
  • get_supabase_user(): Validates JWT and returns Supabase user
  • get_current_user(): Fetches user from local database
  • get_db(): Database session dependency
Example:
from fastapi import Depends, HTTPException, Header
from services.supabase import get_supabase_client

async def get_supabase_user(
    authorization: str = Header(...)
):
    token = authorization.replace("Bearer ", "")
    supabase = get_supabase_client()
    user = supabase.auth.get_user(token)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

Data Layer

models/

Purpose: SQLAlchemy ORM models (database schema)
Key Models:
  • User: User accounts (synced with Supabase auth.users)
  • Email: Generated emails with JSONB metadata
  • Template: User-created email templates
  • QueueItem: Batch processing queue with status tracking
Example:
from sqlalchemy import Column, String, Text, TIMESTAMP, ForeignKey, func
from sqlalchemy.dialects.postgresql import UUID, JSONB
from database.base import Base

class Email(Base):
    __tablename__ = "emails"

    id = Column(UUID(as_uuid=True), primary_key=True)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"))
    recipient_name = Column(String(255), nullable=False)
    recipient_interest = Column(String(500), nullable=False)
    email_message = Column(Text, nullable=False)
    template_type = Column(String(50), nullable=False)
    # "metadata" is reserved on declarative models, so map the column explicitly
    email_metadata = Column("metadata", JSONB, default=dict)
    created_at = Column(TIMESTAMP, server_default=func.now())
schemas/

Purpose: Pydantic schemas for request/response validation
Key Features:
  • Automatic validation
  • Type coercion
  • OpenAPI documentation
  • Serialization/deserialization
Example:
from pydantic import BaseModel, Field

class GenerateEmailRequest(BaseModel):
    email_template: str = Field(..., min_length=20, max_length=5000)
    recipient_name: str = Field(..., min_length=2, max_length=255)
    recipient_interest: str = Field(..., min_length=2, max_length=500)

class GenerateEmailResponse(BaseModel):
    task_id: str
database/

Purpose: Database configuration and utilities
Files:
  • base.py: SQLAlchemy engine and Base class
  • session.py: Session factory and context managers
  • dependencies.py: FastAPI database dependencies
  • utils.py: Health checks, connection testing
Configuration:
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.pool import NullPool
from config.settings import settings

# Use NullPool for Supabase transaction pooler
engine = create_engine(
    settings.database_url,
    poolclass=NullPool,
    connect_args={
        "connect_timeout": settings.db_connect_timeout,
        "options": f"-c statement_timeout={settings.db_statement_timeout}"
    }
)

Base = declarative_base()
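session.py itself is not shown above; a minimal sketch of the session factory and context managers it is described as providing might look like the following (the `SessionLocal` and `get_db` names come from the sections above; the `session_scope` helper and the inline SQLite engine are illustrative stand-ins — the real module binds to the engine from `database/base.py`):

```python
from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Stand-in engine for this sketch; the real app imports it from database.base
engine = create_engine("sqlite://")

# Session factory bound to the shared engine
SessionLocal = sessionmaker(bind=engine, autoflush=False)

def get_db():
    """FastAPI-style dependency: yield a session, always close it."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@contextmanager
def session_scope():
    """Context manager for scripts and Celery tasks: commit or roll back."""
    db = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()
```

The generator form (`get_db`) is what FastAPI's `Depends` consumes; the context-manager form suits non-request code paths such as Celery tasks.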

Pipeline Architecture

pipeline/core/

Purpose: Base classes and infrastructure for pipeline steps
Key Components:
  • runner.py: BasePipelineStep abstract class, PipelineRunner orchestrator
  • exceptions.py: Custom exceptions (ValidationError, StepExecutionError, etc.)
Pattern:
from abc import ABC, abstractmethod
from typing import Optional
from pipeline.models.core import PipelineData, StepResult

class BasePipelineStep(ABC):
    def __init__(self, step_name: str):
        self.step_name = step_name

    @abstractmethod
    async def _validate_input(self, data: PipelineData) -> Optional[str]:
        """Validate prerequisites before execution"""
        pass

    @abstractmethod
    async def _execute_step(self, data: PipelineData) -> StepResult:
        """Execute step logic"""
        pass

    async def execute(self, data: PipelineData) -> StepResult:
        """Main entry point with timing and error handling"""
        # Validation → Execution → Timing → Error handling
        pass
pipeline/models/core.py

Purpose: Data models for pipeline execution
Key Classes:
  • PipelineData: Dataclass holding all pipeline state (in-memory)
  • StepResult: Result object from each step (success, metadata, error)
  • TemplateType: Enum (RESEARCH, BOOK, GENERAL)
Example:
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum

class TemplateType(str, Enum):
    RESEARCH = "research"
    BOOK = "book"
    GENERAL = "general"

@dataclass
class PipelineData:
    task_id: str
    user_id: str
    email_template: str
    recipient_name: str
    recipient_interest: str

    # Populated by steps
    search_terms: List[str] = field(default_factory=list)
    template_type: Optional[TemplateType] = None
    scraped_content: str = ""
    arxiv_papers: List[Dict] = field(default_factory=list)
    final_email: str = ""
    step_timings: Dict[str, float] = field(default_factory=dict)
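StepResult is referenced above but not shown; a plausible shape, inferred from the "success, metadata, error" description (the exact field names and the `duration_seconds` field are assumptions):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@dataclass
class StepResult:
    """Outcome of a single pipeline step."""
    step_name: str
    success: bool
    metadata: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None
    duration_seconds: float = 0.0

# A failed step reports its error rather than raising out of the runner
result = StepResult(step_name="web_scraper", success=False, error="timeout")
```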
pipeline/steps/

Purpose: 4-step email generation pipeline
Steps:
  1. template_parser: Analyze template, extract search terms, classify type
  2. web_scraper: Google Search + Playwright scraping + summarization
  3. arxiv_helper: Fetch academic papers (if RESEARCH type)
  4. email_composer: Generate final email and write to database
Each step directory contains:
  • main.py: Step implementation (inherits BasePipelineStep)
  • tests/: Step-specific tests
See Pipeline Deep Dive for detailed architecture.
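How a concrete step composes under the runner can be sketched as a toy, simplified from the BasePipelineStep contract above (every class body here is a stand-in, and the `PipelineRunner(steps)` constructor signature is an assumption):

```python
import asyncio

class PipelineData:
    """Stand-in for the pipeline state dataclass shown above."""
    def __init__(self, email_template: str):
        self.email_template = email_template
        self.search_terms: list[str] = []

class TemplateParserStep:
    """Step 1 sketch: validate input, then populate search terms."""
    step_name = "template_parser"

    async def execute(self, data: PipelineData) -> None:
        if not data.email_template:
            raise ValueError("email_template is required")
        data.search_terms = ["machine learning"]  # placeholder analysis

class PipelineRunner:
    """Runs steps in order, passing the shared PipelineData through."""
    def __init__(self, steps):
        self.steps = steps

    async def run(self, data: PipelineData) -> PipelineData:
        for step in self.steps:
            await step.execute(data)
        return data

data = asyncio.run(
    PipelineRunner([TemplateParserStep()]).run(PipelineData("Dear Professor ..."))
)
```

Each step mutates the shared `PipelineData` in place, so later steps (web_scraper, arxiv_helper, email_composer) read what earlier steps produced.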

Task Queue

tasks/email_tasks.py

Purpose: Celery task definitions
Key Tasks:
  • generate_email_task: Orchestrates 4-step pipeline
  • Updates queue_items status in database
  • Handles errors and retries
Example:
import asyncio
from datetime import datetime

from celery import Task
from celery_config import celery_app
from pipeline import create_email_pipeline
from database.session import SessionLocal

@celery_app.task(bind=True, name="tasks.generate_email")
def generate_email_task(
    self: Task,
    queue_item_id: str
):
    # Update status to PROCESSING
    update_queue_status(queue_item_id, "PROCESSING", started_at=datetime.utcnow())

    try:
        # Run the async pipeline from this synchronous Celery task
        # (pipeline_data is built from the queue item; construction elided)
        runner = create_email_pipeline()
        asyncio.run(runner.run(pipeline_data))

        # Update status to COMPLETED
        update_queue_status(
            queue_item_id,
            "COMPLETED",
            email_id=pipeline_data.email_id,
            completed_at=datetime.utcnow()
        )
    except Exception as e:
        # Update status to FAILED
        update_queue_status(
            queue_item_id,
            "FAILED",
            error_message=str(e)[:1000],
            completed_at=datetime.utcnow()
        )
        raise

Configuration

config/settings.py

Purpose: Centralized configuration using Pydantic Settings
Features:
  • Type-safe environment variables
  • Validation at startup
  • Automatic .env file loading
  • Computed properties (e.g., database_url)
Example:
from pydantic_settings import BaseSettings
from pydantic import Field

class Settings(BaseSettings):
    environment: str = Field(default="development")
    debug: bool = Field(default=False)

    # Database
    db_user: str = Field(..., description="Database user")
    db_password: str = Field(..., description="Database password")
    db_host: str = Field(..., description="Database host")
    db_port: int = Field(default=6543)
    db_name: str = Field(..., description="Database name")

    @property
    def database_url(self) -> str:
        return (
            f"postgresql+psycopg2://{self.db_user}:{self.db_password}@"
            f"{self.db_host}:{self.db_port}/{self.db_name}?sslmode=require"
        )

settings = Settings()

Testing

Structure:
  • Unit tests live alongside source code in tests/ subdirectories
  • Integration tests in top-level tests/integration/
  • Global fixtures in conftest.py
Test Discovery (pytest.ini):
[pytest]
testpaths = pipeline api scripts
python_files = test_*.py *_test.py
python_functions = test_*
asyncio_mode = auto

markers =
    unit: Fast unit tests
    integration: External dependencies
    slow: Long-running tests
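With those markers registered, a step-level unit test might look like the following (the `classify_template` function is an illustrative stand-in, not code from the repo; only the marker usage and naming convention are from this guide):

```python
import pytest

def classify_template(template: str) -> str:
    """Stand-in for template_parser classification into TemplateType values."""
    lowered = template.lower()
    if "paper" in lowered or "research" in lowered:
        return "research"
    if "book" in lowered:
        return "book"
    return "general"

@pytest.mark.unit
def test_classify_template():
    assert classify_template("I enjoyed your recent paper") == "research"
    assert classify_template("Your book changed how I think") == "book"
    assert classify_template("Hello there") == "general"
```

Run only the fast tier with `pytest -m unit`, or exclude slow tests with `pytest -m "not slow"`.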
See Testing Guide for more details.

Code Organization Principles

1. Separation of Concerns

📁 api/          → HTTP layer (routing, validation, serialization)
📁 models/       → Data layer (database schema)
📁 schemas/      → Contract layer (request/response shapes)
📁 pipeline/     → Business logic (email generation)
📁 tasks/        → Background processing (Celery)
📁 services/     → External integrations (Supabase, Anthropic)

2. Dependency Flow

API Routes → Dependencies → Services → Models → Database

   Tasks → Pipeline Steps → External APIs

3. Backend-First Architecture

  • Frontend: Supabase SDK for auth ONLY (OAuth, JWT)
  • Backend: All database operations via SQLAlchemy
  • Authentication: JWT validated on every request, user_id extracted
  • Database: No direct access from frontend
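The "user_id extracted" step can be illustrated with a stdlib-only decode of a JWT payload. This is only a sketch of where the user id lives (the `sub` claim); real validation must verify the signature, which the backend delegates to Supabase, and the token built here is fake:

```python
import base64
import json

def jwt_payload(token: str) -> dict:
    """Decode the (unverified) payload segment of a JWT.

    Production code must verify the signature first (here, via Supabase).
    """
    _, payload, _ = token.split(".")
    padded = payload + "=" * (-len(payload) % 4)  # restore base64 padding
    return json.loads(base64.urlsafe_b64decode(padded))

# Build a fake token to demonstrate extraction of the user id
claims = {"sub": "user-123", "role": "authenticated"}
segment = base64.urlsafe_b64encode(json.dumps(claims).encode()).rstrip(b"=").decode()
fake_token = f"header.{segment}.signature"

user_id = jwt_payload(fake_token)["sub"]
```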

4. Type Safety Throughout

  • Pydantic models for API validation
  • SQLAlchemy models for database operations
  • Type hints on all function signatures
  • Structured LLM outputs via pydantic-ai

File Naming Conventions

Type        Pattern                        Example
Models      {entity}.py                    models/email.py
Schemas     {domain}.py                    schemas/pipeline.py
Routes      {resource}.py                  api/routes/user.py
Tests       test_{module}.py               test_template_parser.py
Migrations  {revision}_{description}.py    002_add_queue_items.py
Fixtures    conftest.py                    pipeline/conftest.py

Import Patterns

Absolute Imports (Preferred)

# ✅ CORRECT: Absolute imports from project root
from pipeline.steps.template_parser.main import TemplateParserStep
from pipeline.models.core import PipelineData
from models.email import Email
from config.settings import settings

Relative Imports (Avoid)

# ❌ WRONG: Relative imports fail in tests
from .main import TemplateParserStep
from ..models.core import PipelineData

Next Steps

Development Setup

Set up your local development environment

Testing Guide

Learn how to write and run tests

Pipeline Deep Dive

Understand the 4-step email generation pipeline

API Reference

Explore the REST API endpoints
