Database Persistence Layer
MoFA provides a flexible persistence layer for storing LLM interactions, API call metrics, and session data. The persistence layer supports multiple database backends through a unified trait-based API.Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Persistence Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Core Traits (Microkernel) │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │MessageStore │ │ ApiCallStore│ │ SessionStore │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Backend Implementations │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ PostgreSQL │ │ MySQL │ │ SQLite │ │ │
│ │ │ (sqlx-pg) │ │ (sqlx-mysql)│ │ (sqlx-sqlite) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ │
│ │ ┌─────────────────────────────────────┐ │ │
│ │ │ In-Memory (default) │ │ │
│ │ └─────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Feature Flags
Backend support is controlled via feature flags:[dependencies]
mofa-foundation = { version = "*", features = ["persistence-postgres"] }
# Or:
# mofa-foundation = { version = "*", features = ["persistence-mysql"] }
# mofa-foundation = { version = "*", features = ["persistence-sqlite"] }
PostgreSQL Backend
Setup
use mofa_foundation::persistence::PostgresStore;
use uuid::Uuid;
// Connect to PostgreSQL
let store = PostgresStore::connect("postgres://user:pass@localhost/mofa").await?;
// Or from environment variable
let store = PostgresStore::from_env().await?;
// With custom connection pool
let store = PostgresStore::connect_with_options(
"postgres://localhost/mofa",
20 // max connections
).await?;
Schema
PostgreSQL uses native JSON types for optimal performance:CREATE TABLE entity_llm_message (
id UUID PRIMARY KEY,
chat_session_id UUID NOT NULL,
agent_id UUID NOT NULL,
user_id UUID NOT NULL,
tenant_id UUID,
parent_message_id UUID,
role TEXT NOT NULL,
content JSONB NOT NULL, -- Native JSON
create_time TIMESTAMPTZ NOT NULL,
update_time TIMESTAMPTZ NOT NULL
);
CREATE INDEX idx_message_session ON entity_llm_message(chat_session_id);
Store Messages
use mofa_foundation::persistence::{LLMMessage, MessageContent, MessageRole};
use chrono::Utc;
let message = LLMMessage {
id: Uuid::new_v4(),
chat_session_id: session_id,
agent_id: agent_id,
user_id: user_id,
tenant_id: Uuid::nil(),
parent_message_id: None,
role: MessageRole::User,
content: MessageContent::Text("What is Rust?".to_string()),
create_time: Utc::now(),
update_time: Utc::now(),
};
store.save_message(&message).await?;
Query Messages
// Get single message
let msg = store.get_message(message_id).await?;
// Get all messages in a session
let messages = store.get_session_messages(session_id).await?;
// Paginated query
let messages = store.get_session_messages_paginated(
session_id,
offset: 0,
limit: 50
).await?;
// Count messages
let count = store.count_session_messages(session_id).await?;
MySQL Backend
Setup
use mofa_foundation::persistence::MySqlStore;
let store = MySqlStore::connect("mysql://user:pass@localhost/mofa").await?;
// With custom pool size
let store = MySqlStore::connect_with_options(
"mysql://localhost/mofa",
15 // max connections
).await?;
Schema Differences
MySQL stores JSON as TEXT (serialized):CREATE TABLE entity_llm_message (
id VARCHAR(36) PRIMARY KEY,
chat_session_id VARCHAR(36) NOT NULL,
agent_id VARCHAR(36) NOT NULL,
user_id VARCHAR(36) NOT NULL,
tenant_id VARCHAR(36),
parent_message_id VARCHAR(36),
role VARCHAR(50) NOT NULL,
content TEXT NOT NULL, -- JSON as TEXT
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL,
INDEX idx_message_session (chat_session_id)
);
Usage
API is identical to PostgreSQL:store.save_message(&message).await?;
let messages = store.get_session_messages(session_id).await?;
SQLite Backend
Setup
use mofa_foundation::persistence::SqliteStore;
// File-based database
let store = SqliteStore::connect("sqlite:./data.db").await?;
// In-memory database (for testing)
let store = SqliteStore::in_memory().await?;
Auto-migrations
SQLite automatically runs migrations on connect:let store = SqliteStore::connect("sqlite:./app.db").await?;
// Tables are created automatically
Schema
CREATE TABLE IF NOT EXISTS entity_llm_message (
id TEXT PRIMARY KEY,
chat_session_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
user_id TEXT NOT NULL,
tenant_id TEXT,
parent_message_id TEXT,
role TEXT NOT NULL,
content TEXT NOT NULL,
create_time TEXT NOT NULL,
update_time TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_message_session
ON entity_llm_message(chat_session_id);
API Call Tracking
Track LLM API calls for monitoring and cost analysis:use mofa_foundation::persistence::{LLMApiCall, ApiCallStatus, TokenDetails};
let api_call = LLMApiCall {
id: Uuid::new_v4(),
chat_session_id: session_id,
agent_id: agent_id,
user_id: user_id,
tenant_id: Uuid::nil(),
request_message_id: request_id,
response_message_id: response_id,
model_name: "gpt-4".to_string(),
status: ApiCallStatus::Success,
error_message: None,
error_code: None,
prompt_tokens: 150,
completion_tokens: 280,
total_tokens: 430,
prompt_tokens_details: Some(TokenDetails {
cached_tokens: Some(50),
audio_tokens: None,
}),
completion_tokens_details: None,
total_price: Some(0.0129), // USD
price_details: None,
latency_ms: Some(1245),
time_to_first_token_ms: Some(340),
tokens_per_second: Some(35.5),
api_response_id: Some("chatcmpl-abc123".to_string()),
create_time: Utc::now(),
update_time: Utc::now(),
};
store.save_api_call(&api_call).await?;
Query API Calls
use mofa_foundation::persistence::QueryFilter;
// Filter by user
let filter = QueryFilter {
user_id: Some(user_id),
session_id: None,
agent_id: None,
start_time: None,
end_time: None,
status: None,
model_name: None,
limit: Some(100),
offset: Some(0),
};
let calls = store.query_api_calls(&filter).await?;
// Get usage statistics
let stats = store.get_statistics(&filter).await?;
println!("Total calls: {}", stats.total_calls);
println!("Success rate: {:.2}%",
stats.success_count as f64 / stats.total_calls as f64 * 100.0);
println!("Total tokens: {}", stats.total_tokens);
println!("Total cost: ${:.4}", stats.total_cost.unwrap_or(0.0));
println!("Avg latency: {:.0}ms", stats.avg_latency_ms.unwrap_or(0.0));
Session Management
Create Session
use mofa_foundation::persistence::ChatSession;
use std::collections::HashMap;
let mut metadata = HashMap::new();
metadata.insert("client".to_string(), json!("web"));
metadata.insert("version".to_string(), json!("1.0.0"));
let session = ChatSession {
id: Uuid::new_v4(),
user_id: user_id,
agent_id: agent_id,
tenant_id: Uuid::nil(),
title: Some("Research Session".to_string()),
metadata,
create_time: Utc::now(),
update_time: Utc::now(),
};
store.create_session(&session).await?;
Query Sessions
// Get session by ID
let session = store.get_session(session_id).await?;
// Get all user sessions
let sessions = store.get_user_sessions(user_id).await?;
// Update session
session.title = Some("Updated Title".to_string());
session.update_time = Utc::now();
store.update_session(&session).await?;
// Delete session
let deleted = store.delete_session(session_id).await?;
Plugin Integration
Integrate persistence into LLM agents:use mofa_foundation::persistence::PersistencePlugin;
let store = PostgresStore::from_env().await?;
let plugin = PersistencePlugin::from_store(
"persistence-plugin",
store,
session_id,
agent_id,
user_id,
tenant_id,
);
let agent = LLMAgentBuilder::new()
.with_provider(provider)
.with_plugin(Arc::new(plugin))
.build();
// Messages are automatically persisted
let response = agent.chat("Hello!").await?;
In-Memory Store
For testing and development:use mofa_foundation::persistence::InMemoryStore;
let store = InMemoryStore::new();
store.save_message(&message).await?;
let messages = store.get_session_messages(session_id).await?;
// No database required!
Agent & Provider Stores
Store agent and provider configurations:// Get agent by code
let agent = store.get_agent_by_code("research-agent").await?;
// Get agent with provider (JOIN query)
let config = store.get_agent_by_code_with_provider("research-agent").await?;
println!("Agent: {}", config.agent.agent_name);
println!("Provider: {}", config.provider.provider_name);
println!("Model: {}", config.agent.model_name);
// List active agents for a tenant
let agents = store.get_active_agents(tenant_id).await?;
// Get enabled providers
let providers = store.get_enabled_providers(tenant_id).await?;
Data Cleanup
Manage data retention:use chrono::Duration;
// Delete API calls older than 90 days
let cutoff = Utc::now() - Duration::days(90);
let deleted = store.cleanup_old_records(cutoff).await?;
println!("Deleted {} old records", deleted);
// Delete session and all related data
let message_count = store.delete_session_messages(session_id).await?;
store.delete_session(session_id).await?;
println!("Deleted session with {} messages", message_count);
Connection Pooling
All backends use connection pooling:// PostgreSQL - default 10 connections
let store = PostgresStore::connect(url).await?;
// MySQL - custom pool size
let store = MySqlStore::connect_with_options(url, 20).await?;
// SQLite - default 5 connections
let store = SqliteStore::connect(url).await?;
// Get pool reference
let pool = store.pool();
Error Handling
use mofa_foundation::persistence::{PersistenceError, PersistenceResult};
match store.save_message(&message).await {
Ok(_) => println!("Saved"),
Err(PersistenceError::Connection(e)) => eprintln!("Connection: {}", e),
Err(PersistenceError::Query(e)) => eprintln!("Query: {}", e),
Err(PersistenceError::Serialization(e)) => eprintln!("Serialization: {}", e),
Err(PersistenceError::NotFound(e)) => eprintln!("Not found: {}", e),
Err(e) => eprintln!("Other: {}", e),
}
Best Practices
- Use connection pooling: Reuse store instances across requests
- Index appropriately: Add indexes for common query patterns
- Clean old data: Implement retention policies
- Monitor costs: Track token usage and API costs
- Secure credentials: Use environment variables for connection strings
- Handle errors: Implement retry logic for transient failures
- Use transactions: Batch related operations
- Paginate queries: Avoid loading large result sets
Environment Variables
# PostgreSQL
export DATABASE_URL="postgres://user:pass@localhost/mofa"
# MySQL
export DATABASE_URL="mysql://user:pass@localhost/mofa"
# SQLite
export DATABASE_URL="sqlite:./data.db"
See Also
- Workflow Orchestration - Persist workflow state
- Monitoring - Track persistence metrics
- Security - Secure database connections