Overview
Grip AI uses a dual-layer memory system for infinite context:
- MEMORY.md — Structured long-term facts extracted from conversations
- HISTORY.md — Append-only log of conversation summaries
- KnowledgeBase — Typed entries (preferences, decisions, patterns) in JSON
- SemanticCache — SHA-256 keyed cache of recent LLM responses
Memory Manager
The MemoryManager class handles all memory operations:
class MemoryManager:
    """Owns the workspace's dual memory files (MEMORY.md / HISTORY.md).

    Exposes read/write helpers for both files plus an LLM-backed
    consolidation step that distills durable facts from messages that
    have aged out of the context window.
    """

    # Rotate HISTORY.md once it grows beyond this many bytes (~512 KB).
    _HISTORY_MAX_BYTES: int = 512_000

    def __init__(self, workspace_path: Path) -> None:
        """Create (if needed) the `memory/` directory under *workspace_path*."""
        memory_dir = workspace_path / "memory"
        memory_dir.mkdir(parents=True, exist_ok=True)
        self._memory_dir = memory_dir
        self._memory_path = memory_dir / "MEMORY.md"
        self._history_path = memory_dir / "HISTORY.md"
MEMORY.md — Long-Term Facts
Structure
Structured facts with category tags:
### Consolidated 2024-02-28
- [user_preference] Prefers TypeScript over JavaScript
- [project_decision] Using PostgreSQL for primary database
- [system_behavior] API rate limit is 1000 requests/hour
- [error_pattern] CORS errors fixed by adding origin header
### Consolidated 2024-02-27
- [user_preference] Works in Pacific timezone (PST/PDT)
- [project_decision] Deployed to AWS us-west-2 region
Writing to Memory
def append_to_memory(self, entry: str) -> None:
    """Add *entry* to the end of MEMORY.md, ensuring newline hygiene.

    Opens the file in append mode (O(1) I/O regardless of file size) and,
    when the existing file does not already end with a newline, inserts
    one so entries never run together.
    """
    payload = entry.rstrip() + "\n"
    path = self._memory_path
    needs_separator = False
    if path.exists() and path.stat().st_size > 0:
        with path.open("rb") as fh:
            fh.seek(-1, 2)  # jump to the final byte
            needs_separator = fh.read(1) != b"\n"
    if needs_separator:
        payload = "\n" + payload
    with path.open("a", encoding="utf-8") as fh:
        fh.write(payload)
Searching Memory
# TF-IDF keyword search with optional category filter:
def search_memory(
    self, query: str, *, max_results: int = 10, category: str | None = None
) -> list[str]:
    """Rank MEMORY.md lines against *query* and return the best matches.

    Mirrors the TF-IDF scoring used by search_history, applied to the
    structured facts in MEMORY.md. With *category* set, only bullets of
    the form `- [category]` are considered.
    """
    content = self.read_memory()
    if not content:
        return []

    entries = [ln.strip() for ln in content.splitlines() if ln.strip()]
    if category:
        prefix = f"- [{category}]"
        entries = [e for e in entries if e.startswith(prefix)]
    if not entries:
        return []

    terms = _tokenize(query)
    if not terms:
        return entries[:max_results]

    # Single keyword: a case-insensitive substring scan is enough.
    if len(terms) <= 1:
        needle = query.lower()
        return [e for e in entries if needle in e.lower()][:max_results]

    # Document frequency of each token across all entries.
    df_map: Counter[str] = Counter()
    for e in entries:
        df_map.update(set(_tokenize(e)))

    n_docs = len(entries)
    ranked: list[tuple[float, str]] = []
    for e in entries:
        toks = _tokenize(e)
        if not toks:
            continue
        counts = Counter(toks)
        relevance = 0.0
        for term in terms:
            if term in counts:
                tf = counts[term] / len(toks)
                idf = math.log((n_docs + 1) / (df_map.get(term, 0) + 1)) + 1.0
                relevance += tf * idf
        if relevance > 0:
            ranked.append((relevance, e))
    ranked.sort(key=lambda item: item[0], reverse=True)
    return [e for _, e in ranked[:max_results]]
