StoragePlugin provides persistent storage capabilities through the Memory trait, enabling agents to store and retrieve data across sessions.
Overview
Storage plugins implement the Memory trait from mofa-kernel and wrap it as an AgentPlugin, providing:
- Key-value storage
- Conversation history management
- Semantic search (with vector embeddings)
- Session management
- Statistical tracking
Memory Trait
The core storage interface defined in mofa-kernel:
/// Core storage interface: key-value storage, search, and per-session
/// conversation history, plus usage statistics.
#[async_trait]
pub trait Memory: Send + Sync {
/// Stores `value` under `key`.
async fn store(&mut self, key: &str, value: MemoryValue) -> AgentResult<()>;
/// Returns the value stored under `key`, or `None` when there is none.
async fn retrieve(&self, key: &str) -> AgentResult<Option<MemoryValue>>;
/// Removes the entry for `key`; the returned flag tells whether an entry was removed.
async fn remove(&mut self, key: &str) -> AgentResult<bool>;
/// Reports whether an entry exists for `key`.
async fn contains(&self, key: &str) -> AgentResult<bool>;
/// Searches stored items for `query`; `limit` bounds the number of results.
async fn search(&self, query: &str, limit: usize) -> AgentResult<Vec<MemoryItem>>;
/// Clears all stored data.
async fn clear(&mut self) -> AgentResult<()>;
/// Returns the messages recorded for `session_id`.
async fn get_history(&self, session_id: &str) -> AgentResult<Vec<Message>>;
/// Appends `message` to the history of `session_id`.
async fn add_to_history(&mut self, session_id: &str, message: Message) -> AgentResult<()>;
/// Clears the history of `session_id`.
async fn clear_history(&mut self, session_id: &str) -> AgentResult<()>;
/// Returns usage statistics for this store.
async fn stats(&self) -> AgentResult<MemoryStats>;
/// Returns a short name for the backend (for example `"in-memory"`).
fn memory_type(&self) -> &str;
}
Memory Value Types
/// A value that can be kept in a `Memory` store.
pub enum MemoryValue {
/// Plain text.
Text(String),
/// An embedding vector.
Embedding(Vec<f32>),
/// Structured data held as a JSON value.
Structured(serde_json::Value),
/// Raw bytes.
Binary(Vec<u8>),
/// Text paired with its embedding (for semantic search).
TextWithEmbedding {
text: String,
embedding: Vec<f32>
},
}
Creating Memory Values
// Text value
let value = MemoryValue::text("Hello, world!");

// Embedding vector (cloned here so `embedding` can be reused further down)
let embedding = vec![0.1, 0.2, 0.3, /* ... */];
let value = MemoryValue::embedding(embedding.clone());

// Structured data
let data = serde_json::json!({
    "name": "John",
    "age": 30
});
let value = MemoryValue::structured(data);

// Text with embedding (for semantic search)
let value = MemoryValue::text_with_embedding(
    "Technical documentation",
    embedding,
);
Creating a Storage Plugin
use mofa_kernel::agent::components::memory::Memory;
use mofa_kernel::plugin::{AgentPlugin, PluginMetadata, PluginType};
/// Plugin wrapper that owns a boxed `Memory` backend together with its
/// plugin metadata and current plugin state.
// NOTE(review): `PluginState` is not among the imports shown with this snippet — confirm it is in scope.
struct StoragePlugin {
    metadata: PluginMetadata,
    memory: Box<dyn Memory>,
    state: PluginState,
}

impl StoragePlugin {
    /// Wraps `memory` in a plugin whose metadata is built from `"storage"`,
    /// `"Storage Plugin"` and `PluginType::Storage`; the plugin starts in
    /// `PluginState::Unloaded`.
    pub fn new<M: Memory + 'static>(memory: M) -> Self {
        Self {
            metadata: PluginMetadata::new("storage", "Storage Plugin", PluginType::Storage),
            memory: Box::new(memory),
            state: PluginState::Unloaded,
        }
    }
}
Storage Operations
Store Data
// Store simple text
memory.store("user_name", MemoryValue::text("Alice")).await?;

// Store structured data
let profile = serde_json::json!({
    "name": "Alice",
    "email": "alice@example.com",
    "preferences": {
        "theme": "dark",
        "language": "en"
    }
});
memory.store("user_profile", MemoryValue::structured(profile)).await?;

