Overview
The ArticleProcessor class orchestrates article-level operations in the Hinbox pipeline:
- Relevance checking - Determine if an article is relevant to the domain
- Entity extraction - Extract all four entity types (people, organizations, locations, events)
- QC retry - Automatically retry extraction when quality control detects severe issues
- Metadata tracking - Aggregate extraction outcomes and reflection attempts
All methods return PhaseOutcome objects that carry both usable values and observability metadata.
Class Definition
from src.engine.article_processor import ArticleProcessor
processor = ArticleProcessor(domain: str, model_type: str = "gemini")
domain - Domain configuration name (e.g., "guantanamo")
model_type - LLM mode: "gemini" (cloud) or "ollama" (local)
Attributes
processor.domain: str # Domain name
processor.model_type: str # "gemini" or "ollama"
processor.specific_model: str # Resolved model name (CLOUD_MODEL or OLLAMA_MODEL)
processor.logger: Logger # Module logger
Methods
check_relevance()
Check whether an article is relevant to the configured domain.
outcome = processor.check_relevance(
    article_content: str,
    article_id: str
) -> PhaseOutcome
article_content - Full article text to evaluate
article_id - Article identifier (for error context)
PhaseOutcome with:
value - bool - True if relevant, False if not
meta["reason"] - str - LLM explanation for the decision
status - "ok" or "fail"
fallback - value falls back to False if the relevance check itself fails
Example
processor = ArticleProcessor(domain="guantanamo", model_type="gemini")
outcome = processor.check_relevance(
    article_content="Detainee transfer to Guantanamo Bay detention facility...",
    article_id="doc123"
)
if outcome.value:
    print(f"Article is relevant: {outcome.meta['reason']}")
else:
    print(f"Article not relevant: {outcome.meta['reason']}")
Relevance checking uses domain-specific prompts from configs/<domain>/prompts/relevance.txt. The LLM returns a structured response with is_relevant: bool and reason: str.
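The structured response can be modeled as a small value object. This is a hypothetical sketch mirroring the is_relevant/reason fields described above, not the actual schema in the source:

```python
from dataclasses import dataclass

# Hypothetical sketch of the structured relevance response
@dataclass
class RelevanceResult:
    is_relevant: bool  # True if the article matches the domain
    reason: str        # LLM explanation for the decision

# Turning a parsed LLM response into a result object
parsed = {"is_relevant": True, "reason": "Mentions Guantanamo detainee transfers"}
result = RelevanceResult(**parsed)
```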
extract_single_entity_type()
Extract a single entity type from article text with automatic QC retry.
outcome = processor.extract_single_entity_type(
    entity_type: str,
    article_content: str,
    article_id: str = ""
) -> PhaseOutcome
entity_type - Entity type to extract: "people", "organizations", "locations", or "events"
article_content - Full article text
article_id - Article identifier (for logging)
PhaseOutcome with:
value - List[Dict] - Cleaned entity dictionaries (possibly empty on failure)
counts - Extraction statistics:
raw_count - Entities before QC
dropped_missing_required - Dropped for missing required fields
deduped - Duplicates removed
final_count - Entities after QC
flags - QC warning flags (e.g., ["zero_entities", "high_drop_rate"])
meta - QC fixes applied + retry metadata:
qc_fixes - List of corrections made
retry_attempted - bool - Whether retry was triggered
retry_trigger_flags - Flags that triggered retry
retry_output_count - Entity count from retry attempt
retry_used - bool - Whether retry result was kept
QC Retry Logic
When the first extraction attempt triggers severe QC flags, a single retry is attempted with a repair hint:
# Severe QC flags that trigger a retry
_RETRY_TRIGGER_FLAGS = {
    "zero_entities",
    "high_drop_rate",
    "many_duplicates",
    "many_low_quality_names"
}
# First attempt
raw_entities = extractor.extract_cloud(text=article_content)
cleaned, qc_report = run_extraction_qc(entity_type, raw_entities, domain)
if should_retry_extraction(qc_report.flags):
    # Build repair hint
    hint = f"""IMPORTANT — Previous extraction of {entity_type} had quality issues
({', '.join(qc_report.flags)}). Please ensure all required fields are populated,
avoid duplicate entries, and return every relevant entity found in the
text as a complete JSON array."""

    # Retry with hint
    raw_entities_v2 = extractor.extract_cloud(text=article_content, repair_hint=hint)
    cleaned_v2, qc_report_v2 = run_extraction_qc(entity_type, raw_entities_v2, domain)

    # Pick better result (higher count wins, fewer severe flags breaks ties)
    if qc_report_v2.output_count > qc_report.output_count:
        use_v2 = True
The repair_hint is appended to the system prompt, guiding the LLM to fix specific issues.
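The retry decision itself can be sketched as a set intersection. This is a minimal sketch of should_retry_extraction, not the actual implementation in the source:

```python
# Severe QC flags that trigger a single extraction retry (from above)
_RETRY_TRIGGER_FLAGS = {
    "zero_entities",
    "high_drop_rate",
    "many_duplicates",
    "many_low_quality_names",
}

def should_retry_extraction(flags: list) -> bool:
    """Return True if any severe QC flag warrants a retry."""
    return bool(set(flags) & _RETRY_TRIGGER_FLAGS)

print(should_retry_extraction(["zero_entities"]))       # True
print(should_retry_extraction(["minor_format_fix"]))    # False
```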
Example
outcome = processor.extract_single_entity_type(
    entity_type="people",
    article_content=article_text,
    article_id="doc123"
)
people = outcome.value
print(f"Extracted {len(people)} people")
print(f"QC flags: {outcome.flags}")
print(f"Retry attempted: {outcome.meta.get('retry_attempted', False)}")
if outcome.meta.get("retry_used"):
    print(f"Retry produced better results ({outcome.meta['retry_output_count']} entities)")
extract_all_entities()
Extract all four entity types from an article (optionally in parallel).
entities = processor.extract_all_entities(
    article_content: str,
    article_id: str,
    processing_metadata: Dict[str, Any],
    verbose: bool = False,
    max_workers: int = 1
) -> Dict[str, List[Dict]]
processing_metadata - Metadata dictionary (mutated in place with extraction outcomes)
max_workers - Number of parallel extraction workers. When > 1, the four entity types are extracted concurrently via ThreadPoolExecutor (bounded by the global LLM semaphore).
Dictionary mapping entity types to extracted entities:
{
    "people": [{"name": "...", ...}, ...],
    "organizations": [{"name": "...", "type": "...", ...}, ...],
    "locations": [{"name": "...", "type": "...", ...}, ...],
    "events": [{"title": "...", "start_date": "...", ...}, ...]
}
The method writes extraction outcomes and statistics into processing_metadata:
processing_metadata["phase_outcomes"]["extraction"] = {
    "people": {"status": "ok", "value": [...], "counts": {...}, "flags": [...]},
    "organizations": {...},
    "locations": {...},
    "events": {...}
}
processing_metadata["phase_outcomes"]["entity_relevance"] = {
    "people": {"input_count": 5, "dropped": 1, "output_count": 4},
    # ...
}
processing_metadata["reflection_summary"] = {
    "total_attempts": 12  # Sum of all reflection iterations across entity types
}
processing_metadata["extracted_counts"] = {
    "people": 4,
    "organizations": 3,
    "locations": 2,
    "events": 1
}
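The extracted_counts entry is just the per-type list lengths; the same shape can be recomputed from the returned entities (hypothetical data used here for illustration):

```python
# Hypothetical extraction result matching the counts shown above
entities = {
    "people": [{"name": "A"}, {"name": "B"}, {"name": "C"}, {"name": "D"}],
    "organizations": [{"name": "X"}, {"name": "Y"}, {"name": "Z"}],
    "locations": [{"name": "L1"}, {"name": "L2"}],
    "events": [{"title": "E1"}],
}

# Same shape as processing_metadata["extracted_counts"]
extracted_counts = {et: len(items) for et, items in entities.items()}
print(extracted_counts)  # {'people': 4, 'organizations': 3, 'locations': 2, 'events': 1}
```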
Entity Relevance Filter
After extraction, entities are filtered by article relevance (mention validation):
relevance_types = ["people", "organizations", "locations"]
for et in relevance_types:
    if entities[et]:
        filtered, rel_report = filter_entities_by_article_relevance(
            entity_type=et,
            entities=entities[et],
            article_text=article_content,
            domain=domain,
            require_mention=True  # Drop entities not mentioned in article
        )
        entities[et] = filtered
This conservative filter prevents false positives from hallucinated entities.
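The mention requirement can be sketched as a simple case-insensitive substring check. This is a minimal sketch only; the real filter_entities_by_article_relevance may be more sophisticated:

```python
def mentioned_in_text(name: str, article_text: str) -> bool:
    """Conservative check: keep an entity only if its name appears in the text."""
    return name.lower() in article_text.lower()

article = "Detainee transfer to Guantanamo Bay detention facility..."
people = [{"name": "Guantanamo Bay"}, {"name": "Invented Person"}]

# Hallucinated entities with no textual support are dropped
kept = [p for p in people if mentioned_in_text(p["name"], article)]
print([p["name"] for p in kept])  # ['Guantanamo Bay']
```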
Example
processor = ArticleProcessor(domain="guantanamo", model_type="gemini")
metadata = {}
entities = processor.extract_all_entities(
    article_content=article_text,
    article_id="doc123",
    processing_metadata=metadata,
    verbose=True,
    max_workers=4  # Parallel extraction
)
print(f"People: {len(entities['people'])}")
print(f"Organizations: {len(entities['organizations'])}")
print(f"Locations: {len(entities['locations'])}")
print(f"Events: {len(entities['events'])}")
print(f"Total reflection attempts: {metadata['reflection_summary']['total_attempts']}")
prepare_article_info()
Extract standard article fields from a row dictionary with robust fallbacks.
article_info = processor.prepare_article_info(
    row: Dict,
    row_index: int
) -> Dict[str, str]
row - Article row dictionary (from Parquet or other source)
row_index - Row index (used as fallback ID if no id field is found)
Standardized article info:
{
    "id": str,             # row.get("id") or row.get("article_id") or f"row_{row_index}"
    "title": str,          # row.get("title") or "(untitled)"
    "url": str,            # row.get("article_url") or row.get("url") or "#"
    "published_date": str, # row.get("published_date") or ""
    "content": str         # row.get("clean_text") or row.get("content") or ""
}
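The documented fallback chain can be sketched directly. This is a minimal reimplementation for illustration, not the source code:

```python
def prepare_article_info(row: dict, row_index: int) -> dict:
    """Sketch of the documented fallback chain for standard article fields."""
    return {
        "id": row.get("id") or row.get("article_id") or f"row_{row_index}",
        "title": row.get("title") or "(untitled)",
        "url": row.get("article_url") or row.get("url") or "#",
        "published_date": row.get("published_date") or "",
        "content": row.get("clean_text") or row.get("content") or "",
    }

info = prepare_article_info({"article_id": "doc123", "content": "text"}, row_index=7)
print(info["id"], info["title"])  # doc123 (untitled)
```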
initialize_processing_metadata()
Initialize processing metadata from an existing row, preserving the processed flag.
metadata = processor.initialize_processing_metadata(row: Dict) -> Dict[str, Any]
Returns a dictionary with {"processed": False} by default, or preserves existing metadata from row["processing_metadata"].
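A minimal sketch of that behavior, assuming existing metadata is returned unchanged when present:

```python
def initialize_processing_metadata(row: dict) -> dict:
    """Preserve existing metadata if present, otherwise start unprocessed."""
    existing = row.get("processing_metadata")
    if isinstance(existing, dict) and existing:
        return existing
    return {"processed": False}

print(initialize_processing_metadata({}))  # {'processed': False}
print(initialize_processing_metadata({"processing_metadata": {"processed": True}}))
```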
finalize_processing_metadata()
Stamp processing metadata with final extraction counts and a timestamp.
processor.finalize_processing_metadata(
    processing_metadata: Dict[str, Any],
    extracted_entities: Dict[str, List],
    extraction_timestamp: str,
    verbose: bool,
    row_index: int
) -> float
Mutates processing_metadata to add:
processed = True
extraction_timestamp
extracted_counts
reflection_summary (if not already present)
Returns 0.0 (retained for historical interface compatibility).
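The stamping behavior can be sketched as follows; field names come from the list above, but this is a sketch rather than the actual implementation:

```python
def finalize_processing_metadata(processing_metadata: dict,
                                 extracted_entities: dict,
                                 extraction_timestamp: str) -> float:
    """Sketch: stamp final counts and timestamp into the metadata dict."""
    processing_metadata["processed"] = True
    processing_metadata["extraction_timestamp"] = extraction_timestamp
    processing_metadata["extracted_counts"] = {
        et: len(items) for et, items in extracted_entities.items()
    }
    # Only set reflection_summary if extraction did not already record one
    processing_metadata.setdefault("reflection_summary", {"total_attempts": 0})
    return 0.0  # historical interface compatibility

meta = {}
finalize_processing_metadata(meta, {"people": [{"name": "A"}]}, "2024-01-01T00:00:00Z")
print(meta["processed"], meta["extracted_counts"]["people"])  # True 1
```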
track_reflection_attempts()
Count reflection attempts from entity reflection histories.
attempts = processor.track_reflection_attempts(
    extracted_entities: Any,
    entity_type: str,
    verbose: bool = False
) -> int
Iterates over entities and sums len(entity["reflection_history"]) across all items. Tolerates both dict and Pydantic model shapes.
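A sketch of that counting logic, tolerating both dict and attribute-style (Pydantic-like) entities; the real method also takes entity_type and verbose parameters:

```python
def track_reflection_attempts(extracted_entities) -> int:
    """Sum reflection-history lengths across entities of either shape."""
    total = 0
    for entity in extracted_entities or []:
        if isinstance(entity, dict):
            history = entity.get("reflection_history") or []
        else:
            history = getattr(entity, "reflection_history", None) or []
        total += len(history)
    return total

entities = [{"reflection_history": [1, 2]}, {"reflection_history": []}, {}]
print(track_reflection_attempts(entities))  # 2
```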
PhaseOutcome Structure
All extraction methods return PhaseOutcome objects:
from src.utils.outcomes import PhaseOutcome
outcome = PhaseOutcome.ok(
    phase="extract.people",
    value=[{...}, {...}],  # Extracted entities
    counts={"raw_count": 10, "final_count": 8},
    flags=["high_drop_rate"],
    meta={"qc_fixes": ["removed_duplicates"], "retry_attempted": True}
)
# Access results
entities = outcome.value
is_success = outcome.status == "ok"
qc_flags = outcome.flags
retry_info = outcome.meta
# Serialize to dict
metadata_dict = outcome.to_metadata_dict()
PhaseOutcome is a structured result object used throughout the pipeline.
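Its shape can be approximated with a dataclass sketch. The real class lives in src/utils/outcomes.py; only the fields and methods shown above are mirrored here, and the rest is an assumption:

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class PhaseOutcome:
    phase: str
    value: Any = None
    status: str = "ok"
    counts: dict = field(default_factory=dict)
    flags: list = field(default_factory=list)
    meta: dict = field(default_factory=dict)

    @classmethod
    def ok(cls, phase, value=None, counts=None, flags=None, meta=None):
        """Construct a successful outcome with sensible defaults."""
        return cls(phase, value, "ok", counts or {}, flags or [], meta or {})

    def to_metadata_dict(self) -> dict:
        """Serialize the observability fields for processing_metadata."""
        return {"status": self.status, "counts": self.counts,
                "flags": self.flags, "meta": self.meta}

outcome = PhaseOutcome.ok(phase="extract.people", value=[], flags=["zero_entities"])
print(outcome.status, outcome.to_metadata_dict()["flags"])  # ok ['zero_entities']
```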
Parallel Extraction
When max_workers > 1, extraction runs in parallel:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {
        et: pool.submit(
            self.extract_single_entity_type,
            et, article_content, article_id
        )
        for et in ["people", "organizations", "locations", "events"]
    }

    # Collect in stable order (not completion order)
    for et in ["people", "organizations", "locations", "events"]:
        outcome = futures[et].result()
        entities[et] = outcome.value or []
The global LLM semaphore (in src/constants.py) bounds concurrency to prevent API rate limits.
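The bounding pattern can be sketched with a standard-library semaphore. The names below are illustrative, not the actual constants in src/constants.py:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical global cap on concurrent LLM calls
LLM_SEMAPHORE = threading.BoundedSemaphore(2)

def call_llm(entity_type: str) -> str:
    """Stand-in for an LLM extraction call, bounded by the semaphore."""
    with LLM_SEMAPHORE:  # at most 2 in-flight calls at once
        return f"extracted:{entity_type}"

# Four workers submit, but only two calls run concurrently
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(call_llm,
                            ["people", "organizations", "locations", "events"]))
print(results)
```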
Integration Example
Complete article processing workflow:
from src.engine.article_processor import ArticleProcessor
from datetime import datetime, UTC
processor = ArticleProcessor(domain="guantanamo", model_type="gemini")
# 1. Check relevance
relevance_outcome = processor.check_relevance(
    article_content=article_text,
    article_id="doc123"
)
if not relevance_outcome.value:
    print(f"Article not relevant: {relevance_outcome.meta['reason']}")
    exit()
# 2. Extract entities
metadata = {}
entities = processor.extract_all_entities(
    article_content=article_text,
    article_id="doc123",
    processing_metadata=metadata,
    verbose=True,
    max_workers=4
)
# 3. Finalize metadata
processor.finalize_processing_metadata(
    processing_metadata=metadata,
    extracted_entities=entities,
    extraction_timestamp=datetime.now(UTC).isoformat(),
    verbose=True,
    row_index=0
)
print(f"Extracted: {metadata['extracted_counts']}")
print(f"Reflections: {metadata['reflection_summary']['total_attempts']}")
Source Location
~/workspace/source/src/engine/article_processor.py