Skip to main content
Integrate Graphiti with LangGraph to build sophisticated agents that maintain persistent memory through knowledge graphs, enabling context-aware conversations and personalized responses.

Overview

LangGraph is LangChain’s library for building stateful, multi-actor applications with LLMs. Combined with Graphiti, you can:
  • Maintain persistent conversation memory across sessions
  • Extract and store structured knowledge from interactions
  • Retrieve relevant context using graph-based search
  • Build agents that learn and adapt over time
  • Personalize responses based on user history

Installation

pip install graphiti-core langchain-openai langgraph

# For Jupyter notebooks
pip install ipywidgets
Ensure you have Neo4j or FalkorDB running:
# Neo4j via Docker
docker compose up

# Or FalkorDB
docker run -p 6379:6379 -p 3000:3000 -it --rm falkordb/falkordb:latest

Quick Start Example

This example demonstrates a sales agent that uses Graphiti to:
  1. Remember user preferences and conversation history
  2. Search product knowledge stored in the graph
  3. Personalize recommendations based on learned information

Initialize Graphiti

import os
from datetime import datetime, timezone
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType

# Configure Graphiti connection from environment, with local-dev defaults.
# NOTE(review): the 'password' default is for local demos only — never ship it.
neo4j_uri = os.getenv('NEO4J_URI', 'bolt://localhost:7687')
neo4j_user = os.getenv('NEO4J_USER', 'neo4j')
neo4j_password = os.getenv('NEO4J_PASSWORD', 'password')

# Top-level await: this snippet is meant for a notebook or other async context.
client = Graphiti(neo4j_uri, neo4j_user, neo4j_password)
await client.build_indices_and_constraints()

Load Product Data

Ingest product information into the knowledge graph:
import json
from pathlib import Path

async def ingest_products_data(client: Graphiti):
    """Load products.json and ingest each product as a JSON episode.

    Args:
        client: Initialized Graphiti client connected to the graph store.

    Reads the 'products' array from products.json, strips the bulky 'images'
    field, and adds one episode per product.
    """
    # Load product JSON (explicit encoding avoids platform-dependent decoding).
    with open('products.json', encoding='utf-8') as f:
        products = json.load(f)['products']

    for i, product in enumerate(products):
        # Drop image payloads; serialize with json.dumps so the episode body
        # is valid JSON — str() on a dict emits a Python repr with single
        # quotes, which contradicts source=EpisodeType.json.
        payload = {k: v for k, v in product.items() if k != 'images'}
        await client.add_episode(
            name=product.get('title', f'Product {i}'),
            episode_body=json.dumps(payload),
            source_description='Product catalog',
            source=EpisodeType.json,
            reference_time=datetime.now(timezone.utc),
        )

await ingest_products_data(client)

Create User Node

Establish a user entity in the graph:
from graphiti_core.search.search_config_recipes import NODE_HYBRID_SEARCH_EPISODE_MENTIONS

user_name = 'jess'

# Add initial user episode so a 'jess' entity node exists in the graph.
await client.add_episode(
    name='User Creation',
    episode_body=f'{user_name} is interested in buying running shoes',
    source=EpisodeType.text,
    reference_time=datetime.now(timezone.utc),
    source_description='SalesBot',
)

# Get user node UUID for centered searches.
# NOTE(review): _search is a private API — may change between releases; verify.
node_list = await client._search(user_name, NODE_HYBRID_SEARCH_EPISODE_MENTIONS)
# Assumes the top-ranked node is the user — TODO confirm on noisier graphs.
user_node_uuid = node_list.nodes[0].uuid

# Get product brand node UUID (entity created by the product ingestion step)
node_list = await client._search('ManyBirds', NODE_HYBRID_SEARCH_EPISODE_MENTIONS)
brand_node_uuid = node_list.nodes[0].uuid

Define Search Tool

Create a LangChain tool for querying product information:
from langchain_core.tools import tool
from graphiti_core.edges import EntityEdge

