The `olmocr.work_queue` module provides a flexible work queue system that supports both local and S3-backed distributed processing.
## Overview
The work queue system enables:

- Distribution of PDF processing work across multiple workers
- Resumable processing with persistent state
- Lock-based coordination to prevent duplicate work
- Support for both local single-machine and distributed multi-machine setups
## Architecture

### Core Classes

#### WorkItem
Represents a single unit of work in the queue.

**Attributes:**

- Hash: SHA1 hash computed from the sorted list of work paths; used as the unique identifier for the work item.
- Work paths: list of PDF paths (S3 or local) to process together as a batch.
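As a sketch, the structure above maps naturally onto a small dataclass. The field names here are illustrative, not necessarily olmocr's exact attribute names:

```python
from dataclasses import dataclass
from typing import List

@dataclass(frozen=True)
class WorkItem:
    # Field names are illustrative; olmocr's actual attributes may differ.
    hash: str              # SHA1 of the sorted work paths (unique identifier)
    work_paths: List[str]  # PDF paths (S3 or local) processed as one batch
```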
#### WorkQueue (Abstract Base Class)

Defines the interface that all work queue implementations must follow.

**Methods**

##### populate_queue

Adds new PDF paths to the work queue.

**Parameters:**

- PDF paths: list of individual PDF paths to add to the queue; these are grouped into work items.
- `items_per_group`: number of PDF paths to group together in a single `WorkItem`, allowing batching for efficiency.

**Behavior:**
- Computes hash for each group of paths
- Skips paths already in the index
- Writes updated index to storage (zstd-compressed CSV)
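The grouping step can be sketched as simple chunking of the input list (illustrative only; the real implementation also hashes each group and updates the stored index):

```python
def group_paths(paths, items_per_group):
    """Chunk a flat list of PDF paths into fixed-size work groups."""
    return [paths[i:i + items_per_group]
            for i in range(0, len(paths), items_per_group)]
```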
##### initialize_queue

Loads the work queue from persistent storage and prepares it for processing.

**Behavior:**

- Reads the work index from storage
- Identifies completed work items
- Filters out completed items
- Randomizes the remaining work order
- Populates the internal queue
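A minimal sketch of that behavior, assuming work items are keyed by their hash (the helper below is hypothetical, not olmocr's API):

```python
import random
from queue import Queue

def build_internal_queue(indexed_items, completed_hashes):
    """Filter out completed work, shuffle the rest, and fill an internal queue."""
    remaining = [(h, paths) for h, paths in indexed_items
                 if h not in completed_hashes]
    random.shuffle(remaining)  # randomize work order across workers
    q = Queue()
    for item in remaining:
        q.put(item)
    return q
```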
##### is_completed

Checks whether a work item has been completed.

**Parameters:**

- Work hash: hash identifier of the work item to check.

**Returns:** `True` if the output file exists, `False` otherwise.
##### get_work

Retrieves the next available work item.

**Parameters:**

- `worker_lock_timeout_secs`: number of seconds before a worker lock is considered stale and the work is reclaimed. Default: 1800 (30 minutes).

**Returns:** a `WorkItem` if work is available, `None` if the queue is empty.

**Behavior:**
- Pulls next item from internal queue
- Checks if already completed (skips if yes)
- Checks for active worker lock
- Creates new lock if work is available
- Retries next item if current is locked/completed
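The active-lock check above can be sketched as a file-age comparison (illustrative; the real queue reads lock timestamps from local files or S3 object metadata):

```python
import os
import time

def lock_is_stale(lock_path, worker_lock_timeout_secs=1800):
    """A lock counts as stale once its file is older than the timeout."""
    age_secs = time.time() - os.path.getmtime(lock_path)
    return age_secs > worker_lock_timeout_secs
```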
##### mark_done

Marks a work item as complete.

**Parameters:**

- Work item: the work item to mark as done.

**Behavior:**
- Removes worker lock file
- Marks internal queue task as done
##### size

**Returns:** the current number of items remaining in the queue.

**Static Methods**
##### _compute_workgroup_hash

Computes a deterministic hash for a group of paths.

**Parameters:**

- Paths: list of paths to hash.
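A sketch of the hashing scheme: SHA1 over the sorted paths. The exact byte layout (delimiters, encoding) in olmocr may differ, so digests from this sketch should not be compared against real index entries:

```python
import hashlib

def compute_workgroup_hash(paths):
    """Deterministic SHA1 over the sorted paths; independent of input order."""
    sha1 = hashlib.sha1()
    for path in sorted(paths):
        sha1.update(path.encode("utf-8"))
    return sha1.hexdigest()
```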
#### LocalWorkQueue

Local filesystem-based work queue implementation for single-machine processing.

**Constructor parameters:**

- Workspace path: local directory where the queue index, results, and locks are stored.
##### File Structure
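An illustrative workspace layout, reconstructed from the lock and result paths described later in this document (the index filename is an assumption):

```
workspace/
├── work_index_list.csv.zstd     # zstd-compressed work index (name illustrative)
├── worker_locks/
│   └── output_<hash>.jsonl      # empty lock file per in-progress item
└── results/
    └── output_<hash>.jsonl      # completed output per finished item
```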
##### Example Usage
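A hypothetical end-to-end sketch. It assumes the queue methods are coroutines and that `WorkItem` exposes its paths as `work_paths`; adjust to your installed version's actual signatures:

```python
import asyncio

from olmocr.work_queue import LocalWorkQueue

async def main():
    queue = LocalWorkQueue("/data/ocr-workspace")  # local workspace directory

    # Register PDFs, batching two per WorkItem.
    await queue.populate_queue(["/pdfs/a.pdf", "/pdfs/b.pdf"], items_per_group=2)
    await queue.initialize_queue()

    # Drain the queue: process each item, then release its lock.
    while (item := await queue.get_work()) is not None:
        for pdf_path in item.work_paths:
            ...  # process one PDF and write its output
        await queue.mark_done(item)

asyncio.run(main())
```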
#### S3WorkQueue

S3-backed work queue for distributed multi-machine processing.

**Constructor parameters:**

- S3 client: Boto3 S3 client instance for S3 operations.
- Workspace path: S3 path (e.g., `s3://bucket/prefix/`) where the queue index, results, and locks are stored.

##### S3 Structure
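Mirroring the local layout, an illustrative object layout under the workspace prefix (the index key name is an assumption):

```
s3://bucket/prefix/
├── work_index_list.csv.zstd     # zstd-compressed work index (name illustrative)
├── worker_locks/
│   └── output_<hash>.jsonl      # lock objects for in-progress items
└── results/
    └── output_<hash>.jsonl      # completed outputs (completion markers)
```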
##### Work Distribution Mechanism

1. **Initialization**: each worker loads the full work index from S3
2. **Randomization**: work items are shuffled independently on each worker
3. **Lock-based coordination**: workers attempt to acquire locks when pulling work
4. **Stale lock recovery**: locks older than the timeout are reclaimed
5. **Completion tracking**: output files serve as completion markers
##### Example Usage
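A hypothetical setup sketch, assuming the constructor takes a boto3 client plus an `s3://` workspace prefix and that the methods are coroutines, per the descriptions above:

```python
import asyncio

import boto3
from olmocr.work_queue import S3WorkQueue

async def main():
    s3_client = boto3.client("s3")
    queue = S3WorkQueue(s3_client, "s3://my-bucket/ocr-workspace/")

    # Register input PDFs and build the shared index in S3.
    await queue.populate_queue(
        ["s3://my-bucket/pdfs/a.pdf", "s3://my-bucket/pdfs/b.pdf"],
        items_per_group=2,
    )
    await queue.initialize_queue()

asyncio.run(main())
```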
##### Multi-Worker Pattern
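Each worker then runs the same claim-process-complete loop. This is a hypothetical sketch built from the methods described above, assuming coroutine methods and a `work_paths` attribute:

```python
async def worker_loop(queue):
    """Claim work items until the queue is drained."""
    while True:
        # Locks older than 30 minutes are treated as stale and reclaimed.
        item = await queue.get_work(worker_lock_timeout_secs=1800)
        if item is None:
            break  # no unclaimed, uncompleted work remains
        for pdf_path in item.work_paths:
            ...  # process one PDF and write its output
        await queue.mark_done(item)  # remove this worker's lock
```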
## Work Index Format

The work index is stored as a zstd-compressed CSV file:

- First column: SHA1 hash of the sorted paths
- Remaining columns: PDF paths in the work group
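After decompression, each CSV row pairs a group's hash with its member paths. An illustrative row (the hash shown is made up):

```
da39a3ee5e6b4b0d3255bfef95601890afd80709,s3://bucket/pdfs/a.pdf,s3://bucket/pdfs/b.pdf
```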
## Lock Mechanism

Worker locks prevent duplicate processing:

1. Worker calls `get_work()`
2. Queue creates an empty lock file: `worker_locks/output_<hash>.jsonl`
3. The lock file's timestamp tracks when work started
4. Other workers skip locked items (unless the lock is stale)
5. Worker calls `mark_done()` to remove the lock
6. The completed output file `results/output_<hash>.jsonl` serves as the permanent completion marker
## Error Recovery

### Stale Locks

If a worker crashes, its locks become stale after `worker_lock_timeout_secs`. Other workers can then reclaim the work.
### Resumable Processing

The queue system is fully resumable:

- Completed work items are never re-processed
- Active locks prevent duplicate work
- Workers can join/leave at any time
- Progress persists across restarts
## Performance Considerations

### Batching

Grouping multiple PDFs per work item (`items_per_group`) provides several benefits:
- Reduces lock file overhead
- Amortizes S3 API call costs
- Enables better load balancing
### Randomization

Shuffling work items on each worker:

- Reduces lock contention
- Improves parallel efficiency
- Distributes large documents across workers
### Optimal Group Size

The pipeline estimates an optimal `items_per_group` based on:
- Target pages per group (default: 500)
- Average pages per PDF (sampled from input)
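That estimate reduces to a one-line heuristic (illustrative; the function and parameter names here are not olmocr's):

```python
def estimate_items_per_group(target_pages_per_group=500, avg_pages_per_pdf=10.0):
    """Divide the per-group page budget by the sampled average pages per PDF."""
    return max(1, int(target_pages_per_group / avg_pages_per_pdf))
```

For example, with the default 500-page target and PDFs averaging 10 pages, each work item would batch 50 PDFs; very long PDFs fall back to one per group.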
## Related
- Pipeline Module - Main processing pipeline
- Rendering API - PDF rendering functions