Memory Compaction
# Deduplicate similar entries using Jaccard similarity:
def compact_memory(self, similarity_threshold: float = 0.7) -> int:
    """Collapse near-duplicate MEMORY.md entries.

    Two entries count as duplicates when the Jaccard similarity of their
    token sets reaches *similarity_threshold*; the later one is dropped.
    Returns the number of entries removed.
    """
    content = self.read_memory()
    if not content:
        return 0
    entries = [ln.strip() for ln in content.splitlines() if ln.strip()]
    if len(entries) < 2:
        return 0

    token_sets = [set(_tokenize(e)) for e in entries]
    alive = [True] * len(entries)
    # Candidate pairs come pre-filtered so we only do exact Jaccard
    # checks where a match is plausible.
    candidates = _jaccard_candidates(token_sets, similarity_threshold)
    for left in range(len(entries)):
        if not alive[left] or left not in candidates:
            continue
        for right in sorted(candidates[left]):
            if not alive[right]:
                continue
            overlap = len(token_sets[left] & token_sets[right])
            combined = len(token_sets[left] | token_sets[right])
            if combined > 0 and overlap / combined >= similarity_threshold:
                alive[right] = False

    survivors = [e for e, ok in zip(entries, alive, strict=True) if ok]
    removed = len(entries) - len(survivors)
    if removed > 0:
        self.write_memory("\n".join(survivors) + "\n")
        logger.info("Memory compacted: removed {} duplicate entries", removed)
    return removed
HISTORY.md — Conversation Log
Structure
Timestamped append-only log:
[2024-02-28 14:23:45 UTC] User: How do I configure the database connection?
[2024-02-28 14:23:52 UTC] Agent: You can configure the database in config/database.yml. Here's the format...
[2024-02-28 14:25:10 UTC] User: What about connection pooling?
[2024-02-28 14:25:18 UTC] Agent: Connection pooling is configured via the `pool` parameter...
Appending to History
Automatic rotation when file exceeds 512KB:def append_history(self, entry: str) -> None:
"""Append a timestamped entry to HISTORY.md."""
timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
line = f"[{timestamp}] {entry.rstrip()}\n"
with self._history_path.open("a", encoding="utf-8") as f:
f.write(line)
try:
if self._history_path.stat().st_size > self._HISTORY_MAX_BYTES:
self._rotate_history()
except OSError:
pass
def _rotate_history(self) -> None:
    """Archive the older half of HISTORY.md once it outgrows the size cap.

    Both the archive and the trimmed history are staged as `.tmp` files
    and renamed into place, so an interruption mid-rotation cannot
    corrupt the live HISTORY.md.
    """
    records = self._history_path.read_text(encoding="utf-8").splitlines(keepends=True)
    cut = len(records) // 2
    if cut == 0:
        return  # zero or one record: nothing worth archiving
    archive_name = f"HISTORY.archive.{datetime.now(UTC).strftime('%Y%m%d%H%M%S')}.md"
    archive_path = self._memory_dir / archive_name
    staged_archive = archive_path.with_suffix(".tmp")
    staged_archive.write_text("".join(records[:cut]), encoding="utf-8")
    staged_archive.rename(archive_path)
    staged_history = self._history_path.with_suffix(".tmp")
    staged_history.write_text("".join(records[cut:]), encoding="utf-8")
    staged_history.rename(self._history_path)
    logger.info("Rotated HISTORY.md: archived {} lines to {}", cut, archive_name)
