Skip to main content
Graphiti uses a pluggable driver architecture that allows you to integrate any graph database backend. This guide walks through implementing a custom driver from scratch.

Driver Architecture Overview

Graphiti’s driver layer is organized into three tiers:

1. GraphDriver ABC

The core interface (graphiti_core/driver/driver.py) that every backend must implement:
from graphiti_core.driver.driver import GraphDriver, GraphProvider
from abc import abstractmethod
from typing import Any, Coroutine

class GraphDriver(ABC):
    provider: GraphProvider
    
    @abstractmethod
    def execute_query(self, cypher_query: str, **kwargs: Any) -> Coroutine:
        """Execute a query against the graph database."""
        raise NotImplementedError()
    
    @abstractmethod
    def session(self, database: str | None = None) -> GraphDriverSession:
        """Create a new database session."""
        raise NotImplementedError()
    
    @abstractmethod
    async def build_indices_and_constraints(self, delete_existing: bool = False):
        """Build database indices and constraints."""
        raise NotImplementedError()
    
    @abstractmethod
    def close(self):
        """Close the database connection."""
        raise NotImplementedError()

2. GraphProvider Enum

Identifies the backend in query builders:
from enum import Enum

class GraphProvider(Enum):
    NEO4J = 'neo4j'
    FALKORDB = 'falkordb'
    KUZU = 'kuzu'
    NEPTUNE = 'neptune'
    # Add your backend here

3. Operations Interfaces

Eleven abstract base classes covering all CRUD and search operations:
  • Node operations: EntityNodeOperations, EpisodeNodeOperations, CommunityNodeOperations, SagaNodeOperations
  • Edge operations: EntityEdgeOperations, EpisodicEdgeOperations, CommunityEdgeOperations, HasEpisodeEdgeOperations, NextEpisodeEdgeOperations
  • Search & maintenance: SearchOperations, GraphMaintenanceOperations

Building a Custom Driver

Step 1: Add to GraphProvider Enum

Edit graphiti_core/driver/driver.py:
class GraphProvider(Enum):
    NEO4J = 'neo4j'
    FALKORDB = 'falkordb'
    KUZU = 'kuzu'
    NEPTUNE = 'neptune'
    MY_BACKEND = 'my_backend'  # Add your backend

Step 2: Create Directory Structure

mkdir -p graphiti_core/driver/my_backend/operations
touch graphiti_core/driver/my_backend/__init__.py
touch graphiti_core/driver/my_backend/operations/__init__.py
touch graphiti_core/driver/my_backend_driver.py

Step 3: Implement the Driver Class

Create graphiti_core/driver/my_backend_driver.py:
import logging
from typing import Any
from graphiti_core.driver.driver import GraphDriver, GraphDriverSession, GraphProvider
from graphiti_core.driver.my_backend.operations import (
    MyBackendEntityNodeOperations,
    MyBackendEpisodeNodeOperations,
    MyBackendCommunityNodeOperations,
    MyBackendSagaNodeOperations,
    MyBackendEntityEdgeOperations,
    MyBackendEpisodicEdgeOperations,
    MyBackendCommunityEdgeOperations,
    MyBackendHasEpisodeEdgeOperations,
    MyBackendNextEpisodeEdgeOperations,
    MyBackendSearchOperations,
    MyBackendGraphMaintenanceOperations,
)

logger = logging.getLogger(__name__)


