Overview
Checkpoints enable you to save and restore workflow state, allowing:
Pause and resume: Stop a workflow and continue later
Fault tolerance: Recover from failures without losing progress
Long-running workflows: Handle workflows that take hours or days
State inspection: Debug by examining saved state at any point
How Checkpoints Work
Workflows execute in super steps. A checkpoint is automatically created at the end of each super step, capturing:
Messages: All messages exchanged between executors
State: Workflow state including user data and executor states
Pending requests: Any pending request-info events (for human-in-the-loop)
Iteration count: Current iteration number
Metadata: Workflow topology and version information
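Conceptually, each checkpoint is a snapshot record bundling those five pieces. The sketch below is illustrative only: the field names mirror the attributes used in the inspection examples in this article, but the real WorkflowCheckpoint schema is framework-internal and may differ.

```python
from dataclasses import dataclass, field
from typing import Any

# Illustrative only: the real WorkflowCheckpoint schema is framework-internal.
@dataclass
class CheckpointSnapshot:
    checkpoint_id: str
    workflow_name: str
    iteration_count: int  # which super step produced this snapshot
    messages: list[dict[str, Any]] = field(default_factory=list)  # in-flight messages
    state: dict[str, Any] = field(default_factory=dict)  # user data + executor states
    pending_request_info_events: list[Any] = field(default_factory=list)  # human-in-the-loop
    graph_signature_hash: str = ""  # topology fingerprint used for compatibility checks

snap = CheckpointSnapshot(
    checkpoint_id="cp-1", workflow_name="data-pipeline", iteration_count=3
)
```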
Enabling Checkpointing
Provide a CheckpointStorage when building your workflow:
from agent_framework import WorkflowBuilder, InMemoryCheckpointStorage

# For testing: in-memory storage
checkpoint_storage = InMemoryCheckpointStorage()

workflow = (
    WorkflowBuilder(
        start_executor=start,
        checkpoint_storage=checkpoint_storage,
    )
    .add_edge(start, worker)
    .add_edge(worker, worker)  # Self-loop
    .build()
)
Checkpoint Storage Backends
In-Memory Storage (Testing)
For development and testing:
from agent_framework import InMemoryCheckpointStorage

storage = InMemoryCheckpointStorage()
In-memory storage is lost when the process ends. Use file or database storage for production.
File Storage (Production)
Persist checkpoints to disk:
from agent_framework import FileCheckpointStorage
from pathlib import Path

storage = FileCheckpointStorage(storage_path=Path("./checkpoints"))
Security: File storage uses pickle for serialization. Only load checkpoints from trusted sources.
Custom Storage
Implement the CheckpointStorage protocol for custom backends:
from agent_framework import CheckpointStorage, WorkflowCheckpoint

# `db` is assumed to be an async database client defined elsewhere.
class DatabaseCheckpointStorage:
    async def save(self, checkpoint: WorkflowCheckpoint) -> str:
        # Save to database
        await db.insert(checkpoint.to_dict())
        return checkpoint.checkpoint_id

    async def load(self, checkpoint_id: str) -> WorkflowCheckpoint:
        # Load from database
        data = await db.get(checkpoint_id)
        return WorkflowCheckpoint.from_dict(data)

    async def list_checkpoints(self, *, workflow_name: str) -> list[WorkflowCheckpoint]:
        # List all checkpoints for a workflow
        results = await db.query(workflow_name=workflow_name)
        return [WorkflowCheckpoint.from_dict(r) for r in results]

    async def delete(self, checkpoint_id: str) -> bool:
        return await db.delete(checkpoint_id)

    async def get_latest(self, *, workflow_name: str) -> WorkflowCheckpoint | None:
        checkpoints = await self.list_checkpoints(workflow_name=workflow_name)
        return max(checkpoints, key=lambda cp: cp.timestamp) if checkpoints else None
Capturing Checkpoints
Checkpoints are created automatically. Listen for checkpoint events:
async for event in workflow.run(initial_input, stream=True):
    if event.type == "superstep_completed":
        # Checkpoint available after super step
        print(f"Checkpoint created: {event.checkpoint_id}")
Restoring from Checkpoints
Resume workflow execution from a saved checkpoint:
from agent_framework import FileCheckpointStorage

storage = FileCheckpointStorage(storage_path="./checkpoints")

# Get the latest checkpoint
latest = await storage.get_latest(workflow_name=workflow.name)

if latest:
    # Resume from checkpoint
    events = await workflow.run(
        checkpoint_id=latest.checkpoint_id,
        stream=True,
    )
    async for event in events:
        if event.type == "output":
            print(f"Output: {event.data}")
Executor State Persistence
Executors can save and restore internal state across checkpoints:
import sys
from typing import Any

from agent_framework import Executor, WorkflowContext, handler

if sys.version_info >= (3, 12):
    from typing import override
else:
    from typing_extensions import override

class StatefulWorker(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        self._processed_items = []
        self._total_count = 0

    @handler
    async def process(
        self,
        item: dict,
        ctx: WorkflowContext[dict, str],
    ) -> None:
        self._processed_items.append(item["id"])
        self._total_count += 1

        if self._total_count >= 10:
            await ctx.yield_output(f"Processed {self._total_count} items")
        else:
            await ctx.send_message(item)

    @override
    async def on_checkpoint_save(self) -> dict[str, Any]:
        """Save executor state for checkpointing."""
        return {
            "processed_items": self._processed_items,
            "total_count": self._total_count,
        }

    @override
    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        """Restore executor state from checkpoint."""
        self._processed_items = state.get("processed_items", [])
        self._total_count = state.get("total_count", 0)
Override on_checkpoint_save and on_checkpoint_restore to persist executor instance state across workflow restarts.
Managing Checkpoints
List Checkpoints
# List all checkpoints for a workflow
checkpoints = await storage.list_checkpoints(workflow_name="data-pipeline")

for cp in checkpoints:
    print(f"ID: {cp.checkpoint_id}")
    print(f"Timestamp: {cp.timestamp}")
    print(f"Iteration: {cp.iteration_count}")
    print(f"State keys: {list(cp.state.keys())}")
Delete Old Checkpoints
from datetime import datetime, timedelta, timezone

# Delete checkpoints older than 7 days
checkpoints = await storage.list_checkpoints(workflow_name="data-pipeline")
cutoff = datetime.now(timezone.utc) - timedelta(days=7)

for cp in checkpoints:
    cp_time = datetime.fromisoformat(cp.timestamp)
    if cp_time < cutoff:
        await storage.delete(cp.checkpoint_id)
        print(f"Deleted checkpoint {cp.checkpoint_id}")
Inspect Checkpoint Contents
# Inspect checkpoint contents
checkpoint = await storage.load(checkpoint_id)
print(f"Workflow: {checkpoint.workflow_name}")
print(f"Iteration: {checkpoint.iteration_count}")
print(f"Graph hash: {checkpoint.graph_signature_hash}")
print(f"Messages: {len(checkpoint.messages)}")
print(f"State: {checkpoint.state}")
print(f"Pending requests: {len(checkpoint.pending_request_info_events)}")
Automatic Recovery Pattern
Implement automatic retry with checkpoints:
import random

from agent_framework import FileCheckpointStorage, WorkflowCheckpoint

# Runs inside an async function; `return` exits it on success.
# `create_workflow` and `initial_input` are defined elsewhere.
storage = FileCheckpointStorage("./checkpoints")
latest_checkpoint: WorkflowCheckpoint | None = None

while True:
    workflow = create_workflow(checkpoint_storage=storage)
    try:
        # Start from checkpoint or fresh
        if latest_checkpoint:
            print(f"Resuming from checkpoint {latest_checkpoint.checkpoint_id}")
            event_stream = workflow.run(
                checkpoint_id=latest_checkpoint.checkpoint_id,
                stream=True,
            )
        else:
            print("Starting new workflow")
            event_stream = workflow.run(
                message=initial_input,
                stream=True,
            )

        # Process events
        async for event in event_stream:
            if event.type == "output":
                print(f"Workflow completed: {event.data}")
                return  # Success!
            if event.type == "superstep_completed":
                # Randomly simulate interruptions
                if random.random() < 0.3:
                    print("Simulated interruption")
                    break

        # Get the latest checkpoint for retry
        latest_checkpoint = await storage.get_latest(workflow_name=workflow.name)
    except Exception as e:
        print(f"Error: {e}")
        latest_checkpoint = await storage.get_latest(workflow_name=workflow.name)
Checkpoint Compatibility
Checkpoints are tied to workflow topology:
Graph signature hash: Validates that the workflow structure hasn't changed
Version: Checkpoint format version
Workflow name: Logical grouping of checkpoints
Changing the workflow structure (adding or removing executors or edges) invalidates existing checkpoints. Keep the workflow name and topology stable to preserve checkpoint compatibility.
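The topology check can be sketched in plain Python. The `graph_signature_hash` field name comes from the checkpoint inspection example in this article; the edge-list representation and the `graph_signature` helper below are assumptions for illustration, not the framework's actual hashing scheme.

```python
import hashlib

def graph_signature(edges: list[tuple[str, str]]) -> str:
    """Hash a workflow topology (a sorted list of executor-id edges)."""
    canonical = ";".join(f"{src}->{dst}" for src, dst in sorted(edges))
    return hashlib.sha256(canonical.encode()).hexdigest()

def is_compatible(checkpoint_hash: str, current_edges: list[tuple[str, str]]) -> bool:
    """A checkpoint is only safe to restore if the topology hash still matches."""
    return checkpoint_hash == graph_signature(current_edges)

edges = [("start", "worker"), ("worker", "worker")]
saved_hash = graph_signature(edges)

assert is_compatible(saved_hash, edges)  # unchanged topology: restore is safe
assert not is_compatible(saved_hash, edges + [("worker", "reviewer")])  # edge added
```

Sorting the edges before hashing makes the signature order-insensitive, so only genuine structural changes (new or removed executors and edges) invalidate the checkpoint.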
Complete Example
Iterative computation with automatic checkpointing:
import asyncio
from dataclasses import dataclass
from typing import Any

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    FileCheckpointStorage,
    handler,
)

@dataclass
class Task:
    remaining: list[int]

class StartExecutor(Executor):
    @handler
    async def start(self, limit: int, ctx: WorkflowContext[Task]) -> None:
        await ctx.send_message(Task(remaining=list(range(1, limit + 1))))

class WorkerExecutor(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        self._results = {}

    @handler
    async def compute(
        self,
        task: Task,
        ctx: WorkflowContext[Task, dict],
    ) -> None:
        num = task.remaining.pop(0)
        self._results[num] = num * num  # Simple computation

        if not task.remaining:
            await ctx.yield_output(self._results)
        else:
            await ctx.send_message(task)

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"results": self._results}

    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        self._results = state.get("results", {})

async def main():
    storage = FileCheckpointStorage("./checkpoints")

    start = StartExecutor(id="start")
    worker = WorkerExecutor(id="worker")

    workflow = (
        WorkflowBuilder(
            start_executor=start,
            checkpoint_storage=storage,
        )
        .add_edge(start, worker)
        .add_edge(worker, worker)  # Self-loop
        .build()
    )

    # Check for an existing checkpoint
    latest = await storage.get_latest(workflow_name=workflow.name)

    if latest:
        print(f"Resuming from iteration {latest.iteration_count}")
        events = await workflow.run(checkpoint_id=latest.checkpoint_id)
    else:
        print("Starting new workflow")
        events = await workflow.run(message=10)

    outputs = events.get_outputs()
    print(f"Results: {outputs}")

if __name__ == "__main__":
    asyncio.run(main())
Best Practices
Use InMemoryCheckpointStorage for tests and local development
Use FileCheckpointStorage for production single-node deployments
Use custom storage (database, blob storage) for distributed systems
Keep state serializable (primitive types, dataclasses)
Minimize state size: persist only essential data
Use executor state hooks (on_checkpoint_save/restore) for executor-specific state
Avoid storing large objects in workflow state
Implement retention policies (delete old checkpoints)
Monitor checkpoint storage size
Test checkpoint restoration regularly
Document checkpoint compatibility requirements
Handle checkpoint load failures gracefully
Validate checkpoint compatibility before restore
Log checkpoint operations for debugging
Provide fallback to fresh start if restore fails
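The last three practices (graceful load failure, pre-restore validation, fallback to a fresh start) combine naturally into one small helper. This is a framework-free sketch against a hypothetical storage object; only the `get_latest` and `load` method names from the CheckpointStorage protocol above are assumed.

```python
import asyncio

async def resume_or_fresh(storage, workflow_name: str):
    """Return a checkpoint id to resume from, or None to start fresh.

    Any failure while locating or loading the checkpoint is logged and
    degrades to a fresh start instead of crashing the workflow.
    """
    try:
        latest = await storage.get_latest(workflow_name=workflow_name)
        if latest is None:
            return None
        # Re-load the checkpoint to verify it is actually readable.
        await storage.load(latest.checkpoint_id)
        return latest.checkpoint_id
    except Exception as e:  # corrupt file, schema mismatch, backend outage, ...
        print(f"Checkpoint restore failed ({e}); starting fresh")
        return None
```

A caller would pass the result as `checkpoint_id` when it is not None, and otherwise start the workflow with its initial input.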
Next Steps
Human-in-the-Loop Combine checkpoints with human input for interactive workflows
Orchestration Patterns Learn about sequential, concurrent, and conditional execution