
Class Definition

class ConversationBase[
    TMessage: IMessage
](
    Generic[TMessage],
    IConversation[TMessage, ITermToSemanticRefIndex]
)
Represents a conversation holding ingested messages along with the knowledge extracted and indexed from them. Created via the create_conversation factory function.

Import

from typeagent.knowpro.conversation_base import ConversationBase
Best Practice: Use the factory function typeagent.create_conversation() instead of constructing ConversationBase directly.

Properties

settings (ConversationSettings)
Configuration settings for knowledge extraction and indexing behavior.
storage_provider (IStorageProvider[TMessage])
The storage backend (SQLite or in-memory) managing persistence.
name_tag (str)
The conversation's name identifier.
tags (list[str])
Tags associated with the entire conversation.
messages (IMessageCollection[TMessage])
Append-only collection of all messages in the conversation.
semantic_refs (ISemanticRefCollection)
Collection of extracted semantic references (entities, actions, topics, tags).
semantic_ref_index (ITermToSemanticRefIndex)
Index mapping terms to semantic references for knowledge lookup.
secondary_indexes (IConversationSecondaryIndexes[TMessage] | None)
Collection of all six secondary indexes for optimized querying.

Methods

query

async def query(
    self,
    question: str,
    search_options: LanguageSearchOptions | None = None,
    answer_options: AnswerContextOptions | None = None,
) -> str
Run an end-to-end natural language query on the conversation. Performs semantic search and generates an answer based on indexed knowledge.
question (str, required)
The natural language question to answer. Example: "What topics were discussed?"
search_options (LanguageSearchOptions | None, default: None)
Advanced search configuration. If None, uses defaults:
  • exact_scope=False
  • verb_scope=True
  • max_message_matches=25
answer_options (AnswerContextOptions | None, default: None)
Context filtering for answer generation. If None, uses defaults:
  • entities_top_k=50
  • topics_top_k=50
  • messages_top_k=None
Returns: answer (str)
Natural language answer string. If no answer is found, returns a string starting with "No answer found:" followed by an explanation.
Example:
conv = await create_conversation(
    dbname=None,
    message_type=ConversationMessage
)

# Add messages...
await conv.add_messages_with_indexing(messages)

# Query the conversation
answer = await conv.query("Who spoke about machine learning?")
print(answer)  # "Alice and Bob discussed machine learning..."

add_messages_with_indexing

async def add_messages_with_indexing(
    self,
    messages: list[TMessage],
    *,
    source_ids: list[str] | None = None,
) -> AddMessagesResult
Add messages and build all indexes incrementally in a single transaction. This is the primary method for ingesting content.
messages (list[TMessage], required)
Messages to add to the conversation. Must be instances of the message_type specified during creation.
source_ids (list[str] | None, default: None)
Optional list of external source identifiers (e.g., email IDs, file paths) to mark as ingested. Must match the length of messages. These are marked within the same transaction, ensuring atomic ingestion tracking. Example: ["email:123", "email:124"]
Returns: result (AddMessagesResult)
Object with two fields:
  • messages_added: int - Number of messages successfully added
  • semrefs_added: int - Number of semantic references extracted
Transaction Behavior:
  • SQLite: Either completely succeeds (all changes committed) or raises an exception (all changes rolled back)
  • In-memory: No rollback support; partial changes may remain on error
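The SQLite guarantee is standard transactional behavior; its shape can be illustrated with the plain sqlite3 module (a sketch independent of typeagent's actual storage code):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE messages (text TEXT)")
conn.commit()  # make sure the schema is committed before the demo transaction

try:
    with conn:  # opens a transaction; commits on success, rolls back on error
        conn.execute("INSERT INTO messages VALUES ('hello')")
        raise RuntimeError("simulated failure mid-ingestion")
except RuntimeError:
    pass

# The INSERT was rolled back, so the table is still empty.
count = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
```

Here count is 0: the write inside the failed transaction never becomes visible, which is the "all changes rolled back" behavior described above.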
Indexing Pipeline:
  1. Messages appended to collection
  2. Metadata knowledge extracted (speakers, recipients)
  3. LLM knowledge extracted (if auto_extract_knowledge=True)
  4. All 6 secondary indexes updated incrementally
  5. Conversation timestamp updated
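The five steps above can be sketched as a toy walk-through (plain dicts stand in for the real collections and indexes, which live inside typeagent):

```python
from datetime import datetime, timezone

def add_messages_with_indexing_sketch(conv: dict, messages: list[dict]) -> dict:
    """Toy illustration of the pipeline order; not the library's actual code."""
    # 1. Append messages to the conversation's collection
    conv.setdefault("messages", []).extend(messages)
    # 2. Extract metadata knowledge (speakers, recipients)
    semrefs = [{"speaker": m["speaker"]} for m in messages if m.get("speaker")]
    # 3. LLM extraction would contribute further semantic refs here
    #    (only when auto_extract_knowledge=True)
    conv.setdefault("semantic_refs", []).extend(semrefs)
    # 4. Secondary indexes would be updated incrementally here (stubbed)
    # 5. Update the conversation timestamp
    conv["updated_at"] = datetime.now(timezone.utc).isoformat()
    return {"messages_added": len(messages), "semrefs_added": len(semrefs)}
```

The point of the ordering is that index updates and the timestamp only happen after the messages and their extracted knowledge are in place.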
Example:
from typeagent.knowpro.universal_message import (
    ConversationMessage,
    ConversationMessageMeta
)

messages = [
    ConversationMessage(
        text_chunks=["Let's discuss the new API design."],
        metadata=ConversationMessageMeta(speaker="Alice"),
        timestamp="2024-03-06T10:00:00Z"
    ),
    ConversationMessage(
        text_chunks=["I think we should use REST instead of GraphQL."],
        metadata=ConversationMessageMeta(speaker="Bob"),
        timestamp="2024-03-06T10:01:00Z"
    )
]

result = await conv.add_messages_with_indexing(
    messages,
    source_ids=["msg:001", "msg:002"]
)

print(f"Added {result.messages_added} messages")
print(f"Extracted {result.semrefs_added} semantic references")
Error Handling:
try:
    result = await conv.add_messages_with_indexing(messages)
except ValueError as e:
    # Validation errors (e.g., source_ids length mismatch)
    print(f"Invalid input: {e}")
except Exception as e:
    # Database errors, LLM failures, etc.
    # For SQLite: all changes are rolled back
    print(f"Indexing failed: {e}")

add_metadata_to_index

async def add_metadata_to_index(self) -> None
Add metadata knowledge (speakers, recipients) to the semantic reference index. Called automatically by add_messages_with_indexing.

Class Method

create

@classmethod
async def create(
    cls,
    settings: ConversationSettings,
    name: str | None = None,
    tags: list[str] | None = None,
) -> ConversationBase[TMessage]
Alternative factory method. Prefer using typeagent.create_conversation() instead, which handles storage provider creation automatically.

Usage Patterns

Complete Conversation Lifecycle

from typeagent import create_conversation
from typeagent.knowpro.universal_message import (
    ConversationMessage,
    ConversationMessageMeta
)

# 1. Create conversation
conv = await create_conversation(
    dbname="chat.db",
    message_type=ConversationMessage,
    name="team_discussion"
)

# 2. Add messages with indexing
messages = [
    ConversationMessage(
        text_chunks=["We need to optimize the database queries."],
        metadata=ConversationMessageMeta(speaker="Alice"),
        tags=["performance", "database"]
    ),
    ConversationMessage(
        text_chunks=["I suggest adding an index on the user_id column."],
        metadata=ConversationMessageMeta(speaker="Bob")
    )
]

result = await conv.add_messages_with_indexing(messages)
print(f"Indexed {result.semrefs_added} knowledge items")

# 3. Query the conversation
answer = await conv.query("What performance improvements were suggested?")
print(answer)

# 4. Access collections directly
num_messages = await conv.messages.size()
num_semrefs = await conv.semantic_refs.size()
print(f"Total: {num_messages} messages, {num_semrefs} semantic refs")

Batch Processing with Source Tracking

import asyncio
from pathlib import Path

async def process_transcript_files(conv: ConversationBase, directory: Path):
    """Process multiple transcript files, skipping already-ingested ones."""
    for file_path in directory.glob("*.json"):
        source_id = f"file:{file_path}"
        
        # Check if already processed
        if await conv.storage_provider.is_source_ingested(source_id):
            print(f"Skipping {file_path.name} (already ingested)")
            continue
        
        # Load and parse messages (load_messages_from_file is a placeholder
        # for your own transcript parser)
        messages = load_messages_from_file(file_path)
        
        # Ingest with source tracking
        try:
            result = await conv.add_messages_with_indexing(
                messages,
                source_ids=[source_id] * len(messages)
            )
            print(f"Processed {file_path.name}: {result.messages_added} messages")
        except Exception as e:
            print(f"Failed to process {file_path.name}: {e}")
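load_messages_from_file above is left to the caller. A minimal sketch, assuming each transcript file holds a JSON list of {"speaker", "text", "timestamp"} objects (a hypothetical format — adapt the parsing, and build instances of your message_type such as ConversationMessage, to match your real data):

```python
import json
from pathlib import Path

def load_messages_from_file(file_path: Path) -> list[dict]:
    """Parse one transcript file into plain message dicts."""
    with open(file_path, encoding="utf-8") as f:
        records = json.load(f)
    return [
        {
            "speaker": rec.get("speaker", "unknown"),
            "text_chunks": [rec["text"]],
            "timestamp": rec.get("timestamp"),
        }
        for rec in records
    ]
```

In real use the returned dicts would instead be your message_type instances so that add_messages_with_indexing accepts them.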
