
Backend Stack

  • Framework: FastAPI 0.129.0
  • ASGI Server: Uvicorn 0.41.0
  • Language: Python 3.14
  • ORM: SQLAlchemy 2.0.46 (async)
  • Database Driver: asyncpg 0.31.0
  • AI SDK: Anthropic 0.83.0
  • Graph Database: neo4j 5.0+
  • Memory Layer: mem0ai 0.1.98+
  • MCP: mcp 1.0.0+

Project Structure

app/
├── main.py                    # Application entry point
├── core/
│   ├── config.py              # Pydantic settings (env vars)
│   ├── database.py            # SQLAlchemy async engine
│   ├── neo4j_client.py        # Neo4j driver singleton
│   └── neo4j_schema.py        # Graph constraints & indexes
├── models/
│   ├── user.py                # User model (GitHub accounts)
│   ├── installation.py        # Repo connections
│   ├── event.py               # Webhook events
│   ├── workflow.py            # Review workflow runs
│   └── oauth_state.py         # OAuth CSRF tokens
├── api/v1/
│   ├── webhooks.py            # GitHub webhook receiver
│   ├── repos.py               # Repo management
│   ├── reviews.py             # PR review history
│   ├── events.py              # Event queries
│   ├── analytics.py           # Team metrics
│   └── memory.py              # Mem0 CRUD
├── auth/
│   ├── router.py              # GitHub OAuth flow
│   ├── dependencies.py        # get_current_user() middleware
│   ├── jwt_utils.py           # JWT sign/verify
│   └── token_encryption.py    # Fernet token encryption
├── services/
│   ├── pr_review_service.py   # Review orchestrator
│   ├── ai_service.py          # Claude integration
│   ├── context_service.py     # Context builder (Mem0+Neo4j)
│   ├── graph_builder.py       # Neo4j operations
│   ├── memory_adapter.py      # Mem0 wrapper
│   ├── memory_extractor.py    # Post-review memory extraction
│   └── project_scanner.py     # Initial repo file scan
├── integrations/github/
│   ├── client.py              # GitHub REST API client
│   └── webhook_manager.py     # Webhook setup/teardown
└── mcp/
    ├── server.py              # FastMCP server (outbound)
    └── client.py              # MCP client (inbound)

Application Entry Point

app/main.py is the FastAPI application entry point.

Lifespan Management

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    # 1. Run Alembic migrations
    await asyncio.to_thread(_run_migrations)
    
    # 2. Create PostgreSQL tables
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    
    # 3. Initialize Neo4j
    await init_driver()
    await create_schema()
    
    # 4. Background task: scan repos not yet indexed
    asyncio.create_task(_scan_unindexed_repos())
    
    yield  # App runs
    
    # Shutdown
    await engine.dispose()
    await close_driver()
See app/main.py:88-131.

CORS Configuration

ALLOWED_ORIGINS = [
    "http://localhost:5173",
    "http://localhost:3000",
    "http://localhost:3001",
    settings.FRONTEND_URL,
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,  # Required for cookies
    allow_methods=["GET", "POST", "DELETE", "PATCH", "OPTIONS"],
    allow_headers=["Content-Type", "Authorization"],
)
See app/main.py:143-157.

Request Logging Middleware

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    duration = round((time.time() - start) * 1000, 2)
    logger.info(
        f"{request.method} {request.url.path} → "
        f"{response.status_code} ({duration}ms)"
    )
    return response
See app/main.py:180-188.

Core Services

PR Review Service

File: app/services/pr_review_service.py
Orchestrates the entire PR review workflow:
  1. Fetch PR data from GitHub
  2. Build review context (Neo4j + Mem0 + MCP)
  3. Run AI analysis (Claude with agentic tools)
  4. Post review comment on GitHub
  5. Index PR in Neo4j
  6. Extract memories to Mem0
Key method: process_pr_review(payload, event, db)
See app/services/pr_review_service.py:464-809.
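The six steps above can be sketched as a single async pipeline. This is an illustrative sketch only: the stub coroutines and their return shapes are assumptions standing in for the real GitHub, Claude, Neo4j, and Mem0 calls in pr_review_service.py.

```python
import asyncio

# Stub coroutines standing in for the real service calls (hypothetical shapes).
async def fetch_pr_data(pr_number): return {"number": pr_number, "files": ["app/main.py"]}
async def build_context(pr): return {"experts": [], "memories": []}
async def run_ai_analysis(pr, context): return {"verdict": "APPROVE", "summary": "LGTM"}
async def post_review(pr, review): return True
async def index_pr(pr, review): return True
async def extract_memories(pr, review): return ["uses async SQLAlchemy sessions"]

async def process_pr_review(pr_number: int) -> dict:
    pr = await fetch_pr_data(pr_number)          # 1. Fetch PR data
    context = await build_context(pr)            # 2. Build review context
    review = await run_ai_analysis(pr, context)  # 3. Run AI analysis
    await post_review(pr, review)                # 4. Post review comment
    # 5-6. Indexing and memory extraction don't depend on each other,
    # so they can run concurrently.
    await asyncio.gather(index_pr(pr, review), extract_memories(pr, review))
    return review

print(asyncio.run(process_pr_review(42)))  # → {'verdict': 'APPROVE', 'summary': 'LGTM'}
```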

AI Service

File: app/services/ai_service.py
Claude integration with two review modes.

Standard mode (default):
  • Single agentic loop
  • Claude uses 8 MCP-style tools on demand:
    • read_file — Read full source file
    • search_project_memory — Query Mem0 project patterns
    • search_developer_memory — Query Mem0 developer habits
    • get_file_history — File experts + related PRs (Neo4j)
    • get_issue_details — Fetch GitHub issue details
    • search_open_issues — Find semantic issue matches
    • get_linked_issues — Fetch Linear/GitHub issues (MCP)
    • get_related_errors — Fetch Sentry errors (MCP)
Parallel mode (PARALLEL_REVIEW_AGENTS=true):
  • 3 specialized agents run concurrently:
    • Security agent — Injection, auth, secrets, crypto
    • Performance agent — N+1 queries, algorithms, memory leaks
    • Style agent — Tests, complexity, naming, error handling
  • Synthesis agent combines all three into final review
See Parallel Agents for details.
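The fan-out/fan-in shape of parallel mode can be sketched with asyncio.gather. The agent and synthesis functions below are hypothetical stand-ins (the real agents call Claude); only the concurrency structure reflects the description above.

```python
import asyncio

# Hypothetical stand-ins for the three specialized agents; each returns
# a list of findings for the diff it reviews.
async def security_agent(diff): return [{"severity": "high", "issue": "possible SQL injection"}]
async def performance_agent(diff): return [{"severity": "med", "issue": "N+1 query in loop"}]
async def style_agent(diff): return []

async def synthesize(findings: list) -> dict:
    # Flatten the per-agent findings into one review.
    issues = [i for agent_findings in findings for i in agent_findings]
    verdict = "REQUEST_CHANGES" if issues else "APPROVE"
    return {"verdict": verdict, "issues": issues}

async def parallel_review(diff: str) -> dict:
    # All three agents review the same diff concurrently ...
    findings = await asyncio.gather(
        security_agent(diff), performance_agent(diff), style_agent(diff)
    )
    # ... then a synthesis step combines their findings into the final review.
    return await synthesize(list(findings))

review = asyncio.run(parallel_review("diff --git a/app/db.py ..."))
```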

Context Service

File: app/services/context_service.py
Builds rich context for PR reviews by querying:
  • Mem0 — Project patterns, developer habits
  • Neo4j — File experts, related PRs
  • MCP integrations — Linear issues, Sentry errors, Slack messages

Graph Builder

File: app/services/graph_builder.py
Neo4j operations:
  • build_repo_graph(owner, repo, token) — Initial file tree scan
  • index_pr(repo, pr_number, title, author, files, verdict, issues) — Index PR after review
  • get_file_experts(repo, file_paths, top_k) — Developers who most touch these files
  • get_related_prs(repo, file_paths, top_k) — Past PRs with file overlap

Memory Adapter

File: app/services/memory_adapter.py
Async wrapper for Mem0 API:
  • add(memory, repo, developer, metadata) — Store memory
  • search_relevant(repo, query, developer, top_k) — Semantic search
  • delete(memory_id) — Delete memory

Memory Extractor

File: app/services/memory_extractor.py
Post-review memory extraction:
  1. Claude analyzes PR + review summary
  2. Extracts structured memories:
    • project_pattern — Architectural patterns
    • decision — Design decisions
    • developer_pattern — Developer habits
    • developer_strength — Developer expertise
    • risk_module — High-risk code areas
    • contributor_profile — Contributor traits
  3. Stores in Mem0 with metadata
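A sketch of what a validated extracted-memory record might look like. The category set comes from the list above; the field names ("category", "text", "metadata") and the validation helper are assumptions, not the extractor's actual schema.

```python
# The six memory categories listed above.
ALLOWED_CATEGORIES = {
    "project_pattern", "decision", "developer_pattern",
    "developer_strength", "risk_module", "contributor_profile",
}

def validate_memory(record: dict) -> dict:
    """Reject records whose category isn't one of the six, or with no text."""
    if record.get("category") not in ALLOWED_CATEGORIES:
        raise ValueError(f"unknown memory category: {record.get('category')!r}")
    if not record.get("text"):
        raise ValueError("memory text must be non-empty")
    return record

# Hypothetical record as it might be stored in Mem0 with metadata.
memory = validate_memory({
    "category": "project_pattern",
    "text": "API routes always return Pydantic response models",
    "metadata": {"repo": "acme/api", "source_pr": 42},
})
```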

GitHub Integration

GitHub Client

File: app/integrations/github/client.py
class GitHubClient:
    async def get_pr_diff(owner, repo, pr_number) -> str
    async def get_pr_files(owner, repo, pr_number) -> list[dict]
    async def get_file_content(owner, repo, path, ref) -> str
    async def get_issue(owner, repo, issue_number) -> dict
    async def get_repo_issues(owner, repo, state, per_page) -> list[dict]
    async def post_pr_review(owner, repo, pr_number, commit_id, body, event, comments) -> dict
    async def post_pr_comment(owner, repo, pr_number, body) -> dict

Webhook Manager

File: app/integrations/github/webhook_manager.py
async def install_webhook(
    owner: str,
    repo: str,
    access_token: str,
    backend_url: str,
) -> tuple[int, str]:  # (webhook_id, webhook_secret)

async def uninstall_webhook(
    owner: str,
    repo: str,
    webhook_id: int,
    access_token: str,
) -> None

MCP Integration

Nectr as MCP Server

File: app/mcp/server.py
Exposes Nectr data to external agents (e.g., Claude Desktop):
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Nectr PR Review Agent")

@mcp.tool()
async def get_recent_reviews(repo: str, limit: int = 10) -> list[dict]:
    """Recent PR reviews with verdicts and summaries."""
    ...

@mcp.tool()
async def get_contributor_stats(repo: str, limit: int = 10) -> list[dict]:
    """Top contributors with PR-touch counts."""
    ...

@mcp.tool()
async def get_pr_verdict(repo: str, pr_number: int) -> dict:
    """AI verdict for specific PR."""
    ...

@mcp.tool()
async def get_repo_health(repo: str) -> dict:
    """Repository health score (0-100)."""
    ...
Mount in FastAPI:
from app.mcp.server import mcp
app.mount("/mcp", mcp.sse_app())
See app/main.py:164-169.

Nectr as MCP Client

File: app/mcp/client.py
Pulls live context from third-party MCP servers during reviews:
class MCPClientManager:
    async def get_linear_issues(team_id: str, query: str) -> list[dict]
    async def get_github_issues(repo: str, query: str) -> list[dict]
    async def get_sentry_errors(project: str, filename: str) -> list[dict]
    async def get_slack_messages(channel: str, query: str) -> list[dict]
Falls back gracefully when MCP server URLs are not configured.

Authentication

GitHub OAuth Flow

File: app/auth/router.py
@router.get("/auth/github")
async def github_oauth(db: AsyncSession = Depends(get_db)):
    # Generate CSRF state token
    state = secrets.token_urlsafe(32)
    await db.execute(insert(OAuthState).values(state=state))
    
    # Redirect to GitHub
    return RedirectResponse(
        f"https://github.com/login/oauth/authorize?"
        f"client_id={settings.GITHUB_CLIENT_ID}&"
        f"redirect_uri={callback_url}&"
        f"state={state}&"
        f"scope=repo%20read:user"
    )

@router.get("/auth/github/callback")
async def github_callback(code: str, state: str, db: AsyncSession = Depends(get_db)):
    # Verify state
    # Exchange code for access_token
    # Create/update user in DB
    # Encrypt token with Fernet
    # Generate JWT
    # Set httpOnly cookie
    # Redirect to frontend dashboard

JWT Utils

File: app/auth/jwt_utils.py
def create_access_token(data: dict, expires_delta: timedelta) -> str:
    to_encode = data.copy()
    expire = datetime.now(UTC) + expires_delta
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)

def verify_token(token: str) -> dict:
    payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    return payload

Token Encryption

File: app/auth/token_encryption.py
GitHub access tokens are encrypted at rest with Fernet (AES-128-CBC with HMAC-SHA256 authentication):
from cryptography.fernet import Fernet
import hashlib
import base64

def get_fernet() -> Fernet:
    key = base64.urlsafe_b64encode(
        hashlib.sha256(settings.SECRET_KEY.encode()).digest()
    )
    return Fernet(key)

def encrypt_token(token: str) -> str:
    return get_fernet().encrypt(token.encode()).decode()

def decrypt_token(encrypted: str) -> str:
    return get_fernet().decrypt(encrypted.encode()).decode()

Current User Dependency

File: app/auth/dependencies.py
async def get_current_user(
    request: Request,
    db: AsyncSession = Depends(get_db),
) -> User:
    token = request.cookies.get("access_token")
    if not token:
        raise HTTPException(status_code=401)
    
    payload = verify_token(token)
    user_id = payload.get("sub")
    
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=401)
    
    return user

API Endpoints

Webhooks

File: app/api/v1/webhooks.py
@router.post("/webhooks/github")
async def github_webhook(
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
    # Verify HMAC-SHA256 signature
    # Check deduplication
    # Create Event row
    # Add background task
    background_tasks.add_task(process_pr_in_background, event_id, payload)
    return {"status": "accepted"}
Returns 200 immediately and processes PR in background.
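The HMAC-SHA256 signature check in the first step can be sketched with the standard library. GitHub sends the digest in the X-Hub-Signature-256 header; the function name here is illustrative, not the one used in webhooks.py.

```python
import hashlib
import hmac

def verify_github_signature(payload: bytes, secret: str, signature_header: str) -> bool:
    """Constant-time check of GitHub's X-Hub-Signature-256 header."""
    expected = "sha256=" + hmac.new(
        secret.encode(), payload, hashlib.sha256
    ).hexdigest()
    # compare_digest avoids leaking the match position via timing.
    return hmac.compare_digest(expected, signature_header)

body = b'{"action": "opened"}'
secret = "webhook-secret"
good = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
assert verify_github_signature(body, secret, good)
assert not verify_github_signature(body, secret, "sha256=deadbeef")
```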

Repos

File: app/api/v1/repos.py
@router.post("/repos/{owner}/{repo}/install")
async def install_repo(
    owner: str,
    repo: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    # Install webhook
    # Save Installation to DB
    # Scan file tree (background task)
    return {"success": True}

@router.delete("/repos/{owner}/{repo}/install")
async def uninstall_repo(owner, repo, current_user, db):
    # Uninstall webhook
    # Mark Installation inactive
    return {"success": True}

Reviews

File: app/api/v1/reviews.py
@router.get("/reviews")
async def get_reviews(
    repo: str | None = None,
    limit: int = 20,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[dict]:
    # Query WorkflowRun + Event + Installation
    # Filter by user's repos
    # Return PR review history

Analytics

File: app/api/v1/analytics.py
@router.get("/analytics")
async def get_analytics(
    repo: str | None = None,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> dict:
    # Query WorkflowRun statistics
    # Group by verdict, date, file
    # Return metrics

Memory

File: app/api/v1/memory.py
@router.get("/memory")
async def list_memories(repo: str, current_user, db) -> list[dict]:
    # Query Mem0 for repo

@router.post("/memory")
async def add_memory(repo: str, memory: str, current_user, db) -> dict:
    # Store custom rule in Mem0

@router.delete("/memory/{id}")
async def delete_memory(id: str, current_user, db) -> dict:
    # Delete memory from Mem0

Database Models

User Model

File: app/models/user.py
class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    github_id = Column(Integer, unique=True, index=True, nullable=False)
    github_username = Column(String, nullable=False)
    github_access_token = Column(String, nullable=False)  # Encrypted with Fernet
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

Installation Model

File: app/models/installation.py
class Installation(Base):
    __tablename__ = "installations"
    
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    repo_full_name = Column(String, nullable=False)  # "owner/repo"
    installation_id = Column(Integer, nullable=True)  # GitHub App installation ID
    github_repo_id = Column(Integer, nullable=True)   # GitHub repo ID
    webhook_id = Column(Integer, nullable=True)
    webhook_secret = Column(String, nullable=False)   # Unique per repo
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

Event Model

File: app/models/event.py
class Event(Base):
    __tablename__ = "events"
    
    id = Column(Integer, primary_key=True)
    event_type = Column(String, nullable=False)  # "pull_request", "issues"
    payload = Column(JSON, nullable=False)
    status = Column(String, default="pending")  # "pending", "completed", "failed"
    deduplication_hash = Column(String, index=True)  # SHA256 hash
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    processed_at = Column(DateTime(timezone=True), nullable=True)
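One way the deduplication_hash could be derived, shown as a sketch: the doc says it is a SHA-256 hash, but the exact identifying fields used here (repo, PR number, head SHA, action) are assumptions.

```python
import hashlib
import json

def deduplication_hash(event_type: str, payload: dict) -> str:
    """Hypothetical dedup key: SHA-256 over the fields that identify
    a delivery, so redelivered webhooks hash to the same value."""
    identity = {
        "event_type": event_type,
        "repo": payload.get("repository", {}).get("full_name"),
        "pr": payload.get("pull_request", {}).get("number"),
        "sha": payload.get("pull_request", {}).get("head", {}).get("sha"),
        "action": payload.get("action"),
    }
    # sort_keys makes the serialization, and hence the hash, deterministic.
    return hashlib.sha256(json.dumps(identity, sort_keys=True).encode()).hexdigest()
```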

WorkflowRun Model

File: app/models/workflow.py
class WorkflowRun(Base):
    __tablename__ = "workflow_runs"
    
    id = Column(Integer, primary_key=True)
    event_id = Column(Integer, ForeignKey("events.id"), nullable=False)
    workflow_type = Column(String, nullable=False)  # "pr_review"
    status = Column(String, default="running")  # "running", "completed", "failed"
    result = Column(JSON, nullable=True)  # {"verdict": "APPROVE", "files_analyzed": 10, ...}
    error = Column(Text, nullable=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    completed_at = Column(DateTime(timezone=True), nullable=True)

Async Patterns

Database Sessions

from app.core.database import async_session

async with async_session() as db:
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()

Neo4j Sessions

from app.core.neo4j_client import get_session

async with get_session() as session:
    result = await session.run(
        "MATCH (r:Repository {full_name: $fn}) RETURN r",
        fn=repo_full_name,
    )
    record = await result.single()

Parallel Operations

# Run multiple async operations concurrently
issue_details, conflicts, candidates = await asyncio.gather(
    _fetch_issue_details(owner, repo, issue_refs),
    _get_open_pr_conflicts(owner, repo, pr_number, file_paths),
    _find_candidate_issues(owner, repo, pr_title, pr_body, file_paths),
    return_exceptions=True,
)
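With return_exceptions=True, gather() never raises: failed tasks come back as exception objects in the result tuple, so each result must be checked before use. A self-contained sketch with stub coroutines:

```python
import asyncio

# Stub coroutines: one succeeds, one fails.
async def ok(): return ["issue-1"]
async def broken(): raise RuntimeError("Sentry unreachable")

async def main() -> list:
    results = await asyncio.gather(ok(), broken(), return_exceptions=True)
    # Replace any failure with a safe default instead of crashing the review.
    return [r if not isinstance(r, Exception) else [] for r in results]

print(asyncio.run(main()))  # → [['issue-1'], []]
```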

Error Handling

Graceful Fallbacks

All optional integrations fail gracefully:
try:
    errors = await mcp_client.get_sentry_errors(project=repo, filename=path)
except Exception as exc:
    logger.warning(f"Sentry integration failed: {exc}")
    errors = []  # Continue without Sentry data

Webhook Processing

Webhook failures are logged but don’t block:
try:
    await pr_review_service.process_pr_review(payload, event, db)
except Exception as e:
    logger.error(f"PR review failed: {e}", exc_info=True)
    event.status = "failed"
    await db.flush()

Next Steps

Frontend Architecture

Next.js frontend structure

Review Flow

Complete PR review workflow

Parallel Agents

Three-agent parallel review mode

Local Development

Set up locally
