Overview

The EntityMerger class orchestrates entity deduplication and profile merging using a multi-stage pipeline:
  1. Lexical blocking - RapidFuzz fuzzy matching with variant-aware expansion (aliases, acronyms, equivalence groups)
  2. Batched embeddings - Precompute evidence embeddings for all entities in a single batch call
  3. Similarity search - Cosine similarity over shortlisted candidates
  4. Match verification - LLM confirms or rejects the merge
  5. Gray-band arbitration - Dispute agent resolves uncertain cases
  6. Canonical name selection - Picks the better name and demotes the other to alternative names
  7. Profile update - Merges article evidence and regenerates the profile embedding

Class Definition

from src.engine.mergers import EntityMerger

merger = EntityMerger(entity_type: str)
entity_type
str
required
The type of entity to merge. Supported values:
  • "people" - Person entities (keyed by name)
  • "organizations" - Organization entities (keyed by (name, type) tuple)
  • "locations" - Location entities (keyed by (name, type) tuple)
  • "events" - Event entities (keyed by (title, start_date) tuple)

Raises

  • ValueError - If entity_type is not one of the supported values

Entity Type Configuration

Each entity type has specific configuration for key fields, alternative name attributes, and logging:
ENTITY_CONFIGS = {
    "people": {
        "key_field": "name",
        "key_type": str,
        "alternative_field": "alternative_names",
        "log_color": "green",
    },
    "organizations": {
        "key_field": ("name", "type"),
        "key_type": tuple,
        "alternative_field": "alternative_names",
        "log_color": "blue",
    },
    "locations": {
        "key_field": ("name", "type"),
        "key_type": tuple,
        "alternative_field": "alternative_names",
        "log_color": "blue",
    },
    "events": {
        "key_field": ("title", "start_date"),
        "key_type": tuple,
        "alternative_field": "alternative_titles",
        "log_color": "blue",
    },
}
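The constructor presumably validates entity_type against this table before storing the matching config. A minimal sketch of that guard (names mirror the docs above; this is not the real source):

```python
# Illustrative validation sketch -- the real ENTITY_CONFIGS carries more
# fields (alternative_field, log_color) than shown here.
ENTITY_CONFIGS = {
    "people": {"key_field": "name", "key_type": str},
    "organizations": {"key_field": ("name", "type"), "key_type": tuple},
    "locations": {"key_field": ("name", "type"), "key_type": tuple},
    "events": {"key_field": ("title", "start_date"), "key_type": tuple},
}

class EntityMerger:
    def __init__(self, entity_type: str):
        # Reject unknown entity types up front, as documented under Raises.
        if entity_type not in ENTITY_CONFIGS:
            raise ValueError(
                f"Unsupported entity_type {entity_type!r}; "
                f"expected one of {sorted(ENTITY_CONFIGS)}"
            )
        self.entity_type = entity_type
        self.config = ENTITY_CONFIGS[entity_type]
```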

Methods

merge_entities()

Merge extracted entities into the existing entity database using an evidence-first, batched pipeline.
stats = merger.merge_entities(
    extracted_entities: List[Dict[str, Any]],
    entities: Dict[str, Dict],
    article_id: str,
    article_title: str,
    article_url: str,
    article_published_date: Any,
    article_content: str,
    extraction_timestamp: str,
    model_type: str = "gemini",
    similarity_threshold: Optional[float] = None,
    domain: str = "guantanamo",
    domain_config: Optional[DomainConfig] = None
) -> MergeStats
extracted_entities
List[Dict[str, Any]]
required
List of newly extracted entities to merge (from extraction phase)
entities
Dict[str, Dict]
required
The existing entity database (mutated in-place). Structure:
{
    "people": {"John Doe": {...entity...}},
    "organizations": {...},
    "locations": {...},
    "events": {...}
}
article_id
str
required
Unique identifier for the source article
article_title
str
required
Article title (stored in entity article links)
article_url
str
required
Article URL (stored in entity article links)
article_published_date
Any
required
Article publication date (stored in entity article links)
article_content
str
required
Full article text (used to build evidence context windows)
extraction_timestamp
str
required
ISO timestamp when extraction occurred
model_type
str
default:"gemini"
Either "gemini" (cloud) or "ollama" (local) for match checking and profile generation
similarity_threshold
Optional[float]
default:"None"
Cosine similarity threshold (0-1). If None, uses domain config default for this entity type.
domain
str
default:"guantanamo"
Domain configuration name
domain_config
Optional[DomainConfig]
default:"None"
Preloaded domain config. If None, constructs from domain parameter.
return
MergeStats
Statistics object with counts:
@dataclass
class MergeStats:
    new: int = 0          # Created entities
    merged: int = 0       # Merged into existing
    skipped: int = 0      # Match rejected
    disputed: int = 0     # Gray-band arbitrations
    errors: int = 0       # Processing failures
    
    @property
    def total(self) -> int:
        return self.new + self.merged + self.skipped + self.disputed + self.errors
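For multi-article runs it can be useful to sum per-article stats into one report. A small sketch reusing the documented dataclass (the combine helper is illustrative, not part of the API):

```python
from dataclasses import dataclass, fields

# MergeStats as documented above.
@dataclass
class MergeStats:
    new: int = 0
    merged: int = 0
    skipped: int = 0
    disputed: int = 0
    errors: int = 0

    @property
    def total(self) -> int:
        return self.new + self.merged + self.skipped + self.disputed + self.errors

def combine(*runs: MergeStats) -> MergeStats:
    # Sum each counter field across per-article runs (hypothetical helper).
    return MergeStats(**{
        f.name: sum(getattr(r, f.name) for r in runs)
        for f in fields(MergeStats)
    })

overall = combine(MergeStats(new=2, merged=1), MergeStats(merged=3, skipped=1))
```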

find_similar_entity()

Find the most similar entity using lexical blocking + embedding similarity.
similar_key, similarity_score = merger.find_similar_entity(
    entity_key: Union[str, Tuple],
    entity_embedding: List[float],
    entities: Dict[str, Dict],
    similarity_threshold: float = SIMILARITY_THRESHOLD,
    *,
    embedding_model: Optional[str] = None,
    embedding_dim: Optional[int] = None,
    lexical_blocking_config: Optional[Dict[str, Any]] = None,
    query_entity: Optional[Dict[str, Any]] = None,
    equivalence_groups: Optional[List[List[str]]] = None
) -> Tuple[Optional[Union[str, Tuple]], Optional[float]]
entity_key
Union[str, Tuple]
required
Entity key (str for people, tuple for orgs/locations/events)
entity_embedding
List[float]
required
Evidence embedding vector for the entity
entities
Dict[str, Dict]
required
Entity database to search
similarity_threshold
float
default:"SIMILARITY_THRESHOLD"
Minimum cosine similarity to consider a match
embedding_model
Optional[str]
default:"None"
Model name for compatibility checking
embedding_dim
Optional[int]
default:"None"
Embedding dimension for compatibility checking
lexical_blocking_config
Optional[Dict[str, Any]]
default:"None"
Lexical blocking configuration:
{
    "enabled": True,
    "threshold": 60,        # RapidFuzz WRatio cutoff
    "max_candidates": 50    # Max shortlist size
}
query_entity
Optional[Dict[str, Any]]
default:"None"
Full entity dict (enables variant-aware blocking using aliases)
equivalence_groups
Optional[List[List[str]]]
default:"None"
Name equivalence groups from domain config. Example:
[["JTF-GTMO", "Joint Task Force Guantanamo"], ...]
return
Tuple[Optional[Union[str, Tuple]], Optional[float]]
Returns (best_match_key, similarity_score), or (None, None) if no match is found. Special case: an exact key match with incompatible embeddings returns (key, 1.0) to defer the decision to the match checker.
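The two-stage search can be sketched in miniature. This toy version uses difflib as a stand-in for RapidFuzz WRatio, and the db layout and `evidence_embedding` field name are assumptions for illustration, not the real schema:

```python
import difflib
import math

def cosine(a, b):
    # Plain cosine similarity between two vectors.
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

def find_similar(query_name, query_vec, db, threshold=0.85,
                 lex_cutoff=0.6, max_candidates=50):
    # Stage 1: lexical blocking -- shortlist keys whose names fuzzily
    # match (difflib stands in for RapidFuzz WRatio here).
    scored = [(difflib.SequenceMatcher(None, query_name.lower(),
                                       key.lower()).ratio(), key)
              for key in db]
    shortlist = [key for ratio, key in sorted(scored, reverse=True)
                 if ratio >= lex_cutoff][:max_candidates]
    # Stage 2: cosine similarity over the shortlist's evidence embeddings.
    best_key, best_sim = None, None
    for key in shortlist:
        sim = cosine(query_vec, db[key]["evidence_embedding"])
        if sim >= threshold and (best_sim is None or sim > best_sim):
            best_key, best_sim = key, sim
    return best_key, best_sim
```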

Merge Pipeline Phases

The merge_entities() method operates in distinct phases:

Phase 1: Build Evidence Texts

For each valid entity, constructs a deterministic pseudo-profile from structured fields + article context windows:
evidence_text = merger._build_evidence_text(
    entity_key=entity_key,
    entity_dict=entity_dict,
    article_content=article_content,
    max_chars=1500,
    window_chars=240,
    max_windows=3
)
Context windows are extracted around entity mentions in the article (matched case-insensitively); if no mentions are found, the method falls back to the article prefix.
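The window-extraction step can be sketched as follows. This is a simplified stand-in for `_build_evidence_text`'s context logic, with the same fallback behavior; the joining separator is an assumption:

```python
def context_windows(name, text, window_chars=240, max_windows=3,
                    max_chars=1500):
    # Collect character windows centered on case-insensitive mentions.
    low_text, low_name = text.lower(), name.lower()
    windows, start = [], 0
    while len(windows) < max_windows:
        i = low_text.find(low_name, start)
        if i == -1:
            break
        half = window_chars // 2
        windows.append(text[max(0, i - half): i + len(name) + half])
        start = i + len(name)
    if not windows:
        # Fallback: use the article prefix when the entity is never mentioned.
        windows = [text[:window_chars]]
    return " ... ".join(windows)[:max_chars]
```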

Phase 2: Batch-Embed Evidence

All evidence texts are embedded in batches using EmbeddingManager:
evidence_texts = [wi.evidence_text for wi in work_items]
all_evidence_vecs, model, dim, fingerprint = _batch_embed_texts(
    embedding_manager, 
    evidence_texts, 
    batch_size=32
)
This replaces the old per-entity embedding approach and dramatically improves performance.
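The batching itself is straightforward: chunk the texts, embed each chunk in one call, and preserve order. A sketch with a fake embedder standing in for EmbeddingManager's batch API:

```python
def batch_embed(embed_fn, texts, batch_size=32):
    # Embed texts chunk-by-chunk, preserving input order.
    vectors = []
    for i in range(0, len(texts), batch_size):
        vectors.extend(embed_fn(texts[i:i + batch_size]))
    return vectors

# Stand-in embedder: maps each text to a 1-d "vector" of its length.
# Real code would call the EmbeddingManager batch endpoint instead.
fake_embed = lambda batch: [[float(len(t))] for t in batch]
vecs = batch_embed(fake_embed, ["a", "bb", "ccc"], batch_size=2)
```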

Phase 3: Sequential Merge Decisions

For each entity (using precomputed embeddings):
  1. Similarity search - Find best candidate via find_similar_entity()
  2. Low-quality guard - Skip merge if candidate has generic name
  3. Match check - LLM verifies the merge using evidence texts:
    result = cloud_model_check_match(
        new_name, existing_name,
        evidence_text, existing_profile_text,
        entity_type=entity_type
    )
    
  4. Gray-band routing - If similarity is near threshold AND match confidence is uncertain, route to dispute agent:
    is_gray_band = abs(similarity - threshold) <= MERGE_GRAY_BAND_DELTA
    is_uncertain = result.confidence < MERGE_UNCERTAIN_CONFIDENCE_CUTOFF
    
    if is_gray_band and is_uncertain:
        dispute_decision = run_merge_dispute_agent(...)
        should_merge = dispute_decision.action == MergeDisputeAction.MERGE
    
  5. Canonical name selection - Pick better name via _pick_canonical_key():
    • Scores based on length, completeness, acronym derivation
    • Containment bonus: longer name wins
    • Acronym derivation bonus: full form wins over abbreviation
    • Swaps entity key if incoming name is better (rare but supported)
  6. Profile update - Either update_profile() (new article) or create_profile() (repair missing profile)
  7. Alias absorption - Merge alternative names and aliases from incoming entity
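The gray-band routing in step 4 reduces to a small predicate. The constant values below are illustrative placeholders; the real values live in the source:

```python
MERGE_GRAY_BAND_DELTA = 0.03             # illustrative value
MERGE_UNCERTAIN_CONFIDENCE_CUTOFF = 0.7  # illustrative value

def needs_arbitration(similarity, threshold, confidence):
    # Route to the dispute agent only when similarity sits near the
    # threshold AND the match checker's confidence is low.
    is_gray_band = abs(similarity - threshold) <= MERGE_GRAY_BAND_DELTA
    is_uncertain = confidence < MERGE_UNCERTAIN_CONFIDENCE_CUTOFF
    return is_gray_band and is_uncertain
```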

Phase 4 & 5: Batch-Embed Profiles

All profile texts (from creates and updates) are embedded in batches and committed:
profile_texts = [p.profile_text for p in pending_profiles]
prof_vecs, prof_model, prof_dim, prof_fp = _batch_embed_texts(
    embedding_manager, profile_texts, batch_size=32
)

for j, pending in enumerate(pending_profiles):
    pending.entity_ref["profile_embedding"] = prof_vecs[j]
    pending.entity_ref["profile_embedding_model"] = prof_model
    # ...

Embedding Compatibility

When comparing embeddings, the merger checks compatibility:
def _embeddings_compatible(
    new_dim: int,
    existing_entity: Dict[str, Any],
    new_model: Optional[str] = None
) -> bool:
    existing_model, existing_dim = _get_search_embedding_meta(existing_entity)
    
    if existing_dim != new_dim:
        return False  # Can't compare different dimensions
    
    if new_model and existing_model and new_model != existing_model:
        return False  # Can't compare different model families
    
    return True
Incompatible embeddings are skipped during similarity search. For exact key matches with incompatible embeddings, returns similarity=1.0 to force match checker evaluation (prevents duplicates after model changes).

Canonical Name Selection

The _pick_canonical_key() method scores names and picks the better one:
canonical_key, demoted_key, swapped = merger._pick_canonical_key(
    existing_key="John Smith",
    incoming_key="Dr. John Smith"
)
# Returns: ("Dr. John Smith", "John Smith", True)
Scoring criteria:
  • Length (longer = better)
  • Completeness (avoids initials, includes titles)
  • Containment bonus: +1.0 if one name fully contains the other
  • Acronym derivation bonus: +2.0 for full form over acronym
  • Swap threshold: incoming must score >0.3 higher to replace existing
When swapped, the entity is re-keyed in the database and the demoted name moves to alternative_names.
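The scoring logic can be sketched with a toy scorer. This reproduces the documented example above but omits the completeness and acronym-derivation terms; the weights are assumptions, not the real implementation:

```python
def _name_score(name: str) -> float:
    # Toy scorer: longer names score higher. The real scorer also weighs
    # completeness and acronym derivation, which this sketch omits.
    return len(name) / 10.0

def pick_canonical(existing: str, incoming: str, swap_margin: float = 0.3):
    s_existing, s_incoming = _name_score(existing), _name_score(incoming)
    # Containment bonus: when one name fully contains the other,
    # the longer (containing) name gets +1.0.
    if existing != incoming:
        if existing in incoming:
            s_incoming += 1.0
        elif incoming in existing:
            s_existing += 1.0
    # Incoming must beat existing by more than swap_margin to replace it.
    if s_incoming > s_existing + swap_margin:
        return incoming, existing, True
    return existing, incoming, False
```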

Example Usage

from src.engine.mergers import EntityMerger
from src.config_loader import DomainConfig

merger = EntityMerger("people")
domain_config = DomainConfig("guantanamo")

entities = {
    "people": {},
    "organizations": {},
    "locations": {},
    "events": {}
}

extracted = [
    {"name": "Geoffrey Miller", "alternative_names": []},
    {"name": "Major General Geoffrey Miller", "alternative_names": []}
]

stats = merger.merge_entities(
    extracted_entities=extracted,
    entities=entities,
    article_id="doc123",
    article_title="JTF-GTMO Leadership Changes",
    article_url="https://example.com/doc123",
    article_published_date="2002-11-15",
    article_content=article_text,
    extraction_timestamp="2024-01-15T10:30:00Z",
    model_type="gemini",
    domain="guantanamo",
    domain_config=domain_config
)

print(f"Created: {stats.new}, Merged: {stats.merged}, Skipped: {stats.skipped}")
# Likely output: "Created: 1, Merged: 1, Skipped: 0"
# (second mention merges into first, canonical name selected)

Convenience Factory Functions

from src.engine.mergers import (
    create_people_merger,
    create_organizations_merger,
    create_locations_merger,
    create_events_merger
)

people_merger = create_people_merger()
org_merger = create_organizations_merger()
loc_merger = create_locations_merger()
event_merger = create_events_merger()

Source Location

~/workspace/source/src/engine/mergers.py