Objectives
By the end of this lab you will be able to:
- Implement comprehensive unit and integration test suites
- Design effective testing strategies for MCP tools and database operations
- Debug complex issues using advanced debugging techniques
- Validate performance under load with realistic testing scenarios
- Monitor production systems with effective alerting and observability
- Automate testing workflows for continuous integration
Prerequisites
- Completed Lab 7: Semantic Search
- Running development environment with sample data
Testing architecture
Unit Tests
• Tool execution logic
• Database query validation
• Authentication/authorization
• Embedding generation
↓
Integration Tests
• End-to-end MCP workflows
• Database schema validation
• Multi-tool interactions
↓
Performance Tests
• Concurrent load testing
• Database query benchmarks
• Memory usage validation
↓
E2E Tests
• Complete user workflows
• VS Code integration
• Real-world scenarios
Step 1: Test configuration and fixtures
# tests/conftest.py
# Standard library
import asyncio
from datetime import datetime
from pathlib import Path
from typing import AsyncGenerator
from unittest.mock import AsyncMock

# Third-party
import asyncpg
import pytest
# Connection string for the throwaway database recreated once per test session.
TEST_DATABASE_URL = "postgresql://test_user:test_pass@localhost:5432/test_retail_db"
# Store identifiers shared by fixtures and multi-store isolation tests.
TEST_STORE_IDS = ['test_seattle', 'test_redmond', 'test_bellevue']
@pytest.fixture(scope="session")
def event_loop():
    """Provide one session-wide event loop so async session fixtures share it.

    The loop is closed in a ``finally`` block so it is released even when
    teardown is interrupted (the original only closed it on a clean resume).
    """
    loop = asyncio.get_event_loop_policy().new_event_loop()
    try:
        yield loop
    finally:
        loop.close()
@pytest.fixture(scope="session")
async def test_database():
    """Create and populate the test database once per session.

    Drops and recreates ``test_retail_db`` so every session starts clean,
    loads the schema, and yields a connection to the fresh database.

    The schema file is resolved relative to this conftest file, not the
    current working directory — the original ``"../scripts/..."`` path broke
    whenever pytest was invoked from anywhere other than the tests directory.
    """
    # Maintenance connection used only to (re)create the test database.
    sys_conn = await asyncpg.connect(
        "postgresql://postgres:password@localhost:5432/postgres"
    )
    try:
        await sys_conn.execute("DROP DATABASE IF EXISTS test_retail_db")
        await sys_conn.execute("CREATE DATABASE test_retail_db")
    finally:
        await sys_conn.close()

    test_conn = await asyncpg.connect(TEST_DATABASE_URL)
    try:
        # tests/conftest.py -> repo root -> scripts/create_schema.sql
        schema_path = Path(__file__).resolve().parent.parent / "scripts" / "create_schema.sql"
        await test_conn.execute(schema_path.read_text())
        yield test_conn
    finally:
        await test_conn.close()
@pytest.fixture
async def db_connection(test_database):
    """Yield a per-test connection whose writes are rolled back afterwards."""
    connection = await asyncpg.connect(TEST_DATABASE_URL)
    transaction = connection.transaction()
    await transaction.start()
    try:
        yield connection
    finally:
        # Discard everything the test inserted, then release the connection.
        await transaction.rollback()
        await connection.close()
@pytest.fixture
async def mock_embedding_manager():
    """Stand-in embedding manager so tests never call Azure OpenAI."""
    manager = AsyncMock()
    # 1536 dimensions matches text-embedding-3-small's output size.
    manager.generate_embedding.return_value = [0.1] * 1536
    manager.generate_embeddings_batch.return_value = [[0.1] * 1536] * 10
    manager.deployment_name = 'text-embedding-3-small'
    return manager
