Overview

Rate limiting controls how frequently your application makes API calls to LLM providers, preventing quota exhaustion, managing costs, and avoiding service throttling. LangChain provides both model-level and agent-level rate limiting.

Model-Level Rate Limiting

Apply rate limits directly to chat models using the built-in InMemoryRateLimiter:
from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_anthropic import ChatAnthropic

# Limit to 1 request per 10 seconds
rate_limiter = InMemoryRateLimiter(
    requests_per_second=0.1,  # 1 request every 10 seconds
    check_every_n_seconds=0.1,  # Check every 100ms
    max_bucket_size=10,  # Allow bursts up to 10 requests
)

model = ChatAnthropic(
    model="claude-sonnet-4-5-20250929",
    rate_limiter=rate_limiter,
)

# Calls are automatically rate limited
for i in range(5):
    response = model.invoke("Hello")
    print(f"Request {i+1} completed")
The model blocks until a token is available. With requests_per_second=0.1, the 5 requests above take roughly 50 seconds to complete, since the bucket starts empty and refills at one token every 10 seconds.
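To see the pacing concretely, you can time each call; a minimal sketch reusing the model and rate limiter configured above:
import time

for i in range(5):
    start = time.monotonic()
    model.invoke("Hello")
    elapsed = time.monotonic() - start
    # With requests_per_second=0.1, expect roughly 10s of waiting per call
    print(f"Request {i + 1} took {elapsed:.1f}s")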

InMemoryRateLimiter

Token bucket algorithm for time-based rate limiting.

Parameters

requests_per_second
float
default:"1.0"
Number of requests allowed per second. Fractional values enable slower rates:
  • 1.0: 1 request per second
  • 0.1: 1 request per 10 seconds
  • 10.0: 10 requests per second
check_every_n_seconds
float
default:"0.1"
How often to check for available tokens (in seconds). Lower values provide more granular timing but slightly more overhead.
max_bucket_size
float
default:"1.0"
Maximum tokens that can accumulate. Controls burst behavior:
  • 1.0: No burst, strictly sequential
  • 10.0: Allow bursts of up to 10 requests
Useful when the average rate limit is 10 requests/second but you want to allow up to 20 requests immediately if tokens have accumulated.
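For instance, a configuration along those lines (values are illustrative):
# Averages 10 requests/second, but up to 20 can fire immediately
# once the bucket has had time to fill
burst_limiter = InMemoryRateLimiter(
    requests_per_second=10.0,
    max_bucket_size=20.0,
)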

Token Bucket Algorithm

The rate limiter uses a token bucket:
  1. Bucket fills with tokens at requests_per_second rate
  2. Each request consumes 1 token
  3. If bucket is empty, request blocks until token available
  4. Bucket capacity capped at max_bucket_size
# Example: 2 requests/second, max 5 tokens
rate_limiter = InMemoryRateLimiter(
    requests_per_second=2.0,
    max_bucket_size=5.0,
)

# Scenario:
# - Start: 0 tokens
# - Wait 2.5 seconds: 5 tokens (capped at max_bucket_size)
# - Make 5 rapid requests: All succeed immediately (burst)
# - 6th request: Blocks for 0.5 seconds until next token

Blocking vs Non-Blocking

Control whether to wait for tokens or fail immediately:
rate_limiter = InMemoryRateLimiter(requests_per_second=1.0)

# Blocking (default): Waits for token
if rate_limiter.acquire(blocking=True):
    make_request()

# Non-blocking: Returns False immediately if no token
if rate_limiter.acquire(blocking=False):
    make_request()
else:
    print("Rate limit reached, skipping request")

# Async version
if await rate_limiter.aacquire(blocking=True):
    await make_async_request()

Custom Rate Limiter

Implement BaseRateLimiter for custom strategies (distributed rate limiting, API-specific quotas, etc.):
from langchain_core.rate_limiters import BaseRateLimiter
from langchain_openai import ChatOpenAI
import redis
import time

class RedisRateLimiter(BaseRateLimiter):
    """Distributed rate limiter using Redis."""
    
    def __init__(self, key: str, max_requests: int, window_seconds: int):
        self.redis = redis.Redis(host='localhost', port=6379)
        self.key = key
        self.max_requests = max_requests
        self.window_seconds = window_seconds
    
    def acquire(self, *, blocking: bool = True) -> bool:
        """Acquire token using Redis sliding window."""
        current_time = time.time()
        window_start = current_time - self.window_seconds
        
        # Remove old entries
        self.redis.zremrangebyscore(self.key, 0, window_start)
        
        # Count requests in current window  
        current_count = self.redis.zcard(self.key)
        
        if current_count < self.max_requests:
            # Add current request
            self.redis.zadd(self.key, {str(current_time): current_time})
            self.redis.expire(self.key, self.window_seconds)
            return True
        
        if not blocking:
            return False
        
        # Wait for window to slide
        oldest = self.redis.zrange(self.key, 0, 0, withscores=True)
        if oldest:
            wait_time = oldest[0][1] + self.window_seconds - current_time
            if wait_time > 0:
                time.sleep(wait_time)
        
        return self.acquire(blocking=True)
    
    async def aacquire(self, *, blocking: bool = True) -> bool:
        """Async version; a production setup would use an async Redis client."""
        import asyncio
        # Run the synchronous acquire in a worker thread to avoid blocking the event loop
        return await asyncio.to_thread(self.acquire, blocking=blocking)

# Usage with shared state across processes
rate_limiter = RedisRateLimiter(
    key="api_calls:openai",
    max_requests=100,
    window_seconds=60,
)

model = ChatOpenAI(model="gpt-4", rate_limiter=rate_limiter)

BaseRateLimiter Interface

acquire
method
required
Synchronous token acquisition.
Parameters:
  • blocking (bool): Wait for token if True, return immediately if False
Returns: True if token acquired, False if rate limited (non-blocking only)
aacquire
method
required
Async token acquisition.
Parameters:
  • blocking (bool): Wait for token if True, return immediately if False
Returns: True if token acquired, False if rate limited (non-blocking only)
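A custom limiter subclasses BaseRateLimiter and implements both methods; a minimal skeleton (the always-allow behavior is only a placeholder):
from langchain_core.rate_limiters import BaseRateLimiter

class NoOpRateLimiter(BaseRateLimiter):
    """Placeholder limiter that never blocks."""

    def acquire(self, *, blocking: bool = True) -> bool:
        # A real implementation decides here whether a token is available
        return True

    async def aacquire(self, *, blocking: bool = True) -> bool:
        # Async counterpart; must mirror the sync semantics
        return True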

Agent-Level Rate Limiting

Combine rate limiting with middleware for finer control:
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call
from langchain.agents.middleware.types import ModelRequest, ModelResponse
from langchain_core.rate_limiters import InMemoryRateLimiter

rate_limiter = InMemoryRateLimiter(requests_per_second=2.0)

@wrap_model_call
def rate_limit_model(request: ModelRequest, handler) -> ModelResponse:
    """Rate limit all model calls in agent."""
    # Wait for rate limit
    rate_limiter.acquire(blocking=True)
    
    # Proceed with request
    return handler(request)

agent = create_agent(
    model="openai:gpt-4",
    tools=[search_tool],
    middleware=[rate_limit_model],
)

Per-Tool Rate Limiting

Limit specific tool execution rates:
from langchain.agents.middleware import wrap_tool_call
from langchain.agents.middleware.types import ToolCallRequest
from langchain_core.rate_limiters import InMemoryRateLimiter

# Separate rate limiters per tool
api_limiter = InMemoryRateLimiter(requests_per_second=5.0)
db_limiter = InMemoryRateLimiter(requests_per_second=10.0)

@wrap_tool_call
def rate_limit_tools(request: ToolCallRequest, handler):
    """Apply different rate limits per tool."""
    tool_name = request.tool.name if request.tool else request.tool_call["name"]
    
    if tool_name == "api_search":
        api_limiter.acquire(blocking=True)
    elif tool_name == "database_query":
        db_limiter.acquire(blocking=True)
    
    return handler(request)

agent = create_agent(
    model="openai:gpt-4",
    tools=[api_search, database_query],
    middleware=[rate_limit_tools],
)

Dynamic Rate Limiting

Adjust rate limits based on context:
from langchain.agents.middleware import wrap_model_call
from langchain.agents.middleware.types import ModelRequest, ModelResponse
from langchain_core.rate_limiters import InMemoryRateLimiter

class DynamicRateLimiter:
    """Rate limiter that adjusts based on user tier."""
    
    def __init__(self):
        self.limiters = {
            "free": InMemoryRateLimiter(requests_per_second=0.5),
            "pro": InMemoryRateLimiter(requests_per_second=5.0),
            "enterprise": InMemoryRateLimiter(requests_per_second=50.0),
        }
    
    def get_limiter(self, user_tier: str) -> InMemoryRateLimiter:
        return self.limiters.get(user_tier, self.limiters["free"])

dynamic_limiter = DynamicRateLimiter()

@wrap_model_call
def tiered_rate_limit(request: ModelRequest, handler) -> ModelResponse:
    """Apply rate limit based on user tier."""
    user_tier = request.runtime.config.get("configurable", {}).get("tier", "free")
    limiter = dynamic_limiter.get_limiter(user_tier)
    
    limiter.acquire(blocking=True)
    return handler(request)

# Usage
from langchain_core.messages import HumanMessage
agent = create_agent(
    model="openai:gpt-4",
    middleware=[tiered_rate_limit],
)

# Set user tier in config
response = agent.invoke(
    {"messages": [HumanMessage("Hello")]},
    config={"configurable": {"tier": "pro"}},
)

Combining with Retry Logic

Use rate limiting with retry middleware for resilient API calls:
from langchain.agents.middleware import ModelRetryMiddleware

rate_limiter = InMemoryRateLimiter(requests_per_second=1.0)
retry_middleware = ModelRetryMiddleware(
    max_retries=3,
    backoff_factor=2.0,
)

model = ChatOpenAI(
    model="gpt-4",
    rate_limiter=rate_limiter,  # Rate limit at model level
)

agent = create_agent(
    model=model,
    middleware=[retry_middleware],  # Retry on errors
)

Monitoring Rate Limit Usage

Track rate limit consumption:
from langchain.agents.middleware import AgentMiddleware
from langchain_core.rate_limiters import InMemoryRateLimiter
import time

class RateLimitMonitor(AgentMiddleware):
    """Monitor rate limit token consumption."""
    
    def __init__(self, rate_limiter: InMemoryRateLimiter):
        super().__init__()
        self.rate_limiter = rate_limiter
        self.wait_time_total = 0
    
    def wrap_model_call(self, request, handler):
        start = time.time()
        
        # Acquire with monitoring
        self.rate_limiter.acquire(blocking=True)
        
        wait_time = time.time() - start
        self.wait_time_total += wait_time
        
        if wait_time > 0:
            print(f"Waited {wait_time:.2f}s for rate limit")
        
        return handler(request)
    
    def after_agent(self, state, runtime):
        print(f"Total rate limit wait time: {self.wait_time_total:.2f}s")
        print(f"Available tokens: {self.rate_limiter.available_tokens:.2f}")

rate_limiter = InMemoryRateLimiter(requests_per_second=1.0)
monitor = RateLimitMonitor(rate_limiter)

agent = create_agent(
    model="openai:gpt-4",
    middleware=[monitor],
)

Best Practices

Start with conservative limits and increase based on monitoring:
# Start conservative
rate_limiter = InMemoryRateLimiter(
    requests_per_second=1.0,  # 1 req/sec initially
    max_bucket_size=2.0,  # Limited burst
)

# Monitor and adjust
# If no rate limit errors and fast enough, increase to 2.0 req/sec
# If hitting provider limits, decrease to 0.5 req/sec
Set max_bucket_size to handle expected burst patterns:
# Handle morning traffic spike
rate_limiter = InMemoryRateLimiter(
    requests_per_second=5.0,  # Average rate
    max_bucket_size=50.0,  # Allow 50-request burst
)
Production and development should have different limits:
import os

if os.getenv("ENV") == "production":
    rate_limiter = InMemoryRateLimiter(requests_per_second=10.0)
else:
    rate_limiter = InMemoryRateLimiter(requests_per_second=1.0)
Track rate limit hits and set up alerts:
@wrap_model_call
def monitored_rate_limit(request: ModelRequest, handler) -> ModelResponse:
    start = time.time()
    rate_limiter.acquire(blocking=True)
    wait_time = time.time() - start
    
    if wait_time > 5.0:  # Alert if waiting >5 seconds
        alert_ops_team(f"High rate limit wait: {wait_time:.2f}s")
    
    return handler(request)

Limitations

InMemoryRateLimiter is in-memory only:
  • Does NOT work across multiple processes/servers
  • Resets on application restart
  • Thread-safe but not process-safe
For distributed systems, implement a custom BaseRateLimiter using Redis, DynamoDB, or similar.
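Because it is thread-safe, a single InMemoryRateLimiter instance can be shared by multiple model objects in the same process so they draw from one bucket; a minimal sketch:
from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_anthropic import ChatAnthropic

# One shared bucket for every model instance in this process
shared_limiter = InMemoryRateLimiter(requests_per_second=2.0)

primary_model = ChatAnthropic(model="claude-sonnet-4-5-20250929", rate_limiter=shared_limiter)
secondary_model = ChatAnthropic(model="claude-sonnet-4-5-20250929", rate_limiter=shared_limiter)
# Calls from either model consume tokens from the same bucket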

Next Steps

Middleware System

Build custom rate limiting middleware

Performance

Optimize performance with caching and batching

Custom Tools

Rate limit specific tools
