Skip to main content

Overview

LiteLLM supports async operations for all API calls, allowing you to make concurrent requests and improve throughput in async applications.

Basic Usage

import asyncio
from litellm import acompletion

async def main():
    # Send one chat message and print the model's reply.
    chat_messages = [{"role": "user", "content": "Hello, how are you?"}]
    reply = await acompletion(model="gpt-4", messages=chat_messages)
    print(reply.choices[0].message.content)

asyncio.run(main())

Async Functions

All LiteLLM functions have async equivalents prefixed with a:
  • completion() → acompletion()
  • embedding() → aembedding()
  • image_generation() → aimage_generation()
  • transcription() → atranscription()
  • speech() → aspeech()

Function Signature

async def acompletion(
    model: str,
    messages: List[Dict[str, str]],
    # Same parameters as completion()
    timeout: Optional[Union[float, int]] = None,
    temperature: Optional[float] = None,
    stream: Optional[bool] = None,
    # ... all other completion() parameters
    **kwargs
) -> Union[ModelResponse, CustomStreamWrapper]:
    """Async variant of completion(); accepts the same parameters.

    Returns a ModelResponse, or a CustomStreamWrapper when stream=True.
    """
    ...

Examples

Single Async Request

import asyncio
from litellm import acompletion

async def get_response():
    """Request a single completion and return only the message text."""
    completion = await acompletion(
        model="gpt-4",
        messages=[{"role": "user", "content": "Explain async programming"}],
        temperature=0.7,
    )
    return completion.choices[0].message.content

result = asyncio.run(get_response())
print(result)

Concurrent Requests

import asyncio
from litellm import acompletion

async def get_multiple_responses():
    """Fire five completions concurrently and print each answer in order."""
    pending = [
        acompletion(
            model="gpt-4",
            messages=[{"role": "user", "content": f"What is {i} + {i}?"}],
        )
        for i in range(5)
    ]

    # gather() awaits all coroutines concurrently and preserves input order.
    answers = await asyncio.gather(*pending)

    for i, answer in enumerate(answers):
        print(f"Response {i}: {answer.choices[0].message.content}")

asyncio.run(get_multiple_responses())

Multiple Models Concurrently

import asyncio
from litellm import acompletion

async def compare_models():
    """Ask three providers the same question concurrently and print each reply."""
    question = "What is the capital of France?"

    # Each request gets its own messages list; gather() preserves order.
    responses = await asyncio.gather(
        acompletion(model="gpt-4", messages=[{"role": "user", "content": question}]),
        acompletion(model="claude-3-5-sonnet-20241022", messages=[{"role": "user", "content": question}]),
        acompletion(model="gemini-pro", messages=[{"role": "user", "content": question}]),
    )

    for model, response in zip(["GPT-4", "Claude", "Gemini"], responses):
        print(f"{model}: {response.choices[0].message.content}")

asyncio.run(compare_models())

Error Handling with Async

import asyncio
from litellm import acompletion
from litellm.exceptions import RateLimitError, APIError

async def robust_completion(model: str, messages: list, max_retries: int = 3):
    """Call acompletion with simple error handling.

    Retries on RateLimitError up to max_retries times (5s pause between
    attempts); the original version recursed without a bound and would
    retry forever — and grow the coroutine stack — under a sustained
    rate limit. Returns None on APIError or when retries are exhausted.
    """
    try:
        response = await acompletion(model=model, messages=messages)
        return response.choices[0].message.content
    except RateLimitError:
        if max_retries <= 0:
            # Out of retries: fail soft rather than loop forever.
            print(f"Rate limit hit for {model}, giving up")
            return None
        print(f"Rate limit hit for {model}, retrying...")
        await asyncio.sleep(5)
        return await robust_completion(model, messages, max_retries - 1)
    except APIError as e:
        print(f"API error for {model}: {e}")
        return None

async def main():
    result = await robust_completion(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}]
    )
    print(result)

asyncio.run(main())

