Skip to main content

Overview

LiteLLM provides extensive logging and observability features to track, monitor, and debug LLM requests. Integrate with popular observability platforms or build custom logging solutions.

Basic Logging

Enable Verbose Logging

import litellm
from litellm import completion  # fix: `completion` was called but never imported

# Enable debug logging: prints detailed request/response traces to stdout.
litellm.set_verbose = True

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)
# Prints detailed logs to stdout

Environment Variable

# Set the LiteLLM log level via environment variable.
export LITELLM_LOG=DEBUG  # or INFO, WARNING, ERROR

# Emit logs as structured JSON (easier to ship to log aggregators).
export JSON_LOGS=true

Custom Loggers

Basic Custom Logger

from litellm.integrations import CustomLogger
import litellm

class MyLogger(CustomLogger):
    """Minimal synchronous logger: prints request/response details to stdout."""

    def log_pre_api_call(self, model, messages, kwargs):
        """Called before the request is sent to the provider."""
        print(f"Making request to {model}")
        print(f"Messages: {messages}")

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Called after a successful response."""
        # Plain strings here: the originals were f-strings with no placeholders.
        print("Request succeeded")
        print(f"Model: {kwargs.get('model')}")
        print(f"Latency: {end_time - start_time}s")
        print(f"Tokens: {response_obj.usage.total_tokens}")

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Called when the request raises an error."""
        print("Request failed")
        print(f"Error: {kwargs.get('exception')}")

# Register the logger instance so every request flows through it.
my_logger = MyLogger()
litellm.callbacks = [my_logger]

# All requests will be logged
response = completion(
    messages=[{"role": "user", "content": "Hello!"}],
    model="gpt-4",
)

Async Custom Logger

class AsyncLogger(CustomLogger):
    """Non-blocking logger: each hook awaits an async database write."""

    async def async_log_pre_api_call(self, model, messages, kwargs):
        # Record the outgoing request before it reaches the provider.
        await self.log_to_database("pre_call", model, messages)

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        # Record a completed request.
        await self.log_to_database("success", kwargs, response_obj)

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        # Record a failed request.
        await self.log_to_database("failure", kwargs, response_obj)

    async def log_to_database(self, event_type, *args):
        """Your async database logging logic goes here."""


litellm.callbacks = [AsyncLogger()]

Built-in Integrations

LiteLLM integrates with 20+ observability platforms:

Langfuse

import litellm
import os

# Langfuse credentials and host; the integration reads these at call time.
os.environ.update({
    "LANGFUSE_PUBLIC_KEY": "pk-...",
    "LANGFUSE_SECRET_KEY": "sk-...",
    "LANGFUSE_HOST": "https://cloud.langfuse.com",
})

# Route both success and failure events to Langfuse.
litellm.success_callback = ["langfuse"]
litellm.failure_callback = ["langfuse"]

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)
# Automatically logged to Langfuse

DataDog

import os

# DataDog API key and target site.
os.environ.update({
    "DD_API_KEY": "your-datadog-key",
    "DD_SITE": "datadoghq.com",
})

litellm.success_callback = ["datadog"]
litellm.failure_callback = ["datadog"]

# Metrics sent to DataDog
response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)

Prometheus

# Route success/failure events to the Prometheus integration.
litellm.success_callback = ["prometheus"]
litellm.failure_callback = ["prometheus"]

# Metrics available at /metrics endpoint
response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)

OpenTelemetry

import os

# OpenTelemetry exporter configuration (OTLP over HTTP).
os.environ.update({
    "OTEL_EXPORTER": "otlp_http",
    "OTEL_ENDPOINT": "http://localhost:4318",
})

litellm.success_callback = ["otel"]
litellm.failure_callback = ["otel"]

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)

Other Integrations

# Built-in success-callback integrations, referenced by string name
# (non-exhaustive; failure_callback accepts the same names):
litellm.success_callback = [
    "langfuse",
    "datadog",
    "prometheus",
    "otel",
    "logfire",
    "lunary",
    "helicone",
    "traceloop",
    "athina",
    "langsmith",
    "langtrace",
    "arize",
    "wandb",
    "braintrust",
    "openmeter",
    "galileo"
    # ... and more
]

Standard Logging Payload

All loggers receive a standardized payload:
class DetailedLogger(CustomLogger):
    """Demonstrates every field of the standardized logging payload."""

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        # Request parameters as sent by the caller
        model = kwargs.get("model")
        messages = kwargs.get("messages")
        temperature = kwargs.get("temperature")
        max_tokens = kwargs.get("max_tokens")

        # Token accounting from the response
        usage = response_obj.usage
        prompt_tokens = usage.prompt_tokens
        completion_tokens = usage.completion_tokens
        total_tokens = usage.total_tokens

        # Caller-supplied metadata plus LiteLLM's per-call id
        metadata = kwargs.get("metadata", {})
        user_id = metadata.get("user_id")
        request_id = kwargs.get("litellm_call_id")

        # Wall-clock latency and computed cost
        latency = end_time - start_time
        cost = kwargs.get("response_cost", 0)

        summary = (
            f"Request {request_id}:",
            f"  Model: {model}",
            f"  User: {user_id}",
            f"  Latency: {latency:.2f}s",
            f"  Tokens: {total_tokens}",
            f"  Cost: ${cost:.6f}",
        )
        print("\n".join(summary))

