Overview
LiteLLM supports async operations for all API calls, allowing you to make concurrent requests and improve throughput in async applications.
Basic Usage
import asyncio

from litellm import acompletion


async def main():
    """Send a single chat request and print the assistant's reply."""
    reply = await acompletion(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello, how are you?"}],
    )
    print(reply.choices[0].message.content)


asyncio.run(main())
Async Functions
All LiteLLM functions have async equivalents prefixed with `a`:
completion() → acompletion()
embedding() → aembedding()
image_generation() → aimage_generation()
transcription() → atranscription()
speech() → aspeech()
Function Signature
# Signature of the async completion entry point (illustrative excerpt, not a
# complete definition — the body lives in the litellm package).
async def acompletion(
model: str,  # model identifier, e.g. "gpt-4"
messages: List[Dict[str, str]],  # chat history as role/content dicts
# Same parameters as completion()
timeout: Optional[Union[float, int]] = None,  # per-request timeout in seconds
temperature: Optional[float] = None,  # sampling temperature
stream: Optional[bool] = None,  # True -> result is an async stream wrapper
# ... all other completion() parameters
**kwargs
) -> Union[ModelResponse, CustomStreamWrapper]
Examples
Single Async Request
import asyncio

from litellm import acompletion


async def get_response():
    """Ask one question and return only the text of the answer."""
    completion = await acompletion(
        model="gpt-4",
        messages=[{"role": "user", "content": "Explain async programming"}],
        temperature=0.7,
    )
    return completion.choices[0].message.content


result = asyncio.run(get_response())
print(result)
Concurrent Requests
import asyncio

from litellm import acompletion


async def get_multiple_responses():
    """Fire five questions concurrently and print each answer."""
    # One coroutine per question; nothing runs until gather() schedules them.
    pending = [
        acompletion(
            model="gpt-4",
            messages=[{"role": "user", "content": f"What is {i} + {i}?"}],
        )
        for i in range(5)
    ]
    answers = await asyncio.gather(*pending)
    for i, answer in enumerate(answers):
        print(f"Response {i}: {answer.choices[0].message.content}")


asyncio.run(get_multiple_responses())
Multiple Models Concurrently
import asyncio

from litellm import acompletion


async def compare_models():
    """Pose the same question to three providers concurrently and compare."""
    question = "What is the capital of France?"
    # One request per provider, all in flight at the same time.
    pending = [
        acompletion(model=name, messages=[{"role": "user", "content": question}])
        for name in ("gpt-4", "claude-3-5-sonnet-20241022", "gemini-pro")
    ]
    replies = await asyncio.gather(*pending)
    for label, reply in zip(["GPT-4", "Claude", "Gemini"], replies):
        print(f"{label}: {reply.choices[0].message.content}")


asyncio.run(compare_models())
Error Handling with Async
import asyncio

from litellm import acompletion
from litellm.exceptions import RateLimitError, APIError


async def robust_completion(model: str, messages: list, max_retries: int = 3):
    """Call acompletion, retrying on rate limits and absorbing API errors.

    Args:
        model: Model identifier to query.
        messages: Chat messages in role/content form.
        max_retries: How many times to retry after a RateLimitError before
            giving up. (The original retried forever via unbounded recursion,
            which never terminates during a sustained rate-limit outage.)

    Returns:
        The response text, or None if an APIError occurred or all retries
        were exhausted.
    """
    for _ in range(max_retries + 1):
        try:
            response = await acompletion(model=model, messages=messages)
            return response.choices[0].message.content
        except RateLimitError:
            print(f"Rate limit hit for {model}, retrying...")
            await asyncio.sleep(5)  # back off before the next attempt
        except APIError as e:
            print(f"API error for {model}: {e}")
            return None
    # Rate-limited on every attempt.
    return None


async def main():
    result = await robust_completion(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}],
    )
    print(result)


