Skip to main content

Services Layer

GAIA’s service layer encapsulates business logic and provides a clean interface between API endpoints and data access.

Service Architecture

Services follow these principles:
  1. Single Responsibility: Each service handles one domain
  2. Reusability: Services are used by endpoints, workers, and agents
  3. Testability: Functions take explicit inputs and keep side effects to a minimum
  4. Type Safety: Full type hints with return types
  5. Error Handling: Consistent exception handling

Service Organization

app/services/
├── user_service.py
├── chat_service.py
├── conversation_service.py
├── memory_service.py
├── goals_service.py
├── calendar_service.py
├── reminder_service.py
├── notification_service.py
├── file_service.py
├── upload_service.py
├── search_service.py
├── analytics_service.py
├── usage_service.py
└── onboarding/
    └── onboarding_service.py

User Service

Handles user profile management and authentication.

Example Implementation

from datetime import datetime, timezone
from typing import Optional
from app.db.mongodb.collections import users_collection
from app.utils.oauth_utils import upload_user_picture
from bson import ObjectId
from fastapi import HTTPException

async def get_user_by_id(user_id: str) -> Optional[dict]:
    """Fetch a single user document by its MongoDB ObjectId string.

    Args:
        user_id: String form of the user's ``_id``.

    Returns:
        The user document with ``_id`` converted to ``str``, or ``None``
        when no document matches.

    Raises:
        HTTPException: 404 when ``user_id`` is not a valid ObjectId,
            500 when the database lookup itself fails.
    """
    # A malformed ID can never match a document. Report it as "not found"
    # up front so it is not confused with a database failure below.
    if not ObjectId.is_valid(user_id):
        raise HTTPException(status_code=404, detail="User not found")

    try:
        user = await users_collection.find_one({"_id": ObjectId(user_id)})
    except Exception as e:
        # A failed lookup is a server-side problem, not a missing user.
        logger.error(f"Error fetching user {user_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to fetch user") from e

    if user:
        # Convert the ObjectId to str so the document is JSON-friendly.
        user["_id"] = str(user["_id"])
    return user

async def get_user_by_email(email: str) -> Optional[dict]:
    """Fetch a single user document by email address.

    Args:
        email: Email address to match against the ``email`` field.

    Returns:
        The user document with ``_id`` converted to ``str``, or ``None``
        when no document matches.

    Raises:
        HTTPException: 500 when the database lookup fails.
    """
    try:
        user = await users_collection.find_one({"email": email})
    except Exception as e:
        # A failed lookup is a server-side problem; "no such user" is
        # signalled by returning None, not by an exception.
        logger.error(f"Error fetching user by email {email}: {e}")
        raise HTTPException(status_code=500, detail="Failed to fetch user") from e

    if user:
        # Convert the ObjectId to str so the document is JSON-friendly.
        user["_id"] = str(user["_id"])
    return user