def edges_to_facts_string(entities: "list[EntityEdge]") -> str:
    """Render entity edges as a dash-bulleted fact list, one fact per line.

    Args:
        entities: Edges whose `.fact` strings should be listed.

    Returns:
        '' for an empty list; otherwise lines of the form '- <fact>'.
        (The original "'-' + join" left the first bullet without a space
        and returned a bare '-' for empty input.)
    """
    if not entities:
        return ''
    return '- ' + '\n- '.join(edge.fact for edge in entities)

@tool
async def get_shoe_data(query: str) -> str:
    """Search the knowledge graph for information about shoes."""
    # Centering the search on the brand node ranks facts about ManyBirds
    # products above unrelated graph content.
    results = await client.search(
        query,
        center_node_uuid=brand_node_uuid,
        num_results=10,
    )
    facts = edges_to_facts_string(results)
    return facts

tools = [get_shoe_data]

Build the Agent

from langchain_core.messages import AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph, add_messages
from langgraph.prebuilt import ToolNode
from typing import Annotated
from typing_extensions import TypedDict
import asyncio

class State(TypedDict):
    # Conversation history; add_messages appends new turns instead of replacing.
    messages: Annotated[list, add_messages]
    # Display name of the current user (labels turns in persisted episodes).
    user_name: str
    # UUID of the user's entity node; center point for personalized search.
    user_node_uuid: str

# Strong references to in-flight persistence tasks: asyncio keeps only weak
# references to tasks, so an otherwise-unreferenced task can be garbage
# collected before it finishes.
_episode_tasks: set = set()

async def chatbot(state: State):
    """Agent node that retrieves context from Graphiti and generates responses.

    Searches the graph centered on the user's node so facts about this user
    rank highest, injects them into the system prompt, invokes the LLM, and
    persists the exchange back to Graphiti without blocking the reply.
    """
    facts_string = None
    
    if len(state['messages']) > 0:
        last_message = state['messages'][-1]
        # Prefix the query with the speaker so retrieval matches episode format.
        query = f'{"SalesBot" if isinstance(last_message, AIMessage) else state["user_name"]}: {last_message.content}'
        
        # Search Graphiti using user's node as center
        # Facts closer to the user node are ranked higher
        edge_results = await client.search(
            query,
            center_node_uuid=state['user_node_uuid'],
            num_results=5
        )
        facts_string = edges_to_facts_string(edge_results)
    
    # Build system message with retrieved facts
    system_message = SystemMessage(
        content=f"""You are a skillful shoe salesperson working for ManyBirds.
        Review information about the user and their conversation below.
        Keep responses short and concise. Always be selling and helpful!
        
        Things you need to know to close a sale:
        - User's shoe size
        - Any special needs (wide feet, arch support, etc.)
        - Preferred colors and styles
        - Budget
        
        Facts about the user:
        {facts_string or 'No facts about the user yet'}"""
    )
    
    messages = [system_message] + state['messages']
    response = await llm.ainvoke(messages)
    
    # Persist the interaction without blocking the reply. Hold a reference to
    # the task (and discard it on completion) so it is not garbage collected
    # before the episode is written.
    task = asyncio.create_task(
        client.add_episode(
            name='Chatbot Response',
            episode_body=f'{state["user_name"]}: {state["messages"][-1].content}\nSalesBot: {response.content}',
            source=EpisodeType.message,
            reference_time=datetime.now(timezone.utc),
            source_description='Chatbot',
        )
    )
    _episode_tasks.add(task)
    task.add_done_callback(_episode_tasks.discard)
    
    return {'messages': [response]}

# Initialize LLM with tools bound so the model can emit tool calls.
# temperature=0 keeps recommendations deterministic for a sales flow.
llm = ChatOpenAI(model='gpt-4.1-mini', temperature=0).bind_tools(tools)
tool_node = ToolNode(tools)

# Routing predicate: run tools when the last AI message requested them.
async def should_continue(state, config):
    """Return 'continue' when the last message carries tool calls, else 'end'."""
    tail = state['messages'][-1]
    return 'continue' if tail.tool_calls else 'end'