Searching History
# TF-IDF with time decay for recency bias:
def search_history(
    self, query: str, *, max_results: int = 20, decay_rate: float = 0.001
) -> list[str]:
    """Rank HISTORY.md lines against *query*, biased toward recent entries.

    Each line is scored with TF-IDF relevance (term frequency times
    inverse document frequency), then multiplied by a time-decay factor
    derived from the line's leading timestamp so newer entries outrank
    equally relevant old ones. Results come back sorted by score,
    highest first.
    """
    content = self.read_history()
    if not content:
        return []
    records = [ln for ln in content.splitlines() if ln.strip()]
    if not records:
        return []

    terms = _tokenize(query)
    # Single keyword: fall back to a case-insensitive substring scan.
    if len(terms) <= 1:
        needle = query.lower()
        return [r for r in records if needle in r.lower()][:max_results]

    df_map: Counter[str] = Counter()
    tokenized: list[list[str]] = []
    for r in records:
        toks = _tokenize(r)
        tokenized.append(toks)
        df_map.update(set(toks))

    n_docs = len(records)
    now = datetime.now(UTC)
    ranked: list[tuple[float, str]] = []
    for toks, record in zip(tokenized, records):
        if not toks:
            continue
        counts = Counter(toks)
        relevance = 0.0
        for term in terms:
            if term in counts:
                tf = counts[term] / len(toks)
                idf = math.log((n_docs + 1) / (df_map.get(term, 0) + 1)) + 1.0
                relevance += tf * idf
        # Apply recency decay only when the line carries a parseable timestamp.
        if relevance > 0 and decay_rate > 0:
            stamped = _TIMESTAMP_RE.match(record)
            if stamped:
                try:
                    when = datetime.strptime(
                        stamped.group(1), "%Y-%m-%d %H:%M:%S"
                    ).replace(tzinfo=UTC)
                    age_hours = (now - when).total_seconds() / 3600
                    relevance *= 1.0 / (1.0 + age_hours * decay_rate)
                except ValueError:
                    pass
        if relevance > 0:
            ranked.append((relevance, record))
    ranked.sort(key=lambda pair: pair[0], reverse=True)
    return [record for _, record in ranked[:max_results]]
Memory Consolidation
When a session exceeds memory_window * 2 messages, old messages are consolidated:
async def consolidate(
    self,
    old_messages: list[LLMMessage],
    provider: LLMProvider,
    model: str,
) -> str:
    """Distill durable facts from *old_messages* via the LLM.

    The messages are rendered to text and sent to *provider* with a
    consolidation prompt; any extracted facts are appended to MEMORY.md
    and a conversation summary is written to HISTORY.md. Returns the raw
    fact text from the model ("" when there was nothing to consolidate).
    """
    if not old_messages:
        return ""
    transcript = self._format_messages_for_consolidation(old_messages)
    prompt = (
        "You are a memory consolidation assistant. Review the following conversation "
        "and extract the key facts, decisions, and important information that should "
        "be remembered long-term.\n\n"
        "Rules:\n"
        "- Extract only durable facts (user preferences, project decisions, names, "
        "technical choices, important outcomes).\n"
        "- Skip transient information (greetings, small talk, tool execution details).\n"
        "- Format as a bulleted list with concise entries.\n"
        "- If there are no important facts to extract, respond with 'No new facts.'\n\n"
        f"Conversation:\n{transcript}"
    )
    # Low temperature keeps extraction deterministic; 1024 tokens caps cost.
    response = await provider.chat(
        [
            LLMMessage(role="system", content="You extract key facts from conversations."),
            LLMMessage(role="user", content=prompt),
        ],
        model=model,
        temperature=0.3,
        max_tokens=1024,
    )
    facts = response.content or ""
    # Only durable facts reach MEMORY.md; a "No new facts." reply is skipped.
    if facts and "no new facts" not in facts.lower():
        self.append_to_memory(
            f"\n### Consolidated {datetime.now(UTC).strftime('%Y-%m-%d')}\n{facts}\n"
        )
        logger.info("Appended consolidated facts to MEMORY.md")
    # HISTORY.md always records a summary of what was consolidated.
    summary = self._build_history_summary(old_messages)
    self.append_history(summary)
    logger.info("Appended summary to HISTORY.md")
    return facts
