AsyncOpenAI Client

Overview

The AsyncOpenAI client enables asynchronous, non-blocking API calls using Python’s async/await syntax. This is ideal for:
  • Making concurrent API requests
  • Building async web applications (FastAPI, Sanic, etc.)
  • Handling high-throughput workloads
  • Avoiding blocking I/O in event loops

Basic Usage

"""Minimal AsyncOpenAI example: one awaited chat completion."""
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()


async def main() -> None:
    # Every client method returns a coroutine; await it for the result.
    completion = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello!"}],
    )
    print(completion.choices[0].message.content)


asyncio.run(main())
The AsyncOpenAI client provides an identical API to OpenAI, with all methods returning awaitable coroutines.

Concurrent Requests

Make multiple API calls concurrently using asyncio.gather():
"""Fan out several chat completions concurrently with asyncio.gather()."""
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()


async def create_completion(prompt: str):
    """Request one completion and return its text content."""
    reply = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
    )
    return reply.choices[0].message.content


async def main() -> None:
    prompts = [
        "Explain quantum computing",
        "What is machine learning?",
        "Describe neural networks",
    ]

    # Schedule every request at once; gather() preserves input order.
    pending = (create_completion(p) for p in prompts)
    results = await asyncio.gather(*pending)

    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}")
        print(f"Response: {result}\n")


asyncio.run(main())

Streaming with Async

Stream responses asynchronously using async for:
"""Consume a streamed chat completion with `async for`."""
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()


async def main() -> None:
    # With stream=True the awaited call resolves to an async iterator of chunks.
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Write a story"}],
        stream=True,
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        # The final chunk carries no content delta — skip it.
        if delta is not None:
            print(delta, end="")


asyncio.run(main())
See Streaming for more details on streaming responses.

Context Manager

Use async context managers for automatic resource cleanup:
"""Scope the client's lifetime with an async context manager."""
import asyncio

from openai import AsyncOpenAI


async def main() -> None:
    # Exiting the `async with` block closes the client's HTTP resources.
    async with AsyncOpenAI() as client:
        reply = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": "Hello!"}],
        )
        print(reply.choices[0].message.content)


asyncio.run(main())

Integration with Web Frameworks

FastAPI

"""FastAPI integration: one shared AsyncOpenAI client for all requests."""
from contextlib import asynccontextmanager

from fastapi import FastAPI
from openai import AsyncOpenAI

client = AsyncOpenAI()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: nothing extra needed — the shared client was created above.
    yield
    # Shutdown: release the client's HTTP resources.
    # Replaces @app.on_event("shutdown"), deprecated since FastAPI 0.93.
    await client.close()


app = FastAPI(lifespan=lifespan)


@app.post("/chat")
async def chat(message: str):
    """POST /chat — forward `message` to the model, return its reply."""
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": message}],
    )
    return {"response": response.choices[0].message.content}

Sanic

"""Sanic integration: shared async client plus shutdown cleanup."""
from sanic import Sanic, response
from openai import AsyncOpenAI

app = Sanic("openai-app")
client = AsyncOpenAI()


@app.post("/chat")
async def chat(request):
    """POST /chat with JSON {"message": ...} -> JSON {"response": ...}."""
    message = request.json.get("message")

    result = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": message}],
    )

    payload = {
        "response": result.choices[0].message.content
    }
    return response.json(payload)


@app.listener("before_server_stop")
async def cleanup(app, loop):
    # Close the shared client before Sanic tears down the event loop.
    await client.close()

Using aiohttp Transport

For improved concurrency performance, use the aiohttp HTTP transport:
pip install openai[aiohttp]
"""Swap the default httpx transport for aiohttp (`pip install openai[aiohttp]`)."""
import asyncio

from openai import AsyncOpenAI, DefaultAioHttpClient


async def main() -> None:
    # Pass the aiohttp-backed transport via http_client=.
    async with AsyncOpenAI(http_client=DefaultAioHttpClient()) as client:
        reply = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": "Hello!"}],
        )
        print(reply.choices[0].message.content)


asyncio.run(main())
The aiohttp transport can provide better performance for high-concurrency workloads compared to the default httpx transport.

Error Handling

Handle errors in async code using try/except blocks:
"""Catch the SDK's exception hierarchy around an awaited completion."""
import asyncio

import openai
from openai import AsyncOpenAI

client = AsyncOpenAI()


async def safe_completion(prompt: str):
    """Return the completion text, or None after reporting a known failure."""
    # Keep the try body minimal: only the API call can raise these.
    # RateLimitError must precede its parent class APIStatusError.
    try:
        reply = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
        )
    except openai.APIConnectionError as e:
        print(f"Connection error: {e}")
        return None
    except openai.RateLimitError as e:
        print(f"Rate limit exceeded: {e}")
        return None
    except openai.APIStatusError as e:
        print(f"API error {e.status_code}: {e.message}")
        return None
    return reply.choices[0].message.content


async def main() -> None:
    result = await safe_completion("Hello!")
    if result:
        print(result)


asyncio.run(main())
See Error Handling for comprehensive error handling patterns.

Rate Limiting with Async

Implement rate limiting for concurrent requests:
"""Bound the number of in-flight API requests with an asyncio.Semaphore."""
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()


class RateLimiter:
    """Allow at most `max_concurrent` awaited operations at a time."""

    def __init__(self, max_concurrent: int):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def execute(self, coro):
        """Await `coro` once a semaphore slot is free.

        `coro` may be an awaitable, or (preferred) a zero-argument callable
        returning one. With the callable form the request coroutine is only
        created while a slot is held, so a failed or cancelled gather()
        never leaves behind un-awaited coroutines (which would otherwise
        trigger "coroutine was never awaited" warnings).
        """
        async with self.semaphore:
            if callable(coro):
                coro = coro()
            return await coro


async def main():
    limiter = RateLimiter(max_concurrent=5)

    prompts = [f"Question {i}" for i in range(20)]

    async def create_completion(prompt):
        # Pass a factory so the API call is created only when permitted.
        return await limiter.execute(
            lambda: client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
            )
        )

    results = await asyncio.gather(
        *[create_completion(p) for p in prompts]
    )

    print(f"Completed {len(results)} requests")


asyncio.run(main())

Pagination with Async

Iterate through paginated results asynchronously:
"""Collect every fine-tuning job via transparent async pagination."""
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()


async def main() -> None:
    all_jobs = []

    # The async iterator lazily requests further pages as it is exhausted.
    async for job in client.fine_tuning.jobs.list(limit=20):
        all_jobs.append(job)

    print(f"Total jobs: {len(all_jobs)}")


asyncio.run(main())

Task Groups (Python 3.11+)

Use task groups for structured concurrency:
"""Issue two completions under structured concurrency (Python 3.11+ TaskGroup)."""
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()


async def main() -> None:
    # The `async with` block exits only after both tasks finish; if one
    # task raises, the TaskGroup cancels the other automatically.
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(
            client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": "Hello"}],
            )
        )
        task2 = tg.create_task(
            client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": "Hi"}],
            )
        )

    # Both tasks are complete here, so .result() never blocks or raises
    # InvalidStateError.
    print(task1.result().choices[0].message.content)
    print(task2.result().choices[0].message.content)


asyncio.run(main())

Best Practices

  • Always use await with async methods - forgetting it will return a coroutine object instead of the result
  • Use asyncio.gather() for concurrent requests, not sequential await calls
  • Close the client properly using context managers or explicit await client.close()
  • Be mindful of rate limits when making many concurrent requests

Build docs developers (and LLMs) love