# Build the graph
graph_builder = StateGraph(State)
# MemorySaver checkpoints per-thread conversation state (short-term memory);
# Graphiti holds the long-term, cross-session knowledge.
memory = MemorySaver()

graph_builder.add_node('agent', chatbot)
graph_builder.add_node('tools', tool_node)

graph_builder.add_edge(START, 'agent')
# After the agent speaks: run tools if it requested any, otherwise finish.
graph_builder.add_conditional_edges(
    'agent',
    should_continue,
    {'continue': 'tools', 'end': END}
)
# Tool output always flows back to the agent for a final answer.
graph_builder.add_edge('tools', 'agent')

graph = graph_builder.compile(checkpointer=memory)

Run the Agent

import uuid

# Single interaction: a fresh thread_id starts a new checkpointed conversation.
response = await graph.ainvoke(
    {
        'messages': [{
            'role': 'user',
            'content': 'What sizes do the Wool Runners come in?'
        }],
        'user_name': user_name,
        'user_node_uuid': user_node_uuid,
    },
    config={'configurable': {'thread_id': uuid.uuid4().hex}}
)

print(response['messages'][-1].content)

Interactive Agent Loop

# One thread_id for the whole session so MemorySaver keeps the dialogue state.
config = {'configurable': {'thread_id': uuid.uuid4().hex}}
user_state = {'user_name': user_name, 'user_node_uuid': user_node_uuid}

print("Agent: Hello! How can I help you find shoes today?")

while True:
    user_input = input("You: ")
    if user_input.lower() in ['exit', 'quit', 'bye']:
        print("Agent: Thank you for chatting! Goodbye!")
        break
    
    graph_state = {
        'messages': [{'role': 'user', 'content': user_input}],
        'user_name': user_state['user_name'],
        'user_node_uuid': user_state['user_node_uuid'],
    }
    
    # Stream node outputs; print only final AI text (skip tool-call chunks,
    # whose .content is not a plain string).
    async for event in graph.astream(graph_state, config=config):
        for value in event.values():
            if 'messages' in value:
                last_message = value['messages'][-1]
                if isinstance(last_message, AIMessage) and isinstance(last_message.content, str):
                    print(f"Agent: {last_message.content}")

Key Integration Patterns

1. Context Retrieval

Use Graphiti’s centered search to retrieve relevant facts:
# Center search on user node for personalized context
user_context = await client.search(
    query=user_message,
    center_node_uuid=user_node_uuid,
    num_results=5
)

# Center search on domain entities for factual information
product_info = await client.search(
    query=product_query,
    center_node_uuid=product_category_uuid,
    num_results=10
)

2. Asynchronous Episode Persistence

Avoid blocking agent responses by persisting episodes asynchronously. Keep a reference to the created task: asyncio holds only weak references to tasks, so an unreferenced task can be garbage-collected before it completes.
import asyncio

# Fire and forget — but hold a strong reference: asyncio keeps only weak
# references to tasks, so an unreferenced task may be garbage collected
# before the episode is actually written.
background_tasks = set()

task = asyncio.create_task(
    client.add_episode(
        name='Interaction',
        episode_body=conversation_text,
        source=EpisodeType.message,
        reference_time=datetime.now(timezone.utc),
    )
)
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)

3. Dual Memory Strategy

  • LangGraph MemorySaver: Short-term conversation state within a session
  • Graphiti: Long-term knowledge persistence across sessions
# LangGraph manages immediate conversation flow
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)

# Graphiti stores extractable knowledge
await client.add_episode(...)  # Persisted facts

4. Multi-Agent Collaboration

Share knowledge between multiple agents via Graphiti:
# Sales agent adds customer preference
await client.add_episode(
    name='Customer Preference',
    episode_body='Customer prefers blue shoes',
    group_id='customer-123',
    source=EpisodeType.text,
)

