
Overview

Hive provides robust crash recovery mechanisms to ensure agents can recover from failures automatically:
  • Automatic Resurrection: Retry failed executions up to N times
  • Checkpoints: Incremental state snapshots for rollback
  • Session Persistence: Atomic state writes survive crashes
  • Fatal Error Detection: Skip resurrection for non-recoverable errors

Automatic Resurrection

How It Works

When an execution fails with a non-fatal error, Hive automatically restarts it from the failed node:
from framework.runtime.execution_stream import EntryPointSpec

# Configure resurrection behavior
entry = EntryPointSpec(
    id="api",
    entry_node="handle-request",
    trigger_type="api",
    max_resurrections=3,  # Retry up to 3 times (default)
)
Resurrection flow:
  1. Execution fails at node process-data
  2. Check if error is fatal (credentials, imports, etc.)
  3. If non-fatal and resurrections < max, restart from process-data
  4. Preserve memory and conversation state
  5. Resume execution after a 2-second cooldown
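The flow above can be sketched as a retry loop. This is an illustrative sketch only: `run_node`, the abridged fatal-pattern list, and the parameter names are stand-ins for the runtime internals, not the actual implementation.

```python
import asyncio

# Abridged echo of the fatal patterns documented below (illustrative)
_FATAL = ("credential", "authentication", "import error", "permission denied")

def _is_fatal(exc: Exception) -> bool:
    """Case-insensitive substring match against fatal patterns."""
    msg = str(exc).lower()
    return any(p in msg for p in _FATAL)

async def run_with_resurrection(run_node, start_node: str,
                                max_resurrections: int = 3,
                                cooldown: float = 2.0):
    """Retry from the failed node until success, a fatal error,
    or the resurrection budget is exhausted."""
    node = start_node
    attempts = 0
    while True:
        try:
            return await run_node(node)
        except Exception as exc:
            # Fatal errors and exhausted budgets propagate to the caller
            if _is_fatal(exc) or attempts >= max_resurrections:
                raise
            attempts += 1
            await asyncio.sleep(cooldown)  # 2-second cooldown by default
```

Memory and conversation state live outside this loop, which is what lets a resurrected attempt pick up where the failed one left off.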

Fatal vs Non-Fatal Errors

Hive distinguishes between recoverable and non-recoverable failures.
Fatal Errors (no resurrection):
# From execution_stream.py
_FATAL_ERROR_PATTERNS = (
    "credential",
    "authentication",
    "unauthorized",
    "forbidden",
    "api key",
    "import error",
    "module not found",
    "no module named",
    "permission denied",
    "invalid api",
    "configuration error",
)
Examples:
  • Missing API credentials
  • Invalid authentication tokens
  • Import errors (missing dependencies)
  • Permission denied (filesystem, API)
  • Configuration errors
Non-Fatal Errors (resurrection enabled):
  • Transient network failures
  • Rate limit errors
  • Temporary API unavailability
  • Malformed data (can be retried with a different approach)
  • Tool execution errors
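The classification above amounts to a substring check against the pattern list. A minimal sketch (the function name is hypothetical; the actual check in execution_stream.py may be more involved):

```python
# Pattern list copied from the excerpt above
_FATAL_ERROR_PATTERNS = (
    "credential", "authentication", "unauthorized", "forbidden",
    "api key", "import error", "module not found", "no module named",
    "permission denied", "invalid api", "configuration error",
)

def is_fatal_error(error: Exception) -> bool:
    """Case-insensitive substring match decides resurrection eligibility."""
    message = str(error).lower()
    return any(pattern in message for pattern in _FATAL_ERROR_PATTERNS)
```

For example, `is_fatal_error(RuntimeError("503 Service Unavailable"))` is `False` (transient, eligible for resurrection), while `is_fatal_error(RuntimeError("Invalid API key"))` is `True` (no retry).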

Resurrection Example

from framework.runtime.event_bus import EventType

async def track_resurrections(event):
    print(f"🔄 Resurrection #{event.data['attempt']} of {event.data['max_resurrections']}")
    print(f"   Error: {event.data['error'][:200]}")
    print(f"   Resuming from: {event.data['resume_from']}")

bus.subscribe(
    event_types=[EventType.EXECUTION_RESURRECTED],
    handler=track_resurrections,
)

# Trigger execution
exec_id = await runtime.trigger("api", {"query": "process"})

# If it fails with transient error, automatically resurrects
Example output:
🔄 Resurrection #1 of 3
   Error: HTTPError: 503 Service Unavailable
   Resuming from: fetch-data

🔄 Resurrection #2 of 3
   Error: HTTPError: 503 Service Unavailable
   Resuming from: fetch-data

✅ Execution succeeded on attempt 3

Disabling Resurrection

# Disable resurrection for entry point
EntryPointSpec(
    id="critical",
    entry_node="process",
    max_resurrections=0,  # No auto-retry
)

Checkpoint System

Checkpoint Configuration

Configure checkpoint behavior:
from framework.graph.checkpoint_config import CheckpointConfig

# Default config (checkpoint before and after each node)
config = CheckpointConfig(
    enabled=True,
    checkpoint_on_node_start=True,
    checkpoint_on_node_complete=True,
    checkpoint_max_age_days=7,
    prune_every_n_nodes=10,
    async_checkpoint=True,  # Non-blocking writes
    include_full_memory=True,
    include_metrics=True,
)

runtime = AgentRuntime(
    graph=my_graph,
    goal=my_goal,
    storage_path=storage_path,
    checkpoint_config=config,
)
Pre-configured options:
from framework.graph.checkpoint_config import (
    DEFAULT_CHECKPOINT_CONFIG,  # Checkpoint at start + complete
    MINIMAL_CHECKPOINT_CONFIG,  # Only at node complete
    DISABLED_CHECKPOINT_CONFIG,  # No checkpoints
)

runtime = AgentRuntime(
    ...,
    checkpoint_config=MINIMAL_CHECKPOINT_CONFIG,
)

Checkpoint Storage

Checkpoints are stored per-session:
sessions/session_20260206_143022_abc12345/
├── checkpoints/
│   ├── node_plan_start_001.json
│   ├── node_plan_complete_001.json
│   ├── node_research_start_001.json
│   ├── node_research_complete_001.json
│   └── ...
├── conversations/
├── artifacts/
└── logs/
Checkpoint contents:
{
  "checkpoint_id": "ckpt_20260206_143522_abc",
  "session_id": "session_20260206_143022_abc12345",
  "timestamp": "2026-02-06T14:35:22Z",
  "node_id": "research",
  "checkpoint_type": "node_complete",
  "execution_path": ["plan", "research"],
  "memory": {
    "research_query": "machine learning frameworks",
    "sources_found": 5
  },
  "node_visit_counts": {
    "plan": 1,
    "research": 1
  },
  "metrics": {
    "total_tokens": 12500,
    "execution_time_seconds": 45.2
  }
}
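A checkpoint like the one above can be recovered by picking the newest file in the session's checkpoints/ directory. The helper below is a minimal sketch based on the layout shown; CheckpointStore may expose a higher-level API for this.

```python
import json
from pathlib import Path
from typing import Optional

def load_latest_checkpoint(session_dir: Path) -> Optional[dict]:
    """Return the newest checkpoint payload for a session, or None."""
    ckpt_dir = session_dir / "checkpoints"
    if not ckpt_dir.is_dir():
        return None
    # Newest by modification time; the embedded "timestamp" field
    # could be used instead for logical ordering
    files = sorted(ckpt_dir.glob("*.json"), key=lambda p: p.stat().st_mtime)
    if not files:
        return None
    return json.loads(files[-1].read_text(encoding="utf-8"))
```

From the returned dict, `ckpt["memory"]` and `ckpt["execution_path"]` give the state to restore for a rollback.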

Manual Checkpointing

Create checkpoints programmatically:
import uuid
from datetime import datetime

# Checkpoint is assumed to be importable alongside CheckpointStore
from framework.storage.checkpoint_store import Checkpoint, CheckpointStore

ckpt_store = CheckpointStore(base_path / "checkpoints")

# Create checkpoint
checkpoint = Checkpoint(
    checkpoint_id=f"ckpt_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}",
    session_id=session_id,
    node_id=current_node,
    checkpoint_type="manual",
    memory=current_memory,
    execution_path=path,
)

await ckpt_store.save_checkpoint(session_id, checkpoint)

Checkpoint Pruning

Old checkpoints are automatically pruned:
# Configured via CheckpointConfig
config = CheckpointConfig(
    checkpoint_max_age_days=7,  # Delete checkpoints older than 7 days
    prune_every_n_nodes=10,  # Check for pruning every 10 nodes
)
Manual pruning:
from framework.storage.checkpoint_store import CheckpointStore
from datetime import timedelta

ckpt_store = CheckpointStore(base_path / "checkpoints")

# Prune checkpoints older than 7 days
max_age = timedelta(days=7)
pruned = await ckpt_store.prune_old_checkpoints(session_id, max_age)
print(f"Pruned {pruned} old checkpoints")

Session State Persistence

Atomic State Writes

State is written atomically to survive crashes:
# From session_store.py

async def write_state(self, session_id: str, state: SessionState):
    """Atomically write state.json using temp file + rename."""
    def _write():
        state_path = self.get_state_path(session_id)
        state_path.parent.mkdir(parents=True, exist_ok=True)
        
        # atomic_write writes to a temp file, then atomically
        # renames it over state.json on exit (temp → state.json)
        with atomic_write(state_path) as f:
            f.write(state.model_dump_json(indent=2))
    
    await asyncio.to_thread(_write)
This ensures:
  • No partial writes from crashes
  • State is always consistent
  • Can resume from last good state
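The atomic_write helper itself is not shown in the excerpt. The standard temp-file-plus-rename pattern it implies looks roughly like this (a sketch, not the framework's actual implementation):

```python
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path

@contextmanager
def atomic_write(target: Path):
    """Yield a handle to a temp file in target's directory; on clean
    exit, fsync and atomically rename it over target. A crash
    mid-write leaves the old file untouched."""
    fd, tmp_path = tempfile.mkstemp(dir=str(target.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            yield f
            f.flush()
            os.fsync(f.fileno())  # force bytes to disk before rename
        os.replace(tmp_path, target)  # atomic: readers see old or new, never partial
    except BaseException:
        os.unlink(tmp_path)  # discard the partial temp file
        raise
```

`os.replace` is atomic on both POSIX and Windows, which is what guarantees readers never observe a half-written state.json.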

State Write Points

State is written at critical points:
  1. Execution start: Initial state with input data
  2. Execution pause: When waiting for user input
  3. Execution complete: Final state with output
  4. Execution failure: State with error information
  5. Resurrection: State before each retry attempt

Resume After Crash

If an agent crashes mid-execution, resume from saved state:
from datetime import datetime, timedelta

from framework.storage.session_store import SessionStore

store = SessionStore(base_path)

# Find crashed sessions
sessions = await store.list_sessions(status="active", limit=100)

for session in sessions:
    # Check if actually crashed (no recent updates)
    last_update = datetime.fromisoformat(session.timestamps.updated_at)
    if datetime.now() - last_update > timedelta(hours=1):
        print(f"Found crashed session: {session.session_id}")
        print(f"  Last node: {session.progress.current_node}")
        print(f"  Path: {' -> '.join(session.progress.path)}")
        
        # Resume execution
        session_state = {
            "resume_session_id": session.session_id,
            "resume_from": session.progress.current_node,
            "memory": session.memory,
        }
        
        exec_id = await runtime.trigger(
            entry_point_id=session.entry_point,
            input_data={},  # Memory already loaded
            session_state=session_state,
        )
        print(f"Resumed as {exec_id}")

JSONL Logging for Crash Resilience

L2 and L3 logs use JSONL format for incremental persistence:
# From runtime_log_store.py

def append_step(self, run_id: str, step: NodeStepLog) -> None:
    """Append one JSONL line to tool_logs.jsonl. Sync."""
    path = self._get_run_dir(run_id) / "tool_logs.jsonl"
    line = json.dumps(step.model_dump(), ensure_ascii=False) + "\n"
    
    # Immediate write - survives crashes
    with open(path, "a", encoding="utf-8") as f:
        f.write(line)

def append_node_detail(self, run_id: str, detail: NodeDetail) -> None:
    """Append one JSONL line to details.jsonl. Sync."""
    path = self._get_run_dir(run_id) / "details.jsonl"
    line = json.dumps(detail.model_dump(), ensure_ascii=False) + "\n"
    
    with open(path, "a", encoding="utf-8") as f:
        f.write(line)
Benefits:
  • Data on disk immediately after each node/tool completes
  • No data loss even if agent crashes before end_run()
  • Corrupt line handling: Skips partial writes from crashes
Reading crash-corrupted logs:
def _read_jsonl_as_models(path: Path, model_cls: type) -> list:
    """Parse JSONL, skipping corrupt lines."""
    results = []
    if not path.exists():
        return results
    
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                data = json.loads(line)
                results.append(model_cls(**data))
            except Exception as e:
                # Skip corrupt line (partial write from a crash)
                logger.warning(f"Skipping corrupt JSONL line: {e}")
                continue
    
    return results

Multi-Graph Recovery

Secondary graphs recover independently:
# Load health judge as secondary graph
await runtime.add_graph(
    graph_id="judge",
    graph=judge_graph,
    goal=judge_goal,
    entry_points={"health_check": health_entry},
    storage_subpath="graphs/judge",
)
If the judge crashes, it resumes from its own state:
sessions/session_20260206_143022_abc12345/
├── state.json              # Primary worker state
├── conversations/          # Primary worker conversations
└── graphs/
    └── judge/
        ├── sessions/       # Judge sessions (isolated)
        │   └── session_20260206_143100_def45678/
        │       ├── state.json  # Judge state
        │       └── conversations/
        └── runtime_logs/   # Judge logs
The primary worker and judge recover independently.

Error Handling Patterns

Graceful Degradation

try:
    result = await execute_primary_path()
except Exception as e:
    logger.warning(f"Primary path failed: {e}")
    # Fall back to alternative approach
    result = await execute_fallback_path()

Retry with Backoff

import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> T:
    """Retry with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return await func()
        except Exception as e:
            if attempt == max_attempts - 1:
                raise
            
            delay = min(base_delay * (2 ** attempt), max_delay)
            logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
            await asyncio.sleep(delay)

# Usage
result = await retry_with_backoff(
    lambda: api_client.fetch_data(query),
    max_attempts=3,
)

Circuit Breaker

import time

class CircuitBreakerOpenError(Exception):
    """Raised when the breaker rejects a call while open."""

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half_open
    
    async def call(self, func):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "half_open"
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")
        
        try:
            result = await func()
            
            # Success - reset
            if self.state == "half_open":
                self.state = "closed"
                self.failures = 0
            
            return result
        
        except Exception:
            self.failures += 1
            self.last_failure_time = time.time()
            
            if self.failures >= self.failure_threshold:
                self.state = "open"
            
            raise

# Usage
breaker = CircuitBreaker(failure_threshold=3, timeout=60.0)

try:
    result = await breaker.call(lambda: unreliable_api.call())
except CircuitBreakerOpenError:
    logger.error("API circuit breaker open, skipping call")
    result = fallback_value

Best Practices

Resurrection Budget

Set max_resurrections based on your use case. The default of 3 is a reasonable budget for transient failures.

Fatal Detection

Add custom fatal error patterns if needed. Don’t waste resurrections on config errors.

Checkpoint Frequency

Balance checkpoint overhead with recovery granularity. Node completion is usually sufficient.

State Validation

Validate session state before resuming. Detect corrupted state early.
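A hedged sketch of such a validation pass. The field names follow the checkpoint example earlier in this page; the exact schema of a persisted state is an assumption here.

```python
def validate_session_state(state: dict) -> list:
    """Return a list of problems found in a loaded state dict;
    an empty list means the state looks safe to resume."""
    problems = []
    # Required top-level fields (assumed schema)
    for field in ("session_id", "memory", "execution_path"):
        if field not in state:
            problems.append(f"missing field: {field}")
    if not isinstance(state.get("memory", {}), dict):
        problems.append("memory is not a dict")
    path = state.get("execution_path", [])
    if not isinstance(path, list) or not all(isinstance(n, str) for n in path):
        problems.append("execution_path is not a list of node ids")
    return problems
```

Refusing to resume when this returns a non-empty list turns a silent replay of corrupted state into an early, loggable failure.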
