For I/O-bound database work, FastAPI can run database operations asynchronously using SQLAlchemy's asyncio extension.
Async database operations are especially beneficial when:
- Handling many concurrent requests
- Working with slow database queries
- Building high-throughput APIs
Why Async Databases?
Async operations allow your application to handle other requests while waiting for database I/O:
- Sync: Request waits for database → blocks thread → limited concurrency
- Async: Request waits for database → thread handles other requests → higher concurrency
If your database operations are fast and your traffic is low, regular sync operations might be simpler and sufficient.
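As a rough illustration of the async model described above, the sketch below uses only asyncio, with asyncio.sleep standing in for a database round trip. The names fake_query and handle_requests are hypothetical and not part of FastAPI or SQLAlchemy.

```python
import asyncio


async def fake_query(name: str, log: list[str]) -> str:
    """Pretend to query a database; asyncio.sleep stands in for the I/O wait."""
    log.append(f"start {name}")
    await asyncio.sleep(0.01)  # while this waits, the event loop runs other tasks
    log.append(f"end {name}")
    return name


async def handle_requests() -> list[str]:
    """Run three simulated requests concurrently and return the event log."""
    log: list[str] = []
    await asyncio.gather(*(fake_query(name, log) for name in ("a", "b", "c")))
    return log


log = asyncio.run(handle_requests())
print(log[:3])  # all three queries start before any of them finishes
```

A single thread starts all three simulated queries before the first one completes, which is the effect an async database driver aims for with real I/O.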
Install Dependencies
Install SQLModel with async support:
pip install sqlmodel aiosqlite # For async SQLite
# or
pip install sqlmodel asyncpg # For async PostgreSQL
Async Database Setup
Create an Async Engine
Use create_async_engine instead of create_engine:
from sqlalchemy.ext.asyncio import create_async_engine
# SQLite with aiosqlite
sqlite_url = "sqlite+aiosqlite:///./database.db"
engine = create_async_engine(sqlite_url, echo=True)
# PostgreSQL with asyncpg
# postgres_url = "postgresql+asyncpg://user:password@localhost/dbname"
# engine = create_async_engine(postgres_url)
Notice the different URL format: the scheme names both the database dialect and an async driver (sqlite+aiosqlite:// or postgresql+asyncpg://). A sync-only driver will not work with create_async_engine.
Define Models (Same as Sync)
Model definitions remain the same:
from sqlmodel import SQLModel, Field

class Hero(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str = Field(index=True)
    age: int | None = Field(default=None, index=True)
    secret_name: str
Create Tables Asynchronously
Use an async function to create tables:
async def create_db_and_tables():
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

@app.on_event("startup")
async def on_startup():
    await create_db_and_tables()
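Note the async/await keywords. SQLModel.metadata.create_all is a synchronous function, so it is passed to conn.run_sync(), which runs it against the async connection.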
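Recent FastAPI versions deprecate @app.on_event in favor of a lifespan handler passed as FastAPI(lifespan=...). The sketch below shows that shape using only the standard library so it runs on its own. The create_db_and_tables body here is a stand-in that merely records the call, startup_log and demo are hypothetical names, and the FastAPI wiring appears only as a comment.

```python
import asyncio
from contextlib import asynccontextmanager

startup_log: list[str] = []


async def create_db_and_tables() -> None:
    # Stand-in for the real function, which would call conn.run_sync(...)
    startup_log.append("tables created")


@asynccontextmanager
async def lifespan(app):
    # Code before `yield` runs once at startup
    await create_db_and_tables()
    yield
    # Code after `yield` runs once at shutdown (e.g. await engine.dispose())
    startup_log.append("shutdown")


# With FastAPI installed, the handler would be wired in as:
#     app = FastAPI(lifespan=lifespan)


async def demo() -> None:
    # Drive the context manager by hand to show the order of events
    async with lifespan(app=None):
        startup_log.append("serving requests")


asyncio.run(demo())
print(startup_log)
```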
Async Session Dependency
Create an async session dependency for FastAPI:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from typing import AsyncGenerator

async_session = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session
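The dependency above is an async generator: code before the yield runs before the endpoint, and leaving the async with block closes the session afterwards. A minimal sketch of that lifecycle, using a hypothetical FakeSession that only records events and a hand-rolled simulate_request in place of the framework:

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator


class FakeSession:
    """Hypothetical stand-in for AsyncSession that only records its lifecycle."""

    def __init__(self, events: list[str]) -> None:
        self.events = events

    async def __aenter__(self) -> "FakeSession":
        self.events.append("session opened")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.events.append("session closed")


async def get_session(events: list[str]) -> AsyncGenerator[FakeSession, None]:
    # Same shape as the real dependency: open, hand over, close on exit
    async with FakeSession(events) as session:
        yield session


async def simulate_request() -> list[str]:
    events: list[str] = []
    # Wrapping the generator in a context manager mimics how a framework
    # can run the code after `yield` once the handler has finished
    async with asynccontextmanager(get_session)(events) as session:
        events.append("handler ran")
    return events


events = asyncio.run(simulate_request())
print(events)
```

Each request gets its own session, and the session is closed even though the endpoint never calls close itself.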
Using Annotated for Type Hints
from typing import Annotated
from fastapi import Depends
AsyncSessionDep = Annotated[AsyncSession, Depends(get_session)]
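expire_on_commit=False prevents SQLAlchemy from expiring objects after commit. Without it, reading an attribute after the commit would trigger an implicit reload from the database, which does not work under asyncio, so this setting lets you keep using the objects after the transaction ends.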
Async CRUD Operations
All database operations must use async/await syntax.
Create - Async Add Records
@app.post("/heroes/")
async def create_hero(hero: Hero, session: AsyncSessionDep) -> Hero:
    session.add(hero)
    await session.commit()
    await session.refresh(hero)
    return hero
Key differences from sync:
- Function is async def
- await session.commit()
- await session.refresh()
Read - Async Queries
from sqlmodel import select
from fastapi import Query

@app.get("/heroes/")
async def read_heroes(
    session: AsyncSessionDep,
    offset: int = 0,
    limit: Annotated[int, Query(le=100)] = 100,
) -> list[Hero]:
    result = await session.execute(
        select(Hero).offset(offset).limit(limit)
    )
    heroes = result.scalars().all()
    return heroes
Notable changes:
- Use session.execute() with await
- Call .scalars().all() to get the results
Get by Primary Key
from fastapi import HTTPException

@app.get("/heroes/{hero_id}")
async def read_hero(hero_id: int, session: AsyncSessionDep) -> Hero:
    hero = await session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    return hero
session.get() becomes await session.get().
Update - Async Modifications
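# Assumed shape of HeroUpdate (not defined in this section): every field optional
class HeroUpdate(SQLModel):
    name: str | None = None
    age: int | None = None
    secret_name: str | None = None

@app.patch("/heroes/{hero_id}")
async def update_hero(
    hero_id: int,
    hero: HeroUpdate,
    session: AsyncSessionDep
):
    hero_db = await session.get(Hero, hero_id)
    if not hero_db:
        raise HTTPException(status_code=404, detail="Hero not found")
    # Only apply the fields the client actually sent
    hero_data = hero.model_dump(exclude_unset=True)
    hero_db.sqlmodel_update(hero_data)
    session.add(hero_db)
    await session.commit()
    await session.refresh(hero_db)
    return hero_db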
Delete - Async Removal
@app.delete("/heroes/{hero_id}")
async def delete_hero(hero_id: int, session: AsyncSessionDep):
    hero = await session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    await session.delete(hero)
    await session.commit()
    return {"ok": True}
Complete Async Example
from typing import Annotated, AsyncGenerator

from fastapi import Depends, FastAPI, HTTPException, Query
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    create_async_engine,
    async_sessionmaker
)
from sqlmodel import Field, SQLModel, select

class Hero(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str = Field(index=True)
    age: int | None = Field(default=None, index=True)
    secret_name: str

sqlite_url = "sqlite+aiosqlite:///./database.db"
engine = create_async_engine(sqlite_url, echo=True)
async_session = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

async def create_db_and_tables():
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session

AsyncSessionDep = Annotated[AsyncSession, Depends(get_session)]

app = FastAPI()

@app.on_event("startup")
async def on_startup():
    await create_db_and_tables()

@app.post("/heroes/")
async def create_hero(hero: Hero, session: AsyncSessionDep) -> Hero:
    session.add(hero)
    await session.commit()
    await session.refresh(hero)
    return hero

@app.get("/heroes/")
async def read_heroes(
    session: AsyncSessionDep,
    offset: int = 0,
    limit: Annotated[int, Query(le=100)] = 100,
) -> list[Hero]:
    result = await session.execute(
        select(Hero).offset(offset).limit(limit)
    )
    heroes = result.scalars().all()
    return heroes

@app.get("/heroes/{hero_id}")
async def read_hero(hero_id: int, session: AsyncSessionDep) -> Hero:
    hero = await session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    return hero

@app.delete("/heroes/{hero_id}")
async def delete_hero(hero_id: int, session: AsyncSessionDep):
    hero = await session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    await session.delete(hero)
    await session.commit()
    return {"ok": True}
Advanced Async Patterns
Concurrent Queries
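Run multiple independent queries concurrently. An AsyncSession must not be shared between concurrently running tasks, so each query opens its own session:
import asyncio
from sqlalchemy import func

async def fetch_heroes() -> list[Hero]:
    async with async_session() as session:
        result = await session.execute(select(Hero))
        return list(result.scalars().all())

async def count_heroes() -> int:
    async with async_session() as session:
        result = await session.execute(select(func.count(Hero.id)))
        return result.scalar_one()

@app.get("/dashboard")
async def get_dashboard():
    # Run both queries concurrently, each on its own session and connection
    heroes, total = await asyncio.gather(fetch_heroes(), count_heroes())
    return {"heroes": heroes, "total": total}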
Transaction Management
Explicitly manage transactions for complex operations:
@app.post("/transfer")
async def transfer_power(from_id: int, to_id: int, session: AsyncSessionDep):
    async with session.begin():
        hero_from = await session.get(Hero, from_id)
        hero_to = await session.get(Hero, to_id)
        if not hero_from or not hero_to:
            raise HTTPException(status_code=404, detail="Hero not found")
        # Your complex transaction logic here
        # Will auto-commit if no exception, auto-rollback on exception
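The commit-or-rollback behavior of a transaction block can be pictured with a toy context manager. This is a simplified model written for illustration, not SQLAlchemy's implementation; begin, run, and the logged strings are all hypothetical.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def begin(log: list[str]):
    """Toy model of a transaction block; not SQLAlchemy's implementation."""
    log.append("BEGIN")
    try:
        yield
    except Exception:
        log.append("ROLLBACK")  # any exception undoes the whole block
        raise
    else:
        log.append("COMMIT")  # reached only if the block finished cleanly


async def run(fail: bool) -> list[str]:
    log: list[str] = []
    try:
        async with begin(log):
            log.append("UPDATE hero_from")
            if fail:
                raise ValueError("hero not found")
            log.append("UPDATE hero_to")
    except ValueError:
        pass  # the exception left the block only after the rollback was logged
    return log


ok_log = asyncio.run(run(fail=False))
failed_log = asyncio.run(run(fail=True))
print(ok_log)
print(failed_log)
```

In the failing run the second update never happens and nothing is committed, which is the guarantee that makes a transfer-style operation safe.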
Async databases aren’t always faster for single requests - they excel at handling many concurrent requests.
| Scenario | Sync Performance | Async Performance |
|---|---|---|
| Single request | Similar | Similar |
| 100 concurrent requests | Limited by threads | Much better |
| CPU-heavy operations | Better | No benefit |
| I/O-heavy operations | Good | Excellent |
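The "no benefit" entry for CPU-heavy work follows from how the event loop switches tasks: only at an await. A small sketch, assuming nothing beyond asyncio, with cpu_task as a hypothetical name:

```python
import asyncio


async def cpu_task(name: str, log: list[str]) -> None:
    log.append(f"start {name}")
    sum(i * i for i in range(100_000))  # pure computation: no await, no task switch
    log.append(f"end {name}")


async def main() -> list[str]:
    log: list[str] = []
    await asyncio.gather(cpu_task("a", log), cpu_task("b", log))
    return log


cpu_log = asyncio.run(main())
print(cpu_log)
```

Even under asyncio.gather the two tasks run strictly one after the other, because neither ever yields control while computing.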
Database Connection Pooling
Configure connection pool for production:
engine = create_async_engine(
    database_url,
    pool_size=20,        # Number of connections to maintain
    max_overflow=10,     # Additional connections when needed
    pool_pre_ping=True,  # Verify connections before use
    pool_recycle=3600,   # Recycle connections after 1 hour
)
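When choosing these numbers it helps to work out the upper bound they imply. Each engine can open at most pool_size + max_overflow connections, and each server process normally has its own engine. The helper below is a hypothetical back-of-the-envelope calculation, and the worker count of 4 is an assumed example value.

```python
def max_connections(pool_size: int, max_overflow: int, workers: int = 1) -> int:
    """Upper bound on connections opened by `workers` processes, one engine each."""
    return (pool_size + max_overflow) * workers


per_process = max_connections(pool_size=20, max_overflow=10)
four_workers = max_connections(pool_size=20, max_overflow=10, workers=4)
print(per_process, four_workers)
```

The total across all processes should stay below the database server's own connection limit (for PostgreSQL, the max_connections setting), otherwise new connections will be refused under load.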
Common Pitfalls
Don’t mix sync and async:
- Don’t call async functions without await
- Don’t use sync database calls in async endpoints
- Don’t use async database calls in sync endpoints
# ❌ Wrong - Missing await
async def wrong():
    hero = session.get(Hero, 1)  # Missing await!

# ✅ Correct
async def correct():
    hero = await session.get(Hero, 1)
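What a missing await actually produces can be checked with plain asyncio. In this sketch get_hero is a hypothetical coroutine standing in for session.get, and the returned dictionary is made-up sample data.

```python
import asyncio


async def get_hero(hero_id: int) -> dict:
    """Hypothetical stand-in for an async database lookup."""
    await asyncio.sleep(0)
    return {"id": hero_id, "name": "Deadpond"}


async def main() -> tuple[bool, dict]:
    pending = get_hero(1)  # no await: this is a coroutine object, not a hero
    is_coroutine = asyncio.iscoroutine(pending)
    hero = await pending  # awaiting it runs the lookup and yields the result
    return is_coroutine, hero


is_coroutine, hero = asyncio.run(main())
print(is_coroutine, hero)
```

Without the await, code such as hero.name fails because the value is still a coroutine object, and Python typically emits a "never awaited" warning when that object is discarded.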
Next Steps
- Explore database migrations with Alembic
- Learn about SQLModel relationships and joins
- Implement caching strategies for better performance
- Set up database connection pooling for production