Persistence allows LangGraph applications to save state and resume from any point, enabling durable execution and human-in-the-loop workflows.
## Checkpointers

Checkpointers save the graph state at each step, creating a complete execution history.
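Conceptually, a checkpointer is a keyed store of state snapshots per thread: every step appends a snapshot, and history is read back newest-first. This toy sketch is purely illustrative (it is not the real LangGraph API):

```python
from collections import defaultdict


class ToyCheckpointer:
    """Illustrative only: one snapshot per step, keyed by thread ID."""

    def __init__(self):
        self._checkpoints = defaultdict(list)

    def put(self, thread_id, state):
        # Store a copy so later mutations don't rewrite history
        self._checkpoints[thread_id].append(dict(state))

    def history(self, thread_id):
        # Newest first, matching how get_state_history iterates
        return list(reversed(self._checkpoints[thread_id]))


saver = ToyCheckpointer()
saver.put("thread-1", {"step": 0, "messages": ["Hello"]})
saver.put("thread-1", {"step": 1, "messages": ["Hello", "Hi there"]})

latest = saver.history("thread-1")[0]
print(latest["step"])  # → 1, the most recent snapshot
```

Real checkpointers add serialization, metadata, and durable storage on top of this basic shape.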
### InMemorySaver

For development and testing:

```python
from langgraph.checkpoint.memory import InMemorySaver

# Create the checkpointer
memory = InMemorySaver()

# Compile a previously built StateGraph with the checkpointer
app = graph.compile(checkpointer=memory)

# Use a thread_id for persistence
config = {"configurable": {"thread_id": "thread-1"}}
result = app.invoke({"text": "hello"}, config)
```

InMemorySaver keeps checkpoints in process memory, so they are lost on restart. Use it only for debugging and testing; use a persistent checkpointer in production.
### PostgresSaver

For production use with PostgreSQL:

```python
from langgraph.checkpoint.postgres import PostgresSaver

# Connection string
DB_URI = "postgresql://user:password@localhost:5432/langgraph"

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    # Create the database tables (run once)
    checkpointer.setup()

    # Compile the graph
    app = graph.compile(checkpointer=checkpointer)

    # Use a thread_id
    config = {"configurable": {"thread_id": "user-123"}}
    result = app.invoke({"question": "What is LangGraph?"}, config)
```
### SqliteSaver

For local, file-based persistence:

```python
from langgraph.checkpoint.sqlite import SqliteSaver

with SqliteSaver.from_conn_string("./checkpoints.db") as checkpointer:
    checkpointer.setup()
    app = graph.compile(checkpointer=checkpointer)
```
## Thread Management

Each conversation or workflow instance uses a unique thread ID:

```python
import uuid

# Generate a unique thread ID
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
app.invoke(input_data, config)
```

Resume from the last checkpoint by reusing the same thread ID:

```python
# First invocation
config = {"configurable": {"thread_id": "thread-1"}}
app.invoke({"messages": [{"role": "user", "content": "Hello"}]}, config)

# Resume later
app.invoke({"messages": [{"role": "user", "content": "Continue"}]}, config)

# Get all checkpoints for the thread
for state in app.get_state_history(config):
    print(f"Checkpoint: {state.metadata}")
    print(f"State: {state.values}")
```
## State Inspection

### Get Current State

```python
# Get the current state snapshot
state = app.get_state(config)
print(f"Values: {state.values}")
print(f"Next steps: {state.next}")
print(f"Metadata: {state.metadata}")
```
### Access State History

```python
# Iterate through all checkpoints, newest first
for state in app.get_state_history(config):
    print(f"Step: {state.metadata.get('step')}")
    print(f"Source: {state.metadata.get('source')}")
    print(f"State: {state.values}")
    print("---")
```
### Filter History

```python
# Get history with filters
history = app.get_state_history(
    config,
    filter={"source": "loop"},  # Only checkpoints written by the run loop
    limit=10,  # At most 10 checkpoints
)
for state in history:
    print(state.values)
```

The `source` metadata field records how a checkpoint was created (for example `"input"`, `"loop"`, or `"update"`), not the node name.
## Time Travel

Rewind and replay from any checkpoint:

```python
# Get the state history (newest first)
history = list(app.get_state_history(config))

# Pick a previous checkpoint
previous_state = history[2]  # 3rd most recent

# Build a config pointing at that checkpoint
past_config = {
    "configurable": {
        "thread_id": "thread-1",
        "checkpoint_id": previous_state.config["configurable"]["checkpoint_id"],
    }
}

# Replay from that point. Pass None to resume with the checkpoint's
# state as-is; passing new input forks the thread from that checkpoint.
app.invoke(None, past_config)
```
## State Updates

Modify state before resuming:

```python
# Inspect the current state
state = app.get_state(config)

# Apply an update to the state
app.update_state(
    config,
    {"corrected_value": "new value"},
    as_node="agent",  # Apply the update as if it came from this node
)

# Continue execution with the updated state
app.invoke(None, config)
```
## Checkpoint Configuration

### Setup Database Schema

```python
from langgraph.checkpoint.postgres import PostgresSaver

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    # Creates the necessary tables
    checkpointer.setup()
```

Checkpointers handle schema migrations automatically:

```python
# setup() runs migrations to the latest schema version
checkpointer.setup()
```
### Connection Pooling

For production, use connection pooling:

```python
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver

# Create a connection pool
pool = ConnectionPool(
    conninfo=DB_URI,
    min_size=1,
    max_size=10,
)

# Use the pool with the checkpointer
checkpointer = PostgresSaver(pool)
checkpointer.setup()

app = graph.compile(checkpointer=checkpointer)
```
## Durability Modes

Control when checkpoints are persisted by passing `durability` to `invoke` or `stream`:

```python
# "sync": persist the checkpoint before the next step starts
app.invoke(input_data, config, durability="sync")

# "async": persist the checkpoint in the background (default)
app.invoke(input_data, config, durability="async")

# "exit": only persist when the graph exits
app.invoke(input_data, config, durability="exit")
```

- `"sync"`: safest, slowest; the checkpoint is written before the next step runs
- `"async"`: faster; the checkpoint is written in the background while the next step runs
- `"exit"`: fastest; a checkpoint is written only when the graph finishes
## Serialization

Customize how state is serialized:

```python
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
from langgraph.checkpoint.memory import InMemorySaver

# Use the JSON-plus serializer
serde = JsonPlusSerializer()
memory = InMemorySaver(serde=serde)

app = graph.compile(checkpointer=memory)
```
## Best Practices

- Use thread IDs consistently: map thread IDs to user sessions or conversation IDs
- Handle checkpoint errors: wrap checkpoint operations in try/except blocks
- Clean up old checkpoints: implement cleanup for old or completed threads
- Test with real checkpointers: don't rely on InMemorySaver for production testing
- Monitor checkpoint size: large states may need optimization
- Use connection pooling: for production PostgreSQL deployments
- Index your queries: add database indexes on thread_id for performance
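The first bullet can be as simple as deriving a deterministic thread ID from your own session identifier, so the same session always resumes the same thread. A small sketch using stdlib UUIDv5; the `session_thread_id` helper name is hypothetical:

```python
import uuid


def session_thread_id(session_id: str) -> str:
    # Hypothetical helper: derive a stable thread ID from a session ID.
    # UUIDv5 is deterministic, so repeated calls for the same session
    # always map to the same thread.
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"session:{session_id}"))


config = {"configurable": {"thread_id": session_thread_id("user-123-session-9")}}

# The same session always yields the same thread_id:
assert session_thread_id("user-123-session-9") == session_thread_id("user-123-session-9")
```

A deterministic mapping like this avoids having to store a separate session-to-thread lookup table.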
## Cleanup

Remove old checkpoints:

```python
import datetime

# Delete checkpoints older than 30 days
cutoff = datetime.datetime.now() - datetime.timedelta(days=30)

# The implementation depends on the checkpointer.
# For PostgreSQL, something like:
# DELETE FROM checkpoints WHERE created_at < cutoff
```
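As a concrete, self-contained illustration of age-based cleanup, here is a SQLite sketch; the `checkpoints(thread_id, created_at)` schema is invented for the example and does not match the real checkpointer tables:

```python
import datetime
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checkpoints (thread_id TEXT, created_at TEXT)")

# Timestamps stored as ISO-8601 strings, which compare chronologically
now = datetime.datetime.now()
conn.execute(
    "INSERT INTO checkpoints VALUES (?, ?)",
    ("old-thread", (now - datetime.timedelta(days=45)).isoformat()),
)
conn.execute(
    "INSERT INTO checkpoints VALUES (?, ?)",
    ("fresh-thread", (now - datetime.timedelta(days=1)).isoformat()),
)

# Delete everything older than the 30-day cutoff
cutoff = (now - datetime.timedelta(days=30)).isoformat()
conn.execute("DELETE FROM checkpoints WHERE created_at < ?", (cutoff,))
conn.commit()

remaining = [row[0] for row in conn.execute("SELECT thread_id FROM checkpoints")]
print(remaining)  # → ['fresh-thread']
```

The same pattern (a retention cutoff plus a parameterized `DELETE`) translates directly to PostgreSQL.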
## Next Steps

- Implement Memory for long-term storage across threads
- Add Interrupts for human-in-the-loop workflows
- Learn about Deployment for production systems