Overview

WebSocket endpoint for observing batch processing progress in real time. The connection is a read-only observer: processing runs independently in the background, and disconnecting the WebSocket does not cancel the job.

Connection Details

Endpoint: ws://localhost:8000/api/batch/ws/{job_id}
Authentication: Pass the token as a query parameter:
ws://localhost:8000/api/batch/ws/{job_id}?token=your_access_token
Alternatively, you can pass the token in the Authorization header:
Authorization: Bearer your_access_token
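
As a minimal sketch of the query-parameter form above, a client could build the connection URL with a small helper. The name `buildBatchWsUrl` and its `host` argument are hypothetical and not part of the API; the helper only percent-encodes the two variable parts. Note that the browser `WebSocket` constructor does not accept custom headers, so the `Authorization` header form is only usable from clients that can set handshake headers.

```javascript
// Hypothetical helper (not part of the API): builds the observer URL
// for a job, percent-encoding the job ID and the token.
function buildBatchWsUrl(host, jobId, token) {
  const path = `/api/batch/ws/${encodeURIComponent(jobId)}`;
  const query = `token=${encodeURIComponent(token)}`;
  return `ws://${host}${path}?${query}`;
}

const url = buildBatchWsUrl(
  'localhost:8000',
  '550e8400-e29b-41d4-a716-446655440000',
  'your_access_token'
);
console.log(url);
```

Encoding the token matters when it contains characters such as `+`, `/`, or `=`, which would otherwise be misread as part of the query string.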

Path Parameters

job_id (string, required): Unique identifier for the batch processing job. Generated when starting a job via POST /api/batch/start.

Connection Flow

1. Client Connects

Connect to the WebSocket endpoint with the job ID and authentication token.

2. Server Sends Catchup Message

On connect, the server immediately sends all results so far:
type (string): Message type; always "catchup"
job_id (string): The job identifier
state (object): Current job state, including status, progress, processed_count, failed_count, and total
results (array): All document results processed so far
Example:
{
  "type": "catchup",
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "state": {
    "status": "processing",
    "progress": 0.3,
    "processed_count": 3,
    "failed_count": 0,
    "total": 10
  },
  "results": [
    {
      "row_id": 0,
      "title": "Document 1",
      "status": "success",
      "tags": ["tag1", "tag2"]
    }
  ]
}
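
One way a client might use the catchup message is to seed a local snapshot before any progress updates arrive. The sketch below is illustrative only: `snapshotFromCatchup` is a hypothetical name, and the `remaining` calculation assumes that `processed_count` counts successes only, so failures are subtracted separately.

```javascript
// Hypothetical helper: turns a "catchup" message into a local snapshot.
function snapshotFromCatchup(msg) {
  const { status, processed_count, failed_count, total } = msg.state;
  return {
    status,
    // Assumption: processed_count excludes failures.
    remaining: total - processed_count - failed_count,
    // Index results by row_id so later updates can overwrite the same row.
    rows: new Map(msg.results.map((result) => [result.row_id, result])),
  };
}

const catchup = {
  type: 'catchup',
  job_id: '550e8400-e29b-41d4-a716-446655440000',
  state: { status: 'processing', progress: 0.3, processed_count: 3, failed_count: 0, total: 10 },
  results: [{ row_id: 0, title: 'Document 1', status: 'success', tags: ['tag1', 'tag2'] }],
};

const snapshot = snapshotFromCatchup(catchup);
console.log(snapshot.status, snapshot.remaining, snapshot.rows.size);
```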

3. Server Streams Progress Updates

As each document is processed, the server sends progress updates:
type (string): Message type; always "progress"
job_id (string): The job identifier
row_id (integer): Zero-based row index in the batch
row_number (integer): One-based row number for display (row_id + 1)
title (string): Document title
status (string): Processing status: "pending", "processing", "success", or "failed"
progress (number): Overall job progress as a decimal (0.0 to 1.0)
tags (array): Generated tags (present on success)
error (string): Error message (present on failure)
error_type (string): Error category: "rate-limit", "model-error", "network", etc.
retry_after_ms (integer): Milliseconds to wait before retry (for rate limit errors)
retry_count (integer): Current attempt number
model_name (string): AI model being used for processing
metadata (object): Additional metadata (extraction_method, is_scanned, ocr_confidence, etc.)
Example:
{
  "type": "progress",
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "row_id": 0,
  "row_number": 1,
  "title": "Training Manual",
  "status": "success",
  "progress": 0.1,
  "tags": [
    "employee-training",
    "onboarding-procedures",
    "workplace-safety",
    "compliance-guidelines"
  ],
  "metadata": {
    "extraction_method": "pypdf2",
    "is_scanned": false,
    "processing_time": 2.3
  }
}
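
Because several progress fields are only present in some states (tags on success, error details on failure), a client usually has to branch on `status`. The following is a rough sketch of that branching; `describeProgress` and its output format are hypothetical, and the failure message in the second call is invented sample data.

```javascript
// Hypothetical formatter: renders one "progress" message as a log line.
function describeProgress(msg) {
  // Round so floating-point noise never leaks into the percentage.
  const percent = Math.round(msg.progress * 100);
  const head = `[${percent}%] #${msg.row_number} ${msg.title}: ${msg.status}`;

  if (msg.status === 'success') {
    return `${head} (${(msg.tags ?? []).join(', ')})`;
  }
  if (msg.status === 'failed') {
    // retry_after_ms is documented for rate limit errors only.
    const retry =
      msg.error_type === 'rate-limit' && Number.isFinite(msg.retry_after_ms)
        ? `, retry in ${msg.retry_after_ms} ms`
        : '';
    return `${head} (${msg.error_type ?? 'unknown'}: ${msg.error ?? 'no message'}${retry})`;
  }
  return head; // "pending" or "processing"
}

console.log(describeProgress({
  type: 'progress', row_id: 0, row_number: 1, title: 'Training Manual',
  status: 'success', progress: 0.1, tags: ['employee-training', 'workplace-safety'],
}));
console.log(describeProgress({
  type: 'progress', row_id: 1, row_number: 2, title: 'Invoice', status: 'failed',
  progress: 0.2, error: 'Too many requests', error_type: 'rate-limit', retry_after_ms: 1500,
}));
```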

4. Server Sends Completion Message

When all documents are processed or the job is stopped:
type (string): Message type; one of "completed", "cancelled", or "error"
job_id (string): The job identifier
processed_count (string): Number of successfully processed documents
failed_count (string): Number of failed documents
message (string): Completion summary message
Example:
{
  "type": "completed",
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "processed_count": "9",
  "failed_count": "1",
  "message": "Job completed"
}
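
Since the counts in this message are documented as strings, a client that wants to do arithmetic on them needs to convert them first. A minimal sketch, assuming the field shapes shown above; `summarizeCompletion` and the derived `handled` field are hypothetical names.

```javascript
// The three message types that end a job, per the field list above.
const COMPLETION_TYPES = new Set(['completed', 'cancelled', 'error']);

// Hypothetical helper: normalizes a completion message, or returns null
// when the message is not one of the completion types.
function summarizeCompletion(msg) {
  if (!COMPLETION_TYPES.has(msg.type)) return null;
  // Counts arrive as strings such as "9", so convert before adding.
  const processed = Number(msg.processed_count);
  const failed = Number(msg.failed_count);
  return {
    outcome: msg.type,
    processed,
    failed,
    handled: processed + failed,
    message: msg.message,
  };
}

const summary = summarizeCompletion({
  type: 'completed',
  job_id: '550e8400-e29b-41d4-a716-446655440000',
  processed_count: '9',
  failed_count: '1',
  message: 'Job completed',
});
console.log(summary);
```

Without the conversion, `'9' + '1'` would concatenate to `'91'` rather than add to 10.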

5. Keepalive Messages

Every 30 seconds without updates, the server sends a keepalive:
{
  "type": "keepalive",
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "processing"
}
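
A client can use the keepalive interval to detect a silently dropped connection: if nothing at all has arrived for noticeably longer than 30 seconds, the socket is probably dead. The sketch below is one possible approach, not a prescribed one; `KeepaliveMonitor` and the tolerance of two missed intervals are assumptions.

```javascript
// Hypothetical staleness tracker. Call touch() for every incoming message
// (keepalives included) and poll isStale() from a timer.
class KeepaliveMonitor {
  constructor(intervalMs = 30_000, missedAllowed = 2) {
    this.intervalMs = intervalMs;
    this.missedAllowed = missedAllowed; // assumption: tolerate two silent intervals
    this.lastSeen = Date.now();
  }

  touch(now = Date.now()) {
    this.lastSeen = now;
  }

  isStale(now = Date.now()) {
    return now - this.lastSeen > this.intervalMs * this.missedAllowed;
  }
}

const monitor = new KeepaliveMonitor();
monitor.touch(0);                    // last message seen at t = 0 ms
console.log(monitor.isStale(59_000)); // still within two intervals
console.log(monitor.isStale(60_001)); // past two intervals
```

Passing the clock in as an argument keeps the class easy to test; in a real client the defaults would use `Date.now()`.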

Message Types Summary

| Type | When Sent | Description |
|------|-----------|-------------|
| catchup | On connection | All results and state so far |
| progress | Per document | Individual document processing update |
| completed | Job finished | All documents processed |
| cancelled | Job cancelled | Processing stopped by user |
| error | Job failed | Critical error occurred |
| keepalive | Every 30s | Connection health check |
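
The six message types map naturally onto a small state reducer. The following is a sketch under the message shapes documented on this page; `initialJobState` and `reduceJob` are hypothetical names, and the shape of the local state is a design choice rather than anything the server dictates.

```javascript
const END_TYPES = new Set(['completed', 'cancelled', 'error']);

function initialJobState() {
  return { status: 'unknown', rows: new Map(), finished: false };
}

// Pure reducer: returns a new state for each incoming message.
function reduceJob(state, msg) {
  switch (msg.type) {
    case 'catchup':
      // Replace local rows wholesale; catchup carries everything so far.
      return {
        ...state,
        status: msg.state.status,
        rows: new Map(msg.results.map((r) => [r.row_id, r])),
      };
    case 'progress': {
      // Upsert by row_id so a retried row overwrites its earlier entry.
      const rows = new Map(state.rows);
      rows.set(msg.row_id, msg);
      return { ...state, rows };
    }
    case 'keepalive':
      return { ...state, status: msg.status };
    default:
      // completed, cancelled, and error end the job; ignore anything unknown.
      return END_TYPES.has(msg.type)
        ? { ...state, status: msg.type, finished: true }
        : state;
  }
}

let state = initialJobState();
state = reduceJob(state, { type: 'catchup', state: { status: 'processing' }, results: [{ row_id: 0, status: 'success' }] });
state = reduceJob(state, { type: 'progress', row_id: 1, status: 'failed' });
state = reduceJob(state, { type: 'progress', row_id: 1, status: 'success' });
state = reduceJob(state, { type: 'completed' });
console.log(state.status, state.rows.size, state.finished);
```

Because catchup replaces the rows rather than merging them, reconnecting mid-job and replaying messages through the same reducer leaves the state consistent.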

Error Responses

If authentication fails:
{
  "error": "Authentication required",
  "job_id": "550e8400-e29b-41d4-a716-446655440000"
}
Connection closes with code 1008 (policy violation).
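
The authentication failure payload has no `type` field, so a handler that switches on `type` would not match it. A small sketch of how a client might recognize it and decide whether reconnecting is worthwhile; both function names are hypothetical, and the retry policy is an assumption rather than documented server behavior.

```javascript
// Close code the server uses when authentication fails.
const POLICY_VIOLATION = 1008;
// Standard WebSocket code for a normal, intentional closure.
const NORMAL_CLOSURE = 1000;

// The auth failure message carries "error" but no "type".
function isAuthFailureMessage(msg) {
  return msg.type === undefined && typeof msg.error === 'string';
}

// Assumed policy: do not retry after a clean close or a rejected token,
// since reconnecting with the same token would fail again.
function shouldRetryAfterClose(code) {
  return code !== NORMAL_CLOSURE && code !== POLICY_VIOLATION;
}

console.log(isAuthFailureMessage({ error: 'Authentication required', job_id: 'abc' }));
console.log(shouldRetryAfterClose(1008));
console.log(shouldRetryAfterClose(1006));
```

In a browser client, `shouldRetryAfterClose` would be called with `event.code` from the `onclose` handler.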

Usage Example

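const jobId = '550e8400-e29b-41d4-a716-446655440000';
const token = 'your_access_token';
// Encode the token so characters such as "+" or "=" survive the query string.
const ws = new WebSocket(
  `ws://localhost:8000/api/batch/ws/${jobId}?token=${encodeURIComponent(token)}`
);

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'catchup':
      // Sent once on connect: current state plus every result so far.
      console.log('Caught up to current state:', data.state);
      console.log('Existing results:', data.results.length);
      break;

    case 'progress':
      console.log(`Document ${data.row_number}: ${data.title}`);
      // Round to keep floating-point noise out of the percentage.
      console.log(`Status: ${data.status}, Progress: ${Math.round(data.progress * 100)}%`);
      if (data.tags) {
        console.log('Tags:', data.tags);
      }
      break;

    case 'completed':
    case 'cancelled':
    case 'error':
      // All three end the job, so the observer can disconnect.
      console.log(`Job ${data.type}: ${data.message}`);
      console.log(`Success: ${data.processed_count}, Failed: ${data.failed_count}`);
      ws.close();
      break;

    case 'keepalive':
      console.log('Connection alive, status:', data.status);
      break;

    default:
      // Authentication failures arrive without a type, carrying only an error field.
      if (data.error) console.error('Server error:', data.error);
  }
};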

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};

ws.onclose = () => {
  console.log('WebSocket closed (job continues running)');
};

Important Notes

Disconnecting from the WebSocket does not cancel the job. Processing continues in the background. Use POST /api/batch/jobs//cancel to stop a job.
If the job is already completed when you connect, you’ll receive the catchup message followed immediately by a completion message.
Jobs are stored in Redis with a TTL. Results are available for retrieval for a limited time after completion.