Triggering Consolidation
# Automatic consolidation runs when configured:
async def _maybe_consolidate(self, session: Session) -> None:
    """Run memory consolidation for *session* when thresholds are met.

    No-ops unless a memory manager is attached, auto_consolidate is
    enabled, and the session has grown past 2x memory_window messages.
    Failures are logged and swallowed: consolidation must never break
    the active chat.
    """
    if not self._memory_mgr:
        return
    defaults = self._config.agents.defaults
    if not defaults.auto_consolidate:
        return
    if not self._memory_mgr.needs_consolidation(session.message_count, defaults.memory_window):
        return
    stale_messages = session.get_old_messages(defaults.memory_window)
    if not stale_messages:
        return

    # A cheaper dedicated model may be configured for consolidation.
    model_name = defaults.consolidation_model or defaults.model
    logger.info(
        "Consolidating session '{}': {} old messages using model '{}'",
        session.key,
        len(stale_messages),
        model_name,
    )
    try:
        facts = await self._memory_mgr.consolidate(
            stale_messages, self._provider, model_name
        )
        if facts and "no new facts" not in facts.lower():
            session.summary = f"[Previous conversation context]\n{facts}"
        pruned = session.prune_to_window(defaults.memory_window)
        if self._session_mgr:
            self._session_mgr.save(session)
        logger.info(
            "Consolidation complete: pruned {} messages, summary saved",
            pruned,
        )
    except Exception as exc:
        logger.error("Memory consolidation failed (non-fatal): {}", exc)
Knowledge Base
The KnowledgeBase stores typed entries in JSON:
@dataclass
class KnowledgeEntry:
    """A single typed fact persisted in the knowledge base."""

    id: str  # stable identifier derived from category + content hash
    category: str  # user_preference, project_decision, system_behavior, etc.
    content: str  # the fact itself
    source: str = ""  # free-form origin note supplied by callers
    tags: list[str] = field(default_factory=list)
    created_at: float = field(default_factory=time.time)  # epoch seconds
    accessed_at: float = field(default_factory=time.time)  # epoch seconds, bumped on re-adds
    access_count: int = 0


# Categories the knowledge base accepts; anything else is coerced to 'learned_fact'.
VALID_CATEGORIES: frozenset[str] = frozenset({
    "user_preference",
    "project_decision",
    "system_behavior",
    "learned_fact",
    "error_pattern",
})
Adding Entries
# Automatic deduplication by content hash:
def add(
    self,
    category: str,
    content: str,
    *,
    source: str = "",
    tags: list[str] | None = None,
) -> KnowledgeEntry:
    """Insert a knowledge entry, deduplicating by category+content hash.

    Unknown categories are coerced to 'learned_fact'. When the derived
    id already exists, the stored entry's access stats are bumped and it
    is returned instead of creating a duplicate.
    """
    if category not in VALID_CATEGORIES:
        logger.warning(
            "Unknown knowledge category '{}', defaulting to 'learned_fact'", category
        )
        category = "learned_fact"

    entry_id = _make_id(category, content)
    existing = self._entries.get(entry_id)
    if existing is not None:
        existing.accessed_at = time.time()
        existing.access_count += 1
        self._mark_dirty()
        logger.debug("Knowledge entry already exists (id={}), updated access time", entry_id)
        return existing

    created = KnowledgeEntry(
        id=entry_id,
        category=category,
        content=content.strip(),
        source=source,
        tags=tags or [],
    )
    self._entries[entry_id] = created
    self._mark_dirty()
    logger.info("Added knowledge entry: {} (category={})", entry_id, category)
    return created
