Skip to main content
The GraphRAG Python API allows you to integrate knowledge graph indexing and querying directly into your Python applications. This provides fine-grained control over the indexing pipeline and query execution.
This API is under development and may undergo changes in future releases. Backwards compatibility is not guaranteed at this time.

Installation

pip install graphrag

API overview

GraphRAG provides three main API modules:

Indexing API

Build knowledge graph indexes from your data

Query API

Search and query your knowledge graphs

Prompt tuning API

Generate custom prompts for your domain

Indexing API

The indexing API allows you to build knowledge graph indexes programmatically.

Import the API

from graphrag.api import build_index
from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.config.enums import IndexingMethod

Basic indexing

Create a simple indexing pipeline:
import asyncio
from pathlib import Path
from graphrag.api import build_index
from graphrag.config.enums import IndexingMethod  # required for IndexingMethod.Standard below
from graphrag.config.models.graph_rag_config import GraphRagConfig

# Load configuration from settings.yaml
config = GraphRagConfig.from_file("./my-project/settings.yaml")

# Run the indexing pipeline
async def run_index():
    """Build the knowledge graph index and print per-workflow status.

    Each item in `results` describes one pipeline workflow; `error` is
    None when that workflow completed successfully.
    """
    results = await build_index(
        config=config,
        method=IndexingMethod.Standard,
        verbose=True
    )

    for result in results:
        if result.error is None:
            print(f"Workflow {result.workflow} completed successfully")
        else:
            print(f"Workflow {result.workflow} failed: {result.error}")

# Execute
asyncio.run(run_index())

Advanced indexing options

import asyncio
from graphrag.api import build_index
from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks

class MyCallbacks(WorkflowCallbacks):
    """Print progress as pipeline workflows start, finish, or error."""

    def on_workflow_start(self, name: str, instance: object) -> None:
        print(f"Starting workflow: {name}")

    def on_workflow_end(self, name: str, result: object) -> None:
        print(f"Completed workflow: {name}")

    def on_error(self, error: Exception, stack_trace: str) -> None:
        print(f"Error occurred: {error}")

# `await` is only valid inside a coroutine, so wrap the call and drive it
# with asyncio.run (the original snippet used an invalid bare top-level await).
async def run_with_callbacks():
    return await build_index(
        config=config,
        callbacks=[MyCallbacks()],
        verbose=True
    )

results = asyncio.run(run_with_callbacks())

build_index parameters

ParameterTypeDescriptionDefault
configGraphRagConfigThe GraphRAG configurationRequired
methodIndexingMethod | strIndexing method to useIndexingMethod.Standard
is_update_runboolWhether this is an incremental updateFalse
callbackslist[WorkflowCallbacks]Callbacks for pipeline eventsNone
additional_contextdict[str, Any]Custom context for the pipelineNone
verboseboolEnable verbose loggingFalse
input_documentspd.DataFrameOverride document loading with custom dataframeNone

Query API

The query API provides four search methods — global, local, DRIFT, and basic — for querying your knowledge graph, each with a corresponding streaming variant.

Import the API

from graphrag.api import (
    global_search,
    global_search_streaming,
    local_search,
    local_search_streaming,
    drift_search,
    drift_search_streaming,
    basic_search,
    basic_search_streaming
)
from graphrag.config.models.graph_rag_config import GraphRagConfig
import pandas as pd

Loading index data

Before querying, load the required parquet files:
import pandas as pd
from pathlib import Path

# Directory containing the pipeline's parquet outputs.
output_dir = Path("./my-project/output")

# Every table the query API requires, loaded into module-level names.
entities, communities, community_reports, text_units, relationships = (
    pd.read_parquet(output_dir / f"{table}.parquet")
    for table in (
        "entities",
        "communities",
        "community_reports",
        "text_units",
        "relationships",
    )
)

# Covariates are optional — they only exist when claim extraction ran.
try:
    covariates = pd.read_parquet(output_dir / "covariates.parquet")
except FileNotFoundError:
    covariates = None