// Store with embedding for semantic search; the same text is both stored
// and passed to the embedding function, so bind it once.
let doc_text = "Rust is a systems programming language";
memory
    .store(
        "doc_1",
        MemoryValue::text_with_embedding(doc_text, generate_embedding(doc_text)),
    )
    .await?;
Retrieve Data
if let Some(value) = memory.retrieve("user_name").await? {
if let Some(text) = value.as_text() {
println!("User name: {}", text);
}
}
// Check existence
if memory.contains("user_profile").await? {
println!("Profile exists");
}
Search
Perform semantic search across stored items:
let results = memory.search("programming languages", 10).await?;
// Print each hit's key and score, followed by its text when it has any
for hit in results {
    println!("Key: {}, Score: {:.2}", hit.key, hit.score);
    let Some(text) = hit.value.as_text() else {
        continue;
    };
    println!("Content: {}", text);
}
Remove Data
if memory.remove("old_key").await? {
println!("Item removed");
} else {
println!("Item not found");
}
// Clear all data
memory.clear().await?;
Conversation History
Add Messages
use mofa_kernel::agent::components::memory::{Message, MessageRole};
let session_id = "session_123";

// One message per role, appended in conversation order
let messages = [
    // System message
    Message::system("You are a helpful assistant."),
    // User message
    Message::user("Hello!"),
    // Assistant message
    Message::assistant("Hi! How can I help you today?"),
    // Tool message
    Message::tool("calculator", "{\"result\": 42}"),
];
for message in messages {
    memory.add_to_history(session_id, message).await?;
}
Retrieve History
let history = memory.get_history(session_id).await?;
for msg in history {
println!("{}: {}", msg.role, msg.content);
}
Clear History
// Clear the conversation history of this session
memory.clear_history(session_id).await?;
Memory Item
Search results return MemoryItem with metadata:
/// A stored entry as returned by `Memory::search`.
pub struct MemoryItem {
/// Key the value is stored under.
pub key: String,
/// The stored value.
pub value: MemoryValue,
pub score: f32, // Similarity score (0.0 - 1.0)
/// String key/value annotations attached to the item.
pub metadata: HashMap<String, String>,
// NOTE(review): the unit of both timestamps is not shown here — confirm.
pub created_at: u64, // Timestamp
pub last_accessed: u64, // Timestamp
}
Creating Memory Items
// Build the value first, then attach a score and two metadata entries
let value = MemoryValue::text("value");
let item = MemoryItem::new("key1", value)
    .with_score(0.95)
    .with_metadata("category", "docs")
    .with_metadata("author", "system");
Memory Statistics
// Fetch the statistics once, then print each counter
let usage = memory.stats().await?;
println!("Total items: {}", usage.total_items);
println!("Total sessions: {}", usage.total_sessions);
println!("Total messages: {}", usage.total_messages);
println!("Memory usage: {} bytes", usage.memory_bytes);
Example: In-Memory Storage
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
struct InMemoryStorage {
data: Arc<RwLock<HashMap<String, MemoryValue>>>,
history: Arc<RwLock<HashMap<String, Vec<Message>>>>,
}
impl InMemoryStorage {
pub fn new() -> Self {
Self {
data: Arc::new(RwLock::new(HashMap::new())),
history: Arc::new(RwLock::new(HashMap::new())),
}
}
}
#[async_trait]
impl Memory for InMemoryStorage {
    /// Inserts `value` under `key`, replacing any previous value.
    async fn store(&mut self, key: &str, value: MemoryValue) -> AgentResult<()> {
        self.data.write().await.insert(key.to_string(), value);
        Ok(())
    }

    /// Returns a clone of the value stored under `key`, if any.
    async fn retrieve(&self, key: &str) -> AgentResult<Option<MemoryValue>> {
        Ok(self.data.read().await.get(key).cloned())
    }

    /// Removes `key`; returns `true` when an entry was present.
    async fn remove(&mut self, key: &str) -> AgentResult<bool> {
        Ok(self.data.write().await.remove(key).is_some())
    }

