Skip to main content

Overview

The metrics module provides tools for tracking performance metrics and monitoring worker states in distributed processing systems. It includes the MetricsKeeper for throughput tracking and WorkerTracker for monitoring worker status.

MetricsKeeper

Constructor

MetricsKeeper(window=60 * 5)
Initializes a metrics keeper that tracks cumulative and windowed metrics.
window
int
default: 300
Time window in seconds for recent metrics calculation (default: 5 minutes)
Example:
from olmocr.metrics import MetricsKeeper

# Use a 10-minute window (600 seconds) for the "recent" metrics
metrics = MetricsKeeper(window=10 * 60)

add_metrics

add_metrics(**kwargs)
Adds metrics to the keeper with arbitrary metric names and values.
**kwargs
dict
required
Arbitrary keyword arguments where keys are metric names and values are counts to add
Example:
from olmocr.metrics import MetricsKeeper

metrics = MetricsKeeper()

# Add multiple metrics in one call: each keyword is a metric name and
# its value is the count to add.
metrics.add_metrics(
    tokens_processed=1000,
    pages_rendered=5,
    api_calls=3,
)

# Add more metrics later: counts for an existing name are added to its
# running total, and new names (like `errors`) can appear at any time.
metrics.add_metrics(
    tokens_processed=2000,
    errors=1,
)

__str__

__str__() -> str
Returns a formatted table showing lifetime and recent throughput rates. Returns: Formatted string with metrics table showing:
  • Metric name
  • Lifetime rate (total / elapsed time)
  • Recent rate (window sum / window time)
Example:
import time

from olmocr.metrics import MetricsKeeper

metrics = MetricsKeeper(window=60)
metrics.add_metrics(tokens=1000)
time.sleep(1)
metrics.add_metrics(tokens=2000)

print(metrics)
# Output (rates are approximate; they depend on the measured elapsed time):
# Metric Name                 Lifetime (tokens/sec)    Recently (tokens/sec)
# ---------------------------------------------------------------------------
# tokens                                    3000.00                   3000.00

MetricsKeeper Attributes

window

window: int
Time window in seconds for recent metrics.

start_time

start_time: float
Timestamp when the MetricsKeeper was created.

total_metrics

total_metrics: defaultdict[str, int]
Cumulative metrics since the keeper was created.

window_sum

window_sum: defaultdict[str, int]
Sum of metrics within the current time window.

Complete MetricsKeeper Example

import time
from olmocr.metrics import MetricsKeeper

# Keep a 2-minute (120 s) window for the "recent" rates
metrics = MetricsKeeper(window=2 * 60)

# Simulated processing loop: 10 batches, each larger than the last
for step in range(10):
    # Batch N yields 1000 * (N + 1) tokens and 5 pages
    metrics.add_metrics(
        tokens=1000 * (step + 1),
        pages=5,
        batches=1
    )

    time.sleep(0.5)

    # Report on steps 0, 3, 6 and 9
    if step % 3 == 0:
        print(metrics)
        print("\n")

# Final report
print("\nFinal Metrics:")
print(metrics)

WorkerTracker

Constructor

WorkerTracker()
Initializes a worker tracker for monitoring worker states in async environments. Example:
from olmocr.metrics import WorkerTracker

# The constructor takes no arguments.
tracker = WorkerTracker()

track_work

async track_work(worker_id: int, work_item_id: str, state: str)
Updates the state count for a specific worker.
worker_id
int
required
The ID of the worker
work_item_id
str
required
Unique identifier of the work item (for reference, not used in current implementation)
state
str
required
State name to increment (e.g., “processing”, “completed”, “failed”)
Example:
import asyncio
from olmocr.metrics import WorkerTracker

tracker = WorkerTracker()

async def process_item(worker_id: int, item_id: str) -> None:
    """Record a work item as "started", do the work, then mark it "completed"."""
    await tracker.track_work(worker_id, item_id, "started")
    # ... do work ...
    await tracker.track_work(worker_id, item_id, "completed")

# `await` is only valid inside a coroutine, so a plain script drives the
# coroutine with asyncio.run (in an async REPL/notebook, a bare
# `await process_item(1, "item-123")` also works).
asyncio.run(process_item(1, "item-123"))

clear_work

async clear_work(worker_id: int)
Clears all state counts for a specific worker.
worker_id
int
required
The ID of the worker to clear
Example:
# Reset worker 1's state counts. Run this inside a coroutine, where
# `tracker` is an existing WorkerTracker.
await tracker.clear_work(1)

get_status_table

async get_status_table() -> str
Generates a formatted table showing the current status of all workers. Returns: String containing a formatted table with:
  • Worker IDs as rows
  • State names as columns
  • Counts for each worker/state combination
Example:
status = await tracker.get_status_table()
print(status)
# Output:
# Worker ID | completed | failed | processing | started
# ---------+-----------+--------+------------+--------
# 1         | 15        | 2      | 1          | 18
# 2         | 20        | 0      | 0          | 20
# 3         | 12        | 1      | 2          | 15

Complete WorkerTracker Example

import asyncio
import random

from olmocr.metrics import WorkerTracker


async def worker(worker_id: int, tracker: WorkerTracker, num_items: int) -> None:
    """Simulate a worker processing ``num_items`` items.

    Each item is recorded as "started", then, after a short simulated
    delay, as either "completed" (about 90% of the time) or "failed".
    """
    for i in range(num_items):
        item_id = f"worker-{worker_id}-item-{i}"

        # Track start
        await tracker.track_work(worker_id, item_id, "started")

        # Simulate processing without blocking the event loop
        await asyncio.sleep(0.1)

        # Track completion or failure (random, ~90% success rate)
        state = "completed" if random.random() > 0.1 else "failed"
        await tracker.track_work(worker_id, item_id, state)


async def main() -> None:
    tracker = WorkerTracker()

    # Launch multiple workers (one coroutine per worker)
    workers = [
        worker(1, tracker, 10),
        worker(2, tracker, 15),
        worker(3, tracker, 12),
    ]

    # Run all workers concurrently
    await asyncio.gather(*workers)

    # Print final status
    print("\nWorker Status:")
    status = await tracker.get_status_table()
    print(status)


# Run the example
if __name__ == "__main__":
    asyncio.run(main())

WorkerTracker Attributes

worker_status

worker_status: Dict[int, Dict[str, int]]
Mapping from worker ID to a dictionary of state counts.

lock

lock: asyncio.Lock
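Asyncio lock that serializes access to `worker_status` across coroutines running on the same event loop. Note that `asyncio.Lock` is not thread-safe: it coordinates tasks, not OS threads.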

Combining MetricsKeeper and WorkerTracker

import asyncio
import contextlib

from olmocr.metrics import MetricsKeeper, WorkerTracker


async def monitoring_loop(metrics: MetricsKeeper, tracker: WorkerTracker) -> None:
    """Print metrics and worker status every 5 seconds until cancelled."""
    while True:
        await asyncio.sleep(5)

        print("\n" + "=" * 80)
        print("METRICS:")
        print(metrics)
        print("\nWORKER STATUS:")
        print(await tracker.get_status_table())
        print("=" * 80 + "\n")


async def worker_with_metrics(
    worker_id: int,
    tracker: WorkerTracker,
    metrics: MetricsKeeper,
    num_items: int,
) -> None:
    """Worker that records both per-item state and throughput metrics."""
    for i in range(num_items):
        item_id = f"worker-{worker_id}-item-{i}"

        await tracker.track_work(worker_id, item_id, "started")

        # Simulate processing
        tokens = 1000
        await asyncio.sleep(0.1)

        # Record metrics
        metrics.add_metrics(
            tokens_processed=tokens,
            items_completed=1,
        )

        await tracker.track_work(worker_id, item_id, "completed")


async def main() -> None:
    metrics = MetricsKeeper(window=60)
    tracker = WorkerTracker()

    # Start monitoring in the background
    monitor = asyncio.create_task(monitoring_loop(metrics, tracker))

    # Launch workers
    workers = [
        worker_with_metrics(i, tracker, metrics, 20)
        for i in range(1, 5)
    ]

    try:
        await asyncio.gather(*workers)
    finally:
        # Stop the monitor even if a worker raised, and wait for the
        # cancellation to complete so the task is not left pending.
        monitor.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await monitor

    # Final report
    print("\nFINAL REPORT:")
    print(metrics)
    print("\n" + await tracker.get_status_table())


if __name__ == "__main__":
    asyncio.run(main())

Build docs developers (and LLMs) love