Overview

The ArticleProcessor class orchestrates article-level operations in the Hinbox pipeline:
  1. Relevance checking - Determine if an article is relevant to the domain
  2. Entity extraction - Extract all four entity types (people, organizations, locations, events)
  3. QC retry - Automatically retry extraction when quality control detects severe issues
  4. Metadata tracking - Aggregate extraction outcomes and reflection attempts
All methods return PhaseOutcome objects that carry both usable values and observability metadata.

Class Definition

from src.engine.article_processor import ArticleProcessor

processor = ArticleProcessor(domain: str, model_type: str = "gemini")
  • domain (str, required) - Domain configuration name (e.g., "guantanamo")
  • model_type (str, default "gemini") - LLM mode: "gemini" (cloud) or "ollama" (local)

Attributes

processor.domain: str           # Domain name
processor.model_type: str       # "gemini" or "ollama"
processor.specific_model: str   # Resolved model name (CLOUD_MODEL or OLLAMA_MODEL)
processor.logger: Logger        # Module logger

Methods

check_relevance()

Check whether an article is relevant to the configured domain.
outcome = processor.check_relevance(
    article_content: str,
    article_id: str
) -> PhaseOutcome
Parameters:
  • article_content (str, required) - Full article text to evaluate
  • article_id (str, required) - Article identifier (for error context)
Returns a PhaseOutcome with:
  • value - bool - True if relevant, False if not
  • meta["reason"] - str - LLM explanation for the decision
  • status - "ok" or "fail"
  • fallback - set if the relevance check itself failed (value falls back to False)

Example

processor = ArticleProcessor(domain="guantanamo", model_type="gemini")

outcome = processor.check_relevance(
    article_content="Detainee transfer to Guantanamo Bay detention facility...",
    article_id="doc123"
)

if outcome.value:
    print(f"Article is relevant: {outcome.meta['reason']}")
else:
    print(f"Article not relevant: {outcome.meta['reason']}")
Relevance checking uses domain-specific prompts from configs/<domain>/prompts/relevance.txt. The LLM returns a structured response with is_relevant: bool and reason: str.
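The shape of that structured response can be modeled as a small dataclass. This is an illustrative sketch only; the pipeline's actual response model (its class name and parsing code) may differ:

```python
import json
from dataclasses import dataclass


@dataclass
class RelevanceResult:
    """Hypothetical mirror of the structured LLM relevance response."""
    is_relevant: bool
    reason: str


# Parse a raw JSON payload from the LLM into the structure
raw = '{"is_relevant": true, "reason": "Mentions Guantanamo detainee transfers."}'
payload = json.loads(raw)
result = RelevanceResult(is_relevant=payload["is_relevant"], reason=payload["reason"])
```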

extract_single_entity_type()

Extract a single entity type from article text with automatic QC retry.
outcome = processor.extract_single_entity_type(
    entity_type: str,
    article_content: str,
    article_id: str = ""
) -> PhaseOutcome
Parameters:
  • entity_type (str, required) - Entity type to extract: "people", "organizations", "locations", "events"
  • article_content (str, required) - Full article text
  • article_id (str, default "") - Article identifier (for logging)
Returns a PhaseOutcome with:
  • value - List[Dict] - Cleaned entity dictionaries (possibly empty on failure)
  • counts - Extraction statistics:
    • raw_count - Entities before QC
    • dropped_missing_required - Dropped for missing required fields
    • deduped - Duplicates removed
    • final_count - Entities after QC
  • flags - QC warning flags (e.g., ["zero_entities", "high_drop_rate"])
  • meta - QC fixes applied plus retry metadata:
    • qc_fixes - List of corrections made
    • retry_attempted - bool - Whether retry was triggered
    • retry_trigger_flags - Flags that triggered the retry
    • retry_output_count - Entity count from the retry attempt
    • retry_used - bool - Whether the retry result was kept

QC Retry Logic

When the first extraction attempt triggers severe QC flags, a single retry is attempted with a repair hint:
# Retry triggers
_RETRY_TRIGGER_FLAGS = {
    "zero_entities",
    "high_drop_rate",
    "many_duplicates",
    "many_low_quality_names"
}

# First attempt
raw_entities = extractor.extract_cloud(text=article_content)
cleaned, qc_report = run_extraction_qc(entity_type, raw_entities, domain)

if should_retry_extraction(qc_report.flags):
    # Build repair hint
    hint = (
        f"IMPORTANT — Previous extraction of {entity_type} had quality issues "
        f"({', '.join(qc_report.flags)}). Please ensure all required fields are "
        "populated, avoid duplicate entries, and return every relevant entity "
        "found in the text as a complete JSON array."
    )
    
    # Retry with hint
    raw_entities_v2 = extractor.extract_cloud(text=article_content, repair_hint=hint)
    cleaned_v2, qc_report_v2 = run_extraction_qc(entity_type, raw_entities_v2, domain)
    
    # Pick better result (higher count wins, fewer severe flags breaks ties)
    if qc_report_v2.output_count > qc_report.output_count:
        use_v2 = True
The repair_hint is appended to the system prompt, guiding the LLM to fix specific issues.
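The selection rule at the end ("higher count wins, fewer severe flags breaks ties") can be isolated as a pure function. This is a sketch; the function name and signature are made up for illustration:

```python
def pick_better_attempt(count_v1, flags_v1, count_v2, flags_v2, severe_flags):
    """Return True if the retry (v2) result should be kept over the first attempt."""
    severe_v1 = len(set(flags_v1) & severe_flags)
    severe_v2 = len(set(flags_v2) & severe_flags)
    if count_v2 != count_v1:
        return count_v2 > count_v1   # higher entity count wins
    return severe_v2 < severe_v1     # fewer severe flags breaks ties


SEVERE = {"zero_entities", "high_drop_rate", "many_duplicates", "many_low_quality_names"}

# Retry found 5 entities where the first attempt found none: keep the retry.
use_v2 = pick_better_attempt(0, ["zero_entities"], 5, [], SEVERE)
```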

Example

outcome = processor.extract_single_entity_type(
    entity_type="people",
    article_content=article_text,
    article_id="doc123"
)

people = outcome.value
print(f"Extracted {len(people)} people")
print(f"QC flags: {outcome.flags}")
print(f"Retry attempted: {outcome.meta.get('retry_attempted', False)}")

if outcome.meta.get("retry_used"):
    print(f"Retry produced better results ({outcome.meta['retry_output_count']} entities)")

extract_all_entities()

Extract all four entity types from an article (optionally in parallel).
entities = processor.extract_all_entities(
    article_content: str,
    article_id: str,
    processing_metadata: Dict[str, Any],
    verbose: bool = False,
    max_workers: int = 1
) -> Dict[str, List[Dict]]
Parameters:
  • article_content (str, required) - Full article text
  • article_id (str, required) - Article identifier
  • processing_metadata (Dict[str, Any], required) - Metadata dictionary (mutated in place with extraction outcomes)
  • verbose (bool, default False) - Enable detailed logging
  • max_workers (int, default 1) - Number of parallel extraction workers. When > 1, the four entity types are extracted concurrently via ThreadPoolExecutor (bounded by the global LLM semaphore).
Returns Dict[str, List[Dict]], mapping entity types to extracted entities:
{
    "people": [{"name": "...", ...}, ...],
    "organizations": [{"name": "...", "type": "...", ...}, ...],
    "locations": [{"name": "...", "type": "...", ...}, ...],
    "events": [{"title": "...", "start_date": "...", ...}, ...]
}

Metadata Mutation

The method writes extraction outcomes and statistics into processing_metadata:
processing_metadata["phase_outcomes"]["extraction"] = {
    "people": {"status": "ok", "value": [...], "counts": {...}, "flags": [...]},
    "organizations": {...},
    "locations": {...},
    "events": {...}
}

processing_metadata["phase_outcomes"]["entity_relevance"] = {
    "people": {"input_count": 5, "dropped": 1, "output_count": 4},
    # ...
}

processing_metadata["reflection_summary"] = {
    "total_attempts": 12  # Sum of all reflection iterations across entity types
}

processing_metadata["extracted_counts"] = {
    "people": 4,
    "organizations": 3,
    "locations": 2,
    "events": 1
}

Entity Relevance Filter

After extraction, entities are filtered by article relevance (mention validation):
relevance_types = ["people", "organizations", "locations"]

for et in relevance_types:
    if entities[et]:
        filtered, rel_report = filter_entities_by_article_relevance(
            entity_type=et,
            entities=entities[et],
            article_text=article_content,
            domain=domain,
            require_mention=True  # Drop entities not mentioned in article
        )
        entities[et] = filtered
This conservative filter prevents false positives from hallucinated entities.
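The require_mention behaviour can be approximated with a literal name-containment check. This is a deliberately simplified sketch, and the function name is invented for illustration; the real filter may use fuzzier matching:

```python
def filter_by_mention(entities, article_text):
    """Keep only entities whose name literally appears in the article text."""
    text_lower = article_text.lower()
    kept = [
        e for e in entities
        if (name := e.get("name", "")) and name.lower() in text_lower
    ]
    report = {
        "input_count": len(entities),
        "dropped": len(entities) - len(kept),
        "output_count": len(kept),
    }
    return kept, report


article = "Detainee transfers at Guantanamo Bay were overseen by the Pentagon."
people = [{"name": "Pentagon"}, {"name": "Napoleon"}]
kept, report = filter_by_mention(people, article)
```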

Example

processor = ArticleProcessor(domain="guantanamo", model_type="gemini")

metadata = {}
entities = processor.extract_all_entities(
    article_content=article_text,
    article_id="doc123",
    processing_metadata=metadata,
    verbose=True,
    max_workers=4  # Parallel extraction
)

print(f"People: {len(entities['people'])}")
print(f"Organizations: {len(entities['organizations'])}")
print(f"Locations: {len(entities['locations'])}")
print(f"Events: {len(entities['events'])}")

print(f"Total reflection attempts: {metadata['reflection_summary']['total_attempts']}")

prepare_article_info()

Extract standard article fields from a row dictionary with robust fallbacks.
article_info = processor.prepare_article_info(
    row: Dict,
    row_index: int
) -> Dict[str, str]
Parameters:
  • row (Dict, required) - Article row dictionary (from Parquet or another source)
  • row_index (int, required) - Row index (used as a fallback ID if no id field is found)
Returns Dict[str, str], a standardized article info dictionary:
{
    "id": str,              # row.get("id") or row.get("article_id") or f"row_{row_index}"
    "title": str,           # row.get("title") or "(untitled)"
    "url": str,             # row.get("article_url") or row.get("url") or "#"
    "published_date": str,  # row.get("published_date") or ""
    "content": str          # row.get("clean_text") or row.get("content") or ""
}
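Following the fallback chain documented above, the same logic can be sketched as a standalone function (illustrative only; the actual method lives on the class):

```python
def prepare_article_info_sketch(row, row_index):
    """Mirror of the documented fallback chain for standard article fields."""
    return {
        "id": row.get("id") or row.get("article_id") or f"row_{row_index}",
        "title": row.get("title") or "(untitled)",
        "url": row.get("article_url") or row.get("url") or "#",
        "published_date": row.get("published_date") or "",
        "content": row.get("clean_text") or row.get("content") or "",
    }


# Row missing most fields: fallbacks fill in the gaps
info = prepare_article_info_sketch({"article_id": "a1", "content": "text"}, 7)
```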

initialize_processing_metadata()

Initialize processing metadata from an existing row, preserving the processed flag.
metadata = processor.initialize_processing_metadata(row: Dict) -> Dict[str, Any]
Returns a dictionary with {"processed": False} by default, or preserves existing metadata from row["processing_metadata"].
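The behaviour can be sketched as follows (hypothetical; the actual method may copy or validate the existing dictionary rather than return it directly):

```python
def initialize_processing_metadata_sketch(row):
    """Preserve prior metadata if present, else start a fresh default."""
    existing = row.get("processing_metadata")
    if isinstance(existing, dict):
        return existing              # keep prior metadata, including "processed"
    return {"processed": False}      # fresh default for unprocessed rows


meta = initialize_processing_metadata_sketch({})
```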

finalize_processing_metadata()

Stamp processing metadata with final extraction counts and timestamp.
processor.finalize_processing_metadata(
    processing_metadata: Dict[str, Any],
    extracted_entities: Dict[str, List],
    extraction_timestamp: str,
    verbose: bool,
    row_index: int
) -> float
Mutates processing_metadata to add:
  • processed = True
  • extraction_timestamp
  • extracted_counts
  • reflection_summary (if not already present)
Returns 0.0 (historical interface compatibility).
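A sketch of the mutation, under the assumption that extracted_counts is derived from the length of each entity list:

```python
def finalize_processing_metadata_sketch(processing_metadata, extracted_entities,
                                        extraction_timestamp):
    """Stamp final counts and timestamp onto the metadata dict (in place)."""
    processing_metadata["processed"] = True
    processing_metadata["extraction_timestamp"] = extraction_timestamp
    processing_metadata["extracted_counts"] = {
        et: len(items) for et, items in extracted_entities.items()
    }
    # Only set reflection_summary if extraction did not already record one
    processing_metadata.setdefault("reflection_summary", {"total_attempts": 0})
    return 0.0  # historical interface compatibility


meta = {}
finalize_processing_metadata_sketch(
    meta, {"people": [{}, {}], "events": []}, "2024-01-01T00:00:00Z"
)
```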

track_reflection_attempts()

Count reflection attempts from entity reflection histories.
attempts = processor.track_reflection_attempts(
    extracted_entities: Any,
    entity_type: str,
    verbose: bool = False
) -> int
Iterates over entities and sums len(entity["reflection_history"]) across all items. Tolerates both dict and Pydantic model shapes.
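The counting described above reduces to a sum over per-entity reflection histories. A sketch handling both dicts and attribute-style (Pydantic-like) objects:

```python
def count_reflection_attempts(entities):
    """Sum the lengths of each entity's reflection_history, tolerating shape differences."""
    total = 0
    for entity in entities:
        if isinstance(entity, dict):
            history = entity.get("reflection_history") or []
        else:
            # Pydantic-style model: read the attribute if present
            history = getattr(entity, "reflection_history", None) or []
        total += len(history)
    return total


attempts = count_reflection_attempts([
    {"reflection_history": [1, 2]},   # two reflection iterations
    {"reflection_history": []},       # none
    {},                               # key missing entirely
])
```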

PhaseOutcome Structure

All extraction methods return PhaseOutcome objects:
from src.utils.outcomes import PhaseOutcome

outcome = PhaseOutcome.ok(
    phase="extract.people",
    value=[{...}, {...}],  # Extracted entities
    counts={"raw_count": 10, "final_count": 8},
    flags=["high_drop_rate"],
    meta={"qc_fixes": ["removed_duplicates"], "retry_attempted": True}
)

# Access results
entities = outcome.value
is_success = outcome.status == "ok"
qc_flags = outcome.flags
retry_info = outcome.meta

# Serialize to dict
metadata_dict = outcome.to_metadata_dict()
PhaseOutcome is a structured result object used throughout the pipeline.

Parallel Extraction

When max_workers > 1, extraction runs in parallel:
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {
        et: pool.submit(
            self.extract_single_entity_type,
            et, article_content, article_id
        )
        for et in ["people", "organizations", "locations", "events"]
    }
    
    # Collect in stable order (not completion order)
    for et in ["people", "organizations", "locations", "events"]:
        outcome = futures[et].result()
        entities[et] = outcome.value or []
The global LLM semaphore (in src/constants.py) bounds concurrency to avoid hitting API rate limits.
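The bounding pattern can be reproduced with a module-level threading.Semaphore. This is a self-contained sketch; the names and the limit of 2 are illustrative, and the actual semaphore lives in src/constants.py:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

LLM_SEMAPHORE = threading.Semaphore(2)  # at most 2 simulated LLM calls in flight

def call_llm(entity_type):
    with LLM_SEMAPHORE:                 # blocks while 2 calls are already running
        return f"{entity_type}: done"

entity_types = ["people", "organizations", "locations", "events"]
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {et: pool.submit(call_llm, et) for et in entity_types}
    # Collect in stable order, mirroring extract_all_entities
    results = {et: futures[et].result() for et in entity_types}
```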

Integration Example

Complete article processing workflow:
from src.engine.article_processor import ArticleProcessor
from datetime import datetime, UTC

processor = ArticleProcessor(domain="guantanamo", model_type="gemini")

# 1. Check relevance
relevance_outcome = processor.check_relevance(
    article_content=article_text,
    article_id="doc123"
)

if not relevance_outcome.value:
    print(f"Article not relevant: {relevance_outcome.meta['reason']}")
    exit()

# 2. Extract entities
metadata = {}
entities = processor.extract_all_entities(
    article_content=article_text,
    article_id="doc123",
    processing_metadata=metadata,
    verbose=True,
    max_workers=4
)

# 3. Finalize metadata
processor.finalize_processing_metadata(
    processing_metadata=metadata,
    extracted_entities=entities,
    extraction_timestamp=datetime.now(UTC).isoformat(),
    verbose=True,
    row_index=0
)

print(f"Extracted: {metadata['extracted_counts']}")
print(f"Reflections: {metadata['reflection_summary']['total_attempts']}")

Source Location

~/workspace/source/src/engine/article_processor.py
