TypeAgent provides flexible ingestion APIs for adding messages to conversations. Messages are processed, indexed, and enriched with semantic knowledge extraction.
Basic Message Ingestion
The primary method for adding messages is add_messages_with_indexing(), which handles both storage and index building.
from typeagent import create_conversation
from typeagent.transcripts.transcript import TranscriptMessage, TranscriptMessageMeta
# Create conversation
conversation = await create_conversation(
    "demo.db",
    TranscriptMessage,
    name="Demo Conversation"
)
# Create messages
messages = [
    TranscriptMessage(
        text_chunks=["Welcome to the Python programming tutorial."],
        metadata=TranscriptMessageMeta(speaker="Instructor")
    ),
    TranscriptMessage(
        text_chunks=["Today we'll learn about async/await in Python."],
        metadata=TranscriptMessageMeta(speaker="Instructor")
    ),
]
# Add messages with indexing
result = await conversation.add_messages_with_indexing(messages)
print(f"Added {result.messages_added} messages")
print(f"Extracted {result.semrefs_added} semantic refs")
TypeAgent supports multiple message types for different use cases.
Transcript Messages
For conversations, meetings, or podcasts:
from typeagent.transcripts.transcript import TranscriptMessage, TranscriptMessageMeta
message = TranscriptMessage(
    text_chunks=["The async keyword is used to define asynchronous functions."],
    metadata=TranscriptMessageMeta(
        speaker="Instructor",
        recipients=["Students"]
    ),
    timestamp="2024-01-15T10:30:00Z"
)
Email Messages
For email ingestion:
from typeagent.emails.email_message import EmailMessage, EmailMessageMeta
message = EmailMessage(
    text_chunks=["Subject: Meeting Tomorrow", "Let's meet at 2pm to discuss the project."],
    metadata=EmailMessageMeta(
        sender="[email protected]",
        recipients=["[email protected]"],
        subject="Meeting Tomorrow"
    ),
    timestamp="2024-01-15T09:00:00Z"
)
Universal Messages
For generic conversation messages:
from typeagent.knowpro.universal_message import ConversationMessage, ConversationMessageMeta
message = ConversationMessage(
    text_chunks=["Hello, how can I help you today?"],
    metadata=ConversationMessageMeta(
        speaker="Agent",
        recipients=["User"]
    )
)
Batch Processing
Set the batch size in conversation settings:
from typeagent.knowpro.convsettings import ConversationSettings
settings = ConversationSettings()
settings.semantic_ref_index_settings.batch_size = 4 # Process 4 messages at a time
For large datasets, process messages in batches:
import time
batch_size = 50
total_messages = len(messages)
for i in range(0, total_messages, batch_size):
    batch = messages[i:i + batch_size]
    t0 = time.time()
    result = await conversation.add_messages_with_indexing(batch)
    elapsed = time.time() - t0
    print(f"Processed {i + len(batch)}/{total_messages} messages in {elapsed:.1f}s")
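The slicing pattern above can be factored into a small reusable helper (a standard-library sketch; on Python 3.12+ itertools.batched provides the same behavior):

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")

def batched(items: Sequence[T], batch_size: int) -> Iterator[Sequence[T]]:
    """Yield successive slices of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

# Example: 7 items in batches of 3 -> batch sizes 3, 3, 1
sizes = [len(batch) for batch in batched(list(range(7)), 3)]
print(sizes)  # [3, 3, 1]
```

The ingestion loop then becomes `for batch in batched(messages, 50): ...`, with no index bookkeeping.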
Track ingestion progress:
msg_count = await conversation.messages.size()
semref_count = await conversation.semantic_refs.size()
if conversation.secondary_indexes and conversation.secondary_indexes.message_index:
    index_size = await conversation.secondary_indexes.message_index.size()
    print(f"Message index has {index_size} entries")
Text Chunking
Messages can be split into multiple chunks for better processing:
def read_messages(filename) -> list[TranscriptMessage]:
    messages = []
    with open(filename, "r") as f:
        for line in f:
            parts = line.strip().split(None, 1)
            if len(parts) < 2:
                continue  # Skip blank or malformed lines
            speaker, text_chunk = parts
            message = TranscriptMessage(
                text_chunks=[text_chunk],
                metadata=TranscriptMessageMeta(speaker=speaker)
            )
            messages.append(message)
    return messages
messages = read_messages("transcript.txt")
results = await conversation.add_messages_with_indexing(messages)
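read_messages above puts each line into a single chunk. When individual texts are long, you may want to split them into several chunks before constructing messages. A minimal sketch (the max_chars threshold and word-boundary strategy are illustrative, not a TypeAgent API):

```python
def split_into_chunks(text: str, max_chars: int = 200) -> list[str]:
    """Split text into chunks of at most max_chars, breaking on word boundaries."""
    chunks: list[str] = []
    current: list[str] = []
    length = 0
    for word in text.split():
        # Cost of adding this word: its length plus a joining space if needed
        extra = len(word) if not current else len(word) + 1
        if current and length + extra > max_chars:
            chunks.append(" ".join(current))
            current, length = [], 0
            extra = len(word)
        current.append(word)
        length += extra
    if current:
        chunks.append(" ".join(current))
    return chunks

print(split_into_chunks("one two three four", max_chars=9))
# ['one two', 'three', 'four']
```

The result plugs straight into a message, e.g. `TranscriptMessage(text_chunks=split_into_chunks(long_text), ...)`.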
Source ID Tracking
Always use source IDs when ingesting external content to prevent duplicate ingestion.
TypeAgent tracks ingested sources to prevent duplicates:
from typeagent.emails.email_memory import EmailMemory
email_memory = await EmailMemory.create(settings)
# Ingest with source ID tracking
await email_memory.add_messages_with_indexing(
    [email],
    source_ids=[str(email_file)]  # Typically file path or email ID
)
# Check whether a source was already ingested (inside an ingestion loop)
for source_id, email_file in email_files:
    if await storage_provider.is_source_ingested(source_id):
        print("Already ingested, skipping")
        continue
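Source IDs just need to be stable and unique per source. A file path works; if files can be renamed or moved, a content hash is a robust alternative (a standard-library sketch; not a TypeAgent requirement):

```python
import hashlib
import tempfile
from pathlib import Path

def content_source_id(path: Path) -> str:
    """Derive a stable source ID from file contents (survives renames/moves)."""
    digest = hashlib.sha256(path.read_bytes()).hexdigest()
    return f"sha256:{digest}"

# Demo: two files with identical bytes get the same source ID
with tempfile.TemporaryDirectory() as tmp:
    a = Path(tmp) / "a.eml"
    b = Path(tmp) / "b.eml"
    a.write_bytes(b"same content")
    b.write_bytes(b"same content")
    same = content_source_id(a) == content_source_id(b)
print(same)  # True
```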
Knowledge extraction is automatically enabled when using add_messages_with_indexing().
Messages are automatically enriched with semantic knowledge:
# Knowledge extraction happens automatically
result = await conversation.add_messages_with_indexing(messages)
# The result contains extracted semantic references
print(f"Extracted {result.semrefs_added} semantic references")
print("Including entities, actions, and topics from the messages")
Extracted knowledge includes:
- Entities: People, organizations, locations
- Actions: Activities, events, communications
- Topics: Subjects, themes discussed
- Timestamps: Temporal information
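A quick way to see how extracted knowledge is distributed across these types is to tally semantic refs by type. The snippet below models them with a minimal placeholder dataclass; the SemanticRef shape and the knowledge_type values shown are illustrative, not the exact TypeAgent classes:

```python
from collections import Counter
from dataclasses import dataclass

@dataclass
class SemanticRef:  # placeholder for illustration only
    knowledge_type: str  # e.g. "entity", "action", "topic"

def knowledge_summary(refs: list[SemanticRef]) -> dict[str, int]:
    """Count extracted semantic refs by knowledge type."""
    return dict(Counter(ref.knowledge_type for ref in refs))

refs = [SemanticRef("entity"), SemanticRef("topic"), SemanticRef("entity")]
print(knowledge_summary(refs))  # {'entity': 2, 'topic': 1}
```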
Error Handling
Handle ingestion errors gracefully:
import traceback
success_count = 0
failed_count = 0
for source_id, email_file in email_files:
    try:
        email = import_email_from_file(str(email_file))
        await email_memory.add_messages_with_indexing(
            [email],
            source_ids=[source_id]
        )
        success_count += 1
    except Exception as e:
        failed_count += 1
        print(f"Error processing {source_id}: {e}")
        if verbose:
            traceback.print_exc()
        # Mark as failed to skip on retry
        async with storage_provider:
            await storage_provider.mark_source_ingested(
                source_id,
                status=e.__class__.__name__
            )
print(f"Successfully imported {success_count} messages")
print(f"Failed to import {failed_count} messages")
For optimal performance with large datasets:
- Use batch sizes of 4-50 messages
- Enable SQLite WAL mode (automatic with SqliteStorageProvider)
- Process messages sequentially to avoid lock contention
# SQLite optimization is automatic in SqliteStorageProvider
# But you can verify it's enabled:
# These are set automatically:
# PRAGMA synchronous = NORMAL
# PRAGMA journal_mode = WAL
# PRAGMA cache_size = -64000 # 64MB cache
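Those pragmas can be checked directly against a database file with Python's built-in sqlite3 module (shown on a throwaway database; substitute your conversation's .db path):

```python
import sqlite3
import tempfile
from pathlib import Path

db_path = Path(tempfile.mkdtemp()) / "demo.db"
conn = sqlite3.connect(db_path)

# Apply the same pragmas the storage provider is described as setting
conn.execute("PRAGMA journal_mode = WAL")
conn.execute("PRAGMA synchronous = NORMAL")

# Read them back to verify
journal_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
synchronous = conn.execute("PRAGMA synchronous").fetchone()[0]
print(journal_mode)  # wal
print(synchronous)   # 1  (NORMAL)
conn.close()
```

Note that WAL mode requires a file-backed database; in-memory databases report a journal_mode of "memory" instead.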
Next Steps