The index API enables you to build and update knowledge graph indexes from your documents using Python.

build_index

Build a knowledge graph index from documents.
from graphrag.api import build_index
from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.config.enums import IndexingMethod

config = GraphRagConfig.from_file("settings.yaml")

results = await build_index(
    config=config,
    method=IndexingMethod.Standard,
    verbose=True
)

Parameters

config
GraphRagConfig
required
The GraphRAG configuration object. Load from a YAML file using GraphRagConfig.from_file("settings.yaml") or construct programmatically.
method
IndexingMethod | str
default:"IndexingMethod.Standard"
The indexing method to use. Options include:
  • IndexingMethod.Standard - Full LLM-based extraction
  • IndexingMethod.NLP - NLP + LLM hybrid approach
Can be specified as an enum value or string.
is_update_run
bool
default:"False"
Whether this is an incremental update run. Set to True to update an existing index with new documents rather than rebuilding from scratch.
callbacks
list[WorkflowCallbacks] | None
default:"None"
A list of callback objects to receive pipeline lifecycle events. Use this to monitor indexing progress, handle errors, or implement custom logging.
additional_context
dict[str, Any] | None
default:"None"
Additional context to pass to the pipeline. This dictionary is accessible in the pipeline state under the additional_context key and can be used to pass custom data to pipeline workflows.
verbose
bool
default:"False"
Enable verbose logging output. When True, detailed logging information will be printed to the console and written to log files.
input_documents
pd.DataFrame | None
default:"None"
Override the default document loading and parsing. Supply your own pandas DataFrame of documents to index instead of loading from the configured input source. The DataFrame should have columns matching the expected document schema.

Returns

results
list[PipelineRunResult]
A list of pipeline run results, one for each workflow executed. Each result contains:
  • workflow - The name of the workflow that was executed
  • result - The workflow output data
  • error - Any error that occurred (None if successful)
  • errors - List of all errors encountered

Example: Basic indexing

import asyncio
from graphrag.api import build_index
from graphrag.config.models.graph_rag_config import GraphRagConfig

async def main():
    # Load configuration from YAML
    config = GraphRagConfig.from_file("settings.yaml")
    
    # Build the index
    results = await build_index(
        config=config,
        verbose=True
    )
    
    # Check for errors
    for result in results:
        if result.error:
            print(f"Workflow {result.workflow} failed: {result.error}")
        else:
            print(f"Workflow {result.workflow} completed successfully")

if __name__ == "__main__":
    asyncio.run(main())

Example: Incremental update

import asyncio
from graphrag.api import build_index
from graphrag.config.models.graph_rag_config import GraphRagConfig

async def main():
    config = GraphRagConfig.from_file("settings.yaml")
    
    # Update existing index with new documents
    results = await build_index(
        config=config,
        is_update_run=True,
        verbose=True
    )
    
    print(f"Updated {len(results)} workflows")

if __name__ == "__main__":
    asyncio.run(main())

Example: Custom document input

import asyncio
import pandas as pd
from graphrag.api import build_index
from graphrag.config.models.graph_rag_config import GraphRagConfig

async def main():
    config = GraphRagConfig.from_file("settings.yaml")
    
    # Prepare your own documents
    documents = pd.DataFrame({
        'id': ['doc1', 'doc2', 'doc3'],
        'text': [
            'First document text...',
            'Second document text...',
            'Third document text...'
        ],
        'title': ['Document 1', 'Document 2', 'Document 3']
    })
    
    # Index with custom documents
    results = await build_index(
        config=config,
        input_documents=documents,
        verbose=True
    )

if __name__ == "__main__":
    asyncio.run(main())
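If your documents live on disk, the input DataFrame from the example above can be assembled with pandas and the standard library. A sketch, assuming the `id`/`text`/`title` columns shown above (check your GraphRAG version for the exact expected schema):

```python
import tempfile
from pathlib import Path

import pandas as pd

def load_documents(directory: Path) -> pd.DataFrame:
    """Build a documents DataFrame from all .txt files in a directory."""
    paths = sorted(directory.glob("*.txt"))
    return pd.DataFrame({
        "id": [p.stem for p in paths],
        "text": [p.read_text(encoding="utf-8") for p in paths],
        "title": [p.stem.replace("_", " ").title() for p in paths],
    })

# Demo with a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "doc1.txt").write_text("First document text...", encoding="utf-8")
    (root / "doc2.txt").write_text("Second document text...", encoding="utf-8")
    documents = load_documents(root)
    print(documents[["id", "title"]])
```

The resulting frame can be passed straight to `build_index(..., input_documents=documents)` as in the example above.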

Example: Monitoring with callbacks

import asyncio
from graphrag.api import build_index
from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks

class MyCallbacks(WorkflowCallbacks):
    def pipeline_start(self, workflows: list[str]):
        print(f"Starting pipeline with workflows: {workflows}")
    
    def pipeline_end(self, results):
        print(f"Pipeline completed with {len(results)} results")
    
    def pipeline_error(self, error: Exception):
        print(f"Pipeline error: {error}")

async def main():
    config = GraphRagConfig.from_file("settings.yaml")
    
    callbacks = [MyCallbacks()]
    
    results = await build_index(
        config=config,
        callbacks=callbacks,
        verbose=True
    )

if __name__ == "__main__":
    asyncio.run(main())

Output files

The build_index function produces several output files in the configured output directory:
  • entities.parquet - Extracted entities with descriptions and metadata
  • relationships.parquet - Relationships between entities
  • communities.parquet - Hierarchical community structure
  • community_reports.parquet - Summary reports for each community
  • text_units.parquet - Chunked text units from source documents
  • covariates.parquet - Extracted claims and covariates (if enabled)
These files can be loaded and used with the query API:
import pandas as pd

entities = pd.read_parquet("output/entities.parquet")
communities = pd.read_parquet("output/communities.parquet")
reports = pd.read_parquet("output/community_reports.parquet")
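Once loaded, ordinary pandas operations apply, for example counting entities per community. The column names below (`title`, `community`) are assumptions for illustration; verify them against the parquet files from your own run:

```python
import pandas as pd

# Tiny synthetic stand-ins for the real parquet outputs (columns assumed).
entities = pd.DataFrame({
    "title": ["ALICE", "BOB", "ACME CORP"],
    "community": [0, 0, 1],
})
communities = pd.DataFrame({
    "community": [0, 1],
    "level": [0, 0],
})

# Entities per community, joined back onto the community table.
counts = (
    entities.groupby("community")
    .size()
    .rename("entity_count")
    .reset_index()
)
summary = communities.merge(counts, on="community", how="left")
print(summary)
```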

Configuration

The GraphRagConfig object controls all aspects of indexing:
from graphrag.config.models.graph_rag_config import GraphRagConfig

# Load from YAML file
config = GraphRagConfig.from_file("settings.yaml")

# Or load from environment variables
config = GraphRagConfig.from_env()
See the configuration guide for details on available settings.

Error handling

The build_index function returns results even if some workflows fail. Check the error field in each result:
results = await build_index(config=config)

for result in results:
    if result.error:
        print(f"Error in {result.workflow}: {result.error}")
        # Handle the error
    else:
        print(f"{result.workflow} completed successfully")
