Skip to main content

Database Persistence Layer

MoFA provides a flexible persistence layer for storing LLM interactions, API call metrics, and session data. The persistence layer supports multiple database backends through a unified trait-based API.

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                   Persistence Architecture                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                   Core Traits (Microkernel)              │   │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │   │
│  │  │MessageStore │ │ ApiCallStore│ │ SessionStore         │ │   │
│  │  └─────────────┘ └─────────────┘ └─────────────────────┘ │   │
│  └──────────────────────────────────────────────────────────┘   │
│                               │                                 │
│                               ▼                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                   Backend Implementations                │   │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │   │
│  │  │ PostgreSQL  │ │   MySQL     │ │     SQLite          │ │   │
│  │  │  (sqlx-pg)  │ │ (sqlx-mysql)│ │  (sqlx-sqlite)      │ │   │
│  │  └─────────────┘ └─────────────┘ └─────────────────────┘ │   │
│  │                 ┌─────────────────────────────────────┐   │   │
│  │                 │         In-Memory (default)         │   │   │
│  │                 └─────────────────────────────────────┘   │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Feature Flags

Backend support is controlled via feature flags:
[dependencies]
mofa-foundation = { version = "*", features = ["persistence-postgres"] }
# Or:
# mofa-foundation = { version = "*", features = ["persistence-mysql"] }
# mofa-foundation = { version = "*", features = ["persistence-sqlite"] }

PostgreSQL Backend

Setup

use mofa_foundation::persistence::PostgresStore;
use uuid::Uuid;

// Connect to PostgreSQL
let store = PostgresStore::connect("postgres://user:pass@localhost/mofa").await?;

// Or from environment variable
let store = PostgresStore::from_env().await?;

// With custom connection pool
let store = PostgresStore::connect_with_options(
    "postgres://localhost/mofa",
    20  // max connections
).await?;

Schema

PostgreSQL uses native JSON types for optimal performance:
CREATE TABLE entity_llm_message (
    id UUID PRIMARY KEY,
    chat_session_id UUID NOT NULL,
    agent_id UUID NOT NULL,
    user_id UUID NOT NULL,
    tenant_id UUID,
    parent_message_id UUID,
    role TEXT NOT NULL,
    content JSONB NOT NULL,  -- Native JSON
    create_time TIMESTAMPTZ NOT NULL,
    update_time TIMESTAMPTZ NOT NULL
);

CREATE INDEX idx_message_session ON entity_llm_message(chat_session_id);

Store Messages

use mofa_foundation::persistence::{LLMMessage, MessageContent, MessageRole};
use chrono::Utc;

let message = LLMMessage {
    id: Uuid::new_v4(),
    chat_session_id: session_id,
    agent_id: agent_id,
    user_id: user_id,
    tenant_id: Uuid::nil(),
    parent_message_id: None,
    role: MessageRole::User,
    content: MessageContent::Text("What is Rust?".to_string()),
    create_time: Utc::now(),
    update_time: Utc::now(),
};

store.save_message(&message).await?;

Query Messages

// Get single message
let msg = store.get_message(message_id).await?;

// Get all messages in a session
let messages = store.get_session_messages(session_id).await?;

// Paginated query
let messages = store.get_session_messages_paginated(
    session_id,
    offset: 0,
    limit: 50
).await?;

// Count messages
let count = store.count_session_messages(session_id).await?;

MySQL Backend

Setup

use mofa_foundation::persistence::MySqlStore;

let store = MySqlStore::connect("mysql://user:pass@localhost/mofa").await?;

// With custom pool size
let store = MySqlStore::connect_with_options(
    "mysql://localhost/mofa",
    15  // max connections
).await?;

Schema Differences

