Deploying LangGraph applications requires careful consideration of persistence, scalability, monitoring, and infrastructure.

Deployment Options

LangSmith Deployment

The easiest way to deploy LangGraph applications:
# Install CLI
pip install langgraph-cli

# Initialize project
langgraph init

# Deploy to LangSmith
langgraph deploy
Benefits:
  • Managed infrastructure
  • Built-in observability
  • Automatic scaling
  • Production checkpointers
  • LangGraph Studio integration
LangSmith Deployment handles persistence, scaling, and monitoring automatically.

Self-Hosted Deployment

For self-hosted deployments, you’ll need to configure:
  1. Web server (FastAPI, Flask)
  2. Persistent checkpointer (PostgreSQL, SQLite)
  3. Message queue (for async processing)
  4. Load balancer
  5. Monitoring and logging
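As a rough sketch, the Python dependencies for such a stack might look like the following (package names are real PyPI projects, but trim to what you actually use and pin versions for production):

```txt
# requirements.txt (illustrative)
langgraph
langgraph-checkpoint-postgres
fastapi
uvicorn[standard]
pydantic-settings
psycopg[binary,pool]
celery
redis
slowapi
prometheus-client
```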

Production Setup

1. Configure Checkpointer

Use a persistent checkpointer:
from langgraph.checkpoint.postgres import PostgresSaver
import os

# Production database
DB_URI = os.getenv("DATABASE_URL")

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()
    app = graph.compile(checkpointer=checkpointer)

2. Create API Server

Wrap your graph in a web API:
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json
import uvicorn

api = FastAPI()

class InvokeRequest(BaseModel):
    input: dict
    thread_id: str

class InvokeResponse(BaseModel):
    output: dict
    thread_id: str

@api.post("/invoke", response_model=InvokeResponse)
async def invoke_graph(request: InvokeRequest):
    """Invoke the graph."""
    try:
        config = {"configurable": {"thread_id": request.thread_id}}
        result = await app.ainvoke(request.input, config)
        
        return InvokeResponse(
            output=result,
            thread_id=request.thread_id,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@api.post("/stream")
async def stream_graph(request: InvokeRequest):
    """Stream graph execution."""
    config = {"configurable": {"thread_id": request.thread_id}}
    
    async def event_generator():
        async for chunk in app.astream(request.input, config):
            yield f"data: {json.dumps(chunk)}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )

if __name__ == "__main__":
    uvicorn.run(api, host="0.0.0.0", port=8000)

3. Add Health Checks

@api.get("/health")
async def health_check():
    """Health check endpoint."""
    try:
        # Check database connection
        app.get_state({"configurable": {"thread_id": "health-check"}})
        return {"status": "healthy"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Unhealthy: {e}")

4. Configure Environment

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    database_url: str
    openai_api_key: str
    anthropic_api_key: str
    log_level: str = "INFO"
    max_workers: int = 4
    
    class Config:
        env_file = ".env"

settings = Settings()

Containerization

Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Run server
CMD ["uvicorn", "main:api", "--host", "0.0.0.0", "--port", "8000"]
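To keep the image small and avoid leaking secrets into it, pair the Dockerfile with a `.dockerignore` (illustrative entries):

```txt
.git
.env
.venv/
__pycache__/
*.pyc
```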

Docker Compose

version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:password@db:5432/langgraph
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - db
  
  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=langgraph
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:

Kubernetes Deployment

Deployment Manifest

apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langgraph
  template:
    metadata:
      labels:
        app: langgraph
    spec:
      containers:
      - name: app
        image: your-registry/langgraph-app:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: app-secrets
              key: database-url
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: app-secrets
              key: openai-api-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
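The manifest above reads from an `app-secrets` Secret that must exist in the cluster; a sketch of that Secret (values are placeholders, and in practice you would create it with `kubectl create secret` or a secrets manager rather than committing it):

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: app-secrets
type: Opaque
stringData:
  database-url: postgresql://user:password@db-host:5432/langgraph
  openai-api-key: sk-placeholder
```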

Service Manifest

apiVersion: v1
kind: Service
metadata:
  name: langgraph-service
spec:
  selector:
    app: langgraph
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

Scaling Considerations

Horizontal Scaling

LangGraph applications can scale horizontally:
# Use connection pooling
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver

pool = ConnectionPool(
    conninfo=DB_URI,
    min_size=2,
    max_size=10,
    kwargs={"autocommit": True, "prepare_threshold": 0},
)

checkpointer = PostgresSaver(pool)
checkpointer.setup()  # create tables on first run
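On Kubernetes, replica counts can also be adjusted automatically. A hedged HorizontalPodAutoscaler sketch targeting the `langgraph-app` Deployment from the Kubernetes section (thresholds are illustrative):

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: langgraph-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: langgraph-app
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
```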

Async Processing

Handle long-running workflows asynchronously:
from celery import Celery

# A result backend lets clients fetch task results later
celery_app = Celery('langgraph', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

@celery_app.task
def process_graph(input_data: dict, thread_id: str):
    """Process graph in background."""
    config = {"configurable": {"thread_id": thread_id}}
    result = app.invoke(input_data, config)
    return result

# API endpoint
@api.post("/invoke-async")
async def invoke_async(request: InvokeRequest):
    task = process_graph.delay(request.input, request.thread_id)
    return {"task_id": task.id}
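Clients that receive a `task_id` need a way to poll for the result. The fire-and-poll pattern can be sketched without Celery using only the standard library; `run_graph` here is a stand-in for `app.invoke`, not part of any real API:

```python
# Framework-free sketch of the fire-and-poll pattern from the Celery example.
import uuid
from concurrent.futures import ThreadPoolExecutor, Future

executor = ThreadPoolExecutor(max_workers=4)
tasks: dict[str, Future] = {}

def run_graph(input_data: dict, thread_id: str) -> dict:
    # Placeholder for app.invoke(input_data, config)
    return {"echo": input_data, "thread_id": thread_id}

def submit(input_data: dict, thread_id: str) -> str:
    """Start the graph in the background and return a task id to poll."""
    task_id = str(uuid.uuid4())
    tasks[task_id] = executor.submit(run_graph, input_data, thread_id)
    return task_id

def status(task_id: str) -> dict:
    """Return task state, and the result once it is ready."""
    future = tasks[task_id]
    if not future.done():
        return {"state": "PENDING"}
    return {"state": "DONE", "result": future.result()}
```

With Celery itself, the equivalent is exposing an endpoint that wraps `celery_app.AsyncResult(task_id)`.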

Caching

Implement caching for frequently accessed data:
from langgraph.cache.memory import InMemoryCache

cache = InMemoryCache()

app = graph.compile(
    checkpointer=checkpointer,
    cache=cache,
)
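To make the caching idea concrete, here is a minimal TTL cache in plain Python. This is an illustration of the concept only, not the langgraph cache API:

```python
# Illustrative TTL (time-to-live) cache; not the langgraph cache API.
import time

class TTLCache:
    def __init__(self, ttl_seconds: float):
        self.ttl = ttl_seconds
        self._store: dict = {}

    def get(self, key):
        """Return the cached value, or None if missing or expired."""
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() > expires_at:
            del self._store[key]  # evict expired entry
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, time.monotonic() + self.ttl)
```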

Monitoring

LangSmith Integration

import os

# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production-app"

# Traces automatically sent to LangSmith
result = app.invoke(input_data, config)

Custom Metrics

from prometheus_client import Counter, Histogram
import time

# Define metrics
invocation_counter = Counter(
    'langgraph_invocations_total',
    'Total graph invocations',
    ['status']
)

invocation_duration = Histogram(
    'langgraph_invocation_duration_seconds',
    'Graph invocation duration'
)

# Instrument code
def invoke_with_metrics(input_data, config):
    start = time.time()
    
    try:
        result = app.invoke(input_data, config)
        invocation_counter.labels(status='success').inc()
        return result
    except Exception as e:
        invocation_counter.labels(status='error').inc()
        raise
    finally:
        duration = time.time() - start
        invocation_duration.observe(duration)

Logging

import logging
import json
from datetime import datetime

# Structured logging
logger = logging.getLogger(__name__)

class StructuredLogger:
    @staticmethod
    def log_invocation(thread_id: str, input_data: dict, result: dict):
        logger.info(json.dumps({
            "event": "graph_invocation",
            "thread_id": thread_id,
            "input": input_data,
            "output": result,
            "timestamp": datetime.now().isoformat(),
        }))

# Use in API
@api.post("/invoke")
async def invoke_graph(request: InvokeRequest):
    result = app.invoke(request.input, config)
    StructuredLogger.log_invocation(
        request.thread_id,
        request.input,
        result,
    )
    return result

Security

Authentication

from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Verify JWT token."""
    token = credentials.credentials
    
    # Verify token (is_valid_token / get_user_from_token are app-specific helpers)
    if not is_valid_token(token):
        raise HTTPException(status_code=401, detail="Invalid token")
    
    return get_user_from_token(token)

@api.post("/invoke")
async def invoke_graph(
    request: InvokeRequest,
    user = Depends(verify_token),
):
    # Use user-specific thread_id
    config = {"configurable": {"thread_id": f"{user.id}-{request.thread_id}"}}
    return app.invoke(request.input, config)
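The `is_valid_token` and `get_user_from_token` helpers above are placeholders. A minimal stdlib sketch of one possible implementation, using HMAC-signed tokens (for real deployments, prefer a JWT library and load the secret from the environment):

```python
# Hedged sketch of the placeholder token helpers using stdlib HMAC signing.
import hmac
import hashlib

SECRET = b"change-me"  # load from the environment in real deployments

def make_token(user_id: str) -> str:
    """Sign a user id, producing '<user_id>.<hex signature>'."""
    sig = hmac.new(SECRET, user_id.encode(), hashlib.sha256).hexdigest()
    return f"{user_id}.{sig}"

def is_valid_token(token: str) -> bool:
    """Recompute the signature and compare in constant time."""
    user_id, _, sig = token.partition(".")
    expected = hmac.new(SECRET, user_id.encode(), hashlib.sha256).hexdigest()
    return hmac.compare_digest(sig, expected)

def get_user_from_token(token: str) -> str:
    return token.partition(".")[0]
```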

Rate Limiting

from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
api.state.limiter = limiter
api.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@api.post("/invoke")
@limiter.limit("10/minute")
async def invoke_graph(request: Request, invoke_request: InvokeRequest):
    # Rate limited to 10 requests per minute per client IP
    config = {"configurable": {"thread_id": invoke_request.thread_id}}
    return app.invoke(invoke_request.input, config)

Best Practices

  • Use persistent checkpointers: PostgreSQL or managed services for production
  • Implement health checks: Monitor application and database health
  • Enable tracing: Use LangSmith for observability
  • Handle errors gracefully: Return meaningful error messages
  • Validate input: Check user input before processing
  • Set resource limits: Prevent resource exhaustion
  • Use environment variables: Never hardcode secrets
  • Implement retries: Handle transient failures
  • Monitor performance: Track latency and throughput
  • Plan for scaling: Design for horizontal scaling from the start
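The "implement retries" bullet above can be sketched as exponential backoff with jitter around any callable; `call` here is a hedged stand-in for `app.invoke`:

```python
# Retry a callable on transient failures with exponential backoff plus jitter.
import random
import time

def invoke_with_retries(call, *args, retries: int = 3, base_delay: float = 0.5):
    for attempt in range(retries + 1):
        try:
            return call(*args)
        except Exception:
            if attempt == retries:
                raise  # out of attempts; surface the error to the caller
            # Double the delay each attempt, with jitter to avoid thundering herds
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            time.sleep(delay)
```

In production you would typically retry only known-transient errors (timeouts, connection resets) rather than every `Exception`.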
