## Overview

Hive’s session management provides a unified storage structure for agent executions, enabling crash recovery, pause/resume, and multi-entry-point coordination.
## Session Structure

Each session follows a standardized directory layout:

```text
sessions/session_YYYYMMDD_HHMMSS_{uuid}/
├── state.json          # Single source of truth
├── conversations/      # Node conversation history
├── artifacts/          # Spillover data and files
└── logs/               # Observability logs
    ├── summary.json    # L1: High-level run summary
    ├── details.jsonl   # L2: Per-node execution details
    └── tool_logs.jsonl # L3: Individual tool calls
```
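The JSONL logs are line-delimited JSON, so external tooling can read them without the framework. A minimal sketch of reading the L3 tool-call log; the record fields (`tool`, `status`) are illustrative, not a documented schema:

```python
import json
import tempfile
from pathlib import Path

def read_tool_calls(session_dir: Path) -> list[dict]:
    """Read per-tool-call records (L3) from logs/tool_logs.jsonl, one JSON object per line."""
    log_path = session_dir / "logs" / "tool_logs.jsonl"
    if not log_path.exists():
        return []
    with log_path.open() as f:
        return [json.loads(line) for line in f if line.strip()]

# Example with a throwaway session directory and made-up records.
with tempfile.TemporaryDirectory() as tmp:
    session_dir = Path(tmp) / "session_20260206_143022_abc12345"
    (session_dir / "logs").mkdir(parents=True)
    with (session_dir / "logs" / "tool_logs.jsonl").open("w") as f:
        f.write(json.dumps({"tool": "search_kb", "status": "ok"}) + "\n")
        f.write(json.dumps({"tool": "send_reply", "status": "ok"}) + "\n")
    calls = read_tool_calls(session_dir)

print([c["tool"] for c in calls])  # ['search_kb', 'send_reply']
```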
## Session Lifecycle

### Creating Sessions

Sessions are created automatically when you trigger an execution:

```python
from pathlib import Path

from framework.storage.session_store import SessionStore

# Initialize the session store
store = SessionStore(base_path=Path("~/.hive/agents/my_agent"))

# Generate a new session ID
session_id = store.generate_session_id()
# Returns: "session_20260206_143022_abc12345"
```
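Because the generated ID embeds its creation timestamp, it can be split back apart for sorting or debugging. A minimal sketch of that decomposition (`parse_session_id` is our helper, not a framework API):

```python
from datetime import datetime

def parse_session_id(session_id: str) -> tuple[datetime, str]:
    """Split "session_YYYYMMDD_HHMMSS_{uuid}" into its timestamp and short uuid."""
    _, date_part, time_part, short_uuid = session_id.split("_")
    started = datetime.strptime(f"{date_part}{time_part}", "%Y%m%d%H%M%S")
    return started, short_uuid

started, short_uuid = parse_session_id("session_20260206_143022_abc12345")
print(started.isoformat())  # 2026-02-06T14:30:22
print(short_uuid)           # abc12345
```

Since the timestamp is the second component, lexicographic order of session IDs matches chronological order.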
The runtime creates a session each time an execution is triggered:

```python
from pathlib import Path

from framework.runtime.agent_runtime import AgentRuntime

# Create the runtime
runtime = AgentRuntime(
    graph=support_agent_graph,
    goal=support_agent_goal,
    storage_path=Path("./storage"),
    llm=llm_provider,
)

# Register entry points
runtime.register_entry_point(EntryPointSpec(
    id="webhook",
    name="Zendesk Webhook",
    entry_node="process-webhook",
    trigger_type="webhook",
    isolation_level="shared",
))

await runtime.start()

# Trigger execution - creates a new session
exec_id = await runtime.trigger("webhook", {"ticket_id": "123"})
```
## Session State

The `state.json` file is the single source of truth:

```json
{
  "session_id": "session_20260206_143022_abc12345",
  "stream_id": "webhook",
  "correlation_id": "session_20260206_143022_abc12345",
  "goal_id": "support-goal",
  "agent_id": "support_agent",
  "entry_point": "webhook",
  "status": "paused",
  "timestamps": {
    "started_at": "2026-02-06T14:30:22Z",
    "updated_at": "2026-02-06T14:35:18Z",
    "completed_at": null
  },
  "progress": {
    "current_node": "handle-request",
    "paused_at": "handle-request",
    "resume_from": "handle-request",
    "path": ["process-webhook", "classify", "handle-request"],
    "node_visit_counts": {
      "process-webhook": 1,
      "classify": 1,
      "handle-request": 2
    }
  },
  "memory": {
    "ticket_id": "123",
    "category": "technical",
    "priority": "high"
  },
  "input_data": {
    "ticket_id": "123"
  },
  "result": {
    "output": {},
    "error": null
  }
}
```
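Because `state.json` is plain JSON, external tooling can inspect it directly. A sketch of a resumability check over a trimmed copy of the document above (`is_resumable` is our helper, not a framework API; it reads only the `status` and `progress.resume_from` fields shown):

```python
import json

# A trimmed copy of the state document above, as read from state.json.
raw = """{
  "session_id": "session_20260206_143022_abc12345",
  "status": "paused",
  "progress": {
    "resume_from": "handle-request",
    "path": ["process-webhook", "classify", "handle-request"]
  },
  "memory": {"ticket_id": "123", "category": "technical", "priority": "high"}
}"""
state = json.loads(raw)

def is_resumable(state: dict) -> bool:
    # A session can be resumed when it paused with a recorded resume point.
    return (state.get("status") == "paused"
            and state.get("progress", {}).get("resume_from") is not None)

print(is_resumable(state))  # True
```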
### Resuming Sessions

Resume a paused or crashed session:

```python
# Read session state
state = await store.read_state(session_id)

if state and state.status == "paused":
    # Prepare resume data
    session_state = {
        "resume_session_id": session_id,
        "resume_from": state.progress.resume_from,
        "memory": state.memory,
        "paused_at": state.progress.paused_at,
    }

    # Resume execution
    new_exec_id = await runtime.trigger(
        entry_point_id="webhook",
        input_data={},  # Memory already preserved
        session_state=session_state,
    )
```
When resuming, the execution continues in the same session directory with the **same session ID**. This preserves conversation history, artifacts, and logs.
## Session Isolation Levels

Entry points support three isolation modes:

### Isolated

Each execution gets its own memory namespace:

```python
EntryPointSpec(
    id="background-task",
    entry_node="process",
    trigger_type="timer",
    isolation_level="isolated",  # Own memory
)
```

### Shared

All executions share the same memory (eventual consistency):

```python
EntryPointSpec(
    id="webhook",
    entry_node="handle-webhook",
    trigger_type="webhook",
    isolation_level="shared",  # Shared memory
)
```

### Synchronized

Shared memory with write locks for strong consistency:

```python
EntryPointSpec(
    id="critical-update",
    entry_node="update-state",
    trigger_type="api",
    isolation_level="synchronized",  # Locks on writes
)
```
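The difference between `shared` and `synchronized` is the classic lost-update problem: two writers that read, modify, and write the same key can interleave unless the write is serialized. A framework-independent sketch of the synchronized behavior (the names `memory` and `synchronized_add` are illustrative, not framework APIs):

```python
import threading

memory = {"count": 0}
lock = threading.Lock()

def synchronized_add(key: str, delta: int) -> None:
    # With isolation_level="synchronized", each write holds a lock, so the
    # read-modify-write below cannot interleave with another writer's.
    with lock:
        memory[key] = memory.get(key, 0) + delta

# 100 concurrent writers; the lock guarantees no update is lost.
threads = [threading.Thread(target=synchronized_add, args=("count", 1))
           for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(memory["count"])  # 100
```

Without the lock (the `shared` mode), the same workload could finish with a count below 100 whenever two writers read the same stale value.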
## Querying Sessions

List and filter sessions:

```python
# List all paused sessions
paused = await store.list_sessions(status="paused", limit=10)

for session in paused:
    print(f"Session {session.session_id}:")
    print(f"  Paused at: {session.progress.paused_at}")
    print(f"  Path: {' -> '.join(session.progress.path)}")

# Filter by goal
goal_sessions = await store.list_sessions(goal_id="support-goal")
```
## Session Cleanup

Delete completed or abandoned sessions:

```python
# Delete a specific session
deleted = await store.delete_session(session_id)

if deleted:
    print(f"Deleted session {session_id}")
```

Deleting a session removes **all data**: conversations, artifacts, logs, and checkpoints. This action is **irreversible**.
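Since session IDs embed their creation time, a retention sweep can select stale sessions without opening each `state.json`. A sketch of the selection step only; actual deletion would go through `store.delete_session`, and the sample IDs are made up:

```python
from datetime import datetime

def older_than(session_id: str, cutoff: datetime) -> bool:
    """True if the timestamp embedded in the session ID predates the cutoff."""
    _, date_part, time_part, _ = session_id.split("_")
    started = datetime.strptime(f"{date_part}{time_part}", "%Y%m%d%H%M%S")
    return started < cutoff

session_ids = [
    "session_20260206_143022_abc12345",
    "session_20251101_090000_deadbeef",
]
cutoff = datetime(2026, 1, 1)
stale = [sid for sid in session_ids if older_than(sid, cutoff)]
print(stale)  # ['session_20251101_090000_deadbeef']
```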
## Multi-Graph Sessions

Secondary graphs can be loaded into an existing session:

```python
from framework.monitoring import judge_graph, judge_goal

# Load the health judge as a secondary graph
await runtime.add_graph(
    graph_id="judge",
    graph=judge_graph,
    goal=judge_goal,
    entry_points={"health_check": health_check_entry},
    storage_subpath="graphs/judge",  # Isolated storage
)
```

The judge operates in its own subdirectory:

```text
sessions/session_20260206_143022_abc12345/
├── state.json        # Primary worker state
├── conversations/    # Primary worker conversations
└── graphs/
    └── judge/
        ├── sessions/      # Judge sessions
        └── runtime_logs/  # Judge logs
```
## Best Practices

- **Session IDs.** Use the generated session ID format. It embeds a timestamp for easy sorting and debugging.
- **State updates.** State is written atomically on pause, completion, or failure. Don’t rely on intermediate writes.
- **Memory size.** Keep memory lean. Store large data in `artifacts/` and reference it by key.
- **Shared sessions.** Use `isolation_level="shared"` carefully. Multiple entry points mutating shared memory can race.
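The "keep memory lean" practice amounts to a spillover pattern: the large payload lives under `artifacts/` and memory carries only the key. A minimal sketch under stated assumptions; the helper names and the `{key}.json` layout are ours, not a framework API:

```python
import json
import tempfile
from pathlib import Path

def spill_to_artifact(session_dir: Path, key: str, payload: dict) -> str:
    """Write a large payload under artifacts/ and return the key to keep in memory."""
    artifacts = session_dir / "artifacts"
    artifacts.mkdir(parents=True, exist_ok=True)
    (artifacts / f"{key}.json").write_text(json.dumps(payload))
    return key

def load_artifact(session_dir: Path, key: str) -> dict:
    """Load a spilled payload back by its key."""
    return json.loads((session_dir / "artifacts" / f"{key}.json").read_text())

with tempfile.TemporaryDirectory() as tmp:
    session_dir = Path(tmp)
    big = {"rows": list(range(10_000))}  # too large to keep in session memory
    memory = {"report_key": spill_to_artifact(session_dir, "report", big)}
    restored = load_artifact(session_dir, memory["report_key"])

print(len(restored["rows"]))  # 10000
```

Session memory then stays small (`{"report_key": "report"}`) while the bulk data survives in the session directory for resume and inspection.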