The Collector API provides complete visibility into BAML function execution, capturing raw HTTP requests, responses, token usage, timing data, and more - without any abstraction layers.
Collectors were added in BAML version 0.79.0.

Quick Start

Create a collector and pass it to any BAML function:
from baml_client import b
from baml_py import Collector

# Create collector
collector = Collector(name="my-collector")

# Use with function
result = await b.ExtractResume("...", baml_options={"collector": collector})

# Access metrics
print(collector.last.usage)  # Token usage
print(collector.last.raw_llm_response)  # Raw LLM output
print(collector.last.calls[-1].http_response)  # HTTP response

Use Cases

Usage Tracking and Cost Monitoring

Track token usage across multiple function calls:
from baml_client import b
from baml_py import Collector

async def process_batch(documents: list[str]):
    collector = Collector(name="batch-processor")
    
    results = []
    for doc in documents:
        result = await b.ExtractResume(doc, baml_options={"collector": collector})
        results.append(result)
    
    # Total usage across all calls
    print(f"Total input tokens: {collector.usage.input_tokens}")
    print(f"Total output tokens: {collector.usage.output_tokens}")
    
    # Calculate cost (example rates)
    input_cost = collector.usage.input_tokens * 0.000003  # $0.003/1K tokens
    output_cost = collector.usage.output_tokens * 0.000015  # $0.015/1K tokens
    total_cost = input_cost + output_cost
    
    print(f"Total cost: ${total_cost:.4f}")
    
    return results
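
The inline arithmetic above can be factored into a reusable pricing helper. The per-1K-token rates below are illustrative placeholders, not real provider prices; substitute your own pricing table:

```python
# Illustrative per-1K-token rates (USD). These are NOT real provider prices;
# look up current pricing for the models you actually use.
PRICING = {
    "gpt-4o": {"input": 0.0025, "output": 0.010},
    "claude-sonnet": {"input": 0.003, "output": 0.015},
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate USD cost for one call from the collector's token counts."""
    rates = PRICING[model]
    return (
        (input_tokens / 1000) * rates["input"]
        + (output_tokens / 1000) * rates["output"]
    )
```

The token counts would come straight from `collector.usage.input_tokens` and `collector.usage.output_tokens`.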

Debugging and Error Analysis

Inspect raw requests and responses for debugging:
from baml_client import b
from baml_py import Collector
import json

async def debug_extraction(text: str):
    collector = Collector(name="debug")
    
    try:
        result = await b.ExtractResume(text, baml_options={"collector": collector})
        return result
    except Exception:
        # Inspect what went wrong; collector.last can be None if the
        # failure happened before any request was recorded
        if collector.last is None or not collector.last.calls:
            raise
        last_call = collector.last.calls[-1]
        
        print("Request URL:", last_call.http_request.url)
        print("Request body:", last_call.http_request.body.json())
        
        if last_call.http_response:
            print("Response status:", last_call.http_response.status)
            print("Response body:", last_call.http_response.body.text())
        
        raise

Performance Monitoring

Track latency and identify slow requests:
from baml_client import b
from baml_py import Collector

async def monitor_performance(inputs: list[str]):
    collector = Collector(name="perf-monitor")
    
    for input_text in inputs:
        await b.ExtractResume(input_text, baml_options={"collector": collector})
    
    # Analyze timing for each call
    for log in collector.logs:
        duration = log.timing.duration_ms
        print(f"Call {log.id}: {duration}ms")
        
        # Check individual LLM calls (including retries)
        for call in log.calls:
            print(f"  - {call.provider}: {call.timing.duration_ms}ms")
    
    # Calculate average latency
    total_duration = sum(log.timing.duration_ms for log in collector.logs)
    avg_duration = total_duration / len(collector.logs)
    print(f"Average latency: {avg_duration:.2f}ms")
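
Averages hide tail latency, which is often what matters in production. A sketch of a nearest-rank percentile helper over the collected durations (pure Python, no BAML dependency):

```python
import math

def percentile(durations_ms: list[float], p: float) -> float:
    """Nearest-rank percentile (p in [0, 100]) of a list of durations."""
    if not durations_ms:
        raise ValueError("no durations recorded")
    ordered = sorted(durations_ms)
    # Nearest-rank method: ceil(p/100 * n), clamped to a valid 1-based rank.
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]
```

The input would come from `[log.timing.duration_ms for log in collector.logs]`; for example, `percentile(durations, 95)` surfaces the slow outliers that an average smooths over.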

Prompt Caching Analysis

Track cached token usage with providers that support prompt caching (Anthropic, OpenAI, Google, Vertex):
from baml_client import b
from baml_py import Collector

async def analyze_caching(large_content: str):
    collector = Collector(name="cache-tracker")
    
    # First call - content cached
    await b.AnalyzeDocument(large_content, "Question 1", baml_options={"collector": collector})
    
    # Second call - uses cached content
    await b.AnalyzeDocument(large_content, "Question 2", baml_options={"collector": collector})
    
    # Compare cache usage
    first_log = collector.logs[0]
    second_log = collector.logs[1]
    
    print("First call:")
    print(f"  Input tokens: {first_log.usage.input_tokens}")
    print(f"  Cached tokens: {first_log.usage.cached_input_tokens or 0}")
    
    print("Second call:")
    print(f"  Input tokens: {second_log.usage.input_tokens}")
    print(f"  Cached tokens: {second_log.usage.cached_input_tokens or 0}")
    
    # Calculate cache savings
    cache_hit_rate = (
        (second_log.usage.cached_input_tokens or 0) / 
        (second_log.usage.input_tokens or 1)
    ) * 100
    print(f"Cache hit rate: {cache_hit_rate:.1f}%")
Cached token tracking is supported for Anthropic, OpenAI, Google AI, and Vertex AI. AWS Bedrock does not currently report cached tokens.

Multiple Collectors for Different Metrics

Use separate collectors to track different aspects:
from baml_client import b
from baml_py import Collector

async def multi_collector_tracking():
    # Separate collectors for different concerns
    usage_collector = Collector(name="usage-tracking")
    debug_collector = Collector(name="debug-logs")
    
    # Use both collectors for the same call
    result = await b.ExtractResume(
        "...",
        baml_options={"collector": [usage_collector, debug_collector]}
    )
    
    # Both have the same data
    assert usage_collector.last.usage.input_tokens == debug_collector.last.usage.input_tokens
    
    # Use only usage_collector for subsequent calls
    await b.ExtractResume("...", baml_options={"collector": usage_collector})
    
    # usage_collector has 2 logs, debug_collector still has 1
    print(f"Usage logs: {len(usage_collector.logs)}")
    print(f"Debug logs: {len(debug_collector.logs)}")

Retry and Fallback Analysis

Inspect retry attempts and fallback behavior:
from baml_client import b
from baml_py import Collector

async def analyze_retries():
    collector = Collector(name="retry-analysis")
    
    result = await b.ExtractResume("...", baml_options={"collector": collector})
    
    log = collector.last
    
    print(f"Total LLM calls made: {len(log.calls)}")
    
    for i, call in enumerate(log.calls):
        print(f"\nAttempt {i + 1}:")
        print(f"  Provider: {call.provider}")
        print(f"  Client: {call.client_name}")
        print(f"  Selected: {call.selected}")
        print(f"  Duration: {call.timing.duration_ms}ms")
        
        if call.http_response:
            print(f"  Status: {call.http_response.status}")
        
        if call.usage:
            print(f"  Tokens: {call.usage.input_tokens} in / {call.usage.output_tokens} out")

Collector API

Collector Class

| Property | Type | Description |
| --- | --- | --- |
| logs | FunctionLog[] | All function calls (oldest to newest) |
| last | FunctionLog \| null | Most recent function log |
| usage | Usage | Cumulative usage across all calls |

Methods:

| Method | Returns | Description |
| --- | --- | --- |
| id(id: string) | FunctionLog \| null | Get log by ID |
| clear() | void | Clear all logs |

FunctionLog

| Property | Type | Description |
| --- | --- | --- |
| id | string | Request ID |
| function_name | string | Function name |
| log_type | "call" \| "stream" | Call type |
| timing | Timing | Request timing |
| usage | Usage | Aggregated usage |
| calls | LLMCall[] | All LLM calls (retries + fallbacks) |
| raw_llm_response | string \| null | Raw response text |
| tags | Map<string, any> | User metadata |

Usage

| Property | Type | Description |
| --- | --- | --- |
| input_tokens | int \| null | Input tokens used |
| output_tokens | int \| null | Output tokens used |
| cached_input_tokens | int \| null | Cached input tokens |

Usage may not include all provider-specific token types (e.g., "thinking_tokens", "cache_creation_input_tokens"). For those, inspect the raw HTTP response.
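
A sketch of pulling those provider-specific counts out of a parsed response body, using Anthropic-style field names as documented in its Messages API (other providers use different names); the dict would come from `call.http_response.body.json()`:

```python
def anthropic_cache_usage(response_json: dict) -> dict:
    """Extract cache-related token counts from a parsed Anthropic response
    body. Field names follow Anthropic's Messages API; absent fields
    default to 0."""
    usage = response_json.get("usage", {})
    return {
        "input_tokens": usage.get("input_tokens", 0),
        "output_tokens": usage.get("output_tokens", 0),
        "cache_creation_input_tokens": usage.get("cache_creation_input_tokens", 0),
        "cache_read_input_tokens": usage.get("cache_read_input_tokens", 0),
    }
```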

Timing

| Property | Type | Description |
| --- | --- | --- |
| start_time_utc_ms | int | Start time (epoch ms) |
| duration_ms | int \| null | Duration in milliseconds |

For streaming calls, StreamTiming extends this with:

| Property | Type | Description |
| --- | --- | --- |
| time_to_first_token_ms | int \| null | Time to first token |

LLMCall

| Property | Type | Description |
| --- | --- | --- |
| client_name | string | Client name |
| provider | string | Provider name |
| timing | Timing | Call timing |
| http_request | HttpRequest | Raw HTTP request |
| http_response | HttpResponse \| null | Raw HTTP response |
| usage | Usage \| null | Call usage |
| selected | bool | If this call was used |
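
With retries and fallbacks in play, only one entry in `log.calls` carries the response that was actually used. A duck-typed sketch for picking it out (it only assumes each item has a `selected` attribute):

```python
def selected_call(calls):
    """Return the LLM call whose response was actually used, or None
    if no call in the list was selected (e.g. all attempts failed)."""
    return next((call for call in calls if call.selected), None)
```

Typical use: `selected_call(collector.last.calls)` to inspect the winning attempt's `usage` or `http_response`.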

HttpRequest / HttpResponse

HttpRequest:
  • url: Request URL
  • method: HTTP method
  • headers: Request headers
  • body: HTTPBody with .text() and .json() methods
HttpResponse:
  • status: HTTP status code
  • headers: Response headers
  • body: HTTPBody with .text() and .json() methods
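
Raw requests include authentication headers, so redact them before writing anything to logs. A hypothetical helper (the set of header names to scrub is an assumption; extend it for the providers you use):

```python
# Header names to mask, lowercased for case-insensitive matching.
# This set is an assumption; add any credential headers your providers use.
SENSITIVE_HEADERS = {"authorization", "x-api-key", "api-key"}

def redact_headers(headers: dict[str, str]) -> dict[str, str]:
    """Mask credential-bearing headers before logging a raw HTTP request."""
    return {
        name: ("<redacted>" if name.lower() in SENSITIVE_HEADERS else value)
        for name, value in headers.items()
    }
```

For example, pass `call.http_request.headers` through this before printing it in a debug handler.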

Best Practices

  1. Single Collector for Related Calls: Use one collector instance for logically related function calls
  2. Clear When Needed: Call collector.clear() to reset state when starting a new logical grouping
  3. Multiple Collectors: Use separate collectors for different tracking purposes (usage vs. debugging)
  4. Error Handling: Always check for collector.last being null before accessing
  5. Cost Tracking: Combine with ClientRegistry to track per-model costs accurately
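
Best practice 4 in code: a duck-typed summary helper that tolerates an empty collector. It only assumes the `last` and `usage` attributes described above, with nullable token fields:

```python
def usage_summary(collector) -> str:
    """One-line usage summary that is safe when no calls were recorded."""
    if collector.last is None:
        return "no calls recorded"
    usage = collector.usage
    return (
        f"{usage.input_tokens or 0} in / "
        f"{usage.output_tokens or 0} out tokens"
    )
```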
