The GraphRAG Python API allows you to integrate knowledge graph indexing and querying directly into your Python applications. This provides fine-grained control over the indexing pipeline and query execution.
This API is under development and may undergo changes in future releases. Backwards compatibility is not guaranteed at this time.
Installation
API overview
GraphRAG provides three main API modules:
Indexing API Build knowledge graph indexes from your data
Query API Search and query your knowledge graphs
Prompt tuning API Generate custom prompts for your domain
Indexing API
The indexing API allows you to build knowledge graph indexes programmatically.
Import the API
from graphrag.api import build_index
from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.config.enums import IndexingMethod
Basic indexing
Create a simple indexing pipeline:
import asyncio
from pathlib import Path

from graphrag.api import build_index
from graphrag.config.enums import IndexingMethod  # FIX: required for IndexingMethod.Standard below
from graphrag.config.models.graph_rag_config import GraphRagConfig

# Load configuration from settings.yaml
config = GraphRagConfig.from_file("./my-project/settings.yaml")


async def run_index():
    """Run the indexing pipeline and print per-workflow success/failure."""
    results = await build_index(
        config=config,
        method=IndexingMethod.Standard,
        verbose=True,
    )
    # Each result reports one workflow; error is None on success.
    for result in results:
        if result.error is None:
            print(f"Workflow {result.workflow} completed successfully")
        else:
            print(f"Workflow {result.workflow} failed: {result.error}")


# Execute
asyncio.run(run_index())
Advanced indexing options
With callbacks
Update mode
Custom input documents
With additional context
from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
class MyCallbacks ( WorkflowCallbacks ):
def on_workflow_start ( self , name : str , instance : object ) -> None :
print ( f "Starting workflow: { name } " )
def on_workflow_end ( self , name : str , result : object ) -> None :
print ( f "Completed workflow: { name } " )
def on_error ( self , error : Exception , stack_trace : str ) -> None :
print ( f "Error occurred: { error } " )
results = await build_index(
config = config,
callbacks = [MyCallbacks()],
verbose = True
)
build_index parameters
| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `GraphRagConfig` | The GraphRAG configuration | Required |
| `method` | `IndexingMethod \| str` | Indexing method to use | `IndexingMethod.Standard` |
| `is_update_run` | `bool` | Whether this is an incremental update | `False` |
| `callbacks` | `list[WorkflowCallbacks]` | Callbacks for pipeline events | `None` |
| `additional_context` | `dict[str, Any]` | Custom context for the pipeline | `None` |
| `verbose` | `bool` | Enable verbose logging | `False` |
| `input_documents` | `pd.DataFrame` | Override document loading with custom dataframe | `None` |
Query API
The query API provides four search methods for querying your knowledge graph.
Import the API
from graphrag.api import (
global_search,
global_search_streaming,
local_search,
local_search_streaming,
drift_search,
drift_search_streaming,
basic_search,
basic_search_streaming
)
from graphrag.config.models.graph_rag_config import GraphRagConfig
import pandas as pd
Loading index data
Before querying, load the required parquet files:
import pandas as pd
from pathlib import Path

# Where the indexing pipeline wrote its parquet outputs
output_dir = Path("./my-project/output")

# Required tables for querying
entities = pd.read_parquet(output_dir / "entities.parquet")
communities = pd.read_parquet(output_dir / "communities.parquet")
community_reports = pd.read_parquet(output_dir / "community_reports.parquet")
text_units = pd.read_parquet(output_dir / "text_units.parquet")
relationships = pd.read_parquet(output_dir / "relationships.parquet")

# Covariates only exist when claim extraction was enabled
try:
    covariates = pd.read_parquet(output_dir / "covariates.parquet")
except FileNotFoundError:
    covariates = None
Global search
Use global search for high-level questions about the entire dataset:
Basic global search
Streaming global search
Dynamic community selection
import asyncio

from graphrag.api import global_search
from graphrag.config.models.graph_rag_config import GraphRagConfig

config = GraphRagConfig.from_file("./my-project/settings.yaml")


