
Overview

Hive’s session management provides a unified storage structure for agent executions, enabling crash recovery, pause/resume, and multi-entry-point coordination.

Session Structure

Each session follows a standardized directory layout:
sessions/session_YYYYMMDD_HHMMSS_{uuid}/
├── state.json            # Single source of truth
├── conversations/        # Node conversation history
├── artifacts/            # Spillover data and files
└── logs/                 # Observability logs
    ├── summary.json      # L1: High-level run summary
    ├── details.jsonl     # L2: Per-node execution details
    └── tool_logs.jsonl   # L3: Individual tool calls
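The skeleton above can be reproduced with plain `pathlib` calls; a minimal sketch (the helper name is illustrative, not a framework API — directory names come from the tree above):

```python
from pathlib import Path

def create_session_layout(base: Path, session_id: str) -> Path:
    """Create the standard session directory skeleton."""
    root = base / "sessions" / session_id
    for sub in ("conversations", "artifacts", "logs"):
        (root / sub).mkdir(parents=True, exist_ok=True)
    # state.json starts empty; the runtime fills it on first write
    (root / "state.json").touch()
    return root

root = create_session_layout(Path("/tmp/hive-demo"), "session_20260206_143022_abc12345")
print(sorted(p.name for p in root.iterdir()))
# → ['artifacts', 'conversations', 'logs', 'state.json']
```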

Session Lifecycle

Creating Sessions

Sessions are created automatically when you trigger an execution. The SessionStore manages the on-disk layout and generates session IDs:
from pathlib import Path

from framework.storage.session_store import SessionStore

# Initialize the session store (expanduser resolves "~")
store = SessionStore(base_path=Path("~/.hive/agents/my_agent").expanduser())

# Generate a new session ID
session_id = store.generate_session_id()
# Returns: "session_20260206_143022_abc12345"
In practice, the runtime creates sessions for you when an entry point is triggered:
from framework.runtime.agent_runtime import AgentRuntime

# Create runtime
runtime = AgentRuntime(
    graph=support_agent_graph,
    goal=support_agent_goal,
    storage_path=Path("./storage"),
    llm=llm_provider,
)

# Register entry points
runtime.register_entry_point(EntryPointSpec(
    id="webhook",
    name="Zendesk Webhook",
    entry_node="process-webhook",
    trigger_type="webhook",
    isolation_level="shared",
))

await runtime.start()

# Trigger execution - creates a new session
exec_id = await runtime.trigger("webhook", {"ticket_id": "123"})

Session State

The state.json file is the single source of truth:
{
  "session_id": "session_20260206_143022_abc12345",
  "stream_id": "webhook",
  "correlation_id": "session_20260206_143022_abc12345",
  "goal_id": "support-goal",
  "agent_id": "support_agent",
  "entry_point": "webhook",
  "status": "paused",
  "timestamps": {
    "started_at": "2026-02-06T14:30:22Z",
    "updated_at": "2026-02-06T14:35:18Z",
    "completed_at": null
  },
  "progress": {
    "current_node": "handle-request",
    "paused_at": "handle-request",
    "resume_from": "handle-request",
    "path": ["process-webhook", "classify", "handle-request"],
    "node_visit_counts": {
      "process-webhook": 1,
      "classify": 1,
      "handle-request": 2
    }
  },
  "memory": {
    "ticket_id": "123",
    "category": "technical",
    "priority": "high"
  },
  "input_data": {
    "ticket_id": "123"
  },
  "result": {
    "output": {},
    "error": null
  }
}
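Because state.json is plain JSON, it can be inspected outside the framework; a minimal sketch that reads the fields shown above (the helper name and file path are illustrative):

```python
import json
from pathlib import Path

def summarize_state(path: Path) -> str:
    """Produce a one-line summary from a session's state.json."""
    state = json.loads(path.read_text())
    progress = state["progress"]
    return (
        f"{state['session_id']} [{state['status']}] "
        f"at {progress['current_node']} "
        f"(path: {' -> '.join(progress['path'])})"
    )
```

Running this against the example above would print something like `session_20260206_143022_abc12345 [paused] at handle-request (...)`.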

Resuming Sessions

Resume a paused or crashed session:
# Read session state
state = await store.read_state(session_id)

if state and state.status == "paused":
    # Prepare resume data
    session_state = {
        "resume_session_id": session_id,
        "resume_from": state.progress.resume_from,
        "memory": state.memory,
        "paused_at": state.progress.paused_at,
    }
    
    # Resume execution
    new_exec_id = await runtime.trigger(
        entry_point_id="webhook",
        input_data={},  # Memory already preserved
        session_state=session_state,
    )
When resuming, the execution continues in the same session directory with the same session ID. This preserves conversation history, artifacts, and logs.

Session Isolation Levels

Entry points support three isolation modes:

Isolated

Each execution gets its own memory namespace:
EntryPointSpec(
    id="background-task",
    entry_node="process",
    trigger_type="timer",
    isolation_level="isolated",  # Own memory
)

Shared

All executions share the same memory (eventual consistency):
EntryPointSpec(
    id="webhook",
    entry_node="handle-webhook",
    trigger_type="webhook",
    isolation_level="shared",  # Shared memory
)

Synchronized

Shared memory with write locks for strong consistency:
EntryPointSpec(
    id="critical-update",
    entry_node="update-state",
    trigger_type="api",
    isolation_level="synchronized",  # Locks on writes
)
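The three levels differ only in how memory is scoped and locked. A toy model (not the framework's implementation) that makes the difference concrete:

```python
import asyncio

class MemoryBroker:
    """Toy model of the three isolation levels."""

    def __init__(self):
        self.shared: dict = {}       # one namespace for "shared"/"synchronized"
        self._lock = asyncio.Lock()  # write lock for "synchronized"

    def namespace(self, isolation: str) -> dict:
        # "isolated" executions get a fresh, private namespace
        return {} if isolation == "isolated" else self.shared

    async def write(self, isolation: str, key: str, value):
        memory = self.namespace(isolation)
        if isolation == "synchronized":
            async with self._lock:   # strong consistency: serialize writes
                memory[key] = value
        else:
            memory[key] = value      # "shared": last writer wins
        return memory

async def demo():
    broker = MemoryBroker()
    await broker.write("shared", "ticket", "123")
    private = await broker.write("isolated", "ticket", "999")
    return broker.shared, private

shared, private = asyncio.run(demo())
print(shared)   # → {'ticket': '123'}
print(private)  # → {'ticket': '999'}
```

Note that the isolated write never touches the shared namespace, while shared and synchronized writes land in the same dict; synchronized merely serializes them.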

Querying Sessions

List and filter sessions:
# List all paused sessions
paused = await store.list_sessions(status="paused", limit=10)

for session in paused:
    print(f"Session {session.session_id}:")
    print(f"  Paused at: {session.progress.paused_at}")
    print(f"  Path: {' -> '.join(session.progress.path)}")

# Filter by goal
goal_sessions = await store.list_sessions(goal_id="support-goal")

Session Cleanup

Delete completed or abandoned sessions:
# Delete a specific session
deleted = await store.delete_session(session_id)

if deleted:
    print(f"Deleted session {session_id}")
Deleting a session removes all data: conversations, artifacts, logs, and checkpoints. This action is irreversible.
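Because session IDs embed their creation timestamp, a retention policy can be run over the directory layout alone; a hedged sketch (the helper and cutoff are illustrative — `delete_session` above is the supported API):

```python
import shutil
from datetime import datetime, timedelta
from pathlib import Path

def purge_old_sessions(sessions_dir: Path, max_age: timedelta, now=None) -> list[str]:
    """Delete session directories older than max_age, based on the ID's timestamp."""
    now = now or datetime.now()
    removed = []
    for session_dir in sessions_dir.glob("session_*"):
        # session_YYYYMMDD_HHMMSS_{uuid} -> parse the embedded timestamp
        _, date_part, time_part, _ = session_dir.name.split("_", 3)
        started = datetime.strptime(f"{date_part}{time_part}", "%Y%m%d%H%M%S")
        if now - started > max_age:
            shutil.rmtree(session_dir)
            removed.append(session_dir.name)
    return removed
```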

Multi-Graph Sessions

Secondary graphs can be loaded into an existing session:
from framework.monitoring import judge_graph, judge_goal

# Load health judge as secondary graph
await runtime.add_graph(
    graph_id="judge",
    graph=judge_graph,
    goal=judge_goal,
    entry_points={"health_check": health_check_entry},
    storage_subpath="graphs/judge",  # Isolated storage
)
The judge operates in its own subdirectory:
sessions/session_20260206_143022_abc12345/
├── state.json              # Primary worker state
├── conversations/          # Primary worker conversations
└── graphs/
    └── judge/
        ├── sessions/       # Judge sessions
        └── runtime_logs/   # Judge logs

Best Practices

Session IDs

Use the generated session ID format; it embeds a zero-padded timestamp, so sessions sort chronologically and are easy to correlate with logs.
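A consequence of the zero-padded `session_YYYYMMDD_HHMMSS_{uuid}` format is that lexicographic order matches chronological order; a quick illustration (pure Python, no framework APIs):

```python
# Zero-padded timestamp means string sort == time sort
ids = [
    "session_20260206_143022_abc12345",
    "session_20251231_235959_def67890",
    "session_20260206_091500_00112233",
]
print(sorted(ids))
# Oldest first: 2025-12-31 -> 2026-02-06 09:15 -> 2026-02-06 14:30
```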

State Updates

State is written atomically on pause, completion, or failure. Don’t rely on intermediate writes.
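Atomic writes of this kind are typically done with the write-to-temp-then-rename pattern; a sketch of that pattern (illustrative, not the framework's internal code):

```python
import json
import os
import tempfile
from pathlib import Path

def write_state_atomically(path: Path, state: dict) -> None:
    """Write state.json so readers never see a half-written file."""
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp, path)  # atomic on POSIX within one filesystem
    except BaseException:
        os.unlink(tmp)
        raise
```

Writing to a temp file in the same directory matters: `os.replace` is only atomic when source and destination are on the same filesystem.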

Memory Size

Keep memory lean. Store large data in artifacts/ and reference by key.
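One way to keep memory lean is to spill large payloads into artifacts/ and keep only a reference key in memory; a minimal sketch (the helper names are illustrative):

```python
import json
from pathlib import Path

def spill_to_artifact(artifacts_dir: Path, key: str, payload: dict) -> str:
    """Store a large payload under artifacts/ and return the reference key."""
    artifacts_dir.mkdir(parents=True, exist_ok=True)
    (artifacts_dir / f"{key}.json").write_text(json.dumps(payload))
    return key

def load_artifact(artifacts_dir: Path, key: str) -> dict:
    return json.loads((artifacts_dir / f"{key}.json").read_text())

# Memory holds just the reference, not the data itself
artifacts = Path("/tmp/hive-demo/artifacts")
memory = {"transcript_ref": spill_to_artifact(artifacts, "transcript_001",
                                              {"lines": ["hello"] * 1000})}
print(memory)
# → {'transcript_ref': 'transcript_001'}
```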

Shared Sessions

Use isolation_level="shared" carefully. Multiple entry points mutating shared memory can race; prefer "synchronized" when concurrent writes must not be lost.
