The Database class provides an async interface to the SQLite database that stores user information, active conversations, messages, and message links.
Class: Database
Constructor
Creates a new Database instance.
Path to the SQLite database file
Example:
db = Database("./support_bot.sqlite3")
Connection Management
connect()
await db.connect() -> None
Establishes a connection to the database, then enables WAL mode and foreign-key enforcement.
Example:
await db.connect()
await db.init() # Initialize schema
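The two settings that connect() is described as enabling can be sketched with the standard-library sqlite3 module. This page does not name the async driver the bot uses, so the synchronous driver and the helper name `open_connection` are assumptions made for illustration.

```python
import os
import sqlite3
import tempfile


def open_connection(path: str) -> sqlite3.Connection:
    """Hypothetical helper: open SQLite with WAL mode and foreign keys on."""
    conn = sqlite3.connect(path)
    # WAL lets readers proceed while a writer is active.
    conn.execute("PRAGMA journal_mode=WAL")
    # Foreign-key enforcement is off by default and is set per connection.
    conn.execute("PRAGMA foreign_keys=ON")
    return conn


# WAL needs a real file; an in-memory database cannot use it.
db_path = os.path.join(tempfile.mkdtemp(), "support_bot.sqlite3")
conn = open_connection(db_path)
journal_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
foreign_keys = conn.execute("PRAGMA foreign_keys").fetchone()[0]
conn.close()
```

Reading the pragmas back, as the last lines do, is a cheap way to confirm that both settings took effect on a given connection.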
close()
Closes the database connection if open.
Example:
await db.close()
transaction()
async with db.transaction():
    # Your operations here
Provides a transaction context manager that automatically commits on success or rolls back on exception. Uses a lock to prevent concurrent transactions.
Example:
async with db.transaction():
    await db.upsert_user(user_id=123, username="alice", first_name="Alice", last_name=None, commit=False)
    await db.log_message(
        user_id=123,
        direction="user",
        chat_id=123,
        message_id=456,
        content_type="text",
        text="Hello",
        caption=None,
        file_id=None,
        payload_json=None,
        commit=False
    )
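The commit-or-rollback behavior described for transaction() can be sketched as follows. This is not the bot's implementation: the class `TxDemo` and the `notes` table are hypothetical, and the standard-library sqlite3 module stands in for whatever async driver the real class uses.

```python
import asyncio
import sqlite3
from contextlib import asynccontextmanager


class TxDemo:
    """Hypothetical stand-in for the transaction logic, not the real Database."""

    def __init__(self) -> None:
        # isolation_level=None means autocommit, so BEGIN/COMMIT are explicit.
        self.conn = sqlite3.connect(":memory:", isolation_level=None)
        self.conn.execute("CREATE TABLE notes (body TEXT)")
        self._lock = asyncio.Lock()

    @asynccontextmanager
    async def transaction(self):
        async with self._lock:  # only one transaction at a time
            self.conn.execute("BEGIN")
            try:
                yield
            except BaseException:
                self.conn.execute("ROLLBACK")
                raise
            else:
                self.conn.execute("COMMIT")


async def demo() -> int:
    db = TxDemo()
    async with db.transaction():
        db.conn.execute("INSERT INTO notes VALUES ('kept')")
    try:
        async with db.transaction():
            db.conn.execute("INSERT INTO notes VALUES ('discarded')")
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass
    # Only the row from the successful transaction remains.
    return db.conn.execute("SELECT COUNT(*) FROM notes").fetchone()[0]
```

In a design like this, a method that committed on its own inside the block would end the transaction early, which is presumably why the write methods accept commit=False.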
Schema Initialization
init()
Initializes the database schema by creating tables and indexes:
users: Stores user profiles (user_id, username, first_name, last_name)
conversations: Stores active conversation mappings (user_id → topic_id)
messages: Logs all messages with full metadata
message_links: Maps messages between chats for reply threading
Example:
await db.connect()
await db.init()
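For orientation, here is one possible shape for the four tables. The column names mirror the method parameters documented on this page; the types, keys, constraints, and the index name are illustrative guesses, not the actual schema.

```python
import sqlite3

# Assumed DDL: types, keys, and the index are guesses, not the real schema.
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    user_id    INTEGER PRIMARY KEY,
    username   TEXT,
    first_name TEXT,
    last_name  TEXT
);
CREATE TABLE IF NOT EXISTS conversations (
    user_id  INTEGER PRIMARY KEY REFERENCES users(user_id),
    topic_id INTEGER NOT NULL,
    active   INTEGER NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS messages (
    user_id      INTEGER NOT NULL REFERENCES users(user_id),
    direction    TEXT NOT NULL,
    chat_id      INTEGER NOT NULL,
    message_id   INTEGER NOT NULL,
    content_type TEXT NOT NULL,
    text         TEXT,
    caption      TEXT,
    file_id      TEXT,
    payload_json TEXT,
    UNIQUE (chat_id, message_id)
);
CREATE TABLE IF NOT EXISTS message_links (
    user_id           INTEGER NOT NULL,
    source_chat_id    INTEGER NOT NULL,
    source_message_id INTEGER NOT NULL,
    target_chat_id    INTEGER NOT NULL,
    target_message_id INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_conversations_topic ON conversations(topic_id);
"""


def init_schema(conn: sqlite3.Connection) -> None:
    """Create tables and indexes; IF NOT EXISTS makes repeated calls harmless."""
    conn.executescript(SCHEMA)


conn = sqlite3.connect(":memory:")
init_schema(conn)
init_schema(conn)  # second call changes nothing
tables = sorted(
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
```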
User Management
upsert_user()
await db.upsert_user(
user_id: int,
username: Optional[str],
first_name: Optional[str],
last_name: Optional[str],
*,
commit: bool = True
) -> None
Inserts or updates a user record. If the user exists, updates their profile fields.
username: Telegram username (without @)
commit: Whether to commit immediately (set to False when used in a transaction)
Example:
await db.upsert_user(
user_id=123456789,
username="alice",
first_name="Alice",
last_name="Smith"
)
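Insert-or-update behavior of this kind is commonly written with SQLite's ON CONFLICT clause (available from SQLite 3.24). The sketch below assumes a `users` table keyed on `user_id` and uses the synchronous sqlite3 module; the real method may be implemented differently.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (user_id INTEGER PRIMARY KEY, username TEXT, "
    "first_name TEXT, last_name TEXT)"
)


def upsert_user(conn, user_id, username, first_name, last_name, *, commit=True):
    """Sketch: insert the row, or overwrite profile fields if user_id exists."""
    conn.execute(
        """
        INSERT INTO users (user_id, username, first_name, last_name)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(user_id) DO UPDATE SET
            username   = excluded.username,
            first_name = excluded.first_name,
            last_name  = excluded.last_name
        """,
        (user_id, username, first_name, last_name),
    )
    if commit:
        conn.commit()


upsert_user(conn, 123456789, "alice", "Alice", "Smith")
upsert_user(conn, 123456789, "alice_s", "Alice", None)  # same user, new profile
rows = conn.execute("SELECT user_id, username, last_name FROM users").fetchall()
```

After the second call the table still holds a single row for the user, now carrying the updated profile fields.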
Conversation Management
get_active_conversation()
await db.get_active_conversation(user_id: int) -> Conversation | None
Retrieves the active conversation for a user, if one exists.
Returns a Conversation object with user_id, topic_id, and active fields, or None if no active conversation exists.
Example:
conversation = await db.get_active_conversation(user_id=123456789)
if conversation:
    print(f"User {conversation.user_id} has topic {conversation.topic_id}")
set_conversation()
await db.set_conversation(user_id: int, topic_id: int, active: bool = True) -> None
Creates or updates a conversation mapping between a user and a forum topic.
topic_id: Forum topic (thread) ID in the operator group
active: Whether the conversation is active
Example:
await db.set_conversation(user_id=123456789, topic_id=456, active=True)
deactivate_conversation()
await db.deactivate_conversation(user_id: int) -> None
Marks a user’s conversation as inactive. Used when a topic is closed or deleted.
Example:
await db.deactivate_conversation(user_id=123456789)
find_user_id_by_topic()
await db.find_user_id_by_topic(topic_id: int) -> int | None
Finds the user ID associated with an active topic in the operator group.
Returns the user ID associated with the topic, or None if the topic is not found or its conversation is inactive.
Example:
user_id = await db.find_user_id_by_topic(topic_id=456)
if user_id is not None:
    print(f"Topic 456 belongs to user {user_id}")
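The conversation methods in this section can be pictured as operations on a single mapping table. The sketch below assumes one row per user with an `active` flag and uses the synchronous sqlite3 module; the table layout is a guess rather than the documented schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE conversations (user_id INTEGER PRIMARY KEY, "
    "topic_id INTEGER NOT NULL, active INTEGER NOT NULL)"
)


def set_conversation(conn, user_id, topic_id, active=True):
    # Assumed: one row per user, so a new topic replaces the previous mapping.
    conn.execute(
        "INSERT INTO conversations VALUES (?, ?, ?) "
        "ON CONFLICT(user_id) DO UPDATE SET topic_id = excluded.topic_id, "
        "active = excluded.active",
        (user_id, topic_id, int(active)),
    )
    conn.commit()


def deactivate_conversation(conn, user_id):
    conn.execute("UPDATE conversations SET active = 0 WHERE user_id = ?", (user_id,))
    conn.commit()


def find_user_id_by_topic(conn, topic_id):
    # Inactive rows are skipped, so a closed topic resolves to None.
    row = conn.execute(
        "SELECT user_id FROM conversations WHERE topic_id = ? AND active = 1",
        (topic_id,),
    ).fetchone()
    return row[0] if row else None


set_conversation(conn, 123456789, 456)
before = find_user_id_by_topic(conn, 456)
deactivate_conversation(conn, 123456789)
after = find_user_id_by_topic(conn, 456)
```

Keeping the row and flipping a flag, rather than deleting it, preserves the user-to-topic history while still hiding closed topics from lookups.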
Message Logging
log_message()
await db.log_message(
*,
user_id: int,
direction: str,
chat_id: int,
message_id: int,
content_type: str,
text: str | None,
caption: str | None,
file_id: str | None,
payload_json: str | None,
commit: bool = True
) -> None
Logs a message to the database. If a message with the same chat_id and message_id already exists, the insert is ignored.
direction: Message direction, either "user" or "operator"
chat_id: Telegram chat ID where the message was sent
content_type: Message content type (e.g., "text", "photo", "document")
caption: Media caption, if applicable
file_id: Telegram file ID for media messages
payload_json: JSON-serialized message payload for complex content
commit: Whether to commit immediately
Example:
await db.log_message(
user_id=123456789,
direction="user",
chat_id=123456789,
message_id=789,
content_type="text",
text="Hello, I need help",
caption=None,
file_id=None,
payload_json=None
)
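The "insert is ignored" rule for duplicates maps naturally onto INSERT OR IGNORE with a uniqueness constraint on (chat_id, message_id). The following sketch assumes that constraint and a trimmed-down table. Unlike the documented method, which returns None, this version returns a boolean purely to make the effect visible.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages (chat_id INTEGER, message_id INTEGER, text TEXT, "
    "UNIQUE (chat_id, message_id))"
)


def log_message(conn, chat_id, message_id, text):
    """Sketch: return True if a row was stored, False if it was a duplicate."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO messages (chat_id, message_id, text) VALUES (?, ?, ?)",
        (chat_id, message_id, text),
    )
    conn.commit()
    return cur.rowcount == 1


first = log_message(conn, 123456789, 789, "Hello, I need help")
again = log_message(conn, 123456789, 789, "edited text")  # same key: ignored
stored = conn.execute("SELECT text FROM messages").fetchall()
```

Under this approach a repeated delivery of the same update is harmless, but the stored row also keeps its original text when the same message is logged again.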
log_message_link()
await db.log_message_link(
*,
user_id: int,
source_chat_id: int,
source_message_id: int,
target_chat_id: int,
target_message_id: int,
commit: bool = True
) -> None
Creates a bidirectional link between a source message and a target message. Used for preserving reply chains when copying messages between chats.
commit: Whether to commit immediately
Example:
# Link user message (chat 123, msg 456) to operator topic message (chat -100789, msg 789)
await db.log_message_link(
user_id=123,
source_chat_id=123,
source_message_id=456,
target_chat_id=-100789,
target_message_id=789
)
find_linked_message_id()
await db.find_linked_message_id(
*,
source_chat_id: int,
source_message_id: int,
target_chat_id: int | None = None
) -> int | None
Finds the target message ID linked to a source message.
target_chat_id: Optional target chat ID; restricts the lookup to that chat
Returns the linked message ID in the target chat, or None if no link exists.
Example:
# Find the operator topic message linked to a user message
target_msg_id = await db.find_linked_message_id(
source_chat_id=123,
source_message_id=456,
target_chat_id=-100789
)
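How a link is stored and looked up can be sketched as below. This page does not say how the bidirectional link is represented, so storing one row per direction is an assumption, as are the table layout and the shortened parameter names.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE message_links (source_chat_id INTEGER, source_message_id INTEGER, "
    "target_chat_id INTEGER, target_message_id INTEGER, "
    "UNIQUE (source_chat_id, source_message_id, target_chat_id))"
)


def log_message_link(conn, src_chat, src_msg, dst_chat, dst_msg):
    """Assumed approach: one row per direction, so either side can be the source."""
    conn.executemany(
        "INSERT OR IGNORE INTO message_links VALUES (?, ?, ?, ?)",
        [(src_chat, src_msg, dst_chat, dst_msg), (dst_chat, dst_msg, src_chat, src_msg)],
    )
    conn.commit()


def find_linked_message_id(conn, src_chat, src_msg, dst_chat=None):
    """Return the linked message ID, optionally restricted to one target chat."""
    sql = (
        "SELECT target_message_id FROM message_links "
        "WHERE source_chat_id = ? AND source_message_id = ?"
    )
    params = [src_chat, src_msg]
    if dst_chat is not None:
        sql += " AND target_chat_id = ?"
        params.append(dst_chat)
    row = conn.execute(sql, params).fetchone()
    return row[0] if row else None


log_message_link(conn, 123, 456, -100789, 789)
forward = find_linked_message_id(conn, 123, 456, -100789)
backward = find_linked_message_id(conn, -100789, 789)
missing = find_linked_message_id(conn, 123, 999)
```

With both directions stored, a reply in the operator topic resolves to the user's original message, and a reply from the user resolves to the copy in the topic.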
log_user_message()
await db.log_user_message(
*,
user_id: int,
username: Optional[str],
first_name: Optional[str],
last_name: Optional[str],
direction: str,
chat_id: int,
message_id: int,
content_type: str,
text: str | None,
caption: str | None,
file_id: str | None,
payload_json: str | None
) -> None
Convenience method that atomically upserts the user and logs their message in a single transaction.
direction: Message direction, either "user" or "operator"
Example:
# From handlers/user.py
await db.log_user_message(
user_id=message.from_user.id,
username=message.from_user.username,
first_name=message.from_user.first_name,
last_name=message.from_user.last_name,
direction="user",
chat_id=message.chat.id,
message_id=message.message_id,
content_type=message.content_type,
text=message.text,
caption=message.caption,
file_id=extract_file_id(message),
payload_json=safe_payload_json(message)
)
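The atomicity claim means the user upsert and the message insert succeed or fail together. A minimal sketch, assuming trimmed-down `users` and `messages` tables and the synchronous sqlite3 module (the real method is async and takes more fields):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE users (user_id INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE messages (
        user_id INTEGER NOT NULL, chat_id INTEGER NOT NULL,
        message_id INTEGER NOT NULL, text TEXT
    );
    """
)


def log_user_message(conn, user_id, username, chat_id, message_id, text):
    """Sketch: both writes share one transaction."""
    with conn:  # commits on success, rolls back if the block raises
        conn.execute(
            "INSERT INTO users VALUES (?, ?) "
            "ON CONFLICT(user_id) DO UPDATE SET username = excluded.username",
            (user_id, username),
        )
        conn.execute(
            "INSERT INTO messages VALUES (?, ?, ?, ?)",
            (user_id, chat_id, message_id, text),
        )


log_user_message(conn, 1, "alice", 1, 10, "Hello")
try:
    # message_id=None violates NOT NULL, so the user upsert is rolled back too.
    log_user_message(conn, 2, "bob", 2, None, "Hi")
except sqlite3.IntegrityError:
    pass
user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```

The second call leaves no trace of user 2, which is the property that makes a combined method safer than calling the two writes separately with their default commit behavior.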
Health Check
healthcheck()
await db.healthcheck() -> dict[str, Any]
Performs a simple database health check by executing a query.
Returns a dictionary whose "ok" key is True if the database is responding.
Example:
result = await db.healthcheck()
if result["ok"]:
    print("Database is healthy")
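A health check of this kind usually amounts to running a trivial query. The sketch below uses the synchronous sqlite3 module. The "error" key in the failure case is an assumption, since this page documents only the "ok" key.

```python
import sqlite3
from typing import Any


def healthcheck(conn: sqlite3.Connection) -> dict[str, Any]:
    """Sketch: run a trivial query and report whether it succeeded."""
    try:
        conn.execute("SELECT 1").fetchone()
        return {"ok": True}
    except sqlite3.Error as exc:
        # Assumed failure shape; only the "ok" key is documented.
        return {"ok": False, "error": str(exc)}


conn = sqlite3.connect(":memory:")
healthy = healthcheck(conn)
conn.close()
closed = healthcheck(conn)  # a closed connection raises ProgrammingError
```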
Data Models
Conversation
@dataclass(frozen=True)
class Conversation:
    user_id: int
    topic_id: int
    active: bool
Represents a conversation mapping between a user and an operator topic.
topic_id: Forum topic ID in the operator group
active: Whether the conversation is currently active
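Because the dataclass is frozen, instances cannot be modified after creation. The short sketch below repeats the definition so that it runs on its own, and shows that assignment fails while dataclasses.replace produces a modified copy.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class Conversation:
    user_id: int
    topic_id: int
    active: bool


conv = Conversation(user_id=123456789, topic_id=456, active=True)

try:
    conv.active = False  # frozen dataclasses reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False

# replace() builds a modified copy and leaves the original untouched.
closed = replace(conv, active=False)
```

To change a stored conversation, call set_conversation() or deactivate_conversation() and fetch the record again, since a returned object cannot be edited in place.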