Graphiti is designed for high-performance knowledge graph operations. This guide covers configuration, optimization, and scaling strategies for production deployments.

Concurrency Control

The most critical performance setting is SEMAPHORE_LIMIT, which controls concurrent episode processing.

Understanding SEMAPHORE_LIMIT

Graphiti’s ingestion pipelines are highly concurrent. SEMAPHORE_LIMIT determines how many episodes can be processed simultaneously. Each episode involves multiple LLM calls:
  • Entity extraction (2-3 calls)
  • Entity deduplication (1-2 calls)
  • Fact extraction (2-3 calls)
  • Summarization (1-2 calls)
Actual concurrent LLM requests = SEMAPHORE_LIMIT × 6-10
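To sanity-check a value before raising it, you can run the arithmetic directly. A minimal sketch (calls_per_episode is an assumption drawn from the breakdown above; tune it to what you observe):

# Rough estimate of in-flight LLM requests implied by a SEMAPHORE_LIMIT value
def estimate_concurrent_requests(semaphore_limit: int, calls_per_episode: int = 8) -> int:
    # 6-10 calls per episode per the breakdown above; 8 is a midpoint guess
    return semaphore_limit * calls_per_episode

# The core default of 20 implies roughly 120-200 in-flight requests
print(estimate_concurrent_requests(20, 6), estimate_concurrent_requests(20, 10))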

Default Configuration

# In graphiti_core/helpers.py
SEMAPHORE_LIMIT = int(os.getenv('SEMAPHORE_LIMIT', 20))
Both defaults are conservative starting points; the MCP server ships with a lower value than the core library:
# MCP server default (more conservative)
SEMAPHORE_LIMIT=10

# Core library default
SEMAPHORE_LIMIT=20

Tuning by LLM Provider

OpenAI

# Tier 1 (free): 3 RPM → very limited
export SEMAPHORE_LIMIT=1

# Tier 2: 60 RPM
export SEMAPHORE_LIMIT=5

# Tier 3: 500 RPM (most common paid tier)
export SEMAPHORE_LIMIT=10

# Tier 4: 5,000 RPM
export SEMAPHORE_LIMIT=30

# Tier 5: 10,000+ RPM
export SEMAPHORE_LIMIT=50

Anthropic

# Default tier: 50 RPM
export SEMAPHORE_LIMIT=5

# Mid tier: 500 RPM
export SEMAPHORE_LIMIT=15

# High tier: 1,000+ RPM
export SEMAPHORE_LIMIT=30

Azure OpenAI

# Check your quota in Azure Portal
# Start conservative and monitor
export SEMAPHORE_LIMIT=10

# Scale up gradually based on quota
export SEMAPHORE_LIMIT=20  # For higher quotas

Ollama (Local LLM)

# Depends on hardware (CPU/GPU)
export SEMAPHORE_LIMIT=2  # Conservative for CPU-only
export SEMAPHORE_LIMIT=5  # For GPU acceleration

# Monitor resource usage and adjust

Groq

# Groq offers very high throughput
export SEMAPHORE_LIMIT=30

# Can go higher with premium access
export SEMAPHORE_LIMIT=50

Symptoms of Misconfiguration

Too High:
  • 429 rate limit errors in logs
  • Increased API costs from retries
  • Memory pressure from queued operations
  • Inconsistent response times
Too Low:
  • Slow episode ingestion
  • Underutilized API quota
  • Poor throughput
  • Long processing queues

Monitoring and Adjustment

import logging
import time

from graphiti_core.llm_client.errors import RateLimitError  # path may vary by version

logger = logging.getLogger('graphiti')

# Monitor episode processing time
start = time.time()
await graphiti.add_episode(...)
elapsed = time.time() - start

logger.info(f"Episode processed in {elapsed:.2f}s")

# Track 429 errors
try:
    await graphiti.add_episode(...)
except RateLimitError as e:
    logger.warning(f"Rate limit hit: {e}")
    # Consider lowering SEMAPHORE_LIMIT

Dynamic Adjustment

Adjust concurrency at runtime:
from graphiti_core import helpers

# Lower concurrency during high load
helpers.SEMAPHORE_LIMIT = 5

# Increase during off-peak
helpers.SEMAPHORE_LIMIT = 20
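To automate this, one pattern is to halve the limit whenever a rate-limit error surfaces and retry. A minimal sketch, assuming RateLimitError lives in graphiti_core.llm_client.errors (verify the import path for your version):

import asyncio

from graphiti_core import helpers
from graphiti_core.llm_client.errors import RateLimitError  # path may vary by version

async def add_episode_adaptive(graphiti, **kwargs):
    # Halve concurrency on a 429; in-flight work may not see the new
    # value until the next batch of episodes is scheduled
    try:
        await graphiti.add_episode(**kwargs)
    except RateLimitError:
        helpers.SEMAPHORE_LIMIT = max(1, helpers.SEMAPHORE_LIMIT // 2)
        await asyncio.sleep(5)  # give the provider time to cool down
        await graphiti.add_episode(**kwargs)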

Database Optimization

Neo4j Performance

Memory Configuration

Edit neo4j.conf:
# Heap size (general rule: 50% of available RAM, staying under 32GB)
server.memory.heap.initial_size=4G
server.memory.heap.max_size=4G

# Page cache (remaining RAM after heap and OS overhead)
server.memory.pagecache.size=4G

# Transaction state (Neo4j 5.x setting names)
dbms.memory.transaction.total.max=2G
db.memory.transaction.max=1G
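The sizing rule above is easy to misapply by hand; a small helper makes the split explicit. This is a sketch of the rule as stated, not official Neo4j sizing advice (os_reserve_gb is an assumed allowance for OS headroom):

# Derive heap and page-cache sizes from total RAM per the rule above
def neo4j_memory_settings(total_ram_gb: int, os_reserve_gb: int = 2) -> dict:
    heap_gb = min(total_ram_gb // 2, 31)  # 50% of RAM, staying under the 32GB cap
    pagecache_gb = max(total_ram_gb - heap_gb - os_reserve_gb, 1)  # what's left
    return {
        "server.memory.heap.initial_size": f"{heap_gb}G",
        "server.memory.heap.max_size": f"{heap_gb}G",
        "server.memory.pagecache.size": f"{pagecache_gb}G",
    }

print(neo4j_memory_settings(16))  # 8G heap, 6G page cache on a 16GB host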

Index Configuration

Create optimal indices:
// Vector indices for embeddings
CREATE VECTOR INDEX entity_embedding IF NOT EXISTS
FOR (n:Entity)
ON n.name_embedding
OPTIONS {
  indexConfig: {
    `vector.dimensions`: 1536,
    `vector.similarity_function`: 'cosine'
  }
};

// Fulltext search
CREATE FULLTEXT INDEX entity_search IF NOT EXISTS
FOR (n:Entity)
ON EACH [n.name, n.summary];

// Property indices
CREATE INDEX entity_uuid IF NOT EXISTS FOR (n:Entity) ON (n.uuid);
CREATE INDEX entity_group_id IF NOT EXISTS FOR (n:Entity) ON (n.group_id);
CREATE INDEX entity_created_at IF NOT EXISTS FOR (n:Entity) ON (n.created_at);
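Graphiti can create its own standard indices for you; the statements above are for adding custom ones on top. A sketch of both steps (the session access via graphiti.driver.client mirrors the Neo4j driver usage shown later; attribute names may differ across versions):

from graphiti_core import Graphiti

graphiti = Graphiti("bolt://localhost:7687", "neo4j", "password")

# Create Graphiti's built-in indices and constraints
await graphiti.build_indices_and_constraints()

# Apply a custom index through the underlying Neo4j driver
async with graphiti.driver.client.session() as session:
    await session.run(
        "CREATE INDEX entity_created_at IF NOT EXISTS "
        "FOR (n:Entity) ON (n.created_at)"
    )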

Query Optimization

Use query plans to identify bottlenecks:
// Profile a search query
PROFILE
MATCH (n:Entity {group_id: $group_id})
WHERE n.name CONTAINS $query
RETURN n
LIMIT 10;

// Look for:
// - Db Hits (lower is better)
// - Index usage (should use indices)
// - Estimated Rows (accuracy)

Connection Pooling

from graphiti_core.driver.neo4j_driver import Neo4jDriver
from neo4j import AsyncGraphDatabase

# Configure pool size
driver = AsyncGraphDatabase.driver(
    uri="bolt://localhost:7687",
    auth=("neo4j", "password"),
    max_connection_pool_size=50,  # Default: 100
    connection_acquisition_timeout=60,  # Seconds
)

neo4j_driver = Neo4jDriver(
    uri="bolt://localhost:7687",
    user="neo4j",
    password="password"
)
neo4j_driver.client = driver

FalkorDB Performance

Redis Configuration

Optimize Redis for FalkorDB:
# redis.conf

# Memory
maxmemory 8gb
maxmemory-policy allkeys-lru

# Persistence (adjust based on durability needs)
save 900 1
save 300 10
save 60 10000

# Network
tcp-backlog 511
timeout 0
tcp-keepalive 300

Graph-Specific Settings

from graphiti_core.driver.falkordb_driver import FalkorDriver

driver = FalkorDriver(
    host="localhost",
    port=6379,
    database="graphiti"
)

# FalkorDB automatically optimizes queries
# No manual index creation needed

Kuzu Performance

File System Optimization

from graphiti_core.driver.kuzu_driver import KuzuDriver

# Use fast storage (SSD)
driver = KuzuDriver(
    db="/mnt/nvme/graphiti.kuzu",  # SSD path
    max_concurrent_queries=4  # Adjust based on CPU cores
)

Memory vs Disk Trade-off

# In-memory for speed (loses data on restart)
driver = KuzuDriver(db=":memory:")

# Persistent storage
driver = KuzuDriver(db="/path/to/persistent.kuzu")

Chunking Configuration

Graphiti automatically chunks large episodes to avoid LLM context limits.

Chunking Parameters

# Content chunking (from graphiti_core/helpers.py)
export CHUNK_TOKEN_SIZE=3000  # Default: 3000 tokens per chunk
export CHUNK_OVERLAP_TOKENS=200  # Default: 200 token overlap
export CHUNK_MIN_TOKENS=1000  # Minimum size before chunking

# Entity density threshold: chunk when element density exceeds it
# (0.15 elements per token ≈ 150 elements per 1,000 tokens)
export ENTITY_DENSITY_THRESHOLD=0.15
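To see how these parameters interact, here is the arithmetic as a sketch (illustrative only; the actual decision logic lives in graphiti_core):

import math

def needs_chunking(tokens: int, elements: int,
                   min_tokens: int = 1000, density_threshold: float = 0.15) -> bool:
    # Chunk only when the episode is both large enough and element-dense enough
    return tokens > min_tokens and elements / tokens > density_threshold

def estimated_chunks(tokens: int, chunk_size: int = 3000, overlap: int = 200) -> int:
    # Each chunk after the first contributes (chunk_size - overlap) new tokens
    if tokens <= chunk_size:
        return 1
    return 1 + math.ceil((tokens - chunk_size) / (chunk_size - overlap))

print(estimated_chunks(10_000))  # 4 chunks at the defaults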

Tuning Guidance

Large documents:
# Process large documents faster (smaller chunks)
export CHUNK_TOKEN_SIZE=2000
export CHUNK_OVERLAP_TOKENS=100
Dense entity extraction:
# More context for entity-rich content
export CHUNK_TOKEN_SIZE=4000
export CHUNK_OVERLAP_TOKENS=300
Cost optimization:
# Larger chunks = fewer LLM calls = lower cost
export CHUNK_TOKEN_SIZE=4000
# But may hit context limits on some models

Embedding Performance

Batch Embeddings

Graphiti batches embedding requests by default:
# In your code, embeddings are automatically batched
await graphiti.add_episode(...)  # Internally batches embeddings
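If you compute embeddings outside Graphiti (for example, pre-computing vectors for a migration), send one batched request rather than many single ones. A sketch with the OpenAI client (the model name is an example):

from openai import AsyncOpenAI

client = AsyncOpenAI()

texts = ["first passage", "second passage", "third passage"]

# One request for many inputs beats one request per input on latency and overhead
response = await client.embeddings.create(
    model="text-embedding-3-small",
    input=texts,
)
vectors = [item.embedding for item in response.data]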

Choose Faster Embedding Models

from graphiti_core.embedder.openai import OpenAIEmbedder, OpenAIEmbedderConfig

# Faster, smaller model
embedder = OpenAIEmbedder(
    config=OpenAIEmbedderConfig(
        embedding_model="text-embedding-3-small",
        embedding_dim=1536
    )
)

graphiti = Graphiti(
    "bolt://localhost:7687",
    "neo4j",
    "password",
    embedder=embedder
)

Local Embeddings

Use local models to eliminate network latency:
from graphiti_core.embedder.sentence_transformers import (
    SentenceTransformerEmbedder,
    SentenceTransformerConfig
)

# Fast local embeddings
embedder = SentenceTransformerEmbedder(
    config=SentenceTransformerConfig(
        model="all-MiniLM-L6-v2",  # Very fast, decent quality
        embedding_dim=384
    )
)

# Or higher quality
embedder = SentenceTransformerEmbedder(
    config=SentenceTransformerConfig(
        model="all-mpnet-base-v2",  # Slower, better quality
        embedding_dim=768
    )
)

Search Performance

Limit Result Counts

# Faster searches with fewer results
results = await graphiti.search(
    query="user preferences",
    num_results=5  # Default: 10, lower is faster
)

Use Centered Searches

# More efficient with center node
results = await graphiti.search(
    query="product info",
    center_node_uuid=user_node_uuid,  # Focuses search
    num_results=10
)

Optimize Search Configuration

from graphiti_core.search.search_config import SearchConfig

# Custom search config
config = SearchConfig(
    num_episodes=3,  # Fewer episodes = faster
    num_results=5,
    max_facts=50,  # Limit fact retrieval
    reranker_weight=0.5
)

results = await graphiti.search(
    query="test",
    config=config
)

Parallel Processing

Enable Parallel Runtime

# Enable parallel processing (experimental)
export USE_PARALLEL_RUNTIME=true
Warning: This is experimental and may cause issues with some LLM providers.

Batch Episode Ingestion

import asyncio
from datetime import datetime, timezone

from graphiti_core.nodes import EpisodeType

# Process episodes in parallel (respects SEMAPHORE_LIMIT)
episodes = [
    {"name": f"Episode {i}", "content": f"Content {i}"}
    for i in range(100)
]

tasks = [
    graphiti.add_episode(
        name=ep["name"],
        episode_body=ep["content"],
        source=EpisodeType.text,
        source_description="bulk import",
        reference_time=datetime.now(timezone.utc),
    )
    for ep in episodes
]

# Concurrent execution (limited by SEMAPHORE_LIMIT)
await asyncio.gather(*tasks)
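For very large backfills, building every task up front keeps all episodes queued in memory at once. Processing in windows bounds the queue while still saturating SEMAPHORE_LIMIT; a sketch reusing the imports and episodes list above (the window size is arbitrary):

BATCH = 20  # at most 20 episodes queued at a time

for i in range(0, len(episodes), BATCH):
    window = episodes[i:i + BATCH]
    await asyncio.gather(*(
        graphiti.add_episode(
            name=ep["name"],
            episode_body=ep["content"],
            source=EpisodeType.text,
            source_description="bulk import",
            reference_time=datetime.now(timezone.utc),
        )
        for ep in window
    ))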

Caching Strategies

LLM Response Caching

Some providers support prompt caching:
# Anthropic prompt caching
from graphiti_core.llm_client.anthropic_client import AnthropicClient, LLMConfig

llm_client = AnthropicClient(
    config=LLMConfig(
        model="claude-4-sonnet-20250514",
        cache_system_messages=True  # Enable caching
    )
)

graphiti = Graphiti(
    "bolt://localhost:7687",
    "neo4j",
    "password",
    llm_client=llm_client
)

Application-Level Caching

import hashlib

class CachedGraphiti:
    def __init__(self, graphiti):
        self.graphiti = graphiti
        self._search_cache = {}
    
    async def cached_search(self, query: str, num_results: int = 10):
        cache_key = hashlib.md5(f"{query}:{num_results}".encode()).hexdigest()
        
        if cache_key in self._search_cache:
            return self._search_cache[cache_key]
        
        results = await self.graphiti.search(query, num_results=num_results)
        self._search_cache[cache_key] = results
        return results
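Note that the cache above grows without bound. For long-running services, cap it; a minimal LRU sketch with OrderedDict (the size is arbitrary):

from collections import OrderedDict

class BoundedSearchCache:
    def __init__(self, graphiti, max_entries: int = 1024):
        self.graphiti = graphiti
        self.max_entries = max_entries
        self._cache: OrderedDict[str, object] = OrderedDict()

    async def search(self, query: str, num_results: int = 10):
        key = f"{query}:{num_results}"
        if key in self._cache:
            self._cache.move_to_end(key)  # mark as most recently used
            return self._cache[key]
        results = await self.graphiti.search(query, num_results=num_results)
        self._cache[key] = results
        if len(self._cache) > self.max_entries:
            self._cache.popitem(last=False)  # evict the least recently used entry
        return results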

Monitoring and Profiling

Enable Logging

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('graphiti_core')
logger.setLevel(logging.DEBUG)

Track Metrics

import time
import statistics

class PerformanceTracker:
    def __init__(self):
        self.episode_times = []
        self.search_times = []
    
    async def timed_add_episode(self, graphiti, **kwargs):
        start = time.time()
        await graphiti.add_episode(**kwargs)
        elapsed = time.time() - start
        self.episode_times.append(elapsed)
        return elapsed
    
    def report(self):
        # quantiles() needs at least two samples; guard the p95 computation
        p95 = (
            statistics.quantiles(self.episode_times, n=20)[18]
            if len(self.episode_times) >= 2
            else None
        )
        return {
            "avg_episode_time": statistics.mean(self.episode_times),
            "p95_episode_time": p95,
            "total_episodes": len(self.episode_times),
        }

tracker = PerformanceTracker()
await tracker.timed_add_episode(graphiti, name="Test", episode_body="Content")
print(tracker.report())

OpenTelemetry Integration

See examples/opentelemetry/ for full instrumentation:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# Set up tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(ConsoleSpanExporter())
)

# Instrument Graphiti operations
with tracer.start_as_current_span("add_episode"):
    await graphiti.add_episode(...)

Production Deployment

Horizontal Scaling

Deploy multiple Graphiti instances:
# Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: graphiti-api
spec:
  replicas: 5  # Scale horizontally
  selector:
    matchLabels:
      app: graphiti-api
  template:
    metadata:
      labels:
        app: graphiti-api
    spec:
      containers:
      - name: graphiti
        image: zepai/graphiti:latest
        env:
        - name: SEMAPHORE_LIMIT
          value: "15"  # Lower per instance
        - name: NEO4J_URI
          value: "bolt://neo4j-cluster:7687"  # Shared DB
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"

Database Clustering

Neo4j Cluster

# Neo4j cluster (4.x-style settings shown below; Neo4j 5 renamed the
# clustering settings, so adjust them to your server version)
services:
  neo4j-core-1:
    image: neo4j:5.26-enterprise
    environment:
      - NEO4J_dbms_mode=CORE
      - NEO4J_causal__clustering_initial__discovery__members=neo4j-core-1:5000,neo4j-core-2:5000,neo4j-core-3:5000
  
  neo4j-core-2:
    # ... similar config
  
  neo4j-core-3:
    # ... similar config

Load Balancing

# Round-robin across database replicas
# (a Neo4j cluster can route natively via the neo4j:// URI scheme;
# manual round-robin suits independent read replicas)
from itertools import cycle

neo4j_uris = [
    "bolt://neo4j-1:7687",
    "bolt://neo4j-2:7687",
    "bolt://neo4j-3:7687",
]

uri_cycle = cycle(neo4j_uris)

def get_driver():
    uri = next(uri_cycle)
    return Neo4jDriver(uri=uri, user="neo4j", password="password")

Performance Benchmarks

Typical performance on modern hardware:
| Operation                  | Avg Time  | P95 Time | Notes              |
|----------------------------|-----------|----------|--------------------|
| Add Episode (short)        | 2-5s      | 8s       | SEMAPHORE_LIMIT=10 |
| Add Episode (long)         | 8-15s     | 25s      | With chunking      |
| Search (5 results)         | 200-500ms | 1s       | With indices       |
| Search (20 results)        | 500ms-1s  | 2s       | With reranking     |
| Bulk ingest (100 episodes) | 30-60s    | 90s      | Parallel           |

Hardware: 8-core CPU, 16GB RAM, SSD, OpenAI Tier 3

Troubleshooting

High Memory Usage

Symptoms: memory grows unbounded
Solutions:
  • Lower SEMAPHORE_LIMIT
  • Reduce CHUNK_TOKEN_SIZE
  • Enable database connection pooling
  • Clear episode queue periodically

Slow Ingestion

Symptoms: episodes take > 30s to process
Solutions:
  • Increase SEMAPHORE_LIMIT (if not hitting rate limits)
  • Use faster embedding model
  • Reduce chunking overhead
  • Check database index health

Rate Limit Errors

Symptoms: 429 errors in logs
Solutions:
  • Lower SEMAPHORE_LIMIT
  • Implement exponential backoff (see the sketch after this list)
  • Upgrade LLM provider tier
  • Switch to local models (Ollama)
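
A minimal backoff wrapper for the second bullet (a sketch; the RateLimitError import path may vary by version):

import asyncio
import random

from graphiti_core.llm_client.errors import RateLimitError  # path may vary by version

async def with_backoff(make_coro, max_retries: int = 5):
    # Retry a coroutine factory with exponential backoff plus jitter
    for attempt in range(max_retries):
        try:
            return await make_coro()
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt + random.random())  # 1s, 2s, 4s, ... + jitter

# Usage:
# await with_backoff(lambda: graphiti.add_episode(name="Test", episode_body="..."))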
