Bulk operations allow you to add multiple episodes to your knowledge graph in a single operation, dramatically improving throughput and reducing API costs.
Why Use Bulk Operations?
Performance: Process multiple episodes in parallel with shared context.
Cost efficiency: Reduce LLM API calls through batched processing.
Deduplication: Better entity deduplication across episodes.
Consistency: Atomic operations for data integrity.
Basic Bulk Ingestion
Use add_episode_bulk() to add multiple episodes:
from graphiti_core import Graphiti
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

# Create list of episodes
episodes = [
    RawEpisode(
        name="Episode 1",
        content="Alice is a software engineer at Google.",
        source=EpisodeType.text,
        source_description="Team bio",
        reference_time=datetime.now(timezone.utc)
    ),
    RawEpisode(
        name="Episode 2",
        content="Bob is a product manager at Microsoft.",
        source=EpisodeType.text,
        source_description="Team bio",
        reference_time=datetime.now(timezone.utc)
    ),
    RawEpisode(
        name="Episode 3",
        content="Alice and Bob worked together at Amazon.",
        source=EpisodeType.text,
        source_description="Team bio",
        reference_time=datetime.now(timezone.utc)
    )
]

# Initialize Graphiti
graphiti = Graphiti(
    uri="bolt://localhost:7687",
    user="neo4j",
    password="password"
)

# Add episodes in bulk
result = await graphiti.add_episode_bulk(bulk_episodes=episodes)

print(f"Episodes added: {len(result.episodes)}")
print(f"Entities extracted: {len(result.nodes)}")
print(f"Relationships extracted: {len(result.edges)}")
RawEpisode Structure
The RawEpisode model defines episodes for bulk processing:
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

episode = RawEpisode(
    name="Unique episode identifier",
    content="Episode content as string",
    source=EpisodeType.text,  # text, json, or message
    source_description="Where this data came from",
    reference_time=datetime.now(timezone.utc),
    uuid=None  # Optional: auto-generated if not provided
)
name (str, required): Unique episode name
content (str, required): Episode content (text or JSON string)
source (EpisodeType, required): Type of episode
source_description (str, required): Source description
reference_time (datetime, required): When this information was valid
uuid (str, optional): Custom UUID (auto-generated if omitted)
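If you need stable identifiers across imports, you can supply the uuid yourself. A minimal sketch; generating the value with uuid4 is purely illustrative, and any UUID string works:
import uuid
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

# Supplying a uuid lets you control episode identity, e.g. for re-imports.
episode = RawEpisode(
    name="Episode with custom ID",
    content="Charlie is a data scientist at Acme.",
    source=EpisodeType.text,
    source_description="Team bio",
    reference_time=datetime.now(timezone.utc),
    uuid=str(uuid.uuid4())
)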
Processing JSON Data
Convert JSON objects to strings for bulk ingestion:
import json
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

# Load product data
products = [
    {"name": "Laptop", "price": 1299, "category": "Electronics"},
    {"name": "Phone", "price": 899, "category": "Electronics"},
    {"name": "Headphones", "price": 199, "category": "Accessories"}
]

# Create bulk episodes
episodes = [
    RawEpisode(
        name=f"Product {i}",
        content=json.dumps(product),
        source=EpisodeType.json,
        source_description="Product catalog",
        reference_time=datetime.now(timezone.utc)
    )
    for i, product in enumerate(products)
]

# Ingest
result = await graphiti.add_episode_bulk(bulk_episodes=episodes)
Loading from Files
Read episodes from external files:
import json
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

# Load from JSON file
with open("data/products.json") as f:
    data = json.load(f)

episodes = [
    RawEpisode(
        name=f"Product {i}",
        content=json.dumps(item),
        source=EpisodeType.json,
        source_description="Product database export",
        reference_time=datetime.now(timezone.utc)
    )
    for i, item in enumerate(data["products"])
]

result = await graphiti.add_episode_bulk(bulk_episodes=episodes)
Bulk Parameters
The add_episode_bulk() method supports the same parameters as add_episode():
from pydantic import BaseModel, Field

# Define custom entities
class Product(BaseModel):
    """A commercial product."""
    category: str | None = Field(description="Product category")
    price: float | None = Field(description="Price in USD")

class Category(BaseModel):
    """A product category."""
    department: str | None = Field(description="Department name")

# Define relationships
class BelongsTo(BaseModel):
    """Product belongs to category."""
    pass

entity_types = {"Product": Product, "Category": Category}
edge_types = {"BELONGS_TO": BelongsTo}
edge_type_map = {("Product", "Category"): ["BELONGS_TO"]}

result = await graphiti.add_episode_bulk(
    bulk_episodes=episodes,
    group_id="product_catalog",
    entity_types=entity_types,
    edge_types=edge_types,
    edge_type_map=edge_type_map
)
bulk_episodes (list[RawEpisode]): Episodes to process
group_id (str): Graph partition identifier
entity_types (dict): Custom entity type definitions
excluded_entity_types (list[str]): Entity types to exclude
edge_types (dict): Custom relationship definitions
edge_type_map (dict): Allowed relationships by entity type
custom_extraction_instructions (str): Additional instructions for extraction
saga (str | SagaNode): Saga to associate episodes with
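For example, a type you have defined can be skipped during extraction. A minimal sketch that reuses entity_types and episodes from the example above; excluding "Category" here is illustrative:
# Extract Product entities but skip Category entities
result = await graphiti.add_episode_bulk(
    bulk_episodes=episodes,
    group_id="product_catalog",
    entity_types=entity_types,
    excluded_entity_types=["Category"]
)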
Bulk Results
The add_episode_bulk() method returns an AddBulkEpisodeResults object:
result = await graphiti.add_episode_bulk(bulk_episodes=episodes)

print(f"Episodes: {len(result.episodes)}")
print(f"Entities: {len(result.nodes)}")
print(f"Relationships: {len(result.edges)}")
print(f"Episodic edges: {len(result.episodic_edges)}")

# Access specific results
for episode in result.episodes:
    print(f"Episode: {episode.name} ({episode.uuid})")

for node in result.nodes:
    print(f"Entity: {node.name} - {node.summary}")
    print(f"  Labels: {node.labels}")
    print(f"  Attributes: {node.attributes}")

for edge in result.edges:
    print(f"Relationship: {edge.fact}")
    print(f"  Type: {edge.name}")
Bulk vs Sequential Processing
Bulk Processing
Advantages:
Faster overall throughput
Better entity deduplication
Reduced API calls
More efficient embeddings
Disadvantages:
No edge invalidation
No date extraction
All-or-nothing operation
Best for:
Initial data loading
Batch imports
Historical data
Sequential Processing
Advantages:
Edge invalidation
Temporal reasoning
Incremental updates
Better error handling
Disadvantages:
Slower throughput
More API calls
Higher costs
Best for:
Real-time updates
Incremental ingestion
Complex temporal data
Example: E-commerce Bulk Ingest
import asyncio
import json
from graphiti_core import Graphiti
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

async def ingest_product_catalog():
    # Load product data
    with open("data/products.json") as f:
        products = json.load(f)["products"]

    # Create episodes
    episodes = [
        RawEpisode(
            name=f"Product {i}",
            content=json.dumps(product),
            source=EpisodeType.json,
            source_description="Product catalog",
            reference_time=datetime.now(timezone.utc)
        )
        for i, product in enumerate(products)
    ]

    # Initialize Graphiti
    graphiti = Graphiti(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="password"
    )

    try:
        # Build indices (first time only)
        await graphiti.build_indices_and_constraints()

        # Bulk ingest
        result = await graphiti.add_episode_bulk(
            bulk_episodes=episodes,
            group_id="product_catalog"
        )

        print(f"✓ Imported {len(result.episodes)} products")
        print(f"✓ Extracted {len(result.nodes)} entities")
        print(f"✓ Created {len(result.edges)} relationships")
    finally:
        await graphiti.close()

# Run
asyncio.run(ingest_product_catalog())
Chunking Large Datasets
For very large datasets, process in chunks:
from graphiti_core import Graphiti
from graphiti_core.utils.bulk_utils import RawEpisode

def chunk_list(items, chunk_size):
    """Split a list into chunks."""
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]

async def ingest_large_dataset(all_episodes: list[RawEpisode]):
    graphiti = Graphiti(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="password"
    )

    try:
        # Process in chunks of 100
        for i, chunk in enumerate(chunk_list(all_episodes, 100)):
            print(f"Processing chunk {i + 1}...")
            result = await graphiti.add_episode_bulk(bulk_episodes=chunk)
            print(f"  Added {len(result.episodes)} episodes")
    finally:
        await graphiti.close()
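To run the helper, pass in the episodes you prepared earlier. A minimal sketch, assuming episodes is a list of RawEpisode objects built as in the previous examples:
import asyncio

# Process the full dataset in chunks of 100
asyncio.run(ingest_large_dataset(episodes))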
Sagas with Bulk Operations
Associate bulk episodes with a saga:
from graphiti_core.utils.bulk_utils import RawEpisode
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone

# Create sequential episodes
episodes = [
    RawEpisode(
        name=f"Message {i}",
        content=f"Conversation message {i}",
        source=EpisodeType.message,
        source_description="Chat transcript",
        reference_time=datetime.now(timezone.utc)
    )
    for i in range(10)
]

# Add to saga
result = await graphiti.add_episode_bulk(
    bulk_episodes=episodes,
    saga="Customer Support Chat 123"  # Creates or uses an existing saga
)

# Episodes are linked in a chain
print(f"Saga episodes: {len(result.episodes)}")
Sagas connect episodes with:
HAS_EPISODE edges from the saga to each episode
NEXT_EPISODE edges linking consecutive episodes
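If you want to verify this structure, you can query the graph directly. A minimal sketch using the official neo4j Python driver; the connection details and saga name carry over from the example above, and matching the saga node by its name property alone is an assumption:
from neo4j import AsyncGraphDatabase

async def inspect_saga(saga_name: str):
    driver = AsyncGraphDatabase.driver(
        "bolt://localhost:7687", auth=("neo4j", "password")
    )
    async with driver.session() as session:
        # Count episodes attached to the saga via HAS_EPISODE edges
        result = await session.run(
            "MATCH (s {name: $name})-[:HAS_EPISODE]->(e) RETURN count(e) AS episode_count",
            name=saga_name,
        )
        record = await result.single()
        print(f"Episodes in saga: {record['episode_count']}")
    await driver.close()

# Usage: await inspect_saga("Customer Support Chat 123")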
Performance tips:
Batch size: Process 50-200 episodes per batch for optimal performance.
Parallel processing: Split large datasets into chunks and process them sequentially.
Group IDs: Use different group_id values to partition data (see the sketch after this list).
Monitor memory: Large batches increase memory usage; adjust based on available RAM.
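A minimal sketch of partitioning with group_id; catalog_episodes and support_episodes are assumed to be lists of RawEpisode objects you prepared separately:
# Each group_id keeps its episodes, entities, and relationships
# in a separate partition of the graph.
catalog_result = await graphiti.add_episode_bulk(
    bulk_episodes=catalog_episodes,
    group_id="product_catalog"
)
support_result = await graphiti.add_episode_bulk(
    bulk_episodes=support_episodes,
    group_id="customer_support"
)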
Error Handling
try:
    result = await graphiti.add_episode_bulk(bulk_episodes=episodes)
    print(f"Success: {len(result.episodes)} episodes")
except Exception as e:
    print(f"Bulk ingestion failed: {e}")

    # Consider processing episodes individually to identify failures
    for episode in episodes:
        try:
            await graphiti.add_episode(
                name=episode.name,
                episode_body=episode.content,
                source=episode.source,
                source_description=episode.source_description,
                reference_time=episode.reference_time
            )
        except Exception as ep_error:
            print(f"Failed episode {episode.name}: {ep_error}")
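For large imports you can combine chunking with simple retry logic. A minimal sketch, assuming graphiti is an initialized client; the chunk size and retry count are arbitrary choices rather than library defaults:
import asyncio

async def ingest_with_retries(all_episodes, chunk_size=100, max_retries=3):
    """Bulk-ingest in chunks, retrying each failed chunk a few times."""
    for start in range(0, len(all_episodes), chunk_size):
        chunk = all_episodes[start:start + chunk_size]
        for attempt in range(1, max_retries + 1):
            try:
                await graphiti.add_episode_bulk(bulk_episodes=chunk)
                break
            except Exception as e:
                print(f"Chunk at offset {start} failed (attempt {attempt}): {e}")
                if attempt == max_retries:
                    raise
                await asyncio.sleep(2 ** attempt)  # simple exponential backoff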
Custom Extraction Instructions
Provide additional context for entity extraction with the custom_extraction_instructions parameter:
result = await graphiti.add_episode_bulk(
    bulk_episodes=episodes,
    custom_extraction_instructions="""
    Focus on extracting product names, prices, and categories.
    Pay special attention to product hierarchies and relationships.
    Extract manufacturer information when available.
    """
)
Best Practices
Batch size: Process 50-200 episodes per batch for optimal performance and memory usage.
Error recovery: Implement chunking and retry logic for large datasets.
Consistent schemas: Use the same entity types across all episodes in a batch.
Monitor progress: Log results after each batch to track progress.
Limitations
Bulk operations do not support:
Edge invalidation - Old relationships are not automatically invalidated
Date extraction - Temporal information is not extracted from content
Community updates - Use build_communities() separately if needed (see the sketch below)
For temporal data with evolving relationships, use sequential add_episode() calls instead of bulk operations.
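Because community updates are skipped during bulk ingestion, you can rebuild them yourself after a large import. A minimal sketch, assuming graphiti and episodes from the earlier examples:
# Rebuild communities once the bulk import has finished
result = await graphiti.add_episode_bulk(bulk_episodes=episodes)
await graphiti.build_communities()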
Next Steps
Adding Episodes: Learn about sequential episode addition
Custom Entities: Define entity types for bulk ingestion
Searching: Search your bulk-loaded knowledge graph