Skip to main content
For better performance with I/O-bound database operations, FastAPI supports async database operations using async SQLAlchemy and asyncio.
Async database operations are especially beneficial when:
  • Handling many concurrent requests
  • Working with slow database queries
  • Building high-throughput APIs

Why Async Databases?

Async operations allow your application to handle other requests while waiting for database I/O:
  • Sync: Request waits for database → blocks thread → limited concurrency
  • Async: Request waits for database → thread handles other requests → higher concurrency
If your database operations are fast and your traffic is low, regular sync operations might be simpler and sufficient.

Install Dependencies

Install SQLModel with async support:
pip install sqlmodel aiosqlite  # For async SQLite
# or
pip install sqlmodel asyncpg   # For async PostgreSQL

Async Database Setup

1

Create an Async Engine

Use create_async_engine instead of create_engine:
from sqlalchemy.ext.asyncio import create_async_engine

# SQLite with aiosqlite
sqlite_url = "sqlite+aiosqlite:///./database.db"
engine = create_async_engine(sqlite_url, echo=True)

# PostgreSQL with asyncpg
# postgres_url = "postgresql+asyncpg://user:password@localhost/dbname"
# engine = create_async_engine(postgres_url)
Notice the different URL format: sqlite+aiosqlite:// or postgresql+asyncpg:// - the driver must support async operations.
2

Define Models (Same as Sync)

Model definitions remain the same:
from sqlmodel import SQLModel, Field

