Overview
The AsyncOpenAI client enables asynchronous, non-blocking API calls using Python’s async/await syntax. This is ideal for:
- Making concurrent API requests
- Building async web applications (FastAPI, Sanic, etc.)
- Handling high-throughput workloads
- Avoiding blocking I/O in event loops
Basic Usage
import asyncio
from openai import AsyncOpenAI

# Reads the API key from the OPENAI_API_KEY environment variable by default.
client = AsyncOpenAI()


async def main():
    """Send a single chat completion request and print the reply."""
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello!"}],
    )
    print(response.choices[0].message.content)


asyncio.run(main())
The AsyncOpenAI client provides an identical API to OpenAI, with all methods returning awaitable coroutines.
Concurrent Requests
Make multiple API calls concurrently using asyncio.gather():
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()


async def create_completion(prompt: str) -> str:
    """Return the model's reply text for a single prompt."""
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content


async def main():
    prompts = [
        "Explain quantum computing",
        "What is machine learning?",
        "Describe neural networks",
    ]
    # Fire off every request at once; gather preserves input order.
    results = await asyncio.gather(
        *[create_completion(prompt) for prompt in prompts]
    )
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}")
        print(f"Response: {result}\n")


asyncio.run(main())
Streaming with Async
Stream responses asynchronously using async for:
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()


async def main():
    """Stream a completion and print tokens as they arrive."""
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Write a story"}],
        stream=True,
    )
    async for chunk in stream:
        # The final chunk carries a None delta, so guard before printing.
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="")


asyncio.run(main())
See Streaming for more details on streaming responses.
Context Manager
Use async context managers for automatic resource cleanup:
import asyncio
from openai import AsyncOpenAI


async def main():
    """Use the client as an async context manager for automatic cleanup."""
    async with AsyncOpenAI() as client:
        response = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": "Hello!"}],
        )
        print(response.choices[0].message.content)
    # Client is automatically closed when the `async with` block exits.


asyncio.run(main())
Integration with Web Frameworks
FastAPI
from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
# One shared client for the app's lifetime; AsyncOpenAI is safe to reuse
# across requests.
client = AsyncOpenAI()


@app.post("/chat")
async def chat(message: str):
    """Proxy a user message to the chat completions API."""
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": message}],
    )
    return {"response": response.choices[0].message.content}


# NOTE(review): `@app.on_event` is deprecated in newer FastAPI releases in
# favor of lifespan handlers — confirm against your FastAPI version.
@app.on_event("shutdown")
async def shutdown():
    # Release the client's underlying HTTP connections on app shutdown.
    await client.close()
Sanic
from sanic import Sanic, response
from openai import AsyncOpenAI

app = Sanic("openai-app")
# Shared client reused across all requests.
client = AsyncOpenAI()


@app.post("/chat")
async def chat(request):
    """Proxy the JSON `message` field to the chat completions API."""
    message = request.json.get("message")
    completion = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": message}],
    )
    return response.json({
        "response": completion.choices[0].message.content
    })


@app.listener("before_server_stop")
async def cleanup(app, loop):
    # Release HTTP connections before the server stops.
    await client.close()
Using aiohttp Transport
For improved concurrency performance, use the aiohttp HTTP transport:
pip install openai[aiohttp]
import asyncio
from openai import AsyncOpenAI, DefaultAioHttpClient


async def main():
    """Use the aiohttp transport instead of the default httpx transport."""
    async with AsyncOpenAI(
        http_client=DefaultAioHttpClient(),
    ) as client:
        response = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": "Hello!"}],
        )
        print(response.choices[0].message.content)


asyncio.run(main())
The aiohttp transport can provide better performance for high-concurrency workloads compared to the default httpx transport.
Error Handling
Handle errors in async code using try/except blocks:
import asyncio

import openai
from openai import AsyncOpenAI

client = AsyncOpenAI()


async def safe_completion(prompt: str) -> str | None:
    """Return the model's reply, or None if the request failed.

    Catches the library's exception hierarchy from most specific to most
    general: connection failures, rate limits, then any other non-2xx
    API status.
    """
    try:
        response = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content
    except openai.APIConnectionError as e:
        print(f"Connection error: {e}")
        return None
    except openai.RateLimitError as e:
        print(f"Rate limit exceeded: {e}")
        return None
    except openai.APIStatusError as e:
        print(f"API error {e.status_code}: {e.message}")
        return None


async def main():
    result = await safe_completion("Hello!")
    if result:
        print(result)


asyncio.run(main())
See Error Handling for comprehensive error handling patterns.
Rate Limiting with Async
Implement rate limiting for concurrent requests:
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()


class RateLimiter:
    """Cap the number of coroutines running concurrently via a semaphore."""

    def __init__(self, max_concurrent: int):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def execute(self, coro):
        """Await `coro` once a semaphore slot is available."""
        async with self.semaphore:
            return await coro


async def main():
    limiter = RateLimiter(max_concurrent=5)
    prompts = [f"Question {i}" for i in range(20)]

    async def create_completion(prompt):
        # At most 5 of these API calls are in flight at any moment.
        return await limiter.execute(
            client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
            )
        )

    results = await asyncio.gather(
        *[create_completion(p) for p in prompts]
    )
    print(f"Completed {len(results)} requests")


asyncio.run(main())
Async Pagination
Iterate through paginated results asynchronously:
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()


async def main():
    """Iterate all fine-tuning jobs across pages with `async for`."""
    all_jobs = []
    # Automatically fetches more pages as needed.
    async for job in client.fine_tuning.jobs.list(limit=20):
        all_jobs.append(job)
    print(f"Total jobs: {len(all_jobs)}")


asyncio.run(main())
Task Groups (Python 3.11+)
Use task groups for structured concurrency:
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()


async def main():
    """Run two requests with structured concurrency (Python 3.11+)."""
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(
            client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": "Hello"}],
            )
        )
        task2 = tg.create_task(
            client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": "Hi"}],
            )
        )
    # Both tasks are guaranteed finished once the TaskGroup block exits.
    print(task1.result().choices[0].message.content)
    print(task2.result().choices[0].message.content)


asyncio.run(main())
Best Practices
- Always use `await` with async methods — forgetting it returns a coroutine object instead of the result
- Use `asyncio.gather()` for concurrent requests rather than sequential `await` calls
- Close the client properly, using a context manager or an explicit `await client.close()`
- Be mindful of rate limits when making many concurrent requests