Services Layer
GAIA’s service layer encapsulates business logic and provides a clean interface between API endpoints and data access.Service Architecture
Services follow these principles:- Single Responsibility: Each service handles one domain
- Reusability: Services are used by endpoints, workers, and agents
- Testability: Pure functions with minimal side effects
- Type Safety: Full type hints with return types
- Error Handling: Consistent exception handling
Service Organization
app/services/
├── user_service.py
├── chat_service.py
├── conversation_service.py
├── memory_service.py
├── goals_service.py
├── calendar_service.py
├── reminder_service.py
├── notification_service.py
├── file_service.py
├── upload_service.py
├── search_service.py
├── analytics_service.py
├── usage_service.py
└── onboarding/
└── onboarding_service.py
User Service
Handles user profile management and authentication.Example Implementation
from datetime import datetime, timezone
from typing import Optional
from app.db.mongodb.collections import users_collection
from app.utils.oauth_utils import upload_user_picture
from bson import ObjectId
from fastapi import HTTPException
async def get_user_by_id(user_id: str) -> Optional[dict]:
"""Get user by ID from database."""
try:
user = await users_collection.find_one({"_id": ObjectId(user_id)})
if user:
user["_id"] = str(user["_id"])
return user
except Exception as e:
logger.error(f"Error fetching user {user_id}: {e}")
raise HTTPException(status_code=404, detail="User not found")
async def get_user_by_email(email: str) -> Optional[dict]:
"""Get user by email from database."""
try:
user = await users_collection.find_one({"email": email})
if user:
user["_id"] = str(user["_id"])
return user
except Exception as e:
logger.error(f"Error fetching user by email {email}: {e}")
raise HTTPException(status_code=404, detail="User not found")
async def update_user_profile(
user_id: str,
name: Optional[str] = None,
picture_data: Optional[bytes] = None,
data: Optional[dict] = None,
) -> dict:
"""
Update user profile information.
Args:
user_id: User ID
name: New name (optional)
picture_data: New profile picture data (optional)
data: Additional fields to update (optional)
Returns:
Updated user data
"""
try:
user = await users_collection.find_one({"_id": ObjectId(user_id)})
if not user:
raise HTTPException(status_code=404, detail="User not found")
update_data: dict = (
{"updated_at": datetime.now(timezone.utc), **data}
if data
else {"updated_at": datetime.now(timezone.utc)}
)
# Update name if provided
if name is not None and name.strip():
update_data["name"] = name.strip()
# Update picture if provided
if picture_data:
try:
# Generate public_id for Cloudinary
user_email = user.get("email", "")
public_id = (
f"user_{user_email.replace('@', '_at_').replace('.', '_dot_')}"
)
# Upload to Cloudinary
picture_url = await upload_user_picture(picture_data, public_id)
update_data["picture"] = picture_url
except Exception as e:
logger.error(f"Error uploading profile picture: {e}")
raise HTTPException(
status_code=500, detail="Failed to upload profile picture"
)
# Update database
await users_collection.update_one(
{"_id": ObjectId(user_id)}, {"$set": update_data}
)
# Fetch and return updated user
updated_user = await get_user_by_id(user_id)
if not updated_user:
raise HTTPException(status_code=404, detail="User not found after update")
return {
"user_id": updated_user["_id"],
"name": updated_user.get("name"),
"email": updated_user.get("email"),
"picture": updated_user.get("picture"),
"updated_at": updated_user.get("updated_at"),
"selected_model": updated_user.get("selected_model"),
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating user profile: {e}")
raise HTTPException(status_code=500, detail="Failed to update profile")
Chat Service
Handles chat streaming with background execution and Redis pub/sub.Background Streaming Pattern
from app.services.chat_service import run_chat_stream_background
from app.core.stream_manager import stream_manager
from uuid import uuid4
async def run_chat_stream_background(
stream_id: str,
body: MessageRequestWithHistory,
user: dict,
user_time: datetime,
conversation_id: str,
) -> None:
"""
Run chat streaming in background, publishing chunks to Redis.
This function runs independently of the HTTP request lifecycle.
Progress is saved to MongoDB on completion, even if client disconnects.
Args:
stream_id: Unique stream identifier for Redis pub/sub
body: Message request with history
user: User information dict
user_time: User's local time
conversation_id: Conversation ID (may be new or existing)
"""
complete_message = ""
tool_data: Dict[str, Any] = {"tool_data": []}
tool_outputs: Dict[str, str] = {}
user_message_id = str(uuid4())
bot_message_id = str(uuid4())
is_new_conversation = body.conversation_id is None
usage_metadata: Dict[str, Any] = {}
follow_up_actions: List[str] = []
try:
# Get user model config
user_id = user.get("user_id")
user_model_config = None
if user_id:
try:
user_model_config = await get_user_selected_model(user_id)
except Exception as e:
logger.warning(f"Could not get user's selected model: {e}")
usage_metadata_callback = UsageMetadataCallbackHandler()
# Call agent and stream response
async for chunk in call_agent(
user=user,
user_time=user_time,
conversation_id=conversation_id,
message_request=body,
user_model_config=user_model_config,
callbacks=[usage_metadata_callback],
):
# Process and publish chunk
await stream_manager.publish(stream_id, chunk)
# Accumulate message
if chunk.get("type") == "text":
complete_message += chunk.get("content", "")
# Save conversation to database
await save_conversation(
conversation_id=conversation_id,
user_message=body.messages[-1],
bot_message=complete_message,
user_message_id=user_message_id,
bot_message_id=bot_message_id,
tool_data=tool_data,
)
except Exception as e:
logger.error(f"Error in background chat stream: {e}")
await stream_manager.publish(
stream_id,
{"type": "error", "message": str(e)}
)
finally:
await stream_manager.publish(stream_id, {"type": "done"})
Conversation Service
Manages conversation storage and retrieval.Message Updates
from app.services.conversation_service import update_messages
from app.db.mongodb.collections import conversations_collection
async def update_messages(
conversation_id: str,
messages: List[MessageModel],
user_id: str
) -> dict:
"""
Update messages in a conversation.
Args:
conversation_id: Conversation ID
messages: List of message models
user_id: User ID for authorization
Returns:
Updated conversation data
"""
# Verify ownership
conversation = await conversations_collection.find_one({
"conversation_id": conversation_id,
"user_id": user_id
})
if not conversation:
raise HTTPException(status_code=404, detail="Conversation not found")
# Convert Pydantic models to dict
message_dicts = [msg.model_dump() for msg in messages]
# Update conversation
await conversations_collection.update_one(
{"conversation_id": conversation_id},
{
"$set": {
"messages": message_dicts,
"updated_at": datetime.now(timezone.utc)
}
}
)
return {"success": True, "conversation_id": conversation_id}
Memory Service
Handles long-term memory storage using ChromaDB.Store Memories
from app.services.memory_service import store_memories_batch
from app.db.chroma import get_chroma_collection
async def store_memories_batch(
user_id: str,
memories: List[dict]
) -> dict:
"""
Store multiple memories for a user.
Args:
user_id: User ID
memories: List of memory objects with 'text' and 'metadata'
Returns:
Result with count of stored memories
"""
collection = await get_chroma_collection("user_memories")
documents = [memory["text"] for memory in memories]
metadatas = [
{**memory.get("metadata", {}), "user_id": user_id}
for memory in memories
]
ids = [f"{user_id}_{i}_{datetime.now().timestamp()}" for i in range(len(memories))]
await collection.add(
documents=documents,
metadatas=metadatas,
ids=ids
)
return {"stored": len(memories), "user_id": user_id}
File Service
Manages file uploads and retrieval.File Operations
from app.services.file_service import get_files
from app.db.mongodb.collections import files_collection
async def get_files(file_ids: List[str], user_id: str) -> List[dict]:
"""
Get files by IDs for a specific user.
Args:
file_ids: List of file IDs
user_id: User ID for authorization
Returns:
List of file documents
"""
files = await files_collection.find({
"file_id": {"$in": file_ids},
"user_id": user_id
}).to_list(None)
# Convert ObjectId to string
for file in files:
file["_id"] = str(file["_id"])
return files
Service Composition
Services can compose other services:from app.services.user_service import get_user_by_id
from app.services.usage_service import track_usage
from app.services.notification_service import send_notification
async def process_chat_completion(
user_id: str,
tokens_used: int,
message: str
) -> dict:
"""
Process chat completion with usage tracking and notifications.
"""
# Get user
user = await get_user_by_id(user_id)
# Track usage
usage = await track_usage(
user_id=user_id,
feature="chat",
tokens=tokens_used
)
# Send notification if quota exceeded
if usage["quota_exceeded"]:
await send_notification(
user_id=user_id,
title="Usage Limit Reached",
message="You've reached your chat usage limit."
)
return {
"user": user,
"usage": usage,
"message": message
}
Testing Services
Services are designed for easy testing:import pytest
from app.services.user_service import get_user_by_id
@pytest.mark.asyncio
async def test_get_user_by_id(mock_db):
"""Test user retrieval by ID."""
# Arrange
user_id = "test-user-id"
expected_user = {
"_id": user_id,
"email": "[email protected]",
"name": "Test User"
}
mock_db.users.find_one.return_value = expected_user
# Act
user = await get_user_by_id(user_id)
# Assert
assert user["_id"] == user_id
assert user["email"] == expected_user["email"]
mock_db.users.find_one.assert_called_once_with({"_id": user_id})
Background Tasks
Services integrate with FastAPI background tasks:from fastapi import BackgroundTasks
from app.services.analytics_service import track_event
@router.post("/chat")
async def chat_endpoint(
background_tasks: BackgroundTasks,
user: dict = Depends(get_current_user)
):
"""Chat endpoint with background analytics."""
# Main response
response = await process_chat(...)
# Background analytics tracking
background_tasks.add_task(
track_event,
user_id=user["user_id"],
event="chat_message",
properties={"model": response["model"]}
)
return response
Service Best Practices
1. Type Hints
from typing import Optional, List, Dict, Any
async def get_user_goals(
user_id: str,
status: Optional[str] = None,
limit: int = 10
) -> List[Dict[str, Any]]:
"""Get user goals with optional filtering."""
pass
2. Error Handling
from fastapi import HTTPException
from app.config.loggers import app_logger as logger
async def delete_user(user_id: str) -> dict:
"""Delete user with proper error handling."""
try:
result = await users_collection.delete_one({"_id": ObjectId(user_id)})
if result.deleted_count == 0:
raise HTTPException(status_code=404, detail="User not found")
return {"success": True, "user_id": user_id}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting user {user_id}: {e}")
raise HTTPException(status_code=500, detail="Failed to delete user")
3. Transaction Safety
from app.db.postgresql import get_db_session
async def create_user_with_profile(
email: str,
name: str,
profile_data: dict
) -> dict:
"""Create user and profile in transaction."""
async with get_db_session() as session:
async with session.begin():
# Create user
user = User(email=email, name=name)
session.add(user)
await session.flush() # Get user.id
# Create profile
profile = Profile(user_id=user.id, **profile_data)
session.add(profile)
# Commit happens automatically on context exit
return {"user_id": user.id, "profile_id": profile.id}
4. Logging
from app.config.loggers import app_logger as logger
async def process_payment(user_id: str, amount: float) -> dict:
"""Process payment with detailed logging."""
logger.info(f"Processing payment for user {user_id}: ${amount}")
try:
result = await stripe.charge(amount)
logger.info(f"Payment successful: {result['id']}")
return result
except Exception as e:
logger.error(f"Payment failed for user {user_id}: {e}")
raise
5. Caching
from functools import lru_cache
from app.db.redis import redis_client
async def get_user_with_cache(user_id: str) -> dict:
"""Get user with Redis caching."""
cache_key = f"user:{user_id}"
# Try cache first
cached = await redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Fetch from database
user = await get_user_by_id(user_id)
# Store in cache (1 hour TTL)
await redis_client.setex(
cache_key,
3600,
json.dumps(user)
)
return user
Service Documentation
Document services with comprehensive docstrings:async def update_user_preferences(
user_id: str,
preferences: dict
) -> dict:
"""
Update user preferences.
Args:
user_id: User ID
preferences: Dictionary of preference key-value pairs
Returns:
Updated preferences dictionary
Raises:
HTTPException: 404 if user not found, 500 on update failure
Example:
>>> await update_user_preferences(
... "user-123",
... {"theme": "dark", "notifications": True}
... )
{"theme": "dark", "notifications": True}
"""
pass