async def update_user_profile(
    user_id: str,
    name: Optional[str] = None,
    picture_data: Optional[bytes] = None,
    data: Optional[dict] = None,
) -> dict:
    """
    Apply profile changes to a user and return the refreshed profile.

    Args:
        user_id: String form of the user's ObjectId
        name: Replacement display name; ignored when None or blank
        picture_data: Raw bytes of a new profile picture (optional)
        data: Extra fields merged into the ``$set`` payload (optional)

    Returns:
        Dict with ``user_id``, ``name``, ``email``, ``picture``,
        ``updated_at`` and ``selected_model`` of the updated user

    Raises:
        HTTPException: 404 if the user is missing before or after the
            update, 500 if the picture upload or any other step fails
    """
    try:
        user_filter = {"_id": ObjectId(user_id)}

        existing = await users_collection.find_one(user_filter)
        if not existing:
            raise HTTPException(status_code=404, detail="User not found")

        # Timestamp first, caller-supplied fields layered on top.
        changes: dict = {"updated_at": datetime.now(timezone.utc), **(data or {})}

        # Only a non-blank name is written.
        trimmed_name = name.strip() if name is not None else ""
        if trimmed_name:
            changes["name"] = trimmed_name

        if picture_data:
            try:
                # The public_id is derived from the email so the asset
                # name contains neither '@' nor '.'.
                email_slug = (
                    existing.get("email", "")
                    .replace("@", "_at_")
                    .replace(".", "_dot_")
                )
                public_id = f"user_{email_slug}"
                changes["picture"] = await upload_user_picture(
                    picture_data, public_id
                )
            except Exception as upload_err:
                logger.error(f"Error uploading profile picture: {upload_err}")
                raise HTTPException(
                    status_code=500, detail="Failed to upload profile picture"
                )

        await users_collection.update_one(user_filter, {"$set": changes})

        # Re-read so the response reflects what is actually stored.
        refreshed = await get_user_by_id(user_id)
        if not refreshed:
            raise HTTPException(status_code=404, detail="User not found after update")

        profile = {"user_id": refreshed["_id"]}
        for field in ("name", "email", "picture", "updated_at", "selected_model"):
            profile[field] = refreshed.get(field)
        return profile

    except HTTPException:
        # Already carries the intended status code; pass it through.
        raise
    except Exception as exc:
        logger.error(f"Error updating user profile: {exc}")
        raise HTTPException(status_code=500, detail="Failed to update profile")

Chat Service

Handles chat streaming with background execution and Redis pub/sub.

Background Streaming Pattern

from app.services.chat_service import run_chat_stream_background
from app.core.stream_manager import stream_manager
from uuid import uuid4

async def run_chat_stream_background(
    stream_id: str,
    body: MessageRequestWithHistory,
    user: dict,
    user_time: datetime,
    conversation_id: str,
) -> None:
    """
    Run the agent for one chat turn and publish every chunk to a stream.

    Each chunk yielded by ``call_agent`` is forwarded through
    ``stream_manager.publish`` under ``stream_id``; chunks whose ``type`` is
    ``"text"`` are also concatenated into the final bot message. Once the
    agent stream is exhausted, the turn is persisted with
    ``save_conversation``.

    If anything inside the main ``try`` raises, the save step is skipped and
    an ``{"type": "error", ...}`` event is published instead. A
    ``{"type": "done"}`` event is published in all cases.

    Args:
        stream_id: Identifier passed to ``stream_manager.publish``
        body: Message request with history; its last message is saved as
            the user message
        user: User information dict (``user_id`` is read from it)
        user_time: User's local time, forwarded to the agent
        conversation_id: Conversation ID forwarded to the agent and to
            ``save_conversation``
    """
    complete_message = ""
    # In this excerpt tool_data is passed to save_conversation unchanged.
    tool_data: Dict[str, Any] = {"tool_data": []}
    # NOTE(review): tool_outputs, is_new_conversation, usage_metadata and
    # follow_up_actions are assigned but never read in this excerpt.
    tool_outputs: Dict[str, str] = {}
    user_message_id = str(uuid4())
    bot_message_id = str(uuid4())
    is_new_conversation = body.conversation_id is None
    usage_metadata: Dict[str, Any] = {}
    follow_up_actions: List[str] = []

    try:
        # Get user model config
        user_id = user.get("user_id")
        user_model_config = None
        if user_id:
            try:
                user_model_config = await get_user_selected_model(user_id)
            except Exception as e:
                # Best effort: on failure the agent is called with
                # user_model_config=None.
                logger.warning(f"Could not get user's selected model: {e}")

        usage_metadata_callback = UsageMetadataCallbackHandler()

        # Call agent and stream response
        async for chunk in call_agent(
            user=user,
            user_time=user_time,
            conversation_id=conversation_id,
            message_request=body,
            user_model_config=user_model_config,
            callbacks=[usage_metadata_callback],
        ):
            # Process and publish chunk
            await stream_manager.publish(stream_id, chunk)

            # Accumulate message
            if chunk.get("type") == "text":
                complete_message += chunk.get("content", "")

        # Save conversation to database
        # Only reached when the agent stream finished without raising.
        await save_conversation(
            conversation_id=conversation_id,
            user_message=body.messages[-1],
            bot_message=complete_message,
            user_message_id=user_message_id,
            bot_message_id=bot_message_id,
            tool_data=tool_data,
        )

    except Exception as e:
        logger.error(f"Error in background chat stream: {e}")
        # NOTE(review): the raw exception text is sent to subscribers;
        # confirm that is acceptable to expose.
        await stream_manager.publish(
            stream_id,
            {"type": "error", "message": str(e)}
        )
    finally:
        # Always signal end-of-stream, on success and on failure.
        await stream_manager.publish(stream_id, {"type": "done"})

