TypeAgent supports two storage backends: in-memory for ephemeral data and SQLite for persistent storage.
Storage Provider Overview
Both storage providers implement the same IStorageProvider interface:
from typeagent.knowpro.interfaces import IStorageProvider
# Common operations across all providers:
# - get_message_collection()
# - get_semantic_ref_collection()
# - get_semantic_ref_index()
# - get_message_text_index()
# - get_related_terms_index()
# - get_conversation_metadata()
# - is_source_ingested()
# - mark_source_ingested()
Memory Storage Provider
The MemoryStorageProvider stores all data in RAM. Ideal for:
- Testing and development
- Temporary analysis
- Small datasets
- Prototyping
Creating Memory Storage
from typeagent.storage.memory import MemoryStorageProvider
from typeagent.knowpro.convsettings import ConversationSettings
from typeagent.transcripts.transcript import TranscriptMessage
settings = ConversationSettings()
provider = MemoryStorageProvider(
message_text_settings=settings.message_text_index_settings,
related_terms_settings=settings.related_term_index_settings
)
settings.storage_provider = provider
Using Memory Storage with create_conversation
from typeagent import create_conversation
from typeagent.transcripts.transcript import TranscriptMessage
# Pass None for dbname to use in-memory storage
conversation = await create_conversation(
dbname=None, # In-memory storage
message_type=TranscriptMessage,
name="Temporary Analysis"
)
Memory Storage Characteristics
- Fast: No disk I/O overhead
- Simple: No database files to manage
- Isolated: Each instance is independent
- Testing: Clean state for each test
# Perfect for unit tests
async def test_message_ingestion():
conversation = await create_conversation(
None, # Memory storage
TranscriptMessage
)
messages = [create_test_message()]
result = await conversation.add_messages_with_indexing(messages)
assert result.messages_added == 1
Limitations:
- Volatile: Data lost when process ends
- Memory-bound: Limited by available RAM
- No persistence: Cannot save and reload
- Single-process: Cannot share across processes
Memory storage is NOT suitable for production applications or large datasets.
SQLite Storage Provider
The SqliteStorageProvider uses SQLite for persistent storage. Ideal for:
- Production applications
- Large datasets (millions of messages)
- Persistent analysis
- Shareable databases
Creating SQLite Storage
from typeagent.storage.sqlite import SqliteStorageProvider
from typeagent.knowpro.convsettings import ConversationSettings
from typeagent.transcripts.transcript import TranscriptMessage
settings = ConversationSettings()
provider = SqliteStorageProvider(
db_path="conversation.db",
message_type=TranscriptMessage,
message_text_index_settings=settings.message_text_index_settings,
related_term_index_settings=settings.related_term_index_settings
)
settings.storage_provider = provider
Using SQLite with create_conversation
from typeagent import create_conversation
from typeagent.transcripts.transcript import TranscriptMessage
# Provide database path for SQLite storage
conversation = await create_conversation(
dbname="conversation.db", # SQLite file
message_type=TranscriptMessage,
name="Production Conversation"
)
SQLite Optimization
TypeAgent automatically configures SQLite for optimal performance:
# These optimizations are applied automatically:
# - PRAGMA foreign_keys = ON
# - PRAGMA synchronous = NORMAL
# - PRAGMA journal_mode = WAL # Write-Ahead Logging
# - PRAGMA cache_size = -64000 # 64MB cache
# - PRAGMA temp_store = MEMORY
# - PRAGMA mmap_size = 268435456 # 256MB memory-mapped I/O
Transaction Management
Always use async context managers for write operations to ensure proper transaction handling.
from typeagent.storage.sqlite import SqliteStorageProvider
provider = SqliteStorageProvider(db_path="demo.db")
# Automatic transaction management
async with provider:
# All operations in this block are in a transaction
await provider.get_message_collection().extend(messages)
await provider.mark_source_ingested(source_id)
# Commits on success, rolls back on exception
# Close when done
await provider.close()
SQLite Schema Versioning
TypeAgent tracks database schema versions:
provider = SqliteStorageProvider(db_path="conversation.db")
# Get schema version
version = provider.get_db_version()
print(f"Database schema version: {version}")
# Metadata includes schema version
metadata = await provider.get_conversation_metadata()
print(f"Stored schema version: {metadata.schema_version}")
Switching Between Storage Providers
Step 1: Development with Memory Storage
Start with memory storage for development:
# Development configuration
dev_conversation = await create_conversation(
dbname=None, # Memory
message_type=TranscriptMessage,
name="Development"
)
# Test your ingestion pipeline
messages = load_test_messages()
await dev_conversation.add_messages_with_indexing(messages)
# Test queries
answer = await dev_conversation.query("test question")
Step 2: Production with SQLite Storage
Switch to SQLite for production:
# Production configuration
prod_conversation = await create_conversation(
dbname="production.db", # SQLite
message_type=TranscriptMessage,
name="Production"
)
# Same ingestion code
messages = load_production_messages()
await prod_conversation.add_messages_with_indexing(messages)
# Same query code
answer = await prod_conversation.query("production question")
Step 3: Configuration Flexibility
Use environment variables to switch:
import os
from typeagent import create_conversation
# Use environment variable to control storage
db_path = os.getenv("TYPEAGENT_DB_PATH", None)
conversation = await create_conversation(
dbname=db_path, # None for memory, path for SQLite
message_type=TranscriptMessage,
name="Flexible Storage"
)
Storage Provider Utilities
The create_storage_provider utility automatically selects the right provider:
from typeagent.storage.utils import create_storage_provider
from typeagent.knowpro.convsettings import ConversationSettings
settings = ConversationSettings()
# Automatically creates MemoryStorageProvider
memory_provider = await create_storage_provider(
settings.message_text_index_settings,
settings.related_term_index_settings,
dbname=None, # None = Memory
message_type=TranscriptMessage
)
# Automatically creates SqliteStorageProvider
sqlite_provider = await create_storage_provider(
settings.message_text_index_settings,
settings.related_term_index_settings,
dbname="data.db", # Path = SQLite
message_type=TranscriptMessage
)
Source ID Tracking
Both providers support source ID tracking to prevent duplicate ingestion:
# Check if source was already ingested
if await provider.is_source_ingested("email_12345"):
print("Already ingested, skipping")
else:
# Ingest message
await conversation.add_messages_with_indexing([message])
# Mark as ingested
async with provider:
await provider.mark_source_ingested("email_12345")
Source Status Tracking
# Get ingestion status
status = await provider.get_source_status("email_12345")
if status is None:
print("Not ingested")
elif status == "ingested":
print("Successfully ingested")
else:
print(f"Failed with: {status}")
Marking Failed Sources
try:
email = import_email_from_file(file_path)
await conversation.add_messages_with_indexing([email])
async with provider:
await provider.mark_source_ingested(source_id, "ingested")
except Exception as e:
# Mark as failed with exception type
async with provider:
await provider.mark_source_ingested(
source_id,
status=e.__class__.__name__
)
Embedding Consistency
SQLite storage validates embedding model consistency. You cannot change embedding models after creating a database.
from typeagent.storage.sqlite import SqliteStorageProvider
from typeagent.aitools.model_adapters import create_embedding_model
# First time: creates database with text-embedding-ada-002
model1 = create_embedding_model("openai:text-embedding-ada-002")
provider1 = SqliteStorageProvider(
db_path="demo.db",
message_text_index_settings=MessageTextIndexSettings(
TextEmbeddingIndexSettings(model1)
)
)
# Later: trying to open with different model
model2 = create_embedding_model("openai:text-embedding-3-small")
provider2 = SqliteStorageProvider(
db_path="demo.db", # Same database
message_text_index_settings=MessageTextIndexSettings(
TextEmbeddingIndexSettings(model2)
)
)
# Raises ValueError: embedding model mismatch
Storage Provider Comparison
| Feature | MemoryStorageProvider | SqliteStorageProvider |
|---|---|---|
| Persistence | No | Yes |
| Performance | Fastest | Fast with WAL |
| Capacity | RAM-limited | Disk-limited |
| Transactions | No-op | Full ACID |
| Concurrency | Single process | WAL enables readers during writes |
| Portability | N/A | Single .db file |
| Use Case | Testing, prototypes | Production, large data |
Database File Management
Database Location
# Relative path
conversation = await create_conversation(
dbname="data/conversations/demo.db",
message_type=TranscriptMessage
)
# Absolute path
import os
db_path = os.path.join(os.getcwd(), "databases", "demo.db")
conversation = await create_conversation(
dbname=db_path,
message_type=TranscriptMessage
)
Database Backup
import shutil
from datetime import datetime
# Create backup
original = "conversation.db"
backup = f"conversation.backup.{datetime.now():%Y%m%d_%H%M%S}.db"
shutil.copy2(original, backup)
print(f"Backed up to {backup}")
Database Inspection
# Use SQLite CLI to inspect
sqlite3 conversation.db
# List tables
.tables
# Show schema
.schema Messages
# Query data
SELECT COUNT(*) FROM Messages;
SELECT * FROM ConversationMetadata;
Advanced Storage Operations
Clearing Data
# Clear all data from storage
await provider.clear()
print(f"Messages: {await provider.get_message_collection().size()}") # 0
print(f"Semantic refs: {await provider.get_semantic_ref_collection().size()}") # 0
Serialization
SQLite provider supports serialization:
# Serialize storage data
data = provider.serialize()
# Deserialize into another provider
new_provider = SqliteStorageProvider(db_path=":memory:")
await new_provider.deserialize(data)
Best Practices
Use Memory Storage for Tests
import pytest
from typeagent import create_conversation
@pytest.mark.asyncio
async def test_message_ingestion():
# Clean state for each test
conv = await create_conversation(None, TranscriptMessage)
# ... test code
Use SQLite for Production
# Production setup
conversation = await create_conversation(
dbname="/var/lib/typeagent/production.db",
message_type=EmailMessage,
name="Production Email Archive"
)
Always Close Providers
try:
provider = SqliteStorageProvider(db_path="demo.db")
# ... work with provider
finally:
await provider.close()
Use Transactions for Writes
# Group related operations in transactions
async with provider:
for message in batch:
await conversation.add_messages_with_indexing([message])
await provider.mark_source_ingested(message.src_url)
Next Steps