
Overview

Checkpointing enables LangGraph to persist state across invocations, recover from failures, and provide time-travel debugging. It’s the foundation for long-running agents, human-in-the-loop workflows, and conversational memory.

What is a Checkpoint?

A checkpoint is a snapshot of your graph’s state at a specific point in time:
from langgraph.checkpoint.base import Checkpoint

# A checkpoint contains:
checkpoint = {
    "v": 1,                           # Checkpoint format version
    "id": "1ef8...",                   # Unique, monotonic ID
    "ts": "2024-01-15T10:30:00Z",     # ISO 8601 timestamp
    "channel_values": {               # State at this point
        "messages": [...],
        "count": 5,
    },
    "channel_versions": {             # Version tracking
        "messages": "v1",
        "count": "v2",
    },
    "versions_seen": {                # Node execution tracking
        "node1": {"messages": "v1"},
    },
}
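To see how `channel_versions` and `versions_seen` interact, here is a toy sketch (plain Python, not LangGraph's actual scheduler): a node has pending work when any channel it subscribes to carries a newer version than the one that node last saw.

```python
# Toy sketch, not LangGraph internals: decide whether a node still has
# pending work by comparing channel versions against what the node has seen.
def has_pending_work(
    channel_versions: dict,
    versions_seen: dict,
    node: str,
    subscribed: list,
) -> bool:
    seen = versions_seen.get(node, {})
    # Any subscribed channel with a version the node hasn't seen yet
    # means the node should run again.
    return any(channel_versions.get(ch) != seen.get(ch) for ch in subscribed)

checkpoint = {
    "channel_versions": {"messages": "v2", "count": "v2"},
    "versions_seen": {"node1": {"messages": "v1"}},
}

# node1 last saw "messages" at v1, but the channel is at v2 -> pending work
pending = has_pending_work(
    checkpoint["channel_versions"],
    checkpoint["versions_seen"],
    "node1",
    ["messages"],
)
print(pending)  # True
```

The real scheduler is more involved, but this is the core idea behind `versions_seen`: it lets a resumed graph skip nodes that have already processed the current state.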

Basic Setup

In-Memory Checkpointing

For development and testing:
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

class State(TypedDict):
    messages: list[str]

builder = StateGraph(State)
builder.add_node("chat", lambda s: {"messages": ["response"]})
builder.add_edge(START, "chat")
builder.add_edge("chat", END)

# Enable checkpointing
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

Using Thread IDs

Thread IDs group related invocations:
# Start a conversation
config = {"configurable": {"thread_id": "conversation-123"}}

result1 = graph.invoke(
    {"messages": ["Hello"]},
    config=config
)

# Continue the same conversation
result2 = graph.invoke(
    {"messages": ["How are you?"]},
    config=config  # Same thread_id continues from previous state
)
The thread_id is the primary key for storing and retrieving checkpoints. Use unique IDs for independent conversations or sessions.
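One way to keep thread IDs consistent is a small helper. The naming scheme below (user + session + schema version) is illustrative only, not a LangGraph convention:

```python
import uuid

def thread_id_for(user_id: str, session_id: str, schema_version: int = 1) -> str:
    # Deterministic per user+session; the version suffix keeps threads
    # from old state schemas segregated after a migration.
    return f"user-{user_id}:session-{session_id}:v{schema_version}"

def one_off_thread_id() -> str:
    # Random ID for fire-and-forget workflows that won't be resumed by name.
    return str(uuid.uuid4())

config = {"configurable": {"thread_id": thread_id_for("alice", "2024-01-15")}}
```

Whatever scheme you pick, document it: the thread ID is the only handle you have for finding a conversation's checkpoints later.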

Persistent Checkpointers

PostgreSQL

For production use with PostgreSQL:
from langgraph.checkpoint.postgres import PostgresSaver
import psycopg

# Setup connection (autocommit is required for setup())
conn_string = "postgresql://user:password@localhost:5432/langgraph"
conn = psycopg.connect(conn_string, autocommit=True)

# Use checkpointer
checkpointer = PostgresSaver(conn)
checkpointer.setup()  # Create tables (one time)

graph = builder.compile(checkpointer=checkpointer)

SQLite

For single-file persistence:
from langgraph.checkpoint.sqlite import SqliteSaver

with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)
    
    result = graph.invoke(
        {"messages": ["Hello"]},
        config={"configurable": {"thread_id": "1"}}
    )

Async Checkpointers

For async graphs:
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async with AsyncPostgresSaver.from_conn_string(conn_string) as checkpointer:
    # Create tables (one time)
    await checkpointer.setup()

    graph = builder.compile(checkpointer=checkpointer)

    result = await graph.ainvoke(
        {"messages": ["Hello"]},
        config={"configurable": {"thread_id": "1"}}
    )

State Management

Getting Current State

Retrieve the latest state for a thread:
config = {"configurable": {"thread_id": "conversation-123"}}

# Get current state
state_snapshot = graph.get_state(config)

print(state_snapshot.values)      # Current state values
print(state_snapshot.next)        # Next nodes to execute
print(state_snapshot.metadata)    # Checkpoint metadata
print(state_snapshot.created_at)  # Timestamp
print(state_snapshot.tasks)       # Pending tasks

State History

Access historical checkpoints:
config = {"configurable": {"thread_id": "conversation-123"}}

# Get all checkpoints for this thread
for state in graph.get_state_history(config):
    print(f"Step {state.metadata['step']}: {state.values}")

# Get the 5 most recent checkpoints
for state in graph.get_state_history(config, limit=5):
    print(state)

# Filter by metadata
for state in graph.get_state_history(
    config,
    filter={"source": "loop"},
    limit=10
):
    print(state)

Checkpoint Metadata

Each checkpoint includes metadata:
state = graph.get_state(config)

metadata = state.metadata
metadata["source"]    # "input", "loop", "update", or "fork"
metadata["step"]      # Step number (-1 for input, 0+ for loop)
metadata["writes"]    # Writes made in this step, keyed by node
metadata["parents"]   # Parent checkpoint IDs for nested graphs

Time-Travel and Replay

Resume from Any Checkpoint

Replay from a historical state:
# Get history
history = list(graph.get_state_history(config, limit=10))

# Pick a checkpoint to resume from (3 steps back)
past_state = history[3]

# Resume from that checkpoint
result = graph.invoke(
    None,  # Use state from checkpoint
    config=past_state.config
)

Fork Conversations

Create alternate timelines:
# Get current state
original_config = {"configurable": {"thread_id": "conversation-1"}}
original_state = graph.get_state(original_config)

# Fork to a new thread, seeded with the original state
new_config = {"configurable": {"thread_id": "conversation-1-fork"}}
graph.update_state(new_config, values=original_state.values)

# Continue the new timeline independently
result = graph.invoke(
    {"messages": ["What if we tried this instead?"]},
    config=new_config
)

# Original thread unchanged
original = graph.get_state(original_config)

Manual State Updates

Modify state externally:
config = {"configurable": {"thread_id": "conversation-123"}}

# Update state as if a node executed
updated_config = graph.update_state(
    config,
    values={"messages": ["Injected message"]},
    as_node="chat"  # Pretend this update came from 'chat' node
)

# Next invocation sees the updated state
result = graph.invoke(None, config=updated_config)

Bulk Updates

Apply multiple updates in sequence:
from langgraph.types import StateUpdate

updates = [
    # Each superstep is a sequence of updates
    [
        StateUpdate(values={"count": 1}, as_node="node1"),
        StateUpdate(values={"count": 2}, as_node="node2"),
    ],
    [
        StateUpdate(values={"status": "done"}, as_node="node3"),
    ],
]

updated_config = graph.bulk_update_state(config, updates)

Subgraph Checkpoints

Subgraphs can have independent checkpointers:
# Subgraph with its own checkpointer
subgraph = StateGraph(SubState)
subgraph.add_node("sub_node", sub_logic)
subgraph.add_edge(START, "sub_node")
subgraph.add_edge("sub_node", END)
compiled_sub = subgraph.compile(
    checkpointer=True  # Inherit from parent
)

# Parent graph
parent = StateGraph(ParentState)
parent.add_node("subgraph", compiled_sub)
parent.add_edge(START, "subgraph")
parent.add_edge("subgraph", END)

# Parent provides the actual checkpointer
parent_graph = parent.compile(checkpointer=PostgresSaver(conn))

# Subgraph checkpoints stored under parent's thread
config = {"configurable": {"thread_id": "main-thread"}}
result = parent_graph.invoke({...}, config)

# Access subgraph state
state = parent_graph.get_state(config, subgraphs=True)

Interrupt and Resume

Pause execution and resume later:
# Compile with interrupt points
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_review"],
    interrupt_after=["generate_draft"]
)

config = {"configurable": {"thread_id": "workflow-1"}}

# First invocation - pauses at interrupt
for event in graph.stream({"input": "data"}, config):
    print(event)
# Execution stops at 'human_review'

# Check state
state = graph.get_state(config)
print(state.next)  # ('human_review',)

# Resume execution
for event in graph.stream(None, config):
    print(event)
# Continues from 'human_review'
See Human-in-the-Loop for more on interrupts.

Custom Checkpointers

Implement your own storage backend:
from langgraph.checkpoint.base import BaseCheckpointSaver, Checkpoint, CheckpointTuple
from langchain_core.runnables import RunnableConfig

class CustomCheckpointer(BaseCheckpointSaver):
    def __init__(self, storage_client):
        super().__init__()
        self.storage = storage_client
    
    def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
        thread_id = config["configurable"]["thread_id"]
        checkpoint_data = self.storage.get(thread_id)
        if not checkpoint_data:
            return None
        return CheckpointTuple(
            config=config,
            checkpoint=checkpoint_data["checkpoint"],
            metadata=checkpoint_data["metadata"],
        )
    
    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: dict,
        new_versions: dict,
    ) -> RunnableConfig:
        thread_id = config["configurable"]["thread_id"]
        self.storage.put(thread_id, {
            "checkpoint": checkpoint,
            "metadata": metadata,
        })
        return config
    
    def list(self, config, *, filter=None, before=None, limit=None):
        # Implement checkpoint listing
        ...
    
    def put_writes(self, config, writes, task_id, task_path=""):
        # Implement pending writes storage
        ...
Also implement async versions (aget_tuple, aput, etc.) for async graphs.
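If your storage client is synchronous, the async variants can delegate to the sync code on a worker thread. The sketch below uses a toy dict-backed class (not a real `BaseCheckpointSaver` subclass) to show the pattern, assuming the sync methods are thread-safe:

```python
import asyncio

class DictCheckpointer:
    """Toy checkpointer over a plain dict, standing in for CustomCheckpointer."""

    def __init__(self):
        self.storage = {}

    def get_tuple(self, config):
        # Sync read, keyed by thread_id
        return self.storage.get(config["configurable"]["thread_id"])

    def put(self, config, checkpoint):
        # Sync write, keyed by thread_id
        self.storage[config["configurable"]["thread_id"]] = checkpoint
        return config

    # Async variants delegate to the sync code via a worker thread,
    # keeping blocking storage calls off the event loop.
    async def aget_tuple(self, config):
        return await asyncio.to_thread(self.get_tuple, config)

    async def aput(self, config, checkpoint):
        return await asyncio.to_thread(self.put, config, checkpoint)

cp = DictCheckpointer()
cfg = {"configurable": {"thread_id": "t1"}}

async def demo():
    await cp.aput(cfg, {"v": 1})
    return await cp.aget_tuple(cfg)

result = asyncio.run(demo())
print(result)  # {'v': 1}
```

Delegation is a stopgap: for a production backend, prefer a natively async client so checkpoint writes don't consume worker threads under load.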

Checkpoint Lifecycle

Thread Management

Deleting Threads

# Delete all checkpoints for a thread
checkpointer.delete_thread("conversation-123")

# Delete multiple threads
for thread_id in old_thread_ids:
    checkpointer.delete_thread(thread_id)

Copying Threads

# Clone a thread's entire history
checkpointer.copy_thread(
    source_thread_id="conversation-123",
    target_thread_id="conversation-123-backup"
)

Pruning Checkpoints

# Keep only latest checkpoint per thread
checkpointer.prune(
    thread_ids=["thread-1", "thread-2"],
    strategy="keep_latest"
)

# Delete all checkpoints
checkpointer.prune(
    thread_ids=["old-thread"],
    strategy="delete"
)

Best Practices

Thread ID design

  • Use UUIDs for one-off workflows
  • Use user_id + session_id for conversations
  • Include a version in the ID for schema changes
  • Document your thread ID format

Performance

  • Use connection pools for database checkpointers
  • Implement async checkpointers for async graphs
  • Prune old checkpoints regularly
  • Index thread_id and timestamp columns
  • Keep checkpoint size down (minimize state bloat)

Reliability

  • Always use persistent checkpointers in production
  • Test checkpoint recovery scenarios
  • Monitor checkpoint save/load latency
  • Handle checkpointer failures gracefully
  • Back up checkpoint databases

Security

  • Encrypt sensitive data in checkpoints
  • Implement access control on thread IDs
  • Audit checkpoint access
  • Comply with data retention policies

Troubleshooting

Checkpoint not found

  • Verify the thread_id is correct
  • Check whether the checkpoint was ever created
  • Ensure the checkpointer is connected
  • Look for deletion/pruning events

State not persisting

  • Confirm the checkpointer is passed to compile()
  • Verify the thread_id in the config
  • Check for database connection issues
  • Review error logs for write failures

Slow checkpointing

  • Reduce state size (avoid large objects)
  • Use indexed queries for history
  • Enable connection pooling
  • Consider checkpoint compression
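State bloat is a common cause of slow saves. A quick way to spot it is to measure the serialized size of each state value; the sketch below uses json as a stand-in for the checkpointer's actual serializer:

```python
import json

def state_sizes(values: dict) -> dict:
    # Rough per-key serialized size in bytes. json.dumps approximates
    # what the checkpointer must persist on every superstep.
    return {k: len(json.dumps(v, default=str).encode()) for k, v in values.items()}

sizes = state_sizes({"messages": ["hi"] * 1000, "count": 5})
biggest = max(sizes, key=sizes.get)
print(biggest, sizes[biggest])  # "messages" dominates the payload
```

If one key dominates, consider trimming it in a node (e.g., windowing message history) or storing large blobs externally and keeping only references in state.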

Next Steps

Human-in-the-Loop

Use checkpointing with interrupts for human oversight

Streaming

Stream checkpoints in real-time as they’re created
