State Management
GAIA’s agent system uses LangGraph’s state management to maintain conversation context, tool outputs, and user preferences across agent invocations.
State Schema
The core state schema is defined using Pydantic:
# Location: apps/api/app/agents/core/state.py:24
from typing import Annotated, List, Optional
from pydantic import BaseModel, Field
from langchain_core.messages import AnyMessage
from langgraph.graph import add_messages
class State(DictLikeModel):
    """Agent state schema with message history and context.

    Behaves both as a Pydantic model and as a mapping (via
    DictLikeModel), which is the dict-like shape LangGraph expects
    of graph state.
    """
    query: str = ""
    """Current user query/message."""
    # The add_messages reducer merges node outputs into this list by
    # message ID instead of overwriting the whole field.
    messages: Annotated[List[AnyMessage], add_messages] = Field(default_factory=list)
    """Conversation message history with automatic merging."""
    # NOTE(review): populated by build_initial_state with a server-side
    # timestamp — confirm it is actually converted to the user's timezone.
    current_datetime: Optional[str] = None
    """User's current datetime in their timezone."""
    mem0_user_id: Optional[str] = None
    """User ID for Mem0 memory storage."""
    memories: List[str] = Field(default_factory=list)
    """Retrieved memories relevant to current conversation."""
    memories_stored: bool = False
    """Flag indicating if memories have been stored this turn."""
    conversation_id: Optional[str] = None
    """Thread/conversation identifier for state persistence."""
Message Annotations
The messages field uses LangGraph’s add_messages reducer:
from langgraph.graph import add_messages
messages: Annotated[List[AnyMessage], add_messages]
This annotation provides automatic message merging:
- Appending: New messages are added to the list
- Updating: Messages with matching IDs are updated
- Deduplication: Prevents duplicate messages
Message Types
from langchain_core.messages import (
HumanMessage, # User messages
AIMessage, # Agent responses
SystemMessage, # System prompts
ToolMessage, # Tool execution results
FunctionMessage, # Legacy function calls
)
# Example: appending conversation turns to state["messages"].
# Each message type marks who produced the turn; tool requests and
# results are linked through their tool_call id.

# A user turn.
user_turn = HumanMessage(content="Schedule a meeting for tomorrow")
state["messages"].append(user_turn)

# An assistant turn that requests a tool invocation.
assistant_turn = AIMessage(
    content="I'll check your calendar",
    tool_calls=[{
        "id": "call_123",
        "name": "CUSTOM_FETCH_EVENTS",
        "args": {"time_min": "2024-01-01T00:00:00Z"},
    }],
)
state["messages"].append(assistant_turn)

# The matching tool result, correlated by tool_call_id.
tool_turn = ToolMessage(
    content='{"events": [...]}',
    tool_call_id="call_123",
)
state["messages"].append(tool_turn)
State Initialization
States are initialized when agents are invoked:
# Location: apps/api/app/helpers/agent_helpers.py
def build_initial_state(
    request: MessageRequestWithHistory,
    user_id: str,
    conversation_id: str,
    history: List[AnyMessage],
    trigger_context: Optional[dict] = None,
) -> dict:
    """
    Build initial state for agent invocation.

    Args:
        request: User request with message and context
        user_id: User identifier
        conversation_id: Thread identifier
        history: Previous conversation messages
        trigger_context: Optional workflow trigger context
            (currently unused here — TODO confirm whether callers
            expect it to be merged into the returned state)

    Returns:
        Initial state dictionary matching the State schema
    """
    return {
        "query": request.message,
        "messages": history,
        # Use a timezone-aware timestamp: a bare datetime.now() records
        # a naive server-local time even though the State schema
        # documents "user's current datetime in their timezone".
        # NOTE(review): this is still the *server's* zone — the user's
        # actual timezone should come from the request; confirm upstream.
        "current_datetime": datetime.now().astimezone().isoformat(),
        "mem0_user_id": user_id,
        "conversation_id": conversation_id,
        "memories": [],
        "memories_stored": False,
    }
State Updates
Nodes update state by returning dictionaries:
def my_node(state: State) -> dict:
    """Look up memories for the current query and report the result."""
    # Pull what we need from the incoming state.
    history = state.messages  # current conversation (kept from example)
    uid = state.mem0_user_id

    # Run the memory lookup for this user's query.
    found = search_memories(uid, state.query)

    # Return only the fields that changed; LangGraph merges them into
    # the existing state for us.
    return {
        "memories": found,
        "messages": [SystemMessage(content="Found 3 relevant memories")],
    }
Partial Updates
You only need to return fields that changed:
def trim_messages_node(state: State) -> dict:
    """Keep only the ten most recent messages in history."""
    # A negative slice copes gracefully with short histories:
    # it simply returns (a copy of) the whole list.
    recent_history = state.messages[-10:]
    return {"messages": recent_history}
