
Overview

The olmOCR pipeline (pipeline.py) is the core component that processes millions of PDFs through a fine-tuned vision-language model. It supports both local single-machine processing and distributed multi-node processing via S3.

Pipeline Modes

Local Mode

Process PDFs on a single machine with a GPU:
python -m olmocr.pipeline ./localworkspace --pdfs tests/gnarly_pdfs/*.pdf
Features:
  • Workspace stored on local filesystem
  • Results in ./localworkspace/results/
  • Lock files in ./localworkspace/worker_locks/
  • Work queue in ./localworkspace/work_index_list.csv.zstd

Work Queue Architecture

The work queue system enables efficient distributed processing with fault tolerance and coordination.

Queue Components

from dataclasses import dataclass
from typing import List

@dataclass
class WorkItem:
    """Represents a single work item in the queue"""
    hash: str
    work_paths: List[str]

1. Queue Initialization

PDFs are grouped into work items (~500 pages each) and stored in work_index_list.csv.zstd:
abc123def,s3://bucket/doc1.pdf,s3://bucket/doc2.pdf,s3://bucket/doc3.pdf
456789ghi,s3://bucket/doc4.pdf,s3://bucket/doc5.pdf
Each line contains a hash and the PDF paths in that work group.
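
The grouping step can be sketched as follows. This is an illustration only: the real pipeline groups by an approximate page budget (~500 pages) and writes the index zstd-compressed, whereas this hypothetical `build_work_index` helper groups by file count and returns plain CSV lines:

```python
import hashlib

def build_work_index(pdf_paths, pdfs_per_group=3):
    """Sketch of work-item grouping; the real pipeline groups by ~500 pages
    and compresses the index with zstd, here we group by file count only."""
    lines = []
    for i in range(0, len(pdf_paths), pdfs_per_group):
        group = sorted(pdf_paths[i:i + pdfs_per_group])
        # Hash the joined paths so the same group always maps to the same id
        group_hash = hashlib.sha1(",".join(group).encode()).hexdigest()[:9]
        lines.append(",".join([group_hash] + group))
    return lines

lines = build_work_index([f"s3://bucket/doc{n}.pdf" for n in range(1, 6)])
```

Hashing the sorted group paths makes queue construction deterministic: rebuilding the index over the same PDFs yields the same work-item ids, so completed items are still recognized as done.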

2. Work Assignment

Workers pull work items randomly from the queue:
async def get_work(self, worker_lock_timeout_secs: int = 1800):
    while True:
        try:
            work_item = self._queue.get_nowait()
        except asyncio.QueueEmpty:
            return None  # queue drained; the worker shuts down

        # Skip if already completed
        if await self.is_completed(work_item.hash):
            continue

        # Skip if locked by another worker (within timeout)
        if lock_is_active(work_item.hash):
            continue

        # Create lock file so other workers skip this item
        create_lock(work_item.hash)
        return work_item

3. Lock Management

Lock files prevent duplicate processing:
  • Created in worker_locks/output_{hash}.jsonl
  • Timeout after 30 minutes (configurable)
  • Deleted when work completes successfully
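
The timeout check above can be sketched as a file-age test; `lock_is_active` here is a hypothetical stand-in for the pipeline's own helper:

```python
import os
import time

def lock_is_active(locks_dir, item_hash, timeout_secs=1800):
    """A lock counts as active only if its file exists and is younger than
    the timeout (default 30 minutes); stale locks from crashed workers are
    treated as free."""
    lock_path = os.path.join(locks_dir, f"output_{item_hash}.jsonl")
    try:
        age_secs = time.time() - os.path.getmtime(lock_path)
    except FileNotFoundError:
        return False  # no lock file: the item is free to claim
    return age_secs < timeout_secs
```

Using file modification time means a crashed worker never blocks an item forever: once its lock ages past the timeout, another worker can claim the work.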

4. Completion Tracking

Completed work is tracked via output files:
  • Results written to results/output_{hash}.jsonl
  • Workers skip items with existing output files
  • Enables resumable processing after failures
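
The completion check reduces to an existence test on the output file; `is_completed` here is an illustrative sketch, not the pipeline's exact method:

```python
import os

def is_completed(results_dir, item_hash):
    # Presence of the output file marks a work item as done, which makes
    # processing resumable: restarted workers simply skip finished items.
    return os.path.exists(os.path.join(results_dir, f"output_{item_hash}.jsonl"))
```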

Work Queue Classes

olmOCR provides two work queue implementations:
class LocalWorkQueue(WorkQueue):
    """Local filesystem-based work queue"""

    def __init__(self, workspace_path: str):
        self.workspace_path = os.path.abspath(workspace_path)
        self._index_path = os.path.join(workspace_path, "work_index_list.csv.zstd")
        self._results_dir = os.path.join(workspace_path, "results")
        self._locks_dir = os.path.join(workspace_path, "worker_locks")
        self._queue = Queue()

Inference Architecture

The pipeline uses sglang for efficient vision model inference.

sglang Server

A background task manages the sglang server:
async def sglang_server_task(args, semaphore):
    cmd = [
        "python3", "-m", "sglang.launch_server",
        "--model-path", args.model,
        "--chat-template", args.model_chat_template,
        "--port", str(SGLANG_SERVER_PORT),
        "--log-level-http", "warning",
    ]

    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # Monitor server logs
    await read_server_logs(proc.stdout, proc.stderr)

    # Release semaphore when queue is empty
    if server_ready and queue_empty:
        semaphore.release()
The server automatically restarts on errors (up to 5 times). GPU memory fraction is adjusted based on available VRAM.
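
The restart policy can be sketched as a small supervision loop; `supervise_server` and `start_server` are hypothetical names, and the real task also watches logs and GPU memory rather than just the exit code:

```python
import asyncio

async def supervise_server(start_server, max_restarts=5):
    """Sketch of the restart policy: relaunch the server subprocess on
    failure, up to max_restarts attempts, then give up."""
    for attempt in range(max_restarts):
        proc = await start_server()
        returncode = await proc.wait()
        if returncode == 0:
            return  # clean shutdown, stop supervising
        # non-zero exit: loop around and restart the server
    raise RuntimeError(f"server failed {max_restarts} times, giving up")
```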

Request Flow

Each PDF page goes through this inference pipeline:

1. Build Query

Prepare the inference request with image and anchor text:
async def build_page_query(local_pdf_path, page, target_longest_image_dim, target_anchor_text_len):
    loop = asyncio.get_running_loop()

    # Render PDF page to base64 PNG (async, off the event loop thread)
    image_base64 = asyncio.to_thread(
        render_pdf_to_base64png,
        local_pdf_path,
        page,
        target_longest_image_dim=target_longest_image_dim,
    )

    # Extract anchor text (CPU-bound, runs in the process pool)
    anchor_text = loop.run_in_executor(
        process_pool,
        partial(get_anchor_text, pdf_engine="pdfreport", target_length=target_anchor_text_len),
        local_pdf_path,
        page,
    )

    # Run both in parallel and wait for the results
    image_base64, anchor_text = await asyncio.gather(image_base64, anchor_text)

    return {
        "model": "Qwen/Qwen2-VL-7B-Instruct",
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": build_finetuning_prompt(anchor_text)},
                    {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_base64}"}}
                ]
            }
        ],
        "max_tokens": 3000,
        "temperature": 0.8
    }

2. Send Request

Make HTTP POST request to sglang server:
async def process_page(args, worker_id, pdf_path, pdf_local_path, page_num):
    query = await build_page_query(pdf_local_path, page_num, args.target_longest_image_dim, args.target_anchor_text_len)

    status_code, response_body = await apost(
        f"http://localhost:{SGLANG_SERVER_PORT}/v1/chat/completions",
        json_data=query
    )

    base_response_data = json.loads(response_body)
    model_response_json = json.loads(base_response_data["choices"][0]["message"]["content"])
    page_response = PageResponse(**model_response_json)

    return PageResult(
        pdf_path,
        page_num,
        page_response,
        input_tokens=base_response_data["usage"].get("prompt_tokens", 0),
        output_tokens=base_response_data["usage"].get("completion_tokens", 0),
        is_fallback=False
    )

3. Retry Logic

Handle failures with retry and fallback:
  • Max 8 retries per page (configurable via --max_page_retries)
  • Exponential backoff for server connection errors
  • Reduce anchor text length if context exceeds limit
  • Rotate image if model detects incorrect orientation
  • Fallback to pdftotext if all retries fail
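
The backbone of that loop can be sketched as follows; `with_retries` is a hypothetical helper that handles only the connection-error case, whereas the real loop also shortens anchor text and rotates images as listed above:

```python
import asyncio
import random

async def with_retries(attempt_page, max_retries=8, base_delay=1.0):
    """Sketch of the per-page retry loop: exponential backoff on connection
    errors, and None (the pdftotext fallback signal) once retries run out."""
    for attempt in range(max_retries):
        try:
            return await attempt_page()
        except ConnectionError:
            # back off 1s, 2s, 4s, ... plus jitter before the next attempt
            await asyncio.sleep(base_delay * (2 ** attempt) + random.random() * 0.1)
    return None  # caller falls back to pdftotext for this page
```

Exponential backoff with jitter spreads retries out when the sglang server restarts, so thousands of pending pages do not all hammer it at the same instant.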

4. Aggregate Results

Combine page results into a Dolma document:
async def process_pdf(args, worker_id, pdf_orig_path):
    page_tasks = []

    # Process all pages concurrently
    async with asyncio.TaskGroup() as tg:
        for page_num in range(1, num_pages + 1):
            task = tg.create_task(process_page(args, worker_id, pdf_orig_path, tf.name, page_num))
            page_tasks.append(task)

    page_results = [task.result() for task in page_tasks]

    # Check error rate
    num_fallback_pages = sum(page_result.is_fallback for page_result in page_results)
    if num_fallback_pages / num_pages > args.max_page_error_rate:
        logger.error(f"Document {pdf_orig_path} has too many fallback pages, discarding")
        return None

    return build_dolma_document(pdf_orig_path, page_results)

Custom HTTP Implementation

For 100M+ requests, standard HTTP libraries can deadlock. olmOCR uses a custom async HTTP implementation:
async def apost(url, json_data):
    """Manual HTTP POST implementation to avoid connection pool issues"""
    parsed_url = urlparse(url)
    host = parsed_url.hostname
    port = parsed_url.port or 80
    path = parsed_url.path or "/"

    reader, writer = await asyncio.open_connection(host, port)

    json_payload = json.dumps(json_data)
    request = (
        f"POST {path} HTTP/1.1\r\n"
        f"Host: {host}\r\n"
        f"Content-Type: application/json\r\n"
        f"Content-Length: {len(json_payload)}\r\n"
        f"Connection: close\r\n\r\n"
        f"{json_payload}"
    )
    writer.write(request.encode())
    await writer.drain()

    # Read response...
    return status_code, response_body
This custom implementation only supports fixed content-length responses. It’s specifically designed for sglang’s API.
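
The elided response-reading step can be sketched as below; `read_fixed_length_response` is an illustrative name, and the sketch assumes exactly what the text states, a fixed `Content-Length` response with no chunked encoding:

```python
import asyncio

async def read_fixed_length_response(reader):
    """Parse the status line and headers, then read exactly Content-Length
    body bytes from an asyncio StreamReader."""
    status_line = await reader.readline()  # e.g. b"HTTP/1.1 200 OK\r\n"
    status_code = int(status_line.split()[1])
    content_length = 0
    while True:
        line = await reader.readline()
        if line in (b"\r\n", b"\n", b""):
            break  # blank line ends the header block
        name, _, value = line.decode().partition(":")
        if name.strip().lower() == "content-length":
            content_length = int(value.strip())
    body = await reader.readexactly(content_length)
    return status_code, body
```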

Worker Coordination

Multiple workers coordinate via semaphores and async task groups:
async def worker(args, work_queue, semaphore, worker_id):
    while True:
        # Wait for semaphore (only 1 worker processes at a time)
        await semaphore.acquire()

        work_item = await work_queue.get_work()
        if work_item is None:
            semaphore.release()
            break

        try:
            # Process all PDFs in this work item
            async with asyncio.TaskGroup() as tg:
                dolma_tasks = [tg.create_task(process_pdf(args, worker_id, pdf)) for pdf in work_item.work_paths]

            # Collect documents, dropping PDFs that were discarded (None)
            dolma_docs = [task.result() for task in dolma_tasks if task.result() is not None]

            # Write results
            output_path = os.path.join(args.workspace, "results", f"output_{work_item.hash}.jsonl")
            write_dolma_docs(output_path, dolma_docs)

            await work_queue.mark_done(work_item)
        finally:
            semaphore.release()

Semaphore Control

The pipeline uses a semaphore to ensure optimal GPU utilization:
  • Initial state: Semaphore starts with count=1
  • Worker blocking: Only 1 worker can send requests at a time
  • Auto-release: When sglang queue is empty for 30s, semaphore releases
  • Saturation: Next worker starts sending requests to keep GPU busy
  • Output: First worker continues writing results while second worker saturates GPU
This design maximizes GPU utilization while allowing workers to output results as soon as possible.

Process Pools

CPU-bound operations run in separate process pools to avoid blocking:
# Global process pool (max 32 workers)
process_pool = ProcessPoolExecutor(
    max_workers=min(multiprocessing.cpu_count() // 2 + 1, 32),
    mp_context=multiprocessing.get_context("spawn")
)

# Anchor text extraction runs in process pool
loop = asyncio.get_running_loop()
anchor_text = await loop.run_in_executor(
    process_pool,
    partial(get_anchor_text, pdf_engine="pdfreport", target_length=target_anchor_text_len),
    local_pdf_path,
    page
)
get_anchor_text() is not thread-safe and must run in a process pool, not a thread pool.

Pipeline Parameters

Key configuration options from pipeline.py:889-931:
  • workspace (string, required): Filesystem path or S3 path for storing the work queue and results
  • pdfs (string[]): PDF paths to process (glob patterns supported)
  • pages_per_group (int, default: 500): Target pages per work item group
  • max_page_retries (int, default: 8): Maximum retries per page before falling back
  • max_page_error_rate (float, default: 0.004): Maximum allowable fraction of fallback pages per document (1/250)
  • workers (int, default: 8): Number of concurrent workers
  • model (string, default: "allenai/olmOCR-7B-0225-preview"): Model path (local, HuggingFace, S3, or GCS)
  • model_max_context (int, default: 8192): Maximum context length for the model
  • target_longest_image_dim (int, default: 1024): Longest dimension for rendered PDF images
  • target_anchor_text_len (int, default: 6000): Maximum anchor text length in characters

Metrics and Monitoring

The pipeline tracks detailed metrics:
metrics = MetricsKeeper(window=60 * 5)  # 5-minute window
tracker = WorkerTracker()  # Per-worker status

# Track token usage
metrics.add_metrics(
    sglang_input_tokens=prompt_tokens,
    sglang_output_tokens=completion_tokens,
    finished_input_tokens=total_input,
    finished_output_tokens=total_output
)

# Track worker status
await tracker.track_work(worker_id, f"{pdf_path}-{page_num}", "started")
await tracker.track_work(worker_id, f"{pdf_path}-{page_num}", "finished")
Metrics are logged every 10 seconds during processing.
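
A sliding-window counter in the spirit of MetricsKeeper can be sketched as below; the class and method names here are illustrative assumptions, not olmOCR's actual API:

```python
import time
from collections import deque

class WindowedMetrics:
    """Minimal sliding-window metrics counter: totals only reflect events
    recorded within the last window_secs seconds."""

    def __init__(self, window_secs=300):
        self.window_secs = window_secs
        self._events = deque()  # (timestamp, name, value) triples

    def add_metrics(self, **metrics):
        now = time.time()
        for name, value in metrics.items():
            self._events.append((now, name, value))
        self._evict(now)

    def totals(self):
        self._evict(time.time())
        out = {}
        for _, name, value in self._events:
            out[name] = out.get(name, 0) + value
        return out

    def _evict(self, now):
        # Drop events that have aged out of the window
        while self._events and now - self._events[0][0] > self.window_secs:
            self._events.popleft()
```

A windowed total (rather than a lifetime counter) is what makes the periodic log lines useful as a throughput gauge: tokens/second over the last five minutes tracks current GPU saturation.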

Next Steps

Anchor Text

Learn how anchor text improves extraction quality

CLI Reference

Explore all pipeline command-line options

Work Queues

API documentation for work queue classes

Cluster Usage

Deploy olmOCR at scale
