
Overview

Agent performance depends on multiple factors: model latency, tool execution time, network I/O, and execution patterns. This guide covers proven optimization techniques for production deployments.

Model Response Caching

Cache identical requests to reduce costs and latency:
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache
from langchain_openai import ChatOpenAI

# Enable global LLM caching
set_llm_cache(InMemoryCache())

model = ChatOpenAI(model="gpt-4")

# First call: hits API
response1 = model.invoke("What is 2+2?")

# Second identical call: served from cache (instant)
response2 = model.invoke("What is 2+2?")
Cache vs Prompt Caching: This is LangChain’s application-level cache, distinct from provider-specific prompt caching (Anthropic Claude, OpenAI GPT-4, etc.).

Custom Cache Implementations

Extend BaseCache for Redis, database, or file-based caching:
from langchain_core.caches import BaseCache
from langchain_core.outputs import Generation
import hashlib
import json
import redis

class RedisCache(BaseCache):
    """Redis-backed LLM cache for distributed systems."""
    
    def __init__(self, redis_url: str, ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl
    
    def _make_key(self, prompt: str, llm_string: str) -> str:
        """Generate a stable cache key from prompt and model config.
        
        Python's built-in hash() is salted per process, so a real digest
        is needed for keys to match across workers and restarts.
        """
        digest = hashlib.sha256(f"{prompt}|{llm_string}".encode()).hexdigest()
        return f"llm_cache:{digest}"
    
    def lookup(self, prompt: str, llm_string: str) -> list[Generation] | None:
        """Look up cached response."""
        key = self._make_key(prompt, llm_string)
        cached = self.redis.get(key)
        
        if cached:
            data = json.loads(cached)
            return [Generation(**gen) for gen in data]
        return None
    
    def update(self, prompt: str, llm_string: str, return_val: list[Generation]) -> None:
        """Store response in cache."""
        key = self._make_key(prompt, llm_string)
        data = [gen.dict() for gen in return_val]
        self.redis.setex(key, self.ttl, json.dumps(data))
    
    def clear(self, **kwargs) -> None:
        """Clear all cached responses."""
        for key in self.redis.scan_iter("llm_cache:*"):
            self.redis.delete(key)

# Enable Redis caching
set_llm_cache(RedisCache(redis_url="redis://localhost:6379", ttl=7200))

Cache Parameters

  • prompt (str): Serialized prompt string. For chat models, this includes message history.
  • llm_string (str): Serialized model configuration (model name, temperature, max tokens, etc.). Ensures different configs use separate cache entries.
  • return_val (list[Generation]): List of Generation objects to cache. Contains text, metadata, and token usage.

Async Execution

Use async patterns for concurrent tool execution and reduced latency:
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
import aiohttp

@tool
async def fetch_weather(city: str) -> str:
    """Get weather for a city."""
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.weather.com/{city}") as resp:
            data = await resp.json()
            return data["description"]

@tool
async def fetch_news(topic: str) -> str:
    """Get latest news for a topic."""
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.news.com/{topic}") as resp:
            data = await resp.json()
            return data["headline"]

agent = create_agent(
    model="openai:gpt-4",
    tools=[fetch_weather, fetch_news],  # Both async
)

# Async invocation enables concurrent tool execution
response = await agent.ainvoke({
    "messages": [HumanMessage("Get weather in SF and latest tech news")]
})
When the model calls multiple tools in one turn, async tools execute concurrently, dramatically reducing total execution time.

Batching Requests

Process multiple inputs efficiently with batching:
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

messages = ["What is 2+2?", "Capital of France?", "Define recursion."]

# Sequential (slow): N calls, one after another
results = [model.invoke(msg) for msg in messages]

# Batched (fast): the same N calls, fanned out in parallel
results = model.batch(messages)

# Async batching (fastest for large batches)
results = await model.abatch(messages)
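Note that batch()/abatch() fan the calls out in parallel rather than sending a single provider-side batch request, so you may want to cap parallelism; LangChain accepts a max_concurrency key in the config argument for this. The same bounded fan-out pattern in plain asyncio, with fetch_one standing in for a model call:

```python
import asyncio

async def fetch_one(i: int) -> int:
    await asyncio.sleep(0.05)  # stands in for one model call
    return i * 2

async def bounded_batch(items, max_concurrency: int = 5):
    sem = asyncio.Semaphore(max_concurrency)

    async def run(item):
        async with sem:  # at most max_concurrency calls in flight
            return await fetch_one(item)

    return await asyncio.gather(*(run(i) for i in items))

results = asyncio.run(bounded_batch(range(10)))
assert results == [i * 2 for i in range(10)]  # order of inputs is preserved
```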

Agent Batching

Batch multiple agent invocations:
agent = create_agent(model="openai:gpt-4", tools=[search_tool])

inputs = [
    {"messages": [HumanMessage("Weather in NYC?")]},
    {"messages": [HumanMessage("Capital of France?")]},
    {"messages": [HumanMessage("2+2=")]},
]

# Process all inputs in parallel
results = await agent.abatch(inputs)

Streaming for Perceived Performance

Stream responses to improve perceived latency:
async for token, metadata in agent.astream(
    {"messages": [HumanMessage("Explain quantum computing")]},
    stream_mode="messages",
):
    print(token.content, end="", flush=True)
Stream mode options:
async for mode, event in agent.astream(
    {"messages": [HumanMessage("Hello")]},
    stream_mode=["messages", "updates", "custom"],
):
    if mode == "messages":
        # Token-by-token message chunks (fastest feedback)
        token, metadata = event
        print(token.content, end="")
    elif mode == "updates":
        # State updates after each node
        print(f"Node update: {event}")
    elif mode == "custom":
        # Custom events from middleware
        print(f"Status: {event}")

Parallel Tool Execution

LangChain automatically executes multiple tool calls concurrently when using async:
# If LLM returns multiple tool calls in one response:
# [
#   {"name": "search", "args": {"query": "Python"}},
#   {"name": "weather", "args": {"city": "NYC"}},
#   {"name": "news", "args": {"topic": "AI"}},
# ]
# All three execute concurrently with async tools!

import asyncio
from langchain_core.tools import tool

@tool
async def search(query: str) -> str:
    """Search the web."""
    await asyncio.sleep(2)  # simulates a 2 second API call
    return f"results for {query}"

@tool
async def weather(city: str) -> str:
    """Get weather."""
    await asyncio.sleep(1)  # simulates a 1 second API call
    return f"conditions in {city}"

@tool
async def news(topic: str) -> str:
    """Get news."""
    await asyncio.sleep(3)  # simulates a 3 second API call
    return f"articles about {topic}"

# Sequential: 2s + 1s + 3s = 6 seconds total
# Concurrent: max(2s, 1s, 3s) = 3 seconds total (2x faster!)
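You can convince yourself of the overlap with nothing but the standard library (delays scaled down; fake_call is illustrative):

```python
import asyncio
import time

async def fake_call(delay: float) -> float:
    await asyncio.sleep(delay)  # stands in for an I/O-bound tool call
    return delay

async def main() -> float:
    start = time.perf_counter()
    # Like three async tools fired in one turn: total wall time is
    # roughly max(delays), not sum(delays).
    await asyncio.gather(fake_call(0.2), fake_call(0.1), fake_call(0.3))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
assert elapsed < 0.5  # well under the 0.6s a sequential run would take
```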

Model Configuration Tuning

Optimize model parameters for your use case:
from langchain_openai import ChatOpenAI

# Fast responses (lower quality)
fast_model = ChatOpenAI(
    model="gpt-4o-mini",  # Smaller, faster model
    temperature=0.3,  # More deterministic output (temperature has little effect on speed)
    max_tokens=500,  # Limit output length; fewer generated tokens means lower latency
)

# High quality (slower)
quality_model = ChatOpenAI(
    model="gpt-4o",
    temperature=0.7,
    max_tokens=2000,
)

# Use fast model for simple tasks, quality model for complex ones
from langchain.agents.middleware import wrap_model_call

@wrap_model_call
def adaptive_model_selection(request, handler):
    """Use fast model for simple queries."""
    user_message = request.messages[-1].content
    
    if len(user_message) < 50 and "?" in user_message:
        # Simple question: use fast model
        return handler(request.override(model=fast_model))
    else:
        # Complex task: use quality model
        return handler(request)

Middleware Caching

Implement custom caching with middleware:
from langchain.agents.middleware import AgentMiddleware
from langchain.agents.middleware.types import ModelRequest, ModelResponse
import hashlib
import json

class ModelCacheMiddleware(AgentMiddleware):
    """Cache model responses based on messages."""
    
    def __init__(self):
        super().__init__()
        self.cache = {}
    
    def _make_key(self, messages: list) -> str:
        """Generate cache key from messages."""
        content = json.dumps([m.content for m in messages])
        return hashlib.md5(content.encode()).hexdigest()
    
    def wrap_model_call(self, request: ModelRequest, handler) -> ModelResponse:
        key = self._make_key(request.messages)
        
        # Check cache
        if key in self.cache:
            print(f"Cache hit for key {key[:8]}...")
            return self.cache[key]
        
        # Execute and cache
        response = handler(request)
        self.cache[key] = response
        print(f"Cached response for key {key[:8]}...")
        return response

cache = ModelCacheMiddleware()

agent = create_agent(
    model="openai:gpt-4",
    middleware=[cache],
)
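One caveat: the self.cache dict above grows without bound. For long-running services, bound it; a small LRU store you could swap in (LRUCache here is an illustrative sketch, not a LangChain class):

```python
from collections import OrderedDict

class LRUCache:
    """Bounded cache so long-running agents don't leak memory."""

    def __init__(self, maxsize: int = 1024):
        self.maxsize = maxsize
        self._data: OrderedDict[str, object] = OrderedDict()

    def get(self, key: str):
        if key in self._data:
            self._data.move_to_end(key)  # mark as most recently used
            return self._data[key]
        return None

    def put(self, key: str, value: object) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.maxsize:
            self._data.popitem(last=False)  # evict least recently used
```

Replacing `self.cache = {}` with `self.cache = LRUCache(maxsize=1024)` (and the dict lookups with get/put) keeps memory flat regardless of traffic.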

Connection Pooling

Reuse HTTP connections for better performance:
import httpx
from langchain_openai import ChatOpenAI

# Create persistent HTTP client
client = httpx.Client(
    limits=httpx.Limits(
        max_connections=100,
        max_keepalive_connections=20,
    ),
    timeout=httpx.Timeout(30.0),
)

# Models share the client
model = ChatOpenAI(
    model="gpt-4",
    http_client=client,  # Reuse connections
)

Monitoring Performance

Track performance metrics with middleware:
from langchain.agents.middleware import AgentMiddleware
import time
import statistics

class PerformanceMonitor(AgentMiddleware):
    """Monitor agent execution performance."""
    
    def __init__(self):
        super().__init__()
        self.model_times = []
        self.tool_times = []
    
    def wrap_model_call(self, request, handler):
        start = time.time()
        response = handler(request)
        duration = time.time() - start
        
        self.model_times.append(duration)
        return response
    
    def wrap_tool_call(self, request, handler):
        start = time.time()
        result = handler(request)
        duration = time.time() - start
        
        self.tool_times.append(duration)
        return result
    
    def after_agent(self, state, runtime):
        if self.model_times:
            print(f"\n=== Performance Report ===")
            print(f"Model calls: {len(self.model_times)}")
            print(f"  Avg: {statistics.mean(self.model_times):.2f}s")
            print(f"  P95: {statistics.quantiles(self.model_times, n=20)[18]:.2f}s")
        
        if self.tool_times:
            print(f"Tool calls: {len(self.tool_times)}")
            print(f"  Avg: {statistics.mean(self.tool_times):.2f}s")
            print(f"  P95: {statistics.quantiles(self.tool_times, n=20)[18]:.2f}s")

monitor = PerformanceMonitor()

agent = create_agent(
    model="openai:gpt-4",
    tools=[search_tool],
    middleware=[monitor],
)
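The statistics.quantiles(times, n=20)[18] expression above returns the 19 cut points that split the samples into 20 equal slices; index 18 is the 95th-percentile cut. A quick sanity check:

```python
import statistics

times = [0.1 * i for i in range(1, 101)]  # 100 samples: 0.1s .. 10.0s
p95 = statistics.quantiles(times, n=20)[18]
assert 9.5 < p95 < 9.7  # the slowest ~5% of samples sit above this cut
```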

Best Practices

Measure where time is actually spent:
import cProfile
import pstats

profiler = cProfile.Profile()
profiler.enable()

# Run agent
response = agent.invoke({"messages": [HumanMessage("Hello")]})

profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20)  # Top 20 slowest functions
Evaluate whether smaller, faster models meet your quality bar:
# Test against your evaluation set (evaluate() and test_cases stand in
# for your own eval harness)
models_to_test = [
    "gpt-4o-mini",      # Fastest
    "gpt-4o",           # Balanced
    "gpt-4-turbo",      # High quality
]

for model_name in models_to_test:
    model = ChatOpenAI(model=model_name)
    results = evaluate(model, test_cases)
    print(f"{model_name}: {results['accuracy']:.2%} accuracy, {results['avg_latency']:.2f}s")
Cache at multiple levels:
# 1. Model-level cache (built-in)
set_llm_cache(RedisCache(...))

# 2. Tool-level cache (custom)
import functools

@tool
@functools.lru_cache(maxsize=1000)
def expensive_api_call(query: str) -> str:
    """Query the external API (results are memoized in-process)."""
    return api.search(query)

# 3. Agent-level cache (middleware)
agent = create_agent(
    model=model,
    middleware=[CacheMiddleware()],
)
Prevent unbounded execution:
import asyncio

# Timeout for entire agent execution
try:
    response = await asyncio.wait_for(
        agent.ainvoke({"messages": [HumanMessage("Hello")]}),
        timeout=30.0,  # 30 second max
    )
except asyncio.TimeoutError:
    print("Agent execution timed out")

# Timeout for individual tools
@tool
async def search_with_timeout(query: str) -> str:
    """Search with 10 second timeout."""
    try:
        return await asyncio.wait_for(
            api.search(query),
            timeout=10.0,
        )
    except asyncio.TimeoutError:
        return "Search timed out"

Common Performance Pitfalls

Avoid These Mistakes:
  1. Sync tools with async agent: Implement async versions of I/O-bound tools
  2. No caching: Repeated identical calls waste money and time
  3. Sequential execution: Use abatch() instead of loops
  4. Large context windows: Trim conversation history to essential messages
  5. No timeouts: Implement timeouts at tool and agent levels
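For pitfall 4, LangChain ships trim_messages in langchain_core.messages; the core idea in a plain-dict sketch (trim_history is illustrative):

```python
def trim_history(messages: list[dict], max_messages: int = 20) -> list[dict]:
    """Keep the system prompt plus only the most recent turns."""
    if len(messages) <= max_messages:
        return messages
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    keep = max_messages - len(system)
    return system + rest[-keep:]

history = [{"role": "system", "content": "You are helpful."}]
history += [{"role": "user", "content": f"msg {i}"} for i in range(30)]

trimmed = trim_history(history, max_messages=5)
assert len(trimmed) == 5
assert trimmed[0]["role"] == "system"      # system prompt survives
assert trimmed[-1]["content"] == "msg 29"  # newest turns survive
```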

Performance Checklist

  • Enable LLM caching for repeated queries
  • Implement async tools for I/O operations
  • Use abatch() for multiple inputs
  • Stream responses for better UX
  • Set appropriate timeouts
  • Monitor latency with middleware
  • Profile to identify bottlenecks
  • Use smaller models where quality permits
  • Implement connection pooling
  • Cache tool results when appropriate

Next Steps

Middleware System

Build performance monitoring middleware

Rate Limiting

Balance performance with API quotas

Custom Tools

Optimize tool implementations
