Checkpointing enables LangGraph to persist state across invocations, recover from failures, and provide time-travel debugging. It’s the foundation for long-running agents, human-in-the-loop workflows, and conversational memory.
# Identify the conversation thread whose checkpoint should be inspected.
config = {"configurable": {"thread_id": "conversation-123"}}

# Get current state
state_snapshot = graph.get_state(config)

print(state_snapshot.values)      # Current state values
print(state_snapshot.next)        # Next nodes to execute
print(state_snapshot.metadata)    # Checkpoint metadata
print(state_snapshot.created_at)  # Timestamp
print(state_snapshot.tasks)       # Pending tasks
config = {"configurable": {"thread_id": "conversation-123"}}

# Get all checkpoints for this thread
for state in graph.get_state_history(config):
    print(f"Step {state.metadata['step']}: {state.values}")

# Cap the number of checkpoints returned (here: at most 5).
# `limit` only bounds the count; it does not select a point in time.
for state in graph.get_state_history(config, limit=5):
    print(state)

# Filter by metadata
for state in graph.get_state_history(
    config,
    filter={"source": "loop"},
    limit=10,
):
    print(state)
# Inspect the metadata attached to the current checkpoint.
state = graph.get_state(config)
metadata = state.metadata

metadata["source"]   # "input", "loop", "update", or "fork"
metadata["step"]     # Step number (-1 for input, 0+ for loop)
metadata["run_id"]   # Unique run identifier
metadata["parents"]  # Parent checkpoint IDs for nested graphs
# Get history
history = list(graph.get_state_history(config, limit=10))

# Pick a checkpoint to resume from (3 steps back).
# NOTE(review): this relies on history being ordered newest-first and on the
# thread having at least 4 checkpoints; otherwise the index raises IndexError.
past_state = history[3]

# Resume from that checkpoint
result = graph.invoke(
    None,  # Use state from checkpoint
    config=past_state.config,
)
# Get current state
original_config = {"configurable": {"thread_id": "conversation-1"}}
original_state = graph.get_state(original_config)

# Fork to new thread
new_config = {"configurable": {"thread_id": "conversation-1-fork"}}

# Seed the new thread with the original thread's values. Without this step
# the new thread would start empty and `original_state` would go unused.
# NOTE(review): some graphs may need an explicit `as_node=` here — confirm.
graph.update_state(new_config, original_state.values)

# Start new thread from original state
result = graph.invoke(
    {"messages": ["What if we tried this instead?"]},
    config=new_config,
)

# Original thread unchanged
original = graph.get_state(original_config)
config = {"configurable": {"thread_id": "conversation-123"}}

# Update state as if a node executed
updated_config = graph.update_state(
    config,
    values={"messages": ["Injected message"]},
    as_node="chat",  # Pretend this update came from 'chat' node
)

# Next invocation sees the updated state
result = graph.invoke(None, config=updated_config)
# Delete all checkpoints for a thread
checkpointer.delete_thread("conversation-123")

# Delete multiple threads
for thread_id in old_thread_ids:
    checkpointer.delete_thread(thread_id)
# Keep only latest checkpoint per thread
checkpointer.prune(
    thread_ids=["thread-1", "thread-2"],
    strategy="keep_latest",
)

# Delete all checkpoints
checkpointer.prune(
    thread_ids=["old-thread"],
    strategy="delete",
)