## Overview

The `s3_utils` module provides utilities for working with cloud storage systems, including Amazon S3, Google Cloud Storage (GCS), and Weka. It handles path parsing, file downloads, uploads, and glob pattern expansion.
## Path Utilities

### parse_s3_path

```python
parse_s3_path(s3_path: str) -> tuple[str, str]
```

Parses an S3-style path into bucket and key components.

**Parameters:**
- `s3_path`: Storage path starting with `s3://`, `gs://`, or `weka://`

**Returns:** Tuple of `(bucket_name, key_path)`

**Raises:** `ValueError` if the path doesn't start with a supported prefix

**Example:**

```python
from olmocr.s3_utils import parse_s3_path

bucket, key = parse_s3_path("s3://my-bucket/data/file.pdf")
print(f"Bucket: {bucket}")  # "my-bucket"
print(f"Key: {key}")        # "data/file.pdf"
```
### expand_s3_glob

```python
expand_s3_glob(s3_client, s3_glob: str) -> dict[str, str]
```

Expands an S3 path that may contain wildcards (e.g., `*.pdf`).

**Parameters:**
- `s3_glob`: S3 path with optional wildcard patterns (`*`, `?`, `[`, `]`)

**Returns:** Dictionary mapping `s3://bucket/key` to ETag for each matching object

**Raises:**
- `ValueError` if the path doesn't start with `s3://`
- `ValueError` if no objects are found
- `ValueError` if a bare prefix is provided instead of a file or wildcard

**Example:**

```python
import boto3
from olmocr.s3_utils import expand_s3_glob

s3_client = boto3.client("s3")

# Match all PDF files in a directory
files = expand_s3_glob(s3_client, "s3://my-bucket/pdfs/*.pdf")
for s3_path, etag in files.items():
    print(f"{s3_path} (ETag: {etag})")

# Match a single file
file = expand_s3_glob(s3_client, "s3://my-bucket/data/document.pdf")
```
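Under the hood, glob expansion amounts to listing objects under the non-wildcard prefix and filtering the keys against the pattern. The helper below is a simplified sketch of that matching step, not the module's actual implementation; real wildcard semantics may differ slightly:

```python
import fnmatch

def match_keys(keys: list[str], pattern: str) -> list[str]:
    # Filter a listing of object keys against a glob pattern,
    # analogous to what expand_s3_glob does after listing objects
    # under the longest non-wildcard prefix.
    return [key for key in keys if fnmatch.fnmatch(key, pattern)]

keys = ["pdfs/a.pdf", "pdfs/b.pdf", "pdfs/notes.txt"]
print(match_keys(keys, "pdfs/*.pdf"))  # ['pdfs/a.pdf', 'pdfs/b.pdf']
```

Note that, unlike filesystem globbing, `fnmatch`'s `*` also matches `/`, so a pattern like `pdfs/*.pdf` would match keys under nested prefixes as well.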
## File Operations

### get_s3_bytes

```python
get_s3_bytes(
    s3_client,
    s3_path: str,
    start_index: Optional[int] = None,
    end_index: Optional[int] = None,
) -> bytes
```

Retrieves bytes from an S3 object or local file. Supports range queries for partial downloads.

**Parameters:**
- `s3_path`: S3 path (`s3://bucket/key`) or local file path
- `start_index`: Starting byte position for the range query
- `end_index`: Ending byte position (inclusive) for the range query

**Returns:** Bytes content from the file

**Example:**

```python
import boto3
from olmocr.s3_utils import get_s3_bytes

s3_client = boto3.client("s3")

# Download the entire file
data = get_s3_bytes(s3_client, "s3://bucket/file.pdf")

# Download the first 1000 bytes
header = get_s3_bytes(s3_client, "s3://bucket/file.pdf", start_index=0, end_index=999)

# Download from byte 1000 to the end
tail = get_s3_bytes(s3_client, "s3://bucket/file.pdf", start_index=1000)
```
### get_s3_bytes_with_backoff

```python
get_s3_bytes_with_backoff(
    s3_client,
    pdf_s3_path: str,
    max_retries: int = 8,
    backoff_factor: int = 2,
) -> bytes
```

Retrieves S3 bytes with exponential backoff retry logic.

**Parameters:**
- `pdf_s3_path`: S3 path to the object
- `max_retries`: Maximum number of retry attempts
- `backoff_factor`: Exponential backoff multiplier

**Returns:** Bytes content from the file

**Raises:**
- `ClientError` for `AccessDenied` or `NoSuchKey` errors (no retry)
- `Exception` after max retries are exhausted

**Example:**

```python
import boto3
from olmocr.s3_utils import get_s3_bytes_with_backoff

s3_client = boto3.client("s3")

# Robust download with automatic retries
try:
    data = get_s3_bytes_with_backoff(
        s3_client,
        "s3://bucket/large-file.pdf",
        max_retries=10,
        backoff_factor=3,
    )
except Exception as e:
    print(f"Failed after retries: {e}")
```
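With exponential backoff, the wait between attempts grows as a power of `backoff_factor`. Assuming a delay of `backoff_factor ** attempt` seconds (a common convention; the module's exact schedule is an implementation detail), the retry waits look like this:

```python
def backoff_delays(max_retries: int = 8, backoff_factor: int = 2) -> list[int]:
    # Illustrative only: one delay per retry attempt, growing
    # exponentially with the backoff factor.
    return [backoff_factor ** attempt for attempt in range(max_retries)]

print(backoff_delays(max_retries=4, backoff_factor=2))  # [1, 2, 4, 8]
```

This is why a larger `backoff_factor` makes later retries dramatically more patient: with the defaults (`max_retries=8`, `backoff_factor=2`), the final wait is 128 seconds.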
### put_s3_bytes

```python
put_s3_bytes(s3_client, s3_path: str, data: bytes)
```

Uploads bytes to an S3 object.

**Parameters:**
- `s3_path`: S3 destination path (`s3://bucket/key`)
- `data`: Bytes content to upload

**Example:**

```python
import boto3
from olmocr.s3_utils import put_s3_bytes

s3_client = boto3.client("s3")
data = b"Hello, World!"
put_s3_bytes(s3_client, "s3://bucket/output.txt", data)
```
## Compression Utilities

### download_zstd_csv

```python
download_zstd_csv(s3_client, s3_path: str) -> list[str]
```

Downloads and decompresses a Zstandard-compressed CSV file from S3.

**Parameters:**
- `s3_path`: S3 path to the `.zstd` file

**Returns:** List of decompressed lines from the CSV

**Example:**

```python
import boto3
from olmocr.s3_utils import download_zstd_csv

s3_client = boto3.client("s3")
lines = download_zstd_csv(s3_client, "s3://bucket/data.csv.zstd")
for line in lines:
    print(line)
```
### upload_zstd_csv

```python
upload_zstd_csv(s3_client, s3_path: str, lines: list[str])
```

Compresses and uploads a list of lines as a Zstandard-compressed CSV to S3.

**Parameters:**
- `s3_path`: S3 destination path for the `.zstd` file
- `lines`: List of text lines to compress and upload

**Example:**

```python
import boto3
from olmocr.s3_utils import upload_zstd_csv

s3_client = boto3.client("s3")
lines = ["header1,header2", "value1,value2", "value3,value4"]
upload_zstd_csv(s3_client, "s3://bucket/output.csv.zstd", lines)
```
## Directory Operations

### download_directory

```python
download_directory(model_choices: List[str], local_dir: str)
```

Downloads a directory from cloud storage to the local filesystem, attempting each source in order until one succeeds.

**Parameters:**
- `model_choices`: List of storage paths to try (`weka://`, `gs://`, or `s3://`); Weka paths are prioritized
- `local_dir`: Local directory path where files will be downloaded

**Raises:** `ValueError` if all download attempts fail

**Example:**

```python
from olmocr.s3_utils import download_directory

# Try multiple sources in order of preference
model_paths = [
    "weka://models/my-model",
    "gs://backup-bucket/my-model",
    "s3://archive-bucket/my-model",
]
download_directory(model_paths, "./local-models")
```
### download_dir_from_storage

```python
download_dir_from_storage(
    storage_path: str,
    local_dir: str,
    storage_type: str,
)
```

Downloads files from cloud storage with MD5 hash-based syncing.

**Parameters:**
- `storage_path`: Cloud storage path (`weka://`, `gs://`, or `s3://`)
- `local_dir`: Local directory for downloaded files
- `storage_type`: Type of storage: `"weka"`, `"gcs"`, or `"s3"`

**Raises:** `ValueError` for an unsupported storage type or missing credentials

**Features:**
- Parallel downloads using `ThreadPoolExecutor`
- MD5 hash comparison to skip unchanged files
- Progress bar with `tqdm`
- Automatic retry with configurable settings
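The MD5-based syncing above can be illustrated with a small decision helper: a file is re-downloaded only when it is missing locally or its MD5 digest no longer matches the remote ETag. This is a sketch of the idea, not the module's actual code (it assumes single-part uploads, where the S3 ETag equals the MD5 hex digest; multipart ETags differ):

```python
import hashlib
from pathlib import Path

def needs_download(local_path: str, remote_etag: str) -> bool:
    # Re-download only if the file is missing or its content hash
    # differs from the remote ETag (MD5 for single-part uploads).
    path = Path(local_path)
    if not path.exists():
        return True
    local_md5 = hashlib.md5(path.read_bytes()).hexdigest()
    return local_md5 != remote_etag.strip('"')
```

Skipping unchanged files this way makes repeated syncs of a large model directory cheap: only new or modified files incur a transfer.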
## Utility Functions

### parse_custom_id

```python
parse_custom_id(custom_id: str) -> tuple[str, int]
```

Parses a custom ID string into an S3 path and a page number.

**Parameters:**
- `custom_id`: Custom ID in the format `s3://bucket/file-{page_number}`

**Returns:** Tuple of `(s3_path, page_number)`

**Example:**

```python
from olmocr.s3_utils import parse_custom_id

s3_path, page = parse_custom_id("s3://bucket/doc.pdf-5")
print(f"Path: {s3_path}, Page: {page}")  # "s3://bucket/doc.pdf", 5
```
### is_running_on_gcp

```python
is_running_on_gcp() -> bool
```

Checks whether the code is running on a Google Cloud Platform instance.

**Returns:** `True` if running on GCP, `False` otherwise

**Example:**

```python
from olmocr.s3_utils import is_running_on_gcp

if is_running_on_gcp():
    print("Running on GCP - using optimized settings")
else:
    print("Running locally or on another cloud")
```
## Environment Variables

For Weka storage access, set these environment variables:

- `WEKA_ACCESS_KEY_ID` - Access key for Weka authentication
- `WEKA_SECRET_ACCESS_KEY` - Secret key for Weka authentication

On Beaker/Pluto nodes, Weka is automatically disabled based on:

- `BEAKER_NODE_HOSTNAME` - Hostname used to detect these special environments
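A typical shell setup might look like the following (placeholder values, not real credentials):

```shell
# Credentials for Weka storage access (placeholder values)
export WEKA_ACCESS_KEY_ID="your-access-key"
export WEKA_SECRET_ACCESS_KEY="your-secret-key"
```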