Skip to main content

Graph Manager

The Graph Manager handles initialization, registration, and retrieval of LangGraph agent instances in GAIA.

Overview

GAIA uses a centralized graph management system to:
  • Avoid circular imports between modules
  • Enable lazy initialization of graphs
  • Provide named graph instances
  • Support multiple agent types
# Location: apps/api/app/agents/core/graph_manager.py:13
class GraphManager:
    """Name-based facade over the lazy ``providers`` registry for LangGraph graphs.

    Graphs are registered and fetched by name, so modules can share them
    without importing each other.
    """

    @classmethod
    def set_graph(cls, graph_instance: Any, graph_name: str = "default_graph"):
        """Register an already-built graph under ``graph_name``.

        The instance is wrapped in a zero-argument loader so it is served
        through the same provider mechanism as lazily built graphs.
        """

        def _return_instance():
            return graph_instance

        providers.register(graph_name, loader_func=_return_instance)

    @classmethod
    async def get_graph(cls, graph_name: str = "default_graph") -> Any:
        """Fetch the graph registered under ``graph_name``.

        Failures are logged, never raised.

        Returns:
            The graph, or ``None`` if the provider is not registered, its
            loader produced ``None``, or loading raised an exception.
        """
        logger.info(f"Attempting to get graph '{graph_name}'")
        try:
            graph = await providers.aget(graph_name)
            if graph is None:
                logger.error(
                    f"Graph '{graph_name}' returned None from lazy provider"
                )
                return None
            logger.info(
                f"Successfully retrieved graph '{graph_name}' from lazy provider"
            )
            return graph
        except KeyError as missing:
            logger.error(
                f"Graph provider '{graph_name}' not registered: {missing}"
            )
            return None
        except Exception as err:
            logger.error(f"Error retrieving graph '{graph_name}': {err}", exc_info=True)
            return None

Lazy Provider Pattern

GAIA uses a lazy provider system to defer graph initialization:
# lazy_provider itself is defined in apps/api/app/core/lazy_loader.py
from app.core.lazy_loader import lazy_provider, MissingKeyStrategy

@lazy_provider(
    name="executor_agent",
    required_keys=[],  # presumably: no config keys must be present — confirm
    strategy=MissingKeyStrategy.WARN,  # presumably: warn rather than fail on missing keys
    auto_initialize=False,  # presumably: build on first request, not at startup
)
async def build_executor_agent():
    """Build and return the executor agent lazily.

    Registered with ``lazy_provider`` under the name ``"executor_agent"``.

    Returns:
        The compiled graph yielded by ``build_executor_graph()``.
    """
    logger.debug("Building executor agent with lazy providers")
    
    async with build_executor_graph() as graph:
        logger.info("Executor agent built successfully")
    # NOTE(review): the graph is returned after the ``async with`` block has
    # exited. That is only safe while build_executor_graph runs no teardown
    # after its ``yield``; if teardown is ever added, this provider would hand
    # out an already-cleaned-up graph.
    return graph

Benefits of Lazy Loading

  1. Faster Startup: Graphs aren’t built until needed
  2. Conditional Loading: Only load graphs required by environment
  3. Error Isolation: Failed graph initialization doesn’t crash the app
  4. Resource Efficiency: Save memory by not loading unused graphs

Graph Registration

Graphs are registered during application startup:
# Location: apps/api/app/agents/core/graph_builder/build_graph.py:153
def build_graphs():
    """Build comms and executor agents and register subagent providers.

    This is a synchronous function: the decorated ``build_*_agent`` functions
    are invoked without ``await``.
    """
    logger.info("Building core agent graphs...")
    
    register_subagent_providers()
    # NOTE(review): build_executor_agent / build_comms_agent are ``async def``
    # functions wrapped by ``lazy_provider`` and are called here without
    # ``await``. This only works if the decorator turns the call into a
    # synchronous registration. If it returns the coroutine function
    # unchanged, these lines create never-awaited coroutines and register
    # nothing. Confirm against app/core/lazy_loader.py.
    build_executor_agent()
    build_comms_agent()
    
    # NOTE(review): with auto_initialize=False the graphs are presumably only
    # registered here and built on first use, so "built" may overstate it.
    logger.info("Core agent graphs built and registered successfully")
