Objectives
By the end of this lab you will be able to:
Analyze the layered architecture of an MCP server with database integration
Understand the role and responsibilities of each architectural component
Design database schemas that support multi-tenant MCP applications
Implement connection pooling and resource management strategies
Apply error handling and logging patterns for production systems
Evaluate trade-offs between different architectural approaches
Prerequisites
Completed Lab 0: Introduction
Basic Python knowledge
Familiarity with SQL and relational databases
MCP server architecture layers
The server implements a layered architecture that separates concerns and promotes maintainability.
Layer 1: Protocol layer (FastMCP)
Handles MCP protocol communication and message routing.
# Fix: the original snippet used Context, Annotated and Field without
# importing them; only FastMCP was imported.
from fastmcp import Context, FastMCP
from pydantic import Field
from typing import Annotated

mcp = FastMCP("Zava Retail Analytics")

# Tool registration with type safety: FastMCP derives the MCP tool schema
# from the annotated signature, and Pydantic validates inputs at call time.
@mcp.tool()
async def execute_sales_query(
    ctx: Context,
    postgresql_query: Annotated[str, Field(description="Well-formed PostgreSQL query")],
) -> str:
    """Execute PostgreSQL queries with Row Level Security.

    Delegates to the module-level ``query_executor``, passing the request
    context so the executor can apply the caller's RLS identity.
    """
    return await query_executor.execute(postgresql_query, ctx)
Key features: protocol compliance, Pydantic type safety, async support, standardized errors.
Layer 2: Business logic layer
Implements business rules and coordinates between protocol and data layers.
class SalesAnalyticsService:
    """Business logic for retail analytics operations."""

    async def get_store_performance(
        self,
        store_id: str,
        time_period: str,
    ) -> Dict[str, Any]:
        """Calculate store performance metrics.

        Checks store-level authorization, fetches raw sales data from the
        data layer, then derives metrics and human-readable insights.

        Raises:
            UnauthorizedError: if the caller may not access ``store_id``.
        """
        # Authorization first: fail fast before touching the data layer.
        if not self._validate_store_access(store_id):
            raise UnauthorizedError("Access denied for store")

        sales_data = await self.db_provider.get_sales_data(store_id, time_period)
        metrics = self._calculate_metrics(sales_data)
        insights = self._generate_insights(metrics)

        return {
            "store_id": store_id,
            "period": time_period,
            "metrics": metrics,
            "insights": insights,
        }
Layer 3: Data access layer
Manages database connections, query execution, and data mapping.
class PostgreSQLProvider:
    """Data access layer for PostgreSQL operations."""

    async def execute_query(
        self,
        query: str,
        rls_user_id: str,
    ) -> List[Dict[str, Any]]:
        """Execute ``query`` with the caller's RLS context applied.

        Args:
            query: SQL text to run under Row Level Security.
            rls_user_id: identity written to ``app.current_rls_user_id``,
                which the database's RLS policies read.

        Returns:
            Result rows as plain dicts.

        Raises:
            QueryTimeoutError: if execution exceeds the 30s limit.
        """
        async with self.connection_pool.acquire() as conn:
            # Fix: set the RLS context with is_local=true inside a
            # transaction. The original used is_local=false (session scope),
            # so the identity persisted on the pooled connection and could
            # leak into the next request that reused it.
            async with conn.transaction():
                await conn.execute(
                    "SELECT set_config('app.current_rls_user_id', $1, true)",
                    rls_user_id,
                )
                try:
                    rows = await asyncio.wait_for(
                        conn.fetch(query),
                        timeout=30.0,
                    )
                    return [dict(row) for row in rows]
                except asyncio.TimeoutError:
                    # Pass the query (not a message): QueryTimeoutError's
                    # constructor builds its own message from the query text.
                    raise QueryTimeoutError(query)
Layer 4: Infrastructure layer
Handles cross-cutting concerns like logging, monitoring, and configuration.
class InfrastructureManager:
    """Infrastructure concerns management."""

    def __init__(self):
        # Logging is configured first so the later setup steps can log.
        self.logger = self._setup_logging()
        self.metrics = self._setup_metrics()
        self.config = self._load_configuration()

    def _setup_logging(self) -> Logger:
        """Configure root logging (console + file) and return this module's logger."""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        handlers = [
            logging.StreamHandler(),
            logging.FileHandler('mcp_server.log'),
        ]
        logging.basicConfig(level=logging.INFO, format=log_format, handlers=handlers)
        return logging.getLogger(__name__)
