Overview
CheckThat AI supports batch processing capabilities for evaluating multiple claims efficiently. While there isn’t a dedicated batch processing endpoint, you can leverage the API’s architecture to implement efficient batch workflows.

Batch Processing Strategies
1. Sequential Processing
Process claims one at a time using the /chat or /v1/chat/completions endpoints:
import requests
import time
def process_claims_sequential(claims, model="gpt-4o", api_key="sk-proj-..."):
    """Normalize a batch of claims one at a time via the /chat endpoint.

    Args:
        claims: List of claim strings to normalize.
        model: Model identifier to use for normalization.
        api_key: CheckThat AI API key.

    Returns:
        List of dicts with 'claim' (original text) and 'normalized'
        (full streamed response) keys, in input order.
    """
    results = []
    for i, claim in enumerate(claims):
        response = requests.post(
            "https://api.checkthat-ai.com/chat",
            json={
                "user_query": claim,
                "model": model,
                "api_key": api_key
            },
            stream=True
        )
        # Collect streaming response
        full_response = ""
        for chunk in response.iter_content(decode_unicode=True):
            if chunk:
                full_response += chunk
        results.append({
            "claim": claim,
            "normalized": full_response
        })
        # Respect rate limits (10 requests per 60 seconds).
        # Skip the delay after the final claim — there is no next request
        # to pace, so sleeping would waste 6 seconds.
        if i < len(claims) - 1:
            time.sleep(6)
    return results
# Usage
# NOTE(review): this issues live HTTP requests and requires a valid API key;
# with the built-in 6s delay, three claims take roughly 18 seconds.
claims = [
"The Earth is flat.",
"Vaccines cause autism.",
"Climate change is a hoax."
]
results = process_claims_sequential(claims)
2. Parallel Processing with Rate Limiting
Process multiple claims in parallel while respecting rate limits:

import asyncio
import aiohttp
from asyncio import Semaphore
async def process_claim(session, claim, model, api_key, semaphore):
    """Normalize one claim over the shared session.

    The semaphore caps how many requests are in flight at once; the
    body is read in full before the connection is released.
    """
    payload = {
        "user_query": claim,
        "model": model,
        "api_key": api_key
    }
    async with semaphore:  # Limit concurrent requests
        async with session.post(
            "https://api.checkthat-ai.com/chat", json=payload
        ) as response:
            body = await response.text()
            return {"claim": claim, "normalized": body}
async def process_claims_parallel(claims, model="gpt-4o", api_key="sk-proj-...", max_concurrent=5):
    """Normalize claims concurrently, at most `max_concurrent` at a time.

    Returns the per-claim result dicts in the same order as `claims`.
    """
    limiter = Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        pending = [
            process_claim(session, one_claim, model, api_key, limiter)
            for one_claim in claims
        ]
        return await asyncio.gather(*pending)
# Usage
# NOTE(review): requires a running event loop entry point; asyncio.run
# drives the whole batch to completion and returns the list of results.
claims = [
"The moon landing was faked.",
"5G causes COVID-19.",
"The pyramids were built by aliens."
]
results = asyncio.run(process_claims_parallel(claims))
3. Batch with Refinement
Process claims with automatic refinement enabled:

from openai import OpenAI
def process_batch_with_refinement(claims, model="gpt-4o", api_key="sk-proj-..."):
    """Normalize claims sequentially with CheckThat AI refinement enabled.

    Each result dict carries the original claim, the model response, and
    any refinement metadata attached to the response (None if absent).
    """
    client = OpenAI(
        api_key=api_key,
        base_url="https://api.checkthat-ai.com/v1"
    )
    # CheckThat AI custom parameters — identical for every request, so
    # build the options dict once up front.
    refinement_options = {
        "refine_claims": True,
        "refine_model": "gpt-4o",
        "refine_threshold": 0.7,
        "refine_max_iters": 3,
        "checkthat_api_key": api_key
    }
    processed = []
    for claim in claims:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": claim}],
            extra_body=refinement_options
        )
        processed.append({
            "claim": claim,
            "response": response.choices[0].message.content,
            "refinement_metadata": getattr(response, 'refinement_metadata', None)
        })
    return processed
Batch File Processing
Process claims from a file (CSV, JSON, or text):