This is called from the FastAPI lifespan handler during startup:
# Location: apps/api/app/core/factory.py
from app.agents.core.graph_builder.build_graph import build_graphs

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    On startup, initializes the databases and registers the agent graphs.
    On shutdown, closes the databases.

    Once ``init_databases()`` has succeeded, ``close_databases()`` always runs:
    - if graph registration fails, or
    - if the application exits with an exception. Such an exception is
      re-raised into this generator at ``yield`` and would otherwise skip the
      shutdown code.
    """
    # Startup
    logger.info("Starting up application...")

    # Initialize databases
    await init_databases()

    try:
        # Build agent graphs
        build_graphs()

        yield
    finally:
        # Shutdown
        logger.info("Shutting down application...")
        await close_databases()

Graph Retrieval

Graphs are retrieved when handling requests:
# Location: apps/api/app/agents/core/agent.py:88
async def _core_agent_logic(...):
    """Core agent initialization logic.

    Excerpt: the real parameter list is elided as ``...``, and
    ``initial_state`` / ``config`` come from code not shown here.

    Raises:
        RuntimeError: if the ``"comms_agent"`` graph cannot be retrieved.
    """
    
    # Retrieve the comms agent graph
    graph = await GraphManager.get_graph("comms_agent")
    
    # GraphManager.get_graph logs and returns None instead of raising,
    # so turn a missing graph into an explicit error here.
    if graph is None:
        raise RuntimeError("Comms agent graph not available")
    
    # Use graph for execution
    result = await graph.ainvoke(initial_state, config)
    return result

Multiple Graph Instances

GAIA manages several graph types:

1. Comms Agent Graph

@lazy_provider(
    name="comms_agent",
    required_keys=[],
    strategy=MissingKeyStrategy.WARN,
    auto_initialize=False,
)
async def build_comms_agent():
    """Build and return the comms agent.

    Registered with ``lazy_provider`` under the name ``"comms_agent"``. With
    ``auto_initialize=False`` it is presumably built on first request rather
    than at startup.

    Returns:
        The graph yielded by ``build_comms_graph()``.
    """
    async with build_comms_graph() as graph:
        logger.info("Comms agent built successfully")
    # NOTE(review): ``graph`` is returned after the ``async with`` block has
    # exited, i.e. after any teardown in build_comms_graph has run. Confirm
    # that builder does no cleanup after its ``yield``.
    return graph
Purpose: Primary user-facing agent for chat interactions. Tools:
  • call_executor: Delegate to executor
  • add_memory: Store memories
  • search_memory: Retrieve memories

2. Executor Agent Graph

@lazy_provider(
    name="executor_agent",
    required_keys=[],
    strategy=MissingKeyStrategy.WARN,
    auto_initialize=False,
)
async def build_executor_agent():
    """Build and return the executor agent.

    Registered with ``lazy_provider`` under the name ``"executor_agent"``.

    Returns:
        The compiled graph yielded by ``build_executor_graph()``.
    """
    async with build_executor_graph() as graph:
        logger.info("Executor agent built successfully")
    # NOTE(review): returned after the context manager has exited. This is
    # safe only while build_executor_graph runs no teardown after ``yield``.
    return graph
Purpose: Execute tasks with full tool access. Tools: All agent tools, plus handoff to subagents.

3. Subagent Graphs

# Location: apps/api/app/agents/core/subagents/provider_subagents.py
def register_subagent_providers():
    """Declare one lazy provider per integration-specific subagent.

    Each nested builder is registered with ``auto_initialize=False``. When a
    builder runs, it creates an LLM and asks ``SubAgentFactory`` for a
    subagent scoped to that provider's delegated tool space.
    """

    @lazy_provider(name="gmail_subagent", auto_initialize=False)
    async def build_gmail_subagent():
        gmail_llm = init_llm()
        return await SubAgentFactory.create_provider_subagent(
            llm=gmail_llm,
            name="gmail_subagent",
            provider="gmail",
            tool_space="gmail_delegated",
        )

    @lazy_provider(name="calendar_subagent", auto_initialize=False)
    async def build_calendar_subagent():
        calendar_llm = init_llm()
        return await SubAgentFactory.create_provider_subagent(
            llm=calendar_llm,
            name="calendar_subagent",
            provider="calendar",
            tool_space="calendar_delegated",
        )

    # Additional subagent providers follow the same pattern.
Purpose: Specialized agents for specific integrations. Tools: Scoped to the provider (e.g., Gmail tools only).

Graph Context Managers

Graphs are built using async context managers:
# Location: apps/api/app/agents/core/graph_builder/build_graph.py:30
@asynccontextmanager
async def build_executor_graph(
    chat_llm: Optional[LanguageModelLike] = None,
    in_memory_checkpointer: bool = False,
):
    """Construct and compile the executor agent graph.

    Args:
        chat_llm: Model that drives the agent. ``init_llm()`` is used when
            omitted.
        in_memory_checkpointer: When True, compile with an ``InMemorySaver``
            and skip the Postgres checkpointer lookup entirely.

    Yields:
        The compiled executor graph. It uses an ``InMemorySaver`` when one was
        requested or no checkpointer manager is available. Otherwise it uses
        the manager's Postgres checkpointer.
    """
    if chat_llm is None:
        chat_llm = init_llm()

    # Fetch the tool registry and the store concurrently.
    tool_registry, store = await asyncio.gather(
        get_tool_registry(),
        get_tools_store(),
    )

    tool_dict = tool_registry.get_tool_dict()
    # "handoff" is always available and is the agent's initial tool.
    tool_dict.update({"handoff": handoff_tool})

    builder = create_agent(
        llm=chat_llm,
        agent_name="executor_agent",
        tool_registry=tool_dict,
        retrieve_tools_coroutine=get_retrieve_tools_function(),
        initial_tool_ids=["handoff"],
        pre_model_hooks=[
            filter_messages_node,
            manage_system_prompts_node,
            trim_messages_node,
        ],
    )

    # Only look up the Postgres-backed manager when it might actually be used.
    # This avoids a pointless connection attempt (and failure warning) when
    # the caller explicitly asked for an in-memory checkpointer, e.g. in tests.
    checkpointer_manager = (
        None if in_memory_checkpointer else await get_checkpointer_manager()
    )

    if not checkpointer_manager:
        checkpointer = InMemorySaver()
        compiled_message = "Graph compiled with in-memory checkpointer"
    else:
        checkpointer = checkpointer_manager.get_checkpointer()
        compiled_message = "Graph compiled with PostgreSQL checkpointer"

    graph = builder.compile(checkpointer=checkpointer, store=store)
    logger.debug(compiled_message)
    yield graph

Why Context Managers?

Context managers scope each graph to a block, so any teardown the builder defines runs when the block exits:
async with build_comms_graph() as graph:
    # Use graph (``...`` stands for the real state and config arguments)
    result = await graph.ainvoke(...)
    # When this block exits, any teardown the builder defines after its
    # ``yield`` runs; the graph should not be used afterwards.
This pattern:
  • Ensures resources are released
  • Handles exceptions gracefully
  • Supports dependency injection

Checkpointer Management

Graphs require checkpointers for state persistence:
# Location: apps/api/app/agents/core/graph_builder/checkpointer_manager.py
# NOTE(review): langgraph's Postgres checkpointer package is built on
# psycopg 3, not asyncpg. Verify these imports match the real module.
from langgraph.checkpoint.postgres import PostgresSaver
import asyncpg

class CheckpointerManager:
    """Hold a Postgres connection pool and build checkpointers on top of it.

    The pool is supplied by the caller and stored as ``self.pool``.
    """
    
    def __init__(self, pool: asyncpg.Pool):
        # Pool handed in by get_checkpointer_manager(); not owned or closed here.
        self.pool = pool
    
    def get_checkpointer(self) -> PostgresSaver:
        """Get PostgreSQL checkpointer instance.

        A new ``PostgresSaver`` is constructed on every call.

        NOTE(review): in langgraph, ``PostgresSaver`` is the synchronous saver.
        It takes its connection as ``conn`` (a psycopg connection or pool),
        not as ``pool=``. Graphs on this page run via ``ainvoke``, which
        requires ``AsyncPostgresSaver``. Confirm against the real
        implementation.
        
        Returns:
            PostgresSaver configured with database pool
        """
        return PostgresSaver(pool=self.pool)

async def get_checkpointer_manager() -> Optional[CheckpointerManager]:
    """Create a CheckpointerManager around the pool from ``get_postgres_pool()``.

    A new manager is constructed on every call; nothing is cached here.
    Any failure is logged as a warning and reported as ``None``, so callers
    can fall back to an in-memory checkpointer.

    Returns:
        A CheckpointerManager, or None if the pool could not be obtained.
    """
    try:
        postgres_pool = await get_postgres_pool()
        manager = CheckpointerManager(postgres_pool)
    except Exception as exc:
        logger.warning(f"Failed to create checkpointer manager: {exc}")
        manager = None
    return manager

Dynamic Graph Creation

Some graphs are created dynamically (e.g., for workflows):
# Location: apps/api/app/services/workflow/workflow_subagent.py
class WorkflowSubagentRunner:
    """Runs a freshly built workflow subagent for a single task."""

    @staticmethod
    async def execute(
        task: str,
        user_id: str,
        thread_id: str,
        user_name: Optional[str] = None,
        user_time: Optional[datetime] = None,
        stream_writer=None,
    ) -> str:
        """Execute workflow subagent with custom tools.

        Args:
            task: Instruction sent to the subagent as a single human message.
            user_id: ID of the user the workflow runs for. It is passed
                through the run config.
            thread_id: Thread that scopes the run's checkpointed state.
            user_name: Accepted for callers; not used in this excerpt.
            user_time: Accepted for callers; not used in this excerpt.
            stream_writer: Accepted for callers; not used in this excerpt.

        Returns:
            The subagent's response, as produced by ``extract_response``.
        """
        # Create subagent dynamically (not fetched from GraphManager)
        llm = init_llm()

        subagent_graph = await SubAgentFactory.create_provider_subagent(
            provider="workflow",
            name="workflow_subagent",
            llm=llm,
            tool_space="workflow_delegated",
            use_direct_tools=True,
        )

        # LangGraph checkpointers scope saved state by ``thread_id`` in the
        # ``configurable`` section of the run config.
        # NOTE(review): confirm the exact configurable keys the production
        # runner passes (e.g. user_name / user_time) and extend as needed.
        config = {
            "configurable": {
                "thread_id": thread_id,
                "user_id": user_id,
            }
        }

        # Execute
        result = await subagent_graph.ainvoke(
            {"messages": [HumanMessage(content=task)]},
            config=config,
        )

        return extract_response(result)

Error Handling

Graph retrieval logs failures and returns None instead of raising:
async def get_graph(cls, graph_name: str = "default_graph") -> Any:
    """Look up a registered graph by name; every failure path yields None.

    Unregistered names, a loader that produced ``None``, and any other
    exception are all logged rather than raised.
    """
    logger.info(f"Attempting to get graph '{graph_name}'")
    try:
        candidate = await providers.aget(graph_name)
        if candidate is not None:
            logger.info(f"Successfully retrieved graph '{graph_name}'")
        else:
            logger.error(f"Graph '{graph_name}' returned None")
        # ``candidate`` is either the graph or None at this point.
        return candidate
    except KeyError as missing:
        logger.error(f"Graph provider '{graph_name}' not registered: {missing}")
        return None
    except Exception as err:
        logger.error(f"Error retrieving graph '{graph_name}': {err}", exc_info=True)
        return None

Fallback Strategies

Provide a fallback for when graph retrieval fails:
graph = await GraphManager.get_graph("comms_agent")

if graph is None:
    # Fallback: Use in-memory graph
    logger.warning("Using fallback in-memory graph")
    async with build_comms_graph(in_memory_checkpointer=True) as fallback_graph:
        result = await fallback_graph.ainvoke(state, config)

Testing Graph Initialization

Test graphs with in-memory checkpointers:
import pytest
from langchain_core.messages import HumanMessage

from app.agents.core.graph_builder.build_graph import build_comms_graph

@pytest.mark.asyncio
async def test_comms_graph_creation():
    """Test comms agent graph can be created and invoked once.

    An in-memory checkpointer keeps graph state out of PostgreSQL.
    NOTE(review): the graph presumably still calls a real LLM unless the
    model is stubbed. Confirm before running this in CI.
    """
    async with build_comms_graph(in_memory_checkpointer=True) as graph:
        assert graph is not None

        # Test execution; a thread_id is required by LangGraph checkpointers.
        result = await graph.ainvoke(
            {"messages": [HumanMessage(content="Hello")]},
            config={"configurable": {"thread_id": "test"}}
        )

        assert "messages" in result
Use in_memory_checkpointer=True in tests so graph state is kept in memory instead of being persisted to PostgreSQL.

Best Practices

1. Register Graphs at Startup

Register all graphs during application lifespan:
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Minimal lifespan: register all graphs before the app starts serving."""
    # Startup
    build_graphs()  # Register all graphs
    yield
    # Shutdown
    # If shutdown code is added here, wrap ``yield`` in try/finally so it
    # still runs when the app exits with an exception.

2. Use Descriptive Names

Name graphs clearly:
# Good: the name says which agent is stored and matches the provider names
# used elsewhere (e.g. "comms_agent", "executor_agent").
# ``graph`` is a compiled graph built elsewhere.
GraphManager.set_graph(graph, "comms_agent")
GraphManager.set_graph(graph, "executor_agent")

# Avoid: opaque names that don't identify the agent at lookup sites.
GraphManager.set_graph(graph, "agent1")
GraphManager.set_graph(graph, "main")

3. Handle Missing Graphs Gracefully

Always check if graph retrieval succeeded:
# get_graph logs and returns None instead of raising, so check before use.
graph = await GraphManager.get_graph("comms_agent")
if graph is None:
    raise RuntimeError("Comms agent not available")

4. Use Context Managers for Cleanup

Build graphs with context managers:
async with build_executor_graph() as graph:
    result = await graph.ainvoke(state, config)
# Leaving the block runs any teardown the builder defines after its
# ``yield``. The build_executor_graph on this page defines none, so nothing
# extra is released here.

Next Steps

Build docs developers (and LLMs) love