Why Batch Evaluation?
Batch evaluation is essential for:
- Regression testing: Ensure new model versions don’t degrade quality
- Dataset validation: Evaluate model performance on test sets
- Production monitoring: Analyze historical traces for quality issues
- A/B testing: Compare different prompts, models, or configurations
- Continuous improvement: Track quality metrics over time
Evaluating DataFrames
The primary interface for batch evaluation is evaluate_dataframe, which runs evaluators on pandas DataFrames.
Basic Usage
import pandas as pd
from phoenix.evals import create_evaluator, evaluate_dataframe

# Create evaluator
@create_evaluator(name="word_count")
def word_count(text: str) -> int:
    return len(text.split())

# Prepare data
df = pd.DataFrame({
    "text": [
        "Hello world",
        "This is a longer sentence with more words",
        "Short"
    ]
})

# Run evaluation
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[word_count]
)

print(results_df)
# Columns: text, word_count_execution_details, word_count_score
Multiple Evaluators
Run multiple evaluators in parallel:

import pandas as pd
from phoenix.evals import create_evaluator, evaluate_dataframe

@create_evaluator(name="length")
def length(text: str) -> int:
    return len(text)

@create_evaluator(name="has_punctuation")
def has_punctuation(text: str) -> bool:
    return any(c in text for c in ".,!?;:")

df = pd.DataFrame({
    "text": ["Hello, world!", "Test", "How are you?"]
})

# All evaluators run in parallel
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[length, has_punctuation]
)

print(results_df.columns)
# ['text', 'length_execution_details', 'length_score',
#  'has_punctuation_execution_details', 'has_punctuation_score']
Working with Results
Evaluation results are stored as JSON-serialized Score objects:

import json
import pandas as pd
from phoenix.evals import create_evaluator, evaluate_dataframe

@create_evaluator(name="sentiment")
def sentiment(text: str) -> str:
    if any(word in text.lower() for word in ["great", "excellent", "amazing"]):
        return "positive"
    elif any(word in text.lower() for word in ["bad", "terrible", "awful"]):
        return "negative"
    return "neutral"

df = pd.DataFrame({
    "text": ["This is great!", "Terrible experience", "It was okay"]
})

results_df = evaluate_dataframe(dataframe=df, evaluators=[sentiment])

# Extract scores from JSON
for idx, row in results_df.iterrows():
    score_data = json.loads(row["sentiment_score"])
    print(f"Row {idx}: {score_data['label']} (score: {score_data.get('score')})")

# Convert to separate columns for analysis
results_df["sentiment_label"] = results_df["sentiment_score"].apply(
    lambda x: json.loads(x)["label"] if x else None
)
print(results_df[["text", "sentiment_label"]])
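With several evaluators, parsing each score column by hand gets repetitive. The helper below is a small sketch that assumes every evaluator writes a <name>_score column with the JSON layout shown above; it flattens all of them at once.

import json
import pandas as pd

def flatten_scores(results_df: pd.DataFrame) -> pd.DataFrame:
    # Expand each JSON-serialized *_score column into flat label/value columns.
    # Adjust the key names if your Score objects use different fields.
    flat = results_df.copy()
    for col in [c for c in flat.columns if c.endswith("_score")]:
        parsed = flat[col].apply(lambda x: json.loads(x) if x else {})
        flat[col.replace("_score", "_label")] = parsed.apply(lambda s: s.get("label"))
        flat[col.replace("_score", "_value")] = parsed.apply(lambda s: s.get("score"))
    return flat

Calling flatten_scores(results_df) then gives you plain sentiment_label and sentiment_value columns that are easy to group, filter, or plot.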
LLM Batch Evaluation
LLM-based evaluators benefit from Phoenix’s automatic concurrency:

import json
import pandas as pd
from phoenix.evals import create_classifier, evaluate_dataframe, LLM

llm = LLM(provider="openai", model="gpt-4o-mini")

# Create LLM evaluator
relevance_eval = create_classifier(
    name="relevance",
    prompt_template="""
Is this answer relevant to the question?
Question: {input}
Answer: {output}
""",
    llm=llm,
    choices={"relevant": 1.0, "irrelevant": 0.0}
)

# Prepare dataset
df = pd.DataFrame([
    {"input": "What is AI?", "output": "AI is artificial intelligence."},
    {"input": "What is ML?", "output": "Paris is a city in France."},
    {"input": "Define NLP", "output": "NLP is natural language processing."}
])

# Evaluate - automatically runs with concurrency
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[relevance_eval]
)

for idx, row in results_df.iterrows():
    score = json.loads(row["relevance_score"])
    print(f"{row['input']}: {score['label']} - {score['explanation'][:50]}...")
Async Batch Evaluation
For async evaluators, use async_evaluate_dataframe for better performance:

import asyncio
import pandas as pd
from phoenix.evals import create_evaluator, async_evaluate_dataframe

@create_evaluator(name="async_check")
async def async_check(text: str) -> bool:
    # Simulate async API call
    await asyncio.sleep(0.1)
    return len(text) > 10

df = pd.DataFrame({
    "text": ["Short", "This is a much longer text example", "Medium"]
})

# Run async evaluation
results_df = await async_evaluate_dataframe(
    dataframe=df,
    evaluators=[async_check],
    concurrency=10  # Control concurrent requests
)

print(results_df)
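The top-level await above works in a notebook. In a plain Python script you need an event loop; a minimal sketch reusing the df and async_check defined above:

import asyncio

async def main():
    # Run the async batch evaluation inside an event loop
    results_df = await async_evaluate_dataframe(
        dataframe=df,
        evaluators=[async_check],
        concurrency=10
    )
    print(results_df)

if __name__ == "__main__":
    asyncio.run(main())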
Concurrency Control
Control the number of concurrent evaluations:

import pandas as pd
from phoenix.evals import async_evaluate_dataframe, create_classifier, LLM

llm = LLM(provider="openai", model="gpt-4o-mini")

evaluator = create_classifier(
    name="quality",
    prompt_template="Rate quality: {output}",
    llm=llm,
    choices={"high": 1.0, "low": 0.0}
)

df = pd.DataFrame({"output": [f"Example {i}" for i in range(100)]})

# Limit concurrent API calls to avoid rate limits
results_df = await async_evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator],
    concurrency=5  # Max 5 concurrent requests
)
Progress Tracking
Monitor evaluation progress with built-in progress bars:

import pandas as pd
from phoenix.evals import evaluate_dataframe

# Default progress bar
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator]
)

# Custom progress bar format
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator],
    tqdm_bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]"
)

# Hide progress bar
results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator],
    hide_tqdm_bar=True
)
Error Handling
Phoenix provides robust error handling for batch evaluations.
Execution Details
Every evaluation includes execution details:

import json
import pandas as pd
from phoenix.evals import create_evaluator, evaluate_dataframe

@create_evaluator(name="risky_eval")
def risky_eval(text: str) -> float:
    # This might fail on some inputs
    return 1.0 / len(text)

df = pd.DataFrame({
    "text": ["Hello", "", "World"]  # Empty string will cause a ZeroDivisionError
})

results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[risky_eval],
    exit_on_error=False  # Continue on errors
)

# Check for errors
for idx, row in results_df.iterrows():
    details = json.loads(row["risky_eval_execution_details"])
    print(f"Row {idx}: Status={details['status']}")
    if details["exceptions"]:
        print(f"  Error: {details['exceptions'][0]}")
    print(f"  Execution time: {details['execution_seconds']:.3f}s")
Retry Logic
Configure retry behavior for transient failures:

from phoenix.evals import evaluate_dataframe

results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator],
    max_retries=3,  # Retry up to 3 times on failure
    exit_on_error=False  # Don't stop on errors
)
Error Strategies
Stop on first error:

results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator],
    exit_on_error=True  # Stop immediately on error (default)
)

Continue on errors:

results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator],
    exit_on_error=False  # Record errors but continue
)

Retry, then continue:

results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[evaluator],
    max_retries=5,  # Retry failed evaluations
    exit_on_error=False  # Continue even after retries are exhausted
)
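When you continue on errors, a common follow-up is to re-run only the rows that failed. The sketch below assumes the evaluator is named quality (as in the earlier examples), so its details land in the quality_execution_details column, and it treats any recorded exception as a failure:

import json

def eval_failed(details_json: str) -> bool:
    # Treat any recorded exception as a failure; adjust if you prefer to key off "status"
    details = json.loads(details_json)
    return bool(details.get("exceptions"))

failed_mask = results_df["quality_execution_details"].apply(eval_failed)

# Re-run only the failed subset of the original DataFrame
retry_df = df[failed_mask.values]
retry_results = evaluate_dataframe(
    dataframe=retry_df,
    evaluators=[evaluator],
    exit_on_error=False
)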
Input Mapping for Batches
Use input mapping when your DataFrame columns don’t match evaluator parameters:

import json
import pandas as pd
from phoenix.evals import create_evaluator, bind_evaluator, evaluate_dataframe

@create_evaluator(name="check_match")
def check_match(expected: str, actual: str) -> bool:
    return expected.strip().lower() == actual.strip().lower()

# DataFrame has different column names
df = pd.DataFrame({
    "ground_truth": ["Paris", "London", "Berlin"],
    "model_output": ["Paris", "paris", "Vienna"]
})

# Bind mapping
bound_evaluator = bind_evaluator(
    evaluator=check_match,
    input_mapping={
        "expected": "ground_truth",
        "actual": "model_output"
    }
)

results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[bound_evaluator]
)

for idx, row in results_df.iterrows():
    score = json.loads(row["check_match_score"])
    print(f"{row['ground_truth']} vs {row['model_output']}: {score['score']}")
Evaluating Traces
Evaluate traces collected by Phoenix:

import json
import pandas as pd
import phoenix as px
from phoenix.evals import evaluate_dataframe, create_classifier, LLM

# Get traces from Phoenix
client = px.Client()
traces_df = client.query_spans(
    project_name="my-project",
    start_time=...,
    end_time=...
)

# Extract relevant fields
eval_df = pd.DataFrame({
    "input": traces_df["attributes.input.value"],
    "output": traces_df["attributes.output.value"],
    "trace_id": traces_df["context.trace_id"]
})

# Create evaluator
llm = LLM(provider="openai", model="gpt-4o-mini")
evaluator = create_classifier(
    name="quality",
    prompt_template="Rate quality:\n{input}\n{output}",
    llm=llm,
    choices={"good": 1.0, "bad": 0.0}
)

# Evaluate traces
results_df = evaluate_dataframe(
    dataframe=eval_df,
    evaluators=[evaluator]
)

# Analyze results
results_df["quality_label"] = results_df["quality_score"].apply(
    lambda x: json.loads(x)["label"] if x else None
)
print("Quality distribution:")
print(results_df["quality_label"].value_counts())
Dataset Evaluation Workflows
Benchmark Dataset
import json
import pandas as pd
from phoenix.evals import bind_evaluator, evaluate_dataframe, LLM
from phoenix.evals.metrics import CorrectnessEvaluator, ConcisenessEvaluator

# Load benchmark dataset
benchmark_df = pd.read_csv("benchmark_dataset.csv")
# Columns: question, expected_answer, model_answer

# Create evaluators
llm = LLM(provider="openai", model="gpt-4o")
correctness_eval = CorrectnessEvaluator(llm=llm)
conciseness_eval = ConcisenessEvaluator(llm=llm)

# Map DataFrame columns to evaluator inputs
correctness_bound = bind_evaluator(
    evaluator=correctness_eval,
    input_mapping={
        "input": "question",
        "output": "model_answer"
    }
)
conciseness_bound = bind_evaluator(
    evaluator=conciseness_eval,
    input_mapping={
        "input": "question",
        "output": "model_answer"
    }
)

# Run evaluation
results_df = evaluate_dataframe(
    dataframe=benchmark_df,
    evaluators=[correctness_bound, conciseness_bound]
)

# Analyze results
results_df["is_correct"] = results_df["correctness_score"].apply(
    lambda x: json.loads(x)["label"] == "correct" if x else False
)
results_df["is_concise"] = results_df["conciseness_score"].apply(
    lambda x: json.loads(x)["label"] == "concise" if x else False
)
print(f"Correctness: {results_df['is_correct'].mean():.2%}")
print(f"Conciseness: {results_df['is_concise'].mean():.2%}")

# Save results
results_df.to_csv("evaluation_results.csv", index=False)
A/B Testing
import json
import pandas as pd
from scipy import stats
from phoenix.evals import bind_evaluator, create_classifier, evaluate_dataframe, LLM

# Load A/B test data
df = pd.read_csv("ab_test_data.csv")
# Columns: question, variant_a_response, variant_b_response

llm = LLM(provider="openai", model="gpt-4o")

# Create one evaluator per variant so the score columns don't collide
quality_choices = {"excellent": 5, "good": 4, "fair": 3, "poor": 2, "terrible": 1}
quality_a = create_classifier(
    name="quality_a",
    prompt_template="Rate quality: {output}",
    llm=llm,
    choices=quality_choices
)
quality_b = create_classifier(
    name="quality_b",
    prompt_template="Rate quality: {output}",
    llm=llm,
    choices=quality_choices
)

# Point each evaluator at its variant's response column
variant_a_eval = bind_evaluator(
    evaluator=quality_a,
    input_mapping={"output": "variant_a_response"}
)
variant_b_eval = bind_evaluator(
    evaluator=quality_b,
    input_mapping={"output": "variant_b_response"}
)

results_df = evaluate_dataframe(
    dataframe=df,
    evaluators=[variant_a_eval, variant_b_eval]
)

# Compare variants
results_df["variant_a_score"] = results_df["quality_a_score"].apply(
    lambda x: json.loads(x)["score"] if x else None
)
results_df["variant_b_score"] = results_df["quality_b_score"].apply(
    lambda x: json.loads(x)["score"] if x else None
)
print(f"Variant A average: {results_df['variant_a_score'].mean():.2f}")
print(f"Variant B average: {results_df['variant_b_score'].mean():.2f}")

# Statistical significance test (paired t-test across questions)
t_stat, p_value = stats.ttest_rel(
    results_df["variant_a_score"],
    results_df["variant_b_score"]
)
print(f"P-value: {p_value:.4f}")
Regression Testing
import json
import pandas as pd
from phoenix.evals import bind_evaluator, evaluate_dataframe
from phoenix.evals.metrics import exact_match

# Load test suite
test_df = pd.read_csv("test_suite.csv")
# Columns: input, expected_output, actual_output

# Bind evaluator
bound_exact_match = bind_evaluator(
    evaluator=exact_match,
    input_mapping={
        "output": "actual_output",
        "expected": "expected_output"
    }
)

# Run tests
results_df = evaluate_dataframe(
    dataframe=test_df,
    evaluators=[bound_exact_match],
    exit_on_error=False
)

# Report results
results_df["passed"] = results_df["exact_match_score"].apply(
    lambda x: json.loads(x)["score"] == 1.0 if x else False
)
pass_rate = results_df["passed"].mean()
print(f"Pass rate: {pass_rate:.2%}")
print(f"Tests passed: {results_df['passed'].sum()}/{len(results_df)}")

# Show failures
failures = results_df[~results_df["passed"]]
if not failures.empty:
    print("\nFailed tests:")
    for idx, row in failures.iterrows():
        print(f"  Input: {row['input']}")
        print(f"  Expected: {row['expected_output']}")
        print(f"  Actual: {row['actual_output']}")
        print()
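In CI, the pass rate usually becomes a hard gate. A minimal sketch (the 95% threshold is an arbitrary example) that fails the build when quality regresses:

import sys

PASS_RATE_THRESHOLD = 0.95  # example threshold; tune to your test suite

if pass_rate < PASS_RATE_THRESHOLD:
    print(f"Regression detected: pass rate {pass_rate:.2%} below {PASS_RATE_THRESHOLD:.0%}")
    sys.exit(1)  # non-zero exit fails the CI job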
Performance Optimization
Batching Strategy
For large datasets, process in chunks to manage memory:
import pandas as pd
from phoenix.evals import evaluate_dataframe

# Process large dataset in chunks
chunk_size = 1000
large_df = pd.read_csv("large_dataset.csv")

results_chunks = []
for i in range(0, len(large_df), chunk_size):
    chunk = large_df.iloc[i:i+chunk_size]
    results_chunk = evaluate_dataframe(
        dataframe=chunk,
        evaluators=[evaluator]
    )
    results_chunks.append(results_chunk)
    print(f"Processed {i+len(chunk)}/{len(large_df)} rows")

final_results = pd.concat(results_chunks, ignore_index=True)
Parallel Processing
Phoenix automatically parallelizes evaluations. Control concurrency based on your resources:

import pandas as pd
from phoenix.evals import async_evaluate_dataframe

# High concurrency for fast, cheap evaluators
results_df = await async_evaluate_dataframe(
    dataframe=df,
    evaluators=[fast_evaluator],
    concurrency=50
)

# Lower concurrency for expensive LLM evaluations
results_df = await async_evaluate_dataframe(
    dataframe=df,
    evaluators=[llm_evaluator],
    concurrency=5  # Avoid rate limits
)
Best Practices
Sample First
Always test on a small sample before running on the full dataset:
import pandas as pd
from phoenix.evals import evaluate_dataframe

# Test on sample
sample_df = df.sample(n=10, random_state=42)
test_results = evaluate_dataframe(
    dataframe=sample_df,
    evaluators=[evaluator]
)

# Verify results look good
print(test_results.head())

# Run on full dataset
if input("Proceed with full dataset? (y/n): ") == "y":
    full_results = evaluate_dataframe(
        dataframe=df,
        evaluators=[evaluator]
    )
Monitor Costs
Track LLM API costs during evaluation:

import pandas as pd
from phoenix.evals import evaluate_dataframe, create_classifier, LLM

llm = LLM(provider="openai", model="gpt-4o-mini")

evaluator = create_classifier(
    name="quality",
    prompt_template="Rate: {output}",
    llm=llm,
    choices={"good": 1.0, "bad": 0.0}
)

# Run a small sample first to confirm the evaluator behaves as expected
sample_size = 10
full_size = len(df)
sample_results = evaluate_dataframe(
    dataframe=df.head(sample_size),
    evaluators=[evaluator]
)
print(sample_results.head())

# Estimate tokens and cost (rough)
avg_tokens_per_eval = 100  # Adjust based on your prompt
total_evals = full_size * len([evaluator])
estimated_tokens = total_evals * avg_tokens_per_eval
estimated_cost = (estimated_tokens / 1000) * 0.00015  # GPT-4o-mini rate
print(f"Estimated cost for full dataset: ${estimated_cost:.2f}")
print("Proceed? (y/n)")
Save Intermediate Results
import pandas as pd
from phoenix.evals import evaluate_dataframe

# Save results incrementally
chunk_size = 100
for i in range(0, len(df), chunk_size):
    chunk = df.iloc[i:i+chunk_size]
    results = evaluate_dataframe(
        dataframe=chunk,
        evaluators=[evaluator]
    )
    # Save checkpoint
    results.to_csv(f"results_checkpoint_{i}.csv", index=False)
    print(f"Saved checkpoint at row {i+len(chunk)}")
Next Steps
- Pre-built Metrics: Use ready-made evaluators for common tasks
- Custom Evaluators: Create custom evaluation logic