Overview
The olmOCR pipeline (pipeline.py) is the core component that processes millions of PDFs through a fine-tuned vision-language model. It supports both local single-machine processing and distributed multi-node processing via S3.
Pipeline Modes
Local Processing
S3 Distributed
Beaker Cluster
Process PDFs on a single machine with a GPU:

```bash
python -m olmocr.pipeline ./localworkspace --pdfs tests/gnarly_pdfs/*.pdf
```
Features:
Workspace stored on local filesystem
Results in ./localworkspace/results/
Lock files in ./localworkspace/worker_locks/
Work queue in ./localworkspace/work_index_list.csv.zstd
Process millions of PDFs across multiple GPU nodes:

```bash
# First worker initializes queue
python -m olmocr.pipeline s3://bucket/workspace --pdfs s3://bucket/pdfs/*.pdf

# Additional workers join automatically
python -m olmocr.pipeline s3://bucket/workspace
```
Features:
Workspace coordinated via S3
Multiple workers pull from shared queue
Lock files prevent duplicate processing
Results written to s3://bucket/workspace/results/
Launch multiple GPU workers on Beaker:

```bash
python -m olmocr.pipeline s3://bucket/workspace \
  --pdfs s3://bucket/pdfs/*.pdf \
  --beaker \
  --beaker_gpus 4
```
Features:
Automatically launches N GPU replicas
Manages secrets (AWS credentials, HF tokens)
Configurable priority and cluster selection
Work Queue Architecture
The work queue system enables efficient distributed processing with fault tolerance and coordination.
Queue Components
```python
@dataclass
class WorkItem:
    """Represents a single work item in the queue"""
    hash: str
    work_paths: List[str]
```
Queue Initialization
PDFs are grouped into work items (~500 pages each) and stored in work_index_list.csv.zstd:

```
abc123def,s3://bucket/doc1.pdf,s3://bucket/doc2.pdf,s3://bucket/doc3.pdf
456789ghi,s3://bucket/doc4.pdf,s3://bucket/doc5.pdf
```

Each line contains a hash and the PDF paths in that work group.
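The grouping step can be sketched as follows. This is an illustrative sketch, not the pipeline's actual code: `group_pdfs` and `avg_pages_per_pdf` are hypothetical names, and the assumption that group size is derived from an average page count is mine.

```python
import hashlib

def group_pdfs(pdf_paths, pages_per_group=500, avg_pages_per_pdf=10):
    # Hypothetical sketch: split PDFs into work items of roughly
    # pages_per_group pages, assuming an average page count per PDF.
    pdfs_per_group = max(1, pages_per_group // avg_pages_per_pdf)
    groups = [pdf_paths[i:i + pdfs_per_group] for i in range(0, len(pdf_paths), pdfs_per_group)]
    # Each group is identified by a stable hash of its member paths,
    # so re-running queue initialization produces the same work items.
    return [(hashlib.sha1("".join(sorted(g)).encode()).hexdigest()[:9], g) for g in groups]
```

Because the hash depends only on the sorted member paths, a re-initialized queue maps onto the same completed-output files, which is what makes resumption work.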
Work Assignment
Workers pull work items randomly from the queue:

```python
async def get_work(self, worker_lock_timeout_secs: int = 1800):
    while True:
        work_item = self._queue.get_nowait()

        # Skip if already completed
        if await self.is_completed(work_item.hash):
            continue

        # Skip if locked by another worker (within timeout)
        if lock_is_active(work_item.hash):
            continue

        # Create lock file
        create_lock(work_item.hash)
        return work_item
```
Lock Management
Lock files prevent duplicate processing:
Created in worker_locks/output_{hash}.jsonl
Timeout after 30 minutes (configurable)
Deleted when work completes successfully
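The staleness check these rules imply can be sketched using the lock file's modification time. `lock_is_active` here is an illustrative stand-in, not the pipeline's exact implementation:

```python
import os
import time

WORKER_LOCK_TIMEOUT_SECS = 30 * 60  # 30 minutes, configurable

def lock_is_active(locks_dir, work_hash, timeout_secs=WORKER_LOCK_TIMEOUT_SECS):
    # A lock is only honored if its file was touched within the timeout;
    # stale locks left behind by crashed workers are ignored, so the
    # work item becomes claimable again.
    lock_path = os.path.join(locks_dir, f"output_{work_hash}.jsonl")
    if not os.path.exists(lock_path):
        return False
    age = time.time() - os.path.getmtime(lock_path)
    return age < timeout_secs
```

The timeout trades duplicate work against stuck items: too short and a slow worker gets raced; too long and a crashed worker's item waits idle.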
Completion Tracking
Completed work is tracked via output files:
Results written to results/output_{hash}.jsonl
Workers skip items with existing output files
Enables resumable processing after failures
Work Queue Classes
olmOCR provides two work queue implementations:
LocalWorkQueue
S3WorkQueue
```python
class LocalWorkQueue(WorkQueue):
    """Local filesystem-based work queue"""

    def __init__(self, workspace_path: str):
        self.workspace_path = os.path.abspath(workspace_path)
        self._index_path = os.path.join(workspace_path, "work_index_list.csv.zstd")
        self._results_dir = os.path.join(workspace_path, "results")
        self._locks_dir = os.path.join(workspace_path, "worker_locks")
        self._queue = Queue()
```
Inference Architecture
The pipeline uses sglang for efficient vision model inference.
sglang Server
A background task manages the sglang server:
```python
async def sglang_server_task(args, semaphore):
    cmd = [
        "python3", "-m", "sglang.launch_server",
        "--model-path", args.model,
        "--chat-template", args.model_chat_template,
        "--port", str(SGLANG_SERVER_PORT),
        "--log-level-http", "warning",
    ]
    proc = await asyncio.create_subprocess_exec(*cmd)

    # Monitor server logs
    await read_server_logs(proc.stdout, proc.stderr)

    # Release semaphore when queue is empty
    if server_ready and queue_empty:
        semaphore.release()
```
The server automatically restarts on errors (up to 5 times). GPU memory fraction is adjusted based on available VRAM.
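The restart policy can be sketched as a simple supervision loop. `run_with_restarts` and `start_server` are hypothetical names for illustration, not the pipeline's actual API:

```python
import asyncio

async def run_with_restarts(start_server, max_restarts=5):
    # Hypothetical sketch: relaunch the server whenever it exits with an
    # error, giving up after max_restarts failed attempts. start_server
    # is assumed to run the subprocess to completion and return its
    # exit code.
    for _ in range(max_restarts + 1):
        exit_code = await start_server()
        if exit_code == 0:
            return
    raise RuntimeError(f"server failed {max_restarts + 1} times, giving up")
```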
Request Flow
Each PDF page goes through this inference pipeline:
Build Query
Prepare the inference request with image and anchor text:

```python
async def build_page_query(local_pdf_path, page, target_longest_image_dim, target_anchor_text_len):
    loop = asyncio.get_running_loop()

    # Render PDF page to base64 PNG (async, in a thread)
    image_base64 = asyncio.to_thread(
        render_pdf_to_base64png,
        local_pdf_path,
        page,
        target_longest_image_dim=target_longest_image_dim,
    )

    # Extract anchor text (CPU-bound, runs in process pool)
    anchor_text = loop.run_in_executor(
        process_pool,
        partial(get_anchor_text, pdf_engine="pdfreport", target_length=target_anchor_text_len),
        local_pdf_path,
        page,
    )

    # Run both concurrently
    image_base64, anchor_text = await asyncio.gather(image_base64, anchor_text)

    return {
        "model": "Qwen/Qwen2-VL-7B-Instruct",
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": build_finetuning_prompt(anchor_text)},
                    {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_base64}"}},
                ],
            }
        ],
        "max_tokens": 3000,
        "temperature": 0.8,
    }
```
Send Request
Make an HTTP POST request to the sglang server:

```python
async def process_page(args, worker_id, pdf_path, pdf_local_path, page_num):
    query = await build_page_query(
        pdf_local_path, page_num, args.target_longest_image_dim, args.target_anchor_text_len
    )

    status_code, response_body = await apost(
        f"http://localhost:{SGLANG_SERVER_PORT}/v1/chat/completions",
        json_data=query,
    )

    base_response_data = json.loads(response_body)
    model_response_json = json.loads(base_response_data["choices"][0]["message"]["content"])
    page_response = PageResponse(**model_response_json)

    return PageResult(
        pdf_path,
        page_num,
        page_response,
        input_tokens=base_response_data["usage"].get("prompt_tokens", 0),
        output_tokens=base_response_data["usage"].get("completion_tokens", 0),
        is_fallback=False,
    )
```
Retry Logic
Handle failures with retry and fallback:
Max 8 retries per page (configurable via --max_page_retries)
Exponential backoff for server connection errors
Reduce anchor text length if context exceeds limit
Rotate image if model detects incorrect orientation
Fallback to pdftotext if all retries fail
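The backoff portion of this logic might look like the following sketch. `with_retries` is an illustrative helper, not the pipeline's actual code, and the jitter scheme is an assumption:

```python
import asyncio
import random

async def with_retries(fn, max_retries=8, base_delay=1.0):
    # Hypothetical sketch: retry a transient failure with exponential
    # backoff plus jitter, capped at 60s, raising once retries are
    # exhausted so the caller can fall back to pdftotext.
    for attempt in range(max_retries):
        try:
            return await fn()
        except ConnectionError:
            delay = base_delay * (2 ** attempt) * (1 + random.random())
            await asyncio.sleep(min(delay, 60))
    raise RuntimeError("all retries exhausted; falling back to pdftotext")
```

Jitter matters here: with many workers retrying against one server, unjittered backoff would synchronize their reconnection attempts.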
Aggregate Results
Combine page results into a Dolma document:

```python
async def process_pdf(args, worker_id, pdf_orig_path):
    page_tasks = []

    # Process all pages concurrently
    async with asyncio.TaskGroup() as tg:
        for page_num in range(1, num_pages + 1):
            task = tg.create_task(process_page(args, worker_id, pdf_orig_path, tf.name, page_num))
            page_tasks.append(task)

    page_results = [task.result() for task in page_tasks]

    # Check error rate
    num_fallback_pages = sum(page_result.is_fallback for page_result in page_results)
    if num_fallback_pages / num_pages > args.max_page_error_rate:
        logger.error(f"Document {pdf_orig_path} has too many fallback pages, discarding")
        return None

    return build_dolma_document(pdf_orig_path, page_results)
```
Custom HTTP Implementation
For 100M+ requests, standard HTTP libraries can deadlock. olmOCR uses a custom async HTTP implementation:
```python
async def apost(url, json_data):
    """Manual HTTP POST implementation to avoid connection pool issues"""
    parsed_url = urlparse(url)
    host = parsed_url.hostname
    port = parsed_url.port or 80
    path = parsed_url.path or "/"

    reader, writer = await asyncio.open_connection(host, port)

    json_payload = json.dumps(json_data)
    request = (
        f"POST {path} HTTP/1.1\r\n"
        f"Host: {host}\r\n"
        f"Content-Type: application/json\r\n"
        f"Content-Length: {len(json_payload)}\r\n"
        f"Connection: close\r\n\r\n"
        f"{json_payload}"
    )
    writer.write(request.encode())
    await writer.drain()

    # Read response...
    return status_code, response_body
```
This custom implementation only supports fixed content-length responses. It’s specifically designed for sglang’s API.
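The response-reading step elided above can be sketched like this, assuming the server always sends a Content-Length header and never uses chunked transfer encoding (which holds for this use case). `read_fixed_length_response` is a hypothetical name:

```python
import asyncio

async def read_fixed_length_response(reader):
    # Sketch of the response-parsing half: read the status line and
    # headers, then exactly Content-Length bytes of body.
    status_line = await reader.readline()
    status_code = int(status_line.split()[1])

    content_length = 0
    while True:
        line = await reader.readline()
        if line in (b"\r\n", b"\n", b""):
            break  # end of headers
        name, _, value = line.decode().partition(":")
        if name.strip().lower() == "content-length":
            content_length = int(value.strip())

    body = await reader.readexactly(content_length)
    return status_code, body
```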
Worker Coordination
Multiple workers coordinate via semaphores and async task groups:
```python
async def worker(args, work_queue, semaphore, worker_id):
    while True:
        # Wait for semaphore (only 1 worker sends requests at a time)
        await semaphore.acquire()

        work_item = await work_queue.get_work()
        if work_item is None:
            semaphore.release()
            break

        try:
            # Process all PDFs in this work item
            async with asyncio.TaskGroup() as tg:
                dolma_tasks = [tg.create_task(process_pdf(args, worker_id, pdf)) for pdf in work_item.work_paths]
            dolma_docs = [task.result() for task in dolma_tasks if task.result() is not None]

            # Write results
            output_path = os.path.join(args.workspace, "results", f"output_{work_item.hash}.jsonl")
            write_dolma_docs(output_path, dolma_docs)
            await work_queue.mark_done(work_item)
        finally:
            semaphore.release()
```
Semaphore Control
The pipeline uses a semaphore to ensure optimal GPU utilization:
Initial state: the semaphore starts with count=1
Worker blocking: only one worker can send requests at a time
Auto-release: when the sglang queue has been empty for 30s, the semaphore is released
Saturation: the next worker starts sending requests to keep the GPU busy
Output: the first worker continues writing results while the second saturates the GPU
This design maximizes GPU utilization while allowing workers to output results as soon as possible.
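The auto-release step can be sketched as a small monitor coroutine. `release_when_idle` and `is_queue_empty` are hypothetical names, and the polling scheme is an assumption:

```python
import asyncio

async def release_when_idle(semaphore, is_queue_empty, idle_secs=30, poll_secs=1):
    # Hypothetical sketch: once the inference server's request queue has
    # been continuously empty for idle_secs, release the semaphore so the
    # next worker can start submitting pages and keep the GPU saturated.
    idle_for = 0.0
    while idle_for < idle_secs:
        await asyncio.sleep(poll_secs)
        idle_for = idle_for + poll_secs if is_queue_empty() else 0.0
    semaphore.release()
```

Requiring the queue to stay empty for the full window (rather than one empty poll) avoids releasing during a brief gap between batches.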
Process Pools
CPU-bound operations run in separate process pools to avoid blocking:
```python
# Global process pool (max 32 workers)
process_pool = ProcessPoolExecutor(
    max_workers=min(multiprocessing.cpu_count() // 2 + 1, 32),
    mp_context=multiprocessing.get_context("spawn"),
)

# Anchor text extraction runs in the process pool
loop = asyncio.get_running_loop()
anchor_text = await loop.run_in_executor(
    process_pool,
    partial(get_anchor_text, pdf_engine="pdfreport", target_length=target_anchor_text_len),
    local_pdf_path,
    page,
)
```
get_anchor_text() is not thread-safe and must run in a process pool, not a thread pool.
Pipeline Parameters
Key configuration options from pipeline.py:889-931:
Workspace: filesystem path or S3 path for storing the work queue and results
PDFs: PDF paths to process (glob patterns supported)
Pages per group: target pages per work item group
Max page retries: maximum retries per page before falling back
Max page error rate: maximum allowable fraction of failed pages per document (default: 1/250)
Workers: number of concurrent workers
Model: model path (local, Hugging Face, S3, or GCS); string, default "allenai/olmOCR-7B-0225-preview"
Model max context: maximum context length for the model
Target longest image dim: longest dimension for rendered PDF images
Target anchor text len: maximum anchor text length in characters
Metrics and Monitoring
The pipeline tracks detailed metrics:
```python
metrics = MetricsKeeper(window=60 * 5)  # 5-minute window
tracker = WorkerTracker()  # Per-worker status

# Track token usage
metrics.add_metrics(
    sglang_input_tokens=prompt_tokens,
    sglang_output_tokens=completion_tokens,
    finished_input_tokens=total_input,
    finished_output_tokens=total_output,
)

# Track worker status
await tracker.track_work(worker_id, f"{pdf_path}-{page_num}", "started")
await tracker.track_work(worker_id, f"{pdf_path}-{page_num}", "finished")
```
Metrics are logged every 10 seconds during processing.
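A sliding-window keeper like `MetricsKeeper` could be sketched as follows; `WindowedMetrics` is an illustrative stand-in, not olmOCR's actual class:

```python
import time
from collections import deque

class WindowedMetrics:
    # Hypothetical sketch of a sliding-window metrics keeper: each
    # add() is timestamped, and events older than the window are
    # dropped when totals are read.
    def __init__(self, window_secs=300):
        self.window_secs = window_secs
        self.events = deque()

    def add(self, **counters):
        self.events.append((time.monotonic(), counters))

    def totals(self):
        cutoff = time.monotonic() - self.window_secs
        while self.events and self.events[0][0] < cutoff:
            self.events.popleft()
        totals = {}
        for _, counters in self.events:
            for key, value in counters.items():
                totals[key] = totals.get(key, 0) + value
        return totals
```

A windowed view is what makes the periodic log lines useful as a throughput gauge: totals over the whole run would hide slowdowns.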
Next Steps
Anchor Text Learn how anchor text improves extraction quality
CLI Reference Explore all pipeline command-line options
Work Queues API documentation for work queue classes
Cluster Usage Deploy olmOCR at scale