Graph Manager
The Graph Manager handles initialization, registration, and retrieval of LangGraph agent instances in GAIA.
Overview
GAIA uses a centralized graph management system to:
- Avoid circular imports between modules
- Enable lazy initialization of graphs
- Provide named graph instances
- Support multiple agent types
# Location: apps/api/app/agents/core/graph_manager.py:13
class GraphManager:
    """Facade over the ``providers`` registry for named LangGraph instances."""

    @classmethod
    def set_graph(cls, graph_instance: Any, graph_name: str = "default_graph"):
        """Register ``graph_instance`` under ``graph_name``.

        The instance is wrapped in a zero-argument loader that returns it,
        which is the shape ``providers.register`` is given here.
        """

        def _loader():
            return graph_instance

        providers.register(graph_name, loader_func=_loader)

    @classmethod
    async def get_graph(cls, graph_name: str = "default_graph") -> Any:
        """Look up the graph registered as ``graph_name``.

        Returns:
            The graph instance, or ``None`` when the lookup raises
            ``KeyError``, raises any other exception, or yields ``None``.
            Failures are logged instead of being propagated.
        """
        logger.info(f"Attempting to get graph '{graph_name}'")

        try:
            graph = await providers.aget(graph_name)

            # Guard clause: a provider that resolves to None counts as a failure.
            if graph is None:
                logger.error(
                    f"Graph '{graph_name}' returned None from lazy provider"
                )
                return None

            logger.info(
                f"Successfully retrieved graph '{graph_name}' from lazy provider"
            )
            return graph
        except KeyError as e:
            logger.error(
                f"Graph provider '{graph_name}' not registered: {e}"
            )
            return None
        except Exception as e:
            logger.error(f"Error retrieving graph '{graph_name}': {e}", exc_info=True)
            return None
Lazy Provider Pattern
GAIA uses a lazy provider system to defer graph initialization:
# Location: apps/api/app/core/lazy_loader.py
from app.core.lazy_loader import lazy_provider, MissingKeyStrategy
@lazy_provider(
    name="executor_agent",
    required_keys=[],
    strategy=MissingKeyStrategy.WARN,
    auto_initialize=False,
)
async def build_executor_agent():
    """Loader for the ``executor_agent`` lazy provider.

    Returns:
        The graph yielded by ``build_executor_graph()``.

    NOTE(review): the graph is returned from inside ``async with``, so the
    context manager exits before the caller uses the graph. Confirm that
    ``build_executor_graph`` performs no teardown after its ``yield``.
    """
    logger.debug("Building executor agent with lazy providers")

    async with build_executor_graph() as executor_graph:
        logger.info("Executor agent built successfully")
        return executor_graph
Benefits of Lazy Loading
- Faster Startup: Graphs aren’t built until needed
- Conditional Loading: Only load the graphs required by the current environment
- Error Isolation: Failed graph initialization doesn’t crash the app
- Resource Efficiency: Save memory by not loading unused graphs
Graph Registration
Graphs are registered during application startup:
# Location: apps/api/app/agents/core/graph_builder/build_graph.py:153
def build_graphs():
    """Run the subagent registration and the two core agent builders in order.

    NOTE(review): ``build_executor_agent`` and ``build_comms_agent`` are
    declared with ``async def`` but are called here without ``await``;
    presumably the ``lazy_provider`` decorator replaces them with a
    synchronous callable. Confirm against ``lazy_loader.py``.
    """
    logger.info("Building core agent graphs...")

    # Order matters only in that it mirrors the original call sequence.
    startup_steps = (
        register_subagent_providers,
        build_executor_agent,
        build_comms_agent,
    )
    for step in startup_steps:
        step()

    logger.info("Core agent graphs built and registered successfully")