class Hero(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str = Field(index=True)
    age: int | None = Field(default=None, index=True)
    secret_name: str
3

Create Tables Asynchronously

Use an async function to create tables:
async def create_db_and_tables():
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

@app.on_event("startup")
async def on_startup():
    await create_db_and_tables()
Note the async/await keywords and run_sync() wrapper.

Async Session Dependency

Create an async session dependency for FastAPI:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from typing import AsyncGenerator

async_session = async_sessionmaker(
    engine, 
    class_=AsyncSession, 
    expire_on_commit=False
)

async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session

Using Annotated for Type Hints

from typing import Annotated
from fastapi import Depends

AsyncSessionDep = Annotated[AsyncSession, Depends(get_session)]
expire_on_commit=False prevents SQLAlchemy from expiring objects after commit, which is useful when you want to access them after the transaction.

Async CRUD Operations

All database operations must use async/await syntax.

Create - Async Add Records

@app.post("/heroes/")
async def create_hero(hero: Hero, session: AsyncSessionDep) -> Hero:
    session.add(hero)
    await session.commit()
    await session.refresh(hero)
    return hero
Key differences from sync:
  • Function is async def
  • await session.commit()
  • await session.refresh()

Read - Async Queries

from sqlmodel import select
from fastapi import Query

@app.get("/heroes/")
async def read_heroes(
    session: AsyncSessionDep,
    offset: int = 0,
    limit: Annotated[int, Query(le=100)] = 100,
) -> list[Hero]:
    result = await session.execute(
        select(Hero).offset(offset).limit(limit)
    )
    heroes = result.scalars().all()
    return heroes
Notable changes:
  • Use session.execute() with await
  • Call .scalars().all() to get the results

Get by Primary Key

from fastapi import HTTPException

@app.get("/heroes/{hero_id}")
async def read_hero(hero_id: int, session: AsyncSessionDep) -> Hero:
    hero = await session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    return hero
session.get() becomes await session.get().

Update - Async Modifications

@app.patch("/heroes/{hero_id}")
async def update_hero(
    hero_id: int, 
    hero: HeroUpdate, 
    session: AsyncSessionDep
):
    hero_db = await session.get(Hero, hero_id)
    if not hero_db:
        raise HTTPException(status_code=404, detail="Hero not found")
    
    hero_data = hero.model_dump(exclude_unset=True)
    hero_db.sqlmodel_update(hero_data)
    
    session.add(hero_db)
    await session.commit()
    await session.refresh(hero_db)
    return hero_db

Delete - Async Removal

@app.delete("/heroes/{hero_id}")
async def delete_hero(hero_id: int, session: AsyncSessionDep):
    hero = await session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    
    await session.delete(hero)
    await session.commit()
    return {"ok": True}

Complete Async Example

from typing import Annotated, AsyncGenerator
from fastapi import Depends, FastAPI, HTTPException, Query
from sqlalchemy.ext.asyncio import (
    AsyncSession, 
    create_async_engine,
    async_sessionmaker
)
from sqlmodel import Field, SQLModel, select

class Hero(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str = Field(index=True)
    age: int | None = Field(default=None, index=True)
    secret_name: str

sqlite_url = "sqlite+aiosqlite:///./database.db"
engine = create_async_engine(sqlite_url, echo=True)

async_session = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

async def create_db_and_tables():
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session

AsyncSessionDep = Annotated[AsyncSession, Depends(get_session)]

app = FastAPI()

@app.on_event("startup")
async def on_startup():
    await create_db_and_tables()

@app.post("/heroes/")
async def create_hero(hero: Hero, session: AsyncSessionDep) -> Hero:
    session.add(hero)
    await session.commit()
    await session.refresh(hero)
    return hero

@app.get("/heroes/")
async def read_heroes(
    session: AsyncSessionDep,
    offset: int = 0,
    limit: Annotated[int, Query(le=100)] = 100,
) -> list[Hero]:
    result = await session.execute(
        select(Hero).offset(offset).limit(limit)
    )
    heroes = result.scalars().all()
    return heroes

@app.get("/heroes/{hero_id}")
async def read_hero(hero_id: int, session: AsyncSessionDep) -> Hero:
    hero = await session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    return hero

@app.delete("/heroes/{hero_id}")
async def delete_hero(hero_id: int, session: AsyncSessionDep):
    hero = await session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    await session.delete(hero)
    await session.commit()
    return {"ok": True}

Advanced Async Patterns

Concurrent Queries

Run multiple independent queries in parallel:
import asyncio

@app.get("/dashboard")
async def get_dashboard(session: AsyncSessionDep):
    # Run queries concurrently
    heroes_task = session.execute(select(Hero))
    count_task = session.execute(select(func.count(Hero.id)))
    
    heroes_result, count_result = await asyncio.gather(
        heroes_task,
        count_task
    )
    
    return {
        "heroes": heroes_result.scalars().all(),
        "total": count_result.scalar()
    }

Transaction Management

Explicitly manage transactions for complex operations:
@app.post("/transfer")
async def transfer_power(from_id: int, to_id: int, session: AsyncSessionDep):
    async with session.begin():
        hero_from = await session.get(Hero, from_id)
        hero_to = await session.get(Hero, to_id)
        
        if not hero_from or not hero_to:
            raise HTTPException(status_code=404, detail="Hero not found")
        
        # Your complex transaction logic here
        # Will auto-commit if no exception, auto-rollback on exception

Performance Comparison

Async databases aren’t always faster for single requests - they excel at handling many concurrent requests.
ScenarioSync PerformanceAsync Performance
Single requestSimilarSimilar
100 concurrent requestsLimited by threadsMuch better
CPU-heavy operationsBetterNo benefit
I/O-heavy operationsGoodExcellent

Database Connection Pooling

Configure connection pool for production:
engine = create_async_engine(
    database_url,
    pool_size=20,          # Number of connections to maintain
    max_overflow=10,       # Additional connections when needed
    pool_pre_ping=True,    # Verify connections before use
    pool_recycle=3600,     # Recycle connections after 1 hour
)

Common Pitfalls

Don’t mix sync and async:
  • Don’t call async functions without await
  • Don’t use sync database calls in async endpoints
  • Don’t use async database calls in sync endpoints
# ❌ Wrong - Missing await
async def wrong():
    hero = session.get(Hero, 1)  # Missing await!

# ✅ Correct
async def correct():
    hero = await session.get(Hero, 1)

Next Steps

  • Explore database migrations with Alembic
  • Learn about SQLModel relationships and joins
  • Implement caching strategies for better performance
  • Set up database connection pooling for production

Build docs developers (and LLMs) love