Accumulation vs. Replacement
The add_messages reducer appends by default:
# Initial state
state = {"messages": [HumanMessage(content="Hello")]}
# Node returns
return {"messages": [AIMessage(content="Hi there")]}
# Result: Both messages are kept
state["messages"] # [HumanMessage(...), AIMessage(...)]
Note that because of the add_messages reducer, a returned list is merged by message ID rather than substituted wholesale — messages you omit are not deleted from state. To truly remove messages, return RemoveMessage objects for the IDs to drop; the example below only controls which messages it re-emits:
def filter_messages_node(state: State) -> dict:
    """Return a filtered copy of the message history.

    NOTE(review): because ``messages`` is annotated with the
    ``add_messages`` reducer, the returned list is *merged* by message
    ID rather than replacing the history outright — messages filtered
    out here are not deleted from persisted state. Truly removing
    messages requires returning ``RemoveMessage`` entries; confirm the
    intended semantics against the graph definition.
    """
    # assumes msg.content is a string — TODO confirm (content can be a
    # list of parts for multimodal messages, in which case len() counts
    # parts, not characters)
    filtered = [msg for msg in state.messages if len(msg.content) < 10000]
    return {"messages": filtered}
Agent Configuration
Runtime configuration is passed via RunnableConfig:
# Location: apps/api/app/helpers/agent_helpers.py
def build_agent_config(
    conversation_id: str,
    user: dict,
    user_time: datetime,
    user_model_config: Optional[ModelConfig] = None,
    usage_metadata_callback: Optional[UsageMetadataCallbackHandler] = None,
    agent_name: str = "comms_agent",
    selected_tool: Optional[str] = None,
    tool_category: Optional[str] = None,
) -> RunnableConfig:
    """
    Assemble the RunnableConfig handed to LangGraph at invocation time.

    Returns:
        RunnableConfig whose ``configurable`` section carries the
        per-user and per-thread parameters, plus an optional usage
        callback.
    """
    model_name = user_model_config.model_name if user_model_config else None

    configurable = {
        "thread_id": conversation_id,
        "user_id": user["user_id"],  # required key — KeyError if absent
        "user_name": user.get("name"),
        "user_email": user.get("email"),
        "user_time": user_time.isoformat(),
        "user_timezone": user.get("timezone", "UTC"),
        "agent_name": agent_name,
        "selected_tool": selected_tool,
        "tool_category": tool_category,
        "model": model_name,
    }

    callbacks = [usage_metadata_callback] if usage_metadata_callback else []
    return {"configurable": configurable, "callbacks": callbacks}
Accessing Configuration in Nodes
from langchain_core.runnables import RunnableConfig
def my_node(state: State, config: RunnableConfig) -> dict:
    """Read per-invocation settings from the runtime configuration."""
    # All runtime parameters live under the "configurable" key.
    conf = config["configurable"]
    uid = conf["user_id"]
    tz = conf["user_timezone"]
    tool = conf.get("selected_tool")  # optional; may be absent
    # ... use uid / tz / tool in node logic ...
    return {}
State Persistence
States are persisted using checkpointers:
PostgreSQL Checkpointer
# Location: apps/api/app/agents/core/graph_builder/checkpointer_manager.py
from langgraph.checkpoint.postgres import PostgresSaver
class CheckpointerManager:
    """Hands out PostgreSQL-backed checkpointers for graph persistence."""

    def __init__(self, pool):
        # Connection pool shared by every checkpointer we create.
        self.pool = pool

    def get_checkpointer(self):
        """Return a PostgresSaver bound to the shared connection pool."""
        saver = PostgresSaver(pool=self.pool)
        return saver
# Usage in graph compilation
# The checkpointer persists per-thread state snapshots after each node;
# `store` is passed alongside it (presumably a cross-thread LangGraph
# store — confirm against the builder's signature).
checkpointer = checkpointer_manager.get_checkpointer()
graph = builder.compile(
    checkpointer=checkpointer,
    store=store
)
State Snapshots
Checkpointers save state snapshots after each node:
# Get state snapshot
# NOTE: `await` assumes this runs inside a coroutine; the snapshot is
# loaded by the checkpointer for the thread identified in `config`.
snapshot = await graph.aget_state(config)
print(snapshot.values) # Current state values
print(snapshot.next) # Next nodes to execute
print(snapshot.metadata) # Checkpoint metadata
Resuming from Checkpoints
# Resume conversation from saved state
# The checkpointer keys snapshots by thread_id, so reusing the same
# thread_id reloads that conversation's persisted state.
config = {
    "configurable": {
        "thread_id": "conversation_123",
        "user_id": "user_456",
    }
}
# Agent automatically loads previous state
# NOTE: `await` assumes this runs inside a coroutine; the new message
# is merged into the restored history by the add_messages reducer.
result = await graph.ainvoke(
    {"messages": [HumanMessage(content="Continue from where we left off")]},
    config=config
)
State Streaming
GAIA streams state updates to the frontend in real-time:
# Location: apps/api/app/helpers/agent_helpers.py
async def execute_graph_streaming(
    graph,
    initial_state: dict,
    config: RunnableConfig,
) -> AsyncGenerator[str, None]:
    """
    Execute graph and stream state updates as SSE events.

    Yields:
        SSE-formatted strings with state updates, terminated by a
        "[DONE]" sentinel event.
    """
    async for chunk in graph.astream(initial_state, config, stream_mode="updates"):
        # With stream_mode="updates", each chunk maps *node name* ->
        # that node's partial state update, so the state keys live one
        # level down; checking `"messages" in chunk` directly would
        # never match.
        for node_update in chunk.values():
            if not isinstance(node_update, dict):
                continue
            # Stream AI messages produced by this node.
            for msg in node_update.get("messages", []):
                if isinstance(msg, AIMessage):
                    yield f"data: {json.dumps({'content': msg.content})}\n\n"
            # Stream tool calls, if the node reported any.
            if "tool_calls" in node_update:
                yield f"data: {json.dumps({'tool_calls': node_update['tool_calls']})}\n\n"
    yield "data: [DONE]\n\n"