Conversation Service

Manages conversation storage and retrieval.

Message Updates

from app.services.conversation_service import update_messages
from app.db.mongodb.collections import conversations_collection

async def update_messages(
    conversation_id: str,
    messages: List[MessageModel],
    user_id: str
) -> dict:
    """
    Replace the stored messages of a conversation owned by ``user_id``.

    Ownership is enforced by the update filter itself, so the check and the
    write are a single database operation. The conversation cannot be
    deleted or reassigned between a separate lookup and the write.

    Args:
        conversation_id: Conversation ID
        messages: List of message models that become the new message list
        user_id: ID of the user who must own the conversation

    Returns:
        ``{"success": True, "conversation_id": conversation_id}``

    Raises:
        HTTPException: 404 if no conversation with this ID belongs to the user
    """
    # Convert Pydantic models to plain dicts for storage.
    message_dicts = [msg.model_dump() for msg in messages]

    result = await conversations_collection.update_one(
        {"conversation_id": conversation_id, "user_id": user_id},
        {
            "$set": {
                "messages": message_dicts,
                "updated_at": datetime.now(timezone.utc),
            }
        },
    )

    # matched_count is 0 when the conversation does not exist or belongs to
    # another user; both cases are reported as "not found".
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Conversation not found")

    return {"success": True, "conversation_id": conversation_id}

Memory Service

Handles long-term memory storage using ChromaDB.

Store Memories

from app.services.memory_service import store_memories_batch
from app.db.chroma import get_chroma_collection

async def store_memories_batch(
    user_id: str,
    memories: List[dict]
) -> dict:
    """
    Store multiple memories for a user in the ``user_memories`` collection.

    Args:
        user_id: User ID; also written into every memory's metadata
        memories: List of memory objects, each with a required ``text`` key
            and an optional ``metadata`` dict

    Returns:
        ``{"stored": <number of memories>, "user_id": user_id}``
    """
    # Nothing to write: skip the collection round trip instead of sending
    # an empty batch to the vector store.
    if not memories:
        return {"stored": 0, "user_id": user_id}

    collection = await get_chroma_collection("user_memories")

    documents = [memory["text"] for memory in memories]
    # user_id is applied last so caller metadata cannot override it.
    metadatas = [
        {**memory.get("metadata", {}), "user_id": user_id}
        for memory in memories
    ]
    # IDs combine the user, the position in the batch and a timestamp.
    ids = [
        f"{user_id}_{index}_{datetime.now().timestamp()}"
        for index, _ in enumerate(memories)
    ]

    await collection.add(
        documents=documents,
        metadatas=metadatas,
        ids=ids
    )

    return {"stored": len(memories), "user_id": user_id}

File Service

Manages file uploads and retrieval.

File Operations

from app.services.file_service import get_files
from app.db.mongodb.collections import files_collection