class TestDataHelper:
    """Static helpers that seed the minimal rows individual tests need."""

    @staticmethod
    async def create_test_store(conn, store_id: str):
        """Insert a store row; idempotent thanks to ON CONFLICT DO NOTHING."""
        insert_sql = """
            INSERT INTO retail.stores (store_id, store_name, store_location, store_type, region)
            VALUES ($1, $2, $3, $4, $5)
            ON CONFLICT (store_id) DO NOTHING
        """
        await conn.execute(
            insert_sql,
            store_id, f'Test Store {store_id}', 'Test Location', 'test', 'test',
        )

    @staticmethod
    async def create_test_customer(conn, store_id: str) -> str:
        """Insert one customer for *store_id* and return its generated id."""
        insert_sql = """
            INSERT INTO retail.customers (store_id, first_name, last_name, email, loyalty_tier)
            VALUES ($1, $2, $3, $4, $5)
            RETURNING customer_id
        """
        return await conn.fetchval(
            insert_sql,
            store_id, 'Test', 'Customer', f'test_{store_id}@example.com', 'bronze',
        )
Step 2: Unit tests for tools
# tests/test_tools.py
import pytest
from datetime import datetime, timedelta
from mcp_server.tools.sales_analysis import SalesAnalysisTool
from mcp_server.tools.semantic_search import SemanticProductSearchTool
from mcp_server.tools.schema_introspection import SchemaIntrospectionTool
from tests.conftest import TestDataHelper
class TestSalesAnalysisTool:
    """Unit tests for the sales-analysis MCP tool."""

    @pytest.fixture
    async def sales_tool(self, test_mcp_server):
        """Build the tool against the test server's database provider."""
        return SalesAnalysisTool(test_mcp_server.db_provider)

    async def test_daily_sales_query(self, sales_tool, db_connection):
        """The daily_sales template should report revenue for seeded data."""
        sid = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, sid)
        cid = await TestDataHelper.create_test_customer(db_connection, sid)
        await db_connection.execute(
            """
            INSERT INTO retail.sales_transactions
            (store_id, customer_id, transaction_date, total_amount, subtotal, transaction_type)
            VALUES ($1, $2, $3, $4, $5, $6)
            """,
            sid, cid, datetime.now(), 150.00, 150.00, 'sale',
        )

        today = datetime.now().date()
        result = await sales_tool.execute(
            query_type='daily_sales',
            store_id=sid,
            start_date=today - timedelta(days=7),
            end_date=today,
        )

        assert result.success is True
        assert len(result.data) > 0
        assert 'total_revenue' in result.data[0]

    async def test_custom_query_blocks_dangerous_sql(self, sales_tool, db_connection):
        """A DROP TABLE statement must be rejected by the query validator."""
        sid = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, sid)

        result = await sales_tool.execute(
            query_type='custom',
            store_id=sid,
            query="DROP TABLE retail.customers",
        )

        assert result.success is False
        assert 'validation failed' in result.error.lower()

    async def test_store_data_isolation(self, sales_tool, db_connection):
        """Each store context should only see that store's customers."""
        first, second = 'test_store1', 'test_store2'
        for sid in (first, second):
            await TestDataHelper.create_test_store(db_connection, sid)
            await TestDataHelper.create_test_customer(db_connection, sid)

        count_sql = "SELECT COUNT(*) as count FROM retail.customers"
        result_a = await sales_tool.execute(
            query_type='custom', store_id=first, query=count_sql)
        result_b = await sales_tool.execute(
            query_type='custom', store_id=second, query=count_sql)

        assert result_a.success is True
        assert result_b.success is True
        assert result_a.data[0]['count'] == 1
        assert result_b.data[0]['count'] == 1
class TestSemanticSearchTool:
    """Unit tests for the semantic product-search MCP tool."""

    async def test_missing_required_parameters(self, search_tool):
        """Omitting query or store_id should each yield a descriptive error."""
        no_query = await search_tool.execute(store_id='test_store')
        assert no_query.success is False
        assert 'query is required' in no_query.error.lower()

        no_store = await search_tool.execute(query='test query')
        assert no_store.success is False
        assert 'store_id is required' in no_store.error.lower()
