
Objectives

By the end of this lab you will be able to:
  • Analyze the layered architecture of an MCP server with database integration
  • Understand the role and responsibilities of each architectural component
  • Design database schemas that support multi-tenant MCP applications
  • Implement connection pooling and resource management strategies
  • Apply error handling and logging patterns for production systems
  • Evaluate trade-offs between different architectural approaches

Prerequisites

  • Completed Lab 0: Introduction
  • Basic Python knowledge
  • Familiarity with SQL and relational databases

MCP server architecture layers

The server implements a layered architecture that separates concerns and promotes maintainability.

Layer 1: Protocol layer (FastMCP)

Handles MCP protocol communication and message routing.
from typing import Annotated

from fastmcp import FastMCP, Context
from pydantic import Field

mcp = FastMCP("Zava Retail Analytics")

# Tool registration with type safety
@mcp.tool()
async def execute_sales_query(
    ctx: Context,
    postgresql_query: Annotated[str, Field(description="Well-formed PostgreSQL query")]
) -> str:
    """Execute PostgreSQL queries with Row Level Security."""
    return await query_executor.execute(postgresql_query, ctx)
Key features: protocol compliance, Pydantic type safety, async support, standardized errors.

Layer 2: Business logic layer

Implements business rules and coordinates between protocol and data layers.
from typing import Any, Dict

class SalesAnalyticsService:
    """Business logic for retail analytics operations."""

    async def get_store_performance(
        self,
        store_id: str,
        time_period: str
    ) -> Dict[str, Any]:
        """Calculate store performance metrics."""

        if not self._validate_store_access(store_id):
            raise UnauthorizedError("Access denied for store")

        sales_data = await self.db_provider.get_sales_data(store_id, time_period)
        metrics = self._calculate_metrics(sales_data)

        return {
            "store_id": store_id,
            "period": time_period,
            "metrics": metrics,
            "insights": self._generate_insights(metrics)
        }
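
As a concrete sketch of what a helper like `_calculate_metrics` might do, here is a standalone pure-Python version that aggregates raw sales rows into summary metrics. The field name `total_amount` matches the orders schema later in this lab, but the metric names themselves are illustrative assumptions, not part of the lab's API.

```python
from typing import Any, Dict, List

def calculate_metrics(sales_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate raw sales rows into summary metrics (illustrative metric names)."""
    if not sales_data:
        return {"total_revenue": 0.0, "order_count": 0, "average_order_value": 0.0}
    total = sum(row["total_amount"] for row in sales_data)
    count = len(sales_data)
    return {
        "total_revenue": total,
        "order_count": count,
        "average_order_value": total / count,
    }

rows = [{"total_amount": 120.0}, {"total_amount": 80.0}]
print(calculate_metrics(rows))
# {'total_revenue': 200.0, 'order_count': 2, 'average_order_value': 100.0}
```

Keeping this logic in the business layer (rather than in SQL or the tool handler) is what makes it independently testable.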

Layer 3: Data access layer

Manages database connections, query execution, and data mapping.
import asyncio
from typing import Any, Dict, List

class PostgreSQLProvider:
    """Data access layer for PostgreSQL operations."""

    async def execute_query(
        self,
        query: str,
        rls_user_id: str
    ) -> List[Dict[str, Any]]:
        """Execute query with RLS context."""

        async with self.connection_pool.acquire() as conn:
            # Set RLS context before query execution.
            # Note: the third argument `false` makes this a session-level
            # setting; it persists on the pooled connection until reset.
            await conn.execute(
                "SELECT set_config('app.current_rls_user_id', $1, false)",
                rls_user_id
            )

            try:
                rows = await asyncio.wait_for(
                    conn.fetch(query),
                    timeout=30.0
                )
                return [dict(row) for row in rows]
            except asyncio.TimeoutError:
                raise QueryTimeoutError("Query execution exceeded timeout")

Layer 4: Infrastructure layer

Handles cross-cutting concerns like logging, monitoring, and configuration.
import logging
from logging import Logger

class InfrastructureManager:
    """Infrastructure concerns management."""

    def __init__(self):
        self.logger = self._setup_logging()
        self.metrics = self._setup_metrics()
        self.config = self._load_configuration()

    def _setup_logging(self) -> Logger:
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler(),
                logging.FileHandler('mcp_server.log')
            ]
        )
        return logging.getLogger(__name__)

Database design patterns

Multi-tenant schema design

Every transactional table includes store_id to support RLS-based isolation:
CREATE TABLE retail.stores (
    store_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(100) NOT NULL,
    location VARCHAR(200) NOT NULL,
    manager_id UUID NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE retail.customers (
    customer_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    store_id UUID REFERENCES retail.stores(store_id),
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE retail.orders (
    order_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id UUID REFERENCES retail.customers(customer_id),
    store_id UUID REFERENCES retail.stores(store_id),
    order_date TIMESTAMP DEFAULT NOW(),
    total_amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) DEFAULT 'pending'
);
Design principles:
  • Foreign key consistency for data integrity across tables
  • store_id propagated to every transactional table
  • UUID primary keys for globally unique identifiers in distributed systems
  • Timestamp tracking for audit trails

Row Level Security implementation

-- Enable RLS on multi-tenant tables
ALTER TABLE retail.customers ENABLE ROW LEVEL SECURITY;
ALTER TABLE retail.orders ENABLE ROW LEVEL SECURITY;

-- Store manager sees only their store
CREATE POLICY store_manager_customers ON retail.customers
    FOR ALL TO store_managers
    USING (store_id = get_current_user_store());

-- Regional manager sees multiple stores
CREATE POLICY regional_manager_orders ON retail.orders
    FOR ALL TO regional_managers
    USING (store_id = ANY(get_user_store_list()));

-- Support function for RLS context
CREATE OR REPLACE FUNCTION get_current_user_store()
RETURNS UUID AS $$
BEGIN
    RETURN current_setting('app.current_rls_user_id')::UUID;
EXCEPTION WHEN OTHERS THEN
    RETURN '00000000-0000-0000-0000-000000000000'::UUID;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
RLS provides security by default. Because the database enforces tenant isolation at query time, an application-level query cannot return rows from another store, even if the query itself forgets to filter by store_id.
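
To build intuition for what the USING clause does, here is a pure-Python analogue of the store-manager policy. This is only a mental model, not how Postgres implements RLS: every candidate row is checked against the current session's store before it can be returned.

```python
import uuid
from typing import Any, Dict, List

def apply_store_policy(rows: List[Dict[str, Any]], current_store: uuid.UUID) -> List[Dict[str, Any]]:
    """Python analogue of USING (store_id = get_current_user_store())."""
    return [row for row in rows if row["store_id"] == current_store]

store_a, store_b = uuid.uuid4(), uuid.uuid4()
rows = [
    {"store_id": store_a, "customer": "Ada"},
    {"store_id": store_b, "customer": "Grace"},
]
visible = apply_store_policy(rows, store_a)
print([r["customer"] for r in visible])  # ['Ada']
```

The key difference from application-side filtering: in Postgres this predicate is appended to every query against the table automatically, so no code path can skip it.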

Vector search schema

-- Product embeddings for semantic search
CREATE TABLE retail.product_description_embeddings (
    product_id UUID PRIMARY KEY REFERENCES retail.products(product_id),
    description_embedding vector(1536),
    last_updated TIMESTAMP DEFAULT NOW()
);

-- Optimize vector similarity search
CREATE INDEX idx_product_embeddings_vector
ON retail.product_description_embeddings
USING ivfflat (description_embedding vector_cosine_ops);

-- Semantic search function
CREATE OR REPLACE FUNCTION search_products_by_description(
    query_embedding vector(1536),
    similarity_threshold FLOAT DEFAULT 0.7,
    max_results INTEGER DEFAULT 20
)
RETURNS TABLE(
    product_id UUID,
    name VARCHAR,
    description TEXT,
    similarity_score FLOAT
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        p.product_id,
        p.name,
        p.description,
        (1 - (pde.description_embedding <=> query_embedding)) AS similarity_score
    FROM retail.products p
    JOIN retail.product_description_embeddings pde ON p.product_id = pde.product_id
    WHERE (pde.description_embedding <=> query_embedding) <= (1 - similarity_threshold)
    -- Order by raw distance (ascending) so the ivfflat index can be used;
    -- this also avoids ambiguity with the similarity_score OUT parameter
    ORDER BY pde.description_embedding <=> query_embedding
    LIMIT max_results;
END;
$$ LANGUAGE plpgsql;
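
The function above filters on cosine distance (the `<=>` operator) but reports similarity. The conversion between the two is simple and worth internalizing; this standalone Python sketch mirrors the arithmetic in the WHERE and SELECT clauses.

```python
def similarity_to_max_distance(similarity_threshold: float) -> float:
    """A similarity floor of t corresponds to a cosine-distance ceiling of 1 - t."""
    return 1.0 - similarity_threshold

def distance_to_similarity(cosine_distance: float) -> float:
    """Invert the relationship: similarity = 1 - distance, as in the SELECT expression."""
    return 1.0 - cosine_distance

# A 0.7 similarity threshold keeps rows whose cosine distance is at most 0.3.
print(round(similarity_to_max_distance(0.7), 2))  # 0.3
print(distance_to_similarity(0.25))               # 0.75
```

So tightening `similarity_threshold` toward 1.0 shrinks the allowed distance toward 0, returning only near-duplicate descriptions.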

Connection management patterns

Connection pool configuration

import asyncio
from typing import Any, Dict, List, Optional, Tuple

import asyncpg
from asyncpg import Pool

class ConnectionPoolManager:
    """Manages PostgreSQL connection pools."""

    async def create_pool(self) -> Pool:
        """Create optimized connection pool."""
        return await asyncpg.create_pool(
            host=self.config.db_host,
            port=self.config.db_port,
            database=self.config.db_name,
            user=self.config.db_user,
            password=self.config.db_password,

            min_size=2,          # Minimum connections kept alive
            max_size=10,         # Maximum concurrent connections
            max_inactive_connection_lifetime=300,  # 5 minutes

            command_timeout=30,
            server_settings={
                "application_name": "zava-mcp-server",
                "jit": "off",
                "work_mem": "4MB",
                "statement_timeout": "30s"
            }
        )

    async def execute_with_retry(
        self,
        query: str,
        params: Optional[Tuple] = None,
        max_retries: int = 3
    ) -> List[Dict[str, Any]]:
        """Execute query with automatic retry and exponential backoff."""

        for attempt in range(max_retries):
            try:
                async with self.pool.acquire() as conn:
                    rows = await conn.fetch(query, *(params or ()))
                    return [dict(row) for row in rows]

            except (ConnectionError, asyncpg.InterfaceError) as e:
                if attempt == max_retries - 1:
                    raise
                logger.warning(f"Transient error: {e}; retrying ({attempt + 1}/{max_retries})")
                await asyncio.sleep(2 ** attempt)
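
The retry loop sleeps 2 ** attempt seconds between attempts. A quick way to sanity-check a backoff policy is to compute its full schedule up front; this small helper is an illustration of the loop's timing, not part of the lab's code.

```python
from typing import List

def backoff_schedule(max_retries: int, base: float = 2.0) -> List[float]:
    """Delays slept after each failed attempt; the final attempt re-raises instead of sleeping."""
    return [base ** attempt for attempt in range(max_retries - 1)]

print(backoff_schedule(3))  # [1.0, 2.0]
print(backoff_schedule(4))  # [1.0, 2.0, 4.0]
```

With max_retries=3, a request that ultimately fails costs roughly 3 seconds of added latency, which is worth factoring into any upstream timeout.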

Resource lifecycle management

class MCPServerManager:
    """Manages MCP server lifecycle and resources."""

    async def startup(self):
        """Initialize server resources."""
        self.db_pool = await self.pool_manager.create_pool()
        self.ai_client = await self.create_ai_client()
        self.metrics_collector = MetricsCollector()
        logger.info("MCP server startup complete")

    async def shutdown(self):
        """Cleanup server resources."""
        try:
            if self.db_pool:
                await self.db_pool.close()
            if self.ai_client:
                await self.ai_client.close()
            await self.metrics_collector.flush()
            logger.info("MCP server shutdown complete")
        except Exception as e:
            logger.error(f"Error during shutdown: {e}")

    async def health_check(self) -> Dict[str, str]:
        """Verify server health status."""
        status = {}
        try:
            async with self.db_pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
            status["database"] = "healthy"
        except Exception as e:
            status["database"] = f"unhealthy: {e}"
        return status

Error handling and resilience

Hierarchical error types

from typing import Any, Optional

class MCPError(Exception):
    def __init__(self, message: str, error_code: str = "MCP_ERROR"):
        self.message = message
        self.error_code = error_code
        super().__init__(message)

class DatabaseError(MCPError):
    def __init__(self, message: str, query: Optional[str] = None):
        super().__init__(message, "DATABASE_ERROR")
        self.query = query

class AuthorizationError(MCPError):
    def __init__(self, message: str, user_id: Optional[str] = None):
        super().__init__(message, "AUTHORIZATION_ERROR")
        self.user_id = user_id

class QueryTimeoutError(DatabaseError):
    def __init__(self, query: str):
        super().__init__(f"Query timeout: {query[:100]}...", query)
        self.error_code = "QUERY_TIMEOUT"

class ValidationError(MCPError):
    def __init__(self, field: str, value: Any, constraint: str):
        super().__init__(f"Validation failed for {field}: {constraint}", "VALIDATION_ERROR")
        self.field = field
        self.value = value
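
One payoff of the hierarchy is that callers can catch broadly and still branch on error_code. The sketch below redefines a trimmed copy of the classes so it runs standalone; the classify helper is hypothetical, added only to show the routing.

```python
class MCPError(Exception):
    """Trimmed copy of the hierarchy above, so this sketch is self-contained."""
    def __init__(self, message: str, error_code: str = "MCP_ERROR"):
        self.message = message
        self.error_code = error_code
        super().__init__(message)

class DatabaseError(MCPError):
    def __init__(self, message: str, query: str = None):
        super().__init__(message, "DATABASE_ERROR")
        self.query = query

class QueryTimeoutError(DatabaseError):
    def __init__(self, query: str):
        super().__init__(f"Query timeout: {query[:100]}...", query)
        self.error_code = "QUERY_TIMEOUT"

def classify(exc: MCPError) -> str:
    """Route an error: one except-clause per layer, error_code for the detail."""
    try:
        raise exc
    except DatabaseError as e:   # also catches QueryTimeoutError
        return f"db:{e.error_code}"
    except MCPError as e:
        return f"app:{e.error_code}"

print(classify(QueryTimeoutError("SELECT 1")))    # db:QUERY_TIMEOUT
print(classify(MCPError("denied", "AUTH_DENIED")))  # app:AUTH_DENIED
```

Catching DatabaseError handles both timeouts and generic database failures in one place, while the error_code still distinguishes them in logs and user-facing messages.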

Performance optimization

Query performance monitoring

import hashlib
import time
from contextlib import asynccontextmanager

class QueryPerformanceMonitor:
    """Monitor and optimize query performance."""

    def __init__(self):
        self.slow_query_threshold = 1.0  # seconds

    @asynccontextmanager
    async def monitor_query(self, query: str, operation_type: str = "unknown"):
        """Monitor query execution time and log slow queries."""
        start_time = time.time()
        query_hash = hashlib.md5(query.encode()).hexdigest()[:8]

        try:
            yield
            duration = time.time() - start_time

            if duration > self.slow_query_threshold:
                logger.warning("Slow query detected", extra={
                    "query_hash": query_hash,
                    "duration": duration,
                    "operation_type": operation_type,
                    "query": query[:200]
                })

            metrics.query_duration.labels(type=operation_type).observe(duration)

        except Exception as e:
            logger.error("Query failed", extra={
                "query_hash": query_hash,
                "error": str(e)
            })
            raise

Caching strategy

from typing import Any, Callable, Optional

import redis.asyncio as redis

class QueryCache:
    """Intelligent query result caching."""

    def __init__(self, redis_url: Optional[str] = None):
        self.cache = {}
        self.redis_client = redis.Redis.from_url(redis_url) if redis_url else None
        self.cache_ttl = 300  # 5 minutes

    async def get_cached_result(
        self,
        cache_key: str,
        query_func: Callable,
        ttl: int = None
    ) -> Any:
        """Return cached result or execute query and cache the result."""
        ttl = ttl or self.cache_ttl

        cached_result = await self._get_from_cache(cache_key)
        if cached_result is not None:
            metrics.cache_hit.labels(type="query").inc()
            return cached_result

        metrics.cache_miss.labels(type="query").inc()
        result = await query_func()
        await self._set_in_cache(cache_key, result, ttl)
        return result
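
get_cached_result needs a stable cache_key. One common approach, an assumption here rather than something the lab prescribes, is to hash the normalized query text together with its parameters so that formatting differences do not fragment the cache.

```python
import hashlib
import json

def make_cache_key(query: str, params: tuple = ()) -> str:
    """Derive a deterministic cache key from query text plus parameters (illustrative helper)."""
    normalized = " ".join(query.split())  # collapse whitespace variations
    payload = json.dumps([normalized, list(params)], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()[:16]

k1 = make_cache_key("SELECT *  FROM retail.orders WHERE store_id = $1", ("abc",))
k2 = make_cache_key("SELECT * FROM retail.orders  WHERE store_id = $1", ("abc",))
k3 = make_cache_key("SELECT * FROM retail.orders WHERE store_id = $1", ("xyz",))
print(k1 == k2, k1 == k3)  # True False
```

In a multi-tenant setup the tenant identifier should also be folded into the key (here it arrives via params), so one store's cached results can never be served to another.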

Key takeaways

  • Layered architecture separates protocol, business logic, data access, and infrastructure concerns
  • Store ID propagation in every table is the foundation of multi-tenant isolation
  • RLS provides security by default — the database enforces isolation automatically
  • Connection pooling with retry logic ensures resilience under load
  • Hierarchical error types enable precise handling and user-friendly messages
  • Query monitoring identifies slow operations before they become production incidents

Next: Lab 2 — Security & Multi-Tenancy

Implement enterprise-grade Row Level Security, Azure Entra ID authentication, and comprehensive audit logging.
