Batch evaluation lets you run evaluators across entire datasets efficiently. Phoenix provides optimized execution with concurrency, error handling, and progress tracking.

Why Batch Evaluation?

Batch evaluation is essential for:
  • Regression testing: Ensure new model versions don’t degrade quality
  • Dataset validation: Evaluate model performance on test sets
  • Production monitoring: Analyze historical traces for quality issues
  • A/B testing: Compare different prompts, models, or configurations
  • Continuous improvement: Track quality metrics over time

Evaluating DataFrames

The primary interface for batch evaluation is evaluate_dataframe, which runs evaluators on pandas DataFrames.

Basic Usage

import pandas as pd
from phoenix.evals import create_evaluator, evaluate_dataframe

# Create evaluator
@create_evaluator(name="word_count")
def word_count(text: str) -> int:
    return len(text.split())

# Prepare data
df = pd.DataFrame({
    "text": [
        "Hello world",
        "This is a longer sentence with more words",
        "Short"
    ]
})

# Run evaluation
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[word_count]
)

print(results_df)
# Columns: text, word_count_execution_details, word_count_score

Multiple Evaluators

Run multiple evaluators in parallel:
import pandas as pd
from phoenix.evals import create_evaluator, evaluate_dataframe

@create_evaluator(name="length")
def length(text: str) -> int:
    return len(text)

@create_evaluator(name="has_punctuation")
def has_punctuation(text: str) -> bool:
    return any(c in text for c in ".,!?;:")

df = pd.DataFrame({
    "text": ["Hello, world!", "Test", "How are you?"]
})

# All evaluators run in parallel
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[length, has_punctuation]
)

print(results_df.columns)
# ['text', 'length_execution_details', 'length_score',
#  'has_punctuation_execution_details', 'has_punctuation_score']

Working with Results

Evaluation results are stored as JSON-serialized Score objects:
import json
import pandas as pd
from phoenix.evals import create_evaluator, evaluate_dataframe

@create_evaluator(name="sentiment")
def sentiment(text: str) -> str:
    if any(word in text.lower() for word in ["great", "excellent", "amazing"]):
        return "positive"
    elif any(word in text.lower() for word in ["bad", "terrible", "awful"]):
        return "negative"
    return "neutral"

df = pd.DataFrame({
    "text": ["This is great!", "Terrible experience", "It was okay"]
})

results_df = evaluate_dataframe(dataframe=df, evaluators=[sentiment])

# Extract scores from JSON
for idx, row in results_df.iterrows():
    score_data = json.loads(row["sentiment_score"])
    print(f"Row {idx}: {score_data['label']} (score: {score_data.get('score')})")

# Convert to separate columns for analysis
results_df["sentiment_label"] = results_df["sentiment_score"].apply(
    lambda x: json.loads(x)["label"] if x else None
)

print(results_df[["text", "sentiment_label"]])
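
The same pattern works for any score column; a small helper (hypothetical, not part of phoenix.evals) can flatten the JSON fields into regular columns:
def expand_score_column(df: pd.DataFrame, column: str) -> pd.DataFrame:
    """Flatten a JSON score column into <column>.label / .score / .explanation."""
    parsed = df[column].apply(lambda x: json.loads(x) if x else {})
    flattened = pd.DataFrame(
        {f"{column}.{field}": parsed.apply(lambda d: d.get(field))
         for field in ("label", "score", "explanation")},
        index=df.index,
    )
    return df.join(flattened)

results_df = expand_score_column(results_df, "sentiment_score")
print(results_df[["text", "sentiment_score.label"]])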

LLM Batch Evaluation

LLM-based evaluators benefit from Phoenix’s automatic concurrency:
import pandas as pd
from phoenix.evals import create_classifier, evaluate_dataframe, LLM

llm = LLM(provider="openai", model="gpt-4o-mini")

# Create LLM evaluator
relevance_eval = create_classifier(
    name="relevance",
    prompt_template="""
Is this answer relevant to the question?

Question: {input}
Answer: {output}
    """,
    llm=llm,
    choices={"relevant": 1.0, "irrelevant": 0.0}
)

# Prepare dataset
df = pd.DataFrame([
    {"input": "What is AI?", "output": "AI is artificial intelligence."},
    {"input": "What is ML?", "output": "Paris is a city in France."},
    {"input": "Define NLP", "output": "NLP is natural language processing."}
])

# Evaluate - automatically runs with concurrency
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[relevance_eval]
)

import json
for idx, row in results_df.iterrows():
    score = json.loads(row["relevance_score"])
    print(f"{row['input']}: {score['label']} - {score['explanation'][:50]}...")

Async Batch Evaluation

For async evaluators, use async_evaluate_dataframe for better performance:
import pandas as pd
import asyncio
from phoenix.evals import create_evaluator, async_evaluate_dataframe

@create_evaluator(name="async_check")
async def async_check(text: str) -> bool:
    # Simulate async API call
    await asyncio.sleep(0.1)
    return len(text) > 10

df = pd.DataFrame({
    "text": ["Short", "This is a much longer text example", "Medium"]
})

# Run async evaluation (top-level await assumes a notebook or other async context)
results_df = await async_evaluate_dataframe(
    dataframe=df,
    evaluators=[async_check],
    concurrency=10  # Control concurrent requests
)

print(results_df)
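
Top-level await works in notebooks; from a plain Python script, a sketch like this wraps the call in an async function and drives it with asyncio.run:
async def main() -> pd.DataFrame:
    return await async_evaluate_dataframe(
        dataframe=df,
        evaluators=[async_check],
        concurrency=10,
    )

# asyncio is already imported above; this is how a standalone script would run it
results_df = asyncio.run(main())
print(results_df)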

Concurrency Control

Control the number of concurrent evaluations:
import pandas as pd
from phoenix.evals import async_evaluate_dataframe, create_classifier, LLM

llm = LLM(provider="openai", model="gpt-4o-mini")

evaluator = create_classifier(
    name="quality",
    prompt_template="Rate quality: {output}",
    llm=llm,
    choices={"high": 1.0, "low": 0.0}
)

df = pd.DataFrame({"output": [f"Example {i}" for i in range(100)]})

# Limit concurrent API calls to avoid rate limits
results_df = await async_evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator],
    concurrency=5  # Max 5 concurrent requests
)

Progress Tracking

Monitor evaluation progress with built-in progress bars:
from phoenix.evals import evaluate_dataframe

# df and evaluator as defined in the previous examples

# Default progress bar
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator]
)

# Custom progress bar format
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator],
    tqdm_bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]"
)

# Hide progress bar
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator],
    hide_tqdm_bar=True
)

Error Handling

Phoenix provides robust error handling for batch evaluations:

Execution Details

Every evaluation includes execution details:
import json
import pandas as pd
from phoenix.evals import create_evaluator, evaluate_dataframe

@create_evaluator(name="risky_eval")
def risky_eval(text: str) -> float:
    # This might fail on some inputs
    return 1.0 / len(text)

df = pd.DataFrame({
    "text": ["Hello", "", "World"]  # Empty string will cause error
})

results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[risky_eval],
    exit_on_error=False  # Continue on errors
)

# Check for errors
for idx, row in results_df.iterrows():
    details = json.loads(row["risky_eval_execution_details"])
    print(f"Row {idx}: Status={details['status']}")
    if details["exceptions"]:
        print(f"  Error: {details['exceptions'][0]}")
    print(f"  Execution time: {details['execution_seconds']:.3f}s")

Retry Logic

Configure retry behavior for transient failures:
from phoenix.evals import evaluate_dataframe

results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator],
    max_retries=3,  # Retry up to 3 times on failure
    exit_on_error=False  # Don't stop on errors
)

Error Strategies

Choose how a run responds to evaluator failures:
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator],
    exit_on_error=True  # Stop immediately on error (default)
)
Use when: you want to fail fast and fix problems before continuing. Set exit_on_error=False (as in the examples above) to record failures in the execution details and keep going; you can then retry just the failed rows, as shown below.
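
With exit_on_error=False you can let a long run finish and then retry only the rows that failed. A minimal sketch, reusing risky_eval and df from the Execution Details example above:
# First pass: record failures instead of stopping
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[risky_eval],
    exit_on_error=False
)

# Find rows whose evaluation raised an exception
details = results_df["risky_eval_execution_details"].apply(json.loads)
failed_mask = details.apply(lambda d: bool(d["exceptions"]))

# Second pass: re-run only the failed rows (e.g. after fixing the bug or the data)
retry_df = evaluate_dataframe(
    dataframe=df[failed_mask.values],
    evaluators=[risky_eval],
    exit_on_error=False
)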

Input Mapping for Batches

Use input mapping when your DataFrame columns don’t match evaluator parameters:
import pandas as pd
from phoenix.evals import create_evaluator, bind_evaluator, evaluate_dataframe

@create_evaluator(name="check_match")
def check_match(expected: str, actual: str) -> bool:
    return expected.strip().lower() == actual.strip().lower()

# DataFrame has different column names
df = pd.DataFrame({
    "ground_truth": ["Paris", "London", "Berlin"],
    "model_output": ["Paris", "paris", "Vienna"]
})

# Bind mapping
bound_evaluator = bind_evaluator(
    evaluator=check_match,
    input_mapping={
        "expected": "ground_truth",
        "actual": "model_output"
    }
)

results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[bound_evaluator]
)

import json
for idx, row in results_df.iterrows():
    score = json.loads(row["check_match_score"])
    print(f"{row['ground_truth']} vs {row['model_output']}: {score['score']}")

Evaluating Traces

Evaluate traces collected by Phoenix:
import phoenix as px
import pandas as pd
from phoenix.evals import evaluate_dataframe, create_classifier, LLM

# Get traces from Phoenix
client = px.Client()
traces_df = client.query_spans(
    project_name="my-project",
    start_time=...,
    end_time=...
)

# Extract relevant fields
eval_df = pd.DataFrame({
    "input": traces_df["attributes.input.value"],
    "output": traces_df["attributes.output.value"],
    "trace_id": traces_df["context.trace_id"]
})

# Create evaluator
llm = LLM(provider="openai", model="gpt-4o-mini")
evaluator = create_classifier(
    name="quality",
    prompt_template="Rate quality:\n{input}\n{output}",
    llm=llm,
    choices={"good": 1.0, "bad": 0.0}
)

# Evaluate traces
results_df = evaluate_dataframe(
    dataframe=eval_df,
    evaluators=[evaluator]
)

# Analyze results
import json
results_df["quality_label"] = results_df["quality_score"].apply(
    lambda x: json.loads(x)["label"] if x else None
)

print("Quality distribution:")
print(results_df["quality_label"].value_counts())

Dataset Evaluation Workflows

Benchmark Dataset

import pandas as pd
from phoenix.evals import evaluate_dataframe, bind_evaluator, LLM
from phoenix.evals.metrics import CorrectnessEvaluator, ConcisenessEvaluator

# Load benchmark dataset
benchmark_df = pd.read_csv("benchmark_dataset.csv")
# Columns: question, expected_answer, model_answer

# Create evaluators
llm = LLM(provider="openai", model="gpt-4o")
correctness_eval = CorrectnessEvaluator(llm=llm)
conciseness_eval = ConcisenessEvaluator(llm=llm)

# Map DataFrame columns to evaluator inputs

correctness_bound = bind_evaluator(
    evaluator=correctness_eval,
    input_mapping={
        "input": "question",
        "output": "model_answer"
    }
)

conciseness_bound = bind_evaluator(
    evaluator=conciseness_eval,
    input_mapping={
        "input": "question",
        "output": "model_answer"
    }
)

# Run evaluation
results_df = evaluate_dataframe(
    dataframe=benchmark_df,
    evaluators=[correctness_bound, conciseness_bound]
)

# Analyze results
import json

results_df["is_correct"] = results_df["correctness_score"].apply(
    lambda x: json.loads(x)["label"] == "correct" if x else False
)
results_df["is_concise"] = results_df["conciseness_score"].apply(
    lambda x: json.loads(x)["label"] == "concise" if x else False
)

print(f"Correctness: {results_df['is_correct'].mean():.2%}")
print(f"Conciseness: {results_df['is_concise'].mean():.2%}")

# Save results
results_df.to_csv("evaluation_results.csv", index=False)

A/B Testing

import pandas as pd
from phoenix.evals import evaluate_dataframe, create_classifier, LLM
import json

# Load A/B test data
df = pd.read_csv("ab_test_data.csv")
# Columns: question, variant_a_response, variant_b_response

llm = LLM(provider="openai", model="gpt-4o")

from phoenix.evals import bind_evaluator

# Create one evaluator per variant so their score columns don't collide
def make_quality_eval(name: str):
    return create_classifier(
        name=name,
        prompt_template="Rate quality: {output}",
        llm=llm,
        choices={"excellent": 5, "good": 4, "fair": 3, "poor": 2, "terrible": 1}
    )

variant_a_eval = bind_evaluator(
    evaluator=make_quality_eval("variant_a_quality"),
    input_mapping={"output": "variant_a_response"}
)

variant_b_eval = bind_evaluator(
    evaluator=make_quality_eval("variant_b_quality"),
    input_mapping={"output": "variant_b_response"}
)

results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[variant_a_eval, variant_b_eval]
)

# Compare variants (score columns follow the <evaluator name>_score convention)
results_df["variant_a_score"] = results_df["variant_a_quality_score"].apply(
    lambda x: json.loads(x)["score"] if x else None
)
results_df["variant_b_score"] = results_df["variant_b_quality_score"].apply(
    lambda x: json.loads(x)["score"] if x else None
)

print(f"Variant A average: {results_df['variant_a_score'].mean():.2f}")
print(f"Variant B average: {results_df['variant_b_score'].mean():.2f}")

# Statistical significance test
from scipy import stats
t_stat, p_value = stats.ttest_rel(
    results_df["variant_a_score"],
    results_df["variant_b_score"]
)
print(f"P-value: {p_value:.4f}")

Regression Testing

import pandas as pd
from phoenix.evals import evaluate_dataframe
from phoenix.evals.metrics import exact_match
import json

# Load test suite
test_df = pd.read_csv("test_suite.csv")
# Columns: input, expected_output, actual_output

# Bind evaluator
from phoenix.evals import bind_evaluator

bound_exact_match = bind_evaluator(
    evaluator=exact_match,
    input_mapping={
        "output": "actual_output",
        "expected": "expected_output"
    }
)

# Run tests
results_df = evaluate_dataframe(
    dataframe=test_df,
    evaluators=[bound_exact_match],
    exit_on_error=False
)

# Report results
results_df["passed"] = results_df["exact_match_score"].apply(
    lambda x: json.loads(x)["score"] == 1.0 if x else False
)

pass_rate = results_df["passed"].mean()
print(f"Pass rate: {pass_rate:.2%}")
print(f"Tests passed: {results_df['passed'].sum()}/{len(results_df)}")

# Show failures
failures = results_df[~results_df["passed"]]
if not failures.empty:
    print("\nFailed tests:")
    for idx, row in failures.iterrows():
        print(f"  Input: {row['input']}")
        print(f"  Expected: {row['expected_output']}")
        print(f"  Actual: {row['actual_output']}")
        print()
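
To use this as a regression gate in CI, exit non-zero when the pass rate drops below a threshold (the 0.95 value here is just an example):
import sys

PASS_THRESHOLD = 0.95
if pass_rate < PASS_THRESHOLD:
    print(f"FAIL: pass rate {pass_rate:.2%} is below the {PASS_THRESHOLD:.0%} threshold")
    sys.exit(1)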

Performance Optimization

Batching Strategy

For large datasets, process in chunks to manage memory:
import pandas as pd
from phoenix.evals import evaluate_dataframe

# Process large dataset in chunks
chunk_size = 1000
large_df = pd.read_csv("large_dataset.csv")

results_chunks = []
for i in range(0, len(large_df), chunk_size):
    chunk = large_df.iloc[i:i+chunk_size]
    results_chunk = evaluate_dataframe(
        dataframe=chunk,
        evaluators=[evaluator]
    )
    results_chunks.append(results_chunk)
    print(f"Processed {i+len(chunk)}/{len(large_df)} rows")

final_results = pd.concat(results_chunks, ignore_index=True)

Parallel Processing

Phoenix automatically parallelizes evaluations. Control concurrency based on your resources:
import pandas as pd
from phoenix.evals import async_evaluate_dataframe

# High concurrency for fast, cheap evaluators
results_df = await async_evaluate_dataframe(
    dataframe=df,
    evaluators=[fast_evaluator],
    concurrency=50
)

# Lower concurrency for expensive LLM evaluations
results_df = await async_evaluate_dataframe(
    dataframe=df,
    evaluators=[llm_evaluator],
    concurrency=5  # Avoid rate limits
)

Best Practices

Sample First

Always test on a small sample before running on the full dataset:
import pandas as pd
from phoenix.evals import evaluate_dataframe

# Test on sample
sample_df = df.sample(n=10, random_state=42)
test_results = evaluate_dataframe(
    dataframe=sample_df,
    evaluators=[evaluator]
)

# Verify results look good
print(test_results.head())

# Run on full dataset
if input("Proceed with full dataset? (y/n): ") == "y":
    full_results = evaluate_dataframe(
        dataframe=df,
        evaluators=[evaluator]
    )

Monitor Costs

Track LLM API costs during evaluation:
import pandas as pd
from phoenix.evals import evaluate_dataframe, create_classifier, LLM
import json

llm = LLM(provider="openai", model="gpt-4o-mini")
evaluator = create_classifier(
    name="quality",
    prompt_template="Rate: {output}",
    llm=llm,
    choices={"good": 1.0, "bad": 0.0}
)

# Run a small sample first to sanity-check outputs before spending on the full set
sample_size = 10
full_size = len(df)

sample_results = evaluate_dataframe(
    dataframe=df.head(sample_size),
    evaluators=[evaluator]
)
print(sample_results.head())

# Estimate tokens and cost (rough)
avg_tokens_per_eval = 100  # Adjust based on your prompt and typical outputs
num_evaluators = 1
estimated_tokens = full_size * num_evaluators * avg_tokens_per_eval
estimated_cost = (estimated_tokens / 1000) * 0.00015  # gpt-4o-mini input rate per 1K tokens

print(f"Estimated cost for full dataset: ${estimated_cost:.2f}")
print("Proceed? (y/n)")

Save Intermediate Results

import pandas as pd
from phoenix.evals import evaluate_dataframe

# Save results incrementally
chunk_size = 100
for i in range(0, len(df), chunk_size):
    chunk = df.iloc[i:i+chunk_size]
    results = evaluate_dataframe(
        dataframe=chunk,
        evaluators=[evaluator]
    )
    
    # Save checkpoint
    results.to_csv(f"results_checkpoint_{i}.csv", index=False)
    print(f"Saved checkpoint at row {i+len(chunk)}")

Next Steps

  • Pre-built Metrics: use ready-made evaluators for common tasks
  • Custom Evaluators: create custom evaluation logic
