
Overview

The s3_utils module provides utilities for working with cloud storage systems including Amazon S3, Google Cloud Storage (GCS), and Weka. It handles path parsing, file downloads, uploads, and glob pattern expansion.

Path Utilities

parse_s3_path

parse_s3_path(s3_path: str) -> tuple[str, str]
Parses an S3-style path into bucket and key components.
Parameters:
  • s3_path (str, required): Storage path starting with s3://, gs://, or weka://
Returns: Tuple of (bucket_name, key_path)
Raises: ValueError if path doesn’t start with a supported prefix
Example:
from olmocr.s3_utils import parse_s3_path

bucket, key = parse_s3_path("s3://my-bucket/data/file.pdf")
print(f"Bucket: {bucket}")  # "my-bucket"
print(f"Key: {key}")        # "data/file.pdf"
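
For intuition, the parsing presumably amounts to stripping the scheme prefix and splitting on the first slash. A minimal sketch (the helper name parse_path_sketch is hypothetical; use olmocr.s3_utils.parse_s3_path in real code):

```python
def parse_path_sketch(path: str) -> tuple[str, str]:
    # Hypothetical re-implementation for illustration only.
    for prefix in ("s3://", "gs://", "weka://"):
        if path.startswith(prefix):
            # Everything up to the first "/" is the bucket; the rest is the key.
            bucket, _, key = path[len(prefix):].partition("/")
            return bucket, key
    raise ValueError(f"Unsupported path prefix: {path}")

print(parse_path_sketch("gs://my-bucket/data/file.pdf"))  # ('my-bucket', 'data/file.pdf')
```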

expand_s3_glob

expand_s3_glob(s3_client, s3_glob: str) -> dict[str, str]
Expands an S3 path that may contain wildcards (e.g., *.pdf).
Parameters:
  • s3_client (boto3.client, required): Boto3 S3 client instance
  • s3_glob (str, required): S3 path with optional wildcard patterns (*, ?, [, ])
Returns: Dictionary mapping s3://bucket/key to ETag for each matching object
Raises:
  • ValueError if path doesn’t start with s3://
  • ValueError if no objects are found
  • ValueError if a bare prefix is provided instead of a file or wildcard
Example:
import boto3
from olmocr.s3_utils import expand_s3_glob

s3_client = boto3.client("s3")

# Match all PDF files in a directory
files = expand_s3_glob(s3_client, "s3://my-bucket/pdfs/*.pdf")
for s3_path, etag in files.items():
    print(f"{s3_path} (ETag: {etag})")

# Match a single file
file = expand_s3_glob(s3_client, "s3://my-bucket/data/document.pdf")
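
Conceptually, the expansion likely lists objects under the longest non-wildcard prefix and then filters the keys with fnmatch-style matching. A self-contained sketch of just the filtering step (names here are illustrative, not the olmocr internals):

```python
import fnmatch

def filter_keys(keys: list[str], pattern: str) -> list[str]:
    # Keep only keys matching the glob pattern (supports *, ?, [, ]).
    return [k for k in keys if fnmatch.fnmatchcase(k, pattern)]

keys = ["pdfs/a.pdf", "pdfs/b.pdf", "pdfs/notes.txt"]
print(filter_keys(keys, "pdfs/*.pdf"))  # ['pdfs/a.pdf', 'pdfs/b.pdf']
```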

File Operations

get_s3_bytes

get_s3_bytes(
    s3_client,
    s3_path: str,
    start_index: Optional[int] = None,
    end_index: Optional[int] = None
) -> bytes
Retrieves bytes from an S3 object or local file. Supports range queries for partial downloads.
Parameters:
  • s3_client (boto3.client, required): Boto3 S3 client instance
  • s3_path (str, required): S3 path (s3://bucket/key) or local file path
  • start_index (int, optional): Starting byte position for range query
  • end_index (int, optional): Ending byte position for range query
Returns: Bytes content from the file
Example:
import boto3
from olmocr.s3_utils import get_s3_bytes

s3_client = boto3.client("s3")

# Download entire file
data = get_s3_bytes(s3_client, "s3://bucket/file.pdf")

# Download first 1000 bytes
header = get_s3_bytes(s3_client, "s3://bucket/file.pdf", start_index=0, end_index=999)

# Download from byte 1000 to end
tail = get_s3_bytes(s3_client, "s3://bucket/file.pdf", start_index=1000)
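
Under the hood, the start/end indices presumably translate into an HTTP Range header passed to the S3 GetObject call; a sketch of that mapping (an assumption about the internals, with end_index treated as inclusive as the Range header requires):

```python
from typing import Optional

def range_header(start: Optional[int] = None, end: Optional[int] = None) -> Optional[str]:
    # Build an HTTP Range header value like "bytes=0-999".
    if start is None and end is None:
        return None  # full object, no Range header needed
    if start is None:
        start = 0
    suffix = "" if end is None else str(end)
    return f"bytes={start}-{suffix}"

print(range_header(0, 999))  # bytes=0-999
print(range_header(1000))    # bytes=1000-
```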

get_s3_bytes_with_backoff

get_s3_bytes_with_backoff(
    s3_client,
    pdf_s3_path: str,
    max_retries: int = 8,
    backoff_factor: int = 2
) -> bytes
Retrieves S3 bytes with exponential backoff retry logic.
Parameters:
  • s3_client (boto3.client, required): Boto3 S3 client instance
  • pdf_s3_path (str, required): S3 path to download
  • max_retries (int, default: 8): Maximum number of retry attempts
  • backoff_factor (int, default: 2): Exponential backoff multiplier
Returns: Bytes content from the file
Raises:
  • ClientError for AccessDenied or NoSuchKey errors (no retry)
  • Exception after max retries exhausted
Example:
import boto3
from olmocr.s3_utils import get_s3_bytes_with_backoff

s3_client = boto3.client("s3")

# Robust download with automatic retries
try:
    data = get_s3_bytes_with_backoff(
        s3_client,
        "s3://bucket/large-file.pdf",
        max_retries=10,
        backoff_factor=3
    )
except Exception as e:
    print(f"Failed after retries: {e}")
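
The wait between attempts presumably grows as backoff_factor raised to the attempt number; a sketch of that schedule (the exact formula inside olmocr may differ, e.g. it may add jitter):

```python
def backoff_delays(max_retries: int, backoff_factor: int) -> list[int]:
    # Seconds to sleep before each retry: factor**1, factor**2, ...
    return [backoff_factor ** attempt for attempt in range(1, max_retries + 1)]

print(backoff_delays(4, 2))  # [2, 4, 8, 16]
```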

put_s3_bytes

put_s3_bytes(s3_client, s3_path: str, data: bytes)
Uploads bytes to an S3 object.
Parameters:
  • s3_client (boto3.client, required): Boto3 S3 client instance
  • s3_path (str, required): S3 destination path (s3://bucket/key)
  • data (bytes, required): Bytes to upload
Example:
import boto3
from olmocr.s3_utils import put_s3_bytes

s3_client = boto3.client("s3")
data = b"Hello, World!"
put_s3_bytes(s3_client, "s3://bucket/output.txt", data)

Compression Utilities

download_zstd_csv

download_zstd_csv(s3_client, s3_path: str) -> list[str]
Downloads and decompresses a Zstandard-compressed CSV file from S3.
Parameters:
  • s3_client (boto3.client, required): Boto3 S3 client instance
  • s3_path (str, required): S3 path to the .zstd file
Returns: List of decompressed lines from the CSV
Example:
import boto3
from olmocr.s3_utils import download_zstd_csv

s3_client = boto3.client("s3")
lines = download_zstd_csv(s3_client, "s3://bucket/data.csv.zstd")
for line in lines:
    print(line)

upload_zstd_csv

upload_zstd_csv(s3_client, s3_path: str, lines: list[str])
Compresses and uploads a list of lines as a Zstandard-compressed CSV to S3.
Parameters:
  • s3_client (boto3.client, required): Boto3 S3 client instance
  • s3_path (str, required): S3 destination path for the .zstd file
  • lines (list[str], required): List of text lines to compress and upload
Example:
import boto3
from olmocr.s3_utils import upload_zstd_csv

s3_client = boto3.client("s3")
lines = ["header1,header2", "value1,value2", "value3,value4"]
upload_zstd_csv(s3_client, "s3://bucket/output.csv.zstd", lines)

Directory Operations

download_directory

download_directory(model_choices: List[str], local_dir: str)
Downloads a directory from cloud storage to local filesystem. Attempts each source in order until successful.
Parameters:
  • model_choices (list[str], required): List of storage paths to try (weka://, gs://, or s3://). Weka links are prioritized.
  • local_dir (str, required): Local directory path where files will be downloaded
Raises: ValueError if all download attempts fail
Example:
from olmocr.s3_utils import download_directory

# Try multiple sources in order of preference
model_paths = [
    "weka://models/my-model",
    "gs://backup-bucket/my-model",
    "s3://archive-bucket/my-model"
]

download_directory(model_paths, "./local-models")

download_dir_from_storage

download_dir_from_storage(
    storage_path: str,
    local_dir: str,
    storage_type: str
)
Downloads files from cloud storage with MD5 hash-based syncing.
Parameters:
  • storage_path (str, required): Cloud storage path (weka://, gs://, or s3://)
  • local_dir (str, required): Local directory for downloaded files
  • storage_type (str, required): Type of storage: "weka", "gcs", or "s3"
Raises: ValueError for unsupported storage type or missing credentials
Features:
  • Parallel downloads using ThreadPoolExecutor
  • MD5 hash comparison to skip unchanged files
  • Progress bar with tqdm
  • Automatic retry with configurable settings
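
The MD5-based skip logic can be pictured as comparing a local file's digest against the stored remote hash before re-downloading. A standalone sketch (a hypothetical helper, not the olmocr implementation; note that only single-part S3 ETags equal the plain MD5):

```python
import hashlib
import os

def needs_download(local_path: str, remote_md5: str) -> bool:
    # Download if the file is missing or its MD5 differs from the remote hash.
    if not os.path.exists(local_path):
        return True
    with open(local_path, "rb") as f:
        local_md5 = hashlib.md5(f.read()).hexdigest()
    return local_md5 != remote_md5
```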

Utility Functions

parse_custom_id

parse_custom_id(custom_id: str) -> tuple[str, int]
Parses a custom ID string into S3 path and page number.
Parameters:
  • custom_id (str, required): Custom ID in format s3://bucket/file-{page_number}
Returns: Tuple of (s3_path, page_number)
Example:
from olmocr.s3_utils import parse_custom_id

s3_path, page = parse_custom_id("s3://bucket/doc.pdf-5")
print(f"Path: {s3_path}, Page: {page}")  # "s3://bucket/doc.pdf", 5
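
Given the ID format, the parsing presumably splits on the last hyphen so that hyphens inside the file name survive. A sketch (the helper name is hypothetical):

```python
def split_custom_id(custom_id: str) -> tuple[str, int]:
    # The page number follows the final "-"; everything before it is the S3 path.
    s3_path, _, page = custom_id.rpartition("-")
    return s3_path, int(page)

print(split_custom_id("s3://bucket/doc.pdf-5"))  # ('s3://bucket/doc.pdf', 5)
```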

is_running_on_gcp

is_running_on_gcp() -> bool
Checks if the code is running on a Google Cloud Platform instance.
Returns: True if running on GCP, False otherwise
Example:
from olmocr.s3_utils import is_running_on_gcp

if is_running_on_gcp():
    print("Running on GCP - using optimized settings")
else:
    print("Running locally or on other cloud")

Environment Variables

For Weka storage access, set these environment variables:
  • WEKA_ACCESS_KEY_ID - Access key for Weka authentication
  • WEKA_SECRET_ACCESS_KEY - Secret key for Weka authentication
For Beaker/Pluto nodes, Weka is automatically disabled based on:
  • BEAKER_NODE_HOSTNAME - Hostname used to detect special environments
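
Before using weka:// paths, it can be worth checking that both credentials are present. A convenience sketch (not part of olmocr):

```python
import os

def weka_configured() -> bool:
    # Weka access requires both credentials to be set and non-empty.
    return all(os.environ.get(v) for v in ("WEKA_ACCESS_KEY_ID", "WEKA_SECRET_ACCESS_KEY"))
```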