The AI agent uses LangGraph’s PostgreSQL checkpointing to maintain conversation state across sessions. This enables multi-turn conversations with full context preservation.
## Overview

Checkpointing is implemented using `langgraph.checkpoint.postgres.PostgresSaver` (see `src/copilot/graph.py:417-429`):
```python
def create_agent_graph():
    """Create and compile the agent graph with PostgreSQL checkpointing.

    Returns:
        Compiled LangGraph workflow ready for invocation.

    Note:
        The connection to PostgreSQL is managed internally. For production,
        consider using a connection pool for better resource management.
    """
    conn = Connection.connect(config.VECTOR_DATABASE_URL, **_connection_kwargs)
    checkpointer = PostgresSaver(conn)
    checkpointer.setup()
    # ... workflow definition ...
    return workflow.compile(checkpointer=checkpointer)
```
## Environment Variables

Configure the PostgreSQL connection for checkpointing:

```bash
# Required: PostgreSQL database URL
VECTOR_DATABASE_URL=postgresql://user:password@localhost:5432/dbname

# Alternative variable name (for compatibility)
DATABASE_URL=postgresql://user:password@localhost:5432/dbname
```

The URL follows the standard PostgreSQL connection string format:

```
postgresql://[user[:password]@][host][:port][/database][?param1=value1&...]
```
Examples:

```bash
# Local development
VECTOR_DATABASE_URL=postgresql://postgres:password@localhost:5432/copilot_dev

# Production with SSL
VECTOR_DATABASE_URL=postgresql://user:password@db.example.com:5432/copilot?sslmode=require

# Cloud provider (e.g., Neon, Supabase)
VECTOR_DATABASE_URL=postgresql://user:password@your-project.provider.com:5432/main?sslmode=require
```
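Before wiring up the checkpointer, it can be useful to fail fast on a malformed URL. A minimal sketch using only the standard library (`validate_database_url` is an illustrative helper, not part of the codebase):

```python
from urllib.parse import urlparse


def validate_database_url(url: str) -> str:
    """Fail fast if the PostgreSQL URL is missing or malformed."""
    if not url:
        raise ValueError(
            "Required environment variable VECTOR_DATABASE_URL is not set"
        )
    parsed = urlparse(url)
    if parsed.scheme not in ("postgresql", "postgres"):
        raise ValueError(f"Expected a postgresql:// URL, got {parsed.scheme!r}")
    if not parsed.hostname:
        raise ValueError("Database URL is missing a host")
    return url
```

Calling this at startup surfaces configuration errors before the first checkpoint write, with the same message the agent raises when the variable is unset.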
## Connection Configuration

The checkpointer uses specific connection settings (defined in `src/copilot/graph.py:32-35`):

```python
_connection_kwargs = {
    "prepare_threshold": 0,
    "autocommit": True,
}
```
### Connection Parameters

| Parameter | Value | Purpose |
| --- | --- | --- |
| `prepare_threshold` | `0` | Disables prepared statements, avoiding issues with connection poolers like PgBouncer. |
| `autocommit` | `True` | Enables autocommit mode for immediate persistence of checkpoint data. |
## Database Schema

The `PostgresSaver` automatically creates the required tables when `checkpointer.setup()` is called:
```sql
-- Checkpoints table (created automatically by LangGraph)
CREATE TABLE IF NOT EXISTS checkpoints (
    thread_id     TEXT NOT NULL,
    checkpoint_id TEXT NOT NULL,
    parent_id     TEXT,
    checkpoint    JSONB NOT NULL,
    metadata      JSONB,
    created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (thread_id, checkpoint_id)
);

-- Writes table for pending operations
CREATE TABLE IF NOT EXISTS checkpoint_writes (
    thread_id     TEXT NOT NULL,
    checkpoint_id TEXT NOT NULL,
    task_id       TEXT NOT NULL,
    idx           INTEGER NOT NULL,
    channel       TEXT NOT NULL,
    value         JSONB,
    created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
**Note:** These tables are managed by LangGraph, and the exact schema can vary between library versions; the definitions above are representative. You don't need to create them manually.
## Session Management

Conversations are organized by `thread_id`. Each thread maintains its own checkpoint history:
```python
from src.copilot.graph import create_agent_graph

app = create_agent_graph()

# Start a new conversation
result = app.invoke(
    input={"messages": [("user", "What is incident INC-2025-08-24-001?")]},
    config={"configurable": {"thread_id": "user-123-session-1"}},
)

# Continue the conversation (context is preserved)
result = app.invoke(
    input={"messages": [("user", "What was the resolution?")]},
    config={"configurable": {"thread_id": "user-123-session-1"}},
)
```
### Thread ID Conventions

Recommended patterns for `thread_id`:

```python
import uuid
from datetime import datetime

# User-based sessions
thread_id = f"user-{user_id}-session-{session_id}"

# Timestamp-based
thread_id = f"{user_id}-{datetime.now().isoformat()}"

# UUID-based (for anonymous sessions)
thread_id = str(uuid.uuid4())
```
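These patterns can be combined in a small helper that falls back to a UUID when no identifiers are available (a sketch; `make_thread_id` is a hypothetical name, not from the codebase):

```python
import uuid
from typing import Optional


def make_thread_id(
    user_id: Optional[str] = None, session_id: Optional[str] = None
) -> str:
    """Build a thread_id, preferring user/session identifiers for debuggability."""
    if user_id and session_id:
        return f"user-{user_id}-session-{session_id}"
    if user_id:
        # No session id: use a short random suffix to keep sessions distinct
        return f"user-{user_id}-session-{uuid.uuid4().hex[:8]}"
    # Anonymous session
    return str(uuid.uuid4())
```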
## Agent State Schema

The checkpointed state includes (defined in `src/copilot/graph.py:74-82`):

```python
class AgentState(TypedDict):
    """State schema for the agent graph."""

    messages: Annotated[Sequence[BaseMessage], add_messages]
    title: Optional[str]
    session_id: Optional[str]
    user_id: Optional[str]
    langfuse_enabled: Optional[bool]
    generate_title: Optional[bool]
```
### State Fields

| Field | Description |
| --- | --- |
| `messages` | Conversation history, merged via the `add_messages` reducer (appends new messages and updates existing ones by ID). |
| `title` | Generated title for the conversation. |
| `session_id` | Session identifier for tracking and analytics. |
| `user_id` | User identifier for tracking and analytics. |
| `langfuse_enabled` | Enables Langfuse observability callbacks. |
| `generate_title` | Whether to generate a conversation title automatically. |
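An initial state for a new thread can be as small as the fields you need; the reducer-managed `messages` list accepts `(role, content)` tuples. A sketch using a plain dict (field names match `AgentState` above; the values are illustrative):

```python
# Initial state for a fresh thread; omitted Optional fields default to None
initial_state = {
    "messages": [("user", "What is incident INC-2025-08-24-001?")],
    "title": None,
    "session_id": "user-123-session-1",
    "user_id": "user-123",
    "langfuse_enabled": False,
    "generate_title": True,
}
```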
## Checkpoint Lifecycle

### 1. Checkpoint Creation

A checkpoint is created after each node execution:

```
User Message → Support Bot → Tool Call → Tool Execution → Support Bot → Response
      ↓             ↓            ↓              ↓              ↓            ↓
 Checkpoint    Checkpoint   Checkpoint    Checkpoint     Checkpoint   Checkpoint
```
### 2. Checkpoint Retrieval

When continuing a conversation, LangGraph:

- Loads the latest checkpoint for the `thread_id`
- Restores the full agent state
- Continues execution from that point
### 3. Checkpoint History

You can access historical checkpoints. Each item yielded by `PostgresSaver.list()` is a `CheckpointTuple`; the checkpoint payload itself is a dict carrying the checkpoint ID, timestamp (`ts`), and channel values:

```python
from langgraph.checkpoint.postgres import PostgresSaver

checkpointer = PostgresSaver(conn)

# Get checkpoint history for a thread
history = checkpointer.list(
    config={"configurable": {"thread_id": "user-123-session-1"}}
)

for ckpt in history:
    print(f"Checkpoint: {ckpt.checkpoint['id']}")
    print(f"Created: {ckpt.checkpoint['ts']}")
    messages = ckpt.checkpoint["channel_values"].get("messages", [])
    print(f"Messages: {len(messages)}")
```
## Streaming with Checkpoints

Checkpoints work seamlessly with streaming:

```python
for mode, chunk in app.stream(
    input={"messages": [("user", "Tell me about recent incidents")]},
    config={"configurable": {"thread_id": "user-123-session-1"}},
    stream_mode=["messages", "custom"],
):
    if mode == "messages":
        token_chunk, metadata = chunk
        if token_chunk.content:
            print(token_chunk.content, end="", flush=True)
    elif mode == "custom":
        # Handle status updates
        print(f"Update: {chunk}")
```
**Checkpoint Timing:**

- Checkpoints are saved after each node completes
- Streaming tokens are not checkpointed (only final messages)
- If streaming is interrupted, the last complete checkpoint is preserved
## Production Considerations

### Connection Pooling

For production deployments, use a connection pool. `PostgresSaver` accepts a `ConnectionPool` directly, which keeps the checkpointer valid for the lifetime of the app (a single connection taken inside a `with pool.connection()` block would be returned to the pool as soon as the block exits, leaving the checkpointer with a closed connection):

```python
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver

import src.copilot.config as config

# Create connection pool
pool = ConnectionPool(
    conninfo=config.VECTOR_DATABASE_URL,
    min_size=5,
    max_size=20,
    kwargs={
        "prepare_threshold": 0,
        "autocommit": True,
    },
)

# The pool hands out connections as needed; no need to hold one open
checkpointer = PostgresSaver(pool)
checkpointer.setup()
app = workflow.compile(checkpointer=checkpointer)
```
### Database Maintenance

#### Cleanup Old Checkpoints

Implement a cleanup strategy to avoid unbounded growth:

```sql
-- Delete checkpoints older than 30 days
DELETE FROM checkpoints
WHERE created_at < NOW() - INTERVAL '30 days';

-- Delete checkpoint writes for deleted checkpoints
DELETE FROM checkpoint_writes
WHERE (thread_id, checkpoint_id) NOT IN (
    SELECT thread_id, checkpoint_id FROM checkpoints
);
```
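The same cleanup can be scheduled from Python. A sketch, assuming `conn` is any psycopg-style connection with an `execute` method and autocommit enabled (`run_checkpoint_cleanup` is a hypothetical helper):

```python
# Parameterized versions of the cleanup statements above
CLEANUP_STATEMENTS = [
    # Delete checkpoints older than the retention window
    """DELETE FROM checkpoints
       WHERE created_at < NOW() - make_interval(days => %(days)s)""",
    # Delete checkpoint writes left orphaned by the first statement
    """DELETE FROM checkpoint_writes
       WHERE (thread_id, checkpoint_id) NOT IN (
           SELECT thread_id, checkpoint_id FROM checkpoints
       )""",
]


def run_checkpoint_cleanup(conn, retention_days: int = 30) -> None:
    """Run both cleanup statements; order matters (writes depend on checkpoints)."""
    for statement in CLEANUP_STATEMENTS:
        conn.execute(statement, {"days": retention_days})
```

Run it from a cron job or background task; with `autocommit=True`, each statement commits as it completes.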
#### Vacuum and Analyze

Regularly vacuum the checkpoint tables:

```sql
VACUUM ANALYZE checkpoints;
VACUUM ANALYZE checkpoint_writes;
```
#### Indexing

Create indexes for better query performance:

```sql
-- Index on thread_id for faster lookups
CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_id
    ON checkpoints(thread_id);

-- Index on created_at for cleanup queries
CREATE INDEX IF NOT EXISTS idx_checkpoints_created_at
    ON checkpoints(created_at);

-- Composite index for writes
CREATE INDEX IF NOT EXISTS idx_checkpoint_writes_composite
    ON checkpoint_writes(thread_id, checkpoint_id);
```

Note that the primary key on `(thread_id, checkpoint_id)` already serves `thread_id`-prefix lookups on `checkpoints`, so the first index is optional.
## Troubleshooting

### Connection Errors

**Issue:** `ValueError: Required environment variable VECTOR_DATABASE_URL is not set`

**Solution:** Set the environment variable in your `.env` file:

```bash
VECTOR_DATABASE_URL=postgresql://user:password@localhost:5432/dbname
```
### Permission Errors

**Issue:** `permission denied to create table`

**Solution:** Ensure the database user can create tables in the target schema (table creation requires `CREATE` on the schema, not just on the database):

```sql
GRANT CREATE ON SCHEMA public TO your_user;
```
### PgBouncer Compatibility

**Issue:** Errors with prepared statements when using PgBouncer

**Solution:** The connection is already configured with `prepare_threshold: 0` (see `src/copilot/graph.py:33`). Ensure PgBouncer is in session mode:

```ini
# pgbouncer.ini
pool_mode = session
```
### Checkpoint Size

**Issue:** Checkpoints growing too large with long conversations

**Solution:** Implement message pruning in your application logic:

```python
def prune_messages(state: AgentState, max_messages: int = 20) -> AgentState:
    """Keep system messages plus the most recent messages to limit checkpoint size."""
    messages = state["messages"]
    if len(messages) > max_messages:
        # Keep system messages, then the most recent non-system messages
        # (filtering the recent window avoids duplicating the system prompt)
        system_msgs = [m for m in messages if m.type == "system"]
        recent_msgs = [m for m in messages[-max_messages:] if m.type != "system"]
        state["messages"] = system_msgs + recent_msgs
    return state
```
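The pruning behavior can be verified without LangChain installed by using a lightweight stand-in for `BaseMessage` (a self-contained sketch; `Msg` and `prune` are illustrative names mirroring the logic above):

```python
from dataclasses import dataclass


@dataclass
class Msg:
    type: str  # "system", "human", or "ai"
    content: str


def prune(messages: list, max_messages: int = 20) -> list:
    """Keep all system messages plus the most recent non-system messages."""
    if len(messages) <= max_messages:
        return messages
    system = [m for m in messages if m.type == "system"]
    recent = [m for m in messages[-max_messages:] if m.type != "system"]
    return system + recent


history = [Msg("system", "You are a support bot.")]
history += [Msg("human", f"question {i}") for i in range(30)]
pruned = prune(history, max_messages=10)
# The system prompt survives, followed by the 10 most recent questions
```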
## Migration from Memory-Based Checkpointing

If migrating from `MemorySaver`:

```python
# Before (memory-based, not persistent)
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()

# After (PostgreSQL-based, persistent)
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg import Connection

import src.copilot.config as config

conn = Connection.connect(
    config.VECTOR_DATABASE_URL,
    prepare_threshold=0,
    autocommit=True,
)
checkpointer = PostgresSaver(conn)
checkpointer.setup()
```
Benefits:
- Conversations persist across server restarts
- Scales to multiple instances (shared state)
- Full conversation history available for analytics
## Monitoring

Monitor checkpoint health:

```sql
-- Count checkpoints per thread
SELECT thread_id, COUNT(*) AS checkpoint_count
FROM checkpoints
GROUP BY thread_id
ORDER BY checkpoint_count DESC
LIMIT 10;

-- Average checkpoint size
SELECT AVG(pg_column_size(checkpoint)) AS avg_size_bytes
FROM checkpoints;

-- Most active threads in the last 24 hours (by checkpoint count)
SELECT thread_id, COUNT(*) AS activity
FROM checkpoints
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY thread_id
ORDER BY activity DESC
LIMIT 10;
```
## Best Practices

- **Use Descriptive Thread IDs:** Include user/session identifiers for easier debugging
- **Implement Cleanup:** Regularly delete old checkpoints to manage database size
- **Monitor Size:** Track checkpoint sizes to detect conversation bloat
- **Use Connection Pools:** Essential for production deployments with multiple workers
- **Enable SSL:** Use `sslmode=require` for production database connections
- **Backup Regularly:** Checkpoint data is valuable conversation history