Skip to main content

Overview

CheckThat AI lets you evaluate multiple claims efficiently through client-side batch workflows. There isn’t a dedicated batch processing endpoint; instead, you call the existing endpoints repeatedly using the strategies below.

Batch Processing Strategies

1. Sequential Processing

Process claims one at a time using the /chat or /v1/chat/completions endpoints:
import requests
import time

def process_claims_sequential(claims, model="gpt-4o", api_key="sk-proj-..."):
    """Normalize claims one at a time through the /chat endpoint.

    Args:
        claims: Iterable of claim strings to submit.
        model: Model identifier sent with each request.
        api_key: API key sent in the request body.

    Returns:
        A list of ``{"claim": ..., "normalized": ...}`` dicts in input order.

    Raises:
        requests.exceptions.RequestException: On connection failure, timeout,
            or a non-2xx response. Results collected so far are not returned.
    """
    results = []

    for index, claim in enumerate(claims):
        # Respect rate limits (10 requests per 60 seconds). Pausing before
        # every request except the first avoids a pointless wait at the end.
        if index:
            time.sleep(6)

        # The context manager returns the connection to the pool even if
        # reading the stream fails part-way through.
        with requests.post(
            "https://api.checkthat-ai.com/chat",
            json={
                "user_query": claim,
                "model": model,
                "api_key": api_key
            },
            stream=True,
            timeout=60
        ) as response:
            # Fail loudly instead of recording an error body as a claim.
            response.raise_for_status()

            # iter_content() only decodes when an encoding is known; without
            # this fallback it yields bytes and the join below would fail.
            if response.encoding is None:
                response.encoding = "utf-8"

            # chunk_size=None reads data as it arrives rather than one byte
            # at a time (the default chunk size is 1).
            full_response = "".join(
                response.iter_content(chunk_size=None, decode_unicode=True)
            )

        results.append({
            "claim": claim,
            "normalized": full_response
        })

    return results

# Usage: normalize three sample claims. No model or api_key is passed, so the
# function's defaults are used -- supply a real key via api_key=... in practice.
claims = [
    "The Earth is flat.",
    "Vaccines cause autism.",
    "Climate change is a hoax."
]

# Each entry in results is a dict with "claim" and "normalized" keys.
results = process_claims_sequential(claims)

2. Parallel Processing with Rate Limiting

Process multiple claims concurrently, using a semaphore to cap the number of requests in flight at once:
import asyncio
import aiohttp
from asyncio import Semaphore

class _RequestPacer:
    """Space out request starts so they stay under a per-minute cap."""

    def __init__(self, min_interval):
        # Minimum number of seconds between two consecutive request starts.
        self._min_interval = min_interval
        self._lock = asyncio.Lock()
        # Event-loop time before which the next request may not start.
        self._next_start = 0.0

    async def wait(self):
        """Sleep until the next request is allowed to start."""
        # The lock serializes waiters so each one reserves its own start slot.
        async with self._lock:
            loop = asyncio.get_running_loop()
            remaining = self._next_start - loop.time()
            if remaining > 0:
                await asyncio.sleep(remaining)
            self._next_start = loop.time() + self._min_interval


async def process_claim(session, claim, model, api_key, semaphore, pacer=None):
    """Send one claim to the /chat endpoint and return its normalized text.

    Args:
        session: An open ``aiohttp.ClientSession`` used to send the request.
        claim: Claim string to submit.
        model: Model identifier sent with the request.
        api_key: API key sent in the request body.
        semaphore: ``asyncio.Semaphore`` bounding concurrent requests.
        pacer: Optional object with an async ``wait()`` method that is awaited
            before the request is sent; ``None`` disables pacing.

    Returns:
        A ``{"claim": ..., "normalized": ...}`` dict.

    Raises:
        aiohttp.ClientResponseError: If the server answers with status >= 400.
    """
    async with semaphore:  # Limit concurrent requests
        if pacer is not None:
            await pacer.wait()  # Limit how often requests start

        async with session.post(
            "https://api.checkthat-ai.com/chat",
            json={
                "user_query": claim,
                "model": model,
                "api_key": api_key
            }
        ) as response:
            # Fail loudly instead of returning an error body as a claim.
            response.raise_for_status()
            result = await response.text()
            return {"claim": claim, "normalized": result}