# Support agent retrieves the preference later
context = await client.search(
    'customer preferences',
    group_id='customer-123',
    num_results=5
)

Advanced Patterns

Dynamic Tool Generation

Generate tools based on graph entities:
from langchain_core.tools import tool

def create_category_search_tool(category_name: str, category_uuid: str):
    """Build a LangChain tool that searches one product category's subgraph.

    Args:
        category_name: Human-readable category label (used in the tool docs).
        category_uuid: UUID of the category's entity node; search is centered
            here so results stay on-topic.

    Returns:
        A LangChain tool wrapping the async search function.
    """
    async def search_category(query: str) -> str:
        results = await client.search(
            query,
            center_node_uuid=category_uuid,
            num_results=10
        )
        return edges_to_facts_string(results)
    
    # An f-string literal at the top of a function is NOT a docstring, so the
    # original left __doc__ empty and @tool (which derives the tool description
    # from the docstring) would fail. Set __doc__ explicitly, then wrap.
    search_category.__doc__ = f"""Search for {category_name} information."""
    return tool(search_category)

# Create one search tool per category.
# NOTE(review): placeholder UUIDs — resolve real node UUIDs from the graph.
categories = [
    ('Running Shoes', 'uuid-running'),
    ('Hiking Boots', 'uuid-hiking'),
    ('Casual Sneakers', 'uuid-casual'),
]

# 'cat_uuid' (not 'uuid') avoids shadowing the stdlib uuid module inside
# the comprehension.
tools = [create_category_search_tool(label, cat_uuid) for label, cat_uuid in categories]

Temporal Context

Leverage Graphiti’s temporal awareness:
from datetime import timedelta

# Search for recent interactions
recent_time = datetime.now(timezone.utc) - timedelta(days=7)
recent_context = await client.search(
    query='customer interactions',
    reference_time=recent_time,
    num_results=10
)

# System message with temporal context
system_message = SystemMessage(
    content=f"""Recent customer activity (last 7 days):
    {edges_to_facts_string(recent_context)}
    
    Acknowledge any recent purchases or concerns."""
)

Structured Output Integration

Combine Graphiti with LangChain’s structured output:
from pydantic import BaseModel, Field
from langchain_core.output_parsers import PydanticOutputParser

class CustomerProfile(BaseModel):
    """Structured customer profile extracted from graph facts by the LLM."""
    shoe_size: int | None = Field(description="Customer's shoe size")
    preferred_colors: list[str] = Field(description="Preferred colors")
    budget_range: str | None = Field(description="Budget range")
    special_needs: list[str] = Field(description="Special requirements")

# Parser that supplies format instructions and validates the LLM's output.
parser = PydanticOutputParser(pydantic_object=CustomerProfile)

# Extract structured data from graph context
edges = await client.search(f'customer profile for {user_name}', num_results=10)
facts = edges_to_facts_string(edges)

prompt = f"""Extract customer profile from these facts:
{facts}

{parser.get_format_instructions()}
"""

response = await llm.ainvoke(prompt)
profile = parser.parse(response.content)

Visualizing the Knowledge Graph

After interactions, the knowledge graph captures relationships:
// View user's preferences in Neo4j Browser
MATCH (u:Entity {name: 'jess'})-[r:RELATES_TO]->(n)
RETURN u, r, n
LIMIT 25
Example graph structure:
(jess:Entity)-[:INTERESTED_IN]->(Running Shoes:Entity)
(jess)-[:PREFERS]->(Size 10:Entity)
(jess)-[:LIKES]->(Blue Color:Entity)
(Running Shoes)-[:HAS_FEATURE]->(Lightweight:Entity)
(Running Shoes)-[:MANUFACTURED_BY]->(ManyBirds:Entity)

Best Practices

1. Scope Your Searches

Use center_node_uuid to focus retrieval:
# Personalized search centered on user
user_facts = await client.search(
    query,
    center_node_uuid=user_node_uuid,
    num_results=5
)

