
Overview

Aurora’s AI agent is built on LangGraph, a framework for building stateful, multi-step workflows with large language models. The agent can execute cloud operations, search knowledge bases, run commands, and more through a tool-based architecture.

LangGraph Architecture

State Graph

The agent workflow is implemented as a directed graph, with nodes representing operations and edges defining the flow:

```python
from langgraph.graph import StateGraph, START, END

workflow = StateGraph(State)
workflow.add_node("agentic_tool_flow", agent.agentic_tool_flow)
workflow.add_edge(START, "agentic_tool_flow")
workflow.add_edge("agentic_tool_flow", END)
app = workflow.compile(checkpointer=memory)
```
Reference: server/chat/backend/agent/workflow.py:103-113

State Management

The workflow state stores conversation context and execution metadata:
```python
from typing import Dict, List, TypedDict

from langchain_core.messages import BaseMessage

class State(TypedDict):
    messages: List[BaseMessage]      # Chat history
    user_id: str                     # Authenticated user
    session_id: str                  # Chat session ID
    question: str                    # User query
    provider_preference: List[str]   # Connected cloud providers
    selected_project_id: str         # Active project/subscription
    mode: str                        # "agent" or "ask" mode
    attachments: List[Dict]          # File uploads
    session_files: List[Dict]        # S3 storage files
```
Reference: server/chat/backend/agent/utils/state.py
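A new run might seed this state as follows (a sketch with illustrative values; `messages` would normally hold `BaseMessage` objects accumulated from prior turns):

```python
initial_state = {
    "messages": [],                      # no prior history on a fresh session
    "user_id": "user-123",
    "session_id": "session-456",
    "question": "List all GKE clusters in my project",
    "provider_preference": ["gcp"],
    "selected_project_id": "my-project",
    "mode": "agent",
    "attachments": [],
    "session_files": [],
}
```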

Memory Persistence

LangGraph uses a memory saver to persist conversation context across requests:
```python
from langchain_core.runnables import RunnableConfig

from chat.backend.agent.utils.safe_memory_saver import SafeMemorySaver

memory = SafeMemorySaver()
config = RunnableConfig(configurable={"thread_id": session_id})
app = workflow.compile(checkpointer=memory)
```
Reference: server/chat/backend/agent/workflow.py:60-64
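The effect of the checkpointer can be illustrated without LangGraph: saved state is keyed by `thread_id`, so a second request with the same ID resumes the earlier conversation. This is a simplified stand-in, not the SafeMemorySaver implementation:

```python
class InMemoryCheckpointer:
    """Minimal stand-in for a checkpointer: persists state per thread."""

    def __init__(self):
        self._store = {}

    def load(self, thread_id):
        # Return saved state for this thread, or a fresh one.
        return self._store.get(thread_id, {"messages": []})

    def save(self, thread_id, state):
        self._store[thread_id] = state

memory = InMemoryCheckpointer()

def run_turn(thread_id, question):
    state = memory.load(thread_id)
    state["messages"].append({"role": "user", "content": question})
    memory.save(thread_id, state)
    return state

run_turn("session-456", "List my clusters")
state = run_turn("session-456", "Only in us-central1")
# The second turn sees the first message because both share a thread_id.
```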

Agent Execution Flow

1. Initialize Connection

User connects via WebSocket with authentication:
```json
{
  "type": "init",
  "user_id": "user-123"
}
```
Reference: server/main_chatbot.py:94-133

2. Receive Query

User sends a question with context:
```json
{
  "type": "query",
  "query": "List all GKE clusters in my project",
  "user_id": "user-123",
  "session_id": "session-456",
  "mode": "agent",
  "model": "openai/gpt-4",
  "provider_preference": ["gcp"]
}
```
Reference: server/main_chatbot.py:806-839
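A server-side handler might validate such a frame before dispatching it to the agent. The sketch below is hypothetical (the field set mirrors the example above, but Aurora's actual validation logic is not shown here):

```python
import json

REQUIRED_QUERY_FIELDS = {"type", "query", "user_id", "session_id", "mode"}

def parse_query_message(raw: str) -> dict:
    """Parse an incoming query frame, raising on missing or invalid fields."""
    msg = json.loads(raw)
    missing = REQUIRED_QUERY_FIELDS - msg.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    if msg["mode"] not in ("agent", "ask"):
        raise ValueError(f"unknown mode: {msg['mode']}")
    return msg

frame = json.dumps({
    "type": "query",
    "query": "List all GKE clusters in my project",
    "user_id": "user-123",
    "session_id": "session-456",
    "mode": "agent",
    "provider_preference": ["gcp"],
})
msg = parse_query_message(frame)
```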

3. Set User Context

The agent sets thread-local context for tools:
```python
from chat.backend.agent.tools.cloud_tools import set_user_context

set_user_context(
    user_id=state.user_id,
    session_id=state.session_id,
    provider_preference=provider_preference,
    selected_project_id=selected_project_id,
    state=state,
    mode=mode
)
```
Reference: server/chat/backend/agent/agent.py:200-208

4. Build System Prompt

Dynamic system prompt based on connected providers and mode:
```python
from chat.backend.agent.prompt.prompt_builder import build_prompt_segments

segments = build_prompt_segments(
    provider_preference=provider_preference,
    mode=mode,
    has_zip_reference=has_zip,
    state=state
)

system_prompt = assemble_system_prompt(segments)
```
Reference: server/chat/backend/agent/agent.py:244-250
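The assembly step can be pictured as joining ordered segments, dropping any that are empty for the current request. This is a minimal sketch of the idea, not the real prompt builder (the segment texts are illustrative):

```python
def assemble_system_prompt(segments):
    """Join non-empty prompt segments with blank lines, preserving order."""
    return "\n\n".join(s for s in segments if s)

segments = [
    "You are Aurora, a cloud operations assistant.",
    "Connected providers: gcp.",
    "",  # empty segments (e.g. no ZIP attached) are dropped
    "Mode: agent. Write operations are permitted.",
]
prompt = assemble_system_prompt(segments)
```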

5. Load Available Tools

Tools are filtered based on mode and connected providers:
```python
from chat.backend.agent.tools.cloud_tools import get_cloud_tools

tools = get_cloud_tools(
    provider_preference=provider_preference,
    mode=mode,
    state=state
)
```
Reference: server/chat/backend/agent/agent.py:168

6. Execute LangGraph Workflow

The workflow streams events as the agent thinks and acts:
```python
async for event_type, event_data in workflow.stream(state):
    if event_type == "token":
        # Stream LLM tokens to the frontend
        await websocket.send(json.dumps({
            "type": "message",
            "data": {"text": token_text, "is_chunk": True}
        }))
    elif event_type == "messages":
        # Handle tool calls from the LLM
        if msg_chunk.tool_calls:
            for tool_call in msg_chunk.tool_calls:
                ...  # Execute the tool and return its result
```
Reference: server/main_chatbot.py:331-470

7. Tool Execution

When the LLM decides to use a tool:
```python
@tool
def list_gke_clusters(project_id: str, location: str = "-") -> str:
    """List all GKE clusters in a GCP project."""
    # Tool execution logic
    return json.dumps({"status": "success", "clusters": [...]})
```
The tool result is added to the conversation and the LLM continues reasoning.
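This reasoning loop can be sketched end to end with a stubbed LLM standing in for the real model (everything here is a simplified model of the loop, not Aurora's implementation):

```python
import json

def fake_llm(messages):
    """Stub LLM: first asks for a tool, then answers once it has the result."""
    if not any(m["role"] == "tool" for m in messages):
        return {"role": "assistant", "content": None,
                "tool_calls": [{"name": "list_gke_clusters",
                                "args": {"project_id": "my-project"}}]}
    return {"role": "assistant", "content": "Found 1 cluster.", "tool_calls": []}

def list_gke_clusters(project_id, location="-"):
    return json.dumps({"status": "success", "clusters": ["prod-cluster-1"]})

TOOLS = {"list_gke_clusters": list_gke_clusters}

def run_agent(question):
    messages = [{"role": "user", "content": question}]
    while True:
        reply = fake_llm(messages)
        messages.append(reply)
        if not reply["tool_calls"]:
            return reply["content"]
        for call in reply["tool_calls"]:
            result = TOOLS[call["name"]](**call["args"])
            # The tool output re-enters the conversation as a tool message,
            # and the loop hands control back to the LLM.
            messages.append({"role": "tool", "content": result})

answer = run_agent("List all GKE clusters in my project")
```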

8. Stream Response

The final answer is streamed token-by-token to the frontend:

```json
{
  "type": "message",
  "data": {
    "text": "Found 3 GKE clusters:\n1. prod-cluster-1\n2. staging-cluster\n3. dev-cluster",
    "is_chunk": false,
    "is_complete": true
  }
}
```
Reference: server/main_chatbot.py:439-470

Agent Tools

Aurora provides 30+ tools across multiple categories:

Cloud Operations

| Tool | Purpose | File |
| --- | --- | --- |
| list_gke_clusters | List GKE clusters | cloud_tools.py |
| list_gcp_compute_instances | List GCP VMs | cloud_tools.py |
| list_aws_ec2_instances | List AWS EC2 | cloud_tools.py |
| list_azure_vms | List Azure VMs | cloud_tools.py |
| get_gcp_logs | Query GCP logs | cloud_tools.py |
| iac_write | Write Terraform | iac_tool.py |
| iac_deploy | Deploy infrastructure | iac_tool.py |

Knowledge & Search

| Tool | Purpose | File |
| --- | --- | --- |
| knowledge_base_search | Semantic search over uploaded docs | knowledge_base_search_tool.py |
| confluence_search | Search Confluence wikis | confluence_search_tool.py |
| web_search | Search the internet | web_search_tool.py |

Command Execution

| Tool | Purpose | File |
| --- | --- | --- |
| run_kubectl_command | Execute kubectl in pods | cloud_tools.py |
| terminal_exec | Run bash commands | terminal_exec_tool.py |
| cloud_exec | Execute cloud CLI commands | cloud_exec_tool.py |

Source Control

| Tool | Purpose | File |
| --- | --- | --- |
| github_search_code | Search GitHub repos | github_rca_tool.py |
| github_get_file | Read file content | github_rca_tool.py |
| github_commit | Commit and push changes | github_commit_tool.py |
| github_apply_fix | Apply code fixes | github_apply_fix_tool.py |

Monitoring & Observability

| Tool | Purpose | File |
| --- | --- | --- |
| splunk_search | Search Splunk logs | splunk_tool.py |
| dynatrace_query | Query Dynatrace metrics | dynatrace_tool.py |
| coroot_analyze | Analyze Coroot data | coroot_tool.py |
| jenkins_get_build_logs | Fetch Jenkins logs | jenkins_rca_tool.py |

File Operations

| Tool | Purpose | File |
| --- | --- | --- |
| extract_zip_file | Extract ZIP archives | zip_file_tool.py |
| read_terraform_file | Read Terraform files | iac_tool.py |
Reference: server/chat/backend/agent/tools/

Tool Execution Pattern

Tool Definition

Tools are defined using LangChain’s @tool decorator:
```python
import json

from langchain_core.tools import tool

from chat.backend.agent.tools.cloud_tools import get_user_context

@tool
def list_gke_clusters(project_id: str, location: str = "-") -> str:
    """List all GKE clusters in a GCP project.

    Args:
        project_id: GCP project ID
        location: GCP region or zone (default: all)

    Returns:
        JSON string with cluster details
    """
    # Get user context
    ctx = get_user_context()
    user_id = ctx["user_id"]

    # Execute the operation via the GCP connector
    from connectors.gcp_connector.gcp.projects import list_gke_clusters
    clusters = list_gke_clusters(user_id, project_id, location)

    return json.dumps({"status": "success", "clusters": clusters})
```

Tool Context Access

Tools access user context via thread-local storage:
```python
from chat.backend.agent.tools.cloud_tools import get_user_context

ctx = get_user_context()
user_id = ctx["user_id"]
session_id = ctx["session_id"]
provider = ctx["provider_preference"]
mode = ctx["mode"]  # "agent" or "ask"
```
Reference: server/chat/backend/agent/tools/cloud_tools.py:58-98
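The thread-local mechanism behind `set_user_context`/`get_user_context` can be sketched with Python's `threading.local` (a simplified model, not the actual cloud_tools.py implementation):

```python
import threading

_context = threading.local()

def set_user_context(**kwargs):
    """Store per-request context on the current thread."""
    _context.data = dict(kwargs)

def get_user_context():
    """Read the context set earlier on the same thread."""
    return getattr(_context, "data", {})

set_user_context(user_id="user-123", session_id="session-456",
                 provider_preference=["gcp"], mode="agent")
ctx = get_user_context()
```

Because the storage is per-thread, concurrent requests handled on different threads never see each other's context.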

Tool Output Streaming

Tools can send real-time updates via WebSocket:
```python
from chat.backend.agent.tools.cloud_tools import get_websocket_sender

sender = get_websocket_sender()
if sender:
    await sender({
        "type": "tool_progress",
        "data": {"message": "Deploying cluster..."}
    })
```
Reference: server/main_chatbot.py:176-215

Error Handling

Tools return structured JSON with status:
```python
# Success
return json.dumps({
    "status": "success",
    "data": {...}
})

# Failure
return json.dumps({
    "status": "failed",
    "error": "Permission denied",
    "message": "User lacks IAM permissions"
})
```
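A decorator can enforce this contract so that even unexpected exceptions still produce structured JSON. This is a sketch of the pattern, not a helper Aurora actually ships:

```python
import functools
import json

def structured_result(fn):
    """Wrap a tool so it always returns {"status": ...} JSON."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return json.dumps({"status": "success", "data": fn(*args, **kwargs)})
        except PermissionError as exc:
            return json.dumps({"status": "failed", "error": "Permission denied",
                               "message": str(exc)})
        except Exception as exc:
            # Fall back to the exception type as the error label.
            return json.dumps({"status": "failed", "error": type(exc).__name__,
                               "message": str(exc)})
    return wrapper

@structured_result
def flaky_tool():
    raise PermissionError("User lacks IAM permissions")

result = json.loads(flaky_tool())
```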

Access Control

Agent vs Ask Mode

Aurora supports two operational modes:

Agent Mode (Full Access):
  • Can execute infrastructure changes
  • Can commit code to GitHub
  • Can deploy Terraform resources
  • Can run destructive operations
Ask Mode (Read-Only):
  • Can read cloud resources
  • Can search knowledge bases
  • Cannot modify infrastructure
  • Cannot commit code
Reference: server/chat/backend/agent/access.py

Tool Filtering

```python
from chat.backend.agent.access import ModeAccessController

if not ModeAccessController.is_tool_allowed(mode, tool_name):
    return {"error": "Tool not available in Ask mode"}
Reference: server/main_chatbot.py:867-876
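One way such a controller can work is to gate write-capable tools behind agent mode while allowing read-only tools everywhere. A minimal sketch, assuming a hypothetical denylist (Aurora's actual rules live in access.py):

```python
# Hypothetical set of tools that mutate infrastructure or code.
WRITE_TOOLS = {"iac_deploy", "github_commit", "github_apply_fix", "terminal_exec"}

class ModeAccessController:
    """Allow read-only tools in both modes; gate write tools behind agent mode."""

    @staticmethod
    def is_tool_allowed(mode: str, tool_name: str) -> bool:
        if mode == "agent":
            return True
        return tool_name not in WRITE_TOOLS

allowed_ask = ModeAccessController.is_tool_allowed("ask", "iac_deploy")
allowed_agent = ModeAccessController.is_tool_allowed("agent", "iac_deploy")
```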

LLM Integration

Multi-Provider Support

Aurora supports multiple LLM providers:
```python
from chat.backend.agent.llm import LLMManager

llm_manager = LLMManager()
llm = llm_manager.get_llm(
    model="openai/gpt-4",
    streaming=True,
    temperature=0.0
)
```
Supported providers:
  • OpenRouter: Access 100+ models (default)
  • OpenAI: GPT-3.5, GPT-4, GPT-4o
  • Anthropic: Claude 3.5 Sonnet, Claude 3 Opus
  • Google: Gemini Pro, Gemini 2.0 (with thinking)
Reference: server/chat/backend/agent/llm.py
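Model strings use a `provider/model` prefix, so routing can be as simple as splitting on the first slash. A sketch of that idea (the defaulting behavior is an assumption, not the real LLMManager logic):

```python
def parse_model_id(model: str):
    """Split "provider/model" into its parts, defaulting to OpenRouter."""
    if "/" in model:
        provider, name = model.split("/", 1)
    else:
        provider, name = "openrouter", model
    return provider, name

provider, name = parse_model_id("openai/gpt-4")
```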

Prompt Caching

Aurora uses prefix caching to reduce latency and costs:
```python
from chat.backend.agent.utils.prefix_cache import PrefixCacheManager

cache_manager = PrefixCacheManager()
cache_manager.register_breakpoint(
    "system_prompt",
    content=system_prompt,
    priority=100
)
```
Reference: server/chat/backend/agent/utils/prefix_cache.py

MCP Integration

Aurora supports Model Context Protocol (MCP) for extending agent capabilities:

MCP Preloader

MCP servers are preloaded on startup for faster response times:
```python
from chat.backend.agent.tools.mcp_preloader import start_mcp_preloader

mcp_preloader = start_mcp_preloader()
```
Reference: server/main_compute.py:102-106

Dynamic Tool Loading

MCP tools are dynamically loaded based on user connections:
```python
from chat.backend.agent.tools.mcp_tools import get_mcp_tools_for_user

mcp_tools = get_mcp_tools_for_user(user_id)
all_tools = base_tools + mcp_tools
```
Reference: server/chat/backend/agent/tools/mcp_tools.py

Streaming Architecture

Token Streaming

LLM responses are streamed token-by-token to reduce perceived latency:

```python
async for event_type, event_data in workflow.stream(state):
    if event_type == "token":
        await websocket.send(json.dumps({
            "type": "message",
            "data": {"text": token, "is_chunk": True}
        }))
```
Reference: server/main_chatbot.py:341-360

Tool Call Streaming

Tool execution status is streamed in real-time:
```python
# Tool call started
await websocket.send(json.dumps({
    "type": "tool_call",
    "data": {
        "tool_name": "list_gke_clusters",
        "input": {"project_id": "my-project"},
        "status": "running"
    }
}))

# Tool call completed
await websocket.send(json.dumps({
    "type": "tool_result",
    "data": {
        "tool_name": "list_gke_clusters",
        "result": {...},
        "status": "success"
    }
}))
```
Reference: server/main_chatbot.py:386-414

Cancellation & Cleanup

Workflow Cancellation

Users can cancel in-progress workflows:
```json
{
  "type": "control",
  "action": "cancel",
  "session_id": "session-456"
}
```
The system:
  1. Cancels the asyncio task
  2. Waits for ongoing tool calls to complete
  3. Consolidates message chunks
  4. Saves context for resumption
  5. Sends END status to frontend
Reference: server/main_chatbot.py:709-800
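The cancellation path can be sketched with asyncio: cancel the task, let its cleanup handler save context, then await it to completion. This is a simplified model of the steps above, not the production handler:

```python
import asyncio

async def long_running_workflow(chunks):
    try:
        while True:
            chunks.append("token")
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        # Consolidate chunks and save context before exiting.
        chunks.append("[saved]")
        raise

async def main():
    chunks = []
    task = asyncio.create_task(long_running_workflow(chunks))
    await asyncio.sleep(0.05)
    task.cancel()           # cancel the asyncio task
    try:
        await task          # wait for the cleanup handler to finish
    except asyncio.CancelledError:
        pass
    return chunks

chunks = asyncio.run(main())
```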

Terraform Cleanup

Terraform state is cleaned up after deployment:
```python
from utils.terraform.terraform_cleanup import cleanup_terraform_directory

cleanup_terraform_directory(user_id, session_id)
```
Reference: server/chat/backend/agent/agent.py:40-78