Step 3: Database RLS tests
# tests/test_database.py
class TestRowLevelSecurity:
    """Tests for PostgreSQL row-level security and the store session context."""

    async def test_store_context_setting(self, db_connection):
        """set_store_context must populate app.current_store_id."""
        sid = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, sid)
        await db_connection.execute("SELECT retail.set_store_context($1)", sid)

        active = await db_connection.fetchval(
            "SELECT current_setting('app.current_store_id', true)"
        )
        assert active == sid

    async def test_invalid_store_raises_error(self, db_connection):
        """A non-existent store id must make set_store_context raise."""
        with pytest.raises(Exception) as exc_info:
            await db_connection.execute(
                "SELECT retail.set_store_context($1)", 'invalid_store')
        assert "Store not found" in str(exc_info.value)

    async def test_cross_store_insert_blocked(self, db_connection):
        """RLS must reject inserting a customer under a different store id."""
        sid = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, sid)
        await db_connection.execute("SELECT retail.set_store_context($1)", sid)

        with pytest.raises(Exception):
            await db_connection.execute(
                """
                INSERT INTO retail.customers (store_id, first_name, last_name, email)
                VALUES ($1, $2, $3, $4)
                """,
                'different_store', 'Test', 'User', '[email protected]')

    async def test_sql_injection_prevention(self, test_mcp_server, db_connection):
        """Multi-statement injection must be blocked by the query validator."""
        sid = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, sid)

        payload = "SELECT * FROM retail.customers; DROP TABLE retail.customers; --"
        result = await test_mcp_server.execute_tool(
            'execute_sales_query',
            {'query_type': 'custom', 'store_id': sid, 'query': payload},
        )
        assert result['success'] is False
        assert 'validation failed' in result['error'].lower()
Step 4: Integration tests
# tests/test_integration.py
class TestMCPWorkflows:
    """Integration tests covering realistic multi-step MCP tool usage."""

    async def test_schema_then_query_workflow(self, test_mcp_server, db_connection):
        """Typical AI flow: inspect a table's schema, then query it."""
        sid = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, sid)

        # First the AI asks for the table layout...
        schema = await test_mcp_server.execute_tool(
            'get_table_schema', {'table_name': 'products'})
        assert schema['success'] is True
        assert schema['data']['table_name'] == 'products'

        # ...then uses that knowledge to issue a valid query.
        query = await test_mcp_server.execute_tool(
            'execute_sales_query',
            {'query_type': 'custom', 'store_id': sid,
             'query': 'SELECT COUNT(*) as product_count FROM retail.products'})
        assert query['success'] is True

    async def test_multi_store_isolation(self, test_mcp_server, db_connection):
        """With one customer seeded per store, each store sees exactly one."""
        stores = ['test_seattle', 'test_redmond', 'test_bellevue']
        for sid in stores:
            await TestDataHelper.create_test_store(db_connection, sid)
            await TestDataHelper.create_test_customer(db_connection, sid)

        for sid in stores:
            result = await test_mcp_server.execute_tool(
                'execute_sales_query',
                {'query_type': 'custom', 'store_id': sid,
                 'query': 'SELECT COUNT(*) as customer_count FROM retail.customers'})
            assert result['success'] is True
            assert result['data'][0]['customer_count'] == 1
Step 5: Performance tests
# tests/test_performance.py
import asyncio
import time
import statistics
class TestPerformance:
    """Load and latency tests for MCP tool execution.

    Durations are measured with ``time.perf_counter()`` — a monotonic clock —
    instead of ``time.time()``, which can jump under NTP adjustment and
    produce negative or inflated intervals.
    """

    async def test_concurrent_tool_execution(self, test_mcp_server, db_connection):
        """20 concurrent tool calls should all complete under 5 seconds each."""
        store_id = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, store_id)

        async def execute_one():
            start = time.perf_counter()
            result = await test_mcp_server.execute_tool(
                'execute_sales_query',
                {'query_type': 'custom', 'store_id': store_id,
                 'query': 'SELECT COUNT(*) as count FROM retail.customers'}
            )
            return {'success': result['success'],
                    'execution_time': time.perf_counter() - start}

        results = await asyncio.gather(*[execute_one() for _ in range(20)])
        successful = [r for r in results if r['success']]
        times = [r['execution_time'] for r in successful]

        assert len(successful) == 20
        assert statistics.mean(times) < 1.0  # Average under 1 second
        assert max(times) < 5.0              # No individual call over 5 seconds
        print(f"Mean: {statistics.mean(times):.3f}s, Max: {max(times):.3f}s")

    async def test_query_performance_scaling(self, test_mcp_server, db_connection):
        """COUNT queries must stay under 1 second even with large datasets."""
        store_id = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, store_id)
        # NOTE(review): one INSERT round-trip per row; switch to a batched
        # insert if suite setup time ever becomes a problem.
        for _ in range(1000):
            await TestDataHelper.create_test_customer(db_connection, store_id)

        start = time.perf_counter()
        result = await test_mcp_server.execute_tool(
            'execute_sales_query',
            {'query_type': 'custom', 'store_id': store_id,
             'query': 'SELECT COUNT(*) FROM retail.customers'}
        )
        elapsed = time.perf_counter() - start

        assert result['success'] is True
        assert elapsed < 1.0
