TypeAgent provides flexible ingestion APIs for adding messages to conversations. Messages are processed, indexed, and enriched with semantic knowledge extraction.

Basic Message Ingestion

The primary method for adding messages is add_messages_with_indexing(), which handles both storage and index building.
from typeagent import create_conversation
from typeagent.transcripts.transcript import TranscriptMessage, TranscriptMessageMeta

# Create conversation
conversation = await create_conversation(
    "demo.db",
    TranscriptMessage,
    name="Demo Conversation"
)

# Create messages
messages = [
    TranscriptMessage(
        text_chunks=["Welcome to the Python programming tutorial."],
        metadata=TranscriptMessageMeta(speaker="Instructor")
    ),
    TranscriptMessage(
        text_chunks=["Today we'll learn about async/await in Python."],
        metadata=TranscriptMessageMeta(speaker="Instructor")
    )
]

# Add messages with indexing
result = await conversation.add_messages_with_indexing(messages)
print(f"Added {result.messages_added} messages")
print(f"Extracted {result.semrefs_added} semantic refs")

Message Formats

TypeAgent supports multiple message types for different use cases.

Transcript Messages

For conversations, meetings, or podcasts:
from typeagent.transcripts.transcript import TranscriptMessage, TranscriptMessageMeta

message = TranscriptMessage(
    text_chunks=["The async keyword is used to define asynchronous functions."],
    metadata=TranscriptMessageMeta(
        speaker="Instructor",
        recipients=["Students"]
    ),
    timestamp="2024-01-15T10:30:00Z"
)

Email Messages

For email ingestion:
from typeagent.emails.email_message import EmailMessage, EmailMessageMeta

message = EmailMessage(
    text_chunks=["Subject: Meeting Tomorrow", "Let's meet at 2pm to discuss the project."],
    metadata=EmailMessageMeta(
        sender="[email protected]",
        recipients=["[email protected]"],
        subject="Meeting Tomorrow"
    ),
    timestamp="2024-01-15T09:00:00Z"
)

Universal Messages

For generic conversation messages:
from typeagent.knowpro.universal_message import ConversationMessage, ConversationMessageMeta

message = ConversationMessage(
    text_chunks=["Hello, how can I help you today?"],
    metadata=ConversationMessageMeta(
        speaker="Agent",
        recipients=["User"]
    )
)

Batch Processing

Step 1: Configure Batch Size
Set the batch size in conversation settings:
from typeagent.knowpro.convsettings import ConversationSettings

settings = ConversationSettings()
settings.semantic_ref_index_settings.batch_size = 4  # Process 4 messages at a time
Step 2: Process in Batches
For large datasets, process messages in batches:
import time

batch_size = 50
total_messages = len(messages)

for i in range(0, total_messages, batch_size):
    batch = messages[i:i + batch_size]
    t0 = time.time()
    
    result = await conversation.add_messages_with_indexing(batch)
    
    elapsed = time.time() - t0
    print(f"Processed {i+len(batch)}/{total_messages} messages in {elapsed:.1f}s")
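The slicing in the loop above can be factored into a small generator. This is plain Python, independent of TypeAgent; the `batched` name is illustrative:

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")

def batched(items: Sequence[T], batch_size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most batch_size items each."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

# Each batch can then be passed to add_messages_with_indexing() in turn
sizes = [len(batch) for batch in batched(list(range(10)), 4)]
print(sizes)  # [4, 4, 2]
```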
Step 3: Monitor Progress
Track ingestion progress:
msg_count = await conversation.messages.size()
semref_count = await conversation.semantic_refs.size()

if conversation.secondary_indexes and conversation.secondary_indexes.message_index:
    index_size = await conversation.secondary_indexes.message_index.size()
    print(f"Message index has {index_size} entries")

Text Chunking

Messages can be split into multiple chunks for better processing:
def read_messages(filename) -> list[TranscriptMessage]:
    messages = []
    with open(filename, "r") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # skip blank lines, which would break the split below
            speaker, text_chunk = line.split(None, 1)
            message = TranscriptMessage(
                text_chunks=[text_chunk],
                metadata=TranscriptMessageMeta(speaker=speaker)
            )
            messages.append(message)
    return messages

messages = read_messages("transcript.txt")
results = await conversation.add_messages_with_indexing(messages)
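The example above puts each line in a single chunk. For long passages, one simple strategy is a word-bounded character budget per chunk. This is a plain-Python sketch; the `chunk_text` helper and the `max_chars` limit are illustrative, not part of the TypeAgent API:

```python
def chunk_text(text: str, max_chars: int = 200) -> list[str]:
    """Split text into word-bounded chunks of roughly max_chars characters."""
    chunks: list[str] = []
    current: list[str] = []
    length = 0
    for word in text.split():
        # +1 accounts for the joining space
        if current and length + len(word) + 1 > max_chars:
            chunks.append(" ".join(current))
            current, length = [], 0
        current.append(word)
        length += len(word) + 1
    if current:
        chunks.append(" ".join(current))
    return chunks

chunks = chunk_text("one two three four five six", max_chars=10)
print(chunks)  # word-bounded chunks, each within the character budget
```

The resulting list can be passed directly as a message's `text_chunks`.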

Source ID Tracking

Always use source IDs when ingesting external content so that re-running an import does not create duplicates. TypeAgent records each ingested source and can skip ones it has already seen:
from typeagent.emails.email_memory import EmailMemory

email_memory = await EmailMemory.create(settings)

# Ingest with source ID tracking
await email_memory.add_messages_with_indexing(
    [email],
    source_ids=[str(email_file)]  # Typically file path or email ID
)

# Check whether a source was already ingested (inside an ingestion loop)
if await storage_provider.is_source_ingested(source_id):
    print("Already ingested, skipping")
    continue  # skip to the next source
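The dedup logic can be sketched with a simple in-memory tracker. The real storage provider persists this state in the database; the `SourceTracker` class here is purely illustrative:

```python
class SourceTracker:
    """Illustrative in-memory stand-in for persistent source-ID tracking."""

    def __init__(self) -> None:
        self._seen: set[str] = set()

    def is_source_ingested(self, source_id: str) -> bool:
        return source_id in self._seen

    def mark_source_ingested(self, source_id: str) -> None:
        self._seen.add(source_id)

tracker = SourceTracker()
ingested = []
for source_id in ["mail/001.eml", "mail/002.eml", "mail/001.eml"]:
    if tracker.is_source_ingested(source_id):
        continue  # already ingested, skip
    ingested.append(source_id)
    tracker.mark_source_ingested(source_id)

print(ingested)  # the duplicate "mail/001.eml" is skipped
```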

Knowledge Extraction

Knowledge extraction is automatically enabled when using add_messages_with_indexing().
Messages are automatically enriched with semantic knowledge:
# Knowledge extraction happens automatically
result = await conversation.add_messages_with_indexing(messages)

# The result contains extracted semantic references
print(f"Extracted {result.semrefs_added} semantic references")
print(f"Including entities, actions, topics from the messages")
Extracted knowledge includes:
  • Entities: People, organizations, locations
  • Actions: Activities, events, communications
  • Topics: Subjects, themes discussed
  • Timestamps: Temporal information

Error Handling

Handle ingestion errors gracefully:
import traceback

verbose = True  # set False to suppress full tracebacks
success_count = 0
failed_count = 0

for source_id, email_file in email_files:
    try:
        email = import_email_from_file(str(email_file))
        await email_memory.add_messages_with_indexing(
            [email],
            source_ids=[source_id]
        )
        success_count += 1
    except Exception as e:
        failed_count += 1
        print(f"Error processing {source_id}: {e}")
        if verbose:
            traceback.print_exc()
        
        # Mark as failed to skip on retry
        async with storage_provider:
            await storage_provider.mark_source_ingested(
                source_id,
                status=e.__class__.__name__
            )

print(f"Successfully imported {success_count} messages")
print(f"Failed to import {failed_count} messages")

Performance Tips

For optimal performance with large datasets:
  • Use batch sizes of 4-50 messages
  • Enable SQLite WAL mode (automatic with SqliteStorageProvider)
  • Process messages sequentially to avoid lock contention
# SQLite optimization is automatic in SqliteStorageProvider
# But you can verify it's enabled:

# These are set automatically:
# PRAGMA synchronous = NORMAL
# PRAGMA journal_mode = WAL
# PRAGMA cache_size = -64000  # 64MB cache
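If you want to confirm the journal mode yourself, the stdlib `sqlite3` module can read the pragmas back. This is shown on a throwaway database; `SqliteStorageProvider` applies these settings for you:

```python
import os
import sqlite3
import tempfile

# Use an on-disk database: WAL mode is not available for :memory: databases
path = os.path.join(tempfile.mkdtemp(), "demo.db")
conn = sqlite3.connect(path)

conn.execute("PRAGMA journal_mode = WAL")
conn.execute("PRAGMA synchronous = NORMAL")

mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
print(mode)  # wal
conn.close()
```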
