Overview
The EntityMerger class orchestrates entity deduplication and profile merging using a multi-stage pipeline:
1. Lexical blocking - RapidFuzz fuzzy matching with variant-aware expansion (aliases, acronyms, equivalence groups)
2. Batched embeddings - Precompute evidence embeddings for all entities in a single batch call
3. Similarity search - Cosine similarity over shortlisted candidates
4. Match verification - LLM confirms or rejects the merge
5. Gray-band arbitration - Dispute agent resolves uncertain cases
6. Canonical name selection - Picks the better name and demotes the other to alternative names
7. Profile update - Merges article evidence and regenerates the profile embedding
Class Definition
```python
from src.engine.mergers import EntityMerger

merger = EntityMerger(entity_type: str)
```
entity_type (str, required): The type of entity to merge. Supported values:
"people" - Person entities (keyed by name)
"organizations" - Organization entities (keyed by (name, type) tuple)
"locations" - Location entities (keyed by (name, type) tuple)
"events" - Event entities (keyed by (title, start_date) tuple)
Raises
ValueError - If entity_type is not one of the supported values
Entity Type Configuration
Each entity type has specific configuration for key fields, alternative name attributes, and logging:
```python
ENTITY_CONFIGS = {
    "people": {
        "key_field": "name",
        "key_type": str,
        "alternative_field": "alternative_names",
        "log_color": "green",
    },
    "organizations": {
        "key_field": ("name", "type"),
        "key_type": tuple,
        "alternative_field": "alternative_names",
        "log_color": "blue",
    },
    "locations": {
        "key_field": ("name", "type"),
        "key_type": tuple,
        "alternative_field": "alternative_names",
        "log_color": "blue",
    },
    "events": {
        "key_field": ("title", "start_date"),
        "key_type": tuple,
        "alternative_field": "alternative_titles",
        "log_color": "blue",
    },
}
```
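As an illustration, the key construction implied by this table can be sketched as follows. Note that `build_entity_key` is a hypothetical helper, not part of the module, and the config is trimmed to two entry types:

```python
# Hypothetical sketch: how ENTITY_CONFIGS drives entity-key construction.
# Trimmed to two entry types for brevity.
ENTITY_CONFIGS = {
    "people": {"key_field": "name", "key_type": str},
    "events": {"key_field": ("title", "start_date"), "key_type": tuple},
}

def build_entity_key(entity_type: str, entity: dict):
    """Build the lookup key for an entity according to its type config."""
    cfg = ENTITY_CONFIGS[entity_type]
    if cfg["key_type"] is tuple:
        # Composite key: one component per configured field
        return tuple(entity.get(field) for field in cfg["key_field"])
    return entity.get(cfg["key_field"])
```

Composite keys let two organizations share a name as long as their types differ, while people are deduplicated on the name alone.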
Methods
merge_entities()
Merges newly extracted entities into the existing entity database using the evidence-first batched pipeline.
```python
stats = merger.merge_entities(
    extracted_entities: List[Dict[str, Any]],
    entities: Dict[str, Dict],
    article_id: str,
    article_title: str,
    article_url: str,
    article_published_date: Any,
    article_content: str,
    extraction_timestamp: str,
    model_type: str = "gemini",
    similarity_threshold: Optional[float] = None,
    domain: str = "guantanamo",
    domain_config: Optional[DomainConfig] = None
) -> MergeStats
```
extracted_entities (List[Dict[str, Any]], required): List of newly extracted entities to merge (from the extraction phase).

entities (Dict[str, Dict], required): The existing entity database (mutated in place). Structure:

```python
{
    "people": {"John Doe": {...entity...}},
    "organizations": {...},
    "locations": {...},
    "events": {...}
}
```

article_id (str, required): Unique identifier for the source article.

article_title (str, required): Article title (stored in entity article links).

article_url (str, required): Article URL (stored in entity article links).

article_published_date (Any, required): Article publication date (stored in entity article links).

article_content (str, required): Full article text (used to build evidence context windows).

extraction_timestamp (str, required): ISO timestamp of when extraction occurred.

model_type (str, default "gemini"): Either "gemini" (cloud) or "ollama" (local) for match checking and profile generation.

similarity_threshold (Optional[float], default None): Cosine similarity threshold (0-1). If None, uses the domain config default for this entity type.

domain (str, default "guantanamo"): Domain configuration name.

domain_config (Optional[DomainConfig], default None): Preloaded domain config. If None, constructed from the domain parameter.
Returns a statistics object with counts:

```python
@dataclass
class MergeStats:
    new: int = 0       # Created entities
    merged: int = 0    # Merged into existing
    skipped: int = 0   # Match rejected
    disputed: int = 0  # Gray-band arbitrations
    errors: int = 0    # Processing failures

    @property
    def total(self) -> int:
        return self.new + self.merged + self.skipped + self.disputed + self.errors
```
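Callers processing many articles may want to accumulate per-article stats. A minimal sketch, assuming the fields above (the `__add__` method is an illustrative addition, not part of the documented class):

```python
from dataclasses import dataclass, fields

@dataclass
class MergeStats:
    new: int = 0
    merged: int = 0
    skipped: int = 0
    disputed: int = 0
    errors: int = 0

    @property
    def total(self) -> int:
        return self.new + self.merged + self.skipped + self.disputed + self.errors

    def __add__(self, other: "MergeStats") -> "MergeStats":
        # Field-wise sum, useful for aggregating stats across a corpus
        return MergeStats(**{f.name: getattr(self, f.name) + getattr(other, f.name)
                             for f in fields(self)})

combined = MergeStats(new=2, merged=1) + MergeStats(merged=3, skipped=1)
```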
find_similar_entity()
Find the most similar entity using lexical blocking + embedding similarity.
```python
similar_key, similarity_score = merger.find_similar_entity(
    entity_key: Union[str, Tuple],
    entity_embedding: List[float],
    entities: Dict[str, Dict],
    similarity_threshold: float = SIMILARITY_THRESHOLD,
    *,
    embedding_model: Optional[str] = None,
    embedding_dim: Optional[int] = None,
    lexical_blocking_config: Optional[Dict[str, Any]] = None,
    query_entity: Optional[Dict[str, Any]] = None,
    equivalence_groups: Optional[List[List[str]]] = None
) -> Tuple[Optional[Union[str, Tuple]], Optional[float]]
```
entity_key (Union[str, Tuple], required): Entity key (str for people, tuple for organizations/locations/events).

entity_embedding (List[float], required): Evidence embedding vector for the entity.

entities (Dict[str, Dict], required): Entity database to search.

similarity_threshold (float, default SIMILARITY_THRESHOLD): Minimum cosine similarity to consider a match.

embedding_model (Optional[str], default None): Model name for compatibility checking.

embedding_dim (Optional[int], default None): Embedding dimension for compatibility checking.

lexical_blocking_config (Optional[Dict[str, Any]], default None): Lexical blocking configuration:

```python
{
    "enabled": True,
    "threshold": 60,      # RapidFuzz WRatio cutoff
    "max_candidates": 50  # Max shortlist size
}
```

query_entity (Optional[Dict[str, Any]], default None): Full entity dict (enables variant-aware blocking using aliases).

equivalence_groups (Optional[List[List[str]]], default None): Name equivalence groups from the domain config. Example: [["JTF-GTMO", "Joint Task Force Guantanamo"], ...]

Returns (Tuple[Optional[Union[str, Tuple]], Optional[float]]): (best_match_key, similarity_score), or (None, None) if no match is found. Special case: an exact key match with incompatible embeddings returns (key, 1.0) to defer the decision to the match checker.
Lexical Blocking Details
When lexical_blocking_config["enabled"] is True, the method:

1. Builds variant texts for each candidate (canonical name + aliases + alternative names + acronyms)
2. Runs RapidFuzz WRatio scoring against all variant combinations
3. Applies the equivalence group bypass (if the query name is in a group, all group members bypass the threshold)
4. Applies the names_likely_same() deterministic bypass (handles initials, suffixes, and common patterns)
5. Returns a shortlist of candidates (at most max_candidates)
6. Runs cosine similarity only on the shortlisted candidates
This reduces the similarity search from O(n) to O(k), where k (the shortlist size) is much smaller than n (the total number of entities).
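The shortlist step can be approximated with stdlib tools. The sketch below uses difflib.SequenceMatcher as a stand-in for RapidFuzz's WRatio (the real scorer and its 0-100 score scale differ, and the function name is illustrative):

```python
from difflib import SequenceMatcher

def lexical_shortlist(query: str, candidates: dict, threshold: float = 0.6,
                      max_candidates: int = 50) -> list:
    """candidates maps entity key -> list of variant texts (name, aliases, ...)."""
    scored = []
    for key, variants in candidates.items():
        # Score the query against every variant; keep the best
        best = max(SequenceMatcher(None, query.lower(), v.lower()).ratio()
                   for v in variants)
        if best >= threshold:
            scored.append((best, key))
    scored.sort(reverse=True)          # highest lexical score first
    return [key for _, key in scored[:max_candidates]]

keys = lexical_shortlist("Geoffrey Miller",
                         {"Geoffrey Miller": ["Geoffrey Miller", "Gen. Miller"],
                          "Red Cross": ["Red Cross", "ICRC"]})
```

Only the keys returned here would go on to the (more expensive) cosine-similarity stage.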
Merge Pipeline Phases
The merge_entities() method operates in distinct phases:
Phase 1: Build Evidence Texts
For each valid entity, constructs a deterministic pseudo-profile from structured fields + article context windows:
```python
evidence_text = merger._build_evidence_text(
    entity_key=entity_key,
    entity_dict=entity_dict,
    article_content=article_content,
    max_chars=1500,
    window_chars=240,
    max_windows=3
)
```
Context windows are extracted around entity mentions in the article (case-insensitive). Falls back to article prefix if no mentions found.
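The window extraction described above might look roughly like this simplified sketch (the real _build_evidence_text also folds in structured fields and enforces the max_chars cap; the function name here is illustrative):

```python
def extract_context_windows(article: str, name: str,
                            window_chars: int = 240, max_windows: int = 3) -> list:
    """Collect text windows centred on each mention of `name` (case-insensitive)."""
    lower, needle = article.lower(), name.lower()
    windows, pos = [], 0
    while len(windows) < max_windows:
        i = lower.find(needle, pos)
        if i == -1:
            break
        half = window_chars // 2
        windows.append(article[max(0, i - half): i + len(needle) + half])
        pos = i + len(needle)
    # Fall back to the article prefix when the name never appears
    return windows or [article[:window_chars]]
```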
Phase 2: Batch-Embed Evidence
All evidence texts are embedded in batches using EmbeddingManager:
```python
evidence_texts = [wi.evidence_text for wi in work_items]
all_evidence_vecs, model, dim, fingerprint = _batch_embed_texts(
    embedding_manager,
    evidence_texts,
    batch_size=32
)
```
This replaces the old per-entity embedding approach, cutting embedding round-trips from one per entity to one per batch of 32.
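The batching itself is straightforward; a generic sketch (names are illustrative, not the module's internal API):

```python
def batch_embed(texts: list, embed_fn, batch_size: int = 32) -> list:
    """Embed texts in fixed-size batches.

    embed_fn takes a list of strings and returns one vector per string,
    so n texts cost ceil(n / batch_size) calls instead of n.
    """
    vectors = []
    for start in range(0, len(texts), batch_size):
        vectors.extend(embed_fn(texts[start:start + batch_size]))
    return vectors

calls = []
def fake_embed(batch):
    calls.append(len(batch))
    return [[float(len(t))] for t in batch]   # toy 1-d "embedding"

vecs = batch_embed([f"text-{i}" for i in range(70)], fake_embed, batch_size=32)
```

With 70 texts and a batch size of 32, this makes three calls (32 + 32 + 6) rather than 70.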
Phase 3: Sequential Merge Decisions
For each entity (using precomputed embeddings):

1. Similarity search - Find the best candidate via find_similar_entity()
2. Low-quality guard - Skip the merge if the candidate has a generic name
3. Match check - An LLM verifies the merge using evidence texts:

```python
result = cloud_model_check_match(
    new_name, existing_name,
    evidence_text, existing_profile_text,
    entity_type=entity_type
)
```

4. Gray-band routing - If similarity is near the threshold AND match confidence is uncertain, route to the dispute agent:

```python
is_gray_band = abs(similarity - threshold) <= MERGE_GRAY_BAND_DELTA
is_uncertain = result.confidence < MERGE_UNCERTAIN_CONFIDENCE_CUTOFF
if is_gray_band and is_uncertain:
    dispute_decision = run_merge_dispute_agent(...)
    should_merge = dispute_decision.action == MergeDisputeAction.MERGE
```

5. Canonical name selection - Pick the better name via _pick_canonical_key():
   - Scores names on length, completeness, and acronym derivation
   - Containment bonus: the longer name wins
   - Acronym derivation bonus: the full form wins over the abbreviation
   - Swaps the entity key if the incoming name is better (rare but supported)
6. Profile update - Either update_profile() (new article) or create_profile() (repair a missing profile)
7. Alias absorption - Merge alternative names and aliases from the incoming entity
Phase 4 & 5: Batch-Embed Profiles
All profile texts (from creates and updates) are embedded in batches and committed:
```python
profile_texts = [p.profile_text for p in pending_profiles]
prof_vecs, prof_model, prof_dim, prof_fp = _batch_embed_texts(
    embedding_manager, profile_texts, batch_size=32
)
for j, pending in enumerate(pending_profiles):
    pending.entity_ref["profile_embedding"] = prof_vecs[j]
    pending.entity_ref["profile_embedding_model"] = prof_model
    # ...
```
Embedding Compatibility
When comparing embeddings, the merger checks compatibility:
```python
def _embeddings_compatible(
    new_dim: int,
    existing_entity: Dict[str, Any],
    new_model: Optional[str] = None
) -> bool:
    existing_model, existing_dim = _get_search_embedding_meta(existing_entity)
    if existing_dim != new_dim:
        return False  # Can't compare different dimensions
    if new_model and existing_model and new_model != existing_model:
        return False  # Can't compare different model families
    return True
```
Incompatible embeddings are skipped during similarity search. For exact key matches with incompatible embeddings, returns similarity=1.0 to force match checker evaluation (prevents duplicates after model changes).
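Putting the two pieces together, similarity search over compatible candidates might look like this sketch (the cosine helper and the iteration are illustrative, not the module's exact code):

```python
import math

def cosine(a: list, b: list) -> float:
    """Cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

def best_match(query_vec, query_model, candidates, threshold=0.8):
    """candidates: {key: (vector, model_name)}. Skips incompatible embeddings."""
    best_key, best_score = None, None
    for key, (vec, model) in candidates.items():
        if len(vec) != len(query_vec) or (query_model and model and model != query_model):
            continue  # incompatible dimension or model family
        score = cosine(query_vec, vec)
        if score >= threshold and (best_score is None or score > best_score):
            best_key, best_score = key, score
    return best_key, best_score
```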
Canonical Name Selection
The _pick_canonical_key() method scores names and picks the better one:
```python
canonical_key, demoted_key, swapped = merger._pick_canonical_key(
    existing_key="John Smith",
    incoming_key="Dr. John Smith"
)
# Returns: ("Dr. John Smith", "John Smith", True)
```
Scoring criteria:
Length (longer = better)
Completeness (avoids initials, includes titles)
Containment bonus: +1.0 if one name fully contains the other
Acronym derivation bonus: +2.0 for full form over acronym
Swap threshold: incoming must score >0.3 higher to replace existing
When swapped, the entity is re-keyed in the database and the demoted name moves to alternative_names.
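The scoring criteria above can be sketched as follows. The weights mirror the documented bonuses, but the heuristics in the real _pick_canonical_key are richer (these function names and the acronym test are simplified assumptions):

```python
def _acronym_of(short: str, long_name: str) -> bool:
    """True if `short` looks like the initials of `long_name`."""
    initials = "".join(w[0] for w in long_name.replace("-", " ").split()).upper()
    cleaned = short.replace("-", "").replace(".", "").replace(" ", "").upper()
    return bool(initials) and cleaned == initials

def pick_canonical(existing: str, incoming: str, swap_margin: float = 0.3):
    score_existing = len(existing) / 100     # mild length preference
    score_incoming = len(incoming) / 100
    if existing in incoming:
        score_incoming += 1.0                # containment bonus: longer name wins
    elif incoming in existing:
        score_existing += 1.0
    if _acronym_of(existing, incoming):
        score_incoming += 2.0                # full form beats acronym
    elif _acronym_of(incoming, existing):
        score_existing += 2.0
    # Incoming must beat existing by the swap margin to replace it
    swapped = score_incoming > score_existing + swap_margin
    return (incoming, existing, True) if swapped else (existing, incoming, False)
```

The swap margin keeps the canonical name stable: a marginally "better" incoming name is not enough to re-key the entity.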
Example Usage
```python
from src.engine.mergers import EntityMerger
from src.config_loader import DomainConfig

merger = EntityMerger("people")
domain_config = DomainConfig("guantanamo")

entities = {
    "people": {},
    "organizations": {},
    "locations": {},
    "events": {}
}

extracted = [
    {"name": "Geoffrey Miller", "alternative_names": []},
    {"name": "Major General Geoffrey Miller", "alternative_names": []}
]

stats = merger.merge_entities(
    extracted_entities=extracted,
    entities=entities,
    article_id="doc123",
    article_title="JTF-GTMO Leadership Changes",
    article_url="https://example.com/doc123",
    article_published_date="2002-11-15",
    article_content=article_text,
    extraction_timestamp="2024-01-15T10:30:00Z",
    model_type="gemini",
    domain="guantanamo",
    domain_config=domain_config
)

print(f"Created: {stats.new}, Merged: {stats.merged}, Skipped: {stats.skipped}")
# Likely output: "Created: 1, Merged: 1, Skipped: 0"
# (second mention merges into the first, canonical name selected)
```
Convenience Factory Functions
```python
from src.engine.mergers import (
    create_people_merger,
    create_organizations_merger,
    create_locations_merger,
    create_events_merger
)

people_merger = create_people_merger()
org_merger = create_organizations_merger()
loc_merger = create_locations_merger()
event_merger = create_events_merger()
```
Source Location
~/workspace/source/src/engine/mergers.py