async def get_files(file_ids: List[str], user_id: str) -> List[dict]:
    """
    Look up file documents by ID, restricted to one user's files.

    Args:
        file_ids: IDs matched against the ``file_id`` field
        user_id: Only documents with this ``user_id`` are returned

    Returns:
        Matching file documents, each with ``_id`` converted to ``str``
    """
    query = {"file_id": {"$in": file_ids}, "user_id": user_id}
    cursor = files_collection.find(query)
    documents = await cursor.to_list(None)

    # Convert ObjectId values to str so the documents are JSON-friendly.
    for document in documents:
        document["_id"] = str(document["_id"])

    return documents

Service Composition

Services can compose other services:
from app.services.user_service import get_user_by_id
from app.services.usage_service import track_usage
from app.services.notification_service import send_notification

async def process_chat_completion(
    user_id: str,
    tokens_used: int,
    message: str
) -> dict:
    """
    Record usage for a finished chat turn and notify the user when the
    tracked usage reports an exceeded quota.

    Returns a dict with the user, the usage result and the message.
    """
    account = await get_user_by_id(user_id)

    usage_report = await track_usage(
        user_id=user_id, feature="chat", tokens=tokens_used
    )

    # Notify only when the usage service reports an exceeded quota.
    if usage_report["quota_exceeded"]:
        await send_notification(
            user_id=user_id,
            title="Usage Limit Reached",
            message="You've reached your chat usage limit.",
        )

    result = {"user": account, "usage": usage_report}
    result["message"] = message
    return result

Testing Services

Services are designed for easy testing:
import pytest
from app.services.user_service import get_user_by_id

@pytest.mark.asyncio
async def test_get_user_by_id(mock_db):
    """Test that a stored user is returned with its _id as a string."""
    from bson import ObjectId

    # Arrange
    # get_user_by_id wraps the ID in ObjectId(), so the test ID must be a
    # valid 24-character hex string.
    user_id = "507f1f77bcf86cd799439011"
    stored_user = {
        "_id": ObjectId(user_id),
        "email": "test@example.com",
        "name": "Test User",
    }
    # NOTE(review): assumes the mock_db fixture patches users_collection
    # with mock_db.users and that find_one is awaitable (e.g. AsyncMock).
    mock_db.users.find_one.return_value = stored_user

    # Act
    user = await get_user_by_id(user_id)

    # Assert
    assert user["_id"] == user_id  # ObjectId converted to str
    assert user["email"] == "test@example.com"
    # The service queries by ObjectId, not by the raw string.
    mock_db.users.find_one.assert_called_once_with({"_id": ObjectId(user_id)})

Background Tasks

Services integrate with FastAPI background tasks:
from fastapi import BackgroundTasks
from app.services.analytics_service import track_event

@router.post("/chat")
async def chat_endpoint(
    background_tasks: BackgroundTasks,
    user: dict = Depends(get_current_user)
):
    """Handle a chat request and queue analytics tracking as a background task."""
    response = await process_chat(...)

    # Analytics are handed to FastAPI's background task queue rather than
    # awaited inline.
    analytics_kwargs = {
        "user_id": user["user_id"],
        "event": "chat_message",
        "properties": {"model": response["model"]},
    }
    background_tasks.add_task(track_event, **analytics_kwargs)

    return response

Service Best Practices

1. Type Hints

from typing import Optional, List, Dict, Any

async def get_user_goals(
    user_id: str,
    status: Optional[str] = None,
    limit: int = 10
) -> List[Dict[str, Any]]:
    """Signature example: fetch a user's goals, optionally filtered.

    Illustrates fully annotated parameters and return type; the body is
    intentionally left unimplemented in this example.
    """

2. Error Handling

from fastapi import HTTPException
from app.config.loggers import app_logger as logger

async def delete_user(user_id: str) -> dict:
    """Delete a user by ID.

    Returns ``{"success": True, "user_id": user_id}`` when a document was
    removed. Raises HTTPException 404 when nothing was deleted and 500 on
    any other failure.
    """
    try:
        outcome = await users_collection.delete_one({"_id": ObjectId(user_id)})

        if outcome.deleted_count != 0:
            return {"success": True, "user_id": user_id}

        raise HTTPException(status_code=404, detail="User not found")

    except HTTPException:
        # Keep the 404 intact instead of converting it into a 500 below.
        raise
    except Exception as exc:
        logger.error(f"Error deleting user {user_id}: {exc}")
        raise HTTPException(status_code=500, detail="Failed to delete user")

