The bot uses Telegram’s forum topics feature to organize conversations with individual users. Each user gets their own topic in the operator group, creating a clean, organized support interface.
Topic creation
Topics are created on-demand when a user first messages the bot:
support_bot/topic_manager.py
async def ensure_topic(self, bot: Bot, user: User) -> TopicRef:
lock = await self._lock_for(user.id)
async with lock:
existing = await self._db.get_active_conversation(user.id)
if existing:
return TopicRef(user_id=user.id, topic_id=existing.topic_id)
topic = await bot.create_forum_topic(
chat_id=self._operator_group_id,
name=_topic_name(user),
)
await self._db.set_conversation(user_id=user.id, topic_id=topic.message_thread_id, active=True)
username_line = f"@{user.username}" if user.username else "—"
await bot.send_message(
chat_id=self._operator_group_id,
message_thread_id=topic.message_thread_id,
text=(
"New conversation.\n"
f"User: {user.full_name}\n"
f"ID: <code>{user.id}</code>\n"
f"Username: {username_line}"
),
)
return TopicRef(user_id=user.id, topic_id=topic.message_thread_id)
First-time flow
- User sends their first message
ensure_topic acquires a user-specific lock
- Database is checked for an active conversation
- If none exists, a new forum topic is created
- Conversation is stored in the database
- A welcome message is sent to the topic with user details
The topic includes the user’s ID in brackets so operators can easily identify users even if they change their display name.
Naming convention
Topic names follow a specific format defined in the _topic_name function:
support_bot/topic_manager.py
def _topic_name(user: User) -> str:
base = user.full_name or "User"
if user.username:
base = f"{base} (@{user.username})"
name = f"{base} [{user.id}]"
return name[:128]
Naming examples
- User with full name and username:
John Smith (@johnsmith) [123456789]
- User with only full name:
Jane Doe [987654321]
- User with no full name:
User [555555555]
Character limit
Telegram limits topic names to 128 characters, so the name is truncated:
This ensures topics are always created successfully even with very long names.
Topic lifecycle
Topics progress through several states during their lifetime:
Active state
When a topic is first created, it’s marked as active in the database:
async def set_conversation(self, user_id: int, topic_id: int, active: bool = True) -> None:
now = _now_iso()
await self.conn.execute(
"""
INSERT INTO conversations (user_id, topic_id, active, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET
topic_id=excluded.topic_id,
active=excluded.active,
updated_at=excluded.updated_at
""",
(user_id, topic_id, 1 if active else 0, now, now),
)
await self.conn.commit()
Deactivated state
Topics can be deactivated when they’re closed or deleted:
async def deactivate_conversation(self, user_id: int) -> None:
now = _now_iso()
await self.conn.execute(
"UPDATE conversations SET active=0, updated_at=? WHERE user_id=?",
(now, user_id),
)
await self.conn.commit()
When a topic is deactivated:
- The conversation record remains in the database
- The user will get a new topic on their next message
- Old message links are preserved for historical reference
Checking active conversations
The database only returns active conversations:
async def get_active_conversation(self, user_id: int) -> Conversation | None:
cur = await self.conn.execute(
"SELECT user_id, topic_id, active FROM conversations WHERE user_id = ?",
(user_id,),
)
row = await cur.fetchone()
await cur.close()
if not row:
return None
conversation = Conversation(user_id=int(row[0]), topic_id=int(row[1]), active=bool(row[2]))
if not conversation.active:
return None
return conversation
Handling closed topics
When operators close topics or they get deleted, the bot detects this and handles it gracefully.
Detecting missing topics
The bot checks for specific error messages from Telegram:
support_bot/topic_manager.py
def _is_thread_missing(err: TelegramBadRequest) -> bool:
msg = (getattr(err, "message", None) or "").lower()
return (
"message thread not found" in msg
or "message thread is not found" in msg
or "thread not found" in msg
or ("topic" in msg and "closed" in msg)
)
This catches various error formats that indicate a topic is unavailable.
Automatic recreation
When a missing topic is detected, the bot automatically creates a new one:
support_bot/topic_manager.py
except TelegramBadRequest as err:
if not _is_thread_missing(err):
# Handle other errors
try:
await bot.send_message(
chat_id=self._operator_group_id,
message_thread_id=topic.topic_id,
text=(
"Failed to copy the user's message.\n"
f"type={message.content_type}, message_id={message.message_id}\n"
f"error={getattr(err, 'message', str(err))}"
),
)
except Exception:
pass
return topic
# Deactivate old conversation and create new topic
await self._db.deactivate_conversation(message.from_user.id)
topic = await self.ensure_topic(bot, message.from_user)
reply_params = None # Can't preserve replies across topics
copy_result = await bot.copy_message(
chat_id=self._operator_group_id,
from_chat_id=message.chat.id,
message_id=message.message_id,
message_thread_id=topic.topic_id,
reply_parameters=reply_params,
)
When a topic is recreated, the bot cannot preserve reply chains because the original messages no longer exist.
Concurrent access locks
The TopicManager uses per-user locks to prevent race conditions:
support_bot/topic_manager.py
class TopicManager:
def __init__(self, db: Database, operator_group_id: int) -> None:
self._db = db
self._operator_group_id = operator_group_id
self._locks: dict[int, asyncio.Lock] = {}
self._locks_guard = asyncio.Lock()
Lock acquisition
Locks are created lazily and stored in a dictionary:
support_bot/topic_manager.py
async def _lock_for(self, user_id: int) -> asyncio.Lock:
async with self._locks_guard:
lock = self._locks.get(user_id)
if lock is None:
lock = asyncio.Lock()
self._locks[user_id] = lock
return lock
Why locks matter
Without locks, the following race condition could occur:
- User sends message A and B simultaneously
- Both handlers check for existing topic (none found)
- Both handlers create a topic
- Two topics exist for the same user
With locks:
- User sends message A and B simultaneously
- Handler A acquires lock, checks database, creates topic
- Handler B waits for lock
- Handler B acquires lock, finds existing topic, uses it
- Only one topic exists
Locks are per-user, so different users can create topics concurrently without blocking each other.
Lock scope
The lock is only held during topic creation:
support_bot/topic_manager.py
async def ensure_topic(self, bot: Bot, user: User) -> TopicRef:
lock = await self._lock_for(user.id)
async with lock:
# Check database and create topic if needed
existing = await self._db.get_active_conversation(user.id)
if existing:
return TopicRef(user_id=user.id, topic_id=existing.topic_id)
# Create new topic
topic = await bot.create_forum_topic(
chat_id=self._operator_group_id,
name=_topic_name(user),
)
await self._db.set_conversation(user_id=user.id, topic_id=topic.message_thread_id, active=True)
# ...
# Lock is released here
Once the topic exists, subsequent operations don’t need locking because they work with the existing topic ID.
Topic reference
The TopicManager returns a TopicRef object:
support_bot/topic_manager.py
@dataclass(frozen=True)
class TopicRef:
user_id: int
topic_id: int
This lightweight object links a user to their topic thread ID, which is used for all message operations.
Copying messages to topics
The main entry point for copying user messages is:
support_bot/topic_manager.py
async def copy_user_message_to_topic(self, bot: Bot, message: Message) -> TopicRef:
if message.from_user is None:
raise RuntimeError("Message has no from_user")
topic = await self.ensure_topic(bot, message.from_user)
reply_params = await self._build_reply_params(
source_chat_id=message.chat.id,
source_message=message.reply_to_message,
target_chat_id=self._operator_group_id,
)
try:
copy_result = await bot.copy_message(
chat_id=self._operator_group_id,
from_chat_id=message.chat.id,
message_id=message.message_id,
message_thread_id=topic.topic_id,
reply_parameters=reply_params,
)
This method:
- Ensures a topic exists
- Builds reply parameters if the message is a reply
- Copies the message to the topic
- Logs the message link for future reply handling
- Handles various error cases
See Message flow for complete details on message routing.
Looking up users by topic
Operators’ messages need to be routed back to the correct user:
async def find_user_id_by_topic(self, topic_id: int) -> int | None:
cur = await self.conn.execute(
"SELECT user_id FROM conversations WHERE topic_id = ? AND active = 1",
(topic_id,),
)
row = await cur.fetchone()
await cur.close()
return int(row[0]) if row else None
This query is used by the operator handler to determine where to send replies:
support_bot/handlers/operator.py
user_id = await db.find_user_id_by_topic(int(message.message_thread_id))
if user_id is None:
return
await bot.copy_message(
chat_id=user_id,
from_chat_id=message.chat.id,
message_id=message.message_id,
)
If no user is found for a topic (e.g., if the topic was manually created), the message is silently ignored.