Skip to main content
The Database class provides an async interface to the SQLite database that stores user information, active conversations, messages, and message links.

Class: Database

Constructor

Database(path: str)
Creates a new Database instance.
path
str
required
Path to the SQLite database file
Example:
db = Database("./support_bot.sqlite3")

Connection Management

connect()

await db.connect() -> None
Establishes connection to the database and enables WAL mode and foreign keys. Example:
await db.connect()
await db.init()  # Initialize schema

close()

await db.close() -> None
Closes the database connection if open. Example:
await db.close()

transaction()

async with db.transaction():
    # Your operations here
Provides a transaction context manager that automatically commits on success or rolls back on exception. Uses a lock to prevent concurrent transactions. Example:
async with db.transaction():
    await db.upsert_user(user_id=123, username="alice", first_name="Alice", last_name=None, commit=False)
    await db.log_message(
        user_id=123,
        direction="user",
        chat_id=123,
        message_id=456,
        content_type="text",
        text="Hello",
        caption=None,
        file_id=None,
        payload_json=None,
        commit=False
    )

Schema Initialization

init()

await db.init() -> None
Initializes the database schema by creating tables and indexes:
  • users: Stores user profiles (user_id, username, first_name, last_name)
  • conversations: Stores active conversation mappings (user_id → topic_id)
  • messages: Logs all messages with full metadata
  • message_links: Maps messages between chats for reply threading
Example:
await db.connect()
await db.init()

User Management

upsert_user()

await db.upsert_user(
    user_id: int,
    username: Optional[str],
    first_name: Optional[str],
    last_name: Optional[str],
    *,
    commit: bool = True
) -> None
Inserts or updates a user record. If the user exists, updates their profile fields.
user_id
int
required
Telegram user ID
username
Optional[str]
required
Telegram username (without @)
first_name
Optional[str]
required
User’s first name
last_name
Optional[str]
required
User’s last name
commit
bool
default:"True"
Whether to commit immediately (set to False when used in a transaction)
Example:
await db.upsert_user(
    user_id=123456789,
    username="alice",
    first_name="Alice",
    last_name="Smith"
)

Conversation Management

get_active_conversation()

await db.get_active_conversation(user_id: int) -> Conversation | None
Retrieves the active conversation for a user, if one exists.
user_id
int
required
Telegram user ID
return
Conversation | None
A Conversation object with user_id, topic_id, and active fields, or None if no active conversation exists
Example:
conversation = await db.get_active_conversation(user_id=123456789)
if conversation:
    print(f"User {conversation.user_id} has topic {conversation.topic_id}")

set_conversation()

await db.set_conversation(user_id: int, topic_id: int, active: bool = True) -> None
Creates or updates a conversation mapping between a user and a forum topic.
user_id
int
required
Telegram user ID
topic_id
int
required
Forum topic (thread) ID in the operator group
active
bool
default:"True"
Whether the conversation is active
Example:
await db.set_conversation(user_id=123456789, topic_id=456, active=True)

deactivate_conversation()

await db.deactivate_conversation(user_id: int) -> None
Marks a user’s conversation as inactive. Used when a topic is closed or deleted.
user_id
int
required
Telegram user ID
Example:
await db.deactivate_conversation(user_id=123456789)

find_user_id_by_topic()

await db.find_user_id_by_topic(topic_id: int) -> int | None
Finds the user ID associated with an active topic in the operator group.
topic_id
int
required
Forum topic (thread) ID
return
int | None
The user ID associated with the topic, or None if not found or inactive
Example:
user_id = await db.find_user_id_by_topic(topic_id=456)
if user_id:
    print(f"Topic 456 belongs to user {user_id}")

Message Logging

log_message()