CSV Processing
import csv
import json
import requests
def process_csv_batch(input_file, output_file, model="gpt-4o", api_key="sk-proj-..."):
    """Normalize claims read from a CSV file and write results to another CSV.

    Args:
        input_file: Path to a CSV with a 'claim' column.
        output_file: Path for the output CSV ('original_claim', 'normalized_claim').
        model: Model identifier to use for normalization.
        api_key: CheckThat AI API key.

    Returns:
        List of dicts mirroring the rows written to `output_file`.
    """
    results = []
    # Read claims from CSV. newline='' is required by the csv module when
    # passing it a file object (prevents mangled/blank rows on Windows).
    with open(input_file, 'r', encoding='utf-8', newline='') as f:
        reader = csv.DictReader(f)
        claims = [row['claim'] for row in reader]
    # Process each claim
    for i, claim in enumerate(claims):
        print(f"Processing claim {i+1}/{len(claims)}...")
        response = requests.post(
            "https://api.checkthat-ai.com/chat",
            json={
                "user_query": claim,
                "model": model,
                "api_key": api_key
            },
            stream=True
        )
        full_response = ""
        for chunk in response.iter_content(decode_unicode=True):
            if chunk:
                full_response += chunk
        results.append({
            "original_claim": claim,
            "normalized_claim": full_response
        })
    # Write results to output file (newline='' again for the csv module)
    with open(output_file, 'w', encoding='utf-8', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=['original_claim', 'normalized_claim'])
        writer.writeheader()
        writer.writerows(results)
    return results
# Usage
# NOTE(review): reads 'input_claims.csv' from the working directory and
# overwrites 'output_normalized.csv'; issues one live request per row.
results = process_csv_batch(
'input_claims.csv',
'output_normalized.csv',
model='gpt-4o',
api_key='sk-proj-...'
)
JSON Processing
import json
import requests
def process_json_batch(input_file, output_file, model="gpt-4o", api_key="sk-proj-..."):
    """Normalize claims from a JSON file and write results to another JSON file.

    The input file is expected to hold {"claims": [{"id": ..., "text": ...,
    "metadata": {...}}, ...]}; the output holds {"results": [...]}.
    """
    # Load the claim objects from disk
    with open(input_file, 'r', encoding='utf-8') as fh:
        payload = json.load(fh)
    claim_objects = payload.get('claims', [])
    normalized_items = []
    for claim_obj in claim_objects:
        claim_text = claim_obj.get('text', '')
        resp = requests.post(
            "https://api.checkthat-ai.com/chat",
            json={
                "user_query": claim_text,
                "model": model,
                "api_key": api_key
            },
            stream=True
        )
        # Assemble the streamed chunks into one string
        pieces = [c for c in resp.iter_content(decode_unicode=True) if c]
        normalized_items.append({
            "id": claim_obj.get('id'),
            "original": claim_text,
            "normalized": "".join(pieces),
            "metadata": claim_obj.get('metadata', {})
        })
    # Persist the results
    with open(output_file, 'w', encoding='utf-8') as fh:
        json.dump({"results": normalized_items}, fh, indent=2)
    return normalized_items
# Usage
# NOTE(review): reads 'claims.json' from the working directory and
# overwrites 'normalized_claims.json'; issues one live request per claim.
results = process_json_batch(
'claims.json',
'normalized_claims.json',
model='gpt-4o',
api_key='sk-proj-...'
)
Batch Evaluation
Use CheckThat AI’s evaluation services for batch quality assessment:

from openai import OpenAI
from typing import List, Dict
def batch_evaluate_claims(
    claim_pairs: List[Dict[str, str]],
    model: str = "gpt-4o",
    api_key: str = "sk-proj-..."
) -> List[Dict]:
    """
    Evaluate multiple claim pairs (original vs normalized).
    Args:
        claim_pairs: List of dicts with 'original' and 'normalized' keys
        model: Model to use for evaluation
        api_key: API key
    Returns:
        List of evaluation results
    """
    client = OpenAI(
        api_key=api_key,
        base_url="https://api.checkthat-ai.com/v1"
    )
    results = []
    for pair in claim_pairs:
        # Use refinement to evaluate quality
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "user", "content": pair['normalized']}
            ],
            extra_body={
                "refine_claims": True,
                "refine_model": model,
                "refine_threshold": 0.8,
                "checkthat_api_key": api_key
            }
        )
        results.append({
            "original": pair['original'],
            "normalized": pair['normalized'],
            # Not every response carries refinement metadata; use getattr
            # with a None default (direct attribute access would raise
            # AttributeError when the field is absent).
            "evaluation": getattr(response, 'refinement_metadata', None)
        })
    return results
Rate Limiting Considerations
When processing batches, respect the rate limits:

- 10 requests per 60 seconds per IP address
- Implement delays between requests
- Use exponential backoff for retries
- Monitor rate limit headers
import time
import requests
def rate_limited_batch_process(claims, delay=6):
    """
    Process claims with rate limiting.
    Args:
        claims: List of claims to process
        delay: Delay between requests in seconds (default: 6s = 10 req/min)
    Returns:
        List of normalized response strings, in input order.
    """
    results = []
    for i, claim in enumerate(claims):
        # Pace requests: no delay before the very first one
        if i > 0:
            time.sleep(delay)
        response = requests.post(
            "https://api.checkthat-ai.com/chat",
            json={"user_query": claim, "model": "gpt-4o", "api_key": "sk-proj-..."},
            stream=True
        )
        # Check rate limit headers
        remaining = response.headers.get('X-RateLimit-Remaining')
        reset_time = response.headers.get('X-RateLimit-Reset')
        # Guard: only act when BOTH headers are present — the server may
        # omit X-RateLimit-Reset, and int(None) would raise a TypeError.
        if remaining and reset_time and int(remaining) < 2:
            # Wait until reset if close to limit
            wait_time = int(reset_time) - int(time.time())
            if wait_time > 0:
                print(f"Rate limit close, waiting {wait_time}s...")
                time.sleep(wait_time + 1)
        # Process response: concatenate the streamed chunks
        full_response = ""
        for chunk in response.iter_content(decode_unicode=True):
            if chunk:
                full_response += chunk
        results.append(full_response)
    return results