Use global search for high-level questions about the entire dataset:
import asyncio
from graphrag.api import global_search
from graphrag.config.models.graph_rag_config import GraphRagConfig

config = GraphRagConfig.from_file("./my-project/settings.yaml")

async def run_global_search():
    """Ask a high-level question spanning the whole dataset."""
    answer, ctx = await global_search(
        config=config,
        entities=entities,
        communities=communities,
        community_reports=community_reports,
        community_level=2,
        dynamic_community_selection=False,
        response_type="Multiple Paragraphs",
        query="What are the main themes in the dataset?"
    )

    print("Response:", answer)
    print("Context:", ctx)

asyncio.run(run_global_search())
Use local search for specific entity-focused queries:
from graphrag.api import local_search

async def run_local_search():
    """Run an entity-focused query against the loaded index tables."""
    answer, ctx = await local_search(
        config=config,
        entities=entities,
        communities=communities,
        community_reports=community_reports,
        text_units=text_units,
        relationships=relationships,
        covariates=covariates,
        community_level=2,
        response_type="Multiple Paragraphs",
        query="Tell me about John Smith"
    )

    print("Response:", answer)
    print("Context:", ctx)

asyncio.run(run_local_search())
DRIFT combines local and global approaches:
from graphrag.api import drift_search

async def run_drift_search():
    """Run a DRIFT query (context is returned but unused here)."""
    answer, _ctx = await drift_search(
        config=config,
        entities=entities,
        communities=communities,
        community_reports=community_reports,
        text_units=text_units,
        relationships=relationships,
        community_level=2,
        response_type="Multiple Paragraphs",
        query="How do the main themes relate to specific individuals?"
    )

    print("Response:", answer)

asyncio.run(run_drift_search())
Basic text similarity search without the knowledge graph:
from graphrag.api import basic_search

async def run_basic_search():
    """Run a plain text-similarity query over the text units."""
    answer, _ctx = await basic_search(
        config=config,
        text_units=text_units,
        response_type="Multiple Paragraphs",
        query="Find mentions of artificial intelligence"
    )

    print("Response:", answer)

asyncio.run(run_basic_search())

Search API parameters

Global search parameters

ParameterTypeDescription
configGraphRagConfigGraphRAG configuration
entitiespd.DataFrameEntities dataframe
communitiespd.DataFrameCommunities dataframe
community_reportspd.DataFrameCommunity reports dataframe
community_levelint | NoneLeiden hierarchy level (higher = smaller communities)
dynamic_community_selectionboolEnable dynamic community selection
response_typestrResponse format description
querystrThe query string
callbackslist[QueryCallbacks]Optional callbacks
verboseboolEnable verbose logging

Local search parameters

ParameterTypeDescription
configGraphRagConfigGraphRAG configuration
entitiespd.DataFrameEntities dataframe
communitiespd.DataFrameCommunities dataframe
community_reportspd.DataFrameCommunity reports dataframe
text_unitspd.DataFrameText units dataframe
relationshipspd.DataFrameRelationships dataframe
covariatespd.DataFrame | NoneCovariates/claims dataframe
community_levelintLeiden hierarchy level
response_typestrResponse format description
querystrThe query string
callbackslist[QueryCallbacks]Optional callbacks
verboseboolEnable verbose logging

Prompt tuning API

Generate custom prompts tailored to your data domain:
import asyncio
from pathlib import Path
from graphrag.api import generate_indexing_prompts, DocSelectionType

async def tune_prompts():
    """Generate domain-tuned indexing prompts and write them to disk."""
    project = Path("./my-project")

    await generate_indexing_prompts(
        root=project,
        domain="medical research",
        selection_method=DocSelectionType.RANDOM,
        limit=15,
        max_tokens=2000,
        chunk_size=200,
        overlap=100,
        language="English",
        discover_entity_types=True,
        output=project / "prompts"
    )

    print("Prompts generated successfully!")

asyncio.run(tune_prompts())

Selection methods