await db.log_message(
    *,
    user_id: int,
    direction: str,
    chat_id: int,
    message_id: int,
    content_type: str,
    text: str | None,
    caption: str | None,
    file_id: str | None,
    payload_json: str | None,
    commit: bool = True
) -> None
Logs a message to the database. If a message with the same chat_id and message_id already exists, the insert is ignored.
user_id
int
required
Telegram user ID
direction
str
required
Message direction: "user" or "operator"
chat_id
int
required
Telegram chat ID where the message was sent
message_id
int
required
Telegram message ID
content_type
str
required
Message content type (e.g., "text", "photo", "document")
text
str | None
required
Message text content
caption
str | None
required
Media caption if applicable
file_id
str | None
required
Telegram file ID for media messages
payload_json
str | None
required
JSON-serialized message payload for complex content
commit
bool
default:"True"
Whether to commit immediately
Example:
await db.log_message(
    user_id=123456789,
    direction="user",
    chat_id=123456789,
    message_id=789,
    content_type="text",
    text="Hello, I need help",
    caption=None,
    file_id=None,
    payload_json=None
)
await db.log_message_link(
    *,
    user_id: int,
    source_chat_id: int,
    source_message_id: int,
    target_chat_id: int,
    target_message_id: int,
    commit: bool = True
) -> None
Creates a bidirectional link between a source message and a target message. Used for preserving reply chains when copying messages between chats.
user_id
int
required
Telegram user ID
source_chat_id
int
required
Source chat ID
source_message_id
int
required
Source message ID
target_chat_id
int
required
Target chat ID
target_message_id
int
required
Target message ID
commit
bool
default:"True"
Whether to commit immediately
Example:
# Link user message (chat 123, msg 456) to operator topic message (chat -100789, msg 789)
await db.log_message_link(
    user_id=123,
    source_chat_id=123,
    source_message_id=456,
    target_chat_id=-100789,
    target_message_id=789
)

find_linked_message_id()

await db.find_linked_message_id(
    *,
    source_chat_id: int,
    source_message_id: int,
    target_chat_id: int | None = None
) -> int | None
Finds the target message ID linked to a source message.
source_chat_id
int
required
Source chat ID
source_message_id
int
required
Source message ID
target_chat_id
int | None
default:"None"
Optional target chat ID to filter by specific chat
return
int | None
The linked message ID in the target chat, or None if no link exists
Example:
# Find the operator topic message linked to a user message
target_msg_id = await db.find_linked_message_id(
    source_chat_id=123,
    source_message_id=456,
    target_chat_id=-100789
)

log_user_message()

await db.log_user_message(
    *,
    user_id: int,
    username: Optional[str],
    first_name: Optional[str],
    last_name: Optional[str],
    direction: str,
    chat_id: int,
    message_id: int,
    content_type: str,
    text: str | None,
    caption: str | None,
    file_id: str | None,
    payload_json: str | None
) -> None
Convenience method that atomically upserts the user and logs their message in a single transaction.
user_id
int
required
Telegram user ID
username
Optional[str]
required
Telegram username
first_name
Optional[str]
required
User’s first name
last_name
Optional[str]
required
User’s last name
direction
str
required
Message direction: "user" or "operator"
chat_id
int
required
Telegram chat ID
message_id
int
required
Telegram message ID
content_type
str
required
Message content type
text
str | None
required
Message text
caption
str | None
required
Media caption
file_id
str | None
required
Telegram file ID
payload_json
str | None
required
JSON-serialized payload
Example:
# From handlers/user.py
await db.log_user_message(
    user_id=message.from_user.id,
    username=message.from_user.username,
    first_name=message.from_user.first_name,
    last_name=message.from_user.last_name,
    direction="user",
    chat_id=message.chat.id,
    message_id=message.message_id,
    content_type=message.content_type,
    text=message.text,
    caption=message.caption,
    file_id=extract_file_id(message),
    payload_json=safe_payload_json(message)
)

Health Check

healthcheck()

await db.healthcheck() -> dict[str, Any]
Performs a simple database health check by executing a query.
return
dict[str, Any]
Dictionary with "ok" key set to True if the database is responding
Example:
result = await db.healthcheck()
if result["ok"]:
    print("Database is healthy")

Data Models

Conversation

@dataclass(frozen=True)
class Conversation:
    user_id: int
    topic_id: int
    active: bool
Represents an active conversation between a user and an operator topic.
user_id
int
Telegram user ID
topic_id
int
Forum topic ID in the operator group
active
bool
Whether the conversation is currently active

Build docs developers (and LLMs) love