Overview

This document traces data flows for key operations in Aurora, from user interaction to backend processing and storage.

Authentication Flow

OAuth 2.0 Flow (GCP Example)

Key Files:
  • OAuth URL: server/connectors/gcp_connector/auth/oauth.py:13-31
  • Token exchange: server/connectors/gcp_connector/auth/oauth.py:34-71
  • Vault storage: server/utils/vault/vault_client.py
  • Post-auth tasks: server/connectors/gcp_connector/gcp_post_auth_tasks.py
Flow Steps:
  1. Frontend requests OAuth URL from Flask API
  2. Flask generates authorization URL with scopes
  3. User approves permissions in GCP consent screen
  4. GCP redirects back with authorization code
  5. Flask exchanges code for access/refresh tokens
  6. Vault stores tokens securely (never in database)
  7. Postgres stores reference: vault:kv/data/aurora/users/{user_id}/gcp_token
  8. Celery runs background task to fetch initial project list
  9. Frontend receives success and updates UI
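
Step 2 can be sketched as a URL builder. This is a minimal sketch, not the actual `oauth.py` code: the endpoint and parameters follow Google's standard OAuth 2.0 web-server flow, with `access_type=offline` so a refresh token is issued in step 5.

```python
from urllib.parse import urlencode

def gcp_auth_url(client_id: str, redirect_uri: str, scopes: list[str]) -> str:
    # Step 2: build the consent-screen URL (Google's web-server flow).
    # "offline" access requests a refresh token along with the access
    # token exchanged in step 5.
    params = {
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": " ".join(scopes),
        "access_type": "offline",
        "prompt": "consent",
    }
    return "https://accounts.google.com/o/oauth2/v2/auth?" + urlencode(params)
```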

Chat Request Flow

AI Agent Query Processing

Key Files:
  • WebSocket handler: server/main_chatbot.py:604-1000
  • Workflow execution: server/chat/backend/agent/workflow.py:59-113
  • Agent logic: server/chat/backend/agent/agent.py:155-400
  • Tool execution: server/chat/backend/agent/tools/cloud_tools.py
Flow Steps:
  1. User types question in chat interface
  2. Frontend sends WebSocket message with query, session_id, user_id
  3. WebSocket server receives message and initializes LangGraph state
  4. Agent loads conversation history from PostgreSQL
  5. Agent builds dynamic system prompt based on connected providers
  6. LLM streams response tokens back through WebSocket
  7. Frontend renders tokens in real-time
  8. LLM decides to call list_gke_clusters tool
  9. Agent executes tool with user context
  10. Tool calls GCP API with credentials from Vault
  11. Tool returns structured result to agent
  12. Agent sends result back to LLM
  13. LLM generates final answer incorporating tool data
  14. WebSocket streams final answer to frontend
  15. Agent saves full conversation to PostgreSQL
  16. Agent indexes conversation in Weaviate for future retrieval
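
Steps 8-13 form a tool-calling loop. A minimal sketch of that loop (the real implementation lives in the LangGraph workflow; the function and message shapes here are illustrative):

```python
def run_agent_turn(llm, tools, messages):
    # Minimal tool-calling loop (steps 8-13): the LLM either names a
    # tool to execute or returns a final answer; each tool result is
    # appended to the history and the LLM is asked again.
    while True:
        reply = llm(messages)
        if reply.get("tool") is None:
            return reply["content"]  # final answer (step 13)
        result = tools[reply["tool"]](**reply.get("args", {}))
        messages.append(
            {"role": "tool", "name": reply["tool"], "content": result}
        )
```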

Infrastructure Deployment Flow

Terraform Resource Creation

Key Files:
  • IaC write: server/chat/backend/agent/tools/iac/iac_write_tool.py
  • IaC deploy: server/chat/backend/agent/tools/iac/iac_deploy_tool.py
  • Confirmations: server/utils/cloud/infrastructure_confirmation.py
  • Storage: server/utils/storage/storage.py
Flow Steps:
  1. User requests infrastructure (“Create a GKE cluster”)
  2. Agent asks LLM to plan the operation
  3. LLM calls iac_write tool to generate Terraform code
  4. IaC Tool writes .tf files to session-isolated directory in SeaweedFS
  5. Agent displays Terraform code to user for review
  6. LLM calls iac_deploy tool to apply changes
  7. IaC Tool requests user confirmation via WebSocket
  8. Frontend shows confirmation dialog with resource details
  9. User approves or rejects deployment
  10. Frontend sends confirmation_response back
  11. IaC Tool executes terraform init, terraform plan, terraform apply
  12. Terraform creates resources in GCP/AWS/Azure
  13. IaC Tool saves Terraform state to SeaweedFS
  14. IaC Tool logs deployment metadata to PostgreSQL
  15. Agent streams success message to frontend
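
The confirmation gate in steps 7-11 can be sketched as follows. This is an illustrative shape, not the real `iac_deploy_tool.py`: `ask_user` stands in for the WebSocket confirmation round-trip, and the runner is injectable so the command sequence is testable without Terraform installed.

```python
import subprocess

def deploy(workdir: str, ask_user, run=subprocess.run) -> str:
    # Confirmation-gated apply (steps 7-11). `ask_user` sends the
    # confirmation request over the WebSocket and blocks on the
    # frontend's confirmation_response.
    if not ask_user({"action": "terraform apply", "dir": workdir}):
        return "rejected"  # user declined in the dialog (step 9)
    for cmd in (["terraform", "init"],
                ["terraform", "plan", "-out=tfplan"],
                ["terraform", "apply", "-auto-approve", "tfplan"]):
        run(cmd, cwd=workdir, check=True)  # step 11
    return "applied"
```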

Knowledge Base Ingestion Flow

Document Upload & Indexing

Key Files:
  • Upload endpoint: server/routes/knowledge_base.py
  • Indexing task: server/routes/knowledge_base/tasks.py
  • Storage manager: server/utils/storage/storage.py
  • Weaviate client: server/chat/backend/agent/weaviate_client.py
Flow Steps:
  1. User selects PDF/DOCX file and clicks Upload
  2. Frontend sends multipart form data to Flask API
  3. Flask validates file type and size (max 100MB)
  4. Storage saves file to SeaweedFS with key {user_id}/kb/{doc_id}.pdf
  5. Postgres creates record in kb_documents with status='processing'
  6. Celery picks up indexing task from Redis queue
  7. Celery downloads file from Storage
  8. Celery extracts text using PyPDF2/python-docx
  9. Celery splits text into 500-token chunks with 50-token overlap
  10. Weaviate generates embeddings using all-MiniLM-L6-v2 model
  11. Weaviate stores vectors in UserKnowledge collection
  12. Postgres updates document status to 'indexed'
  13. Frontend receives SSE update and shows “Ready” badge
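
The chunking in step 9 is a sliding window over the token list. A minimal sketch (the helper name is illustrative; the real logic lives in the indexing task):

```python
def chunk_tokens(tokens: list, size: int = 500, overlap: int = 50) -> list[list]:
    # Sliding window with overlap (step 9): consecutive chunks share
    # `overlap` tokens so no sentence is split across a hard boundary.
    # Each chunk is embedded afterwards (step 10).
    if not tokens:
        return []
    step = size - overlap
    return [tokens[i:i + size]
            for i in range(0, max(len(tokens) - overlap, 1), step)]
```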

Incident Detection & RCA Flow

Background Chat Analysis

Key Files:
  • Incident creation: server/routes/incidents_routes.py
  • Background task: server/chat/background/task.py
  • RCA workflow: server/chat/backend/agent/agent.py
  • Thought saving: server/main_chatbot.py:229-286
Flow Steps:
  1. Monitoring system (Grafana/Datadog) sends webhook on alert
  2. Flask creates incident record in PostgreSQL
  3. Celery receives background RCA task from Redis
  4. Celery sends internal HTTP request to Chatbot service
  5. Chatbot initializes LangGraph workflow with incident context
  6. Agent instructs LLM to investigate the alert
  7. LLM decides to gather evidence:
    • Calls get_gcp_logs for recent error logs
    • Calls github_search_code for recent deployments
    • Calls splunk_search for metrics and traces
  8. Agent saves incremental “thoughts” to incident_thoughts table
  9. LLM synthesizes evidence into root cause analysis
  10. Agent updates incidents table with RCA summary
  11. Chatbot returns success to Celery task
  12. Frontend polls for incident updates and displays RCA
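
The evidence fan-out in steps 7-8 can be sketched as one loop over the investigation tools, producing one "thought" per result (illustrative shape; the real rows go to incident_thoughts):

```python
def gather_evidence(tools: dict, alert: dict) -> list[dict]:
    # Steps 7-8: call each investigation tool with the alert context
    # and record an incremental thought per finding.
    thoughts = []
    for name, tool in tools.items():
        thoughts.append({"tool": name, "finding": tool(alert)})
    return thoughts
```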

Service Discovery Flow

Periodic Resource Scanning

Key Files:
  • Beat schedule: server/celery_config.py:79-82
  • Discovery tasks: server/services/discovery/tasks.py
  • Memgraph client: server/services/discovery/memgraph_client.py
Flow Steps:
  1. Celery Beat triggers run_full_discovery task every hour
  2. Celery queries PostgreSQL for all users with connected providers
  3. For each user:
    • Retrieve credentials from Vault for each provider
    • Call cloud provider APIs to list resources:
      • GCP: Projects, GKE clusters, Compute instances, Cloud SQL
      • AWS: EC2 instances, EKS clusters, RDS databases, Lambda functions
      • Azure: VMs, AKS clusters, SQL databases, App Services
  4. Celery creates graph nodes in Memgraph:
    CREATE (s:Service {id: "...", name: "...", type: "..."})
    
  5. Celery creates relationships:
    MATCH (a:Service {id: "..."}) MATCH (b:Service {id: "..."}) 
    CREATE (a)-[:DEPENDS_ON]->(b)
    
  6. Celery caches service list in PostgreSQL graph_services table
  7. Frontend queries graph for visualization
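
The Cypher in steps 4-5 is best sent parametrized. A sketch of the query builders, as one possible shape (note MERGE instead of CREATE here, so the hourly re-scan stays idempotent; parameters avoid escaping issues with resource names):

```python
def service_node_query(svc: dict) -> tuple[str, dict]:
    # Parametrized form of the node creation in step 4.
    query = ("MERGE (s:Service {id: $id}) "
             "SET s.name = $name, s.type = $type")
    return query, {"id": svc["id"], "name": svc["name"], "type": svc["type"]}

def depends_on_query(src_id: str, dst_id: str) -> tuple[str, dict]:
    # Parametrized form of the relationship creation in step 5.
    query = ("MATCH (a:Service {id: $src}), (b:Service {id: $dst}) "
             "MERGE (a)-[:DEPENDS_ON]->(b)")
    return query, {"src": src_id, "dst": dst_id}
```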

Secret Storage Flow

Vault Secret Lifecycle

Key Files:
  • Vault client: server/utils/vault/vault_client.py
  • Token management: server/utils/auth/token_management.py
Storage Pattern:
# Store in Vault (client setup shown for context; hvac's KV v2 API)
import hvac
vault = hvac.Client(url=VAULT_ADDR, token=VAULT_TOKEN)  # placeholders: from env/config

vault_path = f"aurora/users/{user_id}/{secret_name}"
vault.secrets.kv.v2.create_or_update_secret(
    path=vault_path,
    secret=secret_data
)

# Store reference in DB
vault_reference = f"vault:kv/data/{vault_path}"
db.execute(
    "UPDATE credentials SET token = %s WHERE user_id = %s",
    (vault_reference, user_id)
)

# Retrieve from Vault
vault_path = extract_path_from_reference(vault_reference)
secret_data = vault.secrets.kv.v2.read_secret_version(path=vault_path)
Why This Pattern?
  • Never store secrets in database - Only references
  • Centralized secret rotation - Update in one place
  • Audit trail - Vault logs all access
  • Encryption at rest - Vault encrypts data
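
The `extract_path_from_reference` helper used in the retrieval step can be a one-liner. A minimal sketch, assuming every reference carries the `vault:kv/data/` prefix shown in the storage pattern above:

```python
def extract_path_from_reference(vault_reference: str) -> str:
    # "vault:kv/data/aurora/users/42/gcp_token" -> "aurora/users/42/gcp_token"
    prefix = "vault:kv/data/"
    if not vault_reference.startswith(prefix):
        raise ValueError(f"not a Vault reference: {vault_reference!r}")
    return vault_reference[len(prefix):]
```

Raising on a malformed reference (rather than returning it unchanged) ensures a raw secret accidentally stored in the DB is never passed to Vault as a path.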

File Storage Flow

SeaweedFS S3 Operations

Key Files:
  • Storage abstraction: server/utils/storage/storage.py
  • S3 configuration: config/seaweedfs/s3.json
Usage Examples:
from utils.storage.storage import get_storage_manager

storage = get_storage_manager(user_id)

# Upload Terraform state
storage.upload_file(
    user_id,
    local_path="/tmp/terraform.tfstate",
    object_key=f"{session_id}/terraform_dir/terraform.tfstate"
)

# Download for restore
content = storage.download_file(
    user_id,
    f"{session_id}/terraform_dir/terraform.tfstate"
)

# List session files
files = storage.list_user_files(
    user_id,
    prefix=f"{session_id}/terraform_dir/"
)

Cache Flow

Redis Cache Patterns

Cache Keys:
# API cost tracking (5 min TTL)
f"api_cost:{user_id}"

# Cloud resource cache (30 min TTL)
f"gcp_projects:{user_id}"
f"aws_ec2:{user_id}:{region}"
f"azure_vms:{user_id}:{subscription}"

# Rate limiting (1 min TTL)
f"rate_limit:{user_id}:{endpoint}"
Reference: server/utils/billing/billing_cache.py
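
The key-plus-TTL discipline above can be demonstrated with an in-process stand-in (the real code uses redis-py with SETEX/EXPIRE against the keys shown; this sketch only illustrates the get-or-compute pattern):

```python
import time

class TTLCache:
    # In-process stand-in for the Redis patterns above: same
    # key-plus-TTL discipline, no server required.
    def __init__(self):
        self._store = {}  # key -> (expires_at, value)

    def get_or_set(self, key: str, ttl_s: float, compute):
        hit = self._store.get(key)
        if hit and hit[0] > time.monotonic():
            return hit[1]  # fresh: serve the cached value
        value = compute()  # miss or expired: recompute and re-cache
        self._store[key] = (time.monotonic() + ttl_s, value)
        return value
```

Usage mirrors the resource-cache keys: `cache.get_or_set(f"gcp_projects:{user_id}", 1800, fetch_projects)` serves the cached project list for 30 minutes before hitting the GCP API again.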