async def process_claims_parallel(claims, model="gpt-4o", api_key="sk-proj-...", max_concurrent=5, requests_per_minute=10):
    """Normalize claims concurrently while pacing request starts.

    Args:
        claims: Iterable of claim strings to submit.
        model: Model identifier sent with each request.
        api_key: API key sent in the request body.
        max_concurrent: Maximum number of requests in flight at once.
        requests_per_minute: Upper bound on request starts per minute; the
            default matches the documented limit of 10 requests per 60
            seconds. Pass ``None`` or ``0`` to disable pacing.

    Returns:
        A list of ``{"claim": ..., "normalized": ...}`` dicts in input order.

    Raises:
        aiohttp.ClientError: If any request fails; the first failure is
            propagated by ``asyncio.gather``.
    """
    claims = list(claims)
    if not claims:
        # Nothing to send, so skip opening a client session.
        return []

    semaphore = Semaphore(max_concurrent)
    # A semaphore alone only caps concurrency; the pacer enforces the rate.
    pacer = _RequestPacer(60 / requests_per_minute) if requests_per_minute else None

    async with aiohttp.ClientSession() as session:
        tasks = [
            process_claim(session, claim, model, api_key, semaphore, pacer)
            for claim in claims
        ]
        results = await asyncio.gather(*tasks)

    return results

# Usage: three sample claims submitted through the async batch helper.
claims = [
    "The moon landing was faked.",
    "5G causes COVID-19.",
    "The pyramids were built by aliens."
]

# asyncio.run() starts an event loop, runs the coroutine to completion and
# returns its result.
results = asyncio.run(process_claims_parallel(claims))

3. Batch with Refinement

Process claims with automatic refinement enabled:
from openai import OpenAI

def process_batch_with_refinement(claims, model="gpt-4o", api_key="sk-proj-..."):
    """Run each claim through chat completions with claim refinement enabled.

    Args:
        claims: Iterable of claim strings, each sent as a single user message.
        model: Model identifier used for the completion request.
        api_key: Key passed both to the OpenAI client and as the
            ``checkthat_api_key`` custom parameter.

    Returns:
        A list of dicts with "claim", "response" (content of the first
        choice) and "refinement_metadata" (``None`` when the response has no
        such attribute), in input order.
    """
    checkthat = OpenAI(
        base_url="https://api.checkthat-ai.com/v1",
        api_key=api_key,
    )

    def refine(text):
        # One completion request per claim; a fresh extra_body dict is built
        # for every call.
        completion = checkthat.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": text}],
            # CheckThat AI custom parameters
            extra_body={
                "refine_claims": True,
                "refine_model": "gpt-4o",
                "refine_threshold": 0.7,
                "refine_max_iters": 3,
                "checkthat_api_key": api_key,
            },
        )
        first_choice = completion.choices[0]
        return {
            "claim": text,
            "response": first_choice.message.content,
            "refinement_metadata": getattr(completion, 'refinement_metadata', None),
        }

    return [refine(text) for text in claims]

Batch File Processing

Process claims from a file (CSV, JSON, or text):

CSV Processing

import csv
import json
import requests

def process_csv_batch(input_file, output_file, model="gpt-4o", api_key="sk-proj-...", delay=6):
    """Normalize every claim in a CSV file and write the results to a CSV.

    Args:
        input_file: Path to a UTF-8 CSV file with a ``claim`` column.
        output_file: Path of the CSV to write, with ``original_claim`` and
            ``normalized_claim`` columns.
        model: Model identifier sent with each request.
        api_key: API key sent in the request body.
        delay: Seconds to wait between consecutive requests (6 s matches the
            documented limit of 10 requests per 60 seconds). Use 0 to disable.

    Returns:
        A list of ``{"original_claim": ..., "normalized_claim": ...}`` dicts.

    Raises:
        KeyError: If the input file has no ``claim`` column.
        requests.exceptions.RequestException: On connection failure, timeout,
            or a non-2xx response. Rows completed before the failure are
            still written to ``output_file``.
    """
    import time  # local import: this snippet's import lines do not include it

    results = []

    # Read claims from CSV. newline='' lets the csv module handle line
    # endings itself, as its documentation requires.
    with open(input_file, 'r', encoding='utf-8', newline='') as f:
        reader = csv.DictReader(f)
        claims = [row['claim'] for row in reader]

    try:
        # Process each claim
        for i, claim in enumerate(claims):
            print(f"Processing claim {i+1}/{len(claims)}...")

            # Pause between requests (not before the first) to stay within
            # the rate limit.
            if i and delay > 0:
                time.sleep(delay)

            with requests.post(
                "https://api.checkthat-ai.com/chat",
                json={
                    "user_query": claim,
                    "model": model,
                    "api_key": api_key
                },
                stream=True,
                timeout=60
            ) as response:
                # Fail loudly instead of writing an error body as a claim.
                response.raise_for_status()

                # Without a known encoding iter_content() yields bytes, which
                # cannot be joined into a str.
                if response.encoding is None:
                    response.encoding = "utf-8"

                full_response = "".join(
                    response.iter_content(chunk_size=None, decode_unicode=True)
                )

            results.append({
                "original_claim": claim,
                "normalized_claim": full_response
            })
    finally:
        # Write results to output file even if a request failed, so work
        # already completed is not lost.
        with open(output_file, 'w', encoding='utf-8', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=['original_claim', 'normalized_claim'])
            writer.writeheader()
            writer.writerows(results)

    return results

