
Overview

Grip AI uses a dual-layer memory system to retain context beyond the model's window:
  1. MEMORY.md — Structured long-term facts extracted from conversations
  2. HISTORY.md — Append-only log of conversation summaries
Both files are searched with TF-IDF keyword scoring for relevance-ranked retrieval, with optional time decay to favor recent entries. Additional memory components:
  • KnowledgeBase — Typed entries (preferences, decisions, patterns) in JSON
  • SemanticCache — SHA-256 keyed cache of recent LLM responses

Memory Manager

The MemoryManager handles all memory operations:
class MemoryManager:
    """Manages MEMORY.md and HISTORY.md within the workspace.

    Provides read/write for both files and an LLM-driven consolidation
    process that extracts durable facts from old conversation messages.
    """

    _HISTORY_MAX_BYTES: int = 512_000

    def __init__(self, workspace_path: Path) -> None:
        self._memory_dir = workspace_path / "memory"
        self._memory_dir.mkdir(parents=True, exist_ok=True)
        self._memory_path = self._memory_dir / "MEMORY.md"
        self._history_path = self._memory_dir / "HISTORY.md"

MEMORY.md — Long-Term Facts

Structure

Structured facts with category tags:
### Consolidated 2024-02-28
- [user_preference] Prefers TypeScript over JavaScript
- [project_decision] Using PostgreSQL for primary database
- [system_behavior] API rate limit is 1000 requests/hour
- [error_pattern] CORS errors fixed by adding origin header

### Consolidated 2024-02-27
- [user_preference] Works in Pacific timezone (PST/PDT)
- [project_decision] Deployed to AWS us-west-2 region

Writing to Memory

def append_to_memory(self, entry: str) -> None:
    """Append a new fact or section to the end of MEMORY.md.

    Uses file append mode instead of read+rewrite for O(1) I/O
    regardless of file size.
    """
    text = entry.rstrip() + "\n"
    if self._memory_path.exists() and self._memory_path.stat().st_size > 0:
        with self._memory_path.open("rb") as f:
            f.seek(-1, 2)
            if f.read(1) != b"\n":
                text = "\n" + text
    with self._memory_path.open("a", encoding="utf-8") as f:
        f.write(text)
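The trailing-newline guard above can be exercised on its own; the following standalone sketch (a hypothetical `append_entry` helper, not part of the API) applies the same technique to any path:

```python
from pathlib import Path

def append_entry(path: Path, entry: str) -> None:
    """Append an entry in O(1) I/O, inserting a separator newline only
    when the file's last byte is not already a newline."""
    text = entry.rstrip() + "\n"
    if path.exists() and path.stat().st_size > 0:
        with path.open("rb") as f:
            f.seek(-1, 2)  # seek to the last byte (whence=2 is end-of-file)
            if f.read(1) != b"\n":
                text = "\n" + text
    with path.open("a", encoding="utf-8") as f:
        f.write(text)
```

Two appends in a row produce cleanly separated lines, and a file left without a trailing newline gets a separator inserted before the new entry.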

Searching Memory

TF-IDF keyword search with optional category filter:
def search_memory(
    self, query: str, *, max_results: int = 10, category: str | None = None
) -> list[str]:
    """Search MEMORY.md using keyword-weighted relevance scoring.

    Same TF-IDF approach as search_history but applied to the structured
    facts in MEMORY.md. Returns matching bullet points or sections.
    When category is provided, only entries matching `- [category]` are searched.
    """
    content = self.read_memory()
    if not content:
        return []

    chunks: list[str] = []
    for line in content.splitlines():
        stripped = line.strip()
        if stripped:
            chunks.append(stripped)

    if category:
        chunks = [c for c in chunks if c.startswith(f"- [{category}]")]

    if not chunks:
        return []

    query_tokens = _tokenize(query)
    if not query_tokens:
        return chunks[:max_results]

    # Simple substring for single-word queries
    if len(query_tokens) <= 1:
        query_lower = query.lower()
        return [c for c in chunks if query_lower in c.lower()][:max_results]

    # TF-IDF scoring
    doc_freq: Counter[str] = Counter()
    for chunk in chunks:
        for token in set(_tokenize(chunk)):
            doc_freq[token] += 1

    total = len(chunks)
    scored: list[tuple[float, str]] = []
    for chunk in chunks:
        chunk_tokens = _tokenize(chunk)
        if not chunk_tokens:
            continue
        tf_counts = Counter(chunk_tokens)
        score = 0.0
        for qt in query_tokens:
            if qt in tf_counts:
                tf = tf_counts[qt] / len(chunk_tokens)
                df = doc_freq.get(qt, 0)
                idf = math.log((total + 1) / (df + 1)) + 1.0
                score += tf * idf
        if score > 0:
            scored.append((score, chunk))

    scored.sort(key=lambda x: x[0], reverse=True)
    return [chunk for _, chunk in scored[:max_results]]

Memory Compaction

Deduplicate similar entries using Jaccard similarity:
def compact_memory(self, similarity_threshold: float = 0.7) -> int:
    """Deduplicate memory entries using Jaccard similarity on token sets.

    Returns number of entries removed.
    """
    content = self.read_memory()
    if not content:
        return 0
    chunks = [line.strip() for line in content.splitlines() if line.strip()]
    if len(chunks) < 2:
        return 0

    token_sets = [set(_tokenize(chunk)) for chunk in chunks]
    keep = [True] * len(chunks)
    candidates = _jaccard_candidates(token_sets, similarity_threshold)

    for i in range(len(chunks)):
        if not keep[i] or i not in candidates:
            continue
        for j in sorted(candidates[i]):
            if not keep[j]:
                continue
            intersection = len(token_sets[i] & token_sets[j])
            union = len(token_sets[i] | token_sets[j])
            if union > 0 and intersection / union >= similarity_threshold:
                keep[j] = False

    deduplicated = [c for c, k in zip(chunks, keep, strict=True) if k]
    removed = len(chunks) - len(deduplicated)
    if removed > 0:
        self.write_memory("\n".join(deduplicated) + "\n")
        logger.info("Memory compacted: removed {} duplicate entries", removed)
    return removed

HISTORY.md — Conversation Log

Structure

Timestamped append-only log:
[2024-02-28 14:23:45 UTC] User: How do I configure the database connection?
[2024-02-28 14:23:52 UTC] Agent: You can configure the database in config/database.yml. Here's the format...
[2024-02-28 14:25:10 UTC] User: What about connection pooling?
[2024-02-28 14:25:18 UTC] Agent: Connection pooling is configured via the `pool` parameter...

Appending to History

Automatic rotation when file exceeds 512KB:
def append_history(self, entry: str) -> None:
    """Append a timestamped entry to HISTORY.md."""
    timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
    line = f"[{timestamp}] {entry.rstrip()}\n"

    with self._history_path.open("a", encoding="utf-8") as f:
        f.write(line)

    try:
        if self._history_path.stat().st_size > self._HISTORY_MAX_BYTES:
            self._rotate_history()
    except OSError:
        pass

def _rotate_history(self) -> None:
    """Archive older half of HISTORY.md when file exceeds size threshold."""
    lines = self._history_path.read_text(encoding="utf-8").splitlines(keepends=True)
    midpoint = len(lines) // 2
    if midpoint == 0:
        return
    archive_name = f"HISTORY.archive.{datetime.now(UTC).strftime('%Y%m%d%H%M%S')}.md"
    archive_path = self._memory_dir / archive_name
    tmp_archive = archive_path.with_suffix(".tmp")
    tmp_archive.write_text("".join(lines[:midpoint]), encoding="utf-8")
    tmp_archive.rename(archive_path)
    tmp_history = self._history_path.with_suffix(".tmp")
    tmp_history.write_text("".join(lines[midpoint:]), encoding="utf-8")
    tmp_history.rename(self._history_path)
    logger.info("Rotated HISTORY.md: archived {} lines to {}", midpoint, archive_name)

Searching History

TF-IDF with time decay for recency bias:
def search_history(
    self, query: str, *, max_results: int = 20, decay_rate: float = 0.001
) -> list[str]:
    """Search HISTORY.md using keyword-weighted relevance scoring with time decay.

    Tokenizes the query into keywords, scores each history line by
    TF-IDF-style relevance (term frequency * inverse document frequency),
    applies a time-decay factor so recent entries rank higher, and
    returns the top results sorted by score descending.
    """
    content = self.read_history()
    if not content:
        return []

    lines = [line for line in content.splitlines() if line.strip()]
    if not lines:
        return []

    query_tokens = _tokenize(query)

    if len(query_tokens) <= 1:
        query_lower = query.lower()
        return [line for line in lines if query_lower in line.lower()][:max_results]

    doc_freq: Counter[str] = Counter()
    line_token_lists: list[list[str]] = []
    for line in lines:
        tokens = _tokenize(line)
        line_token_lists.append(tokens)
        unique = set(tokens)
        for token in unique:
            doc_freq[token] += 1

    total_docs = len(lines)
    now = datetime.now(UTC)

    scored: list[tuple[float, str]] = []
    for idx, line in enumerate(lines):
        line_tokens = line_token_lists[idx]
        if not line_tokens:
            continue
        tf_counts = Counter(line_tokens)
        score = 0.0
        for qt in query_tokens:
            if qt in tf_counts:
                tf = tf_counts[qt] / len(line_tokens)
                df = doc_freq.get(qt, 0)
                idf = math.log((total_docs + 1) / (df + 1)) + 1.0
                score += tf * idf
        if score > 0 and decay_rate > 0:
            ts_match = _TIMESTAMP_RE.match(line)
            if ts_match:
                try:
                    entry_time = datetime.strptime(
                        ts_match.group(1), "%Y-%m-%d %H:%M:%S"
                    ).replace(tzinfo=UTC)
                    age_hours = (now - entry_time).total_seconds() / 3600
                    score *= 1.0 / (1.0 + age_hours * decay_rate)
                except ValueError:
                    pass
        if score > 0:
            scored.append((score, line))

    scored.sort(key=lambda x: x[0], reverse=True)
    return [line for _, line in scored[:max_results]]

Memory Consolidation

When a session exceeds `memory_window * 2` messages, old messages are consolidated:
async def consolidate(
    self,
    old_messages: list[LLMMessage],
    provider: LLMProvider,
    model: str,
) -> str:
    """Extract key facts from old messages using the LLM.

    Sends the old messages to the LLM with a consolidation prompt,
    appends extracted facts to MEMORY.md, and writes a summary
    to HISTORY.md.
    """
    if not old_messages:
        return ""

    conversation_text = self._format_messages_for_consolidation(old_messages)

    consolidation_prompt = (
        "You are a memory consolidation assistant. Review the following conversation "
        "and extract the key facts, decisions, and important information that should "
        "be remembered long-term.\n\n"
        "Rules:\n"
        "- Extract only durable facts (user preferences, project decisions, names, "
        "technical choices, important outcomes).\n"
        "- Skip transient information (greetings, small talk, tool execution details).\n"
        "- Format as a bulleted list with concise entries.\n"
        "- If there are no important facts to extract, respond with 'No new facts.'\n\n"
        f"Conversation:\n{conversation_text}"
    )

    response = await provider.chat(
        [
            LLMMessage(role="system", content="You extract key facts from conversations."),
            LLMMessage(role="user", content=consolidation_prompt),
        ],
        model=model,
        temperature=0.3,
        max_tokens=1024,
    )

    facts = response.content or ""

    if facts and "no new facts" not in facts.lower():
        self.append_to_memory(
            f"\n### Consolidated {datetime.now(UTC).strftime('%Y-%m-%d')}\n{facts}\n"
        )
        logger.info("Appended consolidated facts to MEMORY.md")

    summary = self._build_history_summary(old_messages)
    self.append_history(summary)
    logger.info("Appended summary to HISTORY.md")

    return facts

Triggering Consolidation

Automatic consolidation runs when configured:
async def _maybe_consolidate(self, session: Session) -> None:
    """Check if session needs consolidation and run it if so.

    Triggered when message count exceeds 2x memory_window.
    """
    if not self._memory_mgr:
        return
    defaults = self._config.agents.defaults
    if not defaults.auto_consolidate:
        return
    if not self._memory_mgr.needs_consolidation(session.message_count, defaults.memory_window):
        return

    old_messages = session.get_old_messages(defaults.memory_window)
    if not old_messages:
        return

    consolidation_model = defaults.consolidation_model or defaults.model
    logger.info(
        "Consolidating session '{}': {} old messages using model '{}'",
        session.key,
        len(old_messages),
        consolidation_model,
    )

    try:
        facts = await self._memory_mgr.consolidate(
            old_messages, self._provider, consolidation_model
        )
        if facts and "no new facts" not in facts.lower():
            session.summary = f"[Previous conversation context]\n{facts}"
        pruned = session.prune_to_window(defaults.memory_window)
        if self._session_mgr:
            self._session_mgr.save(session)
        logger.info(
            "Consolidation complete: pruned {} messages, summary saved",
            pruned,
        )
    except Exception as exc:
        logger.error("Memory consolidation failed (non-fatal): {}", exc)
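`needs_consolidation` is called above but not shown. Given the documented trigger ("message count exceeds 2x memory_window"), it likely reduces to a single comparison; this standalone sketch assumes exactly that:

```python
def needs_consolidation(message_count: int, memory_window: int) -> bool:
    """True once a session holds more than twice the configured window
    of messages (a method on MemoryManager in the real code)."""
    return message_count > memory_window * 2
```

With the default `memory_window: 10`, consolidation fires at the 21st message and, after pruning back to the window, roughly every 10 messages thereafter.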

Knowledge Base

The KnowledgeBase stores typed entries in JSON:
@dataclass
class KnowledgeEntry:
    id: str
    category: str  # user_preference, project_decision, system_behavior, etc.
    content: str
    source: str = ""
    tags: list[str] = field(default_factory=list)
    created_at: float = field(default_factory=time.time)
    accessed_at: float = field(default_factory=time.time)
    access_count: int = 0

VALID_CATEGORIES: frozenset[str] = frozenset({
    "user_preference",
    "project_decision",
    "system_behavior",
    "learned_fact",
    "error_pattern",
})

Adding Entries

Automatic deduplication by content hash:
def add(
    self,
    category: str,
    content: str,
    *,
    source: str = "",
    tags: list[str] | None = None,
) -> KnowledgeEntry:
    """Add a knowledge entry. Deduplicates by content hash within category.

    If an entry with the same category+content hash already exists,
    updates its access time and returns the existing entry.
    """
    if category not in VALID_CATEGORIES:
        logger.warning(
            "Unknown knowledge category '{}', defaulting to 'learned_fact'", category
        )
        category = "learned_fact"

    entry_id = _make_id(category, content)

    if entry_id in self._entries:
        existing = self._entries[entry_id]
        existing.accessed_at = time.time()
        existing.access_count += 1
        self._mark_dirty()
        logger.debug("Knowledge entry already exists (id={}), updated access time", entry_id)
        return existing

    entry = KnowledgeEntry(
        id=entry_id,
        category=category,
        content=content.strip(),
        source=source,
        tags=tags or [],
    )
    self._entries[entry_id] = entry
    self._mark_dirty()
    logger.info("Added knowledge entry: {} (category={})", entry_id, category)
    return entry

Searching Knowledge Base

def search(
    self,
    query: str = "",
    *,
    category: str = "",
    max_results: int = 20,
) -> list[KnowledgeEntry]:
    """Search knowledge entries by keyword and/or category.

    Returns entries sorted by relevance (access count + recency).
    """
    results: list[KnowledgeEntry] = []
    query_lower = query.lower()

    for entry in self._entries.values():
        if category and entry.category != category:
            continue
        if query_lower:
            searchable = f"{entry.content} {' '.join(entry.tags)} {entry.source}".lower()
            if query_lower not in searchable:
                continue
        results.append(entry)

    # Sort by access_count (descending), then by created_at (descending)
    results.sort(key=lambda e: (e.access_count, e.created_at), reverse=True)
    return results[:max_results]

Exporting for Context

def export_for_context(self, *, max_chars: int = 2000) -> str:
    """Export the most relevant entries as a text block for LLM context.

    Prioritizes frequently accessed entries and user preferences.
    Truncates to max_chars to fit within context budgets.
    """
    priority_order = [
        "user_preference",
        "project_decision",
        "error_pattern",
        "system_behavior",
        "learned_fact",
    ]
    sorted_entries: list[KnowledgeEntry] = []
    for cat in priority_order:
        cat_entries = self.by_category(cat)
        sorted_entries.extend(cat_entries)

    lines: list[str] = []
    total_chars = 0
    for entry in sorted_entries:
        line = f"[{entry.category}] {entry.content}"
        if total_chars + len(line) > max_chars:
            break
        lines.append(line)
        total_chars += len(line) + 1

    return "\n".join(lines)

Semantic Cache

The SemanticCache caches LLM responses by query hash:
class SemanticCache:
    """Disk-backed LLM response cache with TTL expiry.

    Cache key is SHA-256 of (normalized_message + model). Responses are
    stored with timestamps and evicted when TTL expires or max_entries
    is exceeded (LRU by access time).
    """

    def __init__(
        self,
        state_dir: Path,
        *,
        ttl_seconds: int = 3600,
        max_entries: int = 500,
        enabled: bool = True,
    ) -> None:
        self._state_dir = state_dir
        self._cache_file = state_dir / "semantic_cache.json"
        self._ttl = ttl_seconds
        self._max_entries = max_entries
        self._enabled = enabled
        self._cache: dict[str, dict] = self._load()

    @staticmethod
    def _make_key(message: str, model: str) -> str:
        normalized = message.strip().lower()
        raw = f"{normalized}||{model}"
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    def get(self, message: str, model: str) -> str | None:
        if not self._enabled:
            return None

        key = self._make_key(message, model)
        entry = self._cache.get(key)
        if entry is None:
            return None

        # Check TTL expiry
        if time.time() - entry.get("created_at", 0) >= self._ttl:
            del self._cache[key]
            return None

        # Update access time for LRU
        entry["accessed_at"] = time.time()
        logger.debug("Semantic cache hit for key {:.8}...", key)
        return entry.get("response")

    def put(self, message: str, model: str, response: str) -> None:
        if not self._enabled:
            return

        key = self._make_key(message, model)
        now = time.time()
        self._cache[key] = {
            "response": response,
            "model": model,
            "created_at": now,
            "accessed_at": now,
            "message_preview": message[:100],
        }

        # Evict oldest entries if over capacity (LRU by accessed_at)
        if len(self._cache) > self._max_entries:
            sorted_keys = sorted(
                self._cache.keys(),
                key=lambda k: self._cache[k].get("accessed_at", 0),
            )
            excess = len(self._cache) - self._max_entries
            for k in sorted_keys[:excess]:
                del self._cache[k]
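The key normalization means two requests hit the same cache entry only when they normalize identically. A standalone copy of `_make_key` makes the behavior easy to check:

```python
import hashlib

def make_key(message: str, model: str) -> str:
    """Standalone version of SemanticCache._make_key: SHA-256 over the
    stripped, lowercased message joined with the model name."""
    normalized = message.strip().lower()
    raw = f"{normalized}||{model}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()
```

Leading whitespace and casing collapse to one key, but any wording change, or the same message sent to a different model, produces a distinct key and therefore a cache miss.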

Configuration

agents:
  defaults:
    # Memory window
    memory_window: 10  # Recent messages kept in context
    auto_consolidate: true  # Auto-consolidate when exceeded
    consolidation_model: gpt-4o-mini  # Cheaper model for consolidation

    # Semantic cache
    semantic_cache_enabled: true
    semantic_cache_ttl: 3600  # 1 hour