class MyBackendDriver(GraphDriver):
    provider = GraphProvider.MY_BACKEND
    aoss_client: None = None  # Set if using external search (like Neptune)
    
    def __init__(
        self,
        host: str = 'localhost',
        port: int = 7687,
        username: str | None = None,
        password: str | None = None,
        database: str = 'default_db',
    ):
        super().__init__()
        self._database = database
        
        # Initialize your database client
        self.client = MyBackendClient(
            host=host,
            port=port,
            username=username,
            password=password
        )
        
        # Instantiate all 11 operation classes
        self._entity_node_ops = MyBackendEntityNodeOperations()
        self._episode_node_ops = MyBackendEpisodeNodeOperations()
        self._community_node_ops = MyBackendCommunityNodeOperations()
        self._saga_node_ops = MyBackendSagaNodeOperations()
        self._entity_edge_ops = MyBackendEntityEdgeOperations()
        self._episodic_edge_ops = MyBackendEpisodicEdgeOperations()
        self._community_edge_ops = MyBackendCommunityEdgeOperations()
        self._has_episode_edge_ops = MyBackendHasEpisodeEdgeOperations()
        self._next_episode_edge_ops = MyBackendNextEpisodeEdgeOperations()
        self._search_ops = MyBackendSearchOperations()
        self._graph_ops = MyBackendGraphMaintenanceOperations()
    
    # Expose operations via properties
    @property
    def entity_node_ops(self):
        return self._entity_node_ops
    
    @property
    def episode_node_ops(self):
        return self._episode_node_ops
    
    # ... implement all 11 properties
    
    @property
    def search_ops(self):
        return self._search_ops
    
    @property
    def graph_ops(self):
        return self._graph_ops
    
    # Implement abstract methods
    def execute_query(self, cypher_query: str, **kwargs: Any):
        """Execute a query against the database."""
        return self.client.execute(cypher_query, **kwargs)
    
    def session(self, database: str | None = None):
        """Create a database session."""
        db = database or self._database
        return MyBackendDriverSession(self.client, db)
    
    async def build_indices_and_constraints(self, delete_existing: bool = False):
        """Build indices and constraints."""
        if delete_existing:
            await self.delete_all_indexes()
        
        # Get index queries from shared builders
        from graphiti_core.graph_queries import get_fulltext_indices, get_range_indices
        
        fulltext_indices = get_fulltext_indices(self.provider)
        range_indices = get_range_indices(self.provider)
        
        async with self.session() as session:
            for query in fulltext_indices + range_indices:
                await session.run(query)
    
    async def delete_all_indexes(self):
        """Delete all indices."""
        # Implementation depends on your database
        async with self.session() as session:
            await session.run("DROP ALL INDEXES")
    
    def close(self):
        """Close the database connection."""
        self.client.close()

Step 4: Implement GraphDriverSession

Create a session class for connection management:
class MyBackendDriverSession(GraphDriverSession):
    provider = GraphProvider.MY_BACKEND
    
    def __init__(self, client, database: str):
        self.client = client
        self.database = database
        self._session = client.create_session(database)
    
    async def __aenter__(self):
        return self
    
    async def __aexit__(self, exc_type, exc, tb):
        await self.close()
    
    async def run(self, query: str, **kwargs: Any) -> Any:
        """Execute a query in this session."""
        return await self._session.execute(query, **kwargs)
    
    async def close(self):
        """Close the session."""
        await self._session.close()
    
    async def execute_write(self, func, *args, **kwargs):
        """Execute a write transaction."""
        return await func(self, *args, **kwargs)

Step 5: Implement Operations

Implement all 11 operation classes. Here’s an example of EntityNodeOperations:
# graphiti_core/driver/my_backend/operations/entity_node_ops.py

from graphiti_core.driver.operations.entity_node_ops import EntityNodeOperations
from graphiti_core.driver.query_executor import QueryExecutor, Transaction
from graphiti_core.nodes import EntityNode
from graphiti_core.models.nodes.node_db_queries import build_entity_node_upsert_query


class MyBackendEntityNodeOperations(EntityNodeOperations):
    async def save(
        self,
        executor: QueryExecutor,
        node: EntityNode,
        tx: Transaction | None = None,
    ) -> None:
        """Save an entity node to the database."""
        # Use shared query builder
        from graphiti_core.driver.driver import GraphProvider
        
        query = build_entity_node_upsert_query(GraphProvider.MY_BACKEND)
        
        params = {
            'uuid': node.uuid,
            'name': node.name,
            'group_id': node.group_id,
            'labels': node.labels,
            'created_at': node.created_at,
            'summary': node.summary,
            'name_embedding': node.name_embedding,
        }
        
        if tx:
            await tx.run(query, **params)
        else:
            async with executor.session() as session:
                await session.run(query, **params)
    
    async def get_by_uuid(
        self,
        executor: QueryExecutor,
        uuid: str,
    ) -> EntityNode:
        """Retrieve an entity node by UUID."""
        query = """
        MATCH (n:Entity {uuid: $uuid})
        RETURN n
        """
        
        async with executor.session() as session:
            result = await session.run(query, uuid=uuid)
            # Parse result into EntityNode
            return self._parse_entity_node(result)
    
    def _parse_entity_node(self, result) -> EntityNode:
        """Parse database result into EntityNode."""
        # Implementation depends on result format
        node_data = result[0]['n']
        return EntityNode(
            uuid=node_data['uuid'],
            name=node_data['name'],
            group_id=node_data['group_id'],
            labels=node_data['labels'],
            created_at=node_data['created_at'],
            summary=node_data.get('summary'),
            name_embedding=node_data.get('name_embedding'),
        )
    
    # Implement remaining methods:
    # - save_bulk
    # - delete
    # - delete_by_group_id
    # - delete_by_uuids
    # - get_by_uuids
    # - get_by_group_ids
    # - load_embeddings
    # - load_embeddings_bulk

Step 6: Add Query Variants

