PostgresSaver is a checkpoint saver that stores LangGraph checkpoints in a PostgreSQL database. It provides a robust, production-ready persistence solution for LangGraph agents, with support for connection pooling, pipeline mode, and JSONB storage.
Overview
PostgresSaver is designed for:
- Production workloads
- High concurrency applications
- Multi-threaded environments
- Distributed systems
- Applications requiring advanced querying capabilities
Class Definition
from langgraph.checkpoint.postgres import PostgresSaver
class PostgresSaver(BasePostgresSaver):
"""Checkpointer that stores checkpoints in a Postgres database."""
Source: langgraph.checkpoint.postgres.__init__:32
Installation
Install the PostgreSQL checkpoint package:
pip install langgraph-checkpoint-postgres
This package requires psycopg (version 3+) and psycopg-pool for connection pooling.
Constructor
def __init__(
    self,
    conn: Connection | ConnectionPool,
    pipe: Pipeline | None = None,
    serde: SerializerProtocol | None = None,
) -> None
Parameters
conn (Connection | ConnectionPool): PostgreSQL connection or connection pool
pipe (Pipeline | None): Optional psycopg Pipeline for batching operations
serde (SerializerProtocol | None): Serializer for encoding/decoding checkpoints. Defaults to JsonPlusSerializer
Note: a Pipeline should only be used with a single Connection, not a ConnectionPool.
Source: langgraph.checkpoint.postgres.__init__:37
Usage
Basic Setup
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import StateGraph
DB_URI = "postgres://user:password@localhost:5432/mydatabase"
# Create a graph
builder = StateGraph(int)
builder.add_node("add_one", lambda x: x + 1)
builder.set_entry_point("add_one")
builder.set_finish_point("add_one")
# Use PostgresSaver
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    # Set up the database (must be called once)
    checkpointer.setup()

    # Compile graph with checkpointer
    graph = builder.compile(checkpointer=checkpointer)

    # Use the graph
    config = {"configurable": {"thread_id": "user-123"}}
    result = graph.invoke(3, config)
    print(result)  # Output: 4
Using with Pipeline
Pipeline mode enables batching of database operations for better performance:
with PostgresSaver.from_conn_string(DB_URI, pipeline=True) as checkpointer:
    checkpointer.setup()
    graph = builder.compile(checkpointer=checkpointer)
    config = {"configurable": {"thread_id": "user-123"}}
    result = graph.invoke(3, config)
Source: langgraph.checkpoint.postgres.__init__:54
Using Connection Pool
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver
with ConnectionPool(DB_URI) as pool:
    checkpointer = PostgresSaver(pool)
    checkpointer.setup()
    graph = builder.compile(checkpointer=checkpointer)
    # Use graph...
Class Methods
from_conn_string
@classmethod
@contextmanager
def from_conn_string(
    cls,
    conn_string: str,
    *,
    pipeline: bool = False,
) -> Iterator[PostgresSaver]
Create a new PostgresSaver instance from a connection string.
Parameters:
conn_string (str): PostgreSQL connection string (e.g., postgres://user:pass@host:port/db)
pipeline (bool): Whether to use Pipeline for batching operations (default: False)
Returns:
Iterator[PostgresSaver]: A context manager yielding a PostgresSaver instance
Example:
DB_URI = "postgres://postgres:postgres@localhost:5432/checkpoints?sslmode=disable"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()
    # Use checkpointer...
Source: langgraph.checkpoint.postgres.__init__:54
Instance Methods
setup
Set up the checkpoint database. Creates necessary tables and runs migrations.
Important: This method MUST be called directly by the user the first time the checkpointer is used.
Example:
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # Required on first use
    graph = builder.compile(checkpointer=checkpointer)
Source: langgraph.checkpoint.postgres.__init__:77
get_tuple
def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None
Get a checkpoint tuple from the database.
Parameters:
config (RunnableConfig): Configuration containing thread_id and optionally checkpoint_id
Returns:
CheckpointTuple | None: The checkpoint tuple, or None if not found
Example:
# Get latest checkpoint
config = {"configurable": {"thread_id": "user-123"}}
checkpoint_tuple = checkpointer.get_tuple(config)
# Get specific checkpoint by ID
config = {
    "configurable": {
        "thread_id": "user-123",
        "checkpoint_ns": "",
        "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875",
    }
}
checkpoint_tuple = checkpointer.get_tuple(config)
Source: langgraph.checkpoint.postgres.__init__:184
list
def list(
    self,
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None,
) -> Iterator[CheckpointTuple]
List checkpoints from the database.
Parameters:
config (RunnableConfig | None): Base configuration for filtering
filter (dict[str, Any] | None): Additional metadata filtering criteria
before (RunnableConfig | None): Only return checkpoints before this checkpoint ID
limit (int | None): Maximum number of checkpoints to return
Returns:
Iterator[CheckpointTuple]: Iterator of checkpoint tuples, ordered by checkpoint ID (newest first)
Example:
# List all checkpoints for a thread
config = {"configurable": {"thread_id": "user-123"}}
checkpoints = list(checkpointer.list(config, limit=10))
# List with metadata filter
filter_criteria = {"source": "input"}
checkpoints = list(checkpointer.list(config, filter=filter_criteria))
# List checkpoints before a specific checkpoint
before_config = {
    "configurable": {
        "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875"
    }
}
checkpoints = list(checkpointer.list(config, before=before_config, limit=5))
Source: langgraph.checkpoint.postgres.__init__:104
put
def put(
    self,
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig
Save a checkpoint to the database.
Parameters:
config (RunnableConfig): Configuration for the checkpoint
checkpoint (Checkpoint): The checkpoint to save
metadata (CheckpointMetadata): Additional metadata
new_versions (ChannelVersions): New channel versions
Returns:
RunnableConfig: Updated configuration with the new checkpoint ID
Example:
config = {"configurable": {"thread_id": "user-123", "checkpoint_ns": ""}}
checkpoint = {
    "v": 1,
    "ts": "2024-05-04T06:32:42.235444+00:00",
    "id": "1ef4f797-8335-6428-8001-8a1503f9b875",
    "channel_values": {"messages": [], "state": "active"},
    "channel_versions": {},
    "versions_seen": {},
}
metadata = {"source": "input", "step": 1, "run_id": "abc123"}
saved_config = checkpointer.put(config, checkpoint, metadata, {})
Source: langgraph.checkpoint.postgres.__init__:255
put_writes
def put_writes(
    self,
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None
Store intermediate writes linked to a checkpoint.
Parameters:
config (RunnableConfig): Configuration of the related checkpoint
writes (Sequence[tuple[str, Any]]): List of (channel, value) pairs to store
task_id (str): Identifier for the task creating the writes
task_path (str): Path of the task (default: "")
Source: langgraph.checkpoint.postgres.__init__:336
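For illustration, the arguments might be assembled as in the following sketch. The thread ID, checkpoint ID, channel name, and task ID are made-up values, not output from a real run; the config must identify an existing checkpoint:

```python
# Hypothetical values: thread, checkpoint ID, channel, and task ID
# are illustrative only.
config = {
    "configurable": {
        "thread_id": "user-123",
        "checkpoint_ns": "",
        "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875",
    }
}

# Each entry is a (channel, value) pair produced by a task.
writes = [("messages", {"role": "ai", "content": "hello"})]

# With a live checkpointer, the writes would be stored with:
# checkpointer.put_writes(config, writes, task_id="task-1")
```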
delete_thread
def delete_thread(self, thread_id: str) -> None
Delete all checkpoints and writes associated with a thread ID.
Parameters:
thread_id (str): The thread ID to delete
Example:
checkpointer.delete_thread("user-123")
Source: langgraph.checkpoint.postgres.__init__:370
Database Schema
Calling setup() creates three main tables (plus a checkpoint_migrations table, described under Migrations below):
checkpoints table
CREATE TABLE checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    parent_checkpoint_id TEXT,
    type TEXT,
    checkpoint JSONB NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);
checkpoint_blobs table
CREATE TABLE checkpoint_blobs (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    channel TEXT NOT NULL,
    version TEXT NOT NULL,
    type TEXT NOT NULL,
    blob BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
);
checkpoint_writes table
CREATE TABLE checkpoint_writes (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    idx INTEGER NOT NULL,
    channel TEXT NOT NULL,
    type TEXT,
    blob BYTEA NOT NULL,
    task_path TEXT NOT NULL DEFAULT '',
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);
Indices are automatically created on thread_id columns for performance.
AsyncPostgresSaver
For async applications, use AsyncPostgresSaver:
import asyncio
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.graph import StateGraph
DB_URI = "postgres://user:password@localhost:5432/mydatabase"
async def main():
    builder = StateGraph(int)
    builder.add_node("add_one", lambda x: x + 1)
    builder.set_entry_point("add_one")
    builder.set_finish_point("add_one")

    async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
        await checkpointer.setup()
        graph = builder.compile(checkpointer=checkpointer)
        config = {"configurable": {"thread_id": "user-123"}}
        result = await graph.ainvoke(3, config)
        print(result)  # Output: 4

asyncio.run(main())
Source: langgraph.checkpoint.postgres.aio:32
Advanced Features
Connection Pooling
Use ConnectionPool for better resource management:
from psycopg_pool import ConnectionPool
pool = ConnectionPool(
    DB_URI,
    min_size=1,
    max_size=10,
    timeout=30,
)
checkpointer = PostgresSaver(pool)
checkpointer.setup()
Pipeline Mode
Pipeline mode batches database operations for improved performance:
with PostgresSaver.from_conn_string(DB_URI, pipeline=True) as checkpointer:
    checkpointer.setup()
    # Operations are automatically batched
JSONB Storage
PostgresSaver stores checkpoints as JSONB, enabling:
- Efficient querying of checkpoint data
- Native JSON operators in SQL queries
- Indexing on specific JSON fields
- Smaller storage footprint for structured data
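For example, because metadata is stored as JSONB, checkpoints can be filtered directly in SQL with native JSON operators. The query below is an illustrative sketch against the checkpoints table shown above; the "source" value matches the metadata example used earlier:

```sql
-- Find checkpoints whose metadata "source" field is 'input'
SELECT thread_id, checkpoint_id, metadata
FROM checkpoints
WHERE metadata->>'source' = 'input';
```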
Blob Storage
Large channel values are stored separately in checkpoint_blobs table for:
- Optimized storage of binary data
- Reduced checkpoint table size
- Better query performance
Best Practices
- Use connection pooling for multi-threaded applications
- Enable pipeline mode when available to batch operations
- Set pool sizes appropriate to your concurrency requirements
- Add indices on frequently queried metadata fields
- Run VACUUM regularly to maintain performance
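As a sketch of the last two points, a GIN index on the metadata column can speed up JSONB filtering; the index name here is illustrative:

```sql
-- Index metadata for fast JSONB key/containment queries;
-- CONCURRENTLY avoids locking the table during creation.
CREATE INDEX CONCURRENTLY idx_checkpoints_metadata
    ON checkpoints USING GIN (metadata);

-- Reclaim dead tuples and refresh planner statistics.
VACUUM ANALYZE checkpoints;
```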
ShallowPostgresSaver
For specialized use cases requiring minimal checkpoint storage:
from langgraph.checkpoint.postgres.shallow import ShallowPostgresSaver
with ShallowPostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()
    # Uses optimized storage strategy
ShallowPostgresSaver stores only the most recent checkpoint for each thread rather than the full history, reducing storage requirements at the cost of losing time-travel over earlier checkpoints.
Migrations
The checkpointer automatically runs database migrations on setup(). The migration system:
- Tracks applied migrations in the checkpoint_migrations table
- Applies new migrations incrementally
- Uses CREATE INDEX CONCURRENTLY to avoid locking during index creation
- Supports version-based migration ordering
See Also