3. Transaction Safety

from app.db.postgresql import get_db_session

async def create_user_with_profile(
    email: str,
    name: str,
    profile_data: dict
) -> dict:
    """Create a user and its profile in a single transaction.

    Args:
        email: Email for the new user
        name: Name for the new user
        profile_data: Keyword arguments forwarded to ``Profile``

    Returns:
        ``{"user_id": ..., "profile_id": ...}`` for the created rows.
        If anything inside the transaction raises, neither row is
        committed.
    """
    async with get_db_session() as session:
        async with session.begin():
            # Create user
            user = User(email=email, name=name)
            session.add(user)
            await session.flush()  # Assigns user.id for the profile below

            # Create profile
            profile = Profile(user_id=user.id, **profile_data)
            session.add(profile)
            await session.flush()  # Assigns profile.id before it is read

            # Read the generated IDs while the transaction is still open.
            # NOTE(review): if the session uses SQLAlchemy's default
            # expire_on_commit=True, attributes are expired at commit and
            # an async session cannot lazy-load them afterwards.
            created = {"user_id": user.id, "profile_id": profile.id}

            # Commit happens automatically on context exit

    return created

4. Logging

from app.config.loggers import app_logger as logger

async def process_payment(user_id: str, amount: float) -> dict:
    """Charge ``amount`` and log the attempt, the success and any failure.

    The charge result is returned unchanged. Any exception is logged and
    re-raised to the caller.
    """
    logger.info(f"Processing payment for user {user_id}: ${amount}")

    try:
        charge = await stripe.charge(amount)
        # Logged inside the try so a malformed result is also reported as
        # a failed payment.
        logger.info(f"Payment successful: {charge['id']}")
    except Exception as exc:
        logger.error(f"Payment failed for user {user_id}: {exc}")
        raise

    return charge

5. Caching

from functools import lru_cache
from app.db.redis import redis_client

def _json_default(value):
    """Serialize datetimes as ISO-8601 strings; reject any other unknown type."""
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable"
    )


async def get_user_with_cache(user_id: str) -> dict:
    """Get a user, serving from Redis when a cached copy exists.

    Args:
        user_id: String form of the user's ID

    Returns:
        The user document. On a cache hit, datetime fields come back as
        ISO-8601 strings because the cached copy is JSON. When the user
        does not exist, the result of ``get_user_by_id`` (``None``) is
        returned as-is and nothing is cached.
    """
    # Local import: this snippet's import block does not provide json.
    import json

    cache_key = f"user:{user_id}"

    # Try cache first
    cached = await redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # Fetch from database
    user = await get_user_by_id(user_id)

    # Do not cache a missing user: a cached "null" would hide a user
    # created within the next hour.
    if user is None:
        return user

    # Store in cache (1 hour TTL). User documents can carry datetime
    # fields such as updated_at, which plain json.dumps cannot encode.
    await redis_client.setex(
        cache_key,
        3600,
        json.dumps(user, default=_json_default)
    )

    return user

Service Documentation

Document services with comprehensive docstrings:
async def update_user_preferences(
    user_id: str,
    preferences: dict
) -> dict:
    """
    Update the stored preferences of a user.

    This is a docstring template; the body is intentionally left
    unimplemented in this example.

    Args:
        user_id: ID of the user whose preferences change
        preferences: Mapping of preference names to their new values

    Returns:
        The updated preferences dictionary

    Raises:
        HTTPException: 404 if user not found, 500 on update failure

    Example:
        >>> await update_user_preferences(
        ...     "user-123",
        ...     {"theme": "dark", "notifications": True}
        ... )
        {"theme": "dark", "notifications": True}
    """

Build docs developers (and LLMs) love