Overview
LiteLLM provides extensive logging and observability features to track, monitor, and debug LLM requests. Integrate with popular observability platforms or build custom logging solutions.

Basic Logging
Enable Verbose Logging
import litellm
from litellm import completion  # fix: `completion` was called below but never imported

# Enable debug logging (prints detailed request/response info to stdout)
litellm.set_verbose = True

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)
# Prints detailed logs to stdout
Environment Variable
# Set log level via environment
# (env-var alternative to setting litellm.set_verbose in code)
export LITELLM_LOG=DEBUG # or INFO, WARNING, ERROR
# Enable JSON formatted logs
export JSON_LOGS=true
Custom Loggers
Basic Custom Logger
from litellm.integrations import CustomLogger
import litellm
from litellm import completion  # fix: `completion` was called below but never imported


class MyLogger(CustomLogger):
    """Minimal synchronous logger: prints request, success, and failure details."""

    def log_pre_api_call(self, model, messages, kwargs):
        # Fires before the provider API call is made
        print(f"Making request to {model}")
        print(f"Messages: {messages}")

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        # Fires after a successful completion
        print(f"Request succeeded")
        print(f"Model: {kwargs.get('model')}")
        print(f"Latency: {end_time - start_time}s")
        print(f"Tokens: {response_obj.usage.total_tokens}")

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        # Fires when the request raised; the exception is in kwargs
        print(f"Request failed")
        print(f"Error: {kwargs.get('exception')}")


# Register the logger
my_logger = MyLogger()
litellm.callbacks = [my_logger]

# All requests will be logged
response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)
Async Custom Logger
class AsyncLogger(CustomLogger):
    """Non-blocking logger: every hook funnels into one async sink method."""

    async def async_log_pre_api_call(self, model, messages, kwargs):
        # Async pre-call logging
        await self.log_to_database("pre_call", model, messages)

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        # Async success logging
        await self.log_to_database("success", kwargs, response_obj)

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        # Async failure logging
        await self.log_to_database("failure", kwargs, response_obj)

    async def log_to_database(self, event_type, *args):
        # Your async database logging logic
        pass


litellm.callbacks = [AsyncLogger()]
Built-in Integrations
LiteLLM integrates with 20+ observability platforms:

Langfuse
import litellm
import os
from litellm import completion  # fix: `completion` was called below but never imported

# Set credentials
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-..."
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"

# Enable Langfuse logging
litellm.success_callback = ["langfuse"]
litellm.failure_callback = ["langfuse"]

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)
# Automatically logged to Langfuse
DataDog
import os
import litellm  # fix: `litellm` was referenced below but never imported
from litellm import completion  # fix: `completion` was called below but never imported

os.environ["DD_API_KEY"] = "your-datadog-key"
os.environ["DD_SITE"] = "datadoghq.com"

litellm.success_callback = ["datadog"]
litellm.failure_callback = ["datadog"]

# Metrics sent to DataDog
response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)
Prometheus
import litellm  # fix: snippet had no imports at all
from litellm import completion

litellm.success_callback = ["prometheus"]
litellm.failure_callback = ["prometheus"]

# Metrics available at /metrics endpoint
response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)
OpenTelemetry
import os
import litellm  # fix: `litellm` was referenced below but never imported
from litellm import completion  # fix: `completion` was called below but never imported

os.environ["OTEL_EXPORTER"] = "otlp_http"
os.environ["OTEL_ENDPOINT"] = "http://localhost:4318"

litellm.success_callback = ["otel"]
litellm.failure_callback = ["otel"]

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)
Other Integrations
# Available integrations — pass any of these names as a callback:
litellm.success_callback = [
    "langfuse",
    "datadog",
    "prometheus",
    "otel",
    "logfire",
    "lunary",
    "helicone",
    "traceloop",
    "athina",
    "langsmith",
    "langtrace",
    "arize",
    "wandb",
    "braintrust",
    "openmeter",
    "galileo",
    # ... and more
]
Standard Logging Payload
All loggers receive a standardized payload:class DetailedLogger(CustomLogger):
def log_success_event(self, kwargs, response_obj, start_time, end_time):
# Request information
model = kwargs.get("model")
messages = kwargs.get("messages")
temperature = kwargs.get("temperature")
max_tokens = kwargs.get("max_tokens")
# Response information
usage = response_obj.usage
prompt_tokens = usage.prompt_tokens
completion_tokens = usage.completion_tokens
total_tokens = usage.total_tokens
# Metadata
metadata = kwargs.get("metadata", {})
user_id = metadata.get("user_id")
request_id = kwargs.get("litellm_call_id")
# Timing
latency = end_time - start_time
# Cost
cost = kwargs.get("response_cost", 0)
print(f"Request {request_id}:")
print(f" Model: {model}")
print(f" User: {user_id}")
print(f" Latency: {latency:.2f}s")
print(f" Tokens: {total_tokens}")
print(f" Cost: ${cost:.6f}")
Logging Streaming Responses
class StreamLogger(CustomLogger):
    """Log each streaming chunk as it arrives."""

    def log_stream_event(self, kwargs, response_obj, start_time, end_time):
        # Called for each streaming chunk
        chunk = response_obj
        if hasattr(chunk.choices[0].delta, 'content'):
            content = chunk.choices[0].delta.content
            # Fix: the final chunk carries content=None — skip it rather
            # than printing the literal string "None".
            if content is not None:
                print(f"Chunk: {content}")

    async def async_log_stream_event(self, kwargs, response_obj, start_time, end_time):
        # Async version for better performance
        pass


