
State Management

GAIA’s agent system uses LangGraph’s state management to maintain conversation context, tool outputs, and user preferences across agent invocations.

State Schema

The core state schema is defined using Pydantic:
# Location: apps/api/app/agents/core/state.py:24
from typing import Annotated, List, Optional
from pydantic import Field
from langchain_core.messages import AnyMessage
from langgraph.graph import add_messages

class State(DictLikeModel):  # DictLikeModel: a dict-compatible BaseModel subclass, defined below
    """Agent state schema with message history and context."""
    
    query: str = ""
    """Current user query/message."""
    
    messages: Annotated[List[AnyMessage], add_messages] = Field(default_factory=list)
    """Conversation message history with automatic merging."""
    
    current_datetime: Optional[str] = None
    """User's current datetime in their timezone."""
    
    mem0_user_id: Optional[str] = None
    """User ID for Mem0 memory storage."""
    
    memories: List[str] = Field(default_factory=list)
    """Retrieved memories relevant to current conversation."""
    
    memories_stored: bool = False
    """Flag indicating if memories have been stored this turn."""
    
    conversation_id: Optional[str] = None
    """Thread/conversation identifier for state persistence."""

Message Annotations

The messages field uses LangGraph’s add_messages reducer:
from langgraph.graph import add_messages

messages: Annotated[List[AnyMessage], add_messages]
This annotation provides automatic message merging:
  • Appending: Messages with new IDs are appended to the list
  • Updating: Messages with matching IDs replace the originals
  • Deduplication: The same message is never stored twice
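The merge semantics can be illustrated with a simplified, dependency-free sketch (illustrative only; the real reducer also handles automatic ID assignment and RemoveMessage markers):

```python
# Simplified sketch of an add_messages-style merge keyed on message IDs
def merge_by_id(left: list[dict], right: list[dict]) -> list[dict]:
    merged = list(left)
    index = {msg["id"]: i for i, msg in enumerate(merged)}
    for msg in right:
        if msg["id"] in index:
            merged[index[msg["id"]]] = msg   # Updating: matching ID replaces
        else:
            merged.append(msg)               # Appending: new ID is added
    return merged

history = [{"id": "1", "content": "Hello"}]
update = [{"id": "2", "content": "Hi there"}, {"id": "1", "content": "Hello!"}]
merge_by_id(history, update)
# → [{"id": "1", "content": "Hello!"}, {"id": "2", "content": "Hi there"}]
```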

Message Types

from langchain_core.messages import (
    HumanMessage,      # User messages
    AIMessage,         # Agent responses
    SystemMessage,     # System prompts
    ToolMessage,       # Tool execution results
    FunctionMessage,   # Legacy function calls
)

# Example: Adding a human message
state["messages"].append(
    HumanMessage(content="Schedule a meeting for tomorrow")
)

# Example: AI response with tool calls
state["messages"].append(
    AIMessage(
        content="I'll check your calendar",
        tool_calls=[{
            "id": "call_123",
            "name": "CUSTOM_FETCH_EVENTS",
            "args": {"time_min": "2024-01-01T00:00:00Z"}
        }]
    )
)

# Example: Tool result
state["messages"].append(
    ToolMessage(
        content='{"events": [...]}',
        tool_call_id="call_123"
    )
)

State Initialization

States are initialized when agents are invoked:
# Location: apps/api/app/helpers/agent_helpers.py
from datetime import datetime
def build_initial_state(
    request: MessageRequestWithHistory,
    user_id: str,
    conversation_id: str,
    history: List[AnyMessage],
    trigger_context: Optional[dict] = None,
) -> dict:
    """
    Build initial state for agent invocation.
    
    Args:
        request: User request with message and context
        user_id: User identifier
        conversation_id: Thread identifier
        history: Previous conversation messages
        trigger_context: Optional workflow trigger context
    
    Returns:
        Initial state dictionary
    """
    return {
        "query": request.message,
        "messages": history,
        "current_datetime": datetime.now().isoformat(),
        "mem0_user_id": user_id,
        "conversation_id": conversation_id,
        "memories": [],
        "memories_stored": False,
    }
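Note that `datetime.now()` in `build_initial_state` uses the server clock, while the `current_datetime` field is documented as the user's local time. When the user's timezone is known, a timezone-aware value can be produced with the standard library (illustrative sketch; the timezone name here is only an example):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

# Illustrative: format the current time in the user's timezone
user_tz = ZoneInfo("America/New_York")
current_datetime = datetime.now(user_tz).isoformat()
# e.g. "2024-01-15T09:30:00.123456-05:00"
```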

State Updates

Nodes update state by returning dictionaries:
def my_node(state: State) -> dict:
    """Node that updates messages and memories."""
    
    # Access current state
    current_messages = state.messages
    user_id = state.mem0_user_id
    
    # Perform operations...
    memories = search_memories(user_id, state.query)
    
    # Return state updates
    return {
        "memories": memories,
        "messages": [SystemMessage(content="Found 3 relevant memories")]
    }

Partial Updates

You only need to return fields that changed:
from langchain_core.messages import RemoveMessage

def trim_messages_node(state: State) -> dict:
    """Only update the messages field; other fields are untouched."""
    # Because messages uses the add_messages reducer, returning a shorter
    # list does not drop entries; deletions must be expressed explicitly
    # with RemoveMessage markers.
    to_remove = state.messages[:-10]  # Keep the last 10
    return {"messages": [RemoveMessage(id=msg.id) for msg in to_remove]}

Accumulation vs. Replacement

The add_messages reducer appends by default:
# Initial state
state = {"messages": [HumanMessage(content="Hello")]}

# Node returns
return {"messages": [AIMessage(content="Hi there")]}

# Result: Both messages are kept
state["messages"]  # [HumanMessage(...), AIMessage(...)]
Because of the reducer, returning a new list does not replace the history; messages must be dropped explicitly with RemoveMessage markers:
from langchain_core.messages import RemoveMessage

def filter_messages_node(state: State) -> dict:
    """Remove oversized messages from the history."""
    too_long = [msg for msg in state.messages if len(msg.content) >= 10000]
    return {"messages": [RemoveMessage(id=msg.id) for msg in too_long]}

Agent Configuration

Runtime configuration is passed via RunnableConfig:
# Location: apps/api/app/helpers/agent_helpers.py
def build_agent_config(
    conversation_id: str,
    user: dict,
    user_time: datetime,
    user_model_config: Optional[ModelConfig] = None,
    usage_metadata_callback: Optional[UsageMetadataCallbackHandler] = None,
    agent_name: str = "comms_agent",
    selected_tool: Optional[str] = None,
    tool_category: Optional[str] = None,
) -> RunnableConfig:
    """
    Build runtime configuration for agent execution.
    
    Returns:
        RunnableConfig with configurable parameters
    """
    return {
        "configurable": {
            "thread_id": conversation_id,
            "user_id": user["user_id"],
            "user_name": user.get("name"),
            "user_email": user.get("email"),
            "user_time": user_time.isoformat(),
            "user_timezone": user.get("timezone", "UTC"),
            "agent_name": agent_name,
            "selected_tool": selected_tool,
            "tool_category": tool_category,
            "model": user_model_config.model_name if user_model_config else None,
        },
        "callbacks": [usage_metadata_callback] if usage_metadata_callback else [],
    }

Accessing Configuration in Nodes

from langchain_core.runnables import RunnableConfig

def my_node(state: State, config: RunnableConfig) -> dict:
    """Node that uses runtime configuration."""
    
    # Access configurable parameters
    user_id = config["configurable"]["user_id"]
    user_timezone = config["configurable"]["user_timezone"]
    selected_tool = config["configurable"].get("selected_tool")
    
    # Use in logic...
    return {}

State Persistence

States are persisted using checkpointers:

PostgreSQL Checkpointer

# Location: apps/api/app/agents/core/graph_builder/checkpointer_manager.py
from langgraph.checkpoint.postgres import PostgresSaver

class CheckpointerManager:
    def __init__(self, pool):
        self.pool = pool
    
    def get_checkpointer(self):
        """Get PostgreSQL checkpointer for production use."""
        # PostgresSaver's conn argument accepts a connection or a connection pool
        return PostgresSaver(conn=self.pool)

# Usage in graph compilation
checkpointer = checkpointer_manager.get_checkpointer()
graph = builder.compile(
    checkpointer=checkpointer,
    store=store
)

State Snapshots

Checkpointers save state snapshots after each node:
# Get state snapshot
snapshot = await graph.aget_state(config)

print(snapshot.values)      # Current state values
print(snapshot.next)        # Next nodes to execute
print(snapshot.metadata)    # Checkpoint metadata

Resuming from Checkpoints

# Resume conversation from saved state
config = {
    "configurable": {
        "thread_id": "conversation_123",
        "user_id": "user_456",
    }
}

# Agent automatically loads previous state
result = await graph.ainvoke(
    {"messages": [HumanMessage(content="Continue from where we left off")]},
    config=config
)

State Streaming

GAIA streams state updates to the frontend in real-time:
# Location: apps/api/app/helpers/agent_helpers.py
import json
from typing import AsyncGenerator

async def execute_graph_streaming(
    graph,
    initial_state: dict,
    config: RunnableConfig,
) -> AsyncGenerator[str, None]:
    """
    Execute graph and stream state updates as SSE events.
    
    Yields:
        SSE-formatted strings with state updates
    """
    async for chunk in graph.astream(initial_state, config, stream_mode="updates"):
        # With stream_mode="updates", each chunk maps node names to the
        # partial state update that node returned.
        for node_name, update in chunk.items():
            for msg in (update or {}).get("messages", []):
                if isinstance(msg, AIMessage):
                    # Stream text content
                    if msg.content:
                        yield f"data: {json.dumps({'content': msg.content})}\n\n"
                    # Tool calls are attached to the AIMessage itself
                    if msg.tool_calls:
                        yield f"data: {json.dumps({'tool_calls': msg.tool_calls})}\n\n"
    
    yield "data: [DONE]\n\n"
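The SSE framing used above (a `data:` line followed by a blank line) can be factored into a small helper; a minimal sketch, not GAIA's actual code:

```python
import json

def sse_event(payload: dict) -> str:
    """Format a JSON payload as a Server-Sent Events data frame."""
    return f"data: {json.dumps(payload)}\n\n"

sse_event({"content": "Hi"})
# → 'data: {"content": "Hi"}\n\n'
```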

Stream Modes

LangGraph supports multiple stream modes:
  • updates: Stream node outputs as they complete
  • values: Stream full state after each node
  • messages: Stream LLM tokens and message chunks as they are generated
  • debug: Stream all internal events
# Stream node updates
async for chunk in graph.astream(state, config, stream_mode="updates"):
    print(f"Node update: {chunk}")

# Stream full state snapshots
async for chunk in graph.astream(state, config, stream_mode="values"):
    print(f"Current state: {chunk}")

Memory Integration

State integrates with Mem0 for persistent memory:
# Store user message in memory (background task)
if user_id and request.message:
    task = asyncio.create_task(
        store_user_message_memory(
            user_id,
            request.message,
            conversation_id
        )
    )
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

# Retrieve memories for context
memories = await memory_service.search_memories(
    query=state.query,
    user_id=state.mem0_user_id,
    limit=5
)

# Add to state
state["memories"] = [m.content for m in memories.memories]

DictLikeModel Pattern

GAIA’s State class extends DictLikeModel for compatibility:
# Location: apps/api/app/agents/core/state.py:10
from collections.abc import MutableMapping
from pydantic import BaseModel

class DictLikeModel(BaseModel, MutableMapping):
    """Pydantic model that behaves like a dictionary."""
    
    def __getitem__(self, key):
        return getattr(self, key)
    
    def __setitem__(self, key, value):
        setattr(self, key, value)
    
    def __delitem__(self, key):
        delattr(self, key)
    
    def __iter__(self):
        # Iterate over field names so the mapping protocol sees keys
        return iter(self.__dict__)
    
    def __len__(self):
        return len(self.__dict__)
This allows both attribute and dictionary access:
state = State(query="Hello")

# Attribute access
state.query  # "Hello"

# Dictionary access
state["query"]  # "Hello"

# Both work interchangeably
state.messages = [HumanMessage(content="Hi")]
state["messages"]  # Same list
The DictLikeModel pattern is necessary because LangGraph expects state to be dictionary-like, while Pydantic models provide type validation and IDE support.
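Because the full MutableMapping protocol is implemented, standard mapping operations (dict() conversion, unpacking, membership tests) work as well. A dependency-free sketch of the same pattern, without Pydantic validation:

```python
from collections.abc import MutableMapping

class DictLike(MutableMapping):
    """Minimal stand-in for DictLikeModel; illustrative only."""
    def __init__(self, **fields):
        self.__dict__.update(fields)
    def __getitem__(self, key):
        return getattr(self, key)
    def __setitem__(self, key, value):
        setattr(self, key, value)
    def __delitem__(self, key):
        delattr(self, key)
    def __iter__(self):
        return iter(self.__dict__)
    def __len__(self):
        return len(self.__dict__)

state = DictLike(query="Hello", memories=[])
state["conversation_id"] = "abc"
dict(state)       # plain-dict view, as LangGraph expects
"query" in state  # membership via the mapping protocol → True
```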

Best Practices

1. Keep State Minimal

Only store what’s necessary for decision-making:
# Good: Minimal state
class State(DictLikeModel):
    messages: List[AnyMessage]
    user_id: str

# Avoid: Storing large computed values
class State(DictLikeModel):
    messages: List[AnyMessage]
    all_user_emails: List[dict]  # Should be fetched on-demand

2. Use Reducers for Lists

Always annotate list fields with appropriate reducers:
from langgraph.graph import add_messages
from typing import Annotated

class State(DictLikeModel):
    # Messages auto-merge
    messages: Annotated[List[AnyMessage], add_messages]
    
    # Memories replace (no reducer)
    memories: List[str] = Field(default_factory=list)

3. Initialize with Defaults

Always provide default values:
class State(DictLikeModel):
    messages: List[AnyMessage] = Field(default_factory=list)
    memories: List[str] = Field(default_factory=list)
    memories_stored: bool = False  # Explicit default

4. Document State Fields

Use docstrings for clarity:
class State(DictLikeModel):
    """Agent state with conversation context."""
    
    query: str = ""
    """Current user query being processed."""
    
    memories: List[str] = Field(default_factory=list)
    """Relevant memories retrieved from Mem0."""
