BaseCheckpointSaver is the abstract base class for all checkpoint savers in LangGraph. It defines the interface for persisting and retrieving agent state across multiple interactions.

Overview

Checkpointers allow LangGraph agents to:
  • Persist state within and across interactions
  • Resume from interrupts
  • Enable time-travel debugging
  • Support conversational memory

Class Definition

from langgraph.checkpoint.base import BaseCheckpointSaver

class BaseCheckpointSaver(Generic[V]):
    """Base class for creating a graph checkpointer."""
    
    serde: SerializerProtocol = JsonPlusSerializer()
Source: langgraph.checkpoint.base.__init__:122

Constructor

def __init__(
    self,
    *,
    serde: SerializerProtocol | None = None,
) -> None

Parameters

  • serde (SerializerProtocol, optional): Serializer for encoding/decoding checkpoints. Defaults to JsonPlusSerializer().

Key Concepts

Thread ID

When using a checkpointer, you must pass a thread_id in the config:
config = {"configurable": {"thread_id": "my-thread"}}
graph.invoke(inputs, config)
The thread_id is the primary key for storing and retrieving checkpoints. Without it, the checkpointer cannot save state.

Usage Patterns

  • Single-shot workflows: Use a unique ID (e.g., uuid4) for each independent run
  • Conversational memory: Reuse the same thread_id to accumulate state across invocations
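
The two patterns differ only in how the thread_id is chosen. A minimal sketch using only the standard library; the helper names single_shot_config and conversation_config are hypothetical and not part of LangGraph:

```python
import uuid


def single_shot_config() -> dict:
    """Build a config with a fresh thread_id so the run shares no state."""
    return {"configurable": {"thread_id": str(uuid.uuid4())}}


def conversation_config(conversation_id: str) -> dict:
    """Build a config that reuses one thread_id for a whole conversation."""
    return {"configurable": {"thread_id": conversation_id}}


# Two independent runs get different thread IDs.
run_a = single_shot_config()
run_b = single_shot_config()

# Two turns of the same conversation share one thread ID.
turn_1 = conversation_config("support-chat-42")
turn_2 = conversation_config("support-chat-42")
```

Either dictionary can be passed as the config argument shown above, for example graph.invoke(inputs, turn_1).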

Core Methods

Synchronous Methods

get

def get(self, config: RunnableConfig) -> Checkpoint | None
Fetch a checkpoint using the given configuration.
Parameters:
  • config (RunnableConfig): Configuration specifying which checkpoint to retrieve
Returns:
  • Checkpoint | None: The requested checkpoint, or None if not found
Source: langgraph.checkpoint.base.__init__:173

get_tuple

def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None
Fetch a checkpoint tuple (checkpoint + metadata + config) using the given configuration.
Parameters:
  • config (RunnableConfig): Configuration specifying which checkpoint to retrieve
Returns:
  • CheckpointTuple | None: The requested checkpoint tuple, or None if not found
Note: This method must be implemented by subclasses.
Source: langgraph.checkpoint.base.__init__:185

list

def list(
    self,
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None,
) -> Iterator[CheckpointTuple]
List checkpoints that match the given criteria.
Parameters:
  • config (RunnableConfig | None): Base configuration for filtering checkpoints
  • filter (dict[str, Any] | None): Additional filtering criteria for metadata
  • before (RunnableConfig | None): List checkpoints created before this configuration
  • limit (int | None): Maximum number of checkpoints to return
Returns:
  • Iterator[CheckpointTuple]: Iterator of matching checkpoint tuples
Note: This method must be implemented by subclasses.
Source: langgraph.checkpoint.base.__init__:199
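
How filter, before, and limit combine may be easier to see on toy data. The sketch below is not the LangGraph implementation: list_records and RECORDS are hypothetical, before is simplified to a plain checkpoint ID rather than a RunnableConfig, and the newest-first ordering is an assumption.

```python
from __future__ import annotations

from typing import Any, Iterator

# Toy records standing in for stored checkpoints.
# Each record is (thread_id, checkpoint_id, metadata).
RECORDS = [
    ("t1", "0001", {"source": "input", "step": -1}),
    ("t1", "0002", {"source": "loop", "step": 0}),
    ("t1", "0003", {"source": "loop", "step": 1}),
    ("t2", "0001", {"source": "input", "step": -1}),
]


def list_records(
    thread_id: str | None,
    *,
    filter: dict[str, Any] | None = None,
    before: str | None = None,
    limit: int | None = None,
) -> Iterator[tuple[str, str, dict[str, Any]]]:
    """Yield matching records, newest first (ordering is an assumption)."""
    remaining = limit
    for tid, cid, meta in sorted(RECORDS, key=lambda r: r[1], reverse=True):
        if thread_id is not None and tid != thread_id:
            continue  # wrong thread
        if before is not None and cid >= before:
            continue  # not strictly older than the "before" checkpoint
        if filter and any(meta.get(k) != v for k, v in filter.items()):
            continue  # metadata does not match every filter key
        if remaining is not None:
            if remaining <= 0:
                return
            remaining -= 1
        yield (tid, cid, meta)


loop_ids = [cid for _, cid, _ in list_records("t1", filter={"source": "loop"})]
older_ids = [cid for _, cid, _ in list_records("t1", before="0003", limit=1)]
```

Here loop_ids holds the two loop checkpoints of thread t1, and older_ids holds the single newest checkpoint that precedes 0003.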

put

def put(
    self,
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig
Store a checkpoint with its configuration and metadata.
Parameters:
  • config (RunnableConfig): Configuration for the checkpoint
  • checkpoint (Checkpoint): The checkpoint to store
  • metadata (CheckpointMetadata): Additional metadata for the checkpoint
  • new_versions (ChannelVersions): New channel versions as of this write
Returns:
  • RunnableConfig: Updated configuration after storing the checkpoint
Note: This method must be implemented by subclasses.
Source: langgraph.checkpoint.base.__init__:223

put_writes

def put_writes(
    self,
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None
Store intermediate writes linked to a checkpoint.
Parameters:
  • config (RunnableConfig): Configuration of the related checkpoint
  • writes (Sequence[tuple[str, Any]]): List of writes to store
  • task_id (str): Identifier for the task creating the writes
  • task_path (str): Path of the task creating the writes (default: "")
Note: This method must be implemented by subclasses.
Source: langgraph.checkpoint.base.__init__:246

delete_thread

def delete_thread(self, thread_id: str) -> None
Delete all checkpoints and writes associated with a specific thread ID.
Parameters:
  • thread_id (str): The thread ID whose checkpoints should be deleted
Source: langgraph.checkpoint.base.__init__:266

delete_for_runs

def delete_for_runs(self, run_ids: Sequence[str]) -> None
Delete all checkpoints and writes associated with the given run IDs.
Parameters:
  • run_ids (Sequence[str]): The run IDs whose checkpoints should be deleted
Source: langgraph.checkpoint.base.__init__:277

copy_thread

def copy_thread(
    self,
    source_thread_id: str,
    target_thread_id: str,
) -> None
Copy all checkpoints and writes from one thread to another.
Parameters:
  • source_thread_id (str): The thread ID to copy from
  • target_thread_id (str): The thread ID to copy to
Source: langgraph.checkpoint.base.__init__:288

prune

def prune(
    self,
    thread_ids: Sequence[str],
    *,
    strategy: str = "keep_latest",
) -> None
Prune checkpoints for the given threads.
Parameters:
  • thread_ids (Sequence[str]): The thread IDs to prune
  • strategy (str): The pruning strategy. Options:
    • "keep_latest": Retains only the most recent checkpoint per namespace
    • "delete": Removes all checkpoints
Source: langgraph.checkpoint.base.__init__:301
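
The two strategies can be illustrated on a toy store. This is a sketch of the behavior described above, not the library's code: prune_store and the store layout are hypothetical, and it assumes checkpoint IDs sort in creation order, in line with the monotonically increasing IDs described under Data Types.

```python
from __future__ import annotations


def prune_store(
    store: dict[str, list[tuple[str, str]]],
    thread_ids: list[str],
    *,
    strategy: str = "keep_latest",
) -> None:
    """Prune a toy store shaped as thread_id -> [(namespace, checkpoint_id)]."""
    for thread_id in thread_ids:
        if thread_id not in store:
            continue  # nothing stored for this thread
        if strategy == "delete":
            del store[thread_id]
        elif strategy == "keep_latest":
            latest: dict[str, str] = {}
            for namespace, checkpoint_id in store[thread_id]:
                # Keep the highest (most recent) ID seen for each namespace.
                if namespace not in latest or checkpoint_id > latest[namespace]:
                    latest[namespace] = checkpoint_id
            store[thread_id] = sorted(latest.items())
        else:
            raise ValueError(f"unknown strategy: {strategy!r}")


store = {
    "t1": [("", "0001"), ("", "0002"), ("child", "0001"), ("child", "0003")],
    "t2": [("", "0001")],
}
prune_store(store, ["t1"])                     # keep one checkpoint per namespace
prune_store(store, ["t2"], strategy="delete")  # drop the thread entirely
```

After both calls, t1 retains one checkpoint for the root namespace and one for child, and t2 is gone.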

Asynchronous Methods

Every synchronous method has an async equivalent with the same name prefixed by the letter a:
  • aget(config) - Async version of get()
  • aget_tuple(config) - Async version of get_tuple()
  • alist(config, *, filter, before, limit) - Async version of list()
  • aput(config, checkpoint, metadata, new_versions) - Async version of put()
  • aput_writes(config, writes, task_id, task_path) - Async version of put_writes()
  • adelete_thread(thread_id) - Async version of delete_thread()
  • adelete_for_runs(run_ids) - Async version of delete_for_runs()
  • acopy_thread(source_thread_id, target_thread_id) - Async version of copy_thread()
  • aprune(thread_ids, *, strategy) - Async version of prune()
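
One way a custom saver might offer the a-prefixed variants is to delegate to its blocking methods on a worker thread. The sketch below uses a hypothetical ToySaver with simplified signatures and makes no claim about how BaseCheckpointSaver implements its own async methods; a natively async storage driver is usually preferable when one is available.

```python
import asyncio


class ToySaver:
    """Hypothetical stand-in with simplified, blocking get/put methods."""

    def __init__(self) -> None:
        self._data: dict = {}

    def put(self, thread_id: str, value: str) -> None:
        self._data[thread_id] = value

    def get(self, thread_id: str):
        return self._data.get(thread_id)

    async def aput(self, thread_id: str, value: str) -> None:
        # Run the blocking call on a worker thread so the event loop stays free.
        await asyncio.to_thread(self.put, thread_id, value)

    async def aget(self, thread_id: str):
        return await asyncio.to_thread(self.get, thread_id)


async def main():
    saver = ToySaver()
    await saver.aput("t1", "hello")
    return await saver.aget("t1")


result = asyncio.run(main())
```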

Utility Methods

get_next_version

def get_next_version(self, current: V | None, channel: None) -> V
Generate the next version ID for a channel. The default implementation uses integer versions and increments by 1.
Parameters:
  • current (V | None): The current version identifier (int, float, or str)
  • channel (None): Deprecated argument, kept for backwards compatibility
Returns:
  • V: The next version identifier, which must be monotonically increasing
Source: langgraph.checkpoint.base.__init__:460
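
The default behavior described above can be sketched in a few lines. The function name next_int_version is hypothetical, and starting at 1 when no version exists yet is an assumption rather than something stated here.

```python
from __future__ import annotations


def next_int_version(current: int | None) -> int:
    """Return the next integer version; assumes a new channel starts at 1."""
    return 1 if current is None else current + 1


# Successive calls produce a strictly increasing sequence.
versions = []
version = None
for _ in range(3):
    version = next_int_version(version)
    versions.append(version)
```

A subclass that overrides get_next_version with another scheme, such as string versions, needs to preserve the same property: each returned value must compare greater than the one before it.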

Data Types

Checkpoint

class Checkpoint(TypedDict):
    """State snapshot at a given point in time."""
    
    v: int  # Checkpoint format version (currently 1)
    id: str  # Unique, monotonically increasing ID
    ts: str  # Timestamp in ISO 8601 format
    channel_values: dict[str, Any]  # Channel snapshots
    channel_versions: ChannelVersions  # Channel version strings
    versions_seen: dict[str, ChannelVersions]  # Node-channel version tracking
    updated_channels: list[str] | None  # Channels updated in this checkpoint
Source: langgraph.checkpoint.base.__init__:65

CheckpointMetadata

class CheckpointMetadata(TypedDict, total=False):
    """Metadata associated with a checkpoint."""
    
    source: Literal["input", "loop", "update", "fork"]  # Checkpoint source
    step: int  # Step number (-1 for first input, 0+ for loop steps)
    parents: dict[str, str]  # Parent checkpoint IDs by namespace
    run_id: str  # ID of the run that created this checkpoint
Source: langgraph.checkpoint.base.__init__:35

CheckpointTuple

class CheckpointTuple(NamedTuple):
    """A tuple containing a checkpoint and its associated data."""
    
    config: RunnableConfig
    checkpoint: Checkpoint
    metadata: CheckpointMetadata
    parent_config: RunnableConfig | None = None
    pending_writes: list[PendingWrite] | None = None
Source: langgraph.checkpoint.base.__init__:112

Implementation Notes

When creating a custom checkpoint saver:
  1. Inherit from BaseCheckpointSaver: Extend this base class
  2. Implement required methods: At minimum, implement:
    • get_tuple()
    • list()
    • put()
    • put_writes()
  3. Consider async support: Implement the async versions so that graphs run asynchronously do not block on storage calls
  4. Handle serialization: Use the provided serde for encoding/decoding
  5. Thread safety: Make the implementation thread-safe if one saver instance may be shared across threads
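
The four required methods can be sketched with a standard-library toy. To stay self-contained, ToyMemorySaver below does not subclass BaseCheckpointSaver, ToyTuple is a simplified stand-in for CheckpointTuple, pickle stands in for the provided serde, and the checkpoint_id config key is an assumption. The sketch is not thread-safe.

```python
from __future__ import annotations

import pickle
from collections import defaultdict
from typing import Iterator, NamedTuple


class ToyTuple(NamedTuple):
    """Simplified stand-in for CheckpointTuple."""
    config: dict
    checkpoint: dict
    metadata: dict


def _config(thread_id: str, checkpoint_id: str) -> dict:
    return {"configurable": {"thread_id": thread_id, "checkpoint_id": checkpoint_id}}


class ToyMemorySaver:
    """In-memory sketch; a real saver would subclass BaseCheckpointSaver."""

    def __init__(self) -> None:
        self._checkpoints = defaultdict(dict)  # thread_id -> {checkpoint_id: bytes}
        self._writes = {}  # (thread_id, checkpoint_id, task_id) -> [(channel, value)]

    def put(self, config, checkpoint, metadata, new_versions) -> dict:
        thread_id = config["configurable"]["thread_id"]
        # pickle stands in for the serde that the real base class provides.
        blob = pickle.dumps((checkpoint, metadata))
        self._checkpoints[thread_id][checkpoint["id"]] = blob
        return _config(thread_id, checkpoint["id"])

    def put_writes(self, config, writes, task_id, task_path="") -> None:
        conf = config["configurable"]
        key = (conf["thread_id"], conf["checkpoint_id"], task_id)
        self._writes.setdefault(key, []).extend(writes)

    def get_tuple(self, config) -> ToyTuple | None:
        conf = config["configurable"]
        stored = self._checkpoints.get(conf["thread_id"], {})
        if not stored:
            return None
        # Without an explicit checkpoint_id, fall back to the newest checkpoint.
        checkpoint_id = conf.get("checkpoint_id") or max(stored)
        if checkpoint_id not in stored:
            return None
        checkpoint, metadata = pickle.loads(stored[checkpoint_id])
        return ToyTuple(_config(conf["thread_id"], checkpoint_id), checkpoint, metadata)

    def list(self, config, *, filter=None, before=None, limit=None) -> Iterator[ToyTuple]:
        thread_ids = [config["configurable"]["thread_id"]] if config else sorted(self._checkpoints)
        before_id = before["configurable"]["checkpoint_id"] if before else None
        count = 0
        for thread_id in thread_ids:
            for checkpoint_id in sorted(self._checkpoints.get(thread_id, {}), reverse=True):
                if limit is not None and count >= limit:
                    return
                if before_id is not None and checkpoint_id >= before_id:
                    continue
                item = self.get_tuple(_config(thread_id, checkpoint_id))
                if filter and any(item.metadata.get(k) != v for k, v in filter.items()):
                    continue
                count += 1
                yield item


saver = ToyMemorySaver()
config = {"configurable": {"thread_id": "t1"}}
for step, checkpoint_id in enumerate(["0001", "0002"]):
    checkpoint = {"id": checkpoint_id, "channel_values": {"n": step}}
    saved = saver.put(config, checkpoint, {"step": step}, {})
saver.put_writes(saved, [("n", 99)], task_id="task-a")

latest = saver.get_tuple(config)
history = [t.checkpoint["id"] for t in saver.list(config)]
```

The design keeps every checkpoint keyed by thread_id and checkpoint ID, so get_tuple can serve both "latest for this thread" and "this exact checkpoint" lookups from the same structure.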
