The NeptuneDriver class provides connectivity to AWS Neptune, a fully managed graph database service with OpenSearch integration.
Overview
AWS Neptune is a managed graph database service that supports both property graph (openCypher) and RDF models. The NeptuneDriver provides:
Support for Neptune Database and Neptune Analytics
OpenSearch Serverless (AOSS) integration for fulltext search
AWS IAM authentication
Managed infrastructure and scaling
Automatic datetime conversion
Constructor
from graphiti_core.driver.neptune_driver import NeptuneDriver
driver = NeptuneDriver(
host = "neptune-db://cluster-endpoint.region.neptune.amazonaws.com" ,
aoss_host = "search-endpoint.region.aoss.amazonaws.com" ,
port = 8182 ,
aoss_port = 443
)
Parameters
The Neptune endpoint URL. Must use one of these URI schemes:
neptune-db://<endpoint> - Neptune Database cluster endpoint
neptune-graph://<graph-id> - Neptune Analytics graph identifier
Example: "neptune-db://my-cluster.us-east-1.neptune.amazonaws.com" or "neptune-graph://g-abc123def456"
The OpenSearch Serverless (AOSS) endpoint hostname. Used for fulltext indexing and search. Must be an AOSS endpoint in the same AWS region. Example: "search-graphiti.us-east-1.aoss.amazonaws.com"
The Neptune Database port. Only used for Neptune Database. Ignored for Neptune Analytics. Default: 8182
The OpenSearch Serverless port. Typically 443 for HTTPS connections. Default: 443
Methods
execute_query
Execute a Cypher query and return results.
result, _, _ = await driver.execute_query(
"MATCH (n:Entity) RETURN n LIMIT 10"
)
Parameters:
cypher_query_ (str | list) - The Cypher query or list of queries to execute
**kwargs - Query parameters (automatically sanitized for Neptune)
Returns: tuple[list[dict], None, None] - Records, None, None
Neptune requires datetime objects to be converted to ISO strings and wrapped with datetime(). The driver handles this automatically.
session
Create a new database session.
session = driver.session()
try :
result = await session.run( "MATCH (n) RETURN count(n)" )
finally :
await session.close()
Parameters:
database (str | None) - Ignored for Neptune (single graph per endpoint)
Returns: NeptuneDriverSession - Session object
close
Close the driver and release connections.
Returns: None
build_indices_and_constraints
Create OpenSearch indices for fulltext search.
await driver.build_indices_and_constraints( delete_existing = False )
Parameters:
delete_existing (bool) - If True, delete existing AOSS indices before creating new ones
Returns: None
Index creation takes approximately 60 seconds. The method includes a sleep to ensure indices are ready before use.
create_aoss_indices
Create OpenSearch Serverless indices.
await driver.create_aoss_indices()
Returns: None
Creates the following indices:
node_name_and_summary - Entity and community node fulltext
community_name - Community name search
episode_content - Episode content search
edge_name_and_fact - Edge relationship search
delete_aoss_indices
Delete all OpenSearch Serverless indices.
await driver.delete_aoss_indices()
Returns: None
run_aoss_query
Execute a fulltext search query against OpenSearch.
results = driver.run_aoss_query(
name = "node_name_and_summary" ,
query_text = "California Attorney General" ,
limit = 10
)
Parameters:
name (str) - The index name to query
query_text (str) - The search query text
limit (int) - Maximum number of results (default: 10)
Returns: dict[str, Any] - OpenSearch response with hits
save_to_aoss
Bulk index documents into OpenSearch.
count = driver.save_to_aoss(
name = "node_name_and_summary" ,
data = [
{ "uuid" : "entity-1" , "name" : "Example" , "summary" : "An example entity" },
{ "uuid" : "entity-2" , "name" : "Another" , "summary" : "Another entity" }
]
)
print ( f "Indexed { count } documents" )
Parameters:
name (str) - The index name
data (list[dict]) - List of documents to index
Returns: int - Number of successfully indexed documents
Properties
provider
driver.provider # GraphProvider.NEPTUNE
Returns the GraphProvider.NEPTUNE enum value.
aoss_client
driver.aoss_client # OpenSearch client instance
Access the underlying OpenSearch client for advanced queries.
Operations Properties
The driver exposes specialized operation interfaces:
driver.entity_node_ops # NeptuneEntityNodeOperations
driver.episode_node_ops # NeptuneEpisodeNodeOperations
driver.community_node_ops # NeptuneCommunityNodeOperations
driver.saga_node_ops # NeptuneSagaNodeOperations
driver.entity_edge_ops # NeptuneEntityEdgeOperations
driver.episodic_edge_ops # NeptuneEpisodicEdgeOperations
driver.community_edge_ops # NeptuneCommunityEdgeOperations
driver.has_episode_edge_ops # NeptuneHasEpisodeEdgeOperations
driver.next_episode_edge_ops # NeptuneNextEpisodeEdgeOperations
driver.search_ops # NeptuneSearchOperations
driver.graph_ops # NeptuneGraphMaintenanceOperations
Usage Examples
Neptune Database Connection
import os
from graphiti_core.driver.neptune_driver import NeptuneDriver
# Get endpoints from environment
neptune_uri = os.environ.get( "NEPTUNE_HOST" )
aoss_host = os.environ.get( "AOSS_HOST" )
if not neptune_uri or not aoss_host:
raise ValueError ( "NEPTUNE_HOST and AOSS_HOST must be set" )
# Connect to Neptune Database
driver = NeptuneDriver(
host = f "neptune-db:// { neptune_uri } " ,
aoss_host = aoss_host,
port = 8182
)
try :
# Initialize indices (takes ~60 seconds)
await driver.build_indices_and_constraints()
# Execute a query
result, _, _ = await driver.execute_query(
"MATCH (n:Entity) RETURN count(n) as count"
)
print ( f "Entity count: { result[ 0 ][ 'count' ] } " )
finally :
await driver.close()
Neptune Analytics Connection
from graphiti_core.driver.neptune_driver import NeptuneDriver
# Connect to Neptune Analytics
driver = NeptuneDriver(
host = "neptune-graph://g-abc123def456" ,
aoss_host = "search-graphiti.us-east-1.aoss.amazonaws.com"
)
# Neptune Analytics uses the graph ID, port is ignored
With Graphiti
import os
from dotenv import load_dotenv
from graphiti_core import Graphiti
from graphiti_core.driver.neptune_driver import NeptuneDriver
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone
load_dotenv()
# Initialize driver
driver = NeptuneDriver(
host = os.environ[ "NEPTUNE_HOST" ],
aoss_host = os.environ[ "AOSS_HOST" ],
port = int (os.environ.get( "NEPTUNE_PORT" , 8182 ))
)
# Initialize Graphiti
graphiti = Graphiti( graph_driver = driver)
# Clean up and initialize (for first run)
await driver.delete_aoss_indices()
await driver._delete_all_data()
await graphiti.build_indices_and_constraints()
# Add an episode
await graphiti.add_episode(
name = "Episode 1" ,
episode_body = "Kamala Harris is the Attorney General of California." ,
source = EpisodeType.text,
source_description = "biographical information" ,
reference_time = datetime.now(timezone.utc)
)
# Search
results = await graphiti.search(
"Who is the California Attorney General?"
)
print (results)
OpenSearch Fulltext Search
from graphiti_core.driver.neptune_driver import NeptuneDriver
driver = NeptuneDriver(
host = "neptune-db://my-cluster.us-east-1.neptune.amazonaws.com" ,
aoss_host = "search-graphiti.us-east-1.aoss.amazonaws.com"
)
# Search entity names and summaries
results = driver.run_aoss_query(
name = "node_name_and_summary" ,
query_text = "California government politics" ,
limit = 20
)
for hit in results.get( "hits" , {}).get( "hits" , []):
source = hit[ "_source" ]
score = hit[ "_score" ]
print ( f "[ { score :.2f} ] { source[ 'name' ] } : { source[ 'summary' ] } " )
Bulk Indexing to OpenSearch
from graphiti_core.driver.neptune_driver import NeptuneDriver
driver = NeptuneDriver(
host = "neptune-db://my-cluster.us-east-1.neptune.amazonaws.com" ,
aoss_host = "search-graphiti.us-east-1.aoss.amazonaws.com"
)
# Bulk index entities
entities = [
{
"uuid" : "entity-1" ,
"name" : "Kamala Harris" ,
"summary" : "Attorney General of California" ,
"group_id" : "politics"
},
{
"uuid" : "entity-2" ,
"name" : "Gavin Newsom" ,
"summary" : "Governor of California" ,
"group_id" : "politics"
}
]
count = driver.save_to_aoss(
name = "node_name_and_summary" ,
data = entities
)
print ( f "Indexed { count } entities" )
Datetime Handling
from datetime import datetime, timezone
from graphiti_core.driver.neptune_driver import NeptuneDriver
driver = NeptuneDriver(
host = "neptune-db://my-cluster.us-east-1.neptune.amazonaws.com" ,
aoss_host = "search-graphiti.us-east-1.aoss.amazonaws.com"
)
# Datetime objects are automatically converted
await driver.execute_query(
"CREATE (e:Episode {uuid: $uuid, created_at: $created_at} )" ,
uuid = "episode-1" ,
created_at = datetime.now(timezone.utc)
)
# Stored as: datetime("2024-03-15T10:30:00+00:00")
The driver automatically:
Converts datetime objects to ISO strings
Wraps them with Neptune’s datetime() function
Handles datetimes in nested structures (lists, dicts)
Environment Variables
Common environment variables for Neptune configuration:
NEPTUNE_HOST = neptune-db://my-cluster.us-east-1.neptune.amazonaws.com
NEPTUNE_PORT = 8182
AOSS_HOST = search-graphiti.us-east-1.aoss.amazonaws.com
AWS_REGION = us-east-1
AWS_ACCESS_KEY_ID = your-access-key
AWS_SECRET_ACCESS_KEY = your-secret-key
Load in your application:
import os
from dotenv import load_dotenv
from graphiti_core.driver.neptune_driver import NeptuneDriver
load_dotenv()
driver = NeptuneDriver(
host = os.environ[ "NEPTUNE_HOST" ],
aoss_host = os.environ[ "AOSS_HOST" ],
port = int (os.environ.get( "NEPTUNE_PORT" , 8182 ))
)
AWS Authentication
The NeptuneDriver uses boto3 for AWS authentication. It automatically:
Reads credentials from environment variables
Uses IAM role credentials if running on EC2/ECS/Lambda
Reads from ~/.aws/credentials
Uses AWS SSO credentials
Ensure your credentials have permissions for:
Neptune read/write access
OpenSearch Serverless read/write access
OpenSearch Indices
The driver creates four OpenSearch indices:
node_name_and_summary
Searches entity and community nodes by name, summary, and group_id.
Fields: uuid, name, summary, group_id
Searches community nodes by name and group_id.
Fields: uuid, name, group_id
episode_content
Searches episode nodes by content, source, and description.
Fields: uuid, content, source, source_description, group_id
edge_name_and_fact
Searches entity edges by name, fact, and group_id.
Fields: uuid, name, fact, group_id
Limitations
Neptune has some limitations:
Index creation delay - OpenSearch indices take ~60 seconds to become available
Single graph per endpoint - Cannot switch databases like Neo4j
AWS region required - Must be in the same region as Neptune and AOSS
IAM authentication required - Must configure AWS credentials
Cost considerations - Managed service with per-hour/per-query pricing
When to Use Neptune
Use NeptuneDriver for:
AWS-native deployments
Managed graph database requirements
Integration with other AWS services
Scalable cloud workloads
Production applications with AWS infrastructure
Consider alternatives for:
Non-AWS deployments → Neo4j or FalkorDB
Development/testing → Kuzu
Self-hosted requirements → Neo4j or FalkorDB
Cost-sensitive projects → Neo4j or FalkorDB
Drivers Overview Learn about the driver architecture
Graphiti Main Graphiti class documentation
Neo4j Driver Self-hosted production driver
FalkorDB Driver Redis-based alternative