Logging Streaming Responses

class StreamLogger(CustomLogger):
    """Logs each chunk of a streaming response as it arrives."""

    def log_stream_event(self, kwargs, response_obj, start_time, end_time):
        # Invoked once per streamed chunk.
        delta = response_obj.choices[0].delta
        if hasattr(delta, 'content'):
            print(f"Chunk: {delta.content}")

    async def async_log_stream_event(self, kwargs, response_obj, start_time, end_time):
        """Async counterpart -- prefer this to avoid blocking the stream."""


litellm.callbacks = [StreamLogger()]

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a story"}],
    stream=True  # returns an iterator of chunks instead of one response
)

# Consume the stream; StreamLogger.log_stream_event also fires per chunk.
for chunk in response:
    print(chunk.choices[0].delta.content, end="")

Database Logging

PostgreSQL Logger

import psycopg2
from litellm.integrations import CustomLogger
from datetime import datetime

class PostgreSQLLogger(CustomLogger):
    """Persists every LLM request (success or failure) to a PostgreSQL table."""

    def __init__(self):
        super().__init__()
        # NOTE(review): hard-coded credentials are for illustration only;
        # read them from the environment in production.
        self.conn = psycopg2.connect(
            host="localhost",
            database="litellm_logs",
            user="postgres",
            password="password"
        )
        self.create_table()

    def create_table(self):
        """Create the log table on startup if it does not already exist."""
        # `with conn` commits the transaction on success; `with cursor`
        # closes the cursor (the original leaked one open cursor per call).
        with self.conn, self.conn.cursor() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS llm_requests (
                    id SERIAL PRIMARY KEY,
                    timestamp TIMESTAMP,
                    request_id TEXT,
                    model TEXT,
                    user_id TEXT,
                    prompt_tokens INTEGER,
                    completion_tokens INTEGER,
                    total_tokens INTEGER,
                    latency REAL,
                    cost REAL,
                    status TEXT,
                    error TEXT
                )
            """)

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Insert one row describing a successful request."""
        usage = response_obj.usage
        with self.conn, self.conn.cursor() as cursor:
            cursor.execute("""
                INSERT INTO llm_requests 
                (timestamp, request_id, model, user_id, prompt_tokens, 
                 completion_tokens, total_tokens, latency, cost, status)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                datetime.now(),
                kwargs.get("litellm_call_id"),
                kwargs.get("model"),
                kwargs.get("metadata", {}).get("user_id"),
                usage.prompt_tokens,
                usage.completion_tokens,
                usage.total_tokens,
                end_time - start_time,
                kwargs.get("response_cost", 0),
                "success"
            ))

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Insert one row describing a failed request, including the error text."""
        with self.conn, self.conn.cursor() as cursor:
            cursor.execute("""
                INSERT INTO llm_requests 
                (timestamp, request_id, model, user_id, latency, status, error)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
            """, (
                datetime.now(),
                kwargs.get("litellm_call_id"),
                kwargs.get("model"),
                kwargs.get("metadata", {}).get("user_id"),
                end_time - start_time,
                "failure",
                str(kwargs.get("exception"))
            ))


litellm.callbacks = [PostgreSQLLogger()]

Log Filtering

Selective Logging

class FilteredLogger(CustomLogger):
    """Logs only requests made against an allow-list of models."""

    def __init__(self):
        super().__init__()
        self.log_models = ["gpt-4", "claude-3-opus"]  # Only log these

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        model = kwargs.get("model")
        # Skip everything outside the allow-list.
        if model in self.log_models:
            print(f"Logged request for {model}")


litellm.callbacks = [FilteredLogger()]

Privacy-Preserving Logging

class PrivateLogger(CustomLogger):
    """Records metadata (model, tokens, cost) while keeping prompt and
    response content out of the logs entirely."""

    def __init__(self):
        # turn_off_message_logging strips messages/responses from the payload.
        super().__init__(turn_off_message_logging=True)

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        for line in (
            f"Model: {kwargs.get('model')}",
            f"Tokens: {response_obj.usage.total_tokens}",
            f"Cost: ${kwargs.get('response_cost', 0):.6f}",
        ):
            print(line)
        # Messages and responses are never printed.


litellm.callbacks = [PrivateLogger()]

Alerting

Error Alerting

class AlertLogger(CustomLogger):
    """Fires an alert whenever an LLM request fails."""

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        model = kwargs.get("model")
        error = kwargs.get("exception")
        self.send_alert(
            f"LLM Request Failed: {model}",
            f"Error: {str(error)}",
        )

    def send_alert(self, title, message):
        """Stub: wire this up to Slack, PagerDuty, email, etc."""
        print(f"⚠️ ALERT: {title}")
        print(message)


litellm.callbacks = [AlertLogger()]

Latency Alerting

class LatencyAlertLogger(CustomLogger):
    """Warns when a successful request takes longer than a threshold."""

    def __init__(self, threshold_seconds=5.0):
        super().__init__()
        self.threshold = threshold_seconds

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        elapsed = end_time - start_time
        # Fast requests need no attention.
        if elapsed <= self.threshold:
            return
        model = kwargs.get("model")
        print(f"⚠️ High latency: {model} took {elapsed:.2f}s")
        # Send alert


litellm.callbacks = [LatencyAlertLogger(threshold_seconds=5.0)]

Metrics and Analytics

Request Metrics

from collections import defaultdict
from datetime import datetime

class MetricsLogger(CustomLogger):
    """Accumulates in-memory request/token/cost counters across all calls."""

    def __init__(self):
        super().__init__()
        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_tokens": 0,
            "total_cost": 0.0,
            "by_model": defaultdict(int),
            "by_user": defaultdict(int)
        }

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Count a successful request plus its tokens and cost."""
        self.metrics["total_requests"] += 1
        self.metrics["successful_requests"] += 1
        self.metrics["total_tokens"] += response_obj.usage.total_tokens
        self.metrics["total_cost"] += kwargs.get("response_cost", 0)

        model = kwargs.get("model")
        self.metrics["by_model"][model] += 1

        user_id = kwargs.get("metadata", {}).get("user_id")
        if user_id:
            self.metrics["by_user"][user_id] += 1

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Count a failed request."""
        self.metrics["total_requests"] += 1
        self.metrics["failed_requests"] += 1

    def print_metrics(self):
        """Print a summary; safe to call before any request has been logged."""
        total = self.metrics["total_requests"]
        # Guard the success-rate division: the original raised
        # ZeroDivisionError when called with no requests recorded.
        success_rate = (
            self.metrics["successful_requests"] / total * 100 if total else 0.0
        )
        print("\n=== LLM Metrics ===")
        print(f"Total Requests: {total}")
        print(f"Success Rate: {success_rate:.1f}%")
        print(f"Total Tokens: {self.metrics['total_tokens']:,}")
        print(f"Total Cost: ${self.metrics['total_cost']:.2f}")
        print(f"\nBy Model:")
        for model, count in self.metrics['by_model'].items():
            print(f"  {model}: {count}")
