Deploying LangGraph applications requires careful consideration of persistence, scalability, monitoring, and infrastructure.
Deployment Options
LangSmith Deployment
The easiest way to deploy LangGraph applications:
# Install the LangGraph CLI
pip install langgraph-cli
# Initialize a new project scaffold in the current directory
langgraph init
# Build and deploy the project to LangSmith
langgraph deploy
Benefits:
- Managed infrastructure
- Built-in observability
- Automatic scaling
- Production checkpointers
- LangGraph Studio integration
Because LangSmith Deployment manages persistence, scaling, and monitoring for you, none of the infrastructure described in the self-hosted section below needs to be set up manually.
Self-Hosted Deployment
For self-hosted deployments, you'll need to configure:
- Web server (FastAPI, Flask)
- Persistent checkpointer (PostgreSQL, SQLite)
- Message queue (for async processing)
- Load balancer
- Monitoring and logging
Production Setup
Use a persistent checkpointer:
from langgraph.checkpoint.postgres import PostgresSaver
import os

# Production database connection string, e.g. postgresql://user:pass@host:5432/db
DB_URI = os.getenv("DATABASE_URL")

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    # Create the checkpoint tables on first run (safe to call repeatedly).
    checkpointer.setup()
    app = graph.compile(checkpointer=checkpointer)
    # NOTE(review): the saver's DB connection is closed when this `with`
    # block exits, so `app` must be used inside it — a long-running server
    # should manage the checkpointer's lifecycle explicitly instead.
Wrap your graph in a web API:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn

api = FastAPI()


class InvokeRequest(BaseModel):
    """Request payload: graph input plus the conversation thread to use."""
    input: dict
    thread_id: str


class InvokeResponse(BaseModel):
    """Response payload: final graph output for the given thread."""
    output: dict
    thread_id: str


@api.post("/invoke", response_model=InvokeResponse)
async def invoke_graph(request: InvokeRequest):
    """Invoke the graph once and return its final output."""
    try:
        # thread_id scopes checkpointer state so conversations can resume.
        config = {"configurable": {"thread_id": request.thread_id}}
        result = await app.ainvoke(request.input, config)
        return InvokeResponse(
            output=result,
            thread_id=request.thread_id,
        )
    except Exception as e:
        # Surface the failure as a 500 with a readable message.
        raise HTTPException(status_code=500, detail=str(e))
import json

from fastapi.responses import StreamingResponse


@api.post("/stream")
async def stream_graph(request: InvokeRequest):
    """Stream graph execution as Server-Sent Events (SSE).

    Fix: the original used `json` and `StreamingResponse` without
    importing either.
    """
    config = {"configurable": {"thread_id": request.thread_id}}

    async def event_generator():
        # Each chunk is one state update; format as an SSE `data:` frame.
        async for chunk in app.astream(request.input, config):
            yield f"data: {json.dumps(chunk)}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )
if __name__ == "__main__":
    # Development entry point; in production run uvicorn/gunicorn directly
    # (see the Dockerfile CMD below).
    uvicorn.run(api, host="0.0.0.0", port=8000)