Error Handling in Batch Processing
Implement robust error handling for batch operations:

import requests
import time
import logging
# Module-level logger used by robust_batch_process below
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def robust_batch_process(claims, model="gpt-4o", api_key="sk-proj-...", max_retries=3):
    """Normalize claims with retries, backoff, and 429 handling.

    Args:
        claims: List of claim strings to process.
        model: Model identifier to use.
        api_key: CheckThat AI API key.
        max_retries: Attempts per claim before recording a failure.

    Returns:
        One result dict per claim with 'claim', 'normalized', 'success',
        and (on failure) 'error' keys.
    """
    results = []
    for i, claim in enumerate(claims):
        retries = 0
        success = False
        while retries < max_retries and not success:
            try:
                logger.info(f"Processing claim {i+1}/{len(claims)} (attempt {retries+1})")
                response = requests.post(
                    "https://api.checkthat-ai.com/chat",
                    json={
                        "user_query": claim,
                        "model": model,
                        "api_key": api_key
                    },
                    stream=True,
                    timeout=60
                )
                if response.status_code == 429:
                    # Rate limit exceeded — honor the server's Retry-After
                    retry_after = int(response.headers.get('Retry-After', 60))
                    logger.warning(f"Rate limit exceeded, waiting {retry_after}s")
                    time.sleep(retry_after)
                    retries += 1
                    continue
                response.raise_for_status()
                # Collect response
                full_response = ""
                for chunk in response.iter_content(decode_unicode=True):
                    if chunk:
                        full_response += chunk
                results.append({
                    "claim": claim,
                    "normalized": full_response,
                    "success": True
                })
                success = True
            except requests.exceptions.Timeout:
                logger.error(f"Timeout processing claim {i+1}")
                retries += 1
                # True exponential backoff: 5s, 10s, 20s, ...
                # (retries was just incremented, so the exponent starts at 0)
                time.sleep(5 * 2 ** (retries - 1))
            except requests.exceptions.RequestException as e:
                logger.error(f"Error processing claim {i+1}: {e}")
                retries += 1
                time.sleep(5 * 2 ** (retries - 1))
        if not success:
            results.append({
                "claim": claim,
                "normalized": None,
                "success": False,
                "error": "Max retries exceeded"
            })
        # Rate limiting delay between claims (skipped after the last one)
        if i < len(claims) - 1:
            time.sleep(6)
    return results
Progress Tracking
Implement progress tracking for long-running batch jobs:

from tqdm import tqdm
import requests
def batch_process_with_progress(claims, model="gpt-4o", api_key="sk-proj-..."):
    """Normalize claims sequentially with a tqdm progress bar.

    Args:
        claims: List of claim strings to process.
        model: Model identifier to use.
        api_key: CheckThat AI API key.

    Returns:
        List of normalized response strings, in input order.
    """
    # The rate-limit delay below needs `time`, which this snippet's
    # top-level imports did not include — import it here so the example
    # runs standalone.
    import time

    results = []
    # Create progress bar
    with tqdm(total=len(claims), desc="Processing claims") as pbar:
        for i, claim in enumerate(claims):
            response = requests.post(
                "https://api.checkthat-ai.com/chat",
                json={
                    "user_query": claim,
                    "model": model,
                    "api_key": api_key
                },
                stream=True
            )
            full_response = ""
            for chunk in response.iter_content(decode_unicode=True):
                if chunk:
                    full_response += chunk
            results.append(full_response)
            pbar.update(1)
            # Rate limiting (no delay needed after the final claim)
            if i < len(claims) - 1:
                time.sleep(6)
    return results
Best Practices
Respect Rate Limits
Respect Rate Limits
Always implement delays between requests and handle 429 responses gracefully. Use the
Retry-After header to determine wait times.

Implement Error Handling
Implement Error Handling
Use try-catch blocks and retry logic with exponential backoff. Log errors for debugging and monitoring.
Monitor Progress
Monitor Progress
Implement progress tracking and logging for long-running batch jobs. This helps identify issues early.
Save Intermediate Results
Save Intermediate Results
Periodically save results to avoid losing progress if the process is interrupted.
Optimize Concurrency
Optimize Concurrency
Use parallel processing with semaphores to respect rate limits while maximizing throughput.
Related Endpoints
Chat Endpoint
Process individual claims with streaming
Chat Completions
OpenAI-compatible completions API