The KuzuDriver class provides connectivity to Kuzu, an embedded graph database optimized for analytical workloads.
Overview
Kuzu is an embeddable graph database designed for analytical queries and graph analytics. The KuzuDriver provides:
Embedded database (no separate server required)
In-memory or disk-based storage
Explicit schema with predefined node and edge types
Concurrent query execution
Zero-configuration setup for development
Constructor
from graphiti_core.driver.kuzu_driver import KuzuDriver

driver = KuzuDriver(
    db=":memory:",
    max_concurrent_queries=1,
)
Parameters
db (str) - The database path, or ":memory:" for in-memory storage. Options:
":memory:" - In-memory database (lost on process exit)
"/path/to/db" - Persistent disk-based database
Example: "./data/graphiti.kuzu" or ":memory:"
max_concurrent_queries (int) - Maximum number of concurrent queries allowed. Kuzu supports concurrent reads but serializes writes, so a higher value helps read-heavy workloads. Example: 4 for multi-threaded read queries
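To make the effect of a concurrency cap concrete, here is a minimal sketch using asyncio.Semaphore. It does not show how KuzuDriver enforces max_concurrent_queries internally; BoundedRunner, peak_concurrency, and fake_query are hypothetical names used only for illustration.

```python
import asyncio


class BoundedRunner:
    """Caps how many coroutines run at once (illustrative, not KuzuDriver code)."""

    def __init__(self, max_concurrent: int = 1) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self.in_flight = 0
        self.peak = 0

    async def run(self, work):
        # Wait for a free slot before starting the work.
        async with self._semaphore:
            self.in_flight += 1
            self.peak = max(self.peak, self.in_flight)
            try:
                return await work()
            finally:
                self.in_flight -= 1


async def peak_concurrency(limit: int, jobs: int) -> int:
    """Run `jobs` fake queries under `limit` and report the peak overlap."""
    runner = BoundedRunner(limit)

    async def fake_query():
        await asyncio.sleep(0.01)  # stand-in for a read query
        return "ok"

    await asyncio.gather(*(runner.run(fake_query) for _ in range(jobs)))
    return runner.peak
```

With a limit of 1 the fake queries run one after another; raising the limit lets up to that many overlap, which is the trade-off the parameter controls.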
Methods
execute_query
Execute a Cypher query and return results.
result, _, _ = await driver.execute_query(
    "MATCH (n:Entity) RETURN n LIMIT 10"
)
Parameters:
cypher_query_ (str) - The Cypher query to execute
**kwargs - Query parameters (None values are filtered out)
Returns: tuple[list[dict] | list[list[dict]], None, None] - Records as dictionaries
Kuzu does not support the database_ and routing_ parameters. These are automatically removed from kwargs.
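The parameter handling described above (dropping None values and removing database_ and routing_) can be sketched as a small filter. This is an illustration of the described behavior, not the driver's source; clean_query_params and UNSUPPORTED_KEYS are hypothetical names.

```python
from typing import Any

# Keys the text above says Kuzu does not support.
UNSUPPORTED_KEYS = {"database_", "routing_"}


def clean_query_params(**kwargs: Any) -> dict[str, Any]:
    """Keep only parameters that are not None and not unsupported."""
    return {
        key: value
        for key, value in kwargs.items()
        if value is not None and key not in UNSUPPORTED_KEYS
    }


params = clean_query_params(
    uuid="entity-1", name=None, database_="default", routing_="r"
)
print(params)  # {'uuid': 'entity-1'}
```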
session
Create a new database session.
session = driver.session()
try:
    result = await session.run("MATCH (n) RETURN count(n)")
finally:
    await session.close()
Parameters:
_database (str | None) - Ignored for Kuzu (single database per driver)
Returns: KuzuDriverSession - Session object
close
Close the driver connection.
Returns: None
Kuzu relies on garbage collection for cleanup. The close method is a no-op but is provided for interface compatibility.
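Even though close is a no-op here, calling it keeps code portable across drivers. One way to guarantee the call is an async context manager, sketched below. closing_driver and FakeDriver are hypothetical helpers, not part of graphiti_core; the sketch assumes only that the driver exposes an awaitable close(), as the examples on this page do.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def closing_driver(driver):
    """Yield the driver and always await driver.close() afterwards."""
    try:
        yield driver
    finally:
        await driver.close()


class FakeDriver:
    """Stand-in with the same close() shape; not the real KuzuDriver."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def close_is_called() -> bool:
    fake = FakeDriver()
    async with closing_driver(fake) as driver:
        assert driver is fake  # the same object is handed back
    return fake.closed
```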
build_indices_and_constraints
Required by the abstract base class, but a no-op for Kuzu.
await driver.build_indices_and_constraints(delete_existing=False)
Parameters:
delete_existing (bool) - Ignored for Kuzu
Returns: None
Kuzu uses a predefined schema created during initialization. Indices are not dynamically created like Neo4j or FalkorDB.
setup_schema
Initialize the Kuzu database schema.
Returns: None
This method is automatically called during driver initialization. You typically don’t need to call it manually.
Properties
provider
driver.provider # GraphProvider.KUZU
Returns the GraphProvider.KUZU enum value.
Operations Properties
The driver exposes specialized operation interfaces:
driver.entity_node_ops # KuzuEntityNodeOperations
driver.episode_node_ops # KuzuEpisodeNodeOperations
driver.community_node_ops # KuzuCommunityNodeOperations
driver.saga_node_ops # KuzuSagaNodeOperations
driver.entity_edge_ops # KuzuEntityEdgeOperations
driver.episodic_edge_ops # KuzuEpisodicEdgeOperations
driver.community_edge_ops # KuzuCommunityEdgeOperations
driver.has_episode_edge_ops # KuzuHasEpisodeEdgeOperations
driver.next_episode_edge_ops # KuzuNextEpisodeEdgeOperations
driver.search_ops # KuzuSearchOperations
driver.graph_ops # KuzuGraphMaintenanceOperations
Kuzu Schema
Kuzu requires an explicit schema defined upfront. The KuzuDriver automatically creates:
Node Tables
Episodic - Episode nodes with content and metadata
Entity - Entity nodes with embeddings and attributes
Community - Community nodes with embeddings and summaries
Saga - Saga nodes for grouping episodes
RelatesToNode_ - Helper nodes for edge properties (Kuzu workaround)
Relationship Tables
RELATES_TO - Entity relationships (via RelatesToNode_)
MENTIONS - Episodic mentions of entities
HAS_MEMBER - Community membership
HAS_EPISODE - Saga to episode links
NEXT_EPISODE - Sequential episode ordering
Kuzu does not support fulltext indices on edge properties. The driver works around this by representing entity edges as: (Entity)-[:RELATES_TO]->(RelatesToNode_)-[:RELATES_TO]->(Entity)
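Because of this workaround, a query that follows an entity relationship has to traverse two RELATES_TO hops. A minimal sketch of building that pattern as a string follows; relates_to_pattern is a hypothetical helper, not a graphiti_core function, and the name property in the RETURN clause is assumed from the Persistent Database example on this page.

```python
def relates_to_pattern(source: str = "a", edge: str = "e", target: str = "b") -> str:
    """Build the two-hop pattern that stands in for one entity edge."""
    return (
        f"({source}:Entity)-[:RELATES_TO]->"
        f"({edge}:RelatesToNode_)-[:RELATES_TO]->"
        f"({target}:Entity)"
    )


# Example: list pairs of related entities through the helper node.
query = f"MATCH {relates_to_pattern()} RETURN a.name, b.name LIMIT 10"
print(query)
```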
Usage Examples
In-Memory Database
import asyncio
from graphiti_core.driver.kuzu_driver import KuzuDriver

async def main():
    # Create in-memory database
    driver = KuzuDriver(db=":memory:")
    try:
        # Execute a query
        result, _, _ = await driver.execute_query(
            "MATCH (n:Entity) RETURN count(n) as count"
        )
        print(f"Entity count: {result[0]['count']}")
    finally:
        await driver.close()

asyncio.run(main())
Persistent Database
import asyncio
from pathlib import Path
from graphiti_core.driver.kuzu_driver import KuzuDriver

async def main():
    # Create database directory
    db_path = Path("./data/graphiti.kuzu")
    db_path.parent.mkdir(parents=True, exist_ok=True)
    # Initialize with persistent storage
    driver = KuzuDriver(db=str(db_path))
    try:
        # Data persists across runs
        result, _, _ = await driver.execute_query(
            "CREATE (e:Entity {uuid: 'entity-1', name: 'Example'})"
        )
    finally:
        await driver.close()

# await is only valid inside a coroutine, so run the example through asyncio
asyncio.run(main())
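The directory preparation in the persistent example can be wrapped in a small helper that also passes ":memory:" through untouched. This is a sketch under that assumption; prepare_db_path is a hypothetical name, and the temporary directory is used only to demonstrate it without touching the working directory.

```python
import tempfile
from pathlib import Path

IN_MEMORY = ":memory:"


def prepare_db_path(db: str) -> str:
    """Create the parent directory for an on-disk path; pass ':memory:' through."""
    if db == IN_MEMORY:
        return db
    path = Path(db)
    path.parent.mkdir(parents=True, exist_ok=True)
    return str(path)


# Demonstrate against a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    resolved = prepare_db_path(f"{tmp}/data/graphiti.kuzu")
    parent_exists = Path(resolved).parent.is_dir()
```

The returned string could then be passed as the db argument shown in the constructor section.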
With Graphiti
import asyncio
from datetime import datetime, timezone
from graphiti_core import Graphiti
from graphiti_core.driver.kuzu_driver import KuzuDriver
from graphiti_core.nodes import EpisodeType

async def main():
    # Create driver
    driver = KuzuDriver(db=":memory:")
    # Initialize Graphiti
    graphiti = Graphiti(graph_driver=driver)
    # Build schema (no-op for Kuzu, but included for consistency)
    await graphiti.build_indices_and_constraints()
    # Add an episode
    await graphiti.add_episode(
        name="Episode 1",
        episode_body="Kamala Harris is the Attorney General of California.",
        source=EpisodeType.text,
        source_description="biographical information",
        reference_time=datetime.now(timezone.utc),
    )
    # Search
    results = await graphiti.search(
        "Who is the California Attorney General?"
    )
    print(results)

asyncio.run(main())
OpenTelemetry Integration
import asyncio
from datetime import datetime, timezone
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
from graphiti_core import Graphiti
from graphiti_core.driver.kuzu_driver import KuzuDriver
from graphiti_core.nodes import EpisodeType

# Setup OpenTelemetry
resource = Resource(attributes={"service.name": "graphiti-example"})
provider = TracerProvider(resource=resource)
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
otel_tracer = trace.get_tracer(__name__)

async def main():
    # Initialize Graphiti with tracing
    kuzu_driver = KuzuDriver(db=":memory:")
    graphiti = Graphiti(
        graph_driver=kuzu_driver,
        tracer=otel_tracer,
        trace_span_prefix="graphiti.example",
    )
    # All operations are now traced
    await graphiti.add_episode(
        name="Episode 1",
        episode_body="Example content",
        source=EpisodeType.text,
        reference_time=datetime.now(timezone.utc),
    )

asyncio.run(main())
Concurrent Queries
import asyncio
from graphiti_core.driver.kuzu_driver import KuzuDriver

async def main():
    # Enable concurrent queries
    driver = KuzuDriver(
        db=":memory:",
        max_concurrent_queries=4,
    )
    # Execute queries concurrently (reads only)
    results = await asyncio.gather(
        driver.execute_query("MATCH (n:Entity) RETURN count(n)"),
        driver.execute_query("MATCH (e:Episodic) RETURN count(e)"),
        driver.execute_query("MATCH (c:Community) RETURN count(c)"),
        driver.execute_query("MATCH ()-[r:RELATES_TO]->() RETURN count(r)"),
    )
    for i, (result, _, _) in enumerate(results):
        print(f"Query {i + 1}: {result}")

asyncio.run(main())
Schema Inspection
import asyncio
from graphiti_core.driver.kuzu_driver import KuzuDriver

async def main():
    driver = KuzuDriver(db=":memory:")
    # List all tables (show_tables returns node and relationship tables)
    result, _, _ = await driver.execute_query(
        "CALL show_tables() RETURN *"
    )
    print("Tables:")
    for row in result:
        print(f"  - {row}")
    # Inspect Entity schema
    result, _, _ = await driver.execute_query(
        "CALL table_info('Entity') RETURN *"
    )
    print("\nEntity schema:")
    for row in result:
        print(f"  {row}")

asyncio.run(main())
Development and Testing
An in-memory Kuzu database suits development and testing because every test can start from a fresh instance with no server to manage. The example assumes the pytest-asyncio plugin is installed so that async fixtures and tests can run:
from datetime import datetime, timezone

import pytest
import pytest_asyncio
from graphiti_core import Graphiti
from graphiti_core.driver.kuzu_driver import KuzuDriver
from graphiti_core.nodes import EpisodeType

@pytest_asyncio.fixture
async def graphiti():
    """Create a fresh in-memory Graphiti instance for each test."""
    driver = KuzuDriver(db=":memory:")
    graphiti = Graphiti(graph_driver=driver)
    await graphiti.build_indices_and_constraints()
    yield graphiti
    await driver.close()

@pytest.mark.asyncio
async def test_add_episode(graphiti):
    """Test adding an episode."""
    await graphiti.add_episode(
        name="Test Episode",
        episode_body="Test content",
        source=EpisodeType.text,
        reference_time=datetime.now(timezone.utc),
    )
    # Verify episode was added
    result, _, _ = await graphiti.driver.execute_query(
        "MATCH (e:Episodic) RETURN count(e) as count"
    )
    assert result[0]["count"] == 1
Limitations
Kuzu has some limitations compared to other drivers:
No dynamic schema changes - Schema is fixed at initialization
No fulltext indices on edges - Workaround uses intermediate nodes
Single database per driver - Cannot switch databases like Neo4j
Limited transaction support - Writes are serialized
No vector indices - Embeddings stored as FLOAT[] arrays
Performance Characteristics
Fast analytical queries - Optimized for graph analytics
Low memory overhead - Efficient in-memory representation
Sequential writes - Writes are serialized, not ideal for high write throughput
Concurrent reads - Multiple queries can read simultaneously
Columnar storage - Efficient for scanning large datasets
When to Use Kuzu
Use KuzuDriver for:
Development and testing
Embedded applications
Analytical workloads
In-memory processing
Single-process deployments
CI/CD pipelines (fast, no server required)
Consider alternatives for:
Production multi-tenant systems → Neo4j or FalkorDB
High write throughput → Neo4j
Cloud deployments → Neptune
Distributed systems → Neo4j
Related
Drivers Overview - Learn about the driver architecture
Graphiti - Main Graphiti class documentation
Neo4j Driver - Production-ready driver with transactions
FalkorDB Driver - Redis-based multi-tenant driver