The NeptuneDriver class provides connectivity to AWS Neptune, a fully managed graph database service with OpenSearch integration.

Overview

AWS Neptune is a managed graph database service that supports both property graph (openCypher) and RDF models. The NeptuneDriver provides:
  • Support for Neptune Database and Neptune Analytics
  • OpenSearch Serverless (AOSS) integration for fulltext search
  • AWS IAM authentication
  • Managed infrastructure and scaling
  • Automatic datetime conversion

Constructor

from graphiti_core.driver.neptune_driver import NeptuneDriver

driver = NeptuneDriver(
    host="neptune-db://cluster-endpoint.region.neptune.amazonaws.com",
    aoss_host="search-endpoint.region.aoss.amazonaws.com",
    port=8182,
    aoss_port=443
)

Parameters

host
str
required
The Neptune endpoint URL. Must use one of these URI schemes:
  • neptune-db://<endpoint> - Neptune Database cluster endpoint
  • neptune-graph://<graph-id> - Neptune Analytics graph identifier
Example: "neptune-db://my-cluster.us-east-1.neptune.amazonaws.com" or "neptune-graph://g-abc123def456"
aoss_host
str
required
The OpenSearch Serverless (AOSS) endpoint hostname. Used for fulltext indexing and search. Must be an AOSS endpoint in the same AWS region as Neptune.
Example: "search-graphiti.us-east-1.aoss.amazonaws.com"
port
int
default:"8182"
The Neptune Database port. Only used for Neptune Database; ignored for Neptune Analytics. Default: 8182
aoss_port
int
default:"443"
The OpenSearch Serverless port. Typically 443 for HTTPS connections. Default: 443
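The two host schemes above can be checked before constructing the driver. The following is a minimal sketch; `classify_neptune_host` is a hypothetical helper written for illustration and is not part of graphiti_core, and it only validates the string format described above.

```python
# Hypothetical helper for illustration; not part of graphiti_core.
def classify_neptune_host(host: str) -> tuple[str, str]:
    """Return (kind, endpoint) for a NeptuneDriver-style host string."""
    kinds = {"neptune-db": "database", "neptune-graph": "analytics"}
    scheme, separator, endpoint = host.partition("://")
    if not separator or scheme not in kinds or not endpoint:
        raise ValueError(
            "host must look like neptune-db://<endpoint> "
            "or neptune-graph://<graph-id>"
        )
    return kinds[scheme], endpoint


kind, endpoint = classify_neptune_host("neptune-graph://g-abc123def456")
print(kind, endpoint)
```

Running a check like this early turns a missing or misspelled scheme into a clear error message instead of a connection failure later on.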

Methods

execute_query

Execute a Cypher query and return results.
result, _, _ = await driver.execute_query(
    "MATCH (n:Entity) RETURN n LIMIT 10"
)
Parameters:
  • cypher_query_ (str | list) - The Cypher query or list of queries to execute
  • **kwargs - Query parameters (automatically sanitized for Neptune)
Returns: tuple[list[dict], None, None] - Records, None, None
Neptune requires datetime objects to be converted to ISO strings and wrapped with datetime(). The driver handles this automatically.

session

Create a new database session.
session = driver.session()
try:
    result = await session.run("MATCH (n) RETURN count(n)")
finally:
    await session.close()
Parameters:
  • database (str | None) - Ignored for Neptune (single graph per endpoint)
Returns: NeptuneDriverSession - Session object
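The try/finally pattern above can be wrapped in an async context manager so the session is always closed. This is a sketch only: `neptune_session` is a hypothetical helper, and `FakeDriver` and `FakeSession` are stand-ins that let the example run without a Neptune cluster. It assumes nothing beyond the `session()`, `run()`, and `close()` calls shown above.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def neptune_session(driver):
    """Yield driver.session() and always close it afterwards."""
    session = driver.session()
    try:
        yield session
    finally:
        await session.close()


# Stand-ins so the sketch runs offline (hypothetical, not graphiti_core classes).
class FakeSession:
    def __init__(self):
        self.closed = False

    async def run(self, query):
        return [{"count(n)": 0}]

    async def close(self):
        self.closed = True


class FakeDriver:
    def __init__(self):
        self.last_session = None

    def session(self):
        self.last_session = FakeSession()
        return self.last_session


async def demo():
    driver = FakeDriver()
    async with neptune_session(driver) as session:
        rows = await session.run("MATCH (n) RETURN count(n)")
    return rows, driver.last_session.closed


rows, closed = asyncio.run(demo())
```

With a real NeptuneDriver, the same `async with neptune_session(driver)` block would replace the explicit try/finally.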

close

Close the driver and release connections.
await driver.close()
Returns: None

build_indices_and_constraints

Create OpenSearch indices for fulltext search.
await driver.build_indices_and_constraints(delete_existing=False)
Parameters:
  • delete_existing (bool) - If True, delete existing AOSS indices before creating new ones
Returns: None
Index creation takes approximately 60 seconds. The method includes a sleep to ensure indices are ready before use.

create_aoss_indices

Create OpenSearch Serverless indices.
await driver.create_aoss_indices()
Returns: None
Creates the following indices:
  • node_name_and_summary - Entity and community node fulltext
  • community_name - Community name search
  • episode_content - Episode content search
  • edge_name_and_fact - Edge relationship search

delete_aoss_indices

Delete all OpenSearch Serverless indices.
await driver.delete_aoss_indices()
Returns: None

run_aoss_query

Execute a fulltext search query against OpenSearch.
results = driver.run_aoss_query(
    name="node_name_and_summary",
    query_text="California Attorney General",
    limit=10
)
Parameters:
  • name (str) - The index name to query
  • query_text (str) - The search query text
  • limit (int) - Maximum number of results (default: 10)
Returns: dict[str, Any] - OpenSearch response with hits
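A common next step is to map search hits back to graph nodes by uuid. The sketch below assumes the standard OpenSearch response shape (`hits.hits[]` with `_score` and `_source`) and that each indexed document carries a `uuid` field. `ranked_uuids` is a hypothetical helper, and the sample response is hand-written rather than real output.

```python
from typing import Any


def ranked_uuids(response: dict[str, Any]) -> list[str]:
    """Return hit uuids ordered from highest to lowest score."""
    hits = response.get("hits", {}).get("hits", [])
    ordered = sorted(hits, key=lambda hit: hit.get("_score") or 0.0, reverse=True)
    # Skip hits whose _source has no uuid instead of raising KeyError.
    return [
        hit["_source"]["uuid"]
        for hit in ordered
        if "uuid" in hit.get("_source", {})
    ]


# Hand-written sample in the assumed response shape (not real output).
sample_response = {
    "hits": {
        "hits": [
            {"_score": 1.2, "_source": {"uuid": "entity-2", "name": "Another"}},
            {"_score": 3.4, "_source": {"uuid": "entity-1", "name": "Example"}},
        ]
    }
}
uuids = ranked_uuids(sample_response)
```

The resulting uuids could then be passed as a parameter to a Cypher query through execute_query.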

save_to_aoss

Bulk index documents into OpenSearch.
count = driver.save_to_aoss(
    name="node_name_and_summary",
    data=[
        {"uuid": "entity-1", "name": "Example", "summary": "An example entity"},
        {"uuid": "entity-2", "name": "Another", "summary": "Another entity"}
    ]
)
print(f"Indexed {count} documents")
Parameters:
  • name (str) - The index name
  • data (list[dict]) - List of documents to index
Returns: int - Number of successfully indexed documents

Properties

provider

driver.provider  # GraphProvider.NEPTUNE
Returns the GraphProvider.NEPTUNE enum value.

aoss_client

driver.aoss_client  # OpenSearch client instance
Access the underlying OpenSearch client for advanced queries.

Operations Properties

The driver exposes specialized operation interfaces:
driver.entity_node_ops       # NeptuneEntityNodeOperations
driver.episode_node_ops      # NeptuneEpisodeNodeOperations
driver.community_node_ops    # NeptuneCommunityNodeOperations
driver.saga_node_ops         # NeptuneSagaNodeOperations
driver.entity_edge_ops       # NeptuneEntityEdgeOperations
driver.episodic_edge_ops     # NeptuneEpisodicEdgeOperations
driver.community_edge_ops    # NeptuneCommunityEdgeOperations
driver.has_episode_edge_ops  # NeptuneHasEpisodeEdgeOperations
driver.next_episode_edge_ops # NeptuneNextEpisodeEdgeOperations
driver.search_ops            # NeptuneSearchOperations
driver.graph_ops             # NeptuneGraphMaintenanceOperations

Usage Examples

Neptune Database Connection

import os
from graphiti_core.driver.neptune_driver import NeptuneDriver

# Get endpoints from environment.
# NEPTUNE_HOST must already include the URI scheme (neptune-db://...)
neptune_uri = os.environ.get("NEPTUNE_HOST")
aoss_host = os.environ.get("AOSS_HOST")

if not neptune_uri or not aoss_host:
    raise ValueError("NEPTUNE_HOST and AOSS_HOST must be set")

# Connect to Neptune Database
driver = NeptuneDriver(
    host=neptune_uri,
    aoss_host=aoss_host,
    port=8182
)

try:
    # Initialize indices (takes ~60 seconds)
    await driver.build_indices_and_constraints()
    
    # Execute a query
    result, _, _ = await driver.execute_query(
        "MATCH (n:Entity) RETURN count(n) as count"
    )
    print(f"Entity count: {result[0]['count']}")
finally:
    await driver.close()

Neptune Analytics Connection

from graphiti_core.driver.neptune_driver import NeptuneDriver

# Connect to Neptune Analytics
driver = NeptuneDriver(
    host="neptune-graph://g-abc123def456",
    aoss_host="search-graphiti.us-east-1.aoss.amazonaws.com"
)

# Neptune Analytics is addressed by graph ID; the port is ignored

With Graphiti

import os
from dotenv import load_dotenv
from graphiti_core import Graphiti
from graphiti_core.driver.neptune_driver import NeptuneDriver
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

load_dotenv()

# Initialize driver
driver = NeptuneDriver(
    host=os.environ["NEPTUNE_HOST"],
    aoss_host=os.environ["AOSS_HOST"],
    port=int(os.environ.get("NEPTUNE_PORT", 8182))
)

# Initialize Graphiti
graphiti = Graphiti(graph_driver=driver)

# Clean up and initialize (first run only).
# WARNING: destructive. This deletes all AOSS indices and all graph data.
await driver.delete_aoss_indices()
await driver._delete_all_data()
await graphiti.build_indices_and_constraints()

# Add an episode
await graphiti.add_episode(
    name="Episode 1",
    episode_body="Kamala Harris is the Attorney General of California.",
    source=EpisodeType.text,
    source_description="biographical information",
    reference_time=datetime.now(timezone.utc)
)

# Search
results = await graphiti.search(
    "Who is the California Attorney General?"
)
print(results)

Fulltext Search with OpenSearch

from graphiti_core.driver.neptune_driver import NeptuneDriver

driver = NeptuneDriver(
    host="neptune-db://my-cluster.us-east-1.neptune.amazonaws.com",
    aoss_host="search-graphiti.us-east-1.aoss.amazonaws.com"
)

# Search entity names and summaries
results = driver.run_aoss_query(
    name="node_name_and_summary",
    query_text="California government politics",
    limit=20
)

for hit in results.get("hits", {}).get("hits", []):
    source = hit["_source"]
    score = hit["_score"]
    print(f"[{score:.2f}] {source['name']}: {source['summary']}")

Bulk Indexing to OpenSearch

from graphiti_core.driver.neptune_driver import NeptuneDriver

driver = NeptuneDriver(
    host="neptune-db://my-cluster.us-east-1.neptune.amazonaws.com",
    aoss_host="search-graphiti.us-east-1.aoss.amazonaws.com"
)

# Bulk index entities
entities = [
    {
        "uuid": "entity-1",
        "name": "Kamala Harris",
        "summary": "Attorney General of California",
        "group_id": "politics"
    },
    {
        "uuid": "entity-2",
        "name": "Gavin Newsom",
        "summary": "Governor of California",
        "group_id": "politics"
    }
]

count = driver.save_to_aoss(
    name="node_name_and_summary",
    data=entities
)
print(f"Indexed {count} entities")

Datetime Handling

from datetime import datetime, timezone
from graphiti_core.driver.neptune_driver import NeptuneDriver

driver = NeptuneDriver(
    host="neptune-db://my-cluster.us-east-1.neptune.amazonaws.com",
    aoss_host="search-graphiti.us-east-1.aoss.amazonaws.com"
)

# Datetime objects are automatically converted
await driver.execute_query(
    "CREATE (e:Episode {uuid: $uuid, created_at: $created_at})",
    uuid="episode-1",
    created_at=datetime.now(timezone.utc)
)
# Stored as: datetime("2024-03-15T10:30:00+00:00")
The driver automatically:
  1. Converts datetime objects to ISO strings
  2. Wraps them with Neptune’s datetime() function
  3. Handles datetimes in nested structures (lists, dicts)
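Steps 1 and 3 above can be illustrated with a small recursive function. This is a sketch of the general technique, not the driver's actual implementation. `to_iso` is a hypothetical name, and the wrapping with Neptune's datetime() function (step 2) happens on the query side and is not shown here.

```python
from datetime import datetime, timezone
from typing import Any


def to_iso(value: Any) -> Any:
    """Recursively replace datetime objects with ISO 8601 strings."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, list):
        return [to_iso(item) for item in value]
    if isinstance(value, dict):
        return {key: to_iso(item) for key, item in value.items()}
    return value  # Strings, numbers, None, etc. pass through unchanged.


params = {
    "uuid": "episode-1",
    "created_at": datetime(2024, 3, 15, 10, 30, tzinfo=timezone.utc),
    "history": [{"seen_at": datetime(2024, 3, 16, tzinfo=timezone.utc)}],
}
converted = to_iso(params)
print(converted["created_at"])  # 2024-03-15T10:30:00+00:00
```

Using timezone-aware datetimes, as in the example above, keeps the UTC offset in the resulting ISO string.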

Environment Variables

Common environment variables for Neptune configuration:
NEPTUNE_HOST=neptune-db://my-cluster.us-east-1.neptune.amazonaws.com
NEPTUNE_PORT=8182
AOSS_HOST=search-graphiti.us-east-1.aoss.amazonaws.com
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key
Load in your application:
import os
from dotenv import load_dotenv
from graphiti_core.driver.neptune_driver import NeptuneDriver

load_dotenv()

driver = NeptuneDriver(
    host=os.environ["NEPTUNE_HOST"],
    aoss_host=os.environ["AOSS_HOST"],
    port=int(os.environ.get("NEPTUNE_PORT", 8182))
)
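Reading the variables through a small validated settings object can surface configuration mistakes before any connection is attempted. The following is a sketch; `NeptuneSettings` and `load_neptune_settings` are hypothetical names, not part of graphiti_core. It takes any mapping, so it can be called with `os.environ` in an application or with a plain dict in tests.

```python
from dataclasses import dataclass
from typing import Mapping


# Hypothetical settings container; field names mirror the constructor parameters.
@dataclass(frozen=True)
class NeptuneSettings:
    host: str
    aoss_host: str
    port: int = 8182


def load_neptune_settings(env: Mapping[str, str]) -> NeptuneSettings:
    """Build settings from an environment-like mapping, failing fast on gaps."""
    missing = [name for name in ("NEPTUNE_HOST", "AOSS_HOST") if not env.get(name)]
    if missing:
        raise ValueError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
    return NeptuneSettings(
        host=env["NEPTUNE_HOST"],
        aoss_host=env["AOSS_HOST"],
        port=int(env.get("NEPTUNE_PORT", "8182")),
    )


# A plain dict stands in for os.environ here.
settings = load_neptune_settings(
    {
        "NEPTUNE_HOST": "neptune-db://my-cluster.us-east-1.neptune.amazonaws.com",
        "AOSS_HOST": "search-graphiti.us-east-1.aoss.amazonaws.com",
    }
)
```

The fields can then be passed to the constructor as `NeptuneDriver(host=settings.host, aoss_host=settings.aoss_host, port=settings.port)`.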

AWS Authentication

The NeptuneDriver uses boto3 for AWS authentication, so credentials are resolved through the standard boto3 credential chain. Supported sources include:
  • Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
  • IAM role credentials when running on EC2, ECS, or Lambda
  • The shared credentials file (~/.aws/credentials)
  • AWS SSO credentials
Ensure your credentials have permissions for:
  • Neptune read/write access
  • OpenSearch Serverless read/write access

OpenSearch Indices

The driver creates four OpenSearch indices:

node_name_and_summary

Searches entity and community nodes by name, summary, and group_id.
Fields: uuid, name, summary, group_id

community_name

Searches community nodes by name and group_id.
Fields: uuid, name, group_id

episode_content

Searches episode nodes by content, source, and description.
Fields: uuid, content, source, source_description, group_id

edge_name_and_fact

Searches entity edges by name, fact, and group_id.
Fields: uuid, name, fact, group_id
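The field lists above can serve as a checklist before calling save_to_aoss. The sketch below uses a hypothetical helper, `missing_fields`, that reports which documented fields a document lacks. This page does not state whether every field is mandatory, so the result is best read as a warning rather than a hard validation error.

```python
# Field names copied from the index descriptions above.
INDEX_FIELDS = {
    "node_name_and_summary": {"uuid", "name", "summary", "group_id"},
    "community_name": {"uuid", "name", "group_id"},
    "episode_content": {"uuid", "content", "source", "source_description", "group_id"},
    "edge_name_and_fact": {"uuid", "name", "fact", "group_id"},
}


# Hypothetical helper for illustration; not part of graphiti_core.
def missing_fields(index_name: str, document: dict) -> set[str]:
    """Return the documented fields that `document` lacks for `index_name`."""
    if index_name not in INDEX_FIELDS:
        raise ValueError(f"Unknown index: {index_name}")
    return INDEX_FIELDS[index_name] - set(document)


gaps = missing_fields(
    "node_name_and_summary",
    {"uuid": "entity-1", "name": "Example", "summary": "An example entity"},
)
print(gaps)  # {'group_id'}
```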

Limitations

Neptune has some limitations:
  • Index creation delay - OpenSearch indices take ~60 seconds to become available
  • Single graph per endpoint - Cannot switch databases like Neo4j
  • Same region required - Neptune and the AOSS endpoint must be in the same AWS region
  • IAM authentication required - Must configure AWS credentials
  • Cost considerations - Managed service with usage-based pricing (for example, instance hours and I/O)

When to Use Neptune

Use NeptuneDriver for:
  • AWS-native deployments
  • Managed graph database requirements
  • Integration with other AWS services
  • Scalable cloud workloads
  • Production applications with AWS infrastructure
Consider alternatives for:
  • Non-AWS deployments → Neo4j or FalkorDB
  • Development/testing → Kuzu
  • Self-hosted requirements → Neo4j or FalkorDB
  • Cost-sensitive projects → Neo4j or FalkorDB

Drivers Overview

Learn about the driver architecture

Graphiti

Main Graphiti class documentation

Neo4j Driver

Self-hosted production driver

FalkorDB Driver

Redis-based alternative
