Bulk operations allow you to add multiple episodes to your knowledge graph in a single operation, dramatically improving throughput and reducing API costs.

Why Use Bulk Operations?

  • Performance: Process multiple episodes in parallel with shared context
  • Cost Efficiency: Reduce LLM API calls through batched processing
  • Deduplication: Better entity deduplication across episodes
  • Consistency: Atomic operations for data integrity

Basic Bulk Ingestion

Use add_episode_bulk() to add multiple episodes:
from graphiti_core import Graphiti
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

# Create list of episodes
episodes = [
    RawEpisode(
        name="Episode 1",
        content="Alice is a software engineer at Google.",
        source=EpisodeType.text,
        source_description="Team bio",
        reference_time=datetime.now(timezone.utc)
    ),
    RawEpisode(
        name="Episode 2",
        content="Bob is a product manager at Microsoft.",
        source=EpisodeType.text,
        source_description="Team bio",
        reference_time=datetime.now(timezone.utc)
    ),
    RawEpisode(
        name="Episode 3",
        content="Alice and Bob worked together at Amazon.",
        source=EpisodeType.text,
        source_description="Team bio",
        reference_time=datetime.now(timezone.utc)
    )
]

# Initialize Graphiti
graphiti = Graphiti(
    uri="bolt://localhost:7687",
    user="neo4j",
    password="password"
)

# Add episodes in bulk
result = await graphiti.add_episode_bulk(bulk_episodes=episodes)

print(f"Episodes added: {len(result.episodes)}")
print(f"Entities extracted: {len(result.nodes)}")
print(f"Relationships extracted: {len(result.edges)}")

RawEpisode Structure

The RawEpisode model defines episodes for bulk processing:
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

episode = RawEpisode(
    name="Unique episode identifier",
    content="Episode content as string",
    source=EpisodeType.text,  # text, json, or message
    source_description="Where this data came from",
    reference_time=datetime.now(timezone.utc),
    uuid=None  # Optional: auto-generated if not provided
)
Parameter | Type | Required | Description
name | str | Yes | Unique episode name
content | str | Yes | Episode content (text or JSON string)
source | EpisodeType | Yes | Episode type: text, json, or message
source_description | str | Yes | Where this data came from
reference_time | datetime | Yes | When this information was valid
uuid | str | No | Custom UUID (auto-generated if omitted)
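
For conversational data, a message-type episode typically carries a single turn. A minimal sketch; the "speaker: text" content format shown here is the common convention for message episodes, but adapt it to your transcript structure:
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

# A message-type episode holding one conversational turn
episode = RawEpisode(
    name="Support message 1",
    content="customer: My order arrived damaged, can I get a replacement?",
    source=EpisodeType.message,
    source_description="Support chat transcript",
    reference_time=datetime.now(timezone.utc)
)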

Processing JSON Data

Convert JSON objects to strings for bulk ingestion:
import json
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

# Load product data
products = [
    {"name": "Laptop", "price": 1299, "category": "Electronics"},
    {"name": "Phone", "price": 899, "category": "Electronics"},
    {"name": "Headphones", "price": 199, "category": "Accessories"}
]

# Create bulk episodes
episodes = [
    RawEpisode(
        name=f"Product {i}",
        content=json.dumps(product),
        source=EpisodeType.json,
        source_description="Product catalog",
        reference_time=datetime.now(timezone.utc)
    )
    for i, product in enumerate(products)
]

# Ingest
result = await graphiti.add_episode_bulk(bulk_episodes=episodes)

Loading from Files

Read episodes from external files:
import json
from pathlib import Path
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

# Load from JSON file
with open("data/products.json") as f:
    data = json.load(f)

episodes = [
    RawEpisode(
        name=f"Product {i}",
        content=json.dumps(item),
        source=EpisodeType.json,
        source_description="Product database export",
        reference_time=datetime.now(timezone.utc)
    )
    for i, item in enumerate(data["products"])
]

result = await graphiti.add_episode_bulk(bulk_episodes=episodes)

Bulk Parameters

The add_episode_bulk() method supports the same parameters as add_episode():
from pydantic import BaseModel, Field

# Define custom entities
class Product(BaseModel):
    """A commercial product."""
    category: str | None = Field(description="Product category")
    price: float | None = Field(description="Price in USD")

class Category(BaseModel):
    """A product category."""
    department: str | None = Field(description="Department name")

# Define relationships
class BelongsTo(BaseModel):
    """Product belongs to category."""
    pass

entity_types = {"Product": Product, "Category": Category}
edge_types = {"BELONGS_TO": BelongsTo}
edge_type_map = {("Product", "Category"): ["BELONGS_TO"]}

result = await graphiti.add_episode_bulk(
    bulk_episodes=episodes,
    group_id="product_catalog",
    entity_types=entity_types,
    edge_types=edge_types,
    edge_type_map=edge_type_map
)
Parameter | Type | Description
bulk_episodes | list[RawEpisode] | Episodes to process
group_id | str | Graph partition identifier
entity_types | dict | Custom entity type definitions
excluded_entity_types | list[str] | Entity types to exclude from extraction
edge_types | dict | Custom relationship definitions
edge_type_map | dict | Allowed relationship types per entity-type pair
custom_extraction_instructions | str | Additional instructions for extraction
saga | str or SagaNode | Saga to associate episodes with
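
For instance, excluded_entity_types takes the names of defined entity types that extraction should skip; a minimal sketch reusing the Product and Category types above:
# Extract Product entities but skip Category entities
result = await graphiti.add_episode_bulk(
    bulk_episodes=episodes,
    group_id="product_catalog",
    entity_types=entity_types,
    excluded_entity_types=["Category"]
)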

Bulk Results

The add_episode_bulk() method returns an AddBulkEpisodeResults object:
result = await graphiti.add_episode_bulk(bulk_episodes=episodes)

print(f"Episodes: {len(result.episodes)}")
print(f"Entities: {len(result.nodes)}")
print(f"Relationships: {len(result.edges)}")
print(f"Episodic edges: {len(result.episodic_edges)}")

# Access specific results
for episode in result.episodes:
    print(f"Episode: {episode.name} ({episode.uuid})")

for node in result.nodes:
    print(f"Entity: {node.name} - {node.summary}")
    print(f"  Labels: {node.labels}")
    print(f"  Attributes: {node.attributes}")

for edge in result.edges:
    print(f"Relationship: {edge.fact}")
    print(f"  Type: {edge.name}")

Bulk vs Sequential Processing

Advantages:
  • Faster overall throughput
  • Better entity deduplication
  • Reduced API calls
  • More efficient embeddings
Disadvantages:
  • No edge invalidation
  • No date extraction
  • All-or-nothing operation
Best for:
  • Initial data loading
  • Batch imports
  • Historical data

Example: E-commerce Bulk Ingest

import json
from graphiti_core import Graphiti
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone
from pathlib import Path

async def ingest_product_catalog():
    # Load product data
    with open("data/products.json") as f:
        products = json.load(f)["products"]
    
    # Create episodes
    episodes = [
        RawEpisode(
            name=f"Product {i}",
            content=json.dumps(product),
            source=EpisodeType.json,
            source_description="Product catalog",
            reference_time=datetime.now(timezone.utc)
        )
        for i, product in enumerate(products)
    ]
    
    # Initialize Graphiti
    graphiti = Graphiti(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="password"
    )
    
    try:
        # Build indices (first time only)
        await graphiti.build_indices_and_constraints()
        
        # Bulk ingest
        result = await graphiti.add_episode_bulk(
            bulk_episodes=episodes,
            group_id="product_catalog"
        )
        
        print(f"✓ Imported {len(result.episodes)} products")
        print(f"✓ Extracted {len(result.nodes)} entities")
        print(f"✓ Created {len(result.edges)} relationships")
        
    finally:
        await graphiti.close()

# Run
import asyncio
asyncio.run(ingest_product_catalog())

Chunking Large Datasets

For very large datasets, process in chunks:
from graphiti_core import Graphiti
from graphiti_core.utils.bulk_utils import RawEpisode

def chunk_list(items, chunk_size):
    """Split list into chunks."""
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]

async def ingest_large_dataset(all_episodes: list[RawEpisode]):
    graphiti = Graphiti(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="password"
    )
    
    try:
        # Process in chunks of 100
        for i, chunk in enumerate(chunk_list(all_episodes, 100)):
            print(f"Processing chunk {i+1}...")
            result = await graphiti.add_episode_bulk(bulk_episodes=chunk)
            print(f"  Added {len(result.episodes)} episodes")
    finally:
        await graphiti.close()

Sagas with Bulk Operations

Associate bulk episodes with a saga:
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

# Create sequential episodes
episodes = [
    RawEpisode(
        name=f"Message {i}",
        content=f"Conversation message {i}",
        source=EpisodeType.message,
        source_description="Chat transcript",
        reference_time=datetime.now(timezone.utc)
    )
    for i in range(10)
]

# Add to saga
result = await graphiti.add_episode_bulk(
    bulk_episodes=episodes,
    saga="Customer Support Chat 123"  # Creates or uses existing saga
)

# Episodes are linked in a chain
print(f"Saga episodes: {len(result.episodes)}")
Sagas connect episodes with:
  • HAS_EPISODE edges from saga to each episode
  • NEXT_EPISODE edges linking consecutive episodes
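
If you want to verify the chain directly in Neo4j, you can query those edges with the official Python driver. A minimal sketch, assuming the default connection settings used above; the Saga label and the created_at property are assumptions, while Episodic is the label Graphiti uses for episode nodes:
import asyncio
from neo4j import AsyncGraphDatabase

async def show_saga_chain(saga_name: str):
    """Print the episodes attached to a saga via HAS_EPISODE edges."""
    driver = AsyncGraphDatabase.driver(
        "bolt://localhost:7687", auth=("neo4j", "password")
    )
    try:
        records, _, _ = await driver.execute_query(
            # Saga label and created_at property are assumptions;
            # adjust to match your schema
            "MATCH (s:Saga {name: $name})-[:HAS_EPISODE]->(e:Episodic) "
            "RETURN e.name AS episode ORDER BY e.created_at",
            name=saga_name,
        )
        for record in records:
            print(record["episode"])
    finally:
        await driver.close()

asyncio.run(show_saga_chain("Customer Support Chat 123"))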

Performance Optimization

1. Batch size: Process 50-200 episodes per batch for optimal performance
2. Chunked processing: Split large datasets into chunks and process them sequentially
3. Group IDs: Use different group_id values to partition data
4. Monitor memory: Large batches increase memory usage; adjust batch size based on available RAM

Error Handling

try:
    result = await graphiti.add_episode_bulk(bulk_episodes=episodes)
    print(f"Success: {len(result.episodes)} episodes")
except Exception as e:
    print(f"Bulk ingestion failed: {e}")
    # Consider processing episodes individually to identify failures
    for episode in episodes:
        try:
            await graphiti.add_episode(
                name=episode.name,
                episode_body=episode.content,
                source=episode.source,
                source_description=episode.source_description,
                reference_time=episode.reference_time
            )
        except Exception as ep_error:
            print(f"Failed episode {episode.name}: {ep_error}")

Custom Extraction Instructions

Provide additional context for entity extraction:
result = await graphiti.add_episode_bulk(
    bulk_episodes=episodes,
    custom_extraction_instructions="""
    Focus on extracting product names, prices, and categories.
    Pay special attention to product hierarchies and relationships.
    Extract manufacturer information when available.
    """
)

Best Practices

  • Batch Size: Process 50-200 episodes per batch for optimal performance and memory usage
  • Error Recovery: Implement chunking and retry logic for large datasets (see the sketch below)
  • Consistent Schemas: Use the same entity types across all episodes in a batch
  • Monitor Progress: Log results after each batch to track progress
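
For error recovery, a retry wrapper around each chunk keeps one failing batch from aborting the whole import. A minimal sketch; the retry count and backoff values are illustrative:
import asyncio
from graphiti_core.utils.bulk_utils import RawEpisode

async def ingest_chunk_with_retry(graphiti, chunk: list[RawEpisode],
                                  max_attempts: int = 3):
    """Retry a failed bulk chunk with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await graphiti.add_episode_bulk(bulk_episodes=chunk)
        except Exception as e:
            if attempt == max_attempts:
                raise
            delay = 2 ** attempt  # 2s, 4s, ... between attempts
            print(f"Attempt {attempt} failed ({e}); retrying in {delay}s")
            await asyncio.sleep(delay)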

Limitations

Bulk operations do not support:
  • Edge invalidation - Old relationships are not automatically invalidated
  • Date extraction - Temporal information is not extracted from content
  • Community updates - Use build_communities() separately if needed (see the sketch below)
For temporal data with evolving relationships, use sequential add_episode() calls instead of bulk operations.
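
To fill the community gap, run build_communities() as a separate pass once the bulk load completes; a minimal sketch:
# Bulk load first, then rebuild communities in one pass
result = await graphiti.add_episode_bulk(
    bulk_episodes=episodes,
    group_id="product_catalog"
)
await graphiti.build_communities()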

Next Steps

  • Adding Episodes: Learn about sequential episode addition
  • Custom Entities: Define entity types for bulk ingestion
  • Searching: Search your bulk-loaded knowledge graph