Searching Knowledge Base
def search(
    self,
    query: str = "",
    *,
    category: str = "",
    max_results: int = 20,
) -> list[KnowledgeEntry]:
    """Filter entries by substring *query* and/or exact *category*.

    The query is matched case-insensitively against content, tags, and
    source. Results are ordered by access_count then created_at, both
    descending, and capped at *max_results*.
    """
    needle = query.lower()
    matches: list[KnowledgeEntry] = []
    for entry in self._entries.values():
        if category and entry.category != category:
            continue
        if needle:
            haystack = f"{entry.content} {' '.join(entry.tags)} {entry.source}".lower()
            if needle not in haystack:
                continue
        matches.append(entry)
    # Most-used first; ties broken by newest creation time.
    matches.sort(key=lambda e: (e.access_count, e.created_at), reverse=True)
    return matches[:max_results]
Exporting for Context
def export_for_context(self, *, max_chars: int = 2000) -> str:
    """Render top-priority entries as a text block for the LLM prompt.

    Walks categories from most to least important (user preferences
    first, generic learned facts last) and stops adding lines once the
    next line would push the output past *max_chars*.
    """
    priority_order = [
        "user_preference",
        "project_decision",
        "error_pattern",
        "system_behavior",
        "learned_fact",
    ]
    ordered: list[KnowledgeEntry] = []
    for cat in priority_order:
        ordered.extend(self.by_category(cat))

    rendered: list[str] = []
    used = 0
    for entry in ordered:
        line = f"[{entry.category}] {entry.content}"
        if used + len(line) > max_chars:
            break
        rendered.append(line)
        used += len(line) + 1  # +1 for the joining newline
    return "\n".join(rendered)
Semantic Cache
The SemanticCache caches LLM responses by query hash:
class SemanticCache:
"""Disk-backed LLM response cache with TTL expiry.
Cache key is SHA-256 of (normalized_message + model). Responses are
stored with timestamps and evicted when TTL expires or max_entries
is exceeded (LRU by access time).
"""
def __init__(
self,
state_dir: Path,
*,
ttl_seconds: int = 3600,
max_entries: int = 500,
enabled: bool = True,
) -> None:
self._state_dir = state_dir
self._cache_file = state_dir / "semantic_cache.json"
self._ttl = ttl_seconds
self._max_entries = max_entries
self._enabled = enabled
self._cache: dict[str, dict] = self._load()
@staticmethod
def _make_key(message: str, model: str) -> str:
normalized = message.strip().lower()
raw = f"{normalized}||{model}"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def get(self, message: str, model: str) -> str | None:
if not self._enabled:
return None
key = self._make_key(message, model)
entry = self._cache.get(key)
if entry is None:
return None
# Check TTL expiry
if time.time() - entry.get("created_at", 0) >= self._ttl:
del self._cache[key]
return None
# Update access time for LRU
entry["accessed_at"] = time.time()
logger.debug("Semantic cache hit for key {:.8}...", key)
return entry.get("response")
def put(self, message: str, model: str, response: str) -> None:
if not self._enabled:
return
key = self._make_key(message, model)
now = time.time()
self._cache[key] = {
"response": response,
"model": model,
"created_at": now,
"accessed_at": now,
"message_preview": message[:100],
}
# Evict oldest entries if over capacity (LRU by accessed_at)
if len(self._cache) > self._max_entries:
sorted_keys = sorted(
self._cache.keys(),
key=lambda k: self._cache[k].get("accessed_at", 0),
)
excess = len(self._cache) - self._max_entries
for k in sorted_keys[:excess]:
del self._cache[k]
Configuration
agents:
defaults:
# Memory window
memory_window: 10 # Recent messages kept in context
auto_consolidate: true # Auto-consolidate when exceeded
consolidation_model: gpt-4o-mini # Cheaper model for consolidation
# Semantic cache
semantic_cache_enabled: true
semantic_cache_ttl: 3600 # 1 hour
Next Steps
- Configure Session persistence
- Explore Agent lifecycle
- Understand Tool registry
- Learn about Dual-engine architecture