Overview
Grip AI’s session system provides persistent conversation history per user/channel. Each session is stored as a JSON file with:
- Message history — Full conversation with tool calls
- Summary — Consolidated context from old messages
- Timestamps — Creation and last update times
- Metadata — The session key (also used to derive the JSON file name)
Sessions use:
- Atomic writes — Temp file + rename for crash safety
- LRU-style cache — In-memory cache with configurable size; when full, the least-recently-updated sessions are evicted
- Lazy loading — Sessions loaded on-demand from disk
Session Data Structure
@dataclass
class Session:
    """A single conversation session with its message history.

    The ``window`` arguments below count messages from the end of the
    history. A non-positive window means "keep nothing recent": every
    message is treated as old.
    """

    # Session key, e.g. "cli:default".
    key: str
    # Conversation messages, oldest first.
    messages: list[LLMMessage] = field(default_factory=list)
    # Consolidated context from messages that were pruned, if any.
    summary: str | None = None
    # Unix timestamps (seconds) for creation and last modification.
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)

    def add_message(self, msg: LLMMessage) -> None:
        """Append ``msg`` to the history and refresh ``updated_at``."""
        self.messages.append(msg)
        self.updated_at = time.time()

    def get_recent(self, window: int) -> list[LLMMessage]:
        """Return a new list with the last ``window`` messages for LLM context."""
        # ``messages[-0:]`` would return the whole list, so zero (and
        # negative) windows have to be handled explicitly.
        if window <= 0:
            return []
        # A window larger than the history simply yields a full copy.
        return self.messages[-window:]

    def get_old_messages(self, window: int) -> list[LLMMessage]:
        """Return messages older than the recent ``window`` (candidates for consolidation)."""
        # ``messages[:-0]`` would be empty, although with no recent window
        # every message is old.
        if window <= 0:
            return list(self.messages)
        return self.messages[:-window]

    def prune_to_window(self, window: int) -> int:
        """Remove messages older than the recent ``window``.

        Returns the number of messages removed. ``updated_at`` is refreshed
        only when something was actually removed.
        """
        excess = len(self.messages) - max(window, 0)
        if excess <= 0:
            return 0
        # Slicing from the front stays correct when nothing is kept
        # (``messages[-0:]`` would keep everything).
        self.messages = self.messages[excess:]
        self.updated_at = time.time()
        return excess

    @property
    def message_count(self) -> int:
        """Number of messages currently held in the session."""
        return len(self.messages)
Session Manager
The SessionManager handles disk persistence and caching:
class SessionManager:
    """Persist conversation sessions as JSON files and cache them in memory.

    Every session lives in its own JSON file inside the sessions directory.
    Writes go through a temporary file that is then renamed into place.
    """

    # Cache capacity used when the caller does not pass ``max_cache_size``.
    _DEFAULT_MAX_CACHE = 200

    def __init__(self, sessions_dir: Path, max_cache_size: int = _DEFAULT_MAX_CACHE) -> None:
        """Create the sessions directory if needed and start with an empty cache."""
        sessions_dir.mkdir(parents=True, exist_ok=True)
        self._dir = sessions_dir
        self._max_cache_size = max_cache_size
        self._cache: dict[str, Session] = {}

    def _path_for(self, key: str) -> Path:
        """Return the JSON file path that stores the session for ``key``."""
        file_name = f"{_sanitize_key(key)}.json"
        return self._dir / file_name