async def run_global_search():
    """Ask a dataset-wide question using the community-report hierarchy."""
    response, context = await global_search(
        config=config,
        entities=entities,
        communities=communities,
        community_reports=community_reports,
        community_level=2,
        dynamic_community_selection=False,
        response_type="Multiple Paragraphs",
        query="What are the main themes in the dataset?",
    )
    print("Response:", response)
    print("Context:", context)


asyncio.run(run_global_search())
Local search
Use local search for specific entity-focused queries:
Basic local search
Streaming local search
from graphrag.api import local_search


async def run_local_search():
    """Run an entity-focused query over the loaded index tables."""
    response, context = await local_search(
        config=config,
        entities=entities,
        communities=communities,
        community_reports=community_reports,
        text_units=text_units,
        relationships=relationships,
        covariates=covariates,  # may be None if claims were not extracted
        community_level=2,
        response_type="Multiple Paragraphs",
        query="Tell me about John Smith",
    )
    print("Response:", response)
    print("Context:", context)


asyncio.run(run_local_search())
DRIFT search
DRIFT combines local and global approaches:
from graphrag.api import drift_search


async def run_drift_search():
    """Run a DRIFT query, which blends local and global search strategies."""
    response, context = await drift_search(
        config=config,
        entities=entities,
        communities=communities,
        community_reports=community_reports,
        text_units=text_units,
        relationships=relationships,
        community_level=2,
        response_type="Multiple Paragraphs",
        query="How do the main themes relate to specific individuals?",
    )
    print("Response:", response)


asyncio.run(run_drift_search())
Basic search
Basic text similarity search without the knowledge graph:
from graphrag.api import basic_search


async def run_basic_search():
    """Run a plain text-similarity search over text units (no graph)."""
    response, context = await basic_search(
        config=config,
        text_units=text_units,
        response_type="Multiple Paragraphs",
        query="Find mentions of artificial intelligence",
    )
    print("Response:", response)


asyncio.run(run_basic_search())
Search API parameters
Global search parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `config` | `GraphRagConfig` | GraphRAG configuration |
| `entities` | `pd.DataFrame` | Entities dataframe |
| `communities` | `pd.DataFrame` | Communities dataframe |
| `community_reports` | `pd.DataFrame` | Community reports dataframe |
| `community_level` | `int \| None` | Leiden hierarchy level (higher = smaller communities) |
| `dynamic_community_selection` | `bool` | Enable dynamic community selection |
| `response_type` | `str` | Response format description |
| `query` | `str` | The query string |
| `callbacks` | `list[QueryCallbacks]` | Optional callbacks |
| `verbose` | `bool` | Enable verbose logging |
Local search parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `config` | `GraphRagConfig` | GraphRAG configuration |
| `entities` | `pd.DataFrame` | Entities dataframe |
| `communities` | `pd.DataFrame` | Communities dataframe |
| `community_reports` | `pd.DataFrame` | Community reports dataframe |
| `text_units` | `pd.DataFrame` | Text units dataframe |
| `relationships` | `pd.DataFrame` | Relationships dataframe |
| `covariates` | `pd.DataFrame \| None` | Covariates/claims dataframe |
| `community_level` | `int` | Leiden hierarchy level |
| `response_type` | `str` | Response format description |
| `query` | `str` | The query string |
| `callbacks` | `list[QueryCallbacks]` | Optional callbacks |
| `verbose` | `bool` | Enable verbose logging |
Prompt tuning API
Generate custom prompts tailored to your data domain:
import asyncio
from pathlib import Path

from graphrag.api import DocSelectionType, generate_indexing_prompts


async def tune_prompts():
    """Generate domain-tuned indexing prompts into <root>/prompts."""
    root = Path("./my-project")
    await generate_indexing_prompts(
        root=root,
        domain="medical research",
        selection_method=DocSelectionType.RANDOM,
        limit=15,
        max_tokens=2000,
        chunk_size=200,
        overlap=100,
        language="English",
        discover_entity_types=True,
        output=root / "prompts",
    )
    print("Prompts generated successfully!")


asyncio.run(tune_prompts())
Selection methods
Random
Top
Auto (K-means)
Randomly select documents:

