Database Models

GAIA uses a hybrid database architecture with PostgreSQL for relational data and MongoDB for document storage.

Database Architecture

PostgreSQL

  • Use case: Structured data requiring ACID transactions
  • ORM: SQLAlchemy with async support
  • Connection: asyncpg driver
  • Features: Connection pooling, migrations, type safety

MongoDB

  • Use case: Flexible document storage, nested data structures
  • Driver: Motor (async) and PyMongo (sync)
  • Features: Lazy collection loading, schema flexibility

PostgreSQL Models

GAIA uses SQLAlchemy for PostgreSQL models with async support.

Database Connection

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession

from app.db.postgresql import get_db_session, get_postgresql_engine

# Get engine
engine: AsyncEngine = await get_postgresql_engine()

# Get session (context manager); User is the model shown under "Example Model Definition"
async with get_db_session() as session:
    result = await session.execute(select(User))
    users = result.scalars().all()

Connection Configuration

from sqlalchemy.ext.asyncio import create_async_engine
from app.config.settings import settings

# Convert PostgreSQL URL to async driver
postgres_url = settings.POSTGRES_URL
url = postgres_url.replace("postgresql://", "postgresql+asyncpg://")

engine = create_async_engine(
    url=url,
    future=True,
    pool_pre_ping=True,
    pool_size=5,
    max_overflow=10,
)

# Create tables
async with engine.begin() as conn:
    await conn.run_sync(Base.metadata.create_all)
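
The `str.replace` call above rewrites every occurrence of `postgresql://` in the string and leaves the `postgres://` alias, which some hosting providers emit, untouched. A minimal sketch of a stricter conversion follows. The helper name `to_asyncpg_url` is hypothetical and not part of GAIA's codebase; it only rewrites the leading scheme.

```python
ASYNC_SCHEME = "postgresql+asyncpg://"


def to_asyncpg_url(url: str) -> str:
    """Rewrite only the leading scheme so SQLAlchemy selects the asyncpg driver."""
    if url.startswith(ASYNC_SCHEME):
        return url  # already converted, nothing to do
    for scheme in ("postgresql://", "postgres://"):
        if url.startswith(scheme):
            return ASYNC_SCHEME + url[len(scheme):]
    # The URL is deliberately left out of the message: it may contain credentials
    raise ValueError("Expected a postgresql:// or postgres:// URL")
```

The result could then be passed as `url=` to `create_async_engine` in place of the `replace` expression.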

Base Model

# app/db/postgresql.py
from sqlalchemy.orm import declarative_base

# All PostgreSQL models inherit from Base
Base = declarative_base()

# In model modules, import the shared Base instead of redefining it:
# from app.db.postgresql import Base

Example Model Definition

from sqlalchemy import Column, String, DateTime, Integer, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime, timezone
from app.db.postgresql import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    name = Column(String, nullable=False)
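    # timezone=True maps to TIMESTAMP WITH TIME ZONE, matching the timezone-aware defaults
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))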

    # Relationships
    conversations = relationship("Conversation", back_populates="user")

MongoDB Models

MongoDB collections are accessed through lazy-loaded collection objects.

Collection Access

from bson import ObjectId

from app.db.mongodb.collections import (
    users_collection,
    conversations_collection,
    goals_collection,
    notes_collection,
    todos_collection,
    workflows_collection,
)

# Collections are lazy-loaded on first access
user = await users_collection.find_one({"_id": ObjectId(user_id)})

Available Collections

GAIA provides the following MongoDB collections:
# Core user data
users_collection
team_collection
device_tokens_collection

# Conversations and messaging
conversations_collection
notifications_collection

# Productivity features
goals_collection
notes_collection
todos_collection
projects_collection
reminders_collection

# Workflows and automation
workflows_collection
workflow_executions_collection
processed_webhooks_collection

# Integrations
integrations_collection
user_integrations_collection

# Calendar and events
calendars_collection

# Files and media
files_collection

# Payment and subscriptions
payments_collection
subscriptions_collection
plans_collection
usage_snapshots_collection

# Other
mail_collection
blog_collection
search_urls_collection
support_collection
feedback_collection
waitlist_collection
ai_models_collection

Lazy Collection Loading

Collections are initialized only when accessed:
def _get_collection(collection_name: str):
    """Get async collection with lazy loading and caching."""
    if collection_name not in _collections_cache:
        logger.info(f"Creating async collection '{collection_name}' (lazy loading)")
        mongodb_instance = _get_mongodb_instance()
        _collections_cache[collection_name] = mongodb_instance.get_collection(
            collection_name
        )
    return _collections_cache[collection_name]
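
The caching pattern in `_get_collection` can be tried without a database. The sketch below is an illustration only: `LazyRegistry` and `fake_collection_factory` are hypothetical names, and the factory stands in for `mongodb_instance.get_collection`.

```python
from typing import Any, Callable, Dict


class LazyRegistry:
    """Create each named resource on first access, then reuse the cached object."""

    def __init__(self, factory: Callable[[str], Any]) -> None:
        self._factory = factory
        self._cache: Dict[str, Any] = {}

    def get(self, name: str) -> Any:
        if name not in self._cache:
            # First access: build the resource and remember it
            self._cache[name] = self._factory(name)
        return self._cache[name]


created = []  # records every factory call so the laziness is observable


def fake_collection_factory(name: str) -> dict:
    # Stand-in for mongodb_instance.get_collection(name)
    created.append(name)
    return {"name": name}


registry = LazyRegistry(fake_collection_factory)
first = registry.get("users")
second = registry.get("users")  # served from the cache, factory not called again
```

After the two lookups, `created` holds a single entry and `first is second`, which is the behavior the lazy loader relies on.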

Pydantic Models

GAIA uses Pydantic for data validation and serialization.

User Models

from pydantic import BaseModel, Field, field_validator
from typing import Optional
from datetime import datetime
from enum import Enum

class OnboardingPhase(str, Enum):
    """Tracks the current phase of user onboarding"""
    INITIAL = "initial"
    PERSONALIZATION_PENDING = "personalization_pending"
    PERSONALIZATION_COMPLETE = "personalization_complete"
    GETTING_STARTED = "getting_started"
    COMPLETED = "completed"

class OnboardingPreferences(BaseModel):
    profession: Optional[str] = Field(
        None,
        description="User's profession or main area of focus",
    )
    response_style: Optional[str] = Field(
        None,
        description="Preferred communication style",
    )
    custom_instructions: Optional[str] = Field(
        None,
        max_length=500,
        description="Custom instructions for the AI assistant"
    )

    @field_validator("profession")
    @classmethod
    def validate_profession(cls, v):
        if v is not None and v != "":
            v = v.strip()
            if not v:
                raise ValueError("Profession cannot be empty")
            if len(v) > 50:
                raise ValueError("Profession must be 50 characters or less")
            return v
        return None if v == "" else v

class OnboardingData(BaseModel):
    completed: bool = Field(
        default=False,
        description="Whether onboarding is completed"
    )
    completed_at: Optional[datetime] = Field(
        None,
        description="Timestamp when onboarding was completed"
    )
    phase: OnboardingPhase = Field(
        default=OnboardingPhase.INITIAL,
        description="Current onboarding phase"
    )
    preferences: Optional[OnboardingPreferences] = Field(
        None,
        description="User's onboarding preferences"
    )
    house: Optional[str] = Field(None, description="Assigned house name")
    personality_phrase: Optional[str] = Field(
        None,
        description="LLM-generated personality phrase"
    )
    user_bio: Optional[str] = Field(
        None,
        description="LLM-generated bio paragraph"
    )
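
The branches of `validate_profession` are easy to misread, so here is a standalone sketch that mirrors its logic as a plain function, without Pydantic. The name `normalize_profession` is hypothetical; the body follows the validator above line for line.

```python
from typing import Optional


def normalize_profession(v: Optional[str]) -> Optional[str]:
    """Same branches as OnboardingPreferences.validate_profession."""
    if v is not None and v != "":
        v = v.strip()
        if not v:
            # Whitespace-only input is rejected rather than silently dropped
            raise ValueError("Profession cannot be empty")
        if len(v) > 50:
            raise ValueError("Profession must be 50 characters or less")
        return v
    # None stays None; an empty string is normalized to None
    return None if v == "" else v


results = {
    "none": normalize_profession(None),
    "empty": normalize_profession(""),
    "padded": normalize_profession("  Software Engineer  "),
}
```

Note the asymmetry: an empty string becomes `None`, while a whitespace-only string raises `ValueError`.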

Chat Models

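from enum import Enum
from typing import List, Optional, Union

from pydantic import BaseModel
from typing_extensions import TypedDict  # Pydantic needs this variant on Python < 3.12

# FileData, SelectedWorkflowData, and ReplyToMessageData are defined elsewhere in the codebase.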

class ImageData(BaseModel):
    url: str
    prompt: str
    improved_prompt: Optional[str] = None

class ToolDataEntry(TypedDict):
    """Unified structure for tool execution data."""
    tool_name: str
    data: Union[dict, List, str, int, float, bool]
    timestamp: Optional[str]

class MessageModel(BaseModel):
    type: str
    response: str
    date: Optional[str] = None
    image_data: Optional[ImageData] = None
    disclaimer: Optional[str] = None
    subtype: Optional[str] = None
    file: Optional[bytes] = None
    filename: Optional[str] = None
    filetype: Optional[str] = None
    message_id: Optional[str] = None
    fileIds: Optional[List[str]] = []
    fileData: Optional[List[FileData]] = []
    selectedTool: Optional[str] = None
    toolCategory: Optional[str] = None
    selectedWorkflow: Optional[SelectedWorkflowData] = None
    tool_data: Optional[List[ToolDataEntry]] = None
    follow_up_actions: Optional[List[str]] = None
    metadata: Optional[dict] = None
    replyToMessage: Optional[ReplyToMessageData] = None

class SystemPurpose(str, Enum):
    EMAIL_PROCESSING = "email_processing"
    REMINDER_PROCESSING = "reminder_processing"
    WORKFLOW_EXECUTION = "workflow_execution"
    OTHER = "other"

class ConversationModel(BaseModel):
    conversation_id: str
    description: str = "New Chat"
    is_system_generated: Optional[bool] = False
    system_purpose: Optional[SystemPurpose] = None
    is_unread: Optional[bool] = False

MongoDB Document Structure

User Document Example

{
  "_id": ObjectId("..."),
  "email": "user@example.com",
  "name": "John Doe",
  "picture": "https://...",
  "timezone": "America/New_York",
  "selected_model": "gpt-4",
  "created_at": ISODate("2024-01-01T00:00:00Z"),
  "updated_at": ISODate("2024-01-15T12:00:00Z"),
  "onboarding": {
    "completed": true,
    "completed_at": ISODate("2024-01-02T10:00:00Z"),
    "phase": "completed",
    "preferences": {
      "profession": "Software Engineer",
      "response_style": "detailed",
      "custom_instructions": "Always provide code examples"
    },
    "house": "innovation",
    "personality_phrase": "Curious problem solver",
    "user_bio": "John is a software engineer..."
  }
}

Conversation Document Example

{
  "_id": ObjectId("..."),
  "conversation_id": "uuid-...",
  "user_id": "user-id",
  "description": "Project Planning Discussion",
  "is_system_generated": false,
  "system_purpose": null,
  "is_unread": false,
  "starred": false,
  "pinned": false,
  "created_at": ISODate("2024-01-15T10:00:00Z"),
  "updated_at": ISODate("2024-01-15T11:30:00Z"),
  "messages": [
    {
      "type": "user",
      "response": "Help me plan my project",
      "message_id": "msg-1",
      "date": "2024-01-15T10:00:00Z"
    },
    {
      "type": "bot",
      "response": "I'd be happy to help...",
      "message_id": "msg-2",
      "date": "2024-01-15T10:00:30Z",
      "tool_data": [
        {
          "tool_name": "create_project",
          "data": {"project_id": "proj-123"},
          "timestamp": "2024-01-15T10:00:30Z"
        }
      ]
    }
  ]
}

Working with MongoDB

CRUD Operations

from app.db.mongodb.collections import users_collection
from bson import ObjectId
from datetime import datetime, timezone

# Create
user_data = {
    "email": "user@example.com",
    "name": "John Doe",
    "created_at": datetime.now(timezone.utc)
}
result = await users_collection.insert_one(user_data)
user_id = str(result.inserted_id)

# Read
user = await users_collection.find_one({"_id": ObjectId(user_id)})
if user:
    user["_id"] = str(user["_id"])  # Convert ObjectId to string

# Update
await users_collection.update_one(
    {"_id": ObjectId(user_id)},
    {"$set": {"name": "Jane Doe", "updated_at": datetime.now(timezone.utc)}}
)

# Delete
await users_collection.delete_one({"_id": ObjectId(user_id)})

# Find multiple
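# profession is nested under onboarding.preferences in the user document, so use dot notation
users = await users_collection.find(
    {"onboarding.preferences.profession": "Engineer"}
).to_list(length=100)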

Aggregation Pipeline

from app.db.mongodb.collections import conversations_collection

pipeline = [
    {"$match": {"user_id": user_id}},
    {"$sort": {"updated_at": -1}},
    {"$limit": 10},
    {"$project": {
        "conversation_id": 1,
        "description": 1,
        "updated_at": 1,
        "message_count": {"$size": "$messages"}
    }}
]

recent_conversations = await conversations_collection.aggregate(pipeline).to_list(None)
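
`$size` expects an array, so a conversation document without a `messages` field would make the `$project` stage fail. A sketch of a pipeline builder that guards against this with `$ifNull` follows. The function name `recent_conversations_pipeline` is hypothetical; the stages are the ones shown above.

```python
from typing import Any, Dict, List


def recent_conversations_pipeline(user_id: str, limit: int = 10) -> List[Dict[str, Any]]:
    """Build the aggregation stages for a user's most recently updated conversations."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    return [
        {"$match": {"user_id": user_id}},
        {"$sort": {"updated_at": -1}},
        {"$limit": limit},
        {"$project": {
            "conversation_id": 1,
            "description": 1,
            "updated_at": 1,
            # Treat a missing messages field as an empty array before counting
            "message_count": {"$size": {"$ifNull": ["$messages", []]}},
        }},
    ]


pipeline = recent_conversations_pipeline("user-123", limit=5)
```

The returned list can be passed to `conversations_collection.aggregate(...)` exactly like the literal pipeline.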

Synchronous MongoDB Access

For synchronous code (e.g., Composio tools):
from datetime import datetime, timezone

from app.db.mongodb.collections import get_sync_collection

# Get synchronous collection
users_collection = get_sync_collection("users")

# Use standard PyMongo operations
user = users_collection.find_one({"email": "user@example.com"})
if user:  # find_one returns None when no document matches
    users_collection.update_one(
        {"_id": user["_id"]},
        {"$set": {"last_login": datetime.now(timezone.utc)}}
    )

Best Practices

  1. Always convert ObjectId to a string when returning data to the API
  2. Use timezone-aware datetimes with datetime.now(timezone.utc)
  3. Index frequently queried fields for performance
  4. Use projection to limit returned fields
  5. Validate data with Pydantic models before database operations
  6. Handle ObjectId conversions in the service layer, not in endpoints
  7. Use aggregation pipelines for complex queries
  8. Implement proper error handling for database operations
  9. Use connection pooling for PostgreSQL
  10. Leverage lazy loading for MongoDB collections
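
Practices 1, 2, and 6 can be combined in one service-layer helper. The sketch below is an assumption about how that might look, not GAIA's actual code: `serialize_document` and `FakeObjectId` are hypothetical names, and `FakeObjectId` only stands in for `bson.ObjectId` so the example runs without PyMongo. It also assumes naive datetimes read back from MongoDB represent UTC, which is the driver default.

```python
from datetime import datetime, timezone
from typing import Any, Dict


def serialize_document(doc: Dict[str, Any]) -> Dict[str, Any]:
    """Return a JSON-friendly copy: `_id` as str, datetimes as ISO 8601 strings."""
    out: Dict[str, Any] = {}
    for key, value in doc.items():
        if key == "_id":
            out[key] = str(value)
        elif isinstance(value, datetime):
            if value.tzinfo is None:
                # Assumption: naive values were stored as UTC
                value = value.replace(tzinfo=timezone.utc)
            out[key] = value.isoformat()
        elif isinstance(value, dict):
            out[key] = serialize_document(value)  # recurse into sub-documents
        else:
            out[key] = value
    return out


class FakeObjectId:
    """Stand-in for bson.ObjectId (hypothetical, for illustration only)."""

    def __init__(self, hex_value: str) -> None:
        self._hex = hex_value

    def __str__(self) -> str:
        return self._hex


doc = {
    "_id": FakeObjectId("65a4f0c2e1b2c3d4e5f60718"),
    "name": "John Doe",
    "created_at": datetime(2024, 1, 1, tzinfo=timezone.utc),
    "onboarding": {"completed_at": datetime(2024, 1, 2, 10, 0)},
}
api_payload = serialize_document(doc)
```

Because the helper returns a copy, the original document keeps its `ObjectId` and can still be used in follow-up queries.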

Database Relationships

User → Conversations (1:N)

# MongoDB approach: messages are embedded; the user is referenced by user_id
conversation = {
    "user_id": "user-123",
    "messages": [...]
}

# Query
conversations = await conversations_collection.find(
    {"user_id": user_id}
).to_list(None)

User → Goals (1:N)

# Query user's goals
goals = await goals_collection.find(
    {"user_id": user_id, "status": "active"}
).sort("created_at", -1).to_list(None)

Migration Strategy

For schema changes in MongoDB:
  1. Add migration script in app/db/migrations/
  2. Use versioning in documents when needed
  3. Handle backwards compatibility in code
  4. Test with production-like data

Example migration:
from app.db.mongodb.collections import users_collection

async def migrate_add_timezone_field():
    """Add timezone field to all users."""
    result = await users_collection.update_many(
        {"timezone": {"$exists": False}},
        {"$set": {"timezone": "UTC"}}
    )
    print(f"Updated {result.modified_count} users")
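
Steps 2 and 3 of the strategy (document versioning and backwards compatibility in code) can be sketched as an upgrade-on-read function. Everything here is an assumption for illustration: the `schema_version` field, the `upgrade_document` helper, and the second upgrade step are hypothetical and do not come from GAIA's codebase.

```python
from typing import Any, Callable, Dict

Document = Dict[str, Any]


def _v0_to_v1(doc: Document) -> Document:
    # Mirrors the migration above: default the timezone to UTC
    doc.setdefault("timezone", "UTC")
    return doc


def _v1_to_v2(doc: Document) -> Document:
    # Hypothetical step: make sure an onboarding sub-document exists
    doc.setdefault("onboarding", {"completed": False})
    return doc


# Maps a stored version to the step that upgrades it by one version
UPGRADES: Dict[int, Callable[[Document], Document]] = {0: _v0_to_v1, 1: _v1_to_v2}
CURRENT_VERSION = 2


def upgrade_document(doc: Document) -> Document:
    """Return an upgraded copy; documents without a version count as version 0."""
    upgraded = dict(doc)  # shallow copy so the caller's document is not modified
    version = upgraded.get("schema_version", 0)
    while version < CURRENT_VERSION:
        upgraded = UPGRADES[version](upgraded)
        version += 1
    upgraded["schema_version"] = version
    return upgraded


legacy = {"email": "user@example.com"}
current = upgrade_document(legacy)
```

Applying the function to an already current document returns it unchanged, so readers can call it on every load while a bulk migration such as `migrate_add_timezone_field` runs in the background.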