Session Keys
Session keys follow the format: channel:identifier
- CLI — Default session for terminal usage.
- Telegram — Per-user session using Telegram user ID.
- Discord — Per-user session using Discord user ID.
- Slack — Per-user session using Slack user ID.
- Subagent — Isolated session for spawned subagents.
Loading Sessions
Get or Create
def get_or_create(self, key: str) -> Session:
    """Load an existing session from disk, or create a new empty one.

    Lookup order: in-memory cache, then the session's JSON file, then a
    fresh ``Session``. A file that cannot be decoded or does not have the
    expected shape is logged as corrupt and replaced by a fresh session in
    the cache (the file itself is left untouched here).
    """
    if key in self._cache:
        return self._cache[key]

    path = self._path_for(key)
    if path.exists():
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
            session = _dict_to_session(data)
        except (ValueError, KeyError, TypeError, AttributeError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
            # TypeError/AttributeError cover valid JSON of the wrong shape
            # (e.g. a top-level list, or a message that is not an object).
            logger.warning("Corrupt session file {}, creating new: {}", path, exc)
        else:
            self._cache[key] = session
            # Loading from disk grows the cache too, so enforce the size
            # limit here as well.
            self._evict_if_needed()
            logger.debug("Loaded session '{}' ({} messages)", key, session.message_count)
            return session

    session = Session(key=key)
    self._cache[key] = session
    self._evict_if_needed()
    logger.debug("Created new session: {}", key)
    return session
Get Without Creating
def get(self, key: str) -> Session | None:
    """Load an existing session, or return None if it doesn't exist.

    Returns the cached session when present; otherwise reads the session's
    JSON file. Returns ``None`` when there is no file, or when the file
    cannot be decoded or does not have the expected shape (logged as
    corrupt).
    """
    if key in self._cache:
        return self._cache[key]

    path = self._path_for(key)
    if not path.exists():
        return None

    try:
        data = json.loads(path.read_text(encoding="utf-8"))
        session = _dict_to_session(data)
    except (ValueError, KeyError, TypeError, AttributeError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # TypeError/AttributeError cover valid JSON of the wrong shape
        # (e.g. a top-level list, or a message that is not an object).
        logger.warning("Corrupt session file {}: {}", path, exc)
        return None

    self._cache[key] = session
    self._evict_if_needed()
    return session
Saving Sessions
Atomic writes with temp file + rename:
def save(self, session: Session) -> None:
    """Persist a session to disk atomically.

    Refreshes ``session.updated_at``, writes the serialized session to a
    sibling ``.tmp`` file, then moves that file over the real one. The
    session is (re)inserted into the cache afterwards.
    """
    session.updated_at = time.time()
    path = self._path_for(session.key)
    tmp_path = path.with_suffix(".tmp")
    data = _session_to_dict(session)
    tmp_path.write_text(
        json.dumps(data, ensure_ascii=False, separators=(",", ":")),
        encoding="utf-8",
    )
    # Path.replace overwrites an existing target on every platform;
    # Path.rename raises FileExistsError on Windows once the session file
    # already exists, which would break every save after the first.
    tmp_path.replace(path)
    self._cache[session.key] = session
    self._evict_if_needed()
    logger.debug("Saved session '{}' ({} messages)", session.key, session.message_count)
Session Serialization
Message Serialization
def _message_to_dict(msg: LLMMessage) -> dict[str, Any]:
    """Convert an LLMMessage into a plain dict suitable for JSON.

    ``role`` is always written. ``content``, ``tool_call_id`` and ``name``
    are written only when they are not None, and ``tool_calls`` only when
    the message has at least one.
    """
    payload: dict[str, Any] = {"role": msg.role}

    if msg.content is not None:
        payload["content"] = msg.content

    if msg.tool_calls:
        serialized_calls = []
        for call in msg.tool_calls:
            serialized_calls.append(
                {
                    "id": call.id,
                    "function_name": call.function_name,
                    "arguments": call.arguments,
                }
            )
        payload["tool_calls"] = serialized_calls

    # Optional scalar fields, emitted in a fixed order.
    for attr in ("tool_call_id", "name"):
        value = getattr(msg, attr)
        if value is not None:
            payload[attr] = value

    return payload
def _dict_to_message(d: dict[str, Any]) -> LLMMessage:
    """Rebuild an LLMMessage from its dict form.

    ``role`` is required; ``content``, ``tool_call_id`` and ``name`` default
    to None, and a missing ``tool_calls`` entry yields an empty list.
    """
    calls = []
    for raw_call in d.get("tool_calls", []):
        calls.append(
            ToolCall(
                id=raw_call["id"],
                function_name=raw_call["function_name"],
                arguments=raw_call["arguments"],
            )
        )

    return LLMMessage(
        role=d["role"],
        content=d.get("content"),
        tool_calls=calls,
        tool_call_id=d.get("tool_call_id"),
        name=d.get("name"),
    )
Session Serialization
def _session_to_dict(session: Session) -> dict[str, Any]:
    """Convert a Session into the dict that is written to its JSON file."""
    serialized_messages = []
    for message in session.messages:
        serialized_messages.append(_message_to_dict(message))

    return {
        "key": session.key,
        "messages": serialized_messages,
        "summary": session.summary,
        "created_at": session.created_at,
        "updated_at": session.updated_at,
    }
def _dict_to_session(d: dict[str, Any]) -> Session:
    """Rebuild a Session from the dict stored in its JSON file.

    ``key`` is required. Missing ``messages`` yields an empty history,
    missing ``summary`` yields None, and missing timestamps default to the
    current time.
    """
    session_key = d["key"]
    history = [_dict_to_message(raw) for raw in d.get("messages", [])]
    summary_text = d.get("summary")
    created = d.get("created_at", time.time())
    updated = d.get("updated_at", time.time())
    return Session(
        key=session_key,
        messages=history,
        summary=summary_text,
        created_at=created,
        updated_at=updated,
    )
LRU Cache
The session cache evicts least-recently-updated sessions when full:
def _evict_if_needed(self) -> None:
    """Trim the cache to its maximum size, dropping the stalest sessions first.

    Staleness is judged by ``updated_at``: entries with the smallest
    timestamps are removed until the cache is back at ``_max_cache_size``.
    """
    overflow = len(self._cache) - self._max_cache_size
    if overflow <= 0:
        return

    oldest_first = sorted(self._cache, key=lambda name: self._cache[name].updated_at)
    for stale_key in oldest_first[:overflow]:
        self._cache.pop(stale_key)
Default cache size: 200 sessions
Eviction only removes the in-memory entry. The session’s JSON file (written by save()) stays on disk, so the session is reloaded from it on next access.
Session Lifecycle
1. Agent Run Starts
Load or create session:
# Without a session manager there is no stored history to replay.
# NOTE(review): `session` and `session_summary` are not assigned on that
# branch; presumably the enclosing method initialises them earlier — confirm.
if not self._session_mgr:
    history = []
else:
    session = self._session_mgr.get_or_create(session_key)
    history = session.get_recent(immediate_window)
    session_summary = session.summary
2. Build Context
Recent messages + summary injected into context:
# Context order: system prompt, optional session summary (as an extra
# system message), recent history, then the new user turn.
summary_part = (
    [LLMMessage(role="system", content=session_summary)] if session_summary else []
)
messages: list[LLMMessage] = [
    system_msg,
    *summary_part,
    *history,
    LLMMessage(role="user", content=user_message),
]
3. Agent Run Completes
Persist user message + assistant response:
def _persist_session(
    self, session: Session | None, user_message: str, assistant_response: str
) -> None:
    """Record one user/assistant exchange in the session and save it.

    Does nothing when there is no session or no session manager. When a
    memory manager is configured, the first 200 characters of each side of
    the exchange are also appended to its history.
    """
    if session is None or self._session_mgr is None:
        return

    exchange = (("user", user_message), ("assistant", assistant_response))
    for role, text in exchange:
        session.add_message(LLMMessage(role=role, content=text))
    self._session_mgr.save(session)

    memory = self._memory_mgr
    if memory:
        memory.append_history(f"User: {user_message[:200]}")
        memory.append_history(f"Assistant: {assistant_response[:200]}")

    logger.debug("Session '{}' saved ({} messages)", session.key, session.message_count)
4. Consolidation Check
If message count exceeds threshold, trigger consolidation:
async def _maybe_consolidate(self, session: Session) -> None:
    """Consolidate and prune old messages when the session has grown too long.

    Skipped when there is no memory manager, auto-consolidation is disabled,
    the memory manager reports that no consolidation is needed, or there are
    no messages older than the memory window. If consolidation yields facts
    (and not a "no new facts" reply), they become the session summary, the
    session is pruned to the memory window, and it is saved when a session
    manager exists. Any failure is logged and swallowed.
    """
    if not self._memory_mgr:
        return

    cfg = self._config.agents.defaults
    if not (
        cfg.auto_consolidate
        and self._memory_mgr.needs_consolidation(session.message_count, cfg.memory_window)
    ):
        return

    stale = session.get_old_messages(cfg.memory_window)
    if not stale:
        return

    # Fall back to the main model when no dedicated consolidation model is set.
    model_name = cfg.consolidation_model or cfg.model
    logger.info(
        "Consolidating session '{}': {} old messages using model '{}'",
        session.key,
        len(stale),
        model_name,
    )

    try:
        facts = await self._memory_mgr.consolidate(stale, self._provider, model_name)
        if not facts or "no new facts" in facts.lower():
            # Nothing worth keeping: leave summary and messages untouched.
            return

        session.summary = f"[Previous conversation context]\n{facts}"
        removed = session.prune_to_window(cfg.memory_window)
        if self._session_mgr:
            self._session_mgr.save(session)
        logger.info(
            "Consolidation complete: pruned {} messages, summary saved",
            removed,
        )
    except Exception as exc:
        logger.error("Memory consolidation failed (non-fatal): {}", exc)
Session Management Commands
List Sessions
def list_sessions(self) -> list[str]:
    """Return the sorted keys of all cached sessions and all session files.

    Keys of cached sessions are taken from the cache without re-reading
    their JSON files. Every other ``*.json`` file in the sessions directory
    is read for its stored ``"key"``. If the file cannot be decoded, is not
    a JSON object, or has no string key, the file stem is used instead.
    """
    keys: set[str] = set(self._cache)
    cached_stems = {_sanitize_key(k) for k in keys}

    for path in self._dir.glob("*.json"):
        if path.stem in cached_stems:
            continue
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
            stored_key = data["key"]
        except (ValueError, KeyError, TypeError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
            # TypeError covers a top-level value that is not an object.
            stored_key = None
        # Only strings are accepted, so the final sort never has to compare
        # mixed types or hash an unhashable value.
        keys.add(stored_key if isinstance(stored_key, str) else path.stem)

    return sorted(keys)
Delete Session
def delete(self, key: str) -> bool:
    """Drop a session from the cache and delete its file.

    Returns True when a session file existed and was removed, False when
    there was no file (the cache entry is dropped either way).
    """
    self._cache.pop(key, None)

    path = self._path_for(key)
    if not path.exists():
        return False

    path.unlink()
    logger.debug("Deleted session: {}", key)
    return True
Clear Cache
def clear_cache(self) -> None:
    """Empty the in-memory session cache in place; files on disk are not touched."""
    self._cache.clear()
Example session JSON:
{
"key": "cli:default",
"messages": [
{
"role": "user",
"content": "What files are in the current directory?"
},
{
"role": "assistant",
"content": "I'll list the files for you.",
"tool_calls": [
{
"id": "call_abc123",
"function_name": "list_directory",
"arguments": {"path": "."}
}
]
},
{
"role": "tool",
"content": "config.py\nmain.py\nREADME.md",
"tool_call_id": "call_abc123",
"name": "list_directory"
},
{
"role": "assistant",
"content": "The current directory contains:\n- config.py\n- main.py\n- README.md"
}
],
"summary": "[Previous conversation context]\nUser asked about directory listing and configuration options.",
"created_at": 1709136000.0,
"updated_at": 1709138400.0
}
Session Isolation
Each session is completely isolated:
- Separate message history — No cross-contamination
- Independent consolidation — Per-session memory management
- Isolated summaries — Summary stored in session object
- Per-user context — Each user gets their own conversation thread
Subagents spawned via spawn_subagent get isolated sessions but share the parent workspace, so they can access the same files and memory.
Configuration
agents:
defaults:
# Memory window
memory_window: 10 # Recent messages kept in context
auto_consolidate: true # Auto-consolidate when exceeded
consolidation_model: gpt-4o-mini # Cheaper model for consolidation
# Workspace location (sessions stored in workspace/sessions/)
workspace: ~/.grip/workspace
Session Manager API
class SessionManager:
    """Public API summary of the session manager (signatures only)."""

    def get_or_create(self, key: str) -> Session:
        """Return the session for ``key``, creating an empty one if none exists."""

    def get(self, key: str) -> Session | None:
        """Return the existing session for ``key``, or None."""

    def save(self, session: Session) -> None:
        """Write the session to disk."""

    def delete(self, key: str) -> bool:
        """Delete the session from disk and from the cache."""

    def list_sessions(self) -> list[str]:
        """Return every known session key."""

    def clear_cache(self) -> None:
        """Empty the in-memory cache."""
Next Steps