Async with Retries

import asyncio
from litellm import acompletion

async def completion_with_retry(model: str, messages: list, max_retries: int = 3):
    """Call acompletion with exponential backoff.

    Re-raises the last exception once max_retries attempts have failed;
    waits 1s, 2s, 4s, ... between attempts.
    """
    attempt = 0
    while True:
        try:
            return await acompletion(model=model, messages=messages, timeout=30.0)
        except Exception as e:
            attempt += 1
            if attempt >= max_retries:
                raise
            print(f"Attempt {attempt} failed: {e}")
            await asyncio.sleep(2 ** (attempt - 1))  # Exponential backoff

async def main():
    response = await completion_with_retry(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}]
    )
    print(response.choices[0].message.content)

asyncio.run(main())

Async Streaming

Basic Async Streaming

import asyncio
from litellm import acompletion

async def stream_response():
    """Stream a completion and print each token as it arrives."""
    stream = await acompletion(
        model="gpt-4",
        messages=[{"role": "user", "content": "Write a short poem"}],
        stream=True,
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)

asyncio.run(stream_response())

Multiple Concurrent Streams

import asyncio
from litellm import acompletion

async def stream_task(task_id: int, prompt: str):
    """Stream one completion, collecting the full text as chunks arrive."""
    print(f"\nStarting task {task_id}...")
    stream = await acompletion(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    pieces = []
    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            pieces.append(delta)

    result = "".join(pieces)
    print(f"\nTask {task_id} complete: {result[:50]}...")
    return result

async def main():
    # Three streams share the event loop and run concurrently.
    results = await asyncio.gather(
        stream_task(1, "Count to 5"),
        stream_task(2, "Name 3 colors"),
        stream_task(3, "Say hello in 3 languages"),
    )
    print("\n\nAll tasks completed!")

asyncio.run(main())

Async with Other Operations

Async Embeddings

import asyncio
from litellm import aembedding

async def get_embeddings():
    """Embed several texts concurrently and report each vector's size."""
    texts = [
        "The quick brown fox",
        "jumps over the lazy dog",
        "Machine learning is fascinating"
    ]

    # One aembedding call per text, all in flight at once.
    responses = await asyncio.gather(
        *(aembedding(model="text-embedding-ada-002", input=[text]) for text in texts)
    )

    for text, response in zip(texts, responses):
        vector = response.data[0].embedding
        print(f"Text: {text}")
        print(f"Embedding dimensions: {len(vector)}\n")

asyncio.run(get_embeddings())

Async Image Generation

import asyncio
from litellm import aimage_generation

async def generate_images():
    """Generate one image per prompt concurrently and print the URLs."""
    prompts = [
        "A serene mountain landscape",
        "A futuristic cityscape",
        "An abstract painting"
    ]

    # One generation request per prompt, all in flight at once.
    responses = await asyncio.gather(
        *(aimage_generation(model="dall-e-3", prompt=prompt) for prompt in prompts)
    )

    for prompt, response in zip(prompts, responses):
        print(f"Generated image for: {prompt}")
        print(f"URL: {response.data[0].url}\n")

asyncio.run(generate_images())

Performance Optimization

Semaphore for Rate Limiting

import asyncio
from litellm import acompletion

async def limited_completion(semaphore: asyncio.Semaphore, model: str, messages: list):
    """Run one completion while holding a semaphore slot, capping concurrency."""
    async with semaphore:
        result = await acompletion(model=model, messages=messages)
        return result.choices[0].message.content

async def main():
    # Limit to 5 concurrent requests
    gate = asyncio.Semaphore(5)

    jobs = []
    for i in range(20):
        jobs.append(
            limited_completion(
                gate,
                "gpt-4",
                [{"role": "user", "content": f"What is {i}?"}],
            )
        )

    results = await asyncio.gather(*jobs)
    print(f"Completed {len(results)} requests")

asyncio.run(main())

Timeout and Cancellation

import asyncio
from litellm import acompletion

async def main():
    """Abort the completion if the whole operation exceeds 10 seconds."""
    request = acompletion(
        model="gpt-4",
        messages=[{"role": "user", "content": "Write a long story"}],
        max_tokens=1000
    )
    try:
        # wait_for cancels the underlying request on timeout.
        response = await asyncio.wait_for(request, timeout=10.0)
    except asyncio.TimeoutError:
        print("Request timed out")
    else:
        print(response.choices[0].message.content)

asyncio.run(main())

Batch Processing with Chunks

import asyncio
from litellm import acompletion
from typing import List

async def process_batch(items: List[str], batch_size: int = 5):
    """Process items in fixed-size concurrent batches.

    Runs each batch with gather(), sleeping 1s between batches to smooth
    out request bursts. Returns the responses in input order.
    """
    results = []

    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        print(f"Processing batch {start // batch_size + 1}...")

        batch_results = await asyncio.gather(*(
            acompletion(
                model="gpt-4",
                messages=[{"role": "user", "content": item}],
            )
            for item in batch
        ))
        results.extend(batch_results)

        # Small delay between batches
        if start + batch_size < len(items):
            await asyncio.sleep(1)

    return results

async def main():
    items = [f"Question {i}" for i in range(15)]
    results = await process_batch(items, batch_size=5)
    print(f"Processed {len(results)} items")

asyncio.run(main())

Integration Examples

With FastAPI

from fastapi import FastAPI
from litellm import acompletion
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    # Request payload: the user's message plus an optional model override.
    message: str
    model: str = "gpt-4"

@app.post("/chat")
async def chat(request: ChatRequest):
    """Forward the user's message to the requested model and return its reply."""
    completion = await acompletion(
        model=request.model,
        messages=[{"role": "user", "content": request.message}],
    )
    return {"response": completion.choices[0].message.content}

# Run with: uvicorn app:app --reload

With asyncio and aiohttp

import asyncio
import aiohttp
from litellm import acompletion

async def fetch_and_analyze(session: aiohttp.ClientSession, url: str):
    """Download a page, then ask the LLM to summarize its first 500 characters."""
    async with session.get(url) as http_response:
        content = await http_response.text()

    llm_response = await acompletion(
        model="gpt-4",
        messages=[{
            "role": "user",
            "content": f"Summarize this: {content[:500]}"
        }]
    )
    return llm_response.choices[0].message.content

async def main():
    urls = [
        "https://example.com/article1",
        "https://example.com/article2"
    ]

    # One shared HTTP session; both fetch-and-summarize pipelines run concurrently.
    async with aiohttp.ClientSession() as session:
        summaries = await asyncio.gather(
            *(fetch_and_analyze(session, url) for url in urls)
        )
        for url, summary in zip(urls, summaries):
            print(f"{url}:\n{summary}\n")

asyncio.run(main())

Best Practices

  1. Use connection pooling: Reuse HTTP connections for better performance
  2. Set appropriate timeouts: Prevent hanging requests with timeout parameters
  3. Implement rate limiting: Use semaphores to control concurrent requests
  4. Handle errors gracefully: Always wrap async calls in try-except blocks
  5. Use asyncio.gather(): For concurrent operations with no dependencies
  6. Monitor resource usage: Async doesn’t mean unlimited concurrency

Troubleshooting

Common Issues

RuntimeError: This event loop is already running
# Don't nest asyncio.run()
# Instead, await directly if already in async context
async def main():
    response = await acompletion(...)  # ✅ Correct
    # asyncio.run(acompletion(...))     # ❌ Wrong
Memory Issues with Many Concurrent Requests
# Use semaphore to limit concurrency
semaphore = asyncio.Semaphore(10)  # Max 10 concurrent
Timeout Errors
# Set appropriate timeouts
response = await acompletion(
    model="gpt-4",
    messages=[...],
    timeout=60.0  # 60 seconds
)

Build docs developers (and LLMs) love