Overview

Storage providers manage conversation persistence and provide access to collections and indexes. TypeAgent supports two implementations:
  • MemoryStorageProvider - Fast, in-memory storage (data lost on exit)
  • SqliteStorageProvider - Persistent SQLite storage with transactions

IStorageProvider Protocol

from typeagent.knowpro.interfaces import IStorageProvider
Base protocol defining the storage provider interface. All providers implement this.

Type Parameters

TMessage
IMessage
The message type this provider stores (e.g., ConversationMessage).

Collection Getters

get_message_collection

async def get_message_collection(self) -> IMessageCollection[TMessage]
Retrieve the append-only collection of messages.
collection
IMessageCollection[TMessage]
Message collection supporting append, size, get_item, get_slice operations.
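
The four operations named above can be illustrated with a small stand-in. This is only a sketch: `InMemoryMessages` is a hypothetical class, not the library's implementation, and it assumes that every operation is a coroutine and that `get_slice` takes a start and an end position.

```python
import asyncio


class InMemoryMessages:
    """Hypothetical stand-in for an append-only message collection."""

    def __init__(self) -> None:
        self._items: list[str] = []

    async def append(self, item: str) -> None:
        # Append-only: items are never removed or reordered.
        self._items.append(item)

    async def size(self) -> int:
        return len(self._items)

    async def get_item(self, ordinal: int) -> str:
        return self._items[ordinal]

    async def get_slice(self, start: int, end: int) -> list[str]:
        # Assumed half-open range [start, end), like a Python slice.
        return self._items[start:end]


async def demo() -> tuple[int, str, list[str]]:
    messages = InMemoryMessages()
    for text in ("hello", "how are you?", "fine, thanks"):
        await messages.append(text)
    return (
        await messages.size(),
        await messages.get_item(0),
        await messages.get_slice(1, 3),
    )


result = asyncio.run(demo())
```

With a real provider, the collection would come from `await provider.get_message_collection()` rather than being constructed directly.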

get_semantic_ref_collection

async def get_semantic_ref_collection(self) -> ISemanticRefCollection
Retrieve the collection of extracted semantic references.
collection
ISemanticRefCollection
Semantic reference collection containing entities, actions, topics, and tags.

Index Getters

All 6 index types are accessible:

get_semantic_ref_index

async def get_semantic_ref_index(self) -> ITermToSemanticRefIndex
Get the term-to-semantic-reference mapping index for knowledge lookup.

get_property_index

async def get_property_index(self) -> IPropertyToSemanticRefIndex
Get the property index for fast name/value lookups.

get_timestamp_index

async def get_timestamp_index(self) -> ITimestampToTextRangeIndex
Get the timestamp index for date range queries.

get_message_text_index

async def get_message_text_index(self) -> IMessageTextIndex[TMessage]
Get the message text index for semantic search over message content.

get_related_terms_index

async def get_related_terms_index(self) -> ITermToRelatedTermsIndex
Get the related terms index for synonyms and fuzzy matching.

get_conversation_threads

async def get_conversation_threads(self) -> IConversationThreads
Get the conversation threads index for thread-based organization.

Metadata Management

get_conversation_metadata

async def get_conversation_metadata(self) -> ConversationMetadata
Get conversation metadata (name, timestamps, embedding model, etc.).
metadata
ConversationMetadata
Metadata object with fields:
  • name_tag: str | None - Conversation name
  • schema_version: int | None - Database schema version
  • created_at: datetime | None - Creation timestamp
  • updated_at: datetime | None - Last update timestamp
  • embedding_model: str | None - Embedding model name
  • tags: list[str] | None - Conversation tags
  • extra: dict[str, str] | None - Custom metadata

set_conversation_metadata

async def set_conversation_metadata(
    self,
    **kwds: str | list[str] | None
) -> None
Set or update conversation metadata key-value pairs.
**kwds
str | list[str] | None
Metadata keys and values:
  • str value: Sets a single key-value pair (replaces existing)
  • list[str] value: Sets multiple values for the same key
  • None value: Deletes all rows for the given key

update_conversation_timestamps

async def update_conversation_timestamps(
    self,
    created_at: datetime | None = None,
    updated_at: datetime | None = None,
) -> None
Update conversation timestamps.

Source Tracking

Track which external sources (files, emails, etc.) have been ingested to prevent duplicates.

is_source_ingested

async def is_source_ingested(self, source_id: str) -> bool
Check if a source has already been ingested.
source_id
str
required
External source identifier (e.g., "email:12345", "file:/path/to/transcript.txt").

get_source_status

async def get_source_status(self, source_id: str) -> str | None
Get the ingestion status of a source.
status
str | None
Status string (e.g., "ingested") or None if not found.

mark_source_ingested

async def mark_source_ingested(
    self,
    source_id: str,
    status: str = STATUS_INGESTED
) -> None
Mark a source as ingested. Must be called within a transaction context.
source_id
str
required
External source identifier.
status
str
default:"STATUS_INGESTED"
Status string (default: "ingested").

Transaction Management

Providers support the async context manager protocol for transactions.
async with storage_provider:
    # All operations here are atomic
    await storage_provider.mark_source_ingested("source:123")
    await collection.extend(messages)
    # Commits on success, rolls back on exception

__aenter__

async def __aenter__(self) -> Self
Enter transaction context. Calls begin_transaction() for SQLite providers.

__aexit__

async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: Any,
) -> None
Exit transaction context. Commits on success, rolls back on exception.
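
The commit-or-rollback behavior of `__aenter__` and `__aexit__` can be sketched with the standard-library `sqlite3` module. `TxProvider` is a hypothetical class written for this illustration; it is not how the library implements its providers, only one way the described contract could be met.

```python
import asyncio
import sqlite3


class TxProvider:
    """Hypothetical provider: BEGIN on enter, COMMIT or ROLLBACK on exit."""

    def __init__(self) -> None:
        # isolation_level=None turns off sqlite3's implicit transactions,
        # so the explicit BEGIN/COMMIT/ROLLBACK below are the only control.
        self.db = sqlite3.connect(":memory:", isolation_level=None)
        self.db.execute("CREATE TABLE items (value TEXT)")

    async def __aenter__(self) -> "TxProvider":
        self.db.execute("BEGIN")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # exc_type is None when the block finished without an exception.
        self.db.execute("ROLLBACK" if exc_type else "COMMIT")
        # Returning None lets any exception propagate to the caller.

    def count(self) -> int:
        return self.db.execute("SELECT COUNT(*) FROM items").fetchone()[0]


async def demo() -> tuple[int, int]:
    provider = TxProvider()
    async with provider:
        provider.db.execute("INSERT INTO items VALUES ('kept')")
    after_commit = provider.count()
    try:
        async with provider:
            provider.db.execute("INSERT INTO items VALUES ('discarded')")
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass
    return after_commit, provider.count()


counts = asyncio.run(demo())
```

The second insert never becomes visible: the row count is the same before and after the failed block.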

close

async def close(self) -> None
Close the storage provider and release resources.

MemoryStorageProvider

from typeagent.storage.memory import MemoryStorageProvider
In-memory storage provider with no persistence. Fast but data is lost when the process exits.

Constructor

def __init__(
    self,
    message_text_settings: MessageTextIndexSettings,
    related_terms_settings: RelatedTermIndexSettings,
    metadata: ConversationMetadata | None = None,
) -> None
message_text_settings
MessageTextIndexSettings
required
Configuration for message text embedding index.
related_terms_settings
RelatedTermIndexSettings
required
Configuration for related terms embedding index.
metadata
ConversationMetadata | None
default:"None"
Optional conversation metadata. If None, creates empty metadata.

Characteristics

Performance

Fastest storage option with no disk I/O

Persistence

No persistence - data lost on exit

Transactions

No-op transactions (always succeed)

Rollback

No rollback support on errors
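
The practical consequence of no-op transactions is that writes made before an exception stay in place. The sketch below shows this with a hypothetical `NoOpTxStore`; it is an illustration of the behavior described above, not the MemoryStorageProvider source.

```python
import asyncio


class NoOpTxStore:
    """Hypothetical in-memory store whose transaction hooks do nothing."""

    def __init__(self) -> None:
        self.items: list[str] = []

    async def __aenter__(self) -> "NoOpTxStore":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        return None  # nothing to commit and nothing to undo


async def demo() -> list[str]:
    store = NoOpTxStore()
    try:
        async with store:
            store.items.append("first")
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass
    # The append made before the failure is still present.
    return store.items


leftover = asyncio.run(demo())
```

Code that relies on rollback to keep messages and source markers consistent should therefore be tested against the SQLite provider as well.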

Example

from typeagent.storage.memory import MemoryStorageProvider
from typeagent.knowpro.convsettings import (
    MessageTextIndexSettings,
    RelatedTermIndexSettings
)
from typeagent.aitools.vectorbase import TextEmbeddingIndexSettings
from typeagent.aitools.model_adapters import create_embedding_model

# Create embedding settings
model = create_embedding_model("openai:text-embedding-3-small")
embedding_settings = TextEmbeddingIndexSettings(model)

# Create provider
provider = MemoryStorageProvider(
    message_text_settings=MessageTextIndexSettings(embedding_settings),
    related_terms_settings=RelatedTermIndexSettings(embedding_settings)
)

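# Use with conversation (run inside an async function; ConversationBase and
# ConversationSettings must also be imported, their module paths are not shown here)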
conv = await ConversationBase.create(
    settings=ConversationSettings(storage_provider=provider)
)
Best Practice: Use create_conversation(dbname=None, ...) instead of manually creating MemoryStorageProvider.

SqliteStorageProvider

from typeagent.storage.sqlite import SqliteStorageProvider
Persistent SQLite-backed storage provider with full transaction support and rollback.

Constructor

def __init__(
    self,
    db_path: str = ":memory:",
    message_type: type[TMessage] = None,
    semantic_ref_type: type[SemanticRef] = None,
    message_text_index_settings: MessageTextIndexSettings | None = None,
    related_term_index_settings: RelatedTermIndexSettings | None = None,
    metadata: ConversationMetadata | None = None,
)
db_path
str
default:":memory:"
Path to SQLite database file, or ":memory:" for in-memory SQLite.
Example: "conversations/chat.db"
message_type
type[TMessage]
default:"None"
Message type class for deserialization.
semantic_ref_type
type[SemanticRef]
default:"None"
Semantic reference type class for deserialization.
message_text_index_settings
MessageTextIndexSettings | None
default:"None"
Message text index configuration. If None, reads from database metadata or creates default.
related_term_index_settings
RelatedTermIndexSettings | None
default:"None"
Related terms index configuration. If None, uses same embedding model as message text index.
metadata
ConversationMetadata | None
default:"None"
Conversation metadata. For existing databases, this is validated against stored metadata.

Characteristics

Persistence

Full persistence to disk

Transactions

ACID transactions with rollback

Performance

Optimized with WAL mode and indexes

Consistency

Validates embedding model compatibility

Performance Optimizations

The provider automatically configures SQLite for optimal performance:
PRAGMA foreign_keys = ON;
PRAGMA synchronous = NORMAL;      -- Faster than FULL, still safe
PRAGMA journal_mode = WAL;        -- Write-Ahead Logging
PRAGMA cache_size = -64000;       -- 64MB cache
PRAGMA temp_store = MEMORY;       -- Temp tables in memory
PRAGMA mmap_size = 268435456;     -- 256MB memory-mapped I/O
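
To see what these settings do on a plain connection, the same PRAGMAs can be applied with the standard-library `sqlite3` module and read back. This is a sketch for experimentation only: `open_tuned` is a hypothetical helper, and the provider applies its settings internally without any such call.

```python
import os
import sqlite3
import tempfile

# Same settings as listed above, in the same order.
PRAGMAS = {
    "foreign_keys": "ON",
    "synchronous": "NORMAL",
    "journal_mode": "WAL",
    "cache_size": "-64000",
    "temp_store": "MEMORY",
    "mmap_size": "268435456",
}


def open_tuned(path: str) -> sqlite3.Connection:
    """Open a database file and apply each PRAGMA to the new connection."""
    conn = sqlite3.connect(path)
    for name, value in PRAGMAS.items():
        conn.execute(f"PRAGMA {name} = {value}")
    return conn


with tempfile.TemporaryDirectory() as tmp:
    # WAL needs a real file; an in-memory database reports "memory" instead.
    conn = open_tuned(os.path.join(tmp, "demo.db"))
    journal_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
    synchronous = conn.execute("PRAGMA synchronous").fetchone()[0]
    cache_size = conn.execute("PRAGMA cache_size").fetchone()[0]
    conn.close()
```

SQLite reports `synchronous` as an integer (1 means NORMAL), and a negative `cache_size` is a size in KiB rather than a page count. Most of these settings apply per connection; `journal_mode = WAL` is the exception and persists in the database file.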

Embedding Model Consistency

The provider performs consistency checks on initialization:
  1. Validates existing embeddings have compatible dimensions
  2. Checks metadata for embedding model name
  3. Raises ValueError if mismatch detected
This prevents runtime errors from incompatible embeddings.
Example error:
ValueError: Conversation metadata embedding_model (text-embedding-ada-002) 
does not match provided embedding model (text-embedding-3-small).

Transaction Example

from typeagent.storage.sqlite import SqliteStorageProvider
from typeagent.knowpro.universal_message import ConversationMessage

provider = SqliteStorageProvider(
    db_path="chat.db",
    message_type=ConversationMessage
)

try:
    async with provider:
        # All operations in this block are atomic
        await provider.mark_source_ingested("source:123")
        
        messages_coll = await provider.get_message_collection()
        await messages_coll.extend(messages)
        
        semrefs_coll = await provider.get_semantic_ref_collection()
        await semrefs_coll.extend(semantic_refs)
        
        # Commits here if no exceptions
except Exception as e:
    # All changes rolled back automatically
    print(f"Transaction failed: {e}")
finally:
    await provider.close()

Database Schema

The provider uses the following tables:
  • Messages - Message data with JSON serialization
  • SemanticRefs - Extracted semantic references
  • SemanticRefIndex - Term-to-semref mappings
  • PropertyIndex - Property-value index
  • MessageTextIndex - Message text embeddings
  • RelatedTermsAliases - Term synonym mappings
  • RelatedTermsFuzzy - Fuzzy term matching embeddings
  • TimestampIndex - Timestamp-to-range mappings
  • ConversationMetadata - Metadata key-value pairs
  • IngestedSources - Source tracking

Example

from typeagent.storage.sqlite import SqliteStorageProvider
from typeagent.knowpro.universal_message import ConversationMessage

# Create persistent storage
provider = SqliteStorageProvider(
    db_path="conversations/team_chat.db",
    message_type=ConversationMessage
)

# Get collections
messages = await provider.get_message_collection()
semrefs = await provider.get_semantic_ref_collection()

print(f"Database contains {await messages.size()} messages")
print(f"Extracted {await semrefs.size()} semantic references")

# Check metadata
metadata = await provider.get_conversation_metadata()
print(f"Conversation: {metadata.name_tag}")
print(f"Created: {metadata.created_at}")
print(f"Embedding model: {metadata.embedding_model}")

await provider.close()
Best Practice: Use create_conversation(dbname="path.db", ...) instead of manually creating SqliteStorageProvider.

ConversationMetadata

from typeagent.knowpro.interfaces import ConversationMetadata
Storage-provider-agnostic metadata structure.

Fields

name_tag
str | None
default:"None"
Conversation name identifier.
schema_version
int | None
default:"None"
Database schema version number.
created_at
datetime | None
default:"None"
Conversation creation timestamp.
updated_at
datetime | None
default:"None"
Last update timestamp.
embedding_model
str | None
default:"None"
Embedding model name (e.g., "text-embedding-3-small").
tags
list[str] | None
default:"None"
Conversation-level tags.
extra
dict[str, str] | None
default:"None"
Custom metadata key-value pairs.