
Overview

Checkpoints enable you to save and restore workflow state, allowing:
  • Pause and resume: Stop a workflow and continue later
  • Fault tolerance: Recover from failures without losing progress
  • Long-running workflows: Handle workflows that take hours or days
  • State inspection: Debug by examining saved state at any point

How Checkpoints Work

Workflows execute in super steps. A checkpoint is automatically created at the end of each super step, capturing:
  • Messages: All messages exchanged between executors
  • State: Workflow state including user data and executor states
  • Pending requests: Any pending request-info events (for human-in-the-loop)
  • Iteration count: Current iteration number
  • Metadata: Workflow topology and version information
Figure: Checkpoint creation at super step boundaries
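The captured fields can be pictured as a plain record. The sketch below is illustrative only, not the framework's actual `WorkflowCheckpoint` class; the field names follow those shown in the Checkpoint Metadata section further down:

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class CheckpointSketch:
    """Illustrative record of what a checkpoint captures (hypothetical class)."""
    workflow_name: str
    checkpoint_id: str
    timestamp: str                       # ISO-8601 creation time
    iteration_count: int                 # super step at which it was taken
    graph_signature_hash: str            # validates topology on restore
    messages: list[Any] = field(default_factory=list)
    state: dict[str, Any] = field(default_factory=dict)
    pending_request_info_events: list[Any] = field(default_factory=list)

cp = CheckpointSketch(
    workflow_name="data-pipeline",
    checkpoint_id="cp-001",
    timestamp="2024-01-01T00:00:00+00:00",
    iteration_count=3,
    graph_signature_hash="abc123",
)
```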

Enabling Checkpointing

Provide a CheckpointStorage when building your workflow:
Python
from agent_framework import WorkflowBuilder, InMemoryCheckpointStorage

# For testing: in-memory storage
checkpoint_storage = InMemoryCheckpointStorage()

workflow = (
    WorkflowBuilder(
        start_executor=start,
        checkpoint_storage=checkpoint_storage
    )
    .add_edge(start, worker)
    .add_edge(worker, worker)  # Self-loop
    .build()
)

Checkpoint Storage Backends

In-Memory Storage (Testing)

For development and testing:
Python
from agent_framework import InMemoryCheckpointStorage

storage = InMemoryCheckpointStorage()
In-memory storage is lost when the process ends. Use file or database storage for production.

File Storage (Production)

Persist checkpoints to disk:
Python
from agent_framework import FileCheckpointStorage
from pathlib import Path

storage = FileCheckpointStorage(
    storage_path=Path("./checkpoints")
)
Security: File storage uses pickle for serialization. Only load checkpoints from trusted sources.
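One way to reduce the pickle risk is to keep workflow state JSON-serializable and persist it as JSON, which cannot execute code on load. A minimal sketch of that idea (the function names and file layout are hypothetical, not the framework's `FileCheckpointStorage` format):

```python
import json
from pathlib import Path

def save_checkpoint_json(storage_path: Path, checkpoint_id: str, data: dict) -> Path:
    """Persist a checkpoint dict as JSON; json.dumps rejects non-serializable state up front."""
    storage_path.mkdir(parents=True, exist_ok=True)
    path = storage_path / f"{checkpoint_id}.json"
    path.write_text(json.dumps(data, indent=2))
    return path

def load_checkpoint_json(storage_path: Path, checkpoint_id: str) -> dict:
    """Read a checkpoint back; no arbitrary code can run during deserialization."""
    return json.loads((storage_path / f"{checkpoint_id}.json").read_text())
```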

Custom Storage

Implement CheckpointStorage protocol for custom backends:
Python
from agent_framework import CheckpointStorage, WorkflowCheckpoint

class DatabaseCheckpointStorage:
    async def save(self, checkpoint: WorkflowCheckpoint) -> str:
        # Save to database
        await db.insert(checkpoint.to_dict())
        return checkpoint.checkpoint_id
    
    async def load(self, checkpoint_id: str) -> WorkflowCheckpoint:
        # Load from database
        data = await db.get(checkpoint_id)
        return WorkflowCheckpoint.from_dict(data)
    
    async def list_checkpoints(self, *, workflow_name: str) -> list[WorkflowCheckpoint]:
        # List all checkpoints for a workflow
        results = await db.query(workflow_name=workflow_name)
        return [WorkflowCheckpoint.from_dict(r) for r in results]
    
    async def delete(self, checkpoint_id: str) -> bool:
        return await db.delete(checkpoint_id)
    
    async def get_latest(self, *, workflow_name: str) -> WorkflowCheckpoint | None:
        checkpoints = await self.list_checkpoints(workflow_name=workflow_name)
        return max(checkpoints, key=lambda cp: cp.timestamp) if checkpoints else None

Capturing Checkpoints

Checkpoints are created automatically. Listen for checkpoint events:
Python
async for event in workflow.run(message=initial_input, stream=True):
    if event.type == "superstep_completed":
        # A checkpoint is available after each super step
        print(f"Checkpoint created: {event.checkpoint_id}")

Restoring from Checkpoints

Resume workflow execution from a saved checkpoint:
Python
from agent_framework import FileCheckpointStorage

storage = FileCheckpointStorage(storage_path="./checkpoints")

# Get the latest checkpoint
latest = await storage.get_latest(workflow_name=workflow.name)

if latest:
    # Resume from checkpoint (streaming runs are iterated, not awaited)
    events = workflow.run(
        checkpoint_id=latest.checkpoint_id,
        stream=True
    )
    
    async for event in events:
        if event.type == "output":
            print(f"Output: {event.data}")

Executor State Persistence

Executors can save and restore internal state across checkpoints:
Python
from agent_framework import Executor, WorkflowContext, handler
from typing import Any
import sys
if sys.version_info >= (3, 12):
    from typing import override
else:
    from typing_extensions import override

class StatefulWorker(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        self._processed_items = []
        self._total_count = 0
    
    @handler
    async def process(
        self,
        item: dict,
        ctx: WorkflowContext[dict, str]
    ) -> None:
        self._processed_items.append(item["id"])
        self._total_count += 1
        
        if self._total_count >= 10:
            await ctx.yield_output(f"Processed {self._total_count} items")
        else:
            await ctx.send_message(item)
    
    @override
    async def on_checkpoint_save(self) -> dict[str, Any]:
        """Save executor state for checkpointing."""
        return {
            "processed_items": self._processed_items,
            "total_count": self._total_count
        }
    
    @override
    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        """Restore executor state from checkpoint."""
        self._processed_items = state.get("processed_items", [])
        self._total_count = state.get("total_count", 0)
Override on_checkpoint_save and on_checkpoint_restore to persist executor instance state across workflow restarts.

Managing Checkpoints

List Checkpoints

Python
# List all checkpoints for a workflow
checkpoints = await storage.list_checkpoints(workflow_name="data-pipeline")

for cp in checkpoints:
    print(f"ID: {cp.checkpoint_id}")
    print(f"Timestamp: {cp.timestamp}")
    print(f"Iteration: {cp.iteration_count}")
    print(f"State keys: {list(cp.state.keys())}")

Delete Old Checkpoints

Python
from datetime import datetime, timedelta, timezone

# Delete checkpoints older than 7 days
checkpoints = await storage.list_checkpoints(workflow_name="data-pipeline")
cutoff = datetime.now(timezone.utc) - timedelta(days=7)

for cp in checkpoints:
    cp_time = datetime.fromisoformat(cp.timestamp)
    if cp_time < cutoff:
        await storage.delete(cp.checkpoint_id)
        print(f"Deleted checkpoint {cp.checkpoint_id}")
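A count-based variant of the same retention idea is to keep only the newest N checkpoints. A small helper, sketched over plain objects that carry `checkpoint_id` and `timestamp` attributes:

```python
from types import SimpleNamespace

def select_expired(checkpoints, keep: int):
    """Return the checkpoints beyond the newest `keep`, newest-first ordering."""
    ordered = sorted(checkpoints, key=lambda cp: cp.timestamp, reverse=True)
    return ordered[keep:]

# Five fake checkpoints; timestamps 0..4, so cp-4 is newest
cps = [SimpleNamespace(checkpoint_id=f"cp-{i}", timestamp=i) for i in range(5)]
expired = select_expired(cps, keep=3)  # cp-1 and cp-0 fall outside the newest three
```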

Checkpoint Metadata

Python
# Inspect checkpoint contents
checkpoint = await storage.load(checkpoint_id)

print(f"Workflow: {checkpoint.workflow_name}")
print(f"Iteration: {checkpoint.iteration_count}")
print(f"Graph hash: {checkpoint.graph_signature_hash}")
print(f"Messages: {len(checkpoint.messages)}")
print(f"State: {checkpoint.state}")
print(f"Pending requests: {len(checkpoint.pending_request_info_events)}")

Automatic Recovery Pattern

Implement automatic retry with checkpoints:
Python
import random

from agent_framework import FileCheckpointStorage, WorkflowCheckpoint

storage = FileCheckpointStorage("./checkpoints")

async def run_with_recovery(initial_input) -> None:
    latest_checkpoint: WorkflowCheckpoint | None = None

    while True:
        # create_workflow builds your workflow with checkpointing enabled (defined elsewhere)
        workflow = create_workflow(checkpoint_storage=storage)

        try:
            # Start from checkpoint or fresh
            if latest_checkpoint:
                print(f"Resuming from checkpoint {latest_checkpoint.checkpoint_id}")
                event_stream = workflow.run(
                    checkpoint_id=latest_checkpoint.checkpoint_id,
                    stream=True
                )
            else:
                print("Starting new workflow")
                event_stream = workflow.run(
                    message=initial_input,
                    stream=True
                )

            # Process events
            async for event in event_stream:
                if event.type == "output":
                    print(f"Workflow completed: {event.data}")
                    return  # Success!

                if event.type == "superstep_completed":
                    # Randomly simulate interruptions
                    if random.random() < 0.3:
                        print("Simulated interruption")
                        break

            # Get latest checkpoint for retry
            latest_checkpoint = await storage.get_latest(workflow_name=workflow.name)

        except Exception as e:
            print(f"Error: {e}")
            latest_checkpoint = await storage.get_latest(workflow_name=workflow.name)

Checkpoint Compatibility

Checkpoints are tied to workflow topology:
  • Graph signature hash: Validates workflow structure hasn’t changed
  • Version: Checkpoint format version
  • Workflow name: Logical grouping of checkpoints
Changing workflow structure (adding/removing executors or edges) invalidates existing checkpoints. Maintain workflow name and topology for checkpoint compatibility.
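The graph signature check can be pictured as hashing a canonical form of the workflow's edge list, so that any structural change produces a different value. A hypothetical sketch (the framework's actual hashing scheme may differ):

```python
import hashlib
import json

def graph_signature(edges: list[tuple[str, str]]) -> str:
    """Hash a canonical (sorted) edge list; adding or removing an edge changes the hash."""
    canonical = json.dumps(sorted(edges))
    return hashlib.sha256(canonical.encode()).hexdigest()

original = graph_signature([("start", "worker"), ("worker", "worker")])
# Adding a hypothetical "audit" executor invalidates existing checkpoints
modified = graph_signature([("start", "worker"), ("worker", "worker"), ("worker", "audit")])
```

Because the edge list is sorted before hashing, the signature is stable regardless of the order in which edges were added.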

Complete Example

Iterative computation with automatic checkpointing:
Python
import asyncio
from dataclasses import dataclass
from typing import Any
from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    FileCheckpointStorage,
    handler
)

@dataclass
class Task:
    remaining: list[int]

class StartExecutor(Executor):
    @handler
    async def start(self, limit: int, ctx: WorkflowContext[Task]) -> None:
        await ctx.send_message(Task(remaining=list(range(1, limit + 1))))

class WorkerExecutor(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        self._results = {}
    
    @handler
    async def compute(
        self,
        task: Task,
        ctx: WorkflowContext[Task, dict]
    ) -> None:
        num = task.remaining.pop(0)
        self._results[num] = num * num  # Simple computation
        
        if not task.remaining:
            await ctx.yield_output(self._results)
        else:
            await ctx.send_message(task)
    
    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"results": self._results}
    
    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        self._results = state.get("results", {})

async def main():
    storage = FileCheckpointStorage("./checkpoints")
    start = StartExecutor(id="start")
    worker = WorkerExecutor(id="worker")
    
    workflow = (
        WorkflowBuilder(
            start_executor=start,
            checkpoint_storage=storage
        )
        .add_edge(start, worker)
        .add_edge(worker, worker)  # Self-loop
        .build()
    )
    
    # Check for existing checkpoint
    latest = await storage.get_latest(workflow_name=workflow.name)
    
    if latest:
        print(f"Resuming from iteration {latest.iteration_count}")
        events = await workflow.run(checkpoint_id=latest.checkpoint_id)
    else:
        print("Starting new workflow")
        events = await workflow.run(message=10)
    
    outputs = events.get_outputs()
    print(f"Results: {outputs}")

if __name__ == "__main__":
    asyncio.run(main())

Best Practices

  • Use InMemoryCheckpointStorage for tests and local development
  • Use FileCheckpointStorage for production single-node deployments
  • Use custom storage (database, blob storage) for distributed systems
  • Keep state serializable (primitive types, dataclasses)
  • Minimize state size: only persist essential data
  • Use executor state hooks (on_checkpoint_save/restore) for executor-specific state
  • Avoid storing large objects in workflow state
  • Implement retention policies (delete old checkpoints)
  • Monitor checkpoint storage size
  • Test checkpoint restoration regularly
  • Document checkpoint compatibility requirements
  • Handle checkpoint load failures gracefully
  • Validate checkpoint compatibility before restore
  • Log checkpoint operations for debugging
  • Provide fallback to fresh start if restore fails
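The last three practices combine into one pattern: attempt the restore, log failures, and fall back to a fresh start. A sketch with a hypothetical failing storage object standing in for a real backend:

```python
import asyncio
import logging

logger = logging.getLogger("checkpoints")

async def latest_or_fresh(storage, workflow_name: str):
    """Return the latest checkpoint, or None (fresh start) if the lookup fails."""
    try:
        return await storage.get_latest(workflow_name=workflow_name)
    except Exception:
        logger.exception("Checkpoint lookup failed; falling back to fresh start")
        return None

class BrokenStorage:
    """Stand-in backend that always fails, to exercise the fallback path."""
    async def get_latest(self, *, workflow_name: str):
        raise IOError("storage unavailable")

result = asyncio.run(latest_or_fresh(BrokenStorage(), "data-pipeline"))
# result is None, so the caller starts the workflow fresh instead of resuming
```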

Next Steps

Human-in-the-Loop

Combine checkpoints with human input for interactive workflows

Orchestration Patterns

Learn about sequential, concurrent, and conditional execution