    /// Reports whether `key` is present, without cloning the stored value.
    async fn contains(&self, key: &str) -> AgentResult<bool> {
        Ok(self.data.read().await.contains_key(key))
    }

    /// Returns up to `limit` items whose text contains `query`, each with score 1.0.
    async fn search(&self, query: &str, limit: usize) -> AgentResult<Vec<MemoryItem>> {
        // Simple text search (production: use vector similarity)
        let data = self.data.read().await;
        // `take(limit)` stops scanning once enough matches are found instead
        // of collecting every match and truncating afterwards.
        let results: Vec<MemoryItem> = data
            .iter()
            .filter(|(_, value)| matches!(value.as_text(), Some(text) if text.contains(query)))
            .take(limit)
            .map(|(key, value)| MemoryItem::new(key.clone(), value.clone()).with_score(1.0))
            .collect();
        Ok(results)
    }

    /// Removes every stored value; session histories are left untouched.
    async fn clear(&mut self) -> AgentResult<()> {
        self.data.write().await.clear();
        Ok(())
    }

    /// Returns a copy of the session's messages, or an empty list for an unknown session.
    async fn get_history(&self, session_id: &str) -> AgentResult<Vec<Message>> {
        let history = self.history.read().await;
        Ok(history.get(session_id).cloned().unwrap_or_default())
    }

    /// Appends `message` to the session's history, creating the session on first use.
    async fn add_to_history(
        &mut self,
        session_id: &str,
        message: Message,
    ) -> AgentResult<()> {
        let mut history = self.history.write().await;
        history
            .entry(session_id.to_string())
            .or_default()
            .push(message);
        Ok(())
    }

    /// Drops the whole history of `session_id`.
    async fn clear_history(&mut self, session_id: &str) -> AgentResult<()> {
        self.history.write().await.remove(session_id);
        Ok(())
    }

    /// Name of this backend.
    fn memory_type(&self) -> &str {
        "in-memory"
    }

    // NOTE(review): `stats` is not implemented here. Unless the trait supplies
    // a default body for it, an implementation must be added for this to compile.
}
Example: Redis Storage
use redis::AsyncCommands;
/// Redis-backed storage that keeps each `MemoryValue` as a JSON string.
struct RedisStorage {
    client: redis::Client,
}

#[async_trait]
impl Memory for RedisStorage {
    /// Serializes `value` to JSON and writes it to Redis under `key`.
    async fn store(&mut self, key: &str, value: MemoryValue) -> AgentResult<()> {
        // `get_async_connection` is deprecated in redis-rs; the multiplexed
        // connection is its documented replacement.
        let mut conn = self.client.get_multiplexed_async_connection().await?;
        let serialized = serde_json::to_string(&value)?;
        // Name the reply type explicitly rather than relying on inference
        // falling back to `()`.
        let _: () = conn.set(key, serialized).await?;
        Ok(())
    }

    /// Reads the JSON string stored under `key` and deserializes it;
    /// returns `None` when Redis has no such key.
    async fn retrieve(&self, key: &str) -> AgentResult<Option<MemoryValue>> {
        let mut conn = self.client.get_multiplexed_async_connection().await?;
        let data: Option<String> = conn.get(key).await?;
        match data {
            Some(s) => Ok(Some(serde_json::from_str(&s)?)),
            None => Ok(None),
        }
    }
    // ... other methods
}
Complete Plugin Example
/// End-to-end example: wrap an in-memory backend in a storage plugin, attach
/// it to an agent runtime, and send one "store" request to the plugin.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create the storage backend and wrap it in the plugin
    let storage_plugin = StoragePlugin::new(InMemoryStorage::new());

    // Add to agent
    let runtime = AgentBuilder::new("assistant", "AI Assistant")
        .with_plugin(Box::new(storage_plugin))
        .with_agent(my_agent)
        .await?;

    // Use storage via plugin: the request is sent as a JSON string
    let payload = serde_json::to_string(&serde_json::json!({
        "operation": "store",
        "key": "user_prefs",
        "value": {
            "theme": "dark",
            "language": "en"
        }
    }))?;
    runtime.execute_plugin("storage", payload).await?;

    Ok(())
}
Related
- AgentPlugin Trait - Base plugin interface
- Memory Component - Memory trait details
- Vector Databases - Semantic search setup