Stream Modes
LangGraph supports multiple stream modes:
updates: Stream node outputs as they complete
values: Stream full state after each node
messages: Stream LLM messages token-by-token as they are generated
debug: Stream all internal events
# Stream node updates
# With "updates", each chunk is keyed by node name and carries only
# the fields that node returned.
async for chunk in graph.astream(state, config, stream_mode="updates"):
    print(f"Node update: {chunk}")
# Stream full state snapshots
# With "values", each chunk is the complete state after a node runs.
async for chunk in graph.astream(state, config, stream_mode="values"):
    print(f"Current state: {chunk}")
Memory Integration
State integrates with Mem0 for persistent memory:
# Store user message in memory (background task)
if user_id and request.message:
    task = asyncio.create_task(
        store_user_message_memory(
            user_id,
            request.message,
            conversation_id
        )
    )
    # Keep a strong reference so the task is not garbage-collected
    # mid-flight; the done-callback drops it once the task finishes.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
# Retrieve memories for context
# NOTE: `await` assumes this runs inside a coroutine.
memories = await memory_service.search_memories(
    query=state.query,
    user_id=state.mem0_user_id,
    limit=5
)
# Add to state
state["memories"] = [m.content for m in memories.memories]
DictLikeModel Pattern
GAIA’s State class extends DictLikeModel for compatibility:
# Location: apps/api/app/agents/core/state.py:10
class DictLikeModel(BaseModel, MutableMapping):
    """Pydantic model that also satisfies the MutableMapping protocol.

    LangGraph expects dict-like state, so every field is reachable both
    as an attribute (``state.query``) and as a key (``state["query"]``).
    """

    def __getitem__(self, key):
        return getattr(self, key)

    def __setitem__(self, key, value):
        setattr(self, key, value)

    def __delitem__(self, key):
        delattr(self, key)

    def __iter__(self):
        # MutableMapping's mixin methods (keys/items/pop/...) require
        # __iter__ to yield *keys*. Pydantic's BaseModel.__iter__ yields
        # (name, value) pairs, which would silently break keys()/items()
        # and dict(state), so override it to yield field names.
        return iter(self.__dict__)

    def __len__(self):
        return len(self.__dict__)
This allows both attribute and dictionary access:
state = State(query="Hello")
# Attribute access
state.query # "Hello"
# Dictionary access
state["query"] # "Hello"
# Both work interchangeably
# (DictLikeModel routes __getitem__/__setitem__ through getattr/setattr,
# so both forms touch the same underlying field.)
state.messages = [HumanMessage(content="Hi")]
state["messages"] # Same list
The DictLikeModel pattern is necessary because LangGraph expects state to be dictionary-like, while Pydantic models provide type validation and IDE support.
Best Practices
1. Keep State Minimal
Only store what’s necessary for decision-making:
# Good: Minimal state — small states keep the checkpoints written
# after every node cheap to serialize and load.
class State(DictLikeModel):
    messages: List[AnyMessage]
    user_id: str

# Avoid: Storing large computed values — they bloat every checkpoint.
class State(DictLikeModel):
    messages: List[AnyMessage]
    all_user_emails: List[dict] # Should be fetched on-demand
2. Use Reducers for Lists
Always annotate list fields with appropriate reducers:
from langgraph.graph import add_messages
from typing import Annotated
class State(DictLikeModel):
    # Messages auto-merge: the add_messages reducer appends new
    # messages and updates existing ones by ID.
    messages: Annotated[List[AnyMessage], add_messages]
    # Memories replace (no reducer): without a reducer annotation, a
    # node's returned value for this field overwrites the old value.
    memories: List[str] = Field(default_factory=list)
3. Initialize with Defaults
Always provide default values:
class State(DictLikeModel):
    # default_factory avoids the shared-mutable-default pitfall: each
    # State instance gets its own fresh list.
    messages: List[AnyMessage] = Field(default_factory=list)
    memories: List[str] = Field(default_factory=list)
    memories_stored: bool = False # Explicit default
4. Document State Fields
Use docstrings for clarity:
class State(DictLikeModel):
    """Agent state with conversation context."""
    # The bare strings below are "attribute docstrings": inert at
    # runtime but conventionally picked up by IDEs and doc tools.
    query: str = ""
    """Current user query being processed."""
    memories: List[str] = Field(default_factory=list)
    """Relevant memories retrieved from Mem0."""
Next Steps