@api.get("/health")
async def health_check():
    """Health check endpoint for load balancers and Kubernetes probes.

    Returns 200 when a checkpointer state read succeeds, 503 otherwise.
    """
    try:
        # A state read exercises the checkpointer's database connection;
        # the returned state itself is not needed.
        app.get_state({"configurable": {"thread_id": "health-check"}})
        return {"status": "healthy"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Unhealthy: {e}")
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    """Typed application configuration loaded from the environment.

    Values are read from process environment variables, falling back to
    the `.env` file for local development.
    """

    database_url: str
    openai_api_key: str
    anthropic_api_key: str
    log_level: str = "INFO"
    max_workers: int = 4

    class Config:
        env_file = ".env"


settings = Settings()
Containerization
Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies first so this layer stays cached until requirements.txt changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application source after deps to preserve the cached pip layer
COPY . .
# Run the FastAPI app (module `main`, app object `api`) on all interfaces
CMD ["uvicorn", "main:api", "--host", "0.0.0.0", "--port", "8000"]
Docker Compose
version: '3.8'
services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      # NOTE(review): hardcoded credentials are for local development only;
      # use a secret store for production.
      - DATABASE_URL=postgresql://user:password@db:5432/langgraph
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - db
  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=langgraph
    volumes:
      # Persist database files across container restarts
      - postgres_data:/var/lib/postgresql/data
volumes:
  postgres_data:
Kubernetes Deployment
Deployment Manifest
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langgraph
  template:
    metadata:
      labels:
        app: langgraph
    spec:
      containers:
        - name: app
          image: your-registry/langgraph-app:latest
          ports:
            - containerPort: 8000
          env:
            # Secrets injected from the app-secrets Secret, never hardcoded
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: database-url
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: openai-api-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          # Probe hits the app's /health endpoint; repeated failures restart the pod
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
Service Manifest
apiVersion: v1
kind: Service
metadata:
  name: langgraph-service
spec:
  selector:
    app: langgraph
  ports:
    - protocol: TCP
      port: 80          # external port
      targetPort: 8000  # container port
  type: LoadBalancer
Scaling Considerations
Horizontal Scaling
LangGraph applications can scale horizontally:
# Share a bounded pool of Postgres connections across requests so
# horizontally-scaled replicas don't open one connection per call.
from psycopg_pool import ConnectionPool

pool = ConnectionPool(
    conninfo=DB_URI,
    min_size=2,   # warm connections kept ready
    max_size=10,  # per-process cap protects the database under load
)
checkpointer = PostgresSaver(pool)
Async Processing
Handle long-running workflows asynchronously:
from celery import Celery

celery_app = Celery('langgraph', broker='redis://localhost:6379')


@celery_app.task
def process_graph(input_data: dict, thread_id: str):
    """Process graph in background (runs inside a Celery worker)."""
    config = {"configurable": {"thread_id": thread_id}}
    result = app.invoke(input_data, config)
    return result


# API endpoint: enqueue the run and return immediately with a task id
# the client can poll.
@api.post("/invoke-async")
async def invoke_async(request: InvokeRequest):
    task = process_graph.delay(request.input, request.thread_id)
    return {"task_id": task.id}
Caching
Implement caching for frequently accessed data:
from langgraph.cache.memory import InMemoryCache

# NOTE(review): InMemoryCache is per-process — with multiple replicas each
# worker keeps its own cache. Use a shared backend for cross-replica reuse.
cache = InMemoryCache()
app = graph.compile(checkpointer=checkpointer, cache=cache)
Monitoring
LangSmith Integration
import os
# Enable LangSmith tracing for every graph invocation in this process.
# NOTE(review): set the API key via the deployment environment / secret
# manager rather than hardcoding it in source.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production-app"
# Traces automatically sent to LangSmith
result = app.invoke(input_data, config)
Custom Metrics
from prometheus_client import Counter, Histogram
import time

# Define metrics once at module import (Prometheus registries are global).
invocation_counter = Counter(
    'langgraph_invocations_total',
    'Total graph invocations',
    ['status']
)
invocation_duration = Histogram(
    'langgraph_invocation_duration_seconds',
    'Graph invocation duration'
)


def invoke_with_metrics(input_data, config):
    """Invoke the graph, recording success/error counts and latency."""
    start = time.time()
    try:
        result = app.invoke(input_data, config)
        invocation_counter.labels(status='success').inc()
        return result
    except Exception:
        # Count the failure but let callers see the original exception.
        invocation_counter.labels(status='error').inc()
        raise
    finally:
        # Runs on both paths so duration is always observed.
        invocation_duration.observe(time.time() - start)
Logging
import logging
import json
from datetime import datetime  # fix: was missing, datetime.now() is used below

# Structured logging: emit one JSON object per event so log aggregators
# (ELK, CloudWatch, etc.) can index fields instead of parsing free text.
logger = logging.getLogger(__name__)


class StructuredLogger:
    """Emits machine-parseable JSON log records for graph invocations."""

    @staticmethod
    def log_invocation(thread_id: str, input_data: dict, result: dict):
        logger.info(json.dumps({
            "event": "graph_invocation",
            "thread_id": thread_id,
            "input": input_data,
            "output": result,
            "timestamp": datetime.now().isoformat(),
        }))
# Use in API
# NOTE(review): this duplicates the /invoke route shown earlier; in a real
# app merge the logging into that single handler.
@api.post("/invoke")
async def invoke_graph(request: InvokeRequest):
    """Invoke the graph and emit a structured log record for the call."""
    # Fix: the original referenced `config` without defining it; build it
    # from the request's thread_id as the other handlers do.
    config = {"configurable": {"thread_id": request.thread_id}}
    result = app.invoke(request.input, config)
    StructuredLogger.log_invocation(
        request.thread_id,
        request.input,
        result,
    )
    return result
Security
Authentication
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()


def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Verify the bearer token and resolve it to a user.

    `is_valid_token` / `get_user_from_token` are application-provided
    helpers (e.g. JWT signature + expiry validation) — define them before
    using this snippet.
    """
    token = credentials.credentials
    if not is_valid_token(token):
        raise HTTPException(status_code=401, detail="Invalid token")
    return get_user_from_token(token)


@api.post("/invoke")
async def invoke_graph(
    request: InvokeRequest,
    user = Depends(verify_token),
):
    """Invoke the graph under the authenticated user's namespace."""
    # Prefix the thread id with the user id so users cannot read or
    # resume each other's conversation state.
    config = {"configurable": {"thread_id": f"{user.id}-{request.thread_id}"}}
    return app.invoke(request.input, config)
Rate Limiting
from fastapi import Request  # fix: `Request` was used but never imported
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

# Rate limit per client IP address.
limiter = Limiter(key_func=get_remote_address)
api.state.limiter = limiter
# Fix: the original imported `_rate_limit_exceeded_handler` but never
# registered it, so limit violations would raise instead of returning 429.
api.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@api.post("/invoke")
@limiter.limit("10/minute")
async def invoke_graph(request: Request, invoke_request: InvokeRequest):
    """Invoke the graph, limited to 10 requests per minute per client IP."""
    # slowapi requires the raw `Request` object as a handler parameter.
    # Fix: the original referenced an undefined `config`; build it here.
    config = {"configurable": {"thread_id": invoke_request.thread_id}}
    return app.invoke(invoke_request.input, config)
Best Practices
- Use persistent checkpointers: PostgreSQL or managed services for production
- Implement health checks: Monitor application and database health
- Enable tracing: Use LangSmith for observability
- Handle errors gracefully: Return meaningful error messages
- Validate input: Check user input before processing
- Set resource limits: Prevent resource exhaustion
- Use environment variables: Never hardcode secrets
- Implement retries: Handle transient failures
- Monitor performance: Track latency and throughput
- Plan for scaling: Design for horizontal scaling from the start
Next Steps