## Overview

CheckThat AI does not currently provide a dedicated WebSocket endpoint in the API. However, the platform's streaming HTTP responses offer similar real-time capabilities for most use cases.
## Current Real-Time Capabilities

### HTTP Streaming

The `/chat` endpoint supports Server-Sent Events (SSE)-style streaming, providing real-time updates as the LLM generates responses:

```python
import requests

response = requests.post(
    "https://api.checkthat-ai.com/chat",
    json={
        "user_query": "Analyze this claim...",
        "model": "gpt-4o",
        "api_key": "sk-proj-..."
    },
    stream=True  # Enable streaming
)

# Process chunks as they arrive
for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
    if chunk:
        print(chunk, end='', flush=True)
```
### Chat Completions Streaming

The `/v1/chat/completions` endpoint supports OpenAI-compatible streaming:

```python
from openai import OpenAI

client = OpenAI(
    api_key="sk-proj-your-key",
    base_url="https://api.checkthat-ai.com/v1"
)

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Your claim here"}],
    stream=True  # Enable streaming
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end='', flush=True)
```
## Implementing WebSocket-Like Behavior

While there's no native WebSocket support, you can implement similar real-time functionality:
### 1. Streaming with Progress Updates

```javascript
const fetchWithProgress = async (query, onProgress) => {
  const response = await fetch('https://api.checkthat-ai.com/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      user_query: query,
      model: 'gpt-4o',
      api_key: 'sk-proj-...'
    })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    // Emit progress event
    onProgress(buffer);
  }

  return buffer;
};

// Usage
await fetchWithProgress('Claim to analyze', (partial) => console.log(partial));
```
### 2. Long Polling Alternative

For bidirectional communication, implement long polling:

```python
import requests

class LongPollingClient:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.api_key = api_key
        self.conversation_id = None

    def send_message(self, message):
        """Send a message and yield response chunks as they arrive."""
        response = requests.post(
            f"{self.base_url}/chat",
            json={
                "user_query": message,
                "model": "gpt-4o",
                "api_key": self.api_key,
                "conversation_id": self.conversation_id
            },
            stream=True
        )
        for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
            if chunk:
                # Emit real-time update
                yield chunk

# Usage
client = LongPollingClient("https://api.checkthat-ai.com", "sk-proj-...")
for chunk in client.send_message("Analyze this claim"):
    print(chunk, end='', flush=True)
```
### 3. Server-Sent Events (SSE) Wrapper

Create an SSE wrapper for the streaming endpoint:

```javascript
class CheckThatSSE {
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.baseUrl = 'https://api.checkthat-ai.com';
  }

  async *streamChat(query, model = 'gpt-4o') {
    const response = await fetch(`${this.baseUrl}/chat`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        user_query: query,
        model: model,
        api_key: this.apiKey
      })
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      const chunk = decoder.decode(value, { stream: true });
      yield { type: 'message', data: chunk };
    }

    yield { type: 'done' };
  }
}

// Usage
const sse = new CheckThatSSE('sk-proj-...');
for await (const event of sse.streamChat('Your claim here')) {
  if (event.type === 'message') {
    console.log(event.data);
  } else if (event.type === 'done') {
    console.log('Stream complete');
  }
}
```
## Future WebSocket Support

If CheckThat AI implements WebSocket support in the future, it would likely follow this pattern:

### Proposed WebSocket Endpoint

```
ws://api.checkthat-ai.com/ws
wss://api.checkthat-ai.com/ws   (secure)
```
### Connection Authentication

```javascript
// Note: the browser WebSocket API does not allow custom headers, so
// header-based auth like this would require a client such as Node's
// `ws` package (or passing the key another way, e.g. in the first message).
const WebSocket = require('ws');

const ws = new WebSocket('wss://api.checkthat-ai.com/ws', {
  headers: {
    'Authorization': 'Bearer sk-proj-your-api-key'
  }
});
```
### Message Format

**Client to Server:**

```json
{
  "type": "claim_analysis",
  "data": {
    "query": "The Earth is flat.",
    "model": "gpt-4o",
    "conversation_id": "conv-123"
  }
}
```

**Server to Client:**

```json
{
  "type": "chunk",
  "data": {
    "content": "Analyzing claim...",
    "conversation_id": "conv-123",
    "timestamp": "2026-03-04T15:30:00Z"
  }
}
```
### Event Types

The event names below are proposed; only `chunk` appears in the message examples above.

| Event | Description |
| --- | --- |
| `connected` | Sent when the WebSocket connection is established |
| `chunk` | Streaming response chunk from the LLM |
| `complete` | Response generation completed |
| `error` | Error occurred during processing |
| `rate_limit` | Rate limit warning or exceeded |
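Client code for such a protocol could dispatch on the `type` field of each incoming message. A minimal sketch, assuming the message shape shown above (the handler names and error payload shape are illustrative assumptions, not part of any published API):

```python
import json

def dispatch_event(raw_message, handlers):
    """Parse a WebSocket text message and route it to the matching handler."""
    event = json.loads(raw_message)
    handler = handlers.get(event.get("type"))
    if handler is None:
        return None  # Ignore unknown event types
    return handler(event.get("data", {}))

# Usage: collect streamed chunks into a transcript
transcript = []
handlers = {
    "chunk": lambda data: transcript.append(data["content"]),
    "error": lambda data: transcript.append(f"[error: {data.get('message')}]"),
}
dispatch_event('{"type": "chunk", "data": {"content": "Analyzing claim..."}}', handlers)
```

Unknown event types are ignored rather than raised, so a client keeps working if the server adds new events later.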
## Alternatives to WebSocket

Until native WebSocket support is available, consider these alternatives:
### 1. HTTP/2 Multiplexing

Modern browsers and HTTP clients support HTTP/2, whose multiplexing lets many concurrent streaming requests share a single connection. (HTTP/2 server push, sometimes suggested for this use case, has been deprecated and removed from major browsers.)
### 2. GraphQL Subscriptions

If CheckThat AI adds GraphQL support, subscriptions would provide real-time updates:

```graphql
subscription OnClaimProcessed {
  claimProcessed(conversationId: "conv-123") {
    normalized
    confidence
    timestamp
  }
}
```
### 3. ETag-Based Polling

Implement efficient polling using ETags to minimize bandwidth:

```python
import time
import requests

def poll_with_etag(url, interval=5):
    """Poll `url`, yielding the body only when the ETag changes."""
    etag = None
    while True:
        headers = {'If-None-Match': etag} if etag else {}
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            etag = response.headers.get('ETag')
            yield response.json()
        elif response.status_code == 304:
            # No changes since the last poll
            pass
        time.sleep(interval)
```
## Real-Time Monitoring

For monitoring long-running operations, implement custom event handling:

```python
import requests
from typing import Callable, Optional

class StreamingMonitor:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.checkthat-ai.com"

    def process_with_callbacks(
        self,
        query: str,
        model: str,
        on_chunk: Optional[Callable[[str], None]] = None,
        on_complete: Optional[Callable[[str], None]] = None,
        on_error: Optional[Callable[[Exception], None]] = None
    ):
        try:
            response = requests.post(
                f"{self.base_url}/chat",
                json={
                    "user_query": query,
                    "model": model,
                    "api_key": self.api_key
                },
                stream=True
            )
            response.raise_for_status()

            full_response = ""
            for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
                if chunk:
                    full_response += chunk
                    if on_chunk:
                        on_chunk(chunk)

            if on_complete:
                on_complete(full_response)
        except Exception as e:
            if on_error:
                on_error(e)
            raise

# Usage
monitor = StreamingMonitor("sk-proj-...")
monitor.process_with_callbacks(
    query="Analyze this claim",
    model="gpt-4o",
    on_chunk=lambda chunk: print(f"Received: {chunk}"),
    on_complete=lambda result: print(f"\nComplete: {len(result)} chars"),
    on_error=lambda e: print(f"Error: {e}")
)
```
## Performance Tips

When streaming large responses, implement proper buffer management to avoid memory issues: process chunks incrementally rather than accumulating everything in memory.
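A minimal sketch of incremental processing, writing chunks straight to disk instead of holding the full response in memory (the helper name and file path are illustrative):

```python
import os
import tempfile

def stream_to_file(chunks, path):
    """Write an iterable of text chunks to `path` incrementally."""
    total = 0
    with open(path, 'w', encoding='utf-8') as f:
        for chunk in chunks:
            f.write(chunk)       # Each chunk goes to disk immediately...
            total += len(chunk)  # ...so memory use stays roughly constant
    return total

# Usage with any chunk iterator (e.g. response.iter_content(...))
path = os.path.join(tempfile.gettempdir(), "response.txt")
n = stream_to_file(["Analyzing ", "claim..."], path)
```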
Use connection pooling for multiple streaming requests to reduce overhead:

```python
session = requests.Session()
session.mount('https://', requests.adapters.HTTPAdapter(pool_connections=10))
```
Implement automatic reconnection logic with exponential backoff for interrupted streams.
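The reconnection advice above can be sketched as a generic retry helper (the retry counts and delay values are illustrative defaults, not values the API mandates):

```python
import random
import time

def with_backoff(operation, max_retries=5, base_delay=1.0, max_delay=30.0):
    """Retry `operation` with exponential backoff and a little jitter."""
    for attempt in range(max_retries):
        try:
            return operation()
        except ConnectionError:
            if attempt == max_retries - 1:
                raise  # Out of retries; surface the error
            # Delay doubles each attempt, capped at max_delay
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay + random.uniform(0, 0.1))

# Usage: retry a flaky streaming call (fails twice, then succeeds)
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("stream interrupted")
    return "ok"

result = with_backoff(flaky, base_delay=0.01)
```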
Set appropriate timeouts for streaming connections:

```python
# (connection timeout, read timeout)
response = requests.post(url, stream=True, timeout=(10, 300))
```
## Best Practices

1. **Use Streaming for Long Responses** - Always enable streaming for a better user experience
2. **Handle Connection Interruptions** - Implement retry logic and state management
3. **Monitor Rate Limits** - Track rate-limit headers even in streaming mode
4. **Implement Heartbeats** - Send periodic keepalive messages for long-running connections
5. **Clean Up Resources** - Properly close streaming connections when done
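Rate-limit tracking can be sketched as a small header parser. The header names below follow a common `X-RateLimit-*` convention and are an assumption, not documented CheckThat AI headers:

```python
def check_rate_limit(headers, warn_threshold=5):
    """Return (remaining, should_warn) parsed from response headers."""
    remaining = headers.get("X-RateLimit-Remaining")  # Assumed header name
    if remaining is None:
        return None, False  # Server didn't report a limit
    remaining = int(remaining)
    return remaining, remaining <= warn_threshold

# Usage: inspect response.headers before consuming the stream
remaining, warn = check_rate_limit({"X-RateLimit-Remaining": "3"})
```

Checking headers before iterating the body matters because `requests` exposes headers as soon as the response starts, while the body may stream for minutes.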
## Related Documentation

- **Chat Endpoint** - Streaming chat interface for claim normalization
- **Chat Completions** - OpenAI-compatible streaming completions