litellm.callbacks = [StreamLogger()]

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a story"}],
    stream=True
)
for chunk in response:
    # Fix: guard against the terminal chunk whose delta.content is None
    content = chunk.choices[0].delta.content
    if content is not None:
        print(content, end="")
Database Logging
PostgreSQL Logger
import psycopg2
from litellm.integrations import CustomLogger
from datetime import datetime


class PostgreSQLLogger(CustomLogger):
    """Persist each LLM request (success or failure) to a Postgres table.

    NOTE(review): credentials are hard-coded for the example — load them
    from the environment in real deployments. A single shared connection
    is also not safe under concurrent callbacks; use a pool if needed.
    """

    def __init__(self):
        super().__init__()
        self.conn = psycopg2.connect(
            host="localhost",
            database="litellm_logs",
            user="postgres",
            password="password"
        )
        self.create_table()

    def create_table(self):
        # Idempotent schema setup. Fix: use `with` so the cursor is
        # closed instead of leaked (applies to all methods below).
        with self.conn.cursor() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS llm_requests (
                    id SERIAL PRIMARY KEY,
                    timestamp TIMESTAMP,
                    request_id TEXT,
                    model TEXT,
                    user_id TEXT,
                    prompt_tokens INTEGER,
                    completion_tokens INTEGER,
                    total_tokens INTEGER,
                    latency REAL,
                    cost REAL,
                    status TEXT,
                    error TEXT
                )
            """)
        self.conn.commit()

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        # Record a successful request with token/cost accounting.
        with self.conn.cursor() as cursor:
            cursor.execute("""
                INSERT INTO llm_requests
                (timestamp, request_id, model, user_id, prompt_tokens,
                 completion_tokens, total_tokens, latency, cost, status)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                datetime.now(),
                kwargs.get("litellm_call_id"),
                kwargs.get("model"),
                kwargs.get("metadata", {}).get("user_id"),
                response_obj.usage.prompt_tokens,
                response_obj.usage.completion_tokens,
                response_obj.usage.total_tokens,
                end_time - start_time,
                kwargs.get("response_cost", 0),
                "success"
            ))
        self.conn.commit()

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        # Record a failed request; no usage/cost fields are available.
        with self.conn.cursor() as cursor:
            cursor.execute("""
                INSERT INTO llm_requests
                (timestamp, request_id, model, user_id, latency, status, error)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
            """, (
                datetime.now(),
                kwargs.get("litellm_call_id"),
                kwargs.get("model"),
                kwargs.get("metadata", {}).get("user_id"),
                end_time - start_time,
                "failure",
                str(kwargs.get("exception"))
            ))
        self.conn.commit()


litellm.callbacks = [PostgreSQLLogger()]
Log Filtering
Selective Logging
class FilteredLogger(CustomLogger):
    """Log only the requests made against an allow-list of models."""

    def __init__(self):
        super().__init__()
        self.log_models = ["gpt-4", "claude-3-opus"]  # Only log these

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        model = kwargs.get("model")
        # Skip everything that is not on the allow-list
        if model in self.log_models:
            print(f"Logged request for {model}")


litellm.callbacks = [FilteredLogger()]
Privacy-Preserving Logging
class PrivateLogger(CustomLogger):
    """Metadata-only logger: message/response content is never recorded."""

    def __init__(self):
        # turn_off_message_logging tells the base class to redact content
        super().__init__(
            turn_off_message_logging=True  # Don't log messages/responses
        )

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        # Only log metadata, not content
        print(f"Model: {kwargs.get('model')}")
        print(f"Tokens: {response_obj.usage.total_tokens}")
        print(f"Cost: ${kwargs.get('response_cost', 0):.6f}")
        # Messages and responses are not logged