asyncio.run(main())
Async with Retries
import asyncio

from litellm import acompletion


async def completion_with_retry(model: str, messages: list, max_retries: int = 3):
    """Retry acompletion up to max_retries times with exponential backoff."""
    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            return await acompletion(
                model=model,
                messages=messages,
                timeout=30.0,
            )
        except Exception as err:
            # Out of attempts: let the last error propagate to the caller.
            if attempt == final_attempt:
                raise
            print(f"Attempt {attempt + 1} failed: {err}")
            await asyncio.sleep(2 ** attempt)  # Exponential backoff


async def main():
    answer = await completion_with_retry(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}],
    )
    print(answer.choices[0].message.content)


asyncio.run(main())
Async Streaming
Basic Async Streaming
import asyncio

from litellm import acompletion


async def stream_response():
    """Stream a completion and echo each text delta as it arrives."""
    stream = await acompletion(
        model="gpt-4",
        messages=[{"role": "user", "content": "Write a short poem"}],
        stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)


asyncio.run(stream_response())
Multiple Concurrent Streams
import asyncio

from litellm import acompletion


async def stream_task(task_id: int, prompt: str):
    """Stream one completion, collect its text, and return the full string."""
    print(f"\nStarting task {task_id}...")
    stream = await acompletion(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    pieces = []
    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            pieces.append(delta)
    text = "".join(pieces)
    print(f"\nTask {task_id} complete: {text[:50]}...")
    return text


async def main():
    # Three independent streams all progress concurrently under gather().
    results = await asyncio.gather(
        stream_task(1, "Count to 5"),
        stream_task(2, "Name 3 colors"),
        stream_task(3, "Say hello in 3 languages"),
    )
    print("\n\nAll tasks completed!")


asyncio.run(main())
Async with Other Operations
Async Embeddings
import asyncio

from litellm import aembedding


async def get_embeddings():
    """Embed three strings concurrently and report each vector's size."""
    texts = [
        "The quick brown fox",
        "jumps over the lazy dog",
        "Machine learning is fascinating",
    ]
    # One embedding request per string, gathered concurrently.
    responses = await asyncio.gather(
        *(aembedding(model="text-embedding-ada-002", input=[t]) for t in texts)
    )
    for text, resp in zip(texts, responses):
        vector = resp.data[0].embedding
        print(f"Text: {text}")
        print(f"Embedding dimensions: {len(vector)}\n")


asyncio.run(get_embeddings())
Async Image Generation
import asyncio

from litellm import aimage_generation


async def generate_images():
    """Generate three images concurrently and print the resulting URLs."""
    prompts = [
        "A serene mountain landscape",
        "A futuristic cityscape",
        "An abstract painting",
    ]
    jobs = (aimage_generation(model="dall-e-3", prompt=p) for p in prompts)
    responses = await asyncio.gather(*jobs)
    for prompt, resp in zip(prompts, responses):
        print(f"Generated image for: {prompt}")
        print(f"URL: {resp.data[0].url}\n")


asyncio.run(generate_images())
Performance Optimization
Semaphore for Rate Limiting
import asyncio

from litellm import acompletion


async def limited_completion(semaphore: asyncio.Semaphore, model: str, messages: list):
    """Run one completion while holding a slot of the shared semaphore."""
    async with semaphore:
        reply = await acompletion(model=model, messages=messages)
        return reply.choices[0].message.content


async def main():
    # Limit to 5 concurrent requests
    gate = asyncio.Semaphore(5)
    pending = [
        limited_completion(
            gate,
            "gpt-4",
            [{"role": "user", "content": f"What is {i}?"}],
        )
        for i in range(20)
    ]
    results = await asyncio.gather(*pending)
    print(f"Completed {len(results)} requests")


asyncio.run(main())
Timeout and Cancellation
import asyncio

from litellm import acompletion


async def main():
    """Bound the whole request with asyncio.wait_for and handle expiry."""
    request = acompletion(
        model="gpt-4",
        messages=[{"role": "user", "content": "Write a long story"}],
        max_tokens=1000,
    )
    try:
        # Set overall timeout for the operation
        response = await asyncio.wait_for(request, timeout=10.0)  # 10 second timeout
    except asyncio.TimeoutError:
        print("Request timed out")
    else:
        print(response.choices[0].message.content)


asyncio.run(main())
Batch Processing with Chunks
import asyncio
from typing import List

from litellm import acompletion


async def process_batch(items: List[str], batch_size: int = 5):
    """Process items in fixed-size batches, pausing briefly between batches."""
    collected = []
    for start in range(0, len(items), batch_size):
        chunk = items[start:start + batch_size]
        print(f"Processing batch {start // batch_size + 1}...")
        responses = await asyncio.gather(
            *(
                acompletion(
                    model="gpt-4",
                    messages=[{"role": "user", "content": entry}],
                )
                for entry in chunk
            )
        )
        collected.extend(responses)
        # Small delay between batches
        if start + batch_size < len(items):
            await asyncio.sleep(1)
    return collected


async def main():
    questions = [f"Question {i}" for i in range(15)]
    processed = await process_batch(questions, batch_size=5)
    print(f"Processed {len(processed)} items")


asyncio.run(main())
Integration Examples
With FastAPI
from fastapi import FastAPI
from litellm import acompletion
from pydantic import BaseModel

app = FastAPI()


class ChatRequest(BaseModel):
    # Incoming chat payload: the user message plus an optional model override.
    message: str
    model: str = "gpt-4"


@app.post("/chat")
async def chat(request: ChatRequest):
    """Forward the user's message to the LLM and return its reply as JSON."""
    completion = await acompletion(
        model=request.model,
        messages=[{"role": "user", "content": request.message}],
    )
    return {"response": completion.choices[0].message.content}


# Run with: uvicorn app:app --reload
With asyncio and aiohttp
import asyncio

import aiohttp

from litellm import acompletion


async def fetch_and_analyze(session: aiohttp.ClientSession, url: str):
    """Download a page and ask the LLM to summarize its first 500 characters."""
    # Fetch content
    async with session.get(url) as page:
        body = await page.text()
    # Analyze with LLM
    summary = await acompletion(
        model="gpt-4",
        messages=[{
            "role": "user",
            "content": f"Summarize this: {body[:500]}",
        }],
    )
    return summary.choices[0].message.content


async def main():
    urls = [
        "https://example.com/article1",
        "https://example.com/article2",
    ]
    async with aiohttp.ClientSession() as session:
        summaries = await asyncio.gather(
            *(fetch_and_analyze(session, u) for u in urls)
        )
    for url, summary in zip(urls, summaries):
        print(f"{url}:\n{summary}\n")


asyncio.run(main())
Best Practices
- Use connection pooling: Reuse HTTP connections for better performance
- Set appropriate timeouts: Prevent hanging requests with timeout parameters
- Implement rate limiting: Use semaphores to control concurrent requests
- Handle errors gracefully: Always wrap async calls in try-except blocks
- Use asyncio.gather(): For concurrent operations with no dependencies
- Monitor resource usage: Async doesn’t mean unlimited concurrency
Troubleshooting
Common Issues
RuntimeError: This event loop is already running
# Don't nest asyncio.run() calls
# Instead, await directly if already in async context
# Inside an already-running event loop, await the coroutine directly;
# calling asyncio.run() again raises "event loop is already running".
async def main():
response = await acompletion(...) # ✅ Correct
# asyncio.run(acompletion(...)) # ❌ Wrong
# Use semaphore to limit concurrency
semaphore = asyncio.Semaphore(10) # Max 10 concurrent
# Set appropriate timeouts
response = await acompletion(
model="gpt-4",
messages=[...],
timeout=60.0 # 60 seconds
)