The Collector API provides complete visibility into BAML function execution, capturing raw HTTP requests, responses, token usage, timing data, and more - without any abstraction layers.
Collectors were added in BAML version 0.79.0.
Quick Start
Create a collector and pass it to any BAML function:
```python
from baml_client import b
from baml_py import Collector

# Create a collector
collector = Collector(name="my-collector")

# Pass it to any BAML function
result = await b.ExtractResume("...", baml_options={"collector": collector})

# Access metrics
print(collector.last.usage)                    # Token usage
print(collector.last.raw_llm_response)         # Raw LLM output
print(collector.last.calls[-1].http_response)  # Raw HTTP response
```
Use Cases
Usage Tracking and Cost Monitoring
Track token usage across multiple function calls:
```python
from baml_client import b
from baml_py import Collector

async def process_batch(documents: list[str]):
    collector = Collector(name="batch-processor")
    results = []
    for doc in documents:
        result = await b.ExtractResume(doc, baml_options={"collector": collector})
        results.append(result)

    # Cumulative usage across all calls
    print(f"Total input tokens: {collector.usage.input_tokens}")
    print(f"Total output tokens: {collector.usage.output_tokens}")

    # Estimate cost (example rates)
    input_cost = collector.usage.input_tokens * 0.000003    # $0.003 / 1K tokens
    output_cost = collector.usage.output_tokens * 0.000015  # $0.015 / 1K tokens
    total_cost = input_cost + output_cost
    print(f"Total cost: ${total_cost:.4f}")

    return results
```
Debugging and Error Analysis
Inspect raw requests and responses for debugging:
```python
from baml_client import b
from baml_py import Collector

async def debug_extraction(text: str):
    collector = Collector(name="debug")
    try:
        result = await b.ExtractResume(text, baml_options={"collector": collector})
        return result
    except Exception:
        # Inspect what went wrong
        last_call = collector.last.calls[-1]
        print("Request URL:", last_call.http_request.url)
        print("Request body:", last_call.http_request.body.json())
        if last_call.http_response:
            print("Response status:", last_call.http_response.status)
            print("Response body:", last_call.http_response.body.text())
        raise
```
Performance Monitoring
Track latency and identify slow requests:
```python
from baml_client import b
from baml_py import Collector

async def monitor_performance(inputs: list[str]):
    collector = Collector(name="perf-monitor")
    for input_text in inputs:
        await b.ExtractResume(input_text, baml_options={"collector": collector})

    # Analyze timing for each call
    for log in collector.logs:
        duration = log.timing.duration_ms
        print(f"Call {log.id}: {duration}ms")
        # Check individual LLM calls (including retries)
        for call in log.calls:
            print(f"  - {call.provider}: {call.timing.duration_ms}ms")

    # Calculate average latency
    total_duration = sum(log.timing.duration_ms for log in collector.logs)
    avg_duration = total_duration / len(collector.logs)
    print(f"Average latency: {avg_duration:.2f}ms")
```
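Averages can hide slow outliers. A nearest-rank percentile helper (plain Python, not part of the Collector API; a sketch over the same durations gathered above) can surface tail latency:

```python
def percentile(durations_ms: list[float], pct: float) -> float:
    """Return the pct-th percentile (0-100) using the nearest-rank method."""
    if not durations_ms:
        raise ValueError("no durations recorded")
    ordered = sorted(durations_ms)
    # Nearest-rank: ceil(pct/100 * n), clamped to at least rank 1
    rank = max(1, -(-len(ordered) * pct // 100))
    return ordered[int(rank) - 1]

# In practice: percentile([log.timing.duration_ms for log in collector.logs], 95)
print(percentile([120, 95, 340, 110, 105], 95))  # → 340
```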
Prompt Caching Analysis
Track cached token usage with providers that support prompt caching (Anthropic, OpenAI, Google, Vertex):
```python
from baml_client import b
from baml_py import Collector

async def analyze_caching(large_content: str):
    collector = Collector(name="cache-tracker")

    # First call - content gets cached
    await b.AnalyzeDocument(large_content, "Question 1", baml_options={"collector": collector})
    # Second call - reuses the cached content
    await b.AnalyzeDocument(large_content, "Question 2", baml_options={"collector": collector})

    # Compare cache usage
    first_log = collector.logs[0]
    second_log = collector.logs[1]

    print("First call:")
    print(f"  Input tokens: {first_log.usage.input_tokens}")
    print(f"  Cached tokens: {first_log.usage.cached_input_tokens or 0}")
    print("Second call:")
    print(f"  Input tokens: {second_log.usage.input_tokens}")
    print(f"  Cached tokens: {second_log.usage.cached_input_tokens or 0}")

    # Calculate the cache hit rate
    cache_hit_rate = (
        (second_log.usage.cached_input_tokens or 0) /
        (second_log.usage.input_tokens or 1)
    ) * 100
    print(f"Cache hit rate: {cache_hit_rate:.1f}%")
```
Cached token tracking is supported for Anthropic, OpenAI, Google AI, and Vertex AI. AWS Bedrock does not currently report cached tokens.
Multiple Collectors for Different Metrics
Use separate collectors to track different aspects:
```python
from baml_client import b
from baml_py import Collector

async def multi_collector_tracking():
    # Separate collectors for different concerns
    usage_collector = Collector(name="usage-tracking")
    debug_collector = Collector(name="debug-logs")

    # Use both collectors for the same call
    result = await b.ExtractResume(
        "...",
        baml_options={"collector": [usage_collector, debug_collector]}
    )

    # Both record the same data
    assert usage_collector.last.usage.input_tokens == debug_collector.last.usage.input_tokens

    # Use only usage_collector for subsequent calls
    await b.ExtractResume("...", baml_options={"collector": usage_collector})

    # usage_collector now has 2 logs; debug_collector still has 1
    print(f"Usage logs: {len(usage_collector.logs)}")
    print(f"Debug logs: {len(debug_collector.logs)}")
```
Retry and Fallback Analysis
Inspect retry attempts and fallback behavior:
```python
from baml_client import b
from baml_py import Collector

async def analyze_retries():
    collector = Collector(name="retry-analysis")
    result = await b.ExtractResume("...", baml_options={"collector": collector})

    log = collector.last
    print(f"Total LLM calls made: {len(log.calls)}")

    for i, call in enumerate(log.calls):
        print(f"\nAttempt {i + 1}:")
        print(f"  Provider: {call.provider}")
        print(f"  Client: {call.client_name}")
        print(f"  Selected: {call.selected}")
        print(f"  Duration: {call.timing.duration_ms}ms")
        if call.http_response:
            print(f"  Status: {call.http_response.status}")
        if call.usage:
            print(f"  Tokens: {call.usage.input_tokens} in / {call.usage.output_tokens} out")
```
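To summarize attempts programmatically rather than printing them, a small helper can count how many calls failed before the one whose response was used. The `Attempt` dataclass below is a hypothetical stand-in that mirrors only the `client_name` and `selected` fields of `LLMCall`:

```python
from dataclasses import dataclass

@dataclass
class Attempt:
    """Minimal stand-in for LLMCall, for illustration only."""
    client_name: str
    selected: bool

def failed_attempts_before_success(calls: list[Attempt]) -> int:
    """Number of calls tried before the one marked as selected."""
    for i, call in enumerate(calls):
        if call.selected:
            return i
    return len(calls)  # nothing selected: every attempt failed

# In practice: failed_attempts_before_success(collector.last.calls)
calls = [Attempt("GPT4o", False), Attempt("GPT4o", False), Attempt("ClaudeFallback", True)]
print(failed_attempts_before_success(calls))  # → 2
```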
Collector API
Collector Class
| Property | Type | Description |
|---|---|---|
| `logs` | `FunctionLog[]` | All function calls (oldest to newest) |
| `last` | `FunctionLog \| null` | Most recent function log |
| `usage` | `Usage` | Cumulative usage across all calls |
Methods:
| Method | Returns | Description |
|---|---|---|
| `id(id: string)` | `FunctionLog \| null` | Get a log by its ID |
| `clear()` | `void` | Clear all logs |
FunctionLog
| Property | Type | Description |
|---|---|---|
| `id` | `string` | Request ID |
| `function_name` | `string` | Function name |
| `log_type` | `"call" \| "stream"` | Call type |
| `timing` | `Timing` | Request timing |
| `usage` | `Usage` | Aggregated usage |
| `calls` | `LLMCall[]` | All LLM calls (retries + fallbacks) |
| `raw_llm_response` | `string \| null` | Raw response text |
| `tags` | `Map<string, any>` | User metadata |
Usage
| Property | Type | Description |
|---|---|---|
| `input_tokens` | `int \| null` | Input tokens used |
| `output_tokens` | `int \| null` | Output tokens used |
| `cached_input_tokens` | `int \| null` | Cached input tokens |
Usage may not include all provider-specific token types (e.g., `thinking_tokens`, `cache_creation_input_tokens`). For those, inspect the raw HTTP response.
Timing
| Property | Type | Description |
|---|---|---|
| `start_time_utc_ms` | `int` | Start time (epoch ms) |
| `duration_ms` | `int \| null` | Duration in milliseconds |
For streaming calls, StreamTiming extends this with:
| Property | Type | Description |
|---|---|---|
| `time_to_first_token_ms` | `int \| null` | Time to first token |
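One thing these timing fields enable is a rough decode-throughput estimate. The helper below is illustrative, not part of the API; it defines throughput as tokens emitted after the first token, divided by the time spent generating them (one reasonable definition among several):

```python
def tokens_per_second(output_tokens: int, duration_ms: int,
                      time_to_first_token_ms: int) -> float:
    """Rough decode throughput for a streamed call.

    Excludes the first token (covered by time_to_first_token_ms) and
    returns 0.0 when there is nothing meaningful to measure.
    """
    generation_ms = duration_ms - time_to_first_token_ms
    if generation_ms <= 0 or output_tokens <= 1:
        return 0.0
    return (output_tokens - 1) / (generation_ms / 1000)

# In practice, with a streamed call's log:
# tokens_per_second(log.usage.output_tokens, log.timing.duration_ms,
#                   log.timing.time_to_first_token_ms)
print(tokens_per_second(101, 2500, 500))  # → 50.0
```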
LLMCall
| Property | Type | Description |
|---|---|---|
| `client_name` | `string` | Client name |
| `provider` | `string` | Provider name |
| `timing` | `Timing` | Call timing |
| `http_request` | `HttpRequest` | Raw HTTP request |
| `http_response` | `HttpResponse \| null` | Raw HTTP response |
| `usage` | `Usage \| null` | Usage for this call |
| `selected` | `bool` | Whether this call's response was used |
HttpRequest / HttpResponse
HttpRequest:
- `url`: Request URL
- `method`: HTTP method
- `headers`: Request headers
- `body`: `HTTPBody` with `.text()` and `.json()` methods

HttpResponse:
- `status`: HTTP status code
- `headers`: Response headers
- `body`: `HTTPBody` with `.text()` and `.json()` methods
Best Practices
- Single Collector for Related Calls: Use one collector instance for logically related function calls.
- Clear When Needed: Call `collector.clear()` to reset state when starting a new logical grouping.
- Multiple Collectors: Use separate collectors for different tracking purposes (usage vs. debugging).
- Error Handling: Check that `collector.last` is not `null` before accessing it.
- Cost Tracking: Combine with `ClientRegistry` to track per-model costs accurately.
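For the last point, a minimal sketch of per-model cost estimation. The `RATES` table, its client names, and the prices are hypothetical placeholders; in practice the mapping would follow the clients you register via `ClientRegistry` and your providers' current pricing:

```python
def estimate_cost(input_tokens: int, output_tokens: int,
                  rates: dict[str, float]) -> float:
    """Estimate USD cost from token counts and per-million-token rates."""
    return (input_tokens * rates["input"] + output_tokens * rates["output"]) / 1_000_000

# Hypothetical per-model rate table, keyed by client name (USD per 1M tokens)
RATES = {
    "GPT4o": {"input": 2.50, "output": 10.00},
    "Claude": {"input": 3.00, "output": 15.00},
}

# In practice, per selected call:
# estimate_cost(call.usage.input_tokens or 0,
#               call.usage.output_tokens or 0,
#               RATES[call.client_name])
print(estimate_cost(1_000_000, 100_000, RATES["Claude"]))  # → 4.5
```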