litellm.callbacks = [PrivateLogger()]
Alerting
Error Alerting
class AlertLogger(CustomLogger):
    """Fire an alert every time a request fails."""

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        exception = kwargs.get("exception")
        model = kwargs.get("model")
        # Send alert on failure
        self.send_alert(
            f"LLM Request Failed: {model}",
            f"Error: {str(exception)}"
        )

    def send_alert(self, title, message):
        # Send to Slack, PagerDuty, email, etc.
        print(f"⚠️ ALERT: {title}")
        print(message)


litellm.callbacks = [AlertLogger()]
Latency Alerting
class LatencyAlertLogger(CustomLogger):
    """Warn whenever a successful request exceeds a latency threshold."""

    def __init__(self, threshold_seconds=5.0):
        super().__init__()
        self.threshold = threshold_seconds

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        latency = end_time - start_time
        if latency > self.threshold:
            model = kwargs.get("model")
            print(f"⚠️ High latency: {model} took {latency:.2f}s")
            # Send alert


litellm.callbacks = [LatencyAlertLogger(threshold_seconds=5.0)]
Metrics and Analytics
Request Metrics
from collections import defaultdict
from datetime import datetime


class MetricsLogger(CustomLogger):
    """Accumulate in-memory counters (requests, tokens, cost) per model/user."""

    def __init__(self):
        super().__init__()
        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_tokens": 0,
            "total_cost": 0.0,
            "by_model": defaultdict(int),
            "by_user": defaultdict(int)
        }

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        self.metrics["total_requests"] += 1
        self.metrics["successful_requests"] += 1
        self.metrics["total_tokens"] += response_obj.usage.total_tokens
        self.metrics["total_cost"] += kwargs.get("response_cost", 0)
        model = kwargs.get("model")
        self.metrics["by_model"][model] += 1
        user_id = kwargs.get("metadata", {}).get("user_id")
        if user_id:
            self.metrics["by_user"][user_id] += 1

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        self.metrics["total_requests"] += 1
        self.metrics["failed_requests"] += 1

    def print_metrics(self):
        total = self.metrics['total_requests']
        print("\n=== LLM Metrics ===")
        print(f"Total Requests: {total}")
        # Fix: guard against ZeroDivisionError when no requests were made
        if total:
            print(f"Success Rate: {self.metrics['successful_requests'] / total * 100:.1f}%")
        print(f"Total Tokens: {self.metrics['total_tokens']:,}")
        print(f"Total Cost: ${self.metrics['total_cost']:.2f}")
        print(f"\nBy Model:")
        for model, count in self.metrics['by_model'].items():
            print(f"  {model}: {count}")


metrics_logger = MetricsLogger()
litellm.callbacks = [metrics_logger]

# Make requests...
# Then print metrics
metrics_logger.print_metrics()
Best Practices
Logging Best Practices
- Use async loggers - Avoid blocking requests
- Log selectively - Don’t log everything in production
- Protect sensitive data - Use turn_off_message_logging=True
- Monitor performance - Track logger latency
- Set up alerts - Get notified of failures
- Aggregate metrics - Track trends over time
- Use structured logging - JSON format for easier parsing
Troubleshooting
Logger Not Called
# Ensure logger is registered
# (register callbacks BEFORE making completion() calls)
litellm.callbacks = [my_logger]
# Or use success_callback/failure_callback
litellm.success_callback = ["langfuse"]
litellm.failure_callback = ["langfuse"]
Missing Metadata
# Pass metadata in request
# Pass metadata in request
response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}],
    metadata={
        "user_id": "user-123",
        "session_id": "session-456",
        "custom_field": "value"
    }
)


# Access in logger
class MyLogger(CustomLogger):
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        # The metadata dict passed above is forwarded via kwargs
        metadata = kwargs.get("metadata", {})
        user_id = metadata.get("user_id")
High Logging Latency
# Use async methods
# Use async methods
class FastLogger(CustomLogger):
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        # Non-blocking logging
        await self.log_async(kwargs, response_obj)


# Batch logs
class BatchLogger(CustomLogger):
    """Buffer log entries and flush once the batch size is reached."""

    def __init__(self):
        super().__init__()
        self.batch = []
        self.batch_size = 100

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        self.batch.append({"kwargs": kwargs, "response": response_obj})
        # NOTE(review): entries in a partially-filled batch are never
        # flushed at shutdown — add an atexit/close hook in production.
        if len(self.batch) >= self.batch_size:
            self.flush_batch()

    def flush_batch(self):
        # Write batch to database
        print(f"Flushing {len(self.batch)} logs")
        self.batch = []
Related Features
- Cost Tracking - Monitor LLM costs
- Guardrails - Log policy violations
- Caching - Track cache hit rates
- Load Balancing - Monitor deployment usage