# Usage: read claims from the "claim" column of input_claims.csv and write
# the original/normalized pairs to output_normalized.csv.
results = process_csv_batch(
    'input_claims.csv',
    'output_normalized.csv',
    model='gpt-4o',
    api_key='sk-proj-...'
)

JSON Processing

import json
import requests

def process_json_batch(input_file, output_file, model="gpt-4o", api_key="sk-proj-...", delay=6):
    """Normalize every claim listed in a JSON file and write a JSON report.

    The input is expected to be an object with a ``claims`` list whose items
    are objects carrying ``text`` and, optionally, ``id`` and ``metadata``.

    Args:
        input_file: Path to the UTF-8 JSON input file.
        output_file: Path of the JSON file to write (``{"results": [...]}``).
        model: Model identifier sent with each request.
        api_key: API key sent in the request body.
        delay: Seconds to wait between consecutive requests (6 s matches the
            documented limit of 10 requests per 60 seconds). Use 0 to disable.

    Returns:
        A list of dicts with "id", "original", "normalized" and "metadata".

    Raises:
        requests.exceptions.RequestException: On connection failure, timeout,
            or a non-2xx response. Claims completed before the failure are
            still written to ``output_file``.
    """
    import time  # local import: this snippet's import lines do not include it

    # Read claims from JSON
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    claims = data.get('claims', [])
    results = []

    try:
        # Process each claim
        for i, claim_obj in enumerate(claims):
            claim_text = claim_obj.get('text', '')

            # Pause between requests (not before the first) to stay within
            # the rate limit.
            if i and delay > 0:
                time.sleep(delay)

            with requests.post(
                "https://api.checkthat-ai.com/chat",
                json={
                    "user_query": claim_text,
                    "model": model,
                    "api_key": api_key
                },
                stream=True,
                timeout=60
            ) as response:
                # Fail loudly instead of storing an error body as a claim.
                response.raise_for_status()

                # Without a known encoding iter_content() yields bytes, which
                # cannot be joined into a str.
                if response.encoding is None:
                    response.encoding = "utf-8"

                full_response = "".join(
                    response.iter_content(chunk_size=None, decode_unicode=True)
                )

            results.append({
                "id": claim_obj.get('id'),
                "original": claim_text,
                "normalized": full_response,
                "metadata": claim_obj.get('metadata', {})
            })
    finally:
        # Write results even if a request failed, so completed work is kept.
        # ensure_ascii=False keeps non-ASCII claim text readable in the
        # UTF-8 output instead of \uXXXX escapes.
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump({"results": results}, f, indent=2, ensure_ascii=False)

    return results

# Usage: read the "claims" list from claims.json and write the results to
# normalized_claims.json.
results = process_json_batch(
    'claims.json',
    'normalized_claims.json',
    model='gpt-4o',
    api_key='sk-proj-...'
)

Batch Evaluation

Use CheckThat AI’s evaluation services for batch quality assessment:
from openai import OpenAI
from typing import List, Dict

