
Overview

CheckThat AI does not currently provide a dedicated WebSocket endpoint in its API. However, the platform's streaming HTTP responses offer similar real-time capabilities for most use cases.

Current Real-Time Capabilities

HTTP Streaming

The /chat endpoint supports Server-Sent Events (SSE)-style streaming, providing real-time updates as the LLM generates responses:
import requests

response = requests.post(
    "https://api.checkthat-ai.com/chat",
    json={
        "user_query": "Analyze this claim...",
        "model": "gpt-4o",
        "api_key": "sk-proj-..."
    },
    stream=True  # Enable streaming
)

# Process chunks as they arrive
for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
    if chunk:
        print(chunk, end='', flush=True)

Chat Completions Streaming

The /v1/chat/completions endpoint supports OpenAI-compatible streaming:
from openai import OpenAI

client = OpenAI(
    api_key="sk-proj-your-key",
    base_url="https://api.checkthat-ai.com/v1"
)

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Your claim here"}],
    stream=True  # Enable streaming
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end='', flush=True)

Implementing WebSocket-Like Behavior

While there’s no native WebSocket support, you can implement similar real-time functionality:

1. Streaming with Progress Updates

const fetchWithProgress = async (query, onProgress) => {
  const response = await fetch('https://api.checkthat-ai.com/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      user_query: query,
      model: 'gpt-4o',
      api_key: 'sk-proj-...'
    })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // Emit progress to the caller's callback
    if (onProgress) onProgress(buffer);
  }

  return buffer;
};

// Usage
await fetchWithProgress('Claim to analyze', (partial) => console.log(partial));

2. Long Polling Alternative

For conversational back-and-forth without WebSockets, wrap the streaming endpoint in a stateful client:
import time
import requests

class LongPollingClient:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.api_key = api_key
        self.conversation_id = None
    
    def send_message(self, message):
        """Send a message and yield response chunks as they arrive."""
        response = requests.post(
            f"{self.base_url}/chat",
            json={
                "user_query": message,
                "model": "gpt-4o",
                "api_key": self.api_key,
                "conversation_id": self.conversation_id
            },
            stream=True
        )
        
        # Yield chunks rather than returning a full string: a plain
        # return value is discarded when a generator is iterated
        for chunk in response.iter_content(decode_unicode=True):
            if chunk:
                yield chunk

# Usage
client = LongPollingClient("https://api.checkthat-ai.com", "sk-proj-...")
for chunk in client.send_message("Analyze this claim"):
    print(chunk, end='', flush=True)

3. Server-Sent Events (SSE) Wrapper

Create an SSE wrapper for the streaming endpoint:
class CheckThatSSE {
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.baseUrl = 'https://api.checkthat-ai.com';
  }

  async *streamChat(query, model = 'gpt-4o') {
    const response = await fetch(`${this.baseUrl}/chat`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        user_query: query,
        model: model,
        api_key: this.apiKey
      })
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      const chunk = decoder.decode(value);
      yield { type: 'message', data: chunk };
    }

    yield { type: 'done' };
  }
}

// Usage
const sse = new CheckThatSSE('sk-proj-...');

for await (const event of sse.streamChat('Your claim here')) {
  if (event.type === 'message') {
    console.log(event.data);
  } else if (event.type === 'done') {
    console.log('Stream complete');
  }
}

Future WebSocket Support

If CheckThat AI implements WebSocket support in the future, it would likely follow this pattern:

Proposed WebSocket Endpoint

ws://api.checkthat-ai.com/ws
wss://api.checkthat-ai.com/ws  (secure)

Connection Authentication

// Note: custom handshake headers are supported by Node's `ws`
// package, as shown here; the browser WebSocket constructor does
// not accept headers, so browser clients would pass the key another
// way (e.g. a query parameter or a first authentication message).
const ws = new WebSocket(
  'wss://api.checkthat-ai.com/ws',
  [],
  {
    headers: {
      'Authorization': 'Bearer sk-proj-your-api-key'
    }
  }
);

Message Format

Client to Server:
{
  "type": "claim_analysis",
  "data": {
    "query": "The Earth is flat.",
    "model": "gpt-4o",
    "conversation_id": "conv-123"
  }
}
Server to Client:
{
  "type": "chunk",
  "data": {
    "content": "Analyzing claim...",
    "conversation_id": "conv-123",
    "timestamp": "2026-03-04T15:30:00Z"
  }
}

Event Types

connection_established - Sent when the WebSocket connection is established
chunk - Streaming response chunk from the LLM
complete - Response generation completed
error - Error occurred during processing
rate_limit - Rate limit warning or exceeded
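If such an endpoint ships, a client would typically dispatch each incoming message on its type field. A minimal sketch using the hypothetical event types listed above (handle_event is not part of any shipped API):

```python
import json

def handle_event(raw_message, handlers):
    """Parse a server message and dispatch it by its 'type' field.

    handlers maps an event type (e.g. 'chunk', 'complete') to a
    callable that receives the event's 'data' payload.
    """
    event = json.loads(raw_message)
    handler = handlers.get(event.get("type"))
    if handler is None:
        # Unknown event types are ignored so new server-side events
        # don't break older clients
        return None
    return handler(event.get("data", {}))

# Usage: collect streamed chunks as they arrive
chunks = []
handlers = {
    "chunk": lambda data: chunks.append(data["content"]),
    "complete": lambda data: print("stream complete"),
}
handle_event('{"type": "chunk", "data": {"content": "Analyzing..."}}', handlers)
```

Ignoring unknown event types keeps the client forward-compatible if the server later adds new message kinds.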

Alternatives to WebSocket

Until native WebSocket support is available, consider these alternatives:

1. Use HTTP/2 Multiplexing

HTTP/2 provides request multiplexing, letting many concurrent streaming requests share a single connection. (HTTP/2 server push has been removed from major browsers, so multiplexing is the practical benefit here.)

2. GraphQL Subscriptions

If CheckThat AI adds GraphQL support, subscriptions provide real-time updates:
subscription OnClaimProcessed {
  claimProcessed(conversationId: "conv-123") {
    normalized
    confidence
    timestamp
  }
}

3. Polling with ETags

Implement efficient polling using ETags to minimize bandwidth:
import time
import requests

def poll_with_etag(url, interval=5):
    etag = None
    
    while True:
        headers = {'If-None-Match': etag} if etag else {}
        response = requests.get(url, headers=headers)
        
        if response.status_code == 200:
            etag = response.headers.get('ETag')
            yield response.json()
        elif response.status_code == 304:
            # No changes
            pass
        
        time.sleep(interval)

Real-Time Monitoring

For monitoring long-running operations, implement custom event handling:
import requests
from typing import Callable, Optional

class StreamingMonitor:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.checkthat-ai.com"
    
    def process_with_callbacks(
        self,
        query: str,
        model: str,
        on_chunk: Optional[Callable[[str], None]] = None,
        on_complete: Optional[Callable[[str], None]] = None,
        on_error: Optional[Callable[[Exception], None]] = None
    ):
        try:
            response = requests.post(
                f"{self.base_url}/chat",
                json={
                    "user_query": query,
                    "model": model,
                    "api_key": self.api_key
                },
                stream=True
            )
            
            response.raise_for_status()
            
            full_response = ""
            for chunk in response.iter_content(decode_unicode=True):
                if chunk:
                    full_response += chunk
                    if on_chunk:
                        on_chunk(chunk)
            
            if on_complete:
                on_complete(full_response)
                
        except Exception as e:
            if on_error:
                on_error(e)
            raise

# Usage
monitor = StreamingMonitor("sk-proj-...")

monitor.process_with_callbacks(
    query="Analyze this claim",
    model="gpt-4o",
    on_chunk=lambda chunk: print(f"Received: {chunk}"),
    on_complete=lambda result: print(f"\nComplete: {len(result)} chars"),
    on_error=lambda e: print(f"Error: {e}")
)

Performance Considerations

When streaming large responses, implement proper buffer management to avoid memory issues. Process chunks incrementally rather than accumulating everything in memory.
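For example, a streamed response can be written to disk as it arrives instead of accumulated in a string (a sketch; stream_to_file is a hypothetical helper, not part of the API):

```python
def stream_to_file(response, path, chunk_size=8192):
    """Write streamed chunks to disk incrementally so memory use
    stays constant regardless of response size."""
    total = 0
    with open(path, "w", encoding="utf-8") as f:
        for chunk in response.iter_content(chunk_size=chunk_size,
                                           decode_unicode=True):
            if chunk:
                f.write(chunk)
                total += len(chunk)
    return total  # characters written
```

Pass a requests response created with stream=True, e.g. `stream_to_file(requests.post(url, json=payload, stream=True), "result.txt")`.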
Use connection pooling for multiple streaming requests to reduce overhead:
session = requests.Session()
session.mount('https://', requests.adapters.HTTPAdapter(pool_connections=10))
Implement automatic reconnection logic with exponential backoff for interrupted streams.
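A minimal version of that reconnection logic might look like this (a sketch only; which exception types are worth retrying depends on your HTTP client):

```python
import random
import time

def backoff_delays(max_retries=5, base=1.0, cap=60.0):
    """Yield exponentially growing delays with a little jitter."""
    for attempt in range(max_retries):
        delay = min(cap, base * (2 ** attempt))
        yield delay + random.uniform(0, delay * 0.1)

def stream_with_retry(do_stream, max_retries=5, base=1.0):
    """Call do_stream(); on connection errors, wait and retry."""
    last_error = None
    for delay in backoff_delays(max_retries, base=base):
        try:
            return do_stream()
        except (ConnectionError, TimeoutError) as e:
            last_error = e
            time.sleep(delay)
    raise last_error
```

Usage: wrap the streaming request in a closure, e.g. `stream_with_retry(lambda: requests.post(url, json=payload, stream=True))`.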
Set appropriate timeouts for streaming connections:
response = requests.post(url, stream=True, timeout=(10, 300))
# (connection timeout, read timeout)

Best Practices

  1. Use Streaming for Long Responses - Always enable streaming for a better user experience
  2. Handle Connection Interruptions - Implement retry logic and state management
  3. Monitor Rate Limits - Track rate limit headers even in streaming mode
  4. Implement Heartbeats - Send periodic keepalive messages for long-running connections
  5. Clean Up Resources - Properly close streaming connections when done
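For practice 3, a small helper can inspect rate-limit headers on each response (the X-RateLimit-* header names here are an assumption; check the headers your deployment actually returns):

```python
def rate_limit_status(headers, warn_threshold=5):
    """Return (remaining, should_warn) from rate-limit headers.

    Returns (None, False) when the headers are absent.
    """
    remaining = headers.get("X-RateLimit-Remaining")
    if remaining is None:
        return None, False
    remaining = int(remaining)
    return remaining, remaining <= warn_threshold

# Usage with a streaming response:
# remaining, warn = rate_limit_status(response.headers)
# if warn:
#     print(f"Approaching rate limit: {remaining} requests left")
```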

Chat Endpoint

Streaming chat interface for claim normalization

Chat Completions

OpenAI-compatible streaming completions
