The bot uses SQLite with Write-Ahead Logging (WAL) mode to store all persistent data. This page explains the database schema, design decisions, and transaction handling.
Why SQLite
SQLite is ideal for this bot because:
- Simple deployment - No separate database server required
- Zero configuration - Works out of the box with just a file path
- Reliable - ACID transactions ensure data consistency
- Fast - Local file access with no network overhead
- Sufficient scale - Handles thousands of users easily
The database path is read from the DB_PATH environment variable, with a local file as the default:
db_path = os.getenv("DB_PATH", "./support_bot.sqlite3")
The database lives in a single main file, which makes it easy to back up or move while the bot is stopped.
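Because WAL mode keeps recent writes in a separate file, copying only the main file while the bot is running can miss data. One way to take a consistent copy of a live database is SQLite's online backup API, which Python's stdlib `sqlite3` module exposes as `Connection.backup`. The sketch below is not part of the bot; `backup_database` and both paths are hypothetical names.

```python
import sqlite3


def backup_database(src_path: str, dest_path: str) -> None:
    """Copy a (possibly live) SQLite database to dest_path.

    Hypothetical helper, not part of the bot's code.
    """
    src = sqlite3.connect(src_path)
    dest = sqlite3.connect(dest_path)
    try:
        # Connection.backup copies a consistent snapshot of the source,
        # including committed changes that are still only in the WAL file.
        src.backup(dest)
    finally:
        dest.close()
        src.close()
```

A cron job or admin command could call `backup_database("./support_bot.sqlite3", "./backup.sqlite3")` without stopping the bot.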
WAL mode
The database is configured with Write-Ahead Logging for better concurrent access:
async def connect(self) -> None:
    self._conn = await aiosqlite.connect(self._path)
    await self._conn.execute("PRAGMA journal_mode=WAL;")
    await self._conn.execute("PRAGMA foreign_keys=ON;")
    await self._conn.commit()
Benefits of WAL mode
- Readers don’t block writers - Queries can run while updates happen
- Writers don’t block readers - Updates don’t pause queries
- Better concurrency - Multiple readers can work at the same time as one writer (SQLite still allows only a single writer at a time)
- Crash recovery - Committed changes survive a process crash
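WAL mode creates two additional files alongside the main database file, named after it with -wal and -shm suffixes (support_bot.sqlite3-wal and support_bot.sqlite3-shm for the default path). Don’t delete these while the bot is running.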
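To see the journal mode and the extra files for yourself, here is a minimal sketch using the stdlib `sqlite3` module instead of `aiosqlite`. The file name `demo.sqlite3` and the table `t` are made up for the demonstration.

```python
import os
import sqlite3
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.sqlite3")
    conn = sqlite3.connect(path)
    # The pragma returns the journal mode that is now in effect.
    mode = conn.execute("PRAGMA journal_mode=WAL;").fetchone()[0]
    conn.execute("CREATE TABLE t (x INTEGER)")
    conn.execute("INSERT INTO t VALUES (1)")
    conn.commit()
    # While the connection is open, the WAL files sit next to the main file.
    files = sorted(os.listdir(tmp))
    conn.close()

print(mode)
print(files)
```

The mode is stored in the database file itself, so a database switched to WAL stays in WAL mode the next time it is opened.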
Schema overview
The database has four main tables:
users # User profile information
conversations # Active topics for each user
messages # Complete message history (optional)
message_links # Maps messages between chats for replies
Users table
Stores user profile information:
CREATE TABLE IF NOT EXISTS users (
    user_id INTEGER PRIMARY KEY,
    username TEXT,
    first_name TEXT,
    last_name TEXT,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
);
Key points
- user_id is the Telegram user ID (primary key)
- username, first_name, and last_name can be NULL
- Timestamps use ISO 8601 format with timezone
- Records are updated on each message (username changes are tracked)
Upserting users
User records are created or updated automatically:
async def upsert_user(
    self,
    user_id: int,
    username: Optional[str],
    first_name: Optional[str],
    last_name: Optional[str],
    *,
    commit: bool = True,
) -> None:
    now = _now_iso()
    await self.conn.execute(
        """
        INSERT INTO users (user_id, username, first_name, last_name, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT(user_id) DO UPDATE SET
            username=excluded.username,
            first_name=excluded.first_name,
            last_name=excluded.last_name,
            updated_at=excluded.updated_at
        """,
        (user_id, username, first_name, last_name, now, now),
    )
    if commit:
        await self.conn.commit()
The commit parameter allows upserting within a larger transaction.
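The upsert leaves created_at alone on conflict, because the DO UPDATE clause does not list it. A small sketch of that behavior, using the stdlib `sqlite3` module and a trimmed-down users table (the sample values are invented):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE users (
        user_id INTEGER PRIMARY KEY,
        username TEXT,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL
    )
    """
)

UPSERT = """
    INSERT INTO users (user_id, username, created_at, updated_at)
    VALUES (?, ?, ?, ?)
    ON CONFLICT(user_id) DO UPDATE SET
        username=excluded.username,
        updated_at=excluded.updated_at
"""

january = "2024-01-01T00:00:00+00:00"
february = "2024-02-01T00:00:00+00:00"

# First message from the user: the row is inserted.
conn.execute(UPSERT, (42, "old_name", january, january))
# Later message from the same user_id: only the listed columns change.
conn.execute(UPSERT, (42, "new_name", february, february))
conn.commit()

row = conn.execute(
    "SELECT username, created_at, updated_at FROM users WHERE user_id = 42"
).fetchone()
print(row)
```

The stored row keeps the January created_at while username and updated_at reflect the second call.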
Conversations table
Tracks which topic belongs to which user:
CREATE TABLE IF NOT EXISTS conversations (
    user_id INTEGER PRIMARY KEY,
    topic_id INTEGER NOT NULL,
    active INTEGER NOT NULL DEFAULT 1,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    FOREIGN KEY(user_id) REFERENCES users(user_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_conversations_topic_id
    ON conversations(topic_id);
Key points
- Each user has at most one conversation row (enforced by the primary key on user_id), so a user can never have more than one active conversation
- topic_id is the Telegram forum topic thread ID
- active is a boolean (1 = active, 0 = deactivated)
- Foreign key ensures the user exists before a conversation is created
- Index on topic_id for fast operator→user lookups
Setting conversations
async def set_conversation(self, user_id: int, topic_id: int, active: bool = True) -> None:
    now = _now_iso()
    await self.conn.execute(
        """
        INSERT INTO conversations (user_id, topic_id, active, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(user_id) DO UPDATE SET
            topic_id=excluded.topic_id,
            active=excluded.active,
            updated_at=excluded.updated_at
        """,
        (user_id, topic_id, 1 if active else 0, now, now),
    )
    await self.conn.commit()
Looking up conversations
By user ID:
async def get_active_conversation(self, user_id: int) -> Conversation | None:
    cur = await self.conn.execute(
        "SELECT user_id, topic_id, active FROM conversations WHERE user_id = ?",
        (user_id,),
    )
    row = await cur.fetchone()
    await cur.close()
    if not row:
        return None
    conversation = Conversation(user_id=int(row[0]), topic_id=int(row[1]), active=bool(row[2]))
    if not conversation.active:
        return None
    return conversation
By topic ID:
async def find_user_id_by_topic(self, topic_id: int) -> int | None:
    cur = await self.conn.execute(
        "SELECT user_id FROM conversations WHERE topic_id = ? AND active = 1",
        (topic_id,),
    )
    row = await cur.fetchone()
    await cur.close()
    return int(row[0]) if row else None
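The following sketch shows how the upsert and the active flag interact with the topic lookup. It uses the stdlib `sqlite3` module and a trimmed-down conversations table. The two helper functions are synchronous simplifications written for this example, and the IDs are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE conversations (
        user_id INTEGER PRIMARY KEY,
        topic_id INTEGER NOT NULL,
        active INTEGER NOT NULL DEFAULT 1
    )
    """
)
conn.execute("CREATE INDEX idx_conversations_topic_id ON conversations(topic_id)")


def set_conversation(user_id: int, topic_id: int, active: bool = True) -> None:
    # Simplified, synchronous version of the upsert shown above.
    conn.execute(
        """
        INSERT INTO conversations (user_id, topic_id, active) VALUES (?, ?, ?)
        ON CONFLICT(user_id) DO UPDATE SET
            topic_id=excluded.topic_id,
            active=excluded.active
        """,
        (user_id, topic_id, 1 if active else 0),
    )


def find_user_id_by_topic(topic_id: int) -> "int | None":
    row = conn.execute(
        "SELECT user_id FROM conversations WHERE topic_id = ? AND active = 1",
        (topic_id,),
    ).fetchone()
    return row[0] if row else None


set_conversation(42, 1001)
found_while_active = find_user_id_by_topic(1001)

# Deactivating reuses the same row; the topic no longer resolves to a user.
set_conversation(42, 1001, active=False)
found_after_deactivation = find_user_id_by_topic(1001)

row_count = conn.execute("SELECT COUNT(*) FROM conversations").fetchone()[0]
print(found_while_active, found_after_deactivation, row_count)
```

Deactivation updates the existing row rather than adding a second one, which is why the row count stays at one.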
Messages table
Optional table for logging all messages (enabled with LOG_MESSAGES=1):
CREATE TABLE IF NOT EXISTS messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    direction TEXT NOT NULL, -- 'user' or 'operator'
    chat_id INTEGER NOT NULL,
    message_id INTEGER NOT NULL,
    content_type TEXT NOT NULL,
    text TEXT,
    caption TEXT,
    file_id TEXT,
    payload_json TEXT,
    created_at TEXT NOT NULL,
    FOREIGN KEY(user_id) REFERENCES users(user_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_messages_user_id_created_at
    ON messages(user_id, created_at);
CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_chat_id_message_id_unique
    ON messages(chat_id, message_id);
Key points
- Auto-incrementing id for internal use
- direction indicates whether the message is from the user or the operator
- chat_id and message_id uniquely identify the message in Telegram
- content_type is the aiogram message type (text, photo, document, etc.)
- text, caption, and file_id store message content
- payload_json stores the complete message object for advanced use cases
- Unique index prevents duplicate logging of the same message
Logging messages
async def log_message(
    self,
    *,
    user_id: int,
    direction: str,
    chat_id: int,
    message_id: int,
    content_type: str,
    text: str | None,
    caption: str | None,
    file_id: str | None,
    payload_json: str | None,
    commit: bool = True,
) -> None:
    await self.conn.execute(
        """
        INSERT INTO messages (
            user_id, direction, chat_id, message_id, content_type,
            text, caption, file_id, payload_json, created_at
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(chat_id, message_id) DO NOTHING
        """,
        (
            user_id,
            direction,
            chat_id,
            message_id,
            content_type,
            text,
            caption,
            file_id,
            payload_json,
            _now_iso(),
        ),
    )
    if commit:
        await self.conn.commit()
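The unique index together with ON CONFLICT ... DO NOTHING means a second attempt to log the same (chat_id, message_id) pair is silently skipped and the first row is kept. A minimal sketch with the stdlib `sqlite3` module and a reduced messages table (sample values are invented):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        chat_id INTEGER NOT NULL,
        message_id INTEGER NOT NULL,
        text TEXT
    )
    """
)
conn.execute(
    "CREATE UNIQUE INDEX idx_messages_chat_id_message_id_unique "
    "ON messages(chat_id, message_id)"
)

INSERT = """
    INSERT INTO messages (chat_id, message_id, text) VALUES (?, ?, ?)
    ON CONFLICT(chat_id, message_id) DO NOTHING
"""

conn.execute(INSERT, (100, 7, "hello"))
# Same chat_id and message_id: no error is raised and nothing is written.
conn.execute(INSERT, (100, 7, "hello again"))
conn.commit()

rows = conn.execute("SELECT chat_id, message_id, text FROM messages").fetchall()
print(rows)
```

This makes logging idempotent, which is useful if the same update is ever processed twice.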
Combined user and message logging
For convenience, there’s a method that upserts the user and logs the message in one transaction:
async def log_user_message(
    self,
    *,
    user_id: int,
    username: Optional[str],
    first_name: Optional[str],
    last_name: Optional[str],
    direction: str,
    chat_id: int,
    message_id: int,
    content_type: str,
    text: str | None,
    caption: str | None,
    file_id: str | None,
    payload_json: str | None,
) -> None:
    async with self.transaction():
        await self.upsert_user(
            user_id=user_id,
            username=username,
            first_name=first_name,
            last_name=last_name,
            commit=False,
        )
        await self.log_message(
            user_id=user_id,
            direction=direction,
            chat_id=chat_id,
            message_id=message_id,
            content_type=content_type,
            text=text,
            caption=caption,
            file_id=file_id,
            payload_json=payload_json,
            commit=False,
        )
Message logging is optional. Set LOG_MESSAGES=0 to disable logging if you don’t need message history.
Message links table
Critical table for preserving reply chains across chats:
CREATE TABLE IF NOT EXISTS message_links (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    source_chat_id INTEGER NOT NULL,
    source_message_id INTEGER NOT NULL,
    target_chat_id INTEGER NOT NULL,
    target_message_id INTEGER NOT NULL,
    created_at TEXT NOT NULL,
    FOREIGN KEY(user_id) REFERENCES users(user_id) ON DELETE CASCADE
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_message_links_source_unique
    ON message_links(source_chat_id, source_message_id);
CREATE INDEX IF NOT EXISTS idx_message_links_user_id
    ON message_links(user_id);
Key points
- Links are stored bidirectionally (A→B and B→A)
- Unique index prevents duplicate links
- Lookups are fast due to the unique index
- Enables reply chains to work across different chats
Logging links
async def log_message_link(
    self,
    *,
    user_id: int,
    source_chat_id: int,
    source_message_id: int,
    target_chat_id: int,
    target_message_id: int,
    commit: bool = True,
) -> None:
    await self.conn.execute(
        """
        INSERT INTO message_links (
            user_id, source_chat_id, source_message_id,
            target_chat_id, target_message_id, created_at
        )
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT(source_chat_id, source_message_id) DO NOTHING
        """,
        (
            user_id,
            source_chat_id,
            source_message_id,
            target_chat_id,
            target_message_id,
            _now_iso(),
        ),
    )
    if commit:
        await self.conn.commit()
Finding linked messages
async def find_linked_message_id(
    self,
    *,
    source_chat_id: int,
    source_message_id: int,
    target_chat_id: int | None = None,
) -> int | None:
    if target_chat_id is None:
        cur = await self.conn.execute(
            """
            SELECT target_message_id
            FROM message_links
            WHERE source_chat_id = ? AND source_message_id = ?
            """,
            (source_chat_id, source_message_id),
        )
    else:
        cur = await self.conn.execute(
            """
            SELECT target_message_id
            FROM message_links
            WHERE source_chat_id = ? AND source_message_id = ? AND target_chat_id = ?
            """,
            (source_chat_id, source_message_id, target_chat_id),
        )
    row = await cur.fetchone()
    await cur.close()
    return int(row[0]) if row else None
This enables the bot to find the corresponding message when handling replies. See Message flow for usage examples.
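As a rough illustration of the bidirectional lookup, the sketch below stores one pair of links and resolves it from both sides. It uses the stdlib `sqlite3` module and a reduced table. The helper names `log_link` and `find_linked`, the chat IDs, and the message IDs are all hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE message_links (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        source_chat_id INTEGER NOT NULL,
        source_message_id INTEGER NOT NULL,
        target_chat_id INTEGER NOT NULL,
        target_message_id INTEGER NOT NULL
    )
    """
)
conn.execute(
    "CREATE UNIQUE INDEX idx_message_links_source_unique "
    "ON message_links(source_chat_id, source_message_id)"
)


def log_link(src_chat: int, src_msg: int, dst_chat: int, dst_msg: int) -> None:
    conn.execute(
        """
        INSERT INTO message_links (
            source_chat_id, source_message_id, target_chat_id, target_message_id
        )
        VALUES (?, ?, ?, ?)
        ON CONFLICT(source_chat_id, source_message_id) DO NOTHING
        """,
        (src_chat, src_msg, dst_chat, dst_msg),
    )


def find_linked(src_chat: int, src_msg: int) -> "int | None":
    row = conn.execute(
        "SELECT target_message_id FROM message_links "
        "WHERE source_chat_id = ? AND source_message_id = ?",
        (src_chat, src_msg),
    ).fetchone()
    return row[0] if row else None


USER_CHAT = 42                  # hypothetical private chat ID
OPERATOR_CHAT = -1001234567890  # hypothetical operator group ID

# User message 10 was copied into the operator chat as message 555.
log_link(USER_CHAT, 10, OPERATOR_CHAT, 555)
log_link(OPERATOR_CHAT, 555, USER_CHAT, 10)

reply_target_in_user_chat = find_linked(OPERATOR_CHAT, 555)
reply_target_in_operator_chat = find_linked(USER_CHAT, 10)
unknown = find_linked(USER_CHAT, 999)
print(reply_target_in_user_chat, reply_target_in_operator_chat, unknown)
```

Storing both directions lets either side's reply be resolved with a single lookup on the unique index.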
Transaction handling
The Database class provides a transaction context manager:
class Database:
    def __init__(self, path: str) -> None:
        self._path = path
        self._conn: aiosqlite.Connection | None = None
        self._tx_lock = asyncio.Lock()

    @asynccontextmanager
    async def transaction(self) -> Any:
        async with self._tx_lock:
            await self.conn.execute("BEGIN;")
            try:
                yield
            except BaseException:
                await self.conn.rollback()
                raise
            else:
                await self.conn.commit()
Why transactions matter
Transactions ensure data consistency:
- Atomicity - All operations succeed or all fail
- Isolation - Concurrent transactions don’t interfere
- Durability - Committed changes persist even if the bot crashes
Example usage
Logging bidirectional message links in a transaction:
support_bot/topic_manager.py
async def _log_message_link(
    self,
    *,
    user_id: int,
    source_chat_id: int,
    source_message_id: int,
    target_chat_id: int,
    target_message_id: int,
) -> None:
    async with self._db.transaction():
        await self._db.log_message_link(
            user_id=user_id,
            source_chat_id=source_chat_id,
            source_message_id=source_message_id,
            target_chat_id=target_chat_id,
            target_message_id=target_message_id,
            commit=False,
        )
        await self._db.log_message_link(
            user_id=user_id,
            source_chat_id=target_chat_id,
            source_message_id=target_message_id,
            target_chat_id=source_chat_id,
            target_message_id=source_message_id,
            commit=False,
        )
If either insert fails, both are rolled back, ensuring links are always stored bidirectionally.
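The transaction lock (_tx_lock) serializes all transactions that go through transaction(). This keeps concurrent handlers from interleaving statements inside one another's transactions on the shared connection and helps avoid SQLite “database is locked” errors, but it also means a long-running transaction blocks every other transaction until it finishes.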
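The rollback behavior can be tried in isolation. The sketch below mirrors the shape of the context manager with a hypothetical `DemoDatabase` class built on the stdlib `sqlite3` module rather than `aiosqlite`, so the database calls are synchronous. The `links` table is a stand-in for the real schema.

```python
import asyncio
import sqlite3
from contextlib import asynccontextmanager
from typing import AsyncIterator


class DemoDatabase:
    """Hypothetical stand-in for the bot's Database class."""

    def __init__(self) -> None:
        # isolation_level=None turns off sqlite3's implicit transactions,
        # so the BEGIN/COMMIT/ROLLBACK statements below are fully explicit.
        self.conn = sqlite3.connect(":memory:", isolation_level=None)
        self.conn.execute(
            "CREATE TABLE links (source INTEGER PRIMARY KEY, target INTEGER NOT NULL)"
        )
        self._tx_lock = asyncio.Lock()

    @asynccontextmanager
    async def transaction(self) -> AsyncIterator[None]:
        async with self._tx_lock:
            self.conn.execute("BEGIN")
            try:
                yield
            except BaseException:
                self.conn.execute("ROLLBACK")
                raise
            else:
                self.conn.execute("COMMIT")


async def main() -> int:
    db = DemoDatabase()
    try:
        async with db.transaction():
            db.conn.execute("INSERT INTO links VALUES (1, 2)")
            # target is NOT NULL, so this second insert fails...
            db.conn.execute("INSERT INTO links VALUES (2, NULL)")
    except sqlite3.IntegrityError:
        pass  # ...and the first insert is rolled back along with it.
    return db.conn.execute("SELECT COUNT(*) FROM links").fetchone()[0]


remaining_rows = asyncio.run(main())
print(remaining_rows)
```

No rows remain after the failed transaction, which is the same guarantee the bidirectional link logging relies on.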
Timestamps
All timestamps use ISO 8601 format with UTC timezone:
def _now_iso() -> str:
    return dt.datetime.now(dt.timezone.utc).isoformat()
Example: 2024-03-15T14:30:45.123456+00:00
This format:
- Is human-readable
- Sorts correctly as text, because every value uses the same UTC offset
- Includes timezone information
- Is widely supported
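The sorting property is easy to check: when every timestamp is in UTC, ordering the strings gives the same result as ordering the datetimes, and `datetime.fromisoformat` turns a stored string back into a timezone-aware value. A short sketch with made-up sample moments:

```python
import datetime as dt

base = dt.datetime(2024, 3, 15, 14, 30, 45, tzinfo=dt.timezone.utc)
moments = [
    base + dt.timedelta(days=1),
    base,
    base + dt.timedelta(microseconds=123456),
]
stamps = [m.isoformat() for m in moments]

# Sorting the text values matches sorting the underlying datetimes.
sorted_by_text = sorted(stamps)
sorted_by_time = [m.isoformat() for m in sorted(moments)]

# Parsing a stored string restores a timezone-aware datetime.
parsed = dt.datetime.fromisoformat(stamps[2])

print(sorted_by_text == sorted_by_time)
print(parsed.utcoffset())
```

This is what lets an index such as idx_messages_user_id_created_at return rows in chronological order with a plain ORDER BY on the text column.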
Healthcheck
The database includes a simple healthcheck method:
async def healthcheck(self) -> dict[str, Any]:
    cur = await self.conn.execute("SELECT 1;")
    row = await cur.fetchone()
    await cur.close()
    return {"ok": row == (1,)}
This can be used by monitoring systems to verify the database is accessible.
Foreign keys
Foreign key constraints are enabled on connection:
await self._conn.execute("PRAGMA foreign_keys=ON;")
This ensures:
- Can’t create conversation for non-existent user
- Can’t log message for non-existent user
- Deleting a user cascades to their conversations, messages, and links
SQLite doesn’t enforce foreign keys by default, so this pragma is critical for data integrity.
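The effect of the pragma can be demonstrated with the stdlib `sqlite3` module and a cut-down schema. The helper `orphan_insert_allowed` is a hypothetical name for this sketch. The pragma is set explicitly in both cases so the result does not depend on how SQLite was compiled.

```python
import sqlite3

SCHEMA = """
CREATE TABLE users (user_id INTEGER PRIMARY KEY);
CREATE TABLE conversations (
    user_id INTEGER PRIMARY KEY,
    topic_id INTEGER NOT NULL,
    FOREIGN KEY(user_id) REFERENCES users(user_id) ON DELETE CASCADE
);
"""


def orphan_insert_allowed(enforce: bool) -> bool:
    """Return True if a conversation for a missing user can be inserted."""
    conn = sqlite3.connect(":memory:")
    # OFF is the usual default; it is set explicitly here for the comparison.
    conn.execute(f"PRAGMA foreign_keys={'ON' if enforce else 'OFF'};")
    conn.executescript(SCHEMA)
    try:
        conn.execute("INSERT INTO conversations (user_id, topic_id) VALUES (999, 1)")
        return True
    except sqlite3.IntegrityError:
        return False
    finally:
        conn.close()


without_pragma = orphan_insert_allowed(enforce=False)
with_pragma = orphan_insert_allowed(enforce=True)

# With enforcement on, deleting a user also removes their conversation.
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys=ON;")
conn.executescript(SCHEMA)
conn.execute("INSERT INTO users VALUES (42)")
conn.execute("INSERT INTO conversations VALUES (42, 1001)")
conn.execute("DELETE FROM users WHERE user_id = 42")
conversations_left = conn.execute("SELECT COUNT(*) FROM conversations").fetchone()[0]
conn.close()

print(without_pragma, with_pragma, conversations_left)
```

The pragma applies per connection, so it has to be issued every time a connection is opened, as connect() does.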