# Register a single shared instance so counters accumulate across requests.
metrics_logger = MetricsLogger()
litellm.callbacks = [metrics_logger]

# Make requests...
# Then print the accumulated metrics
metrics_logger.print_metrics()

Best Practices

Logging Best Practices

  1. Use async loggers - Avoid blocking requests
  2. Log selectively - Don’t log everything in production
  3. Protect sensitive data - Use turn_off_message_logging=True
  4. Monitor performance - Track logger latency
  5. Set up alerts - Get notified of failures
  6. Aggregate metrics - Track trends over time
  7. Use structured logging - JSON format for easier parsing

Troubleshooting

Logger Not Called

# Ensure the logger instance is registered (custom loggers go in callbacks)
litellm.callbacks = [my_logger]

# Or use success_callback/failure_callback for built-in integrations by name
litellm.success_callback = ["langfuse"]
litellm.failure_callback = ["langfuse"]

Missing Metadata

# Pass metadata in the request; LiteLLM forwards it to every logger's kwargs.
response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}],
    metadata={
        "user_id": "user-123",
        "session_id": "session-456",
        "custom_field": "value"
    }
)

# Access the same metadata inside a logger
class MyLogger(CustomLogger):
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        # Request metadata arrives under kwargs["metadata"]; default to {}.
        metadata = kwargs.get("metadata", {})
        user_id = metadata.get("user_id")

High Logging Latency

# Use async methods so logging never blocks the request path
class FastLogger(CustomLogger):
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        # Non-blocking logging.
        # NOTE(review): log_async is not defined in this snippet -- implement
        # it on this class before using the example as-is.
        await self.log_async(kwargs, response_obj)

# Batch logs to amortize write overhead
class BatchLogger(CustomLogger):
    """Buffers log entries and flushes them once the batch fills up."""

    def __init__(self):
        super().__init__()
        self.batch = []
        self.batch_size = 100

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        entry = {"kwargs": kwargs, "response": response_obj}
        self.batch.append(entry)
        # Flush as soon as the buffer reaches the configured size.
        if len(self.batch) >= self.batch_size:
            self.flush_batch()

    def flush_batch(self):
        """Write the buffered entries out, then reset the buffer."""
        # Write batch to database
        print(f"Flushing {len(self.batch)} logs")
        self.batch = []

Build docs developers (and LLMs) love