Add database-specific query builders in graphiti_core/models/nodes/node_db_queries.py:
def build_entity_node_upsert_query(provider: GraphProvider) -> str:
    """Build entity node upsert query for different providers."""
    match provider:
        case GraphProvider.NEO4J:
            return """
            MERGE (n:Entity {uuid: $uuid})
            ON CREATE SET
                n.name = $name,
                n.group_id = $group_id,
                n.labels = $labels,
                n.created_at = $created_at
            ON MATCH SET
                n.name = $name,
                n.summary = $summary,
                n.name_embedding = $name_embedding
            """
        
        case GraphProvider.MY_BACKEND:
            return """
            -- Your database-specific syntax
            INSERT INTO entities (uuid, name, group_id, labels, created_at, summary)
            VALUES ($uuid, $name, $group_id, $labels, $created_at, $summary)
            ON CONFLICT (uuid) DO UPDATE SET
                name = EXCLUDED.name,
                summary = EXCLUDED.summary
            """
        
        case _:
            raise NotImplementedError(f"Unsupported provider: {provider}")
Do the same for edge queries in graphiti_core/models/edges/edge_db_queries.py.

Step 7: Register Optional Dependency

Add to pyproject.toml:
[project.optional-dependencies]
my_backend = [
    "my-backend-client>=1.0.0",
]

Step 8: Export from init.py

Create graphiti_core/driver/my_backend/operations/__init__.py:
from graphiti_core.driver.my_backend.operations.entity_node_ops import (
    MyBackendEntityNodeOperations,
)
from graphiti_core.driver.my_backend.operations.episode_node_ops import (
    MyBackendEpisodeNodeOperations,
)
# ... export all 11 operation classes

__all__ = [
    'MyBackendEntityNodeOperations',
    'MyBackendEpisodeNodeOperations',
    # ... all 11 classes
]

Real-World Examples

The Neo4j driver is the most straightforward reference implementation:
from graphiti_core.driver.neo4j_driver import Neo4jDriver

driver = Neo4jDriver(
    uri="bolt://localhost:7687",
    user="neo4j",
    password="password",
    database="neo4j"
)
Key features:
  • Full transaction support
  • Native Cypher queries
  • Vector index support
  • Constraint management
See: graphiti_core/driver/neo4j_driver.py and graphiti_core/driver/neo4j/operations/

FalkorDB Driver (Lightweight Alternative)

FalkorDB is Redis-based with a different query approach:
from graphiti_core.driver.falkordb_driver import FalkorDriver

driver = FalkorDriver(
    host="localhost",
    port=6379,
    username=None,
    password=None,
    database="default_db"
)
Key differences:
  • Redis-based connection
  • Custom fulltext query syntax (@ prefix)
  • Stopwords filtering
  • No native transaction support
See: graphiti_core/driver/falkordb_driver.py and graphiti_core/driver/falkordb/operations/

Kuzu Driver (Embedded Database)

Kuzu demonstrates an embedded, in-process database:
from graphiti_core.driver.kuzu_driver import KuzuDriver

driver = KuzuDriver(
    db="/tmp/graphiti.kuzu",  # File path or :memory:
    max_concurrent_queries=1
)
Key features:
  • Embedded database (no server)
  • Explicit schema definition
  • Custom edge representation (RelatesToNode_)
  • Schema creation in init
Schema definition in kuzu_driver.py:
SCHEMA_QUERIES = """
    CREATE NODE TABLE IF NOT EXISTS Entity (
        uuid STRING PRIMARY KEY,
        name STRING,
        group_id STRING,
        labels STRING[],
        created_at TIMESTAMP,
        name_embedding FLOAT[],
        summary STRING
    );
    -- ... more tables
"""
See: graphiti_core/driver/kuzu_driver.py and graphiti_core/driver/kuzu/operations/ Neptune shows integration with external search (OpenSearch):
from graphiti_core.driver.neptune_driver import NeptuneDriver

driver = NeptuneDriver(
    host="neptune-endpoint.amazonaws.com",
    aoss_host="opensearch-endpoint.amazonaws.com",
    port=8182,
    aoss_port=443
)
Key features:
  • Cloud-managed graph database
  • External OpenSearch for fulltext
  • AWS IAM authentication
  • Separate search backend
See: graphiti_core/driver/neptune_driver.py and graphiti_core/driver/neptune/operations/

Usage Pattern

Once implemented, use your driver with Graphiti:
from graphiti_core import Graphiti
from graphiti_core.driver.my_backend_driver import MyBackendDriver

# Create driver instance
driver = MyBackendDriver(
    host="localhost",
    port=7687,
    username="user",
    password="password",
    database="my_graph"
)

# Pass to Graphiti
graphiti = Graphiti(graph_driver=driver)

# Build indices
await graphiti.build_indices_and_constraints()

# Use normally
await graphiti.add_episode(
    name="Test Episode",
    episode_body="This is a test",
    source=EpisodeType.text
)