MySQL stores JSON as TEXT (serialized):
CREATE TABLE entity_llm_message (
    id VARCHAR(36) PRIMARY KEY,
    chat_session_id VARCHAR(36) NOT NULL,
    agent_id VARCHAR(36) NOT NULL,
    user_id VARCHAR(36) NOT NULL,
    tenant_id VARCHAR(36),
    parent_message_id VARCHAR(36),
    role VARCHAR(50) NOT NULL,
    content TEXT NOT NULL,  -- JSON as TEXT
    create_time DATETIME NOT NULL,
    update_time DATETIME NOT NULL,
    INDEX idx_message_session (chat_session_id)
);

Usage

API is identical to PostgreSQL:
store.save_message(&message).await?;
let messages = store.get_session_messages(session_id).await?;

SQLite Backend

Setup

use mofa_foundation::persistence::SqliteStore;

// File-based database
let store = SqliteStore::connect("sqlite:./data.db").await?;

// In-memory database (for testing)
let store = SqliteStore::in_memory().await?;

Auto-migrations

SQLite automatically runs migrations on connect:
let store = SqliteStore::connect("sqlite:./app.db").await?;
// Tables are created automatically

Schema

CREATE TABLE IF NOT EXISTS entity_llm_message (
    id TEXT PRIMARY KEY,
    chat_session_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    user_id TEXT NOT NULL,
    tenant_id TEXT,
    parent_message_id TEXT,
    role TEXT NOT NULL,
    content TEXT NOT NULL,
    create_time TEXT NOT NULL,
    update_time TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_message_session 
    ON entity_llm_message(chat_session_id);

API Call Tracking

Track LLM API calls for monitoring and cost analysis:
use mofa_foundation::persistence::{LLMApiCall, ApiCallStatus, TokenDetails};

let api_call = LLMApiCall {
    id: Uuid::new_v4(),
    chat_session_id: session_id,
    agent_id: agent_id,
    user_id: user_id,
    tenant_id: Uuid::nil(),
    request_message_id: request_id,
    response_message_id: response_id,
    model_name: "gpt-4".to_string(),
    status: ApiCallStatus::Success,
    error_message: None,
    error_code: None,
    prompt_tokens: 150,
    completion_tokens: 280,
    total_tokens: 430,
    prompt_tokens_details: Some(TokenDetails {
        cached_tokens: Some(50),
        audio_tokens: None,
    }),
    completion_tokens_details: None,
    total_price: Some(0.0129),  // USD
    price_details: None,
    latency_ms: Some(1245),
    time_to_first_token_ms: Some(340),
    tokens_per_second: Some(35.5),
    api_response_id: Some("chatcmpl-abc123".to_string()),
    create_time: Utc::now(),
    update_time: Utc::now(),
};

store.save_api_call(&api_call).await?;

Query API Calls

use mofa_foundation::persistence::QueryFilter;

// Filter by user
let filter = QueryFilter {
    user_id: Some(user_id),
    session_id: None,
    agent_id: None,
    start_time: None,
    end_time: None,
    status: None,
    model_name: None,
    limit: Some(100),
    offset: Some(0),
};

let calls = store.query_api_calls(&filter).await?;

// Get usage statistics
let stats = store.get_statistics(&filter).await?;
println!("Total calls: {}", stats.total_calls);
println!("Success rate: {:.2}%", 
    stats.success_count as f64 / stats.total_calls as f64 * 100.0);
println!("Total tokens: {}", stats.total_tokens);
println!("Total cost: ${:.4}", stats.total_cost.unwrap_or(0.0));
println!("Avg latency: {:.0}ms", stats.avg_latency_ms.unwrap_or(0.0));

Session Management

Create Session

use mofa_foundation::persistence::ChatSession;
use std::collections::HashMap;

let mut metadata = HashMap::new();
metadata.insert("client".to_string(), json!("web"));
metadata.insert("version".to_string(), json!("1.0.0"));

let session = ChatSession {
    id: Uuid::new_v4(),
    user_id: user_id,
    agent_id: agent_id,
    tenant_id: Uuid::nil(),
    title: Some("Research Session".to_string()),
    metadata,
    create_time: Utc::now(),
    update_time: Utc::now(),
};

store.create_session(&session).await?;

Query Sessions

// Get session by ID
let session = store.get_session(session_id).await?;

// Get all user sessions
let sessions = store.get_user_sessions(user_id).await?;

// Update session
session.title = Some("Updated Title".to_string());
session.update_time = Utc::now();
store.update_session(&session).await?;

// Delete session
let deleted = store.delete_session(session_id).await?;

Plugin Integration

Integrate persistence into LLM agents:
use mofa_foundation::persistence::PersistencePlugin;

let store = PostgresStore::from_env().await?;

let plugin = PersistencePlugin::from_store(
    "persistence-plugin",
    store,
    session_id,
    agent_id,
    user_id,
    tenant_id,
);

let agent = LLMAgentBuilder::new()
    .with_provider(provider)
    .with_plugin(Arc::new(plugin))
    .build();

// Messages are automatically persisted
let response = agent.chat("Hello!").await?;

In-Memory Store

For testing and development:
use mofa_foundation::persistence::InMemoryStore;

let store = InMemoryStore::new();

store.save_message(&message).await?;
let messages = store.get_session_messages(session_id).await?;

// No database required!

Agent & Provider Stores

Store agent and provider configurations:
// Get agent by code
let agent = store.get_agent_by_code("research-agent").await?;

// Get agent with provider (JOIN query)
let config = store.get_agent_by_code_with_provider("research-agent").await?;
println!("Agent: {}", config.agent.agent_name);
println!("Provider: {}", config.provider.provider_name);
println!("Model: {}", config.agent.model_name);

// List active agents for a tenant
let agents = store.get_active_agents(tenant_id).await?;

// Get enabled providers
let providers = store.get_enabled_providers(tenant_id).await?;

Data Cleanup

Manage data retention:
use chrono::Duration;

// Delete API calls older than 90 days
let cutoff = Utc::now() - Duration::days(90);
let deleted = store.cleanup_old_records(cutoff).await?;
println!("Deleted {} old records", deleted);

// Delete session and all related data
let message_count = store.delete_session_messages(session_id).await?;
store.delete_session(session_id).await?;
println!("Deleted session with {} messages", message_count);

Connection Pooling

All backends use connection pooling:
// PostgreSQL - default 10 connections
let store = PostgresStore::connect(url).await?;

// MySQL - custom pool size
let store = MySqlStore::connect_with_options(url, 20).await?;

// SQLite - default 5 connections
let store = SqliteStore::connect(url).await?;

// Get pool reference
let pool = store.pool();

Error Handling

use mofa_foundation::persistence::{PersistenceError, PersistenceResult};

match store.save_message(&message).await {
    Ok(_) => println!("Saved"),
    Err(PersistenceError::Connection(e)) => eprintln!("Connection: {}", e),
    Err(PersistenceError::Query(e)) => eprintln!("Query: {}", e),
    Err(PersistenceError::Serialization(e)) => eprintln!("Serialization: {}", e),
    Err(PersistenceError::NotFound(e)) => eprintln!("Not found: {}", e),
    Err(e) => eprintln!("Other: {}", e),
}

Best Practices

  1. Use connection pooling: Reuse store instances across requests
  2. Index appropriately: Add indexes for common query patterns
  3. Clean old data: Implement retention policies
  4. Monitor costs: Track token usage and API costs
  5. Secure credentials: Use environment variables for connection strings
  6. Handle errors: Implement retry logic for transient failures
  7. Use transactions: Batch related operations
  8. Paginate queries: Avoid loading large result sets

Environment Variables

# PostgreSQL
export DATABASE_URL="postgres://user:pass@localhost/mofa"

# MySQL
export DATABASE_URL="mysql://user:pass@localhost/mofa"

# SQLite
export DATABASE_URL="sqlite:./data.db"

See Also

Build docs developers (and LLMs) love