Skip to main content
Asynchronous programming with async/await allows Python code to handle concurrent operations efficiently, particularly for I/O-bound tasks like network requests and file operations.

Core Concepts

Event Loop

The event loop is the central coordinator that manages and schedules async tasks:
import asyncio

# Create and run an event loop
event_loop = asyncio.new_event_loop()
event_loop.run_forever()
In practice, use asyncio.run() which handles the event loop for you:
import asyncio

async def main():
    print("Hello async world!")

asyncio.run(main())

Coroutines

Coroutines are functions defined with async def:
# Regular function
def regular_function():
    return "I run immediately"

# Coroutine function
async def coroutine_function():
    return "I need to be awaited"

# Calling creates a coroutine object, doesn't run it
coro = coroutine_function()
type(coro)  # <class 'coroutine'>

# Must await or schedule to actually run
result = asyncio.run(coroutine_function())

Getting Started

1
Create an Async Function
2
import asyncio

async def fetch_data():
    print("Start fetching")
    await asyncio.sleep(2)  # Simulate I/O operation
    print("Done fetching")
    return {"data": 42}
3
Run the Coroutine
4
# Method 1: Using asyncio.run() (recommended)
result = asyncio.run(fetch_data())

# Method 2: In an existing async context
async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())
5
Run Multiple Coroutines
6
import asyncio

async def task_one():
    await asyncio.sleep(1)
    return "Task one complete"

async def task_two():
    await asyncio.sleep(2)
    return "Task two complete"

async def main():
    # Run concurrently
    results = await asyncio.gather(
        task_one(),
        task_two()
    )
    print(results)
    # ['Task one complete', 'Task two complete']

asyncio.run(main())

The await Keyword

Awaiting Tasks

await pauses the current coroutine and lets the event loop run other tasks:
async def make_coffee():
    print("Grinding beans...")
    await asyncio.sleep(1)  # Yields control to event loop
    print("Brewing coffee...")
    await asyncio.sleep(2)  # Yields control again
    print("Coffee ready!")
    return "☕"

async def make_toast():
    print("Toasting bread...")
    await asyncio.sleep(1.5)
    print("Toast ready!")
    return "🍞"

async def make_breakfast():
    # Run both concurrently
    coffee, toast = await asyncio.gather(
        make_coffee(),
        make_toast()
    )
    print(f"Breakfast ready: {coffee} {toast}")

asyncio.run(make_breakfast())
Output:
Grinding beans...
Toasting bread...
Brewing coffee...
Toast ready!
Coffee ready!
Breakfast ready: ☕ 🍞

Awaiting Coroutines vs Tasks

Important distinction:
import asyncio

async def work():
    await asyncio.sleep(1)
    print("Work done")

async def main():
    # Awaiting coroutine - runs sequentially, doesn't yield control
    await work()
    await work()
    # Takes 2 seconds total
    
    # Creating tasks - runs concurrently
    task1 = asyncio.create_task(work())
    task2 = asyncio.create_task(work())
    await task1
    await task2
    # Takes 1 second total

Tasks

Creating Tasks

Tasks wrap coroutines and schedule them for execution:
import asyncio

async def fetch_user(user_id):
    await asyncio.sleep(1)
    return f"User {user_id} data"

async def main():
    # Create tasks
    task1 = asyncio.create_task(fetch_user(1))
    task2 = asyncio.create_task(fetch_user(2))
    task3 = asyncio.create_task(fetch_user(3))
    
    # Wait for all tasks
    results = await asyncio.gather(task1, task2, task3)
    print(results)
    # ['User 1 data', 'User 2 data', 'User 3 data']

asyncio.run(main())

Task Management

import asyncio

async def long_task():
    try:
        await asyncio.sleep(10)
        return "Task completed"
    except asyncio.CancelledError:
        print("Task was cancelled")
        raise

async def main():
    task = asyncio.create_task(long_task())
    
    # Wait a bit
    await asyncio.sleep(1)
    
    # Cancel the task
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("Caught cancellation")

asyncio.run(main())

Waiting Strategies

asyncio.gather()

Run multiple coroutines concurrently, wait for all:
import asyncio

async def fetch(url):
    await asyncio.sleep(1)
    return f"Content from {url}"

async def main():
    results = await asyncio.gather(
        fetch("url1"),
        fetch("url2"),
        fetch("url3")
    )
    print(results)
    # ['Content from url1', 'Content from url2', 'Content from url3']

asyncio.run(main())

asyncio.wait()

More control over completion:
import asyncio

async def task(n):
    await asyncio.sleep(n)
    return f"Task {n}"

async def main():
    tasks = [
        asyncio.create_task(task(1)),
        asyncio.create_task(task(2)),
        asyncio.create_task(task(3))
    ]
    
    # Wait for first task to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    
    print(f"Done: {len(done)}, Pending: {len(pending)}")
    # Done: 1, Pending: 2
    
    # Cancel pending tasks
    for task in pending:
        task.cancel()

asyncio.run(main())

asyncio.wait_for()

Set a timeout:
import asyncio

async def slow_operation():
    await asyncio.sleep(5)
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=2.0
        )
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(main())

Async Context Managers

Use async with for resources that need async setup/cleanup:
import asyncio