results = await graphiti.search("test query")

Testing Your Driver

Create tests in tests/driver/test_my_backend_driver.py:
import pytest
from graphiti_core.driver.my_backend_driver import MyBackendDriver
from graphiti_core.nodes import EntityNode
from datetime import datetime, timezone


@pytest.fixture
async def driver():
    driver = MyBackendDriver(
        host="localhost",
        port=7687,
        database="test_db"
    )
    await driver.build_indices_and_constraints(delete_existing=True)
    yield driver
    await driver.delete_all_indexes()
    driver.close()


@pytest.mark.asyncio
async def test_entity_node_save_and_retrieve(driver):
    """Test saving and retrieving an entity node."""
    node = EntityNode(
        uuid="test-uuid",
        name="Test Entity",
        group_id="test-group",
        labels=["Person"],
        created_at=datetime.now(timezone.utc)
    )
    
    # Save node
    await driver.entity_node_ops.save(driver, node)
    
    # Retrieve node
    retrieved = await driver.entity_node_ops.get_by_uuid(driver, "test-uuid")
    
    assert retrieved.uuid == node.uuid
    assert retrieved.name == node.name
    assert retrieved.group_id == node.group_id

Best Practices

1. Use Query Builders

Leverage shared query builders instead of hardcoding queries:
from graphiti_core.models.nodes.node_db_queries import build_entity_node_upsert_query

query = build_entity_node_upsert_query(self.provider)

2. Handle Transactions Properly

Implement transaction support if your database supports it:
from contextlib import asynccontextmanager
from graphiti_core.driver.query_executor import Transaction

@asynccontextmanager
async def transaction(self):
    session = self.session()
    tx = await session.begin_transaction()
    try:
        yield tx
        await tx.commit()
    except Exception:
        await tx.rollback()
        raise
    finally:
        await session.close()

3. Parse Results Consistently

Create helper methods for result parsing:
def _parse_entity_node(self, record) -> EntityNode:
    """Parse database record to EntityNode."""
    # Consistent parsing logic
    pass

4. Handle Database Dialects

Account for syntax differences in your query builders:
match provider:
    case GraphProvider.NEO4J:
        return "MERGE (n:Entity {uuid: $uuid})"
    case GraphProvider.MY_BACKEND:
        return "INSERT INTO entities ..."

5. Document Limitations

Clearly document any limitations:
class MyBackendDriver(GraphDriver):
    """
    Driver for MyBackend graph database.
    
    Limitations:
    - No native vector index support (uses approximation)
    - Transactions limited to single session
    - Maximum 10,000 nodes per query
    """

Performance Considerations

Connection Pooling

class MyBackendDriver(GraphDriver):
    def __init__(self, host, port, pool_size=10):
        self.client = MyBackendClient(
            host=host,
            port=port,
            max_pool_size=pool_size  # Connection pooling
        )

Batch Operations

Implement efficient bulk operations:
async def save_bulk(
    self,
    executor: QueryExecutor,
    nodes: list[EntityNode],
    batch_size: int = 100,
) -> None:
    """Save nodes in batches for efficiency."""
    for i in range(0, len(nodes), batch_size):
        batch = nodes[i:i + batch_size]
        query = build_bulk_upsert_query(self.provider)
        params = {'nodes': [n.model_dump() for n in batch]}
        async with executor.session() as session:
            await session.run(query, **params)

Index Optimization

Create appropriate indices:
async def build_indices_and_constraints(self, delete_existing=False):
    indices = [
        "CREATE INDEX entity_uuid ON Entity(uuid)",
        "CREATE INDEX entity_group_id ON Entity(group_id)",
        "CREATE VECTOR INDEX entity_embedding ON Entity(name_embedding)",
    ]
    
    async with self.session() as session:
        for index in indices:
            await session.run(index)

Troubleshooting

Driver Not Found

Error: ImportError: cannot import name 'MyBackendDriver' Solution: Ensure your driver is installed:
pip install graphiti-core[my_backend]

Query Syntax Errors

Error: SyntaxError: Invalid query syntax Solution: Add your provider to query builders:
case GraphProvider.MY_BACKEND:
    return "your-database-specific-query"

Missing Operations

Error: AttributeError: 'MyBackendDriver' object has no attribute 'entity_node_ops' Solution: Implement all 11 operation properties:
@property
def entity_node_ops(self):
    return self._entity_node_ops

Contributing Your Driver

To contribute your driver to the Graphiti project:
  1. Fork the repository
  2. Implement your driver following this guide
  3. Add comprehensive tests
  4. Update documentation
  5. Submit a pull request
See CONTRIBUTING.md for details.

Resources

Next Steps

Build docs developers (and LLMs) love