Overview
The metrics module provides tools for tracking performance metrics and monitoring worker states in distributed processing systems. It includes the MetricsKeeper for throughput tracking and WorkerTracker for monitoring worker status.
MetricsKeeper
Constructor
MetricsKeeper(window=60 * 5)
Initializes a metrics keeper that tracks cumulative and windowed metrics.
window: Time window in seconds for recent metrics calculation (default: 300 seconds, i.e. 5 minutes)
Example:
from olmocr.metrics import MetricsKeeper
# Track metrics with 10-minute window
metrics = MetricsKeeper(window=600)
add_metrics
Adds metrics to the keeper with arbitrary metric names and values.
**kwargs: Arbitrary keyword arguments where keys are metric names and values are counts to add
Example:
metrics = MetricsKeeper()
# Add multiple metrics
metrics.add_metrics(
tokens_processed=1000,
pages_rendered=5,
api_calls=3
)
# Add more metrics later
metrics.add_metrics(
tokens_processed=2000,
errors=1
)
__str__
Returns a formatted table showing lifetime and recent throughput rates.
Returns: Formatted string with metrics table showing:
- Metric name
- Lifetime rate (total / elapsed time)
- Recent rate (window sum / window time)
Example:
metrics = MetricsKeeper(window=60)
metrics.add_metrics(tokens=1000)
time.sleep(1)
metrics.add_metrics(tokens=2000)
print(metrics)
# Output:
# Metric Name Lifetime (tokens/sec) Recently (tokens/sec)
# ---------------------------------------------------------------------------
# tokens 3000.00 3000.00
MetricsKeeper Attributes
window
Time window in seconds for recent metrics.
start_time
Timestamp when the MetricsKeeper was created.
total_metrics
total_metrics: defaultdict[str, int]
Cumulative metrics since the keeper was created.
window_sum
window_sum: defaultdict[str, int]
Sum of metrics within the current time window.
Complete MetricsKeeper Example
import time
from olmocr.metrics import MetricsKeeper
# Initialize with 2-minute window
metrics = MetricsKeeper(window=120)
# Simulate processing loop
for i in range(10):
# Process some data
tokens_processed = 1000 * (i + 1)
pages_completed = 5
# Record metrics
metrics.add_metrics(
tokens=tokens_processed,
pages=pages_completed,
batches=1
)
time.sleep(0.5)
# Print status every few iterations
if i % 3 == 0:
print(metrics)
print("\n")
# Final report
print("\nFinal Metrics:")
print(metrics)
WorkerTracker
Constructor
WorkerTracker()
Initializes a worker tracker for monitoring worker states in async environments.
Example:
from olmocr.metrics import WorkerTracker
tracker = WorkerTracker()
track_work
async track_work(worker_id: int, work_item_id: str, state: str)
Updates the state count for a specific worker.
worker_id: Unique identifier of the worker whose state count to update
work_item_id: Unique identifier of the work item (for reference, not used in current implementation)
state: State name to increment (e.g., "processing", "completed", "failed")
Example:
import asyncio
from olmocr.metrics import WorkerTracker
tracker = WorkerTracker()
async def process_item(worker_id, item_id):
await tracker.track_work(worker_id, item_id, "started")
# ... do work ...
await tracker.track_work(worker_id, item_id, "completed")
await process_item(1, "item-123")
clear_work
async clear_work(worker_id: int)
Clears all state counts for a specific worker.
worker_id: The ID of the worker whose state counts should be cleared
Example:
# Reset worker 1's metrics
await tracker.clear_work(1)
get_status_table
async get_status_table() -> str
Generates a formatted table showing the current status of all workers.
Returns: String containing a formatted table with:
- Worker IDs as rows
- State names as columns
- Counts for each worker/state combination
Example:
status = await tracker.get_status_table()
print(status)
# Output:
# Worker ID | completed | failed | processing | started
# ---------+-----------+--------+------------+--------
# 1 | 15 | 2 | 1 | 18
# 2 | 20 | 0 | 0 | 20
# 3 | 12 | 1 | 2 | 15
Complete WorkerTracker Example
import asyncio
from olmocr.metrics import WorkerTracker
async def worker(worker_id: int, tracker: WorkerTracker, num_items: int):
"""Simulate a worker processing multiple items."""
for i in range(num_items):
item_id = f"worker-{worker_id}-item-{i}"
# Track start
await tracker.track_work(worker_id, item_id, "started")
# Simulate processing
await asyncio.sleep(0.1)
# Track completion or failure (random)
import random
if random.random() > 0.1: # 90% success rate
await tracker.track_work(worker_id, item_id, "completed")
else:
await tracker.track_work(worker_id, item_id, "failed")
async def main():
tracker = WorkerTracker()
# Launch multiple workers
workers = [
worker(1, tracker, 10),
worker(2, tracker, 15),
worker(3, tracker, 12),
]
# Run all workers concurrently
await asyncio.gather(*workers)
# Print final status
print("\nWorker Status:")
status = await tracker.get_status_table()
print(status)
# Run the example
asyncio.run(main())
WorkerTracker Attributes
worker_status
worker_status: Dict[int, Dict[str, int]]
Mapping from worker ID to a dictionary of state counts.
lock
Asyncio lock ensuring thread-safe access to worker status.
Combining MetricsKeeper and WorkerTracker
import asyncio
import time
from olmocr.metrics import MetricsKeeper, WorkerTracker
async def monitoring_loop(metrics: MetricsKeeper, tracker: WorkerTracker):
"""Periodically print metrics and worker status."""
while True:
await asyncio.sleep(5)
print("\n" + "="*80)
print("METRICS:")
print(metrics)
print("\nWORKER STATUS:")
print(await tracker.get_status_table())
print("="*80 + "\n")
async def worker_with_metrics(
worker_id: int,
tracker: WorkerTracker,
metrics: MetricsKeeper,
num_items: int
):
"""Worker that tracks both state and metrics."""
for i in range(num_items):
item_id = f"worker-{worker_id}-item-{i}"
await tracker.track_work(worker_id, item_id, "started")
# Simulate processing
tokens = 1000
await asyncio.sleep(0.1)
# Record metrics
metrics.add_metrics(
tokens_processed=tokens,
items_completed=1
)
await tracker.track_work(worker_id, item_id, "completed")
async def main():
metrics = MetricsKeeper(window=60)
tracker = WorkerTracker()
# Start monitoring
monitor = asyncio.create_task(monitoring_loop(metrics, tracker))
# Launch workers
workers = [
worker_with_metrics(i, tracker, metrics, 20)
for i in range(1, 5)
]
await asyncio.gather(*workers)
monitor.cancel()
# Final report
print("\nFINAL REPORT:")
print(metrics)
print("\n" + await tracker.get_status_table())
asyncio.run(main())