await generate_indexing_prompts(
    root=root,
    selection_method=DocSelectionType.RANDOM,
    limit=15
)
Select the first N documents:

await generate_indexing_prompts(
    root=root,
    selection_method=DocSelectionType.TOP,
    limit=15
)
Use K-means clustering to select representative documents:

await generate_indexing_prompts(
    root=root,
    selection_method=DocSelectionType.AUTO,
    n_subset_max=300,
    k=15
)
Query callbacks
Implement custom callbacks to track query execution:
from graphrag.callbacks.query_callbacks import QueryCallbacks
from typing import Any
class MyQueryCallbacks ( QueryCallbacks ):
def on_context ( self , context : Any) -> None :
"""Called when context is available."""
print ( f "Context retrieved: { len (context) } items" )
def on_llm_start ( self , prompt : str ) -> None :
"""Called before LLM invocation."""
print ( f "Sending prompt ( { len (prompt) } chars) to LLM" )
def on_llm_end ( self , response : str ) -> None :
"""Called after LLM response."""
print ( f "Received response ( { len (response) } chars)" )
# Use with queries
response, context = await local_search(
config = config,
entities = entities,
communities = communities,
community_reports = community_reports,
text_units = text_units,
relationships = relationships,
covariates = covariates,
community_level = 2 ,
response_type = "Multiple Paragraphs" ,
query = "What are the findings?" ,
callbacks = [MyQueryCallbacks()]
)
Complete example
Here’s a complete example combining indexing and querying:
import asyncio
import pandas as pd
from pathlib import Path

from graphrag.api import build_index, global_search
from graphrag.config.models.graph_rag_config import GraphRagConfig


async def main():
    """Index a project end-to-end, then run one global query against it."""
    # Setup
    project_root = Path("./my-research")
    config = GraphRagConfig.from_file(project_root / "settings.yaml")
    output_dir = project_root / "output"

    # Step 1: Build the index
    print("Building index...")
    results = await build_index(config=config, verbose=True)

    # Bail out on the first workflow that reports an error.
    failed = next((r for r in results if r.error), None)
    if failed is not None:
        print(f"Error in {failed.workflow}: {failed.error}")
        return
    print("Index built successfully!")

    # Step 2: Load the indexed data (global search needs these three tables)
    print("Loading index data...")
    entities = pd.read_parquet(output_dir / "entities.parquet")
    communities = pd.read_parquet(output_dir / "communities.parquet")
    community_reports = pd.read_parquet(output_dir / "community_reports.parquet")

    # Step 3: Run a query
    print("Running query...")
    response, context = await global_search(
        config=config,
        entities=entities,
        communities=communities,
        community_reports=community_reports,
        community_level=2,
        dynamic_community_selection=False,
        response_type="Multiple Paragraphs",
        query="What are the main findings in this dataset?",
    )

    print("\n=== Query Response ===")
    print(response)
    print("\n=== Context Data ===")
    print(f"Retrieved {len(context)} context items")


if __name__ == "__main__":
    asyncio.run(main())
Error handling
Always handle potential errors in your pipeline:
import asyncio

from graphrag.api import build_index
from graphrag.config.models.graph_rag_config import GraphRagConfig


async def safe_index():
    """Build the index, reporting failures; return True only on full success."""
    try:
        config = GraphRagConfig.from_file("./settings.yaml")
        results = await build_index(config=config, verbose=True)

        # Collect every workflow that ended with an error.
        failed_workflows = [r for r in results if r.error is not None]
        if failed_workflows:
            print("Some workflows failed:")
            for result in failed_workflows:
                print(f" - {result.workflow}: {result.error}")
            return False

        print("All workflows completed successfully!")
        return True
    except FileNotFoundError:
        print("Configuration file not found")
        return False
    except Exception as e:  # boundary handler: surface anything unexpected
        print(f"Unexpected error: {e}")
        return False


asyncio.run(safe_index())
Always validate your configuration with --dry-run via CLI before running programmatic indexing.
Next steps
CLI usage Learn the command-line interface
Configuration Understand configuration options
Best practices Optimize your GraphRAG implementation
API reference Detailed API documentation