Overview
Rate limiting controls how frequently your application makes API calls to LLM providers, preventing quota exhaustion, managing costs, and avoiding service throttling. LangChain provides both model-level and agent-level rate limiting.
Model-Level Rate Limiting
Apply rate limits directly to chat models using the built-in InMemoryRateLimiter:
from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_anthropic import ChatAnthropic

# Limit to 1 request per 10 seconds
rate_limiter = InMemoryRateLimiter(
    requests_per_second=0.1,  # 1 request every 10 seconds
    check_every_n_seconds=0.1,  # Check every 100ms
    max_bucket_size=10,  # Allow bursts of up to 10 requests
)

model = ChatAnthropic(
    model="claude-sonnet-4-5-20250929",
    rate_limiter=rate_limiter,
)

# Calls are automatically rate limited
for i in range(5):
    response = model.invoke("Hello")
    print(f"Request {i + 1} completed")
The model blocks until a token is available. At a limit of 0.1 requests/second, 5 sequential requests take roughly 50 seconds to complete.
InMemoryRateLimiter
Token bucket algorithm for time-based rate limiting.
Parameters
requests_per_second: Number of requests allowed per second. Fractional values enable slower rates:
1.0: 1 request per second
0.1: 1 request every 10 seconds
10.0: 10 requests per second
check_every_n_seconds: How often to check for available tokens (in seconds). Lower values provide more granular timing but slightly more overhead.
max_bucket_size: Maximum number of tokens that can accumulate. Controls burst behavior:
1.0: No burst, strictly sequential
10.0: Allow bursts of up to 10 requests
Useful when the average rate is, say, 10/second but you want to allow a 20-request burst (max_bucket_size=20.0) whenever tokens have accumulated during idle time.
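For instance, the parameters combine like this (illustrative values):

# Strictly sequential: at most 1 request every 10 seconds, no bursting
slow_limiter = InMemoryRateLimiter(
    requests_per_second=0.1,
    max_bucket_size=1.0,
)

# Steady 10/sec on average, but up to 20 requests at once after idle time
bursty_limiter = InMemoryRateLimiter(
    requests_per_second=10.0,
    check_every_n_seconds=0.05,  # finer-grained waits, slightly more overhead
    max_bucket_size=20.0,
)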
Token Bucket Algorithm
The rate limiter uses a token bucket:
Bucket fills with tokens at requests_per_second rate
Each request consumes 1 token
If bucket is empty, request blocks until token available
Bucket capacity capped at max_bucket_size
# Example: 2 requests/second, max 5 tokens
rate_limiter = InMemoryRateLimiter(
    requests_per_second=2.0,
    max_bucket_size=5.0,
)

# Scenario:
# - Start: 0 tokens
# - Wait 2.5 seconds: 5 tokens (capped at max_bucket_size)
# - Make 5 rapid requests: all succeed immediately (burst)
# - 6th request: blocks for ~0.5 seconds until the next token
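A quick sanity check of this scenario (a sketch; timings are approximate because tokens are checked at check_every_n_seconds granularity, and the bucket only begins accruing after the first acquire call):

import time
from langchain_core.rate_limiters import InMemoryRateLimiter

rate_limiter = InMemoryRateLimiter(requests_per_second=2.0, max_bucket_size=5.0)

rate_limiter.acquire(blocking=False)  # start the token clock before idling
time.sleep(2.5)  # tokens accumulate during idle time, capped at 5

start = time.time()
for i in range(6):
    rate_limiter.acquire(blocking=True)
    print(f"Request {i + 1} acquired at t={time.time() - start:.2f}s")
# Roughly: requests 1-5 near t=0 (burst), request 6 around t=0.5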
Blocking vs Non-Blocking
Control whether to wait for tokens or fail immediately:
rate_limiter = InMemoryRateLimiter(requests_per_second=1.0)

# Blocking (default): waits until a token is available
if rate_limiter.acquire(blocking=True):
    make_request()

# Non-blocking: returns False immediately if no token is available
if rate_limiter.acquire(blocking=False):
    make_request()
else:
    print("Rate limit reached, skipping request")

# Async version (await is only valid inside an async function)
async def worker():
    if await rate_limiter.aacquire(blocking=True):
        await make_async_request()
Custom Rate Limiter
Implement BaseRateLimiter for custom strategies (distributed rate limiting, API-specific quotas, etc.):
from langchain_core.rate_limiters import BaseRateLimiter
from langchain_openai import ChatOpenAI
import asyncio
import redis
import time

class RedisRateLimiter(BaseRateLimiter):
    """Distributed rate limiter using a Redis sliding window."""

    def __init__(self, key: str, max_requests: int, window_seconds: int):
        self.redis = redis.Redis(host="localhost", port=6379)
        self.key = key
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def acquire(self, *, blocking: bool = True) -> bool:
        """Acquire a token using a Redis sliding window."""
        current_time = time.time()
        window_start = current_time - self.window_seconds

        # Remove entries that have fallen out of the window
        self.redis.zremrangebyscore(self.key, 0, window_start)

        # Count requests in the current window
        current_count = self.redis.zcard(self.key)

        if current_count < self.max_requests:
            # Record the current request
            self.redis.zadd(self.key, {str(current_time): current_time})
            self.redis.expire(self.key, self.window_seconds)
            return True

        if not blocking:
            return False

        # Wait for the window to slide past the oldest entry
        oldest = self.redis.zrange(self.key, 0, 0, withscores=True)
        if oldest:
            wait_time = oldest[0][1] + self.window_seconds - current_time
            if wait_time > 0:
                time.sleep(wait_time)
        return self.acquire(blocking=True)

    async def aacquire(self, *, blocking: bool = True) -> bool:
        """Async version. Delegating the synchronous acquire to a worker
        thread keeps the event loop responsive; a native implementation
        could use redis.asyncio instead."""
        return await asyncio.to_thread(self.acquire, blocking=blocking)

# Usage with shared state across processes
rate_limiter = RedisRateLimiter(
    key="api_calls:openai",
    max_requests=100,
    window_seconds=60,
)

model = ChatOpenAI(model="gpt-4", rate_limiter=rate_limiter)
BaseRateLimiter Interface
acquire(*, blocking: bool = True) -> bool
Synchronous token acquisition. Parameters:
blocking (bool): Wait for a token if True, return immediately if False
Returns: True if a token was acquired, False if rate limited (non-blocking only)
aacquire(*, blocking: bool = True) -> bool
Async token acquisition. Parameters:
blocking (bool): Wait for a token if True, return immediately if False
Returns: True if a token was acquired, False if rate limited (non-blocking only)
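To make the interface concrete, here is a minimal sketch of a custom limiter that enforces a fixed gap between requests (the FixedIntervalLimiter name and strategy are illustrative, not part of LangChain):

import asyncio
import threading
import time
from langchain_core.rate_limiters import BaseRateLimiter

class FixedIntervalLimiter(BaseRateLimiter):
    """Allow at most one request every `interval` seconds (no bursts)."""

    def __init__(self, interval: float):
        self.interval = interval
        self._next_allowed = 0.0
        self._lock = threading.Lock()

    def acquire(self, *, blocking: bool = True) -> bool:
        while True:
            with self._lock:
                now = time.monotonic()
                if now >= self._next_allowed:
                    self._next_allowed = now + self.interval
                    return True
                wait = self._next_allowed - now
            if not blocking:
                return False
            time.sleep(wait)

    async def aacquire(self, *, blocking: bool = True) -> bool:
        # Run the blocking acquire in a thread so the event loop stays free
        return await asyncio.to_thread(self.acquire, blocking=blocking)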
Agent-Level Rate Limiting
Combine rate limiting with middleware for finer control:
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call
from langchain.agents.middleware.types import ModelRequest, ModelResponse
from langchain_core.rate_limiters import InMemoryRateLimiter

rate_limiter = InMemoryRateLimiter(requests_per_second=2.0)

@wrap_model_call
def rate_limit_model(request: ModelRequest, handler) -> ModelResponse:
    """Rate limit all model calls in the agent."""
    # Wait for the rate limit
    rate_limiter.acquire(blocking=True)
    # Proceed with the request
    return handler(request)

agent = create_agent(
    model="openai:gpt-4",
    tools=[search_tool],
    middleware=[rate_limit_model],
)
Limit specific tool execution rates:
from langchain.agents.middleware import wrap_tool_call
from langchain.agents.middleware.types import ToolCallRequest
from langchain_core.rate_limiters import InMemoryRateLimiter

# Separate rate limiters per tool
api_limiter = InMemoryRateLimiter(requests_per_second=5.0)
db_limiter = InMemoryRateLimiter(requests_per_second=10.0)

@wrap_tool_call
def rate_limit_tools(request: ToolCallRequest, handler):
    """Apply different rate limits per tool."""
    tool_name = request.tool.name if request.tool else request.tool_call["name"]

    if tool_name == "api_search":
        api_limiter.acquire(blocking=True)
    elif tool_name == "database_query":
        db_limiter.acquire(blocking=True)

    return handler(request)

agent = create_agent(
    model="openai:gpt-4",
    tools=[api_search, database_query],
    middleware=[rate_limit_tools],
)
Dynamic Rate Limiting
Adjust rate limits based on context:
from langchain.agents.middleware import wrap_model_call
from langchain_core.messages import HumanMessage

class DynamicRateLimiter:
    """Rate limiter registry that selects a limiter by user tier."""

    def __init__(self):
        self.limiters = {
            "free": InMemoryRateLimiter(requests_per_second=0.5),
            "pro": InMemoryRateLimiter(requests_per_second=5.0),
            "enterprise": InMemoryRateLimiter(requests_per_second=50.0),
        }

    def get_limiter(self, user_tier: str) -> InMemoryRateLimiter:
        return self.limiters.get(user_tier, self.limiters["free"])

dynamic_limiter = DynamicRateLimiter()

@wrap_model_call
def tiered_rate_limit(request: ModelRequest, handler) -> ModelResponse:
    """Apply a rate limit based on the user's tier."""
    user_tier = request.runtime.config.get("configurable", {}).get("tier", "free")
    limiter = dynamic_limiter.get_limiter(user_tier)
    limiter.acquire(blocking=True)
    return handler(request)

# Usage
agent = create_agent(
    model="openai:gpt-4",
    middleware=[tiered_rate_limit],
)

# Set the user tier in config
response = agent.invoke(
    {"messages": [HumanMessage("Hello")]},
    config={"configurable": {"tier": "pro"}},
)
Combining with Retry Logic
Use rate limiting with retry middleware for resilient API calls. Because the limiter is attached to the model itself, each retry attempt also passes through it, so retries cannot stampede the provider:
from langchain.agents.middleware import ModelRetryMiddleware

rate_limiter = InMemoryRateLimiter(requests_per_second=1.0)

retry_middleware = ModelRetryMiddleware(
    max_retries=3,
    backoff_factor=2.0,
)

model = ChatOpenAI(
    model="gpt-4",
    rate_limiter=rate_limiter,  # Rate limit at the model level
)

agent = create_agent(
    model=model,
    middleware=[retry_middleware],  # Retry on errors
)
Monitoring Rate Limit Usage
Track rate limit consumption:
from langchain.agents.middleware import AgentMiddleware
from langchain_core.rate_limiters import InMemoryRateLimiter
import time

class RateLimitMonitor(AgentMiddleware):
    """Monitor rate limit token consumption."""

    def __init__(self, rate_limiter: InMemoryRateLimiter):
        super().__init__()
        self.rate_limiter = rate_limiter
        self.wait_time_total = 0.0

    def wrap_model_call(self, request, handler):
        start = time.time()

        # Acquire with monitoring
        self.rate_limiter.acquire(blocking=True)

        wait_time = time.time() - start
        self.wait_time_total += wait_time

        if wait_time > 0:
            print(f"Waited {wait_time:.2f}s for rate limit")

        return handler(request)

    def after_agent(self, state, runtime):
        print(f"Total rate limit wait time: {self.wait_time_total:.2f}s")
        print(f"Available tokens: {self.rate_limiter.available_tokens:.2f}")

rate_limiter = InMemoryRateLimiter(requests_per_second=1.0)
monitor = RateLimitMonitor(rate_limiter)

agent = create_agent(
    model="openai:gpt-4",
    middleware=[monitor],
)
Best Practices
Start Conservative
Start with conservative limits and increase based on monitoring:

# Start conservative
rate_limiter = InMemoryRateLimiter(
    requests_per_second=1.0,  # 1 req/sec initially
    max_bucket_size=2.0,  # Limited burst
)

# Monitor and adjust:
# - If there are no rate limit errors and throughput suffices, increase to 2.0 req/sec
# - If you are hitting provider limits, decrease to 0.5 req/sec
Account for Burst Traffic
Set max_bucket_size to handle expected burst patterns:

# Handle a morning traffic spike
rate_limiter = InMemoryRateLimiter(
    requests_per_second=5.0,  # Average rate
    max_bucket_size=50.0,  # Allow a 50-request burst
)
Use Different Limits per Environment
Production and development should have different limits:

import os

if os.getenv("ENV") == "production":
    rate_limiter = InMemoryRateLimiter(requests_per_second=10.0)
else:
    rate_limiter = InMemoryRateLimiter(requests_per_second=1.0)
Monitor and Alert
Track rate limit hits and set up alerts:

@wrap_model_call
def monitored_rate_limit(request: ModelRequest, handler) -> ModelResponse:
    start = time.time()
    rate_limiter.acquire(blocking=True)
    wait_time = time.time() - start

    if wait_time > 5.0:  # Alert if waiting more than 5 seconds
        alert_ops_team(f"High rate limit wait: {wait_time:.2f}s")

    return handler(request)
Limitations
InMemoryRateLimiter is in-memory only:
Does NOT work across multiple processes/servers
Resets on application restart
Thread-safe but not process-safe
For distributed systems, implement a custom BaseRateLimiter backed by Redis, DynamoDB, or a similar shared store, as in the RedisRateLimiter example above.
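If you must stay in-process, a common workaround is to split a global budget evenly across workers (a sketch; WORKER_COUNT is an assumed deployment variable, and this only approximates a true shared limit):

import os
from langchain_core.rate_limiters import InMemoryRateLimiter

# Each of N identical workers gets an equal share of a 10 req/s global budget
worker_count = int(os.getenv("WORKER_COUNT", "1"))  # assumed env var
rate_limiter = InMemoryRateLimiter(requests_per_second=10.0 / worker_count)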
Next Steps
Middleware System: Build custom rate limiting middleware
Performance: Optimize performance with caching and batching
Custom Tools: Rate limit specific tools