`build_graphs()` is called during the startup phase of the FastAPI lifespan:
# Location: apps/api/app/core/factory.py
from app.agents.core.graph_builder.build_graph import build_graphs
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    Startup initializes the databases and builds the agent graphs; shutdown
    closes the databases.

    Args:
        app: The FastAPI application this lifespan is attached to (unused
            in the body).
    """
    # Startup
    logger.info("Starting up application...")
    # Initialize databases
    await init_databases()
    try:
        # Build agent graphs
        build_graphs()
        yield
    finally:
        # Shutdown: runs even if graph building fails or an exception is
        # thrown into the generator at the yield, so the databases opened
        # above are always closed.
        logger.info("Shutting down application...")
        await close_databases()
Graph Retrieval
Graphs are retrieved when handling requests:
# Location: apps/api/app/agents/core/agent.py:88
async def _core_agent_logic(...):
"""Core agent initialization logic."""
# Retrieve the comms agent graph
graph = await GraphManager.get_graph("comms_agent")
if graph is None:
raise RuntimeError("Comms agent graph not available")
# Use graph for execution
result = await graph.ainvoke(initial_state, config)
return result
Multiple Graph Instances
GAIA manages several graph types:
1. Comms Agent Graph
@lazy_provider(
    name="comms_agent",
    required_keys=[],
    strategy=MissingKeyStrategy.WARN,
    auto_initialize=False,
)
async def build_comms_agent():
    """Loader for the ``comms_agent`` lazy provider.

    Returns:
        The graph yielded by ``build_comms_graph()``.

    NOTE(review): the graph is returned from inside ``async with``, so the
    context manager exits before the caller uses the graph. Confirm that
    ``build_comms_graph`` performs no teardown after its ``yield``.
    """
    async with build_comms_graph() as comms_graph:
        logger.info("Comms agent built successfully")
        return comms_graph
Purpose: Primary user-facing agent for chat interactions
Tools:
call_executor: Delegate to executor
add_memory: Store memories
search_memory: Retrieve memories
2. Executor Agent Graph
@lazy_provider(
    name="executor_agent",
    required_keys=[],
    strategy=MissingKeyStrategy.WARN,
    auto_initialize=False,
)
async def build_executor_agent():
    """Loader for the ``executor_agent`` lazy provider.

    Returns:
        The graph yielded by ``build_executor_graph()``.
    """
    async with build_executor_graph() as executor_graph:
        logger.info("Executor agent built successfully")
        return executor_graph
Purpose: Execute tasks with full tool access
Tools: All agent tools + handoff to subagents
3. Subagent Graphs
# Location: apps/api/app/agents/core/subagents/provider_subagents.py
def register_subagent_providers():
    """Declare a ``lazy_provider`` loader for each integration subagent."""

    @lazy_provider(name="gmail_subagent", auto_initialize=False)
    async def build_gmail_subagent():
        """Create the Gmail subagent using the ``gmail_delegated`` tool space."""
        gmail_llm = init_llm()
        gmail_agent = await SubAgentFactory.create_provider_subagent(
            provider="gmail",
            name="gmail_subagent",
            llm=gmail_llm,
            tool_space="gmail_delegated",
        )
        return gmail_agent

    @lazy_provider(name="calendar_subagent", auto_initialize=False)
    async def build_calendar_subagent():
        """Create the calendar subagent using the ``calendar_delegated`` tool space."""
        calendar_llm = init_llm()
        calendar_agent = await SubAgentFactory.create_provider_subagent(
            provider="calendar",
            name="calendar_subagent",
            llm=calendar_llm,
            tool_space="calendar_delegated",
        )
        return calendar_agent

    # Register more subagents...
Purpose: Specialized agents for specific integrations
Tools: Scoped to provider (e.g., Gmail tools only)
Graph Context Managers
Graphs are built using async context managers:
# Location: apps/api/app/agents/core/graph_builder/build_graph.py:30
@asynccontextmanager
async def build_executor_graph(
    chat_llm: Optional[LanguageModelLike] = None,
    in_memory_checkpointer: bool = False,
):
    """Build the executor agent graph, compile it, and yield it.

    Args:
        chat_llm: Model to drive the agent; ``init_llm()`` is used when
            this is ``None``.
        in_memory_checkpointer: Force an ``InMemorySaver`` checkpointer.
            An in-memory checkpointer is also used when no checkpointer
            manager is available.

    Yields:
        The compiled graph. Nothing runs after the ``yield``.
    """
    chat_llm = init_llm() if chat_llm is None else chat_llm

    # The tool registry and tools store are fetched concurrently.
    tool_registry, store = await asyncio.gather(
        get_tool_registry(),
        get_tools_store(),
    )

    tool_dict = tool_registry.get_tool_dict()
    tool_dict.update({"handoff": handoff_tool})

    message_hooks = [
        filter_messages_node,
        manage_system_prompts_node,
        trim_messages_node,
    ]
    builder = create_agent(
        llm=chat_llm,
        agent_name="executor_agent",
        tool_registry=tool_dict,
        retrieve_tools_coroutine=get_retrieve_tools_function(),
        initial_tool_ids=["handoff"],
        pre_model_hooks=message_hooks,
    )

    # Choose the checkpointer first; compiling, logging, and yielding are
    # the same for both backends.
    checkpointer_manager = await get_checkpointer_manager()
    if in_memory_checkpointer or not checkpointer_manager:
        checkpointer = InMemorySaver()
        compiled_message = "Graph compiled with in-memory checkpointer"
    else:
        checkpointer = checkpointer_manager.get_checkpointer()
        compiled_message = "Graph compiled with PostgreSQL checkpointer"

    graph = builder.compile(checkpointer=checkpointer, store=store)
    logger.debug(compiled_message)
    yield graph
Why Context Managers?
Context managers ensure proper cleanup:
async with build_comms_graph() as graph:
# Use graph
result = await graph.ainvoke(...)
# Automatic cleanup on exit
This pattern:
- Ensures resources are released
- Handles exceptions gracefully
- Supports dependency injection
Checkpointer Management
Graphs require checkpointers for state persistence:
# Location: apps/api/app/agents/core/graph_builder/checkpointer_manager.py
from langgraph.checkpoint.postgres import PostgresSaver
import asyncpg
class CheckpointerManager:
    """Creates PostgreSQL checkpointers from a stored connection pool.

    NOTE(review): this snippet pairs an ``asyncpg`` pool with
    ``PostgresSaver(pool=...)``. LangGraph's published ``PostgresSaver``
    appears to take a psycopg connection or pool as ``conn``, and async
    graph calls appear to need ``AsyncPostgresSaver``. Confirm against the
    real ``checkpointer_manager.py`` and the installed LangGraph version.
    """

    def __init__(self, pool: asyncpg.Pool):
        # Pool passed to every checkpointer this manager creates.
        self.pool = pool

    def get_checkpointer(self) -> PostgresSaver:
        """Create a checkpointer bound to the manager's pool.

        Returns:
            A new ``PostgresSaver`` constructed with ``pool=self.pool``.
        """
        saver = PostgresSaver(pool=self.pool)
        return saver
async def get_checkpointer_manager() -> Optional[CheckpointerManager]:
    """Build a ``CheckpointerManager`` around the Postgres pool.

    Returns:
        A new ``CheckpointerManager`` wrapping the pool from
        ``get_postgres_pool()``, or ``None`` when obtaining the pool or
        constructing the manager raises. The failure is logged as a warning.
    """
    try:
        pg_pool = await get_postgres_pool()
        manager = CheckpointerManager(pg_pool)
    except Exception as e:
        logger.warning(f"Failed to create checkpointer manager: {e}")
        return None
    return manager
Dynamic Graph Creation
Some graphs are created dynamically (e.g., for workflows):
# Location: apps/api/app/services/workflow/workflow_subagent.py
class WorkflowSubagentRunner:
    """Runs a workflow task on a subagent graph created per call."""

    @staticmethod
    async def execute(
        task: str,
        user_id: str,
        thread_id: str,
        user_name: Optional[str] = None,
        user_time: Optional[datetime] = None,
        stream_writer=None,
    ) -> str:
        """Execute workflow subagent with custom tools.

        Args:
            task: Text sent to the subagent as a single human message.
            user_id: Placed in the run config's ``configurable`` section.
            thread_id: Placed in the run config's ``configurable`` section.
            user_name: Not used in this excerpt.
            user_time: Not used in this excerpt.
            stream_writer: Not used in this excerpt.

        Returns:
            The value of ``extract_response`` applied to the graph result.
        """
        # Create subagent dynamically
        llm = init_llm()
        subagent_graph = await SubAgentFactory.create_provider_subagent(
            provider="workflow",
            name="workflow_subagent",
            llm=llm,
            tool_space="workflow_delegated",
            use_direct_tools=True,
        )

        # Build the run config explicitly so the ``config`` passed to
        # ``ainvoke`` is a defined name.
        # NOTE(review): the real implementation may add more keys - confirm.
        config = {
            "configurable": {
                "thread_id": thread_id,
                "user_id": user_id,
            }
        }

        # Execute
        result = await subagent_graph.ainvoke(
            {"messages": [HumanMessage(content=task)]},
            config=config,
        )
        return extract_response(result)
Error Handling
Graph retrieval logs every failure and returns None instead of raising, so callers must check the result:
async def get_graph(cls, graph_name: str = "default_graph") -> Any:
    """Look up the graph registered as ``graph_name``.

    Returns:
        The graph instance, or ``None`` when the provider is missing
        (``KeyError``), the lookup raises any other exception, or the
        provider yields ``None``. Each failure is logged.
    """
    logger.info(f"Attempting to get graph '{graph_name}'")

    try:
        graph = await providers.aget(graph_name)

        # Guard clause: a provider that resolves to None counts as a failure.
        if graph is None:
            logger.error(f"Graph '{graph_name}' returned None")
            return None

        logger.info(f"Successfully retrieved graph '{graph_name}'")
        return graph
    except KeyError as e:
        logger.error(f"Graph provider '{graph_name}' not registered: {e}")
        return None
    except Exception as e:
        logger.error(f"Error retrieving graph '{graph_name}': {e}", exc_info=True)
        return None
Fallback Strategies
Implement fallbacks when graphs fail:
graph = await GraphManager.get_graph("comms_agent")
if graph is None:
# Fallback: Use in-memory graph
logger.warning("Using fallback in-memory graph")
async with build_comms_graph(in_memory_checkpointer=True) as fallback_graph:
result = await fallback_graph.ainvoke(state, config)
Testing Graph Initialization
Test graphs with in-memory checkpointers:
import pytest
from app.agents.core.graph_builder.build_graph import build_comms_graph
@pytest.mark.asyncio
async def test_comms_graph_creation():
    """Comms graph builds with an in-memory checkpointer and answers one message."""
    async with build_comms_graph(in_memory_checkpointer=True) as graph:
        assert graph is not None

        # Test execution
        request = {"messages": [HumanMessage(content="Hello")]}
        run_config = {"configurable": {"thread_id": "test"}}
        result = await graph.ainvoke(request, config=run_config)

        assert "messages" in result
Always use in_memory_checkpointer=True for tests to avoid database dependencies.
Best Practices
1. Register Graphs at Startup
Register all graphs during application lifespan:
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan hook that builds the graphs before the app starts serving."""
    # Startup
    build_graphs()  # Register all graphs
    yield
    # Shutdown
2. Use Descriptive Names
Name graphs clearly:
# Good
# Good: the name identifies which agent the graph implements
GraphManager.set_graph(graph, "comms_agent")
GraphManager.set_graph(graph, "executor_agent")
# Avoid: generic names say nothing about the graph's role
GraphManager.set_graph(graph, "agent1")
GraphManager.set_graph(graph, "main")
3. Handle Missing Graphs Gracefully
Always check whether graph retrieval succeeded:
graph = await GraphManager.get_graph("comms_agent")
if graph is None:
raise RuntimeError("Comms agent not available")
4. Use Context Managers for Cleanup
Build graphs with context managers:
async with build_executor_graph() as graph:
result = await graph.ainvoke(state, config)
# Automatic cleanup
Next Steps