SQLite is synchronous, but OpenFang is async. The memory substrate bridges this with:

```rust
Arc<Mutex<Connection>>
```
All operations use `tokio::task::spawn_blocking` to run SQLite queries on the blocking thread pool:

```rust
let conn = self.conn.clone();
let result = tokio::task::spawn_blocking(move || {
    let conn = conn.lock().unwrap();
    conn.execute("INSERT INTO ...", params![])?;
    Ok(())
}).await??;
```
This ensures:

- Thread safety — no concurrent writes corrupt the database.
- Async compatibility — SQLite work never blocks the Tokio runtime.
```sql
CREATE TABLE structured_memory (
    agent_id   TEXT NOT NULL,
    key        TEXT NOT NULL,
    value      TEXT NOT NULL,     -- JSON-serialized
    created_at INTEGER NOT NULL,
    updated_at INTEGER NOT NULL,
    PRIMARY KEY (agent_id, key)
);
```
```rust
// Store a value
memory.structured_set(
    agent_id,
    "preferences",
    serde_json::json!({"theme": "dark", "lang": "en"})
)?;

// Retrieve a value
let value: Option<serde_json::Value> = memory.structured_get(agent_id, "preferences")?;

// List all keys for an agent
let pairs: Vec<(String, serde_json::Value)> = memory.list_kv(agent_id)?;

// Delete a key
memory.structured_delete(agent_id, "preferences")?;
```
A special shared namespace enables cross-agent data sharing:
```rust
const SHARED_AGENT_ID: &str = "00000000-0000-0000-0000-000000000001";

// Any agent can write to shared memory
memory.structured_set(
    SHARED_AGENT_ID,
    "project_config",
    serde_json::json!({"name": "OpenFang", "version": "0.3.25"})
)?;

// Any agent can read from shared memory
let config = memory.structured_get(SHARED_AGENT_ID, "project_config")?;
```
Shared memory is not capability-gated by default. Agents with `MemoryRead("*")` can read it. Agents with `MemoryWrite("shared.*")` can write to it.
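To illustrate how such patterns could be matched against keys, here is a minimal, hypothetical glob-matching helper — a sketch of the idea, not the actual OpenFang capability engine (`capability_allows` is an invented name):

```rust
/// Hypothetical capability pattern matcher:
/// "*" matches everything, a trailing "*" matches any key with that
/// prefix (so "shared.*" covers the shared namespace), otherwise the
/// pattern must equal the key exactly.
fn capability_allows(pattern: &str, key: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    // "shared.*" becomes the prefix "shared."
    if let Some(prefix) = pattern.strip_suffix('*') {
        return key.starts_with(prefix);
    }
    pattern == key
}

fn main() {
    // An agent holding MemoryWrite("shared.*") may write shared keys only.
    assert!(capability_allows("shared.*", "shared.project_config"));
    assert!(!capability_allows("shared.*", "private.notes"));
    // MemoryRead("*") reads anything, including the shared namespace.
    assert!(capability_allows("*", "shared.project_config"));
}
```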
Vector embeddings for similarity-based memory retrieval. Documents are embedded using the configured embedding driver (OpenAI/Voyage/Cohere) and stored with their vectors. Queries are embedded at search time and matched by cosine similarity.
```sql
CREATE TABLE semantic_memory (
    id            TEXT PRIMARY KEY,
    agent_id      TEXT NOT NULL,
    content       TEXT NOT NULL,
    embedding     BLOB,           -- Serialized vector (msgpack)
    metadata      TEXT,           -- JSON
    created_at    INTEGER NOT NULL,
    access_count  INTEGER DEFAULT 0,
    last_accessed INTEGER
);
CREATE INDEX idx_semantic_agent   ON semantic_memory(agent_id);
CREATE INDEX idx_semantic_created ON semantic_memory(created_at);
```
```rust
// Store a document with embedding
let fragment = MemoryFragment {
    content: "OpenFang is an agent operating system built in Rust.".to_string(),
    source: MemorySource::UserInput,
    metadata: HashMap::new(),
};
memory.semantic_store(agent_id, fragment, Some(embedding_vector)).await?;

// Search by similarity
let query_embedding = embedding_driver.embed("What is OpenFang?").await?;
let results: Vec<MemoryFragment> = memory.semantic_search(
    agent_id,
    &query_embedding,
    5 // top 5 results
).await?;
```
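Since vectors are stored raw, ranking at search time reduces to computing cosine similarity between the query vector and each stored vector. A self-contained sketch of that metric (independent of how the msgpack BLOBs are actually deserialized):

```rust
/// Cosine similarity between two equal-length vectors:
/// dot(a, b) / (|a| * |b|), in [-1.0, 1.0] for non-zero inputs.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        return 0.0; // Treat zero vectors as dissimilar to everything.
    }
    dot / (norm_a * norm_b)
}

fn main() {
    // Parallel vectors score 1.0; orthogonal vectors score 0.0.
    assert!((cosine_similarity(&[1.0, 0.0], &[2.0, 0.0]) - 1.0).abs() < 1e-6);
    assert!(cosine_similarity(&[1.0, 0.0], &[0.0, 1.0]).abs() < 1e-6);
}
```

A top-k search then scores every candidate row for the agent and keeps the k highest-scoring fragments.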
Entity-relation storage for structured knowledge. Agents can store entities (with types and properties) and relations between them. Supports graph traversal queries.
```sql
CREATE TABLE knowledge_entities (
    id          TEXT PRIMARY KEY,
    agent_id    TEXT NOT NULL,
    entity_type TEXT NOT NULL,
    name        TEXT NOT NULL,
    properties  TEXT,             -- JSON
    created_at  INTEGER NOT NULL
);

CREATE TABLE knowledge_relations (
    id            TEXT PRIMARY KEY,
    agent_id      TEXT NOT NULL,
    from_entity   TEXT NOT NULL,
    relation_type TEXT NOT NULL,
    to_entity     TEXT NOT NULL,
    properties    TEXT,           -- JSON
    created_at    INTEGER NOT NULL,
    FOREIGN KEY (from_entity) REFERENCES knowledge_entities(id),
    FOREIGN KEY (to_entity)   REFERENCES knowledge_entities(id)
);
```
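The relation table supports transitive traversal directly in SQL. As a sketch (not necessarily the query OpenFang itself issues), a recursive CTE — which SQLite supports — can walk every entity reachable from a starting entity; `:start_id` is a placeholder parameter:

```sql
-- Walk outgoing relations transitively from a starting entity.
WITH RECURSIVE reachable(id, depth) AS (
    SELECT :start_id, 0
    UNION
    SELECT r.to_entity, reachable.depth + 1
    FROM knowledge_relations r
    JOIN reachable ON r.from_entity = reachable.id
    WHERE reachable.depth < 10          -- guard against deep or cyclic graphs
)
SELECT e.name, e.entity_type, reachable.depth
FROM reachable
JOIN knowledge_entities e ON e.id = reachable.id
ORDER BY reachable.depth;
```

Using `UNION` (rather than `UNION ALL`) deduplicates visited ids, which also keeps cycles from recursing forever.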
```sql
CREATE TABLE sessions (
    session_id  TEXT PRIMARY KEY,
    agent_id    TEXT NOT NULL,
    label       TEXT,             -- Optional user-facing label
    messages    TEXT NOT NULL,    -- JSON array of Message
    token_count INTEGER DEFAULT 0,
    created_at  INTEGER NOT NULL,
    updated_at  INTEGER NOT NULL
);
CREATE INDEX idx_sessions_agent ON sessions(agent_id);
CREATE INDEX idx_sessions_label ON sessions(agent_id, label);
```
```sql
CREATE TABLE task_board (
    task_id      TEXT PRIMARY KEY,
    title        TEXT NOT NULL,
    description  TEXT,
    status       TEXT NOT NULL,   -- pending | claimed | completed
    created_by   TEXT,
    assigned_to  TEXT,
    result       TEXT,
    created_at   INTEGER NOT NULL,
    claimed_at   INTEGER,
    completed_at INTEGER
);
CREATE INDEX idx_task_status   ON task_board(status);
CREATE INDEX idx_task_assigned ON task_board(assigned_to);
```
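Claiming must be race-safe when several agents poll the same board. One way to express the claim as a single atomic statement — a sketch, not necessarily the crate's actual SQL — is a guarded `UPDATE ... RETURNING` (available in SQLite 3.35+), with `:agent_id` and `:now` as placeholder parameters:

```sql
-- Atomically claim the oldest pending task for :agent_id.
UPDATE task_board
SET status      = 'claimed',
    assigned_to = :agent_id,
    claimed_at  = :now
WHERE task_id = (
    SELECT task_id FROM task_board
    WHERE status = 'pending'
    ORDER BY created_at
    LIMIT 1
)
AND status = 'pending'              -- re-check in case another writer won
RETURNING task_id, title, description;
```

Because SQLite serializes writers, the whole statement executes under one write lock, so two agents can never claim the same task.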
Persists token counts and cost estimates for every LLM call:
```sql
CREATE TABLE usage_events (
    id                TEXT PRIMARY KEY,
    agent_id          TEXT NOT NULL,
    model             TEXT NOT NULL,
    prompt_tokens     INTEGER NOT NULL,
    completion_tokens INTEGER NOT NULL,
    cost_usd          REAL,
    timestamp         INTEGER NOT NULL
);
CREATE INDEX idx_usage_agent     ON usage_events(agent_id);
CREATE INDEX idx_usage_timestamp ON usage_events(timestamp);
```
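With this schema, cost reporting is a plain aggregation over the events table. For example, a hypothetical per-agent rollup since some cutoff (`:since` is a placeholder parameter, not part of the actual API):

```sql
-- Total tokens and estimated cost per agent and model.
SELECT agent_id,
       model,
       SUM(prompt_tokens + completion_tokens) AS total_tokens,
       SUM(cost_usd)                          AS total_cost_usd
FROM usage_events
WHERE timestamp >= :since
GROUP BY agent_id, model
ORDER BY total_cost_usd DESC;
```

The `idx_usage_timestamp` index makes the time-range filter cheap even as the table grows.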
Cross-channel memory — tracks a user’s conversation context across multiple channels:
```sql
CREATE TABLE canonical_sessions (
    user_id       TEXT NOT NULL,
    agent_id      TEXT NOT NULL,
    summary       TEXT,           -- Compacted context summary
    last_channel  TEXT,
    message_count INTEGER DEFAULT 0,
    updated_at    INTEGER NOT NULL,
    PRIMARY KEY (user_id, agent_id)
);
```
When a user messages an agent:
1. Load the canonical session for `(user_id, agent_id)`
2. Inject the summary into the system prompt (if one exists)
3. Run the agent loop
4. Update the canonical session with the new message count
5. Compact the canonical session if the threshold is exceeded
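The "update canonical session" step maps naturally onto a SQLite upsert keyed on the table's composite primary key. A sketch with placeholder `:named` parameters (not the crate's actual SQL):

```sql
INSERT INTO canonical_sessions
    (user_id, agent_id, summary, last_channel, message_count, updated_at)
VALUES
    (:user_id, :agent_id, NULL, :channel, 1, :now)
ON CONFLICT (user_id, agent_id) DO UPDATE SET
    last_channel  = excluded.last_channel,
    message_count = message_count + 1,
    updated_at    = excluded.updated_at;
```

The first message from a user creates the row; every subsequent message bumps `message_count` and records which channel it arrived on.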
Result: a user can start a conversation on Telegram, continue it on Discord, and the agent remembers the full context.

Code reference: `crates/openfang-memory/src/consolidation.rs`
```rust
// Agent A posts a task
let task_id = memory.task_post(
    "Analyze sales data",
    "Load Q4 CSV and compute trends",
    None // Any agent can claim
).await?;

// Agent B (running on a different machine) claims it
let task = memory.task_claim(agent_b_id).await?;

// Agent B completes the task
let result = analyze_sales(task.description)?;
memory.task_complete(task_id, &result).await?;

// Agent A checks completion
let completed_tasks = memory.task_list(MemoryFilter::Completed).await?;
```