Step 6: Debugging tools
# mcp_server/debugging/debug_tools.py
import traceback
import time
from contextlib import asynccontextmanager
class MCPDebugger:
    """Comprehensive debugging utilities for the MCP server.

    Collects per-operation trace records (timing, context, error details)
    in memory and keeps a bounded history for post-mortem inspection.
    """

    def __init__(self, server_instance):
        self.server = server_instance
        self.debug_logs = []      # completed/errored trace records (bounded)
        self.active_traces = {}   # trace_id -> in-flight trace record
        self._trace_seq = 0       # guarantees unique trace ids (see below)

    @asynccontextmanager
    async def trace_execution(self, operation_name: str, context=None):
        """Trace an operation with timing and error capture.

        Yields the mutable trace record so the caller can attach extra data.
        On exception the error and traceback are recorded and the exception
        is re-raised; in all cases the record is appended to ``debug_logs``.
        """
        # The original id was f"{name}_{ms_timestamp}", which collides when
        # two traces of the same operation start within the same millisecond
        # (the second overwrote the first in active_traces, and the first
        # completion then deleted the wrong entry). A per-instance sequence
        # number makes the id unique.
        self._trace_seq += 1
        trace_id = f"{operation_name}_{int(time.time() * 1000)}_{self._trace_seq}"
        # Wall-clock start kept for human-readable log context; durations use
        # perf_counter, which is monotonic and immune to clock adjustments.
        start_perf = time.perf_counter()
        trace_info = {
            'trace_id': trace_id,
            'operation': operation_name,
            'start_time': time.time(),
            'context': context or {},
            'status': 'running'
        }
        self.active_traces[trace_id] = trace_info
        try:
            yield trace_info
            trace_info.update({
                'status': 'completed',
                'execution_time': time.perf_counter() - start_perf
            })
        except Exception as e:
            trace_info.update({
                'status': 'error',
                'execution_time': time.perf_counter() - start_perf,
                'error': str(e),
                'traceback': traceback.format_exc()
            })
            raise
        finally:
            self.debug_logs.append(trace_info.copy())
            # pop() instead of del: never raise from cleanup even if the
            # record was somehow removed already.
            self.active_traces.pop(trace_id, None)
            # Bound memory: once past 1000 records, keep the newest 500.
            if len(self.debug_logs) > 1000:
                self.debug_logs = self.debug_logs[-500:]

    def get_debug_summary(self):
        """Return counts plus a compact view of the 50 most recent operations."""
        recent = self.debug_logs[-50:]
        return {
            'total_operations': len(self.debug_logs),
            'active_traces': len(self.active_traces),
            'recent_operations': [
                {'operation': entry['operation'], 'status': entry['status'],
                 'execution_time': entry.get('execution_time', 0)}
                for entry in recent
            ]
        }
Running the test suite
# Install test dependencies
pip install pytest pytest-asyncio pytest-cov
# Run all tests
pytest tests/ -v
# Run with coverage
pytest tests/ --cov=mcp_server --cov-report=html
# Run only unit tests
pytest tests/test_tools.py tests/test_database.py -v
# Run performance tests (slow — excluded by default)
pytest tests/test_performance.py -v -m slow
Key takeaways
- Transaction-scoped fixtures keep tests isolated — each test rolls back its own data
- Mock embedding managers prevent real API calls during unit and integration tests
- RLS tests must explicitly verify that cross-store data insertion and reads are blocked
- Query validator tests should include both safe queries and known injection patterns
- Performance tests measure concurrent execution to catch connection pool bottlenecks early
- Debug traces capture execution time, errors, and context for post-mortem analysis
Next: Lab 9 — VS Code Integration
Configure VS Code MCP settings, use AI Chat for natural language database queries, and debug server connections.