# Domain search centered on topic
domain_facts = await client.search(
    query,
    center_node_uuid=domain_node_uuid,
    num_results=10
)

2. Use Group IDs for Multi-Tenancy

# Separate graphs per customer
await client.add_episode(
    name='Interaction',
    episode_body=content,
    group_id=f'customer-{customer_id}',
    source=EpisodeType.text,
)

# Search within customer's graph
results = await client.search(
    query,
    group_id=f'customer-{customer_id}',
    num_results=10
)

3. Balance Concurrency

Adjust SEMAPHORE_LIMIT based on LLM tier:
# For development/low-tier APIs
export SEMAPHORE_LIMIT=5

# For production/high-tier APIs
export SEMAPHORE_LIMIT=20

4. Implement Error Handling

async def safe_add_episode(content: str):
    """Persist *content* as a text episode without ever raising.

    Persistence is best-effort by design: failures are logged and swallowed so
    the agent keeps serving the conversation even when the graph store is
    unavailable. Assumes `client` and `logger` are defined at module level.
    """
    try:
        await client.add_episode(
            name='Interaction',
            episode_body=content,
            source=EpisodeType.text,
            reference_time=datetime.now(timezone.utc),
        )
    except Exception as e:
        # Broad catch is intentional here: any persistence error is non-fatal.
        logger.error(f"Failed to persist episode: {e}")
        # Continue agent operation even if persistence fails

Example: Multi-Agent Customer Service

class CustomerServiceState(TypedDict):
    # Conversation history; add_messages appends new turns instead of replacing.
    messages: Annotated[list, add_messages]
    # Stable customer identifier (interpolated into search queries).
    customer_id: str
    # UUID of the customer's entity node; center point for personalized search.
    customer_node_uuid: str
    current_agent: str  # 'sales', 'support', 'billing'

async def route_to_agent(state: CustomerServiceState):
    """Route the conversation to 'sales', 'support', or 'billing'.

    Retrieves customer-centered context from Graphiti, asks the LLM to pick a
    department, and writes the choice into state['current_agent'] — the key
    the conditional edge reads. (Returning a bare string, as the original did,
    would leave 'current_agent' unset and break the routing lambda.)
    """
    last_message = state['messages'][-1].content
    
    # Retrieve customer context, ranked around the customer's entity node.
    context = await client.search(
        f"customer {state['customer_id']} {last_message}",
        center_node_uuid=state['customer_node_uuid'],
        num_results=10
    )
    
    # Determine routing based on context
    intent_prompt = f"""Given this customer context:
    {edges_to_facts_string(context)}
    
    And their message: {last_message}
    
    Route to: sales, support, or billing
    """
    
    routing = await llm.ainvoke(intent_prompt)
    # strip() guards against stray whitespace/newlines in the LLM reply.
    return {'current_agent': routing.content.strip().lower()}

# Build multi-agent graph.
# NOTE(review): sales_agent / support_agent / billing_agent must be defined
# elsewhere — they are not shown in this snippet.
graph_builder = StateGraph(CustomerServiceState)
graph_builder.add_node('router', route_to_agent)
graph_builder.add_node('sales', sales_agent)
graph_builder.add_node('support', support_agent)
graph_builder.add_node('billing', billing_agent)

graph_builder.add_edge(START, 'router')
# Branch on the department name the router stored in state['current_agent'].
graph_builder.add_conditional_edges(
    'router',
    lambda s: s['current_agent'],
    {
        'sales': 'sales',
        'support': 'support',
        'billing': 'billing'
    }
)

LangSmith Integration

Trace agent execution with LangSmith:
import os

# Enable tracing
os.environ['LANGCHAIN_TRACING_V2'] = 'true'
os.environ['LANGCHAIN_PROJECT'] = 'Graphiti-LangGraph-Agent'
os.environ['LANGCHAIN_API_KEY'] = 'your-langsmith-key'

# Run agent - traces automatically captured
await graph.ainvoke(...)

Resources

Next Steps

Build docs developers (and LLMs) love