Database Technology

Nectr uses PostgreSQL with async SQLAlchemy for all relational data.
  • Driver: asyncpg (pure-async PostgreSQL driver)
  • ORM: SQLAlchemy 2.0 (async session API)
  • Migrations: Alembic
  • Connection Pool: Managed by SQLAlchemy
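SQLAlchemy picks the asyncpg driver from the connection URL's dialect prefix. A minimal sketch (the host, credentials, and database name are illustrative placeholders, not Nectr's real values):

```python
# SQLAlchemy selects asyncpg via the "postgresql+asyncpg" dialect prefix.
# Host, credentials, and database name below are placeholders.
DATABASE_URL = "postgresql+asyncpg://nectr:secret@localhost:5432/nectr"

def is_async_postgres_url(url: str) -> bool:
    """Check that a DSN targets the async asyncpg dialect."""
    return url.startswith("postgresql+asyncpg://")
```

A plain `postgresql://` URL would fall back to a synchronous driver, which `create_async_engine` rejects.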

Table Overview

users

  ├── installations (1:N)
  │       │
  │       └── events (1:N)
  │               │
  │               └── workflow_runs (1:N)

  └── oauth_states (1:N)

Users

Table: users
Model: app/models/user.py
Stores GitHub OAuth users who have logged into Nectr.
| Column | Type | Constraints | Description |
| --- | --- | --- | --- |
| id | Integer | Primary Key | Auto-increment user ID |
| github_id | Integer | Unique, Not Null, Indexed | GitHub user ID (from OAuth) |
| github_username | String | Not Null | GitHub username (e.g., alice) |
| github_access_token | String | Not Null | Encrypted GitHub OAuth token |
| email | String | Nullable | User's public GitHub email |
| avatar_url | String | Nullable | GitHub avatar URL |
| name | String | Nullable | User's display name |
| created_at | DateTime(timezone=True) | Default now() | When the user first logged in |
| updated_at | DateTime(timezone=True) | Default now(), On Update | Last profile update |
Token Encryption: github_access_token is encrypted with Fernet (AES-128-CBC with HMAC-SHA256 authentication) before it is stored. The encryption key comes from the SECRET_KEY environment variable.
# app/models/user.py
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func
from app.core.database import Base

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    github_id = Column(Integer, unique=True, nullable=False, index=True)
    github_username = Column(String, nullable=False)
    github_access_token = Column(String, nullable=False)  # Encrypted
    email = Column(String, nullable=True)
    avatar_url = Column(String, nullable=True)
    name = Column(String, nullable=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
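The token encryption described above might be implemented roughly like this. This is a sketch using the `cryptography` package: the helper names `encrypt_token`/`decrypt_token` and the SHA-256 key derivation are illustrative assumptions, not Nectr's actual API.

```python
import base64
import hashlib

from cryptography.fernet import Fernet

def _fernet_from_secret(secret_key: str) -> Fernet:
    """Derive a 32-byte urlsafe-base64 Fernet key from SECRET_KEY (sketch)."""
    digest = hashlib.sha256(secret_key.encode()).digest()
    return Fernet(base64.urlsafe_b64encode(digest))

def encrypt_token(secret_key: str, token: str) -> str:
    """Encrypt an OAuth token before writing it to github_access_token."""
    return _fernet_from_secret(secret_key).encrypt(token.encode()).decode()

def decrypt_token(secret_key: str, encrypted: str) -> str:
    """Recover the plaintext token when calling the GitHub API."""
    return _fernet_from_secret(secret_key).decrypt(encrypted.encode()).decode()
```

Fernet ciphertext is authenticated, so a tampered database value fails to decrypt rather than yielding garbage.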

Installations

Table: installations
Model: app/models/installation.py
Tracks connected repositories (repos that have the webhook installed).
| Column | Type | Constraints | Description |
| --- | --- | --- | --- |
| id | Integer | Primary Key | Auto-increment ID |
| user_id | Integer | Foreign Key (users.id), Indexed | Owner of this installation |
| repo_full_name | String | Not Null, Indexed | Repo name (e.g., owner/repo) |
| github_repo_id | Integer | Nullable | GitHub repo ID |
| webhook_id | Integer | Nullable | GitHub webhook ID |
| webhook_secret | String | Nullable | Per-repo HMAC secret |
| installation_id | Integer | Nullable | GitHub App installation ID (future) |
| is_active | Boolean | Not Null, Default True | Whether the webhook is active |
| installed_at | DateTime(timezone=True) | Default now() | When the repo was connected |
Per-Repo Webhook Secrets: Each installation has its own webhook_secret for HMAC-SHA256 signature verification. This is more secure than a global secret.
# app/models/installation.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.sql import func
from app.core.database import Base

class Installation(Base):
    __tablename__ = "installations"
    
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
    repo_full_name = Column(String, nullable=False, index=True)
    github_repo_id = Column(Integer, nullable=True)
    webhook_id = Column(Integer, nullable=True)
    webhook_secret = Column(String, nullable=True)
    installation_id = Column(Integer, nullable=True)
    is_active = Column(Boolean, default=True, nullable=False)
    installed_at = Column(DateTime(timezone=True), server_default=func.now())
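The HMAC-SHA256 verification against the per-repo secret can be sketched with the standard library. GitHub sends the signature in the `X-Hub-Signature-256` header as `sha256=<hexdigest>`; `verify_signature` is an illustrative helper name, not necessarily Nectr's actual function.

```python
import hashlib
import hmac

def verify_signature(secret: str, body: bytes, signature_header: str) -> bool:
    """Compare GitHub's X-Hub-Signature-256 header against our own HMAC."""
    expected = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking the match position via timing
    return hmac.compare_digest(expected, signature_header)
```

Because each installation row carries its own `webhook_secret`, the handler looks up the secret by repo before verifying, so compromising one repo's secret does not affect the others.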

Events

Table: events
Model: app/models/event.py
Records incoming webhook events from GitHub.
| Column | Type | Constraints | Description |
| --- | --- | --- | --- |
| id | Integer | Primary Key | Auto-increment event ID |
| event_type | String(50) | Not Null | GitHub event type (e.g., pull_request) |
| source | String(50) | Not Null | Event source (always github) |
| payload | Text | Not Null | JSON-serialized webhook payload |
| status | String(20) | Not Null | pending, completed, or failed |
| created_at | DateTime | Default now() | When the event was received |
| processed_at | DateTime | Nullable | When background processing finished |
# app/models/event.py
from sqlalchemy import Column, String, DateTime, Text, Integer
from sqlalchemy.sql import func
from app.core.database import Base

class Event(Base):
    __tablename__ = "events"
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    event_type = Column(String(50), nullable=False)
    source = Column(String(50), nullable=False)
    payload = Column(Text, nullable=False)  # JSON string
    status = Column(String(20), nullable=False)  # pending, completed, failed
    created_at = Column(DateTime, server_default=func.now())
    processed_at = Column(DateTime, nullable=True)
Event status lifecycle:
  1. pending - Event received, not yet processed
  2. completed - Background task finished successfully
  3. failed - Background task encountered an error
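The transition above can be sketched against a stand-in object. `EventStub` and `finish_event` are illustrative only (the real code mutates the ORM `Event` model inside a session); the payload is stored JSON-serialized because the column is plain Text.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

@dataclass
class EventStub:
    """Stand-in for the ORM Event model, for illustration only."""
    payload: str                 # JSON string, mirroring the Text column
    status: str = "pending"      # events start life as pending
    processed_at: Optional[datetime] = None

def finish_event(event: EventStub, ok: bool) -> None:
    """Apply the pending -> completed/failed transition and stamp the time."""
    event.status = "completed" if ok else "failed"
    event.processed_at = datetime.now(timezone.utc)

event = EventStub(payload=json.dumps({"action": "opened"}))
finish_event(event, ok=True)
```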

Workflow Runs

Table: workflow_runs
Model: app/models/workflow.py
Tracks execution of background workflows (PR reviews, error triage, etc.).
| Column | Type | Constraints | Description |
| --- | --- | --- | --- |
| id | Integer | Primary Key | Auto-increment ID |
| event_id | Integer | Foreign Key (events.id), Not Null | Parent event |
| workflow_type | String(50) | Not Null | Workflow type (e.g., pr_review) |
| status | String(20) | Default running | running, completed, or failed |
| result | Text | Nullable | JSON-serialized workflow result |
| error | Text | Nullable | Error message if failed |
| started_at | DateTime | Default now() | When the workflow started |
| completed_at | DateTime | Nullable | When the workflow finished |
# app/models/workflow.py
from sqlalchemy import Column, String, Text, DateTime, Integer, ForeignKey
from sqlalchemy.sql import func
from app.core.database import Base

class WorkflowRun(Base):
    __tablename__ = "workflow_runs"
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    event_id = Column(Integer, ForeignKey("events.id"), nullable=False)
    workflow_type = Column(String(50), nullable=False)  # pr_review, error_triage, etc.
    status = Column(String(20), default="running")      # running, completed, failed
    result = Column(Text, nullable=True)                # JSON result
    error = Column(Text, nullable=True)                 # Error message
    started_at = Column(DateTime, server_default=func.now())
    completed_at = Column(DateTime, nullable=True)
Supported workflow types:
  • pr_review - AI-powered PR review
  • error_triage - Sentry error classification (future)
  • ticket_sync - Linear ticket updates (future)
Example result payload for a pr_review run:
{
  "ai_summary": "## Summary\n...",
  "files_analyzed": 12,
  "comment_posted": true,
  "verdict": "APPROVE",
  "inline_suggestions": 3,
  "linked_issues": [42, 58],
  "related_prs": 2,
  "semantic_issue_matches": [67]
}

OAuth States

Table: oauth_states
Model: app/models/oauth_state.py
Stores CSRF state tokens for GitHub OAuth flow.
| Column | Type | Constraints | Description |
| --- | --- | --- | --- |
| id | Integer | Primary Key | Auto-increment ID |
| state | String | Unique, Not Null, Indexed | Random state token |
| user_id | Integer | Foreign Key (users.id), Nullable | User who initiated OAuth (nullable for new users) |
| created_at | DateTime(timezone=True) | Default now() | When the state was created |
| expires_at | DateTime(timezone=True) | Not Null | State expiry (5 minutes) |
| used | Boolean | Default False | Whether the state was consumed |
CSRF Protection: State tokens prevent CSRF attacks by ensuring the OAuth callback is for a session we initiated. States expire after 5 minutes and can only be used once.
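The state lifecycle can be sketched with stdlib primitives. `OAuthStateStub`, `create_state`, and `consume_state` are illustrative names, not Nectr's actual helpers; the 5-minute TTL matches the table above.

```python
import secrets
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

STATE_TTL = timedelta(minutes=5)  # states expire after 5 minutes

@dataclass
class OAuthStateStub:
    """Stand-in for an oauth_states row, for illustration only."""
    state: str
    expires_at: datetime
    used: bool = False

def create_state() -> OAuthStateStub:
    """Issue a random, single-use CSRF state token."""
    return OAuthStateStub(
        state=secrets.token_urlsafe(32),
        expires_at=datetime.now(timezone.utc) + STATE_TTL,
    )

def consume_state(row: OAuthStateStub) -> bool:
    """Valid only if unexpired and never used; consuming marks it used."""
    if row.used or datetime.now(timezone.utc) >= row.expires_at:
        return False
    row.used = True
    return True
```

The callback handler compares the `state` query parameter against a stored row and rejects the login if `consume_state`-style validation fails.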

Database Initialization

File: app/core/database.py
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from app.core.config import settings

engine = create_async_engine(
    settings.DATABASE_URL,
    echo=settings.DEBUG,
    pool_size=10,
    max_overflow=20,
    pool_recycle=3600,   # recycle connections after 1 hour
    pool_timeout=30,     # wait up to 30s for a free connection
)

async_session = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

Base = declarative_base()

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency that yields a database session."""
    async with async_session() as session:
        yield session

Migrations (Alembic)

Directory: alembic/versions/

Nectr uses Alembic for schema migrations.
  1. Generate migration:
    alembic revision --autogenerate -m "Add github_repo_id to installations"
    
  2. Apply migration:
    alembic upgrade head
    
  3. Automatic on startup:
    # app/main.py:92
    def _run_migrations():
        from alembic.config import Config
        from alembic import command
        alembic_cfg = Config("alembic.ini")
        command.upgrade(alembic_cfg, "head")
    
    await asyncio.to_thread(_run_migrations)
    
# alembic/versions/a1b2c3d4e5f6_add_installation_id.py
from alembic import op
import sqlalchemy as sa

revision = 'a1b2c3d4e5f6'
down_revision = 'e83f4b0f5bf4'

def upgrade():
    op.add_column('installations', sa.Column('installation_id', sa.Integer(), nullable=True))
    op.add_column('installations', sa.Column('github_repo_id', sa.Integer(), nullable=True))

def downgrade():
    op.drop_column('installations', 'github_repo_id')
    op.drop_column('installations', 'installation_id')

Connection Pooling

SQLAlchemy manages a connection pool to handle concurrent requests:
  • Pool size: 10 connections
  • Max overflow: 20 connections (30 total)
  • Pool recycle: 1 hour (prevents stale connections)
  • Pool timeout: 30 seconds
Railway (hosting provider) free tier supports up to 50 concurrent connections. The pool configuration keeps us well under that limit.

Query Patterns

Fetch a user by GitHub ID:

from sqlalchemy import select
from app.models.user import User

async with async_session() as db:
    result = await db.execute(
        select(User).where(User.github_id == github_id)
    )
    user = result.scalar_one_or_none()
Create an installation record:

from app.models.installation import Installation

async with async_session() as db:
    installation = Installation(
        user_id=user.id,
        repo_full_name=f"{owner}/{repo}",
        webhook_id=webhook_id,
        webhook_secret=webhook_secret,
        is_active=True,
    )
    db.add(installation)
    await db.commit()
    await db.refresh(installation)
List the 20 most recent pr_review runs:

from sqlalchemy import select, desc
from app.models.workflow import WorkflowRun

async with async_session() as db:
    result = await db.execute(
        select(WorkflowRun)
        .where(WorkflowRun.workflow_type == "pr_review")
        .order_by(desc(WorkflowRun.started_at))
        .limit(20)
    )
    runs = result.scalars().all()

Data Retention

Currently, Nectr stores all events and workflow runs indefinitely. Future roadmap includes:
  • Archive old events (> 90 days) to cold storage
  • Delete failed events (> 30 days)
  • Compress payloads for storage efficiency

Next Steps

Neo4j Graph

Learn about the knowledge graph schema

Backend Architecture

Explore FastAPI routes and middleware