def batch_evaluate_claims(
    claim_pairs: List[Dict[str, str]],
    model: str = "gpt-4o",
    api_key: str = "sk-proj-..."
) -> List[Dict]:
    """
    Evaluate multiple claim pairs (original vs normalized).

    Only the normalized text of each pair is sent to the API; the original
    text is carried through unchanged into the result.

    Args:
        claim_pairs: List of dicts with 'original' and 'normalized' keys
        model: Model to use for evaluation (also used as the refine model)
        api_key: API key

    Returns:
        List of dicts with 'original', 'normalized' and 'evaluation' keys.
        'evaluation' is the response's refinement metadata, or None when the
        response does not carry any.

    Raises:
        KeyError: If a pair lacks the 'original' or 'normalized' key.
    """
    if not claim_pairs:
        # Nothing to evaluate, so no client is needed.
        return []

    client = OpenAI(
        api_key=api_key,
        base_url="https://api.checkthat-ai.com/v1"
    )

    results = []

    for pair in claim_pairs:
        # Use refinement to evaluate quality
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "user", "content": pair['normalized']}
            ],
            extra_body={
                "refine_claims": True,
                "refine_model": model,
                "refine_threshold": 0.8,
                "checkthat_api_key": api_key
            }
        )

        results.append({
            "original": pair['original'],
            "normalized": pair['normalized'],
            # refinement_metadata is a custom field; tolerate responses that
            # omit it instead of raising AttributeError mid-batch.
            "evaluation": getattr(response, 'refinement_metadata', None)
        })

    return results

Rate Limiting Considerations

When processing batches, respect the rate limits:
  • 10 requests per 60 seconds per IP address
  • Implement delays between requests
  • Use exponential backoff for retries
  • Monitor rate limit headers
import time
import requests

def rate_limited_batch_process(claims, delay=6, model="gpt-4o", api_key="sk-proj-..."):
    """
    Process claims with rate limiting.

    A fixed delay is inserted between requests. In addition, when the
    response reports fewer than two remaining requests, the function waits
    until the advertised reset time before continuing.

    Args:
        claims: List of claims to process
        delay: Delay between requests in seconds (default: 6s = 10 req/min)
        model: Model identifier sent with each request
        api_key: API key sent in the request body

    Returns:
        List of normalized response strings, in input order.

    Raises:
        requests.exceptions.RequestException: On connection failure, timeout,
            or a non-2xx response.
    """
    results = []

    for i, claim in enumerate(claims):
        # Fixed spacing between requests; no pause before the first one.
        if i > 0:
            time.sleep(delay)

        with requests.post(
            "https://api.checkthat-ai.com/chat",
            json={"user_query": claim, "model": model, "api_key": api_key},
            stream=True,
            timeout=60
        ) as response:
            # Fail loudly instead of recording an error body as a result.
            response.raise_for_status()

            # Check rate limit headers
            remaining = response.headers.get('X-RateLimit-Remaining')
            reset_time = response.headers.get('X-RateLimit-Reset')

            # Without a known encoding iter_content() yields bytes, which
            # cannot be joined into a str.
            if response.encoding is None:
                response.encoding = "utf-8"

            # Read the body before any long sleep so the streamed connection
            # is not held open while waiting.
            full_response = "".join(
                response.iter_content(chunk_size=None, decode_unicode=True)
            )

        results.append(full_response)

        try:
            near_limit = int(remaining) < 2
            # Assumes X-RateLimit-Reset is a Unix timestamp in seconds --
            # TODO confirm against the API's header documentation.
            wait_time = int(reset_time) - int(time.time())
        except (TypeError, ValueError):
            # Header missing or not an integer: fall back to the fixed delay.
            near_limit, wait_time = False, 0

        if near_limit and wait_time > 0:
            # Wait until reset if close to limit
            print(f"Rate limit close, waiting {wait_time}s...")
            time.sleep(wait_time + 1)

    return results

Error Handling in Batch Processing

Implement robust error handling for batch operations:
import requests
import time
import logging

# Configure root logging so INFO-level progress messages are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after the current module.
logger = logging.getLogger(__name__)