Database design patterns
Multi-tenant schema design
Every transactional table includes store_id to support RLS-based isolation:
-- Tenant root table: one row per store.
CREATE TABLE retail.stores (
    store_id   UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name       VARCHAR(100) NOT NULL,
    location   VARCHAR(200) NOT NULL,
    manager_id UUID NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

-- store_id is NOT NULL: the RLS policies filter on store_id, so a NULL
-- value would make a row invisible to every tenant-scoped policy and
-- silently orphan it.
CREATE TABLE retail.customers (
    customer_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    store_id    UUID NOT NULL REFERENCES retail.stores (store_id),
    first_name  VARCHAR(50) NOT NULL,
    last_name   VARCHAR(50) NOT NULL,
    email       VARCHAR(100) UNIQUE,
    created_at  TIMESTAMP DEFAULT NOW()
);

-- store_id is carried redundantly on orders (in addition to the customer
-- link) so RLS can filter orders directly without a join.
CREATE TABLE retail.orders (
    order_id     UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id  UUID REFERENCES retail.customers (customer_id),
    store_id     UUID NOT NULL REFERENCES retail.stores (store_id),
    order_date   TIMESTAMP DEFAULT NOW(),
    total_amount DECIMAL(10, 2) NOT NULL,
    status       VARCHAR(20) DEFAULT 'pending'
);
Design principles:
Foreign key consistency for data integrity across tables
store_id propagated to every transactional table
UUID primary keys for globally unique identifiers in distributed systems
Timestamp tracking for audit trails
Row Level Security implementation
-- Enable RLS on multi-tenant tables
ALTER TABLE retail . customers ENABLE ROW LEVEL SECURITY ;
ALTER TABLE retail . orders ENABLE ROW LEVEL SECURITY ;
-- Store manager sees only their store
-- FOR ALL applies the USING filter to SELECT/UPDATE/DELETE and (absent a
-- separate WITH CHECK) to newly inserted rows as well.
CREATE POLICY store_manager_customers ON retail . customers
FOR ALL TO store_managers
USING (store_id = get_current_user_store());
-- Regional manager sees multiple stores
CREATE POLICY regional_manager_orders ON retail . orders
FOR ALL TO regional_managers
USING (store_id = ANY(get_user_store_list()));
-- Support function for RLS context
-- NOTE(review): the function name says "store" but it reads the
-- app.current_rls_user_id setting and casts it to a UUID — confirm the
-- application actually stores the store UUID in that setting.
CREATE OR REPLACE FUNCTION get_current_user_store ()
RETURNS UUID AS $$
BEGIN
RETURN current_setting( 'app.current_rls_user_id' )::UUID;
-- current_setting() raises when the setting is absent; fall back to the
-- all-zero UUID, which matches no real store_id, so unset context means
-- "see nothing" rather than an error.
EXCEPTION WHEN OTHERS THEN
RETURN '00000000-0000-0000-0000-000000000000' ::UUID;
END ;
-- SECURITY DEFINER: runs with the function owner's privileges so policy
-- evaluation works regardless of the querying role's own grants.
$$ LANGUAGE plpgsql SECURITY DEFINER;
RLS provides security by default — it is impossible to accidentally access data from the wrong store because the database enforces isolation at query time, regardless of the application-level query.
Vector search schema
-- Product embeddings for semantic search
-- One embedding row per product (product_id is both PK and FK), so an
-- embedding cannot exist without its product.
CREATE TABLE retail .product_description_embeddings (
product_id UUID PRIMARY KEY REFERENCES retail . products (product_id),
-- 1536 dimensions matches the embedding model used elsewhere in this lab.
description_embedding vector ( 1536 ),
last_updated TIMESTAMP DEFAULT NOW ()
);
-- Optimize vector similarity search
-- vector_cosine_ops: the index serves cosine-distance (<=>) queries.
-- NOTE(review): ivfflat uses a default `lists` count here; for production
-- data volumes, create the index after loading data and tune WITH (lists = N).
CREATE INDEX idx_product_embeddings_vector
ON retail . product_description_embeddings
USING ivfflat (description_embedding vector_cosine_ops);
-- Semantic search over product descriptions via pgvector cosine distance.
-- similarity = 1 - cosine_distance, so "similarity >= threshold" is
-- equivalent to "distance <= 1 - threshold".
CREATE OR REPLACE FUNCTION search_products_by_description (
    query_embedding vector(1536),
    similarity_threshold FLOAT DEFAULT 0.7,  -- fix: literal was garbled as "0 . 7"
    max_results INTEGER DEFAULT 20
)
RETURNS TABLE (
    product_id UUID,
    name VARCHAR,
    description TEXT,
    similarity_score FLOAT
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        p.product_id,
        p.name,
        p.description,
        (1 - (pde.description_embedding <=> query_embedding)) AS similarity_score
    FROM retail.products p
    JOIN retail.product_description_embeddings pde
        ON p.product_id = pde.product_id
    WHERE (pde.description_embedding <=> query_embedding) <= (1 - similarity_threshold)
    -- Order by raw distance ascending instead of the aliased similarity
    -- descending: identical row order, but this form matches the ivfflat
    -- vector_cosine_ops operator class so the index can drive the sort.
    ORDER BY pde.description_embedding <=> query_embedding
    LIMIT max_results;
END;
$$ LANGUAGE plpgsql;
Connection management patterns
Connection pool configuration
class ConnectionPoolManager:
    """Manages PostgreSQL connection pools."""

    async def create_pool(self) -> Pool:
        """Create an optimized asyncpg connection pool from ``self.config``."""
        return await asyncpg.create_pool(
            host=self.config.db_host,
            port=self.config.db_port,
            database=self.config.db_name,
            user=self.config.db_user,
            password=self.config.db_password,
            min_size=2,  # Minimum connections kept alive
            max_size=10,  # Maximum concurrent connections
            max_inactive_connection_lifetime=300,  # 5 minutes
            command_timeout=30,
            server_settings={
                "application_name": "zava-mcp-server",
                "jit": "off",  # JIT rarely helps short OLTP-style queries
                "work_mem": "4MB",
                "statement_timeout": "30s",  # server-side backstop to command_timeout
            },
        )

    async def execute_with_retry(
        self,
        query: str,
        params: Tuple = None,
        max_retries: int = 3,
    ) -> List[Dict[str, Any]]:
        """Execute query with automatic retry and exponential backoff.

        Args:
            query: SQL text; positional placeholders filled from ``params``.
            params: optional tuple of query parameters.
            max_retries: total attempts (must be >= 1).

        Returns:
            Result rows as plain dicts.

        Raises:
            ValueError: if ``max_retries`` < 1 (the original silently
                returned ``None`` in that case).
            ConnectionError, InterfaceError: re-raised after the final attempt.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be >= 1")
        for attempt in range(max_retries):
            try:
                async with self.pool.acquire() as conn:
                    rows = await conn.fetch(query, *(params or ()))
                    return [dict(row) for row in rows]
            except (ConnectionError, InterfaceError):
                if attempt == max_retries - 1:
                    raise
                # Fix: log BEFORE sleeping so the retry is visible while the
                # backoff is in progress; lazy %-args avoid formatting when
                # WARNING is disabled.
                logger.warning("Retrying (%d/%d)", attempt + 1, max_retries)
                await asyncio.sleep(2 ** attempt)
Resource lifecycle management
class MCPServerManager:
    """Manages MCP server lifecycle and resources."""

    async def startup(self):
        """Initialize server resources (pool, AI client, metrics)."""
        self.db_pool = await self.pool_manager.create_pool()
        self.ai_client = await self.create_ai_client()
        self.metrics_collector = MetricsCollector()
        logger.info("MCP server startup complete")

    async def shutdown(self):
        """Cleanup server resources.

        Fix: each resource is released in its own try block. The original
        shared a single handler, so a failure closing the pool skipped
        closing the AI client and flushing metrics entirely.
        """
        try:
            if self.db_pool:
                await self.db_pool.close()
        except Exception as e:
            logger.error(f"Error during shutdown: {e}")
        try:
            if self.ai_client:
                await self.ai_client.close()
        except Exception as e:
            logger.error(f"Error during shutdown: {e}")
        try:
            await self.metrics_collector.flush()
        except Exception as e:
            logger.error(f"Error during shutdown: {e}")
        logger.info("MCP server shutdown complete")

    async def health_check(self) -> Dict[str, str]:
        """Verify server health status.

        Returns a component->status map; a failed probe reports
        "unhealthy: <error>" instead of raising.
        """
        status = {}
        try:
            # Cheapest possible round trip to prove the pool can serve.
            async with self.db_pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
            status["database"] = "healthy"
        except Exception as e:
            status["database"] = f"unhealthy: {e}"
        return status
Error handling and resilience
Hierarchical error types
class MCPError(Exception):
    """Base class for all MCP server errors; carries a stable error code."""

    def __init__(self, message: str, error_code: str = "MCP_ERROR"):
        self.message = message
        self.error_code = error_code
        super().__init__(message)


class DatabaseError(MCPError):
    """Database-layer failure; optionally carries the offending query."""

    def __init__(self, message: str, query: str = None):
        super().__init__(message, "DATABASE_ERROR")
        self.query = query


class AuthorizationError(MCPError):
    """Access-control failure; optionally carries the denied user's id."""

    def __init__(self, message: str, user_id: str = None):
        super().__init__(message, "AUTHORIZATION_ERROR")
        self.user_id = user_id


class QueryTimeoutError(DatabaseError):
    """Query exceeded its time limit; message embeds a query prefix."""

    def __init__(self, query: str):
        super().__init__(f"Query timeout: {query[:100]}...", query)
        # Override the DATABASE_ERROR code set by the base constructor.
        self.error_code = "QUERY_TIMEOUT"


class ValidationError(MCPError):
    """Input validation failure.

    Fix: the original accepted field/value/constraint but discarded them;
    they are now stored for structured error reporting.
    """

    def __init__(self, field: str, value: Any, constraint: str):
        message = f"Validation failed for {field}: {constraint}"
        super().__init__(message, "VALIDATION_ERROR")
        self.field = field
        self.value = value
        self.constraint = constraint
class QueryPerformanceMonitor:
    """Monitor and optimize query performance."""

    def __init__(self):
        self.slow_query_threshold = 1.0  # seconds

    # Fix: the original decorated this async generator with the synchronous
    # contextlib.contextmanager, which cannot produce an object usable with
    # ``async with``; asynccontextmanager is required for async def + yield.
    @asynccontextmanager
    async def monitor_query(self, query: str, operation_type: str = "unknown"):
        """Monitor query execution time and log slow queries.

        Usage: ``async with monitor.monitor_query(sql, "select"): ...``
        On success, records the duration metric and warns if the query was
        slower than ``slow_query_threshold``; on failure, logs and re-raises.
        """
        start_time = time.time()
        # Short stable hash so log lines can be correlated without dumping
        # the full SQL text.
        query_hash = hashlib.md5(query.encode()).hexdigest()[:8]
        try:
            yield
            duration = time.time() - start_time
            if duration > self.slow_query_threshold:
                logger.warning("Slow query detected", extra={
                    "query_hash": query_hash,
                    "duration": duration,
                    "operation_type": operation_type,
                    "query": query[:200],
                })
            metrics.query_duration.labels(type=operation_type).observe(duration)
        except Exception as e:
            logger.error("Query failed", extra={
                "query_hash": query_hash,
                "error": str(e),
            })
            raise
Caching strategy
class QueryCache:
    """Intelligent query result caching."""

    def __init__(self, redis_url: str = None):
        # Local dict backing store, plus an optional Redis client when a
        # URL is supplied.
        self.cache = {}
        self.redis_client = redis.Redis.from_url(redis_url) if redis_url else None
        self.cache_ttl = 300  # 5 minutes

    async def get_cached_result(
        self,
        cache_key: str,
        query_func: Callable,
        ttl: int = None,
    ) -> Any:
        """Return cached result or execute query and cache the result."""
        effective_ttl = ttl or self.cache_ttl

        hit = await self._get_from_cache(cache_key)
        if hit is not None:
            metrics.cache_hit.labels(type="query").inc()
            return hit

        # Miss: run the query, store under the key, then return it.
        metrics.cache_miss.labels(type="query").inc()
        fresh = await query_func()
        await self._set_in_cache(cache_key, fresh, effective_ttl)
        return fresh
Key takeaways
Layered architecture separates protocol, business logic, data access, and infrastructure concerns
Store ID propagation in every table is the foundation of multi-tenant isolation
RLS provides security by default — the database enforces isolation automatically
Connection pooling with retry logic ensures resilience under load
Hierarchical error types enable precise handling and user-friendly messages
Query monitoring identifies slow operations before they become production incidents
Next: Lab 2 — Security & Multi-Tenancy. Implement enterprise-grade Row Level Security, Azure Entra ID authentication, and comprehensive audit logging.