Overview

REMem includes built-in metrics for QA and retrieval evaluation. You can add custom metrics by subclassing BaseMetric and registering the subclass with the evaluation pipeline.

Base Metric Interface

From evaluation/base.py:10-34:
from typing import Any, Dict, List, Optional, Tuple
from remem.utils.config_utils import BaseConfig

class BaseMetric:
    global_config: BaseConfig
    metric_name = "base"
    
    def __init__(self, global_config: Optional[BaseConfig] = None):
        if global_config is None:
            self.global_config = BaseConfig()
        else:
            self.global_config = global_config
    
    def calculate_metric_scores(
        self, 
        *args: Any, 
        **kwargs: Any
    ) -> Tuple[Dict[str, Any], List[Any]]:
        """
        Calculate overall metric score and per-example scores.
        
        Returns:
            Tuple[Dict[str, Any], List[Any]]:
                - Overall metrics (e.g., {"F1": 0.85})
                - Per-example metrics (e.g., [{"F1": 0.9}, {"F1": 0.8}, ...])
        """
        return {}, []

Built-in QA Metrics

Exact Match

From evaluation/qa_eval.py:16-54:
from typing import Callable, Dict, List, Tuple

import numpy as np

from remem.evaluation.base import BaseMetric
from remem.utils.eval_utils import normalize_answer

class QAExactMatch(BaseMetric):
    metric_name: str = "qa_exact_match"
    
    def calculate_metric_scores(
        self,
        gold_answers: List[List[str]],
        predicted_answers: List[str],
        aggregation_fn: Callable = np.max,
        **kwargs
    ) -> Tuple[Dict[str, float], List[Dict[str, float]]]:
        """
        Calculate Exact Match (EM) score.
        
        Args:
            gold_answers: List of lists of gold answers per question
            predicted_answers: List of predicted answers
            aggregation_fn: How to aggregate when multiple gold answers (default: max)
        
        Returns:
            - Overall metrics: {"ExactMatch": avg_em}
            - Per-example metrics: [{"ExactMatch": em}, ...]
        """
        example_eval_results = []
        total_em = 0
        
        for gold_list, predicted in zip(gold_answers, predicted_answers):
            em_scores = [
                1.0 if normalize_answer(gold) == normalize_answer(predicted) else 0.0
                for gold in gold_list
            ]
            aggregated_em = aggregation_fn(em_scores)
            example_eval_results.append({"ExactMatch": aggregated_em})
            total_em += aggregated_em
        
        avg_em = total_em / len(gold_answers) if gold_answers else 0.0
        pooled_eval_results = {"ExactMatch": float(avg_em)}
        
        return pooled_eval_results, example_eval_results
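
A quick usage sketch; the values shown follow directly from the logic above:

em_metric = QAExactMatch()
overall, per_example = em_metric.calculate_metric_scores(
    gold_answers=[["Paris", "Paris, France"], ["1950"]],
    predicted_answers=["paris", "1949"],
)
# overall     -> {"ExactMatch": 0.5}
# per_example -> [{"ExactMatch": 1.0}, {"ExactMatch": 0.0}]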

F1 Score

From evaluation/qa_eval.py:57-109:
from collections import Counter

class QAF1Score(BaseMetric):
    metric_name: str = "qa_f1_score"
    
    def calculate_metric_scores(
        self,
        gold_answers: List[List[str]],
        predicted_answers: List[str],
        aggregation_fn: Callable = np.max,
        **kwargs
    ) -> Tuple[Dict[str, float], List[Dict[str, float]]]:
        """
        Calculate F1 score based on token overlap.
        """
        def compute_f1(gold: str, predicted: str) -> float:
            gold_tokens = normalize_answer(gold).split()
            predicted_tokens = normalize_answer(predicted).split()
            common = Counter(predicted_tokens) & Counter(gold_tokens)
            num_same = sum(common.values())
            
            if num_same == 0:
                return 0.0
            
            precision = 1.0 * num_same / len(predicted_tokens)
            recall = 1.0 * num_same / len(gold_tokens)
            return 2 * (precision * recall) / (precision + recall)
        
        example_eval_results = []
        total_f1 = 0.0
        
        for gold_list, predicted in zip(gold_answers, predicted_answers):
            f1_scores = [compute_f1(gold, predicted) for gold in gold_list]
            aggregated_f1 = aggregation_fn(f1_scores)
            example_eval_results.append({"F1": aggregated_f1})
            total_f1 += aggregated_f1
        
        avg_f1 = total_f1 / len(gold_answers) if gold_answers else 0.0
        pooled_eval_results = {"F1": float(avg_f1)}
        
        return pooled_eval_results, example_eval_results

Built-in Retrieval Metrics

From evaluation/retrieval_eval.py:
  • Recall - Fraction of gold documents retrieved
  • MRR - Mean Reciprocal Rank
  • NDCG - Normalized Discounted Cumulative Gain
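
These classes follow the same BaseMetric contract as the QA metrics. As a rough illustration of what the first two compute (a sketch, not the library's actual implementation), Recall@K and MRR over a ranked list of retrieved documents look like this:

from typing import List

def recall_at_k(retrieved: List[str], gold: List[str], k: int = 10) -> float:
    """Fraction of gold documents that appear in the top-K retrieved docs."""
    if not gold:
        return 0.0
    top_k = set(retrieved[:k])
    return sum(1 for doc in gold if doc in top_k) / len(gold)

def reciprocal_rank(retrieved: List[str], gold: List[str]) -> float:
    """1 / rank of the first relevant retrieved document, or 0.0 if none."""
    for rank, doc in enumerate(retrieved, start=1):
        if doc in gold:
            return 1.0 / rank
    return 0.0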

Creating Custom Metrics

1. Basic Custom Metric

# src/remem/evaluation/custom_metric.py
from typing import Dict, List, Optional, Tuple
import numpy as np
from remem.evaluation.base import BaseMetric
from remem.utils.config_utils import BaseConfig

class SemanticSimilarity(BaseMetric):
    metric_name: str = "semantic_similarity"
    
    def __init__(self, global_config: Optional[BaseConfig] = None):
        super().__init__(global_config)
        # Initialize embedding model for semantic comparison
        from sentence_transformers import SentenceTransformer
        self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
    
    def calculate_metric_scores(
        self,
        gold_answers: List[List[str]],
        predicted_answers: List[str],
        **kwargs
    ) -> Tuple[Dict[str, float], List[Dict[str, float]]]:
        """
        Calculate semantic similarity between gold and predicted answers.
        """
        from sklearn.metrics.pairwise import cosine_similarity
        
        example_eval_results = []
        total_similarity = 0.0
        
        for gold_list, predicted in zip(gold_answers, predicted_answers):
            # Encode predicted answer
            pred_embedding = self.encoder.encode([predicted])
            
            # Encode all gold answers
            gold_embeddings = self.encoder.encode(gold_list)
            
            # Compute cosine similarity
            similarities = cosine_similarity(pred_embedding, gold_embeddings)[0]
            
            # Take max similarity across gold answers
            max_similarity = float(np.max(similarities))
            
            example_eval_results.append({"SemanticSimilarity": max_similarity})
            total_similarity += max_similarity
        
        avg_similarity = total_similarity / len(gold_answers) if gold_answers else 0.0
        pooled_eval_results = {"SemanticSimilarity": avg_similarity}
        
        return pooled_eval_results, example_eval_results

2. Question-Type-Aware Metric

# src/remem/evaluation/typed_metric.py
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

import numpy as np

from remem.evaluation.base import BaseMetric
from remem.utils.eval_utils import normalize_answer

class TypedAccuracy(BaseMetric):
    metric_name: str = "typed_accuracy"
    
    def calculate_metric_scores(
        self,
        gold_answers: List[List[str]],
        predicted_answers: List[str],
        question_types: Optional[List[str]] = None,
        **kwargs
    ) -> Tuple[Dict[str, float], List[Dict[str, float]]]:
        """
        Calculate accuracy broken down by question type.
        
        Args:
            gold_answers: Gold standard answers
            predicted_answers: Model predictions
            question_types: List of question types (e.g., ['factoid', 'temporal', ...])
        """
        if question_types is None:
            question_types = ["unknown"] * len(gold_answers)
        
        # Track scores by type
        type_scores = defaultdict(list)
        example_eval_results = []
        
        for gold_list, predicted, q_type in zip(gold_answers, predicted_answers, question_types):
            # Compute accuracy for this example
            correct = any(
                normalize_answer(gold) == normalize_answer(predicted)
                for gold in gold_list
            )
            score = 1.0 if correct else 0.0
            
            type_scores[q_type].append(score)
            example_eval_results.append({"Accuracy": score, "QuestionType": q_type})
        
        # Aggregate by type
        pooled_eval_results = {}
        for q_type, scores in type_scores.items():
            pooled_eval_results[f"Accuracy_{q_type}"] = np.mean(scores)
        
        # Overall accuracy
        all_scores = [ex["Accuracy"] for ex in example_eval_results]
        pooled_eval_results["Accuracy_Overall"] = np.mean(all_scores)
        
        return pooled_eval_results, example_eval_results
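
A hypothetical call, passing question_types alongside the answers (the outputs follow the logic above):

metric = TypedAccuracy()
overall, per_example = metric.calculate_metric_scores(
    gold_answers=[["Paris"], ["1950"]],
    predicted_answers=["paris", "1949"],
    question_types=["factoid", "temporal"],
)
# overall -> {"Accuracy_factoid": 1.0, "Accuracy_temporal": 0.0, "Accuracy_Overall": 0.5}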

3. Retrieval Quality Metric

# src/remem/evaluation/retrieval_quality.py
from typing import Dict, List, Tuple

from remem.evaluation.base import BaseMetric
from remem.utils.misc_utils import QuerySolution

class ContextPrecision(BaseMetric):
    metric_name: str = "context_precision"
    
    def calculate_metric_scores(
        self,
        query_solutions: List[QuerySolution],
        gold_docs: List[List[str]],
        **kwargs
    ) -> Tuple[Dict[str, float], List[Dict[str, float]]]:
        """
        Calculate precision of retrieved contexts.
        
        Precision@K = (# relevant docs in top-K) / K
        """
        example_eval_results = []
        total_precision = 0.0
        
        for qs, gold_doc_list in zip(query_solutions, gold_docs):
            retrieved_docs = qs.docs[:10]  # Top-10
            
            # Count how many retrieved docs are in gold set
            relevant_count = sum(
                1 for doc in retrieved_docs
                if any(self._doc_match(doc, gold) for gold in gold_doc_list)
            )
            
            precision = relevant_count / len(retrieved_docs) if retrieved_docs else 0.0
            
            example_eval_results.append({"Precision@10": precision})
            total_precision += precision
        
        avg_precision = total_precision / len(query_solutions) if query_solutions else 0.0
        pooled_eval_results = {"Precision@10": avg_precision}
        
        return pooled_eval_results, example_eval_results
    
    def _doc_match(self, retrieved: str, gold: str) -> bool:
        """Check if retrieved doc matches gold doc (fuzzy match)."""
        # Simple substring match
        return gold.strip() in retrieved.strip() or retrieved.strip() in gold.strip()

Integrating Metrics

1. Register in Evaluator Factory

In remem.py, the get_evaluators() method creates metric instances:
def get_evaluators(self, gold_answers, gold_docs, metrics):
    """Create evaluator instances based on requested metrics."""
    qa_evaluators = []
    retrieval_evaluators = []
    
    for metric in metrics:
        if metric == "qa_em":
            from remem.evaluation.qa_eval import QAExactMatch
            qa_evaluators.append(QAExactMatch(global_config=self.global_config))
        elif metric == "qa_f1":
            from remem.evaluation.qa_eval import QAF1Score
            qa_evaluators.append(QAF1Score(global_config=self.global_config))
        elif metric == "semantic_similarity":  # Your custom metric
            from remem.evaluation.custom_metric import SemanticSimilarity
            qa_evaluators.append(SemanticSimilarity(global_config=self.global_config))
        # ... more metrics
    
    return qa_evaluators, retrieval_evaluators

2. Use in RAG Pipeline

from remem.remem import ReMem
from remem.utils.config_utils import BaseConfig

config = BaseConfig(
    dataset="test",
    extract_method="openie",
)

rag = ReMem(global_config=config)
rag.index(docs)

# Use custom metric
solutions, responses, meta, ret_metrics, qa_metrics = rag.rag_for_qa(
    queries=["What is X?"],
    gold_answers=[["Y"]],
    metrics=("qa_em", "qa_f1", "semantic_similarity"),  # Include custom metric
)

print(qa_metrics)
# {"ExactMatch": 0.5, "F1": 0.75, "SemanticSimilarity": 0.88}

Advanced: LLM-as-Judge Metrics

REMem also includes LLM-based evaluation.

From evaluation/qa_evalsuit_llm_judge.py:
class EvalSuitLLMJudge(BaseMetric):
    metric_name: str = "llm_judge"
    
    def calculate_metric_scores(
        self,
        questions: List[str],
        predicted_answers: List[str],
        gold_answers: List[List[str]],
        **kwargs
    ) -> Tuple[Dict[str, float], List[Dict[str, float]]]:
        """
        Use an LLM to judge answer quality.
        """
        # Construct prompts for LLM judge
        judge_prompts = [
            self._build_judge_prompt(q, pred, gold)
            for q, pred, gold in zip(questions, predicted_answers, gold_answers)
        ]
        
        # Get LLM judgments
        judgments = self.llm.batch_infer(judge_prompts)
        
        # Parse scores from judgments
        scores = [self._parse_score(j) for j in judgments]
        
        return {"LLM_Judge_Score": np.mean(scores)}, \
               [{"LLM_Judge_Score": s} for s in scores]

Create Custom LLM Judge

from typing import Dict, List, Tuple

from remem.evaluation.base import BaseMetric
from remem.llm.openai_gpt import CacheOpenAI

class CustomLLMJudge(BaseMetric):
    metric_name: str = "custom_llm_judge"
    
    def __init__(self, global_config):
        super().__init__(global_config)
        self.llm = CacheOpenAI(
            model_name="gpt-4o-mini",
            global_config=global_config
        )
    
    def calculate_metric_scores(
        self,
        questions: List[str],
        predicted_answers: List[str],
        gold_answers: List[List[str]],
        **kwargs
    ) -> Tuple[Dict[str, float], List[Dict[str, float]]]:
        """
        Judge answer quality on scale of 1-5.
        """
        example_results = []
        total_score = 0.0
        
        for question, predicted, gold_list in zip(questions, predicted_answers, gold_answers):
            prompt = f"""
Question: {question}

Gold Answer: {gold_list[0]}

Predicted Answer: {predicted}

Rate the predicted answer on a scale of 1-5, where:
1 = Completely wrong
2 = Partially wrong
3 = Partially correct
4 = Mostly correct
5 = Completely correct

Respond with ONLY the number.
"""
            
            response, _, _ = self.llm.infer(
                messages=[{"role": "user", "content": prompt}]
            )
            
            try:
                score = float(response.strip())
            except (TypeError, ValueError):
                score = 0.0
            
            example_results.append({"LLM_Score": score})
            total_score += score
        
        avg_score = total_score / len(questions) if questions else 0.0
        return {"LLM_Score": avg_score}, example_results

Metric Utilities

Answer Normalization

From utils/eval_utils.py:
import re
import string

def normalize_answer(s: str) -> str:
    """Normalize answer for comparison."""
    def remove_articles(text):
        return re.sub(r"\b(a|an|the)\b", " ", text)
    
    def white_space_fix(text):
        return " ".join(text.split())
    
    def remove_punc(text):
        exclude = set(string.punctuation)
        return "".join(ch for ch in text if ch not in exclude)
    
    def lower(text):
        return text.lower()
    
    return white_space_fix(remove_articles(remove_punc(lower(s))))
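
For example:

normalize_answer("The Eiffel Tower!")  # -> "eiffel tower"
normalize_answer("  An   apple ")      # -> "apple"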

Testing Metrics

# Test your custom metric
from remem.evaluation.custom_metric import SemanticSimilarity
from remem.utils.config_utils import BaseConfig

metric = SemanticSimilarity(global_config=BaseConfig())

gold_answers = [["Paris", "Paris, France"], ["1950"]]
predicted_answers = ["paris", "nineteen fifty"]

overall, per_example = metric.calculate_metric_scores(
    gold_answers=gold_answers,
    predicted_answers=predicted_answers
)

print("Overall:", overall)
print("Per-example:", per_example)