def robust_batch_process(claims, model="gpt-4o", api_key="sk-proj-...", max_retries=3):
    """Normalize claims with per-claim retries, recording failures.

    Each claim is attempted up to ``max_retries`` times. A 429 response waits
    for the ``Retry-After`` interval; timeouts and other request errors back
    off exponentially (5 s, 10 s, 20 s, ...). A claim that never succeeds is
    recorded as a failure and the batch continues.

    Args:
        claims: Sequence of claim strings (must support ``len()``).
        model: Model identifier sent with each request.
        api_key: API key sent in the request body.
        max_retries: Maximum number of attempts per claim.

    Returns:
        One dict per claim, in input order. Successes have "claim",
        "normalized" and ``"success": True``; failures have
        ``"normalized": None``, ``"success": False`` and an "error" message.
    """
    results = []

    for i, claim in enumerate(claims):
        retries = 0
        success = False

        while retries < max_retries and not success:
            rate_limited = False
            try:
                logger.info(f"Processing claim {i+1}/{len(claims)} (attempt {retries+1})")

                # The context manager releases the connection even when the
                # body is never read (429) or the stream is interrupted.
                with requests.post(
                    "https://api.checkthat-ai.com/chat",
                    json={
                        "user_query": claim,
                        "model": model,
                        "api_key": api_key
                    },
                    stream=True,
                    timeout=60
                ) as response:
                    if response.status_code == 429:
                        # Rate limit exceeded
                        rate_limited = True
                        try:
                            retry_after = max(int(response.headers.get('Retry-After', 60)), 0)
                        except ValueError:
                            # Retry-After may be an HTTP-date rather than a
                            # number of seconds; use the default wait.
                            retry_after = 60
                    else:
                        response.raise_for_status()

                        # Without a known encoding iter_content() yields
                        # bytes, which cannot be joined into a str.
                        if response.encoding is None:
                            response.encoding = "utf-8"

                        # Collect response
                        full_response = "".join(
                            response.iter_content(chunk_size=None, decode_unicode=True)
                        )

            except requests.exceptions.Timeout:
                logger.error(f"Timeout processing claim {i+1}")
                retries += 1
                # Skip the wait when no further attempt will be made.
                if retries < max_retries:
                    time.sleep(5 * 2 ** (retries - 1))  # Exponential backoff
                continue

            except requests.exceptions.RequestException as e:
                logger.error(f"Error processing claim {i+1}: {e}")
                retries += 1
                if retries < max_retries:
                    time.sleep(5 * 2 ** (retries - 1))  # Exponential backoff
                continue

            if rate_limited:
                logger.warning(f"Rate limit exceeded, waiting {retry_after}s")
                time.sleep(retry_after)
                retries += 1
                continue

            results.append({
                "claim": claim,
                "normalized": full_response,
                "success": True
            })
            success = True

        if not success:
            results.append({
                "claim": claim,
                "normalized": None,
                "success": False,
                "error": "Max retries exceeded"
            })

        # Rate limiting delay
        if i < len(claims) - 1:
            time.sleep(6)

    return results

Progress Tracking

Implement progress tracking for long-running batch jobs:
from tqdm import tqdm
import requests

def batch_process_with_progress(claims, model="gpt-4o", api_key="sk-proj-..."):
    """Normalize claims sequentially while displaying a tqdm progress bar.

    Args:
        claims: Sequence of claim strings (must support ``len()``).
        model: Model identifier sent with each request.
        api_key: API key sent in the request body.

    Returns:
        List of normalized response strings, in input order.

    Raises:
        requests.exceptions.RequestException: On connection failure, timeout,
            or a non-2xx response.
    """
    # Local import: this snippet's import lines do not bring in time, so the
    # rate-limiting sleep would otherwise raise NameError.
    import time

    results = []

    # Create progress bar
    with tqdm(total=len(claims), desc="Processing claims") as pbar:
        for index, claim in enumerate(claims):
            # Rate limiting: pause between requests, not after the last one.
            if index:
                time.sleep(6)

            with requests.post(
                "https://api.checkthat-ai.com/chat",
                json={
                    "user_query": claim,
                    "model": model,
                    "api_key": api_key
                },
                stream=True,
                timeout=60
            ) as response:
                # Fail loudly instead of recording an error body as a result.
                response.raise_for_status()

                # Without a known encoding iter_content() yields bytes, which
                # cannot be joined into a str.
                if response.encoding is None:
                    response.encoding = "utf-8"

                full_response = "".join(
                    response.iter_content(chunk_size=None, decode_unicode=True)
                )

            results.append(full_response)
            pbar.update(1)

    return results

Best Practices

Always implement delays between requests and handle 429 responses gracefully. Use the Retry-After header to determine wait times.
Use try/except blocks and retry logic with exponential backoff. Log errors for debugging and monitoring.
Implement progress tracking and logging for long-running batch jobs. This helps identify issues early.
Periodically save results to avoid losing progress if the process is interrupted.
Use parallel processing with a semaphore to cap concurrent requests, and pace request starts so that the higher throughput still stays within the rate limit.

Chat Endpoint

Process individual claims with streaming

Chat Completions

OpenAI-compatible completions API

Build docs developers (and LLMs) love