Randomly select documents:
# NOTE: `await` is only valid inside a coroutine — call this from within an
# async function (e.g. driven by asyncio.run), with `root` already defined.
await generate_indexing_prompts(
    root=root,
    selection_method=DocSelectionType.RANDOM,
    limit=15
)

Query callbacks

Implement custom callbacks to track query execution:
import asyncio
from graphrag.callbacks.query_callbacks import QueryCallbacks
from typing import Any

class MyQueryCallbacks(QueryCallbacks):
    """Trace query execution: context retrieval and LLM round trips."""

    def on_context(self, context: Any) -> None:
        """Called when context is available."""
        print(f"Context retrieved: {len(context)} items")

    def on_llm_start(self, prompt: str) -> None:
        """Called before LLM invocation."""
        print(f"Sending prompt ({len(prompt)} chars) to LLM")

    def on_llm_end(self, response: str) -> None:
        """Called after LLM response."""
        print(f"Received response ({len(response)} chars)")

# Use with queries — `await` must live inside a coroutine, so wrap the call
# and drive it with asyncio.run (the original snippet used an invalid bare
# top-level await).
async def run_with_query_callbacks():
    return await local_search(
        config=config,
        entities=entities,
        communities=communities,
        community_reports=community_reports,
        text_units=text_units,
        relationships=relationships,
        covariates=covariates,
        community_level=2,
        response_type="Multiple Paragraphs",
        query="What are the findings?",
        callbacks=[MyQueryCallbacks()]
    )

response, context = asyncio.run(run_with_query_callbacks())

Complete example

Here’s a complete example combining indexing and querying:
import asyncio
import pandas as pd
from pathlib import Path
from graphrag.api import build_index, global_search
from graphrag.config.models.graph_rag_config import GraphRagConfig

async def main():
    """End-to-end example: build an index, load its tables, run one query."""
    # Setup
    project_root = Path("./my-research")
    config = GraphRagConfig.from_file(project_root / "settings.yaml")
    output_dir = project_root / "output"

    # Step 1: Build the index
    print("Building index...")
    results = await build_index(config=config, verbose=True)

    # Abort on the first failed workflow.
    for result in results:
        if result.error:
            print(f"Error in {result.workflow}: {result.error}")
            return

    print("Index built successfully!")

    # Step 2: Load the indexed data
    print("Loading index data...")
    tables = {
        name: pd.read_parquet(output_dir / f"{name}.parquet")
        for name in ("entities", "communities", "community_reports")
    }

    # Step 3: Run a query
    print("Running query...")
    response, context = await global_search(
        config=config,
        entities=tables["entities"],
        communities=tables["communities"],
        community_reports=tables["community_reports"],
        community_level=2,
        dynamic_community_selection=False,
        response_type="Multiple Paragraphs",
        query="What are the main findings in this dataset?"
    )

    print("\n=== Query Response ===")
    print(response)
    print("\n=== Context Data ===")
    print(f"Retrieved {len(context)} context items")

if __name__ == "__main__":
    asyncio.run(main())

Error handling

Always handle potential errors in your pipeline:
import asyncio
from graphrag.api import build_index
from graphrag.config.models.graph_rag_config import GraphRagConfig

async def safe_index():
    """Run indexing defensively; return True only if every workflow succeeded."""
    try:
        config = GraphRagConfig.from_file("./settings.yaml")
        results = await build_index(config=config, verbose=True)

        failures = [r for r in results if r.error is not None]

        if failures:
            print("Some workflows failed:")
            for r in failures:
                print(f"  - {r.workflow}: {r.error}")
            return False

        print("All workflows completed successfully!")
        return True

    except FileNotFoundError:
        print("Configuration file not found")
        return False
    except Exception as e:
        # Top-level boundary: report anything unexpected rather than crash.
        print(f"Unexpected error: {e}")
        return False

asyncio.run(safe_index())
Always validate your configuration with --dry-run via CLI before running programmatic indexing.

Next steps

CLI usage

Learn the command-line interface

Configuration

Understand configuration options

Best practices

Optimize your GraphRAG implementation

API reference

Detailed API documentation

Build docs developers (and LLMs) love