class AsyncDatabase:
    async def __aenter__(self):
        print("Connecting to database...")
        await asyncio.sleep(1)
        print("Connected!")
        return self
    
    async def __aexit__(self, exc_type, exc, tb):
        print("Closing database connection...")
        await asyncio.sleep(0.5)
        print("Closed!")
    
    async def query(self, sql):
        await asyncio.sleep(0.5)
        return f"Results for: {sql}"

async def main():
    async with AsyncDatabase() as db:
        result = await db.query("SELECT * FROM users")
        print(result)

asyncio.run(main())

Async Iterators

Use async for to iterate over async data sources:
import asyncio

class AsyncRange:
    def __init__(self, start, end):
        self.start = start
        self.end = end
        self.current = start
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
        
        await asyncio.sleep(0.5)  # Simulate async work
        value = self.current
        self.current += 1
        return value

async def main():
    async for num in AsyncRange(1, 5):
        print(num)

asyncio.run(main())

Error Handling

Try-Except with Async

import asyncio

async def risky_operation():
    await asyncio.sleep(1)
    raise ValueError("Something went wrong!")

async def main():
    try:
        await risky_operation()
    except ValueError as e:
        print(f"Caught error: {e}")

asyncio.run(main())

Gathering with Exceptions

import asyncio

async def task_success():
    await asyncio.sleep(1)
    return "Success"

async def task_failure():
    await asyncio.sleep(1)
    raise ValueError("Failed")

async def main():
    # return_exceptions=True prevents gather from raising
    results = await asyncio.gather(
        task_success(),
        task_failure(),
        task_success(),
        return_exceptions=True
    )
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(main())

Real-World Examples

Concurrent HTTP Requests

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'https://api.example.com/users/1',
        'https://api.example.com/users/2',
        'https://api.example.com/users/3',
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        responses = await asyncio.gather(*tasks)
        
        for url, response in zip(urls, responses):
            print(f"{url}: {len(response)} bytes")

asyncio.run(main())

Async Database Operations

import asyncio
import asyncpg

async def fetch_users():
    conn = await asyncpg.connect(
        user='user',
        password='password',
        database='mydb',
        host='localhost'
    )
    
    try:
        rows = await conn.fetch('SELECT * FROM users')
        return [dict(row) for row in rows]
    finally:
        await conn.close()

async def main():
    users = await fetch_users()
    for user in users:
        print(user)

asyncio.run(main())

Producer-Consumer Pattern

import asyncio
import random

async def producer(queue, n):
    for i in range(n):
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(random.random())
    
    # Signal completion
    await queue.put(None)

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        
        print(f"Consumed: {item}")
        await asyncio.sleep(random.random())
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    
    # Run producer and consumer concurrently
    await asyncio.gather(
        producer(queue, 5),
        consumer(queue)
    )

asyncio.run(main())

Custom Async Sleep

Understanding how async operations work internally:
import asyncio
import time

class YieldToEventLoop:
    """Simple awaitable that yields control."""
    def __await__(self):
        yield

async def custom_sleep(seconds):
    """Custom implementation of asyncio.sleep."""
    future = asyncio.Future()
    time_to_wake = time.time() + seconds
    
    async def watcher():
        while True:
            if time.time() >= time_to_wake:
                future.set_result(None)
                break
            else:
                await YieldToEventLoop()
    
    asyncio.create_task(watcher())
    await future

async def main():
    print(f"Starting at {time.strftime('%H:%M:%S')}")
    await custom_sleep(2)
    print(f"Finished at {time.strftime('%H:%M:%S')}")

asyncio.run(main())

Best Practices

When to use async/await:
  • ✅ I/O-bound operations (network requests, file I/O, database queries)
  • ✅ Handling many concurrent connections
  • ✅ Web servers and APIs
  • ✅ Websockets and real-time applications
  • ❌ CPU-bound tasks (use multiprocessing instead)
  • ❌ Simple scripts (adds unnecessary complexity)
  • ❌ Blocking libraries (use asyncio-compatible alternatives)
Common pitfalls:
  1. Blocking the event loop:
    # Bad - blocks event loop
    async def bad():
        time.sleep(1)  # Blocks!
    
    # Good - yields control
    async def good():
        await asyncio.sleep(1)
    
  2. Forgetting to await:
    # Bad - creates coroutine but doesn't run it
    async def bad():
        fetch_data()  # Warning: coroutine never awaited
    
    # Good
    async def good():
        await fetch_data()
    
  3. Not creating tasks for concurrency:
    # Bad - runs sequentially
    async def bad():
        await task1()
        await task2()
    
    # Good - runs concurrently
    async def good():
        await asyncio.gather(task1(), task2())
    

Debugging

Enable Debug Mode

import asyncio

async def main():
    # Your async code
    pass

# Enable debug mode
asyncio.run(main(), debug=True)
Debug mode will:
  • Log slow coroutines (>100ms)
  • Warn about unawaited coroutines
  • Track task creation locations

Check Running Tasks

import asyncio

async def monitor_tasks():
    while True:
        tasks = asyncio.all_tasks()
        print(f"Active tasks: {len(tasks)}")
        for task in tasks:
            print(f"  - {task.get_name()}")
        await asyncio.sleep(5)

Summary

Key takeaways:
  1. Use async def to create coroutines
  2. Use await to call async functions and yield control
  3. Create tasks with asyncio.create_task() for concurrency
  4. Use asyncio.gather() to wait for multiple tasks
  5. Always await coroutines or create tasks from them
  6. Never use blocking operations in async code
  7. Use asyncio.run() to start your async program

Build docs developers (and LLMs) love