This guide covers best practices, configuration patterns, and deployment strategies for running VectorDB pipelines in production.

Deployment checklist

Before deploying to production, ensure you have:
  • Benchmarked retrieval quality on representative queries
  • Tuned top_k, candidate_pool_size, and reranking settings
  • Set up monitoring and observability
  • Configured proper logging levels
  • Secured API keys using environment variables
  • Tested error handling and fallback behavior
  • Established cost budgets and alerts
  • Implemented rate limiting for LLM calls
  • Validated latency meets SLO requirements
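Some of these items (rate limiting in particular) are easy to prototype before wiring in a full gateway. As a minimal sketch, a token-bucket limiter for outbound LLM calls might look like this (class and parameter names are illustrative, not part of the library):

```python
import threading
import time

class TokenBucket:
    """Simple token-bucket rate limiter for outbound LLM calls."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # tokens refilled per second
        self.capacity = capacity  # maximum burst size
        self.tokens = float(capacity)
        self.updated = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self, tokens: int = 1) -> bool:
        """Take tokens if available; return False instead of blocking."""
        with self.lock:
            now = time.monotonic()
            # Refill proportionally to elapsed time, capped at capacity
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

# Allow roughly 5 LLM calls/second with bursts of 10
limiter = TokenBucket(rate=5.0, capacity=10)
if limiter.acquire():
    pass  # safe to call the LLM API; otherwise queue or shed the request
```

Callers that get `False` back can queue, retry later, or degrade gracefully rather than hitting provider-side 429s.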

Environment configuration

Production environment variables

Use environment variables for all secrets and environment-specific settings:
.env.production
# Vector Database Credentials
PINECONE_API_KEY=pc-prod-xxxx
WEAVIATE_URL=https://prod-cluster.weaviate.network
WEAVIATE_API_KEY=weaviate-prod-key
MILVUS_URI=https://prod-milvus.example.com:19530
MILVUS_TOKEN=milvus-prod-token
QDRANT_URL=https://qdrant-prod.example.com
QDRANT_API_KEY=qdrant-prod-key

# LLM API Keys
GROQ_API_KEY=gsk_prod_xxxx
OPENAI_API_KEY=sk-prod-xxxx
COHERE_API_KEY=cohere-prod-xxxx

# Deployment Settings
LOG_LEVEL=WARNING
ENABLE_TELEMETRY=true
MAX_RETRIES=3
TIMEOUT_SECONDS=30
See the environment variables reference for the complete list.
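Since everything above is sourced from the environment, it helps to fail fast at startup when a variable is missing rather than discover it mid-request. A minimal check (the variable list here is illustrative; extend it per deployment):

```python
import os

REQUIRED_VARS = ["PINECONE_API_KEY", "GROQ_API_KEY"]  # extend per deployment

def missing_env_vars(required=REQUIRED_VARS) -> list:
    """Return the names of required variables that are unset or empty."""
    return [name for name in required if not os.getenv(name)]

def fail_fast() -> None:
    """Call once at startup, before constructing any pipeline."""
    missing = missing_env_vars()
    if missing:
        raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
```

Calling `fail_fast()` before pipeline construction turns a latent runtime auth failure into an immediate, readable startup error.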

Configuration file structure

Organize configurations by environment:
configs/
├── base.yaml              # Shared settings
├── development.yaml       # Dev overrides
├── staging.yaml           # Staging overrides
└── production.yaml        # Production settings
Base configuration (configs/base.yaml):
embeddings:
  model: "Qwen/Qwen3-Embedding-0.6B"
  batch_size: 32
  device: "cpu"

logging:
  name: "vectordb_pipeline"
  level: "INFO"
Production overrides (configs/production.yaml):
embeddings:
  batch_size: 64  # Higher throughput in production
  device: "cuda"  # GPU acceleration

search:
  top_k: 10
  candidate_pool_size: 15  # Cost-optimized

rag:
  enabled: true
  model: "llama-3.3-70b-versatile"
  api_key: "${GROQ_API_KEY}"
  temperature: 0.7
  max_tokens: 2048

logging:
  level: "WARNING"  # Reduce log volume

retry:
  max_attempts: 3
  backoff_factor: 2
Load environment-specific config:
import os

env = os.getenv("ENVIRONMENT", "development")
config_path = f"configs/{env}.yaml"

pipeline = PineconeSemanticSearchPipeline(config_path)
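The base-plus-overrides layout implies a merge step before the pipeline sees its settings. If your config loader does not already do this, a recursive merge is straightforward (sketch; the dicts below stand in for parsed YAML, e.g. from `yaml.safe_load`):

```python
def deep_merge(base: dict, override: dict) -> dict:
    """Recursively merge override into base, returning a new dict.
    Scalar values in override win; nested dicts are merged key by key."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

# base.yaml and production.yaml, loaded as dicts
base = {"embeddings": {"model": "Qwen/Qwen3-Embedding-0.6B",
                       "batch_size": 32, "device": "cpu"}}
prod = {"embeddings": {"batch_size": 64, "device": "cuda"},
        "logging": {"level": "WARNING"}}

config = deep_merge(base, prod)
```

After the merge, production gets `batch_size: 64` and `device: "cuda"` while keeping the shared embedding model from `base.yaml`.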

Database-specific deployment considerations

Pinecone

Namespace strategy:
pinecone:
  api_key: "${PINECONE_API_KEY}"
  index_name: "production-index"
  namespace: "v1"  # Version namespaces for zero-downtime updates
  dimension: 1024
  metric: "cosine"
  recreate: false  # Never recreate in production
Multi-tenancy: Use namespaces for tenant isolation (scales to 100,000+ tenants):
for tenant_id in tenant_ids:
    result = pipeline.search(
        query=user_query,
        top_k=10,
        namespace=f"tenant-{tenant_id}"
    )
Best practices:
  • Monitor pod utilization and scale replicas based on QPS
  • Use serverless indexes for variable workloads
  • Implement retry logic for rate limit errors (429)
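The 429 retry advice can be sketched as a wrapper that backs off only on rate-limit errors and lets other failures surface immediately (`RateLimitError` is a stand-in for your client's actual exception class):

```python
import time

class RateLimitError(Exception):
    """Stand-in for the client's 429 error; substitute the real class."""

def call_with_rate_limit_retry(func, max_retries: int = 3,
                               base_delay: float = 1.0):
    """Retry only on rate-limit errors, with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return func()
        except RateLimitError:
            if attempt == max_retries - 1:
                raise  # budget exhausted; let the caller handle it
            time.sleep(base_delay * (2 ** attempt))

# result = call_with_rate_limit_retry(lambda: pipeline.search(query, top_k=10))
```

Non-rate-limit exceptions propagate on the first attempt, which keeps genuine bugs visible instead of masking them behind retries.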
Weaviate

Connection configuration:
weaviate:
  cluster_url: "${WEAVIATE_URL}"
  api_key: "${WEAVIATE_API_KEY}"
  timeout: 30
  connection_pool_size: 10
Multi-tenancy: Weaviate supports native multi-tenancy with per-tenant shards:
# Initialize with tenant support
pipeline = WeaviateSemanticSearchPipeline(
    config_path,
    tenant="customer-123"
)
Best practices:
  • Use batch imports for initial indexing (100+ docs/batch)
  • Enable quantization (PQ or BQ) to reduce memory 4x
  • Monitor shard health and replication status
Milvus

Production configuration:
milvus:
  uri: "${MILVUS_URI}"
  token: "${MILVUS_TOKEN}"
  collection_name: "production_collection"
  dimension: 1024
  recreate: false
  batch_size: 100
Partition-based multi-tenancy:
# Scales to millions of tenants using partition keys
config = {
    "milvus": {
        "partition_key": "tenant_id",
        "num_partitions": 1000
    }
}
Best practices:
  • Use scalar quantization (SQ8) for 4x storage reduction
  • Enable partition pruning with metadata filters
  • Monitor memory usage per collection
  • Set appropriate index_file_size for write throughput
Qdrant

Production setup:
qdrant:
  url: "${QDRANT_URL}"
  api_key: "${QDRANT_API_KEY}"
  collection_name: "production_docs"
  timeout: 30
  prefer_grpc: true  # Better performance
Payload-based multi-tenancy:
# Use payload filters for tenant isolation
filters = {
    "must": [
        {"key": "tenant_id", "match": {"value": tenant_id}}
    ]
}

result = pipeline.search(
    query=user_query,
    top_k=10,
    filters=filters
)
Best practices:
  • Enable payload indexing for frequently filtered fields
  • Use quantization (scalar or binary) for large datasets
  • Monitor disk usage and configure storage thresholds
  • Use gRPC instead of HTTP for lower latency
Chroma

Production configuration:
chroma:
  host: "${CHROMA_HOST:-localhost}"
  port: ${CHROMA_PORT:-8000}
  tenant: "default"
  database: "production_db"
Best practices:
  • Run Chroma server in Docker for production
  • Use persistent storage volumes
  • Implement connection pooling for concurrent requests
  • Monitor collection size and query latency
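Connection pooling for Chroma can be sketched with a bounded queue of pre-built clients; `make_client` below is a placeholder for your actual client constructor:

```python
import queue
from contextlib import contextmanager

class ClientPool:
    """Fixed-size pool; borrowers block until a client is free."""

    def __init__(self, make_client, size: int = 10):
        self._pool = queue.Queue(maxsize=size)
        for _ in range(size):
            self._pool.put(make_client())

    @contextmanager
    def client(self):
        c = self._pool.get()   # blocks when all clients are checked out
        try:
            yield c
        finally:
            self._pool.put(c)  # always return the client to the pool

# pool = ClientPool(lambda: chromadb.HttpClient(host=host, port=port), size=10)
# with pool.client() as c:
#     ...  # issue queries on c
```

Bounding the pool caps concurrent connections to the server, and the context manager guarantees clients are returned even when a query raises.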

Logging and monitoring

Production logging configuration

Set appropriate log levels by environment:
logging:
  name: "vectordb_production"
  level: "${LOG_LEVEL:-WARNING}"
  format: "json"  # Structured logging for analysis
  handlers:
    - type: "file"
      path: "/var/log/vectordb/pipeline.log"
      max_bytes: 10485760  # 10MB
      backup_count: 5
    - type: "console"
      level: "ERROR"
Custom logging setup:
import logging
from vectordb.utils.logging import LoggerFactory

# Configure structured logging
logger_factory = LoggerFactory(
    name="vectordb_production",
    log_level=logging.WARNING,
    log_format="json"
)
logger = logger_factory.get_logger()

logger.warning("High latency detected", extra={
    "query": query_text,
    "latency_ms": elapsed * 1000,
    "database": "pinecone",
    "top_k": 10
})

Key metrics to monitor

Query latency

  • p50, p95, p99 latency by query type
  • Breakdown: embedding, retrieval, reranking, generation
  • Alert on p95 > SLO threshold

Retrieval quality

  • Online Recall@k and MRR
  • User feedback signals (clicks, dwell time)
  • Fallback rate (queries with no results)

Cost metrics

  • LLM API token usage per query
  • Embedding API costs
  • Database operations cost
  • Cost per 1000 queries
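Cost per 1,000 queries falls out of aggregate token counts and per-million-token prices; the prices in the example are placeholders, not current rates:

```python
def cost_per_1k_queries(num_queries: int,
                        prompt_tokens: int,
                        completion_tokens: int,
                        price_in_per_1m: float,
                        price_out_per_1m: float) -> float:
    """Average LLM cost (in the price's currency) per 1,000 queries."""
    total = ((prompt_tokens / 1e6) * price_in_per_1m
             + (completion_tokens / 1e6) * price_out_per_1m)
    return total / num_queries * 1000

# 10k queries, 20M prompt tokens, 5M completion tokens,
# at hypothetical $0.59 / $0.79 per 1M input/output tokens:
cost = cost_per_1k_queries(10_000, 20_000_000, 5_000_000, 0.59, 0.79)
# → 1.575 ($ per 1,000 queries)
```

Feeding this from the token counters in your monitoring stack gives a per-query cost trend you can alert on.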

System health

  • Database connection errors
  • API rate limit hits
  • Retry and timeout rates
  • Error rates by type

Example monitoring implementation

import time
import logging
from prometheus_client import Counter, Histogram

# Define metrics
query_latency = Histogram(
    "vectordb_query_latency_seconds",
    "Query latency in seconds",
    ["database", "strategy"]
)

query_counter = Counter(
    "vectordb_queries_total",
    "Total queries processed",
    ["database", "status"]
)

error_counter = Counter(
    "vectordb_errors_total",
    "Total errors",
    ["database", "error_type"]
)

# Instrument pipeline
class MonitoredPipeline:
    def __init__(self, base_pipeline, database_name):
        self.pipeline = base_pipeline
        self.database = database_name
        self.logger = logging.getLogger(__name__)
    
    def search(self, query: str, top_k: int = 10):
        start = time.time()
        
        try:
            result = self.pipeline.search(query, top_k=top_k)
            
            # Record success metrics
            elapsed = time.time() - start
            query_latency.labels(
                database=self.database,
                strategy="semantic"
            ).observe(elapsed)
            
            query_counter.labels(
                database=self.database,
                status="success"
            ).inc()
            
            self.logger.info(
                "Query completed",
                extra={
                    "latency_ms": elapsed * 1000,
                    "num_results": len(result["documents"])
                }
            )
            
            return result
            
        except Exception as e:
            # Record error metrics
            error_counter.labels(
                database=self.database,
                error_type=type(e).__name__
            ).inc()
            
            query_counter.labels(
                database=self.database,
                status="error"
            ).inc()
            
            self.logger.error(
                f"Query failed: {str(e)}",
                extra={
                    "query": query,
                    "error_type": type(e).__name__
                },
                exc_info=True
            )
            
            raise

# Usage
base_pipeline = PineconeSemanticSearchPipeline("config.yaml")
monitored_pipeline = MonitoredPipeline(base_pipeline, "pinecone")

result = monitored_pipeline.search("What is quantum computing?", top_k=10)

Error handling and resilience

Retry logic with exponential backoff

import logging
import time
from typing import Callable, TypeVar

T = TypeVar('T')

def retry_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0
) -> T:
    """Retry function with exponential backoff."""
    
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            
            delay = min(
                base_delay * (exponential_base ** attempt),
                max_delay
            )
            
            logging.warning(
                f"Attempt {attempt + 1} failed: {str(e)}. "
                f"Retrying in {delay:.1f}s..."
            )
            
            time.sleep(delay)
    
    raise RuntimeError("Max retries exceeded")

# Usage
result = retry_with_backoff(
    lambda: pipeline.search(query, top_k=10),
    max_retries=3
)

Circuit breaker pattern

from datetime import datetime, timedelta

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open
    
    def call(self, func):
        if self.state == "open":
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = "half-open"
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func()
            if self.state == "half-open":
                self.state = "closed"
                self.failures = 0
            return result
        except Exception as e:
            self.failures += 1
            self.last_failure_time = datetime.now()
            
            if self.failures >= self.failure_threshold:
                self.state = "open"
            
            raise

# Usage
breaker = CircuitBreaker(failure_threshold=5, timeout=60)

try:
    result = breaker.call(
        lambda: pipeline.search(query, top_k=10)
    )
except Exception as e:
    # Fall back to cached results or degraded service
    result = get_cached_results(query)

Performance optimization

Cost-optimized configuration

Reduce costs while maintaining quality:
search:
  candidate_pool_size: 15  # Reduced from 50
  top_k: 10

cost_optimization:
  context_budget: 2000  # Max tokens for LLM
  model_tiering:
    routing: "llama-3.1-8b-instant"  # Cheaper model
    generation: "llama-3.3-70b-versatile"
  compression:
    enabled: true
    strategy: "extractive"
    num_sentences: 5

rag:
  enabled: true
  model: "${COST_OPTIMIZATION_MODEL_TIERING_GENERATION}"
  api_key: "${GROQ_API_KEY}"
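The extractive compression strategy configured above can be approximated by keeping the sentences that overlap most with the query (a toy sketch using term overlap; production implementations typically score sentences by embedding similarity instead):

```python
import re

def _tokens(s: str) -> set:
    """Lowercased word tokens, punctuation stripped."""
    return set(re.findall(r"\w+", s.lower()))

def extractive_compress(text: str, query: str, num_sentences: int = 5) -> str:
    """Keep the sentences with the most query-term overlap,
    preserving their original order."""
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    query_terms = _tokens(query)
    # Rank sentence indices by overlap with the query, highest first
    ranked = sorted(range(len(sentences)),
                    key=lambda i: len(query_terms & _tokens(sentences[i])),
                    reverse=True)
    keep = sorted(ranked[:num_sentences])  # restore document order
    return " ".join(sentences[i] for i in keep)
```

Trimming retrieved context this way before generation is what keeps the `context_budget` token cap achievable without truncating mid-passage.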

Caching strategy

from functools import lru_cache

class CachedPipeline:
    def __init__(self, pipeline, cache_size=1000):
        self.pipeline = pipeline
        # Cache results keyed on the (query, top_k) pair
        self._search_cached = lru_cache(maxsize=cache_size)(self._search)
    
    def _search(self, query: str, top_k: int):
        return self.pipeline.search(query, top_k=top_k)
    
    def search(self, query: str, top_k: int = 10):
        return self._search_cached(query, top_k)

# Usage
cached_pipeline = CachedPipeline(pipeline, cache_size=1000)
result = cached_pipeline.search("What is photosynthesis?", top_k=10)

Deployment patterns

Containerized deployment

Sample Dockerfile:
FROM python:3.11-slim

WORKDIR /app

# Install uv
RUN pip install uv

# Copy dependencies
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen

# Copy application
COPY src/ ./src/
COPY configs/ ./configs/

# Set environment
ENV PYTHONPATH=/app
ENV ENVIRONMENT=production

# Run application
CMD ["uv", "run", "python", "src/main.py"]

Kubernetes deployment

deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: vectordb-pipeline
spec:
  replicas: 3
  selector:
    matchLabels:
      app: vectordb-pipeline
  template:
    metadata:
      labels:
        app: vectordb-pipeline
    spec:
      containers:
      - name: pipeline
        image: vectordb-pipeline:latest
        env:
        - name: ENVIRONMENT
          value: "production"
        - name: PINECONE_API_KEY
          valueFrom:
            secretKeyRef:
              name: vectordb-secrets
              key: pinecone-api-key
        - name: GROQ_API_KEY
          valueFrom:
            secretKeyRef:
              name: vectordb-secrets
              key: groq-api-key
        resources:
          requests:
            memory: "2Gi"
            cpu: "1"
          limits:
            memory: "4Gi"
            cpu: "2"

Next steps

  • Benchmarking: validate production performance with benchmarks
  • Configuration: fine-tune production settings
  • Environment variables: complete reference for production credentials
  • Building RAG pipelines: learn core RAG pipeline concepts
