Objectives

By the end of this lab you will be able to:
  • Apply performance optimization techniques for MCP servers and databases
  • Implement comprehensive security hardening measures
  • Design scalable architecture patterns for production environments
  • Establish monitoring, maintenance, and operational procedures
  • Optimize costs while maintaining performance and reliability
  • Contribute to the MCP community and ecosystem

Prerequisites

  • Completed all previous labs (00–11)
  • MCP server deployed to a staging or production environment
  • Monitoring and alerting configured

Performance optimization

Connection pool tuning

from multiprocessing import cpu_count

POOL_CONFIG = {
    # Size: scale with available CPU cores
    "min_size": max(2, cpu_count()),
    "max_size": min(20, cpu_count() * 4),

    # Connection lifecycle
    "max_inactive_connection_lifetime": 300,   # 5 minutes
    "command_timeout": 30,
    "max_queries": 50000,  # Rotate connections after this many queries

    # PostgreSQL session settings. Note: shared_preload_libraries cannot be set
    # per-connection; configure it in postgresql.conf (restart required). The
    # log_* settings below require superuser or an appropriate grant.
    "server_settings": {
        "application_name": "mcp-server-prod",
        "jit": "off",                          # Disable JIT for predictable latency
        "work_mem": "8MB",
        "log_statement": "mod",                # Log only modifications
        "log_min_duration_statement": "1s",    # Log queries over 1 second
    }
}
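To sanity-check the sizing formulas, they can be exercised in isolation. This is a small sketch; `compute_pool_sizes` is a hypothetical helper mirroring the `min_size`/`max_size` expressions above, not part of the lab code:

```python
def compute_pool_sizes(cores: int) -> tuple[int, int]:
    """Mirror the POOL_CONFIG sizing formulas for a given core count."""
    return max(2, cores), min(20, cores * 4)

# A 1-core container still gets 2 connections; large hosts cap at 20.
print(compute_pool_sizes(1))   # (2, 4)
print(compute_pool_sizes(8))   # (8, 20)
```

The floor of 2 keeps a spare connection for health checks even on tiny containers, while the cap of 20 avoids exhausting PostgreSQL's `max_connections` when several replicas share one database.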

Query optimization with caching

import hashlib
import json
import logging
import time
from typing import Any, Optional

import redis

logger = logging.getLogger(__name__)

class SmartCache:
    """Multi-level cache: memory (L1) → Redis (L2)."""

    def __init__(self, redis_url: Optional[str] = None):
        self.memory_cache = {}
        self.redis_client = redis.Redis.from_url(redis_url) if redis_url else None
        self.max_memory_items = 1000

    async def get(self, key: str) -> Optional[Any]:
        # L1: memory
        entry = self.memory_cache.get(key)
        if entry and time.time() < entry['expires']:
            return entry['value']

        # L2: Redis
        if self.redis_client:
            try:
                import pickle
                cached = self.redis_client.get(key)
                if cached:
                    value = pickle.loads(cached)
                    self._set_memory(key, value, 60)  # Promote to L1 for 1 minute
                    return value
            except Exception as e:
                logger.warning(f"Redis get failed: {e}")

        return None

    async def set(self, key: str, value: Any, ttl: int = 300):
        self._set_memory(key, value, ttl)
        if self.redis_client:
            try:
                import pickle
                self.redis_client.setex(key, ttl, pickle.dumps(value))
            except Exception as e:
                logger.warning(f"Redis set failed: {e}")

    def _set_memory(self, key: str, value: Any, ttl: int):
        # Simple LRU eviction when cache is full
        if len(self.memory_cache) >= self.max_memory_items:
            oldest = min(self.memory_cache.keys(),
                         key=lambda k: self.memory_cache[k].get('timestamp', 0))
            del self.memory_cache[oldest]
        self.memory_cache[key] = {
            'value': value,
            'timestamp': time.time(),
            'expires': time.time() + ttl
        }

def generate_cache_key(query: str, user_context: str, params: Optional[dict] = None) -> str:
    """Consistent, stable cache key for a query + context combination."""
    parts = [query.strip().lower(), user_context,
             json.dumps(params, sort_keys=True) if params else ""]
    return hashlib.sha256("|".join(parts).encode()).hexdigest()
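A quick standalone check that the key function is stable across casing, whitespace, and parameter order (the function is repeated here so the snippet runs on its own):

```python
import hashlib
import json

def generate_cache_key(query, user_context, params=None):
    parts = [query.strip().lower(), user_context,
             json.dumps(params, sort_keys=True) if params else ""]
    return hashlib.sha256("|".join(parts).encode()).hexdigest()

k1 = generate_cache_key("SELECT 1", "user-1", {"limit": 10, "offset": 0})
k2 = generate_cache_key("  select 1 ", "user-1", {"offset": 0, "limit": 10})
print(k1 == k2)  # True: normalization and sort_keys make the key order-independent
```

Without `sort_keys=True`, two semantically identical requests whose params arrive in different dict order would produce different keys and defeat the cache.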

Async batch processing with circuit breaker

import asyncio
import logging
import time
from asyncio import Semaphore

logger = logging.getLogger(__name__)

class AsyncOptimizer:
    """Async patterns for high-throughput MCP operations."""

    def __init__(self, max_concurrent: int = 10):
        self.semaphore = Semaphore(max_concurrent)
        self.circuit_breaker = CircuitBreaker()

    async def batch_process(self, items, process_func, batch_size: int = 100):
        """Process items in bounded concurrent batches."""
        async def process_batch(batch):
            async with self.semaphore:
                return await asyncio.gather(
                    *[process_func(item) for item in batch],
                    return_exceptions=True
                )

        results = []
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            results.extend(await process_batch(batch))
            if i + batch_size < len(items):
                await asyncio.sleep(0.1)  # Brief pause between batches to reduce downstream pressure
        return results

class CircuitBreaker:
    """Prevent cascading failures by stopping calls to an unhealthy dependency."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED → OPEN → HALF_OPEN → CLOSED

    async def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
            else:
                raise RuntimeError("Circuit breaker is OPEN — dependency unhealthy")

        try:
            result = await func(*args, **kwargs)
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
                self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                logger.error(f"Circuit breaker opened after {self.failure_count} failures")
            raise
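To see the CLOSED → OPEN transition in action, the breaker can be driven with an always-failing coroutine. This is a standalone sketch using a condensed copy of the class above:

```python
import asyncio
import time

class CircuitBreaker:
    """Condensed copy of the breaker above, for a standalone demo."""
    def __init__(self, failure_threshold=3, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"

    async def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
            else:
                raise RuntimeError("Circuit breaker is OPEN")
        try:
            result = await func(*args, **kwargs)
            if self.state == "HALF_OPEN":
                self.state, self.failure_count = "CLOSED", 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
            raise

async def flaky():
    raise ConnectionError("dependency down")

async def demo():
    breaker = CircuitBreaker(failure_threshold=3)
    for _ in range(3):              # three consecutive failures trip the breaker
        try:
            await breaker.call(flaky)
        except ConnectionError:
            pass
    return breaker

breaker = asyncio.run(demo())
print(breaker.state)  # OPEN: further calls now fail fast without touching the dependency
```

Once OPEN, callers get an immediate error instead of piling timeouts onto an already struggling dependency; after `recovery_timeout` a single probe call (HALF_OPEN) decides whether to close the circuit again.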

Security hardening

Input validation and SQL safety

import logging
import re

logger = logging.getLogger(__name__)

class InputValidator:
    """SQL injection prevention and input sanitization."""

    # Patterns that must never appear in user-provided SQL
    FORBIDDEN_PATTERNS = [
        r";\s*(DROP|DELETE|UPDATE|INSERT|ALTER|CREATE)\s+",
        r"--.*",          # SQL comments
        r"/\*.*\*/",      # Block comments
        r"xp_cmdshell",   # SQL Server RCE
        r"sp_executesql",
        r"EXEC\s*\(",
    ]

    VALID_TABLES = {
        "retail.stores", "retail.customers", "retail.products",
        "retail.sales_transactions", "retail.sales_transaction_items",
        "retail.product_categories", "retail.product_embeddings"
    }

    @classmethod
    def validate_sql_query(cls, query: str) -> bool:
        """Return False if the query contains any forbidden patterns."""
        for pattern in cls.FORBIDDEN_PATTERNS:
            if re.search(pattern, query, re.IGNORECASE):
                logger.warning(f"Blocked query with pattern: {pattern}")
                return False
        if not query.strip().upper().startswith("SELECT"):
            return False
        return True

    @classmethod
    def sanitize_table_name(cls, table_name: str) -> str:
        """Validate and return the table name, raising ValueError if invalid."""
        if not re.match(r"^[a-zA-Z0-9_.]+$", table_name):
            raise ValueError("Invalid table name format")
        if table_name not in cls.VALID_TABLES:
            raise ValueError(f"Table '{table_name}' is not in the allowed list")
        return table_name
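The validator's core policy can be smoke-tested against representative inputs. A standalone sketch repeating a subset of the forbidden patterns:

```python
import re

FORBIDDEN_PATTERNS = [
    r";\s*(DROP|DELETE|UPDATE|INSERT|ALTER|CREATE)\s+",  # stacked statements
    r"--.*",                                             # SQL comments
    r"/\*.*\*/",                                         # block comments
]

def validate_sql_query(query: str) -> bool:
    # Same policy as InputValidator: deny-list scan, then SELECT-only allow-list.
    if any(re.search(p, query, re.IGNORECASE) for p in FORBIDDEN_PATTERNS):
        return False
    return query.strip().upper().startswith("SELECT")

print(validate_sql_query("SELECT * FROM retail.products"))           # True
print(validate_sql_query("SELECT 1; DROP TABLE retail.products;"))   # False
print(validate_sql_query("DELETE FROM retail.products"))             # False
```

Note the SELECT-only rule also rejects CTEs starting with `WITH`; relax it deliberately if your tools need them, rather than removing the check.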

Data protection utilities

import os

from cryptography.fernet import Fernet

class DataProtection:
    """Encryption and data masking utilities."""

    def __init__(self):
        self.cipher_suite = Fernet(self._get_encryption_key())

    def _get_encryption_key(self) -> bytes:
        """Read the encryption key from the environment; in production, source it from Azure Key Vault."""
        key = os.getenv("DEV_ENCRYPTION_KEY")
        if not key:
            raise ValueError("No encryption key configured")
        return key.encode()

    def encrypt(self, data: str) -> str:
        return self.cipher_suite.encrypt(data.encode()).decode()

    def decrypt(self, encrypted_data: str) -> str:
        return self.cipher_suite.decrypt(encrypted_data.encode()).decode()

    @staticmethod
    def mask_sensitive_logs(log_data: dict) -> dict:
        """Replace sensitive field values with masked versions for logging."""
        sensitive_keys = ['password', 'token', 'secret', 'key', 'authorization',
                          'x-api-key', 'client_secret', 'connection_string']
        masked = log_data.copy()
        for key in list(masked.keys()):
            if any(s in key.lower() for s in sensitive_keys):
                value = str(masked[key])
                masked[key] = (value[:2] + "*" * (len(value) - 4) + value[-2:]
                               if len(value) > 4 else "***")
        return masked
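The masking rule works out as follows. The logic is repeated standalone here; `mask_value` is a hypothetical helper mirroring the inline expression above:

```python
def mask_value(value: str) -> str:
    # Same rule as mask_sensitive_logs: keep two characters on each side,
    # star the middle; very short values are fully masked.
    if len(value) > 4:
        return value[:2] + "*" * (len(value) - 4) + value[-2:]
    return "***"

print(mask_value("hunter2secret"))  # hu*********et
print(mask_value("abc"))            # ***
```

Keeping the first and last two characters preserves just enough signal to correlate a leaked-credential report with a specific secret without exposing it in logs.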

Production deployment checklist

Environment validation

import os
import sys

class ProductionConfig:
    """Validate all required production settings on startup."""

    REQUIRED_SETTINGS = [
        "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET", "AZURE_TENANT_ID",
        "PROJECT_ENDPOINT", "AZURE_OPENAI_ENDPOINT",
        "POSTGRES_HOST", "POSTGRES_PASSWORD",
        "APPLICATIONINSIGHTS_CONNECTION_STRING"
    ]

    def validate_production_requirements(self):
        missing = [s for s in self.REQUIRED_SETTINGS if not os.getenv(s)]
        if missing:
            raise EnvironmentError(f"Missing required production settings: {missing}")

    def setup_production_logging(self):
        import logging.handlers
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler(sys.stdout),
                logging.handlers.RotatingFileHandler(
                    '/var/log/mcp-server.log',
                    maxBytes=50 * 1024 * 1024,  # 50 MB
                    backupCount=5
                )
            ]
        )
        # Reduce noise from third-party libraries
        logging.getLogger('azure').setLevel(logging.WARNING)
        logging.getLogger('urllib3').setLevel(logging.WARNING)

Database indexing strategy

-- Core business query patterns
CREATE INDEX CONCURRENTLY idx_orders_store_date
    ON retail.orders (store_id, order_date DESC);

CREATE INDEX CONCURRENTLY idx_order_items_product
    ON retail.order_items (product_id);

CREATE INDEX CONCURRENTLY idx_customers_store_email
    ON retail.customers (store_id, email);

-- Analytics aggregation indexes
CREATE INDEX CONCURRENTLY idx_orders_date_amount
    ON retail.orders (order_date, total_amount);

CREATE INDEX CONCURRENTLY idx_products_category_price
    ON retail.products (category_id, unit_price);

-- Vector search (IVFFlat for large catalogs, HNSW for smaller)
CREATE INDEX CONCURRENTLY idx_embeddings_vector
    ON retail.product_description_embeddings
    USING ivfflat (description_embedding vector_cosine_ops)
    WITH (lists = 100);

Cost optimization

class CostOptimizer:
    """Dynamic resource adjustment to control Azure spending."""

    async def optimize_database_connections(self, current_load: float):
        """Scale the connection pool up or down based on current load."""
        if current_load < 0.3:
            target = max(2, int(current_load * 10))
        elif current_load < 0.7:
            target = max(5, int(current_load * 15))
        else:
            target = min(20, int(current_load * 25))

        await db_provider.adjust_pool_size(target)
        logger.info(f"Connection pool adjusted to {target} for load {current_load:.2f}")

    def estimate_monthly_costs(self) -> dict:
        """Return estimated Azure resource costs for capacity planning."""
        return {
            "container_apps": "$20–80 (scales to zero when idle)",
            "postgresql_flexible_server": "$150–300 (Standard_D4s_v3)",
            "azure_openai_embeddings": "$5–20 (text-embedding-3-small @ $0.00002/1K tokens)",
            "application_insights": "$5–15 (first 5 GB/month free)",
            "container_registry": "$5 (Basic tier)"
        }
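The tiered sizing in optimize_database_connections reduces to simple arithmetic. A standalone sketch; `target_pool_size` is a hypothetical helper extracting just the calculation:

```python
def target_pool_size(load: float) -> int:
    # Same tiers as optimize_database_connections above.
    if load < 0.3:
        return max(2, int(load * 10))   # quiet: shrink toward the floor of 2
    if load < 0.7:
        return max(5, int(load * 15))   # moderate: keep at least 5 warm
    return min(20, int(load * 25))      # busy: grow aggressively, cap at 20

for load in (0.1, 0.5, 0.9):
    print(load, target_pool_size(load))  # 0.1 -> 2, 0.5 -> 7, 0.9 -> 20
```

Shrinking the pool at low load matters on managed PostgreSQL tiers where idle connections still consume memory that is billed into the instance size.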

Comprehensive health monitoring

import time
from datetime import datetime, timezone

class OperationalHealth:
    """Scheduled comprehensive health checks with alert integration."""

    async def comprehensive_health_check(self) -> dict:
        health_report = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "overall_status": "healthy",
            "components": {}
        }

        checks = {
            "database": self.check_database_health,
            "ai_service": self.check_ai_service_health,
            "system": self.check_system_resources,
        }

        for name, check_func in checks.items():
            health_report["components"][name] = await check_func()

        failed = [k for k, v in health_report["components"].items()
                  if v.get("status") != "healthy"]

        if failed:
            health_report["overall_status"] = "unhealthy"
            health_report["failed_components"] = failed
            await alert_manager.evaluate_metrics({
                "database_status": health_report["components"].get("database", {}).get("status")
            })

        return health_report

    async def check_database_health(self) -> dict:
        try:
            start = time.time()
            async with db_provider.get_connection() as conn:
                await conn.fetchval("SELECT 1")
                connection_count = await conn.fetchval(
                    "SELECT count(*) FROM pg_stat_activity WHERE state = 'active'"
                )
            return {
                "status": "healthy",
                "response_time_ms": (time.time() - start) * 1000,
                "active_connections": connection_count
            }
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}
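The overall-status aggregation in comprehensive_health_check reduces to a simple fold over component reports (standalone sketch):

```python
def overall_status(components: dict) -> str:
    # Any non-healthy component marks the whole report unhealthy.
    failed = [name for name, report in components.items()
              if report.get("status") != "healthy"]
    return "unhealthy" if failed else "healthy"

print(overall_status({"database": {"status": "healthy"},
                      "ai_service": {"status": "unhealthy"}}))  # unhealthy
```

Treating a missing `status` key as unhealthy (via `.get`) means a check that crashes before reporting still trips the alert path rather than passing silently.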

Final project: production-ready assessment

Before considering this learning path complete, verify your implementation covers:
  • Multi-tenant schema with store_id on every transactional table
  • RLS policies enabled and tested for all tables
  • pgvector HNSW index on product embeddings
  • Full-text search index on product names and descriptions
  • Audit log table with appropriate indexes
  • Automated backup script with Azure Storage upload
  • FastMCP with all required tools registered
  • RLS context set per-request from x-rls-user-id header
  • Query validator blocking dangerous SQL patterns
  • Connection pool with configurable min/max sizes
  • Health endpoints (/health, /health/ready, /health/live)
  • Graceful startup/shutdown with resource cleanup
  • Azure Entra ID JWT validation with JWKS caching
  • Role-based authorization with permission inheritance
  • Input validation on all tool parameters
  • Sensitive values masked in all log output
  • Security audit log with monitoring views
  • Application Insights configured with OpenTelemetry
  • Structured JSON logging for all key operations
  • Custom metrics for requests, queries, tools, and errors
  • Alert rules with cooldown periods for all critical conditions
  • Azure Monitor dashboard with KQL queries
  • Multi-stage Dockerfile with non-root user
  • Docker Compose for local development
  • Bicep templates for Azure Container Apps and PostgreSQL
  • GitHub Actions CI/CD with automated tests and staged deployment
  • Auto-scaling rules based on HTTP concurrency and CPU

Community and next steps

Contributing to MCP

  • Follow PEP 8 for Python code style
  • Maintain test coverage above 90%
  • Use type hints throughout the codebase
  • Write docstrings for all public functions and classes
  • Report security vulnerabilities privately before public disclosure

Advanced learning paths

  • MCP Architecture Patterns — Advanced server architectures for complex use cases
  • Multi-Model Integration — Combining different AI models within a single MCP server
  • Enterprise Scale — Large-scale MCP deployments with thousands of concurrent users
  • Custom Tool Development — Building specialized MCP tools for domain-specific workflows

Key takeaways from the full learning path

  • RLS + shared schema is the right multi-tenancy model for most retail analytics workloads
  • FastMCP reduces server boilerplate while preserving full protocol compliance
  • Azure OpenAI + pgvector provides production-quality semantic search without a separate vector database
  • Structured logging + distributed tracing dramatically reduces mean time to diagnosis
  • Multi-stage Docker builds and non-root users are non-negotiable in production
  • CI/CD with staged deployments enables zero-downtime releases and safe rollbacks
  • Circuit breakers and connection pool sizing prevent cascading failures under load

Return to Overview

Review the full lab structure and learning paths.

Sample Repository

Explore the complete working implementation on GitHub.
