Skip to main content

Objectives

By the end of this lab you will be able to:
  • Implement comprehensive unit and integration test suites
  • Design effective testing strategies for MCP tools and database operations
  • Debug complex issues using advanced debugging techniques
  • Validate performance under load with realistic testing scenarios
  • Monitor production systems with effective alerting and observability
  • Automate testing workflows for continuous integration

Prerequisites

Testing architecture

Unit Tests
  • Tool execution logic
  • Database query validation
  • Authentication/authorization
  • Embedding generation

Integration Tests
  • End-to-end MCP workflows
  • Database schema validation
  • Multi-tool interactions

Performance Tests
  • Concurrent load testing
  • Database query benchmarks
  • Memory usage validation

E2E Tests
  • Complete user workflows
  • VS Code integration
  • Real-world scenarios

Step 1: Test configuration and fixtures

# tests/conftest.py
import pytest
import asyncio
import asyncpg
from typing import AsyncGenerator
from unittest.mock import AsyncMock
from datetime import datetime

# Connection string for the throwaway database created by the session-scoped
# test_database fixture; dropped and recreated on every test session.
TEST_DATABASE_URL = "postgresql://test_user:test_pass@localhost:5432/test_retail_db"
# Store ids used by the multi-store isolation tests (one customer seeded per store).
TEST_STORE_IDS = ['test_seattle', 'test_redmond', 'test_bellevue']

@pytest.fixture(scope="session")
def event_loop():
    """Session-wide event loop so all async session fixtures share one loop."""
    policy = asyncio.get_event_loop_policy()
    session_loop = policy.new_event_loop()
    yield session_loop
    session_loop.close()

@pytest.fixture(scope="session")
async def test_database():
    """Create and populate the test database."""
    # Recreate the database from scratch using a superuser connection.
    admin_conn = await asyncpg.connect(
        "postgresql://postgres:password@localhost:5432/postgres"
    )
    try:
        await admin_conn.execute("DROP DATABASE IF EXISTS test_retail_db")
        await admin_conn.execute("CREATE DATABASE test_retail_db")
    finally:
        await admin_conn.close()

    # Apply the retail schema, then hand the connection to dependent fixtures.
    schema_conn = await asyncpg.connect(TEST_DATABASE_URL)
    try:
        with open("../scripts/create_schema.sql") as schema_file:
            await schema_conn.execute(schema_file.read())
        yield schema_conn
    finally:
        await schema_conn.close()

@pytest.fixture
async def db_connection(test_database):
    """Provide a transaction-isolated connection for each test."""
    conn = await asyncpg.connect(TEST_DATABASE_URL)
    outer_tx = conn.transaction()
    await outer_tx.start()
    try:
        yield conn
    finally:
        # Roll back everything the test wrote, then release the connection.
        await outer_tx.rollback()
        await conn.close()

@pytest.fixture
async def mock_embedding_manager():
    """Mock embedding manager — avoids real Azure OpenAI API calls."""
    manager = AsyncMock()
    # Fixed 1536-dim vector mimics text-embedding-3-small output dimensions.
    manager.generate_embedding.return_value = [0.1] * 1536
    manager.generate_embeddings_batch.return_value = [[0.1] * 1536] * 10
    manager.deployment_name = 'text-embedding-3-small'
    return manager

class TestDataHelper:
    """Helper for creating isolated test fixtures."""

    @staticmethod
    async def create_test_store(conn, store_id: str):
        """Insert a test store row; a pre-existing store_id is left untouched."""
        insert_store_sql = """
            INSERT INTO retail.stores (store_id, store_name, store_location, store_type, region)
            VALUES ($1, $2, $3, $4, $5)
            ON CONFLICT (store_id) DO NOTHING
        """
        await conn.execute(
            insert_store_sql,
            store_id,
            f'Test Store {store_id}',
            'Test Location',
            'test',
            'test',
        )

    @staticmethod
    async def create_test_customer(conn, store_id: str) -> str:
        """Insert a bronze-tier test customer for *store_id* and return its id."""
        insert_customer_sql = """
            INSERT INTO retail.customers (store_id, first_name, last_name, email, loyalty_tier)
            VALUES ($1, $2, $3, $4, $5)
            RETURNING customer_id
        """
        return await conn.fetchval(
            insert_customer_sql,
            store_id,
            'Test',
            'Customer',
            f'test_{store_id}@example.com',
            'bronze',
        )

Step 2: Unit tests for tools

# tests/test_tools.py
import pytest
from datetime import datetime, timedelta
from mcp_server.tools.sales_analysis import SalesAnalysisTool
from mcp_server.tools.semantic_search import SemanticProductSearchTool
from mcp_server.tools.schema_introspection import SchemaIntrospectionTool
from tests.conftest import TestDataHelper

class TestSalesAnalysisTool:

    @pytest.fixture
    async def sales_tool(self, test_mcp_server):
        """Sales analysis tool bound to the test server's database provider."""
        return SalesAnalysisTool(test_mcp_server.db_provider)

    async def test_daily_sales_query(self, sales_tool, db_connection):
        """Verify daily sales template query returns expected fields."""
        store = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, store)
        buyer = await TestDataHelper.create_test_customer(db_connection, store)

        # Seed one completed sale so the template query has a row to aggregate.
        await db_connection.execute("""
            INSERT INTO retail.sales_transactions
                (store_id, customer_id, transaction_date, total_amount, subtotal, transaction_type)
            VALUES ($1, $2, $3, $4, $5, $6)
        """, store, buyer, datetime.now(), 150.00, 150.00, 'sale')

        outcome = await sales_tool.execute(
            query_type='daily_sales',
            store_id=store,
            start_date=(datetime.now() - timedelta(days=7)).date(),
            end_date=datetime.now().date(),
        )

        assert outcome.success is True
        assert len(outcome.data) > 0
        assert 'total_revenue' in outcome.data[0]

    async def test_custom_query_blocks_dangerous_sql(self, sales_tool, db_connection):
        """Verify that DROP TABLE is rejected by the query validator."""
        store = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, store)

        outcome = await sales_tool.execute(
            query_type='custom',
            store_id=store,
            query="DROP TABLE retail.customers",
        )

        assert outcome.success is False
        assert 'validation failed' in outcome.error.lower()

    async def test_store_data_isolation(self, sales_tool, db_connection):
        """Each store context should only return that store's customers."""
        count_query = "SELECT COUNT(*) as count FROM retail.customers"
        first_store, second_store = 'test_store1', 'test_store2'

        # One customer per store; RLS should hide each from the other store.
        for store in (first_store, second_store):
            await TestDataHelper.create_test_store(db_connection, store)
            await TestDataHelper.create_test_customer(db_connection, store)

        first_outcome = await sales_tool.execute(
            query_type='custom', store_id=first_store, query=count_query
        )
        second_outcome = await sales_tool.execute(
            query_type='custom', store_id=second_store, query=count_query
        )

        assert first_outcome.success is True
        assert second_outcome.success is True
        assert first_outcome.data[0]['count'] == 1
        assert second_outcome.data[0]['count'] == 1

class TestSemanticSearchTool:

    async def test_missing_required_parameters(self, search_tool):
        """Missing query or store_id should return a clear error."""
        # Omit `query`: the tool must fail with a query-specific message.
        missing_query = await search_tool.execute(store_id='test_store')
        assert missing_query.success is False
        assert 'query is required' in missing_query.error.lower()

        # Omit `store_id`: analogous failure for the store identifier.
        missing_store = await search_tool.execute(query='test query')
        assert missing_store.success is False
        assert 'store_id is required' in missing_store.error.lower()

Step 3: Database RLS tests

# tests/test_database.py
class TestRowLevelSecurity:

    async def test_store_context_setting(self, db_connection):
        """Verify that set_store_context sets the correct session variable."""
        store = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, store)
        await db_connection.execute("SELECT retail.set_store_context($1)", store)

        active_store = await db_connection.fetchval(
            "SELECT current_setting('app.current_store_id', true)"
        )
        assert active_store == store

    async def test_invalid_store_raises_error(self, db_connection):
        """set_store_context with a non-existent store must raise an exception."""
        with pytest.raises(Exception) as exc_info:
            await db_connection.execute(
                "SELECT retail.set_store_context($1)", 'invalid_store'
            )
        assert "Store not found" in str(exc_info.value)

    async def test_cross_store_insert_blocked(self, db_connection):
        """RLS must prevent inserting a customer for a different store."""
        store = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, store)
        await db_connection.execute("SELECT retail.set_store_context($1)", store)

        # Session is pinned to test_seattle; a row claiming another store
        # must be rejected by the row-level-security policy.
        with pytest.raises(Exception):
            await db_connection.execute("""
                INSERT INTO retail.customers (store_id, first_name, last_name, email)
                VALUES ($1, $2, $3, $4)
            """, 'different_store', 'Test', 'User', '[email protected]')

    async def test_sql_injection_prevention(self, test_mcp_server, db_connection):
        """Multi-statement injection must be blocked by the query validator."""
        store = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, store)

        injection_payload = "SELECT * FROM retail.customers; DROP TABLE retail.customers; --"
        outcome = await test_mcp_server.execute_tool(
            'execute_sales_query',
            {
                'query_type': 'custom',
                'store_id': store,
                'query': injection_payload,
            },
        )
        assert outcome['success'] is False
        assert 'validation failed' in outcome['error'].lower()

Step 4: Integration tests

# tests/test_integration.py
class TestMCPWorkflows:

    async def test_schema_then_query_workflow(self, test_mcp_server, db_connection):
        """Verify the typical AI workflow: get schema, then execute query."""
        store = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, store)

        # Step 1: AI fetches table schema
        schema_outcome = await test_mcp_server.execute_tool(
            'get_table_schema', {'table_name': 'products'}
        )
        assert schema_outcome['success'] is True
        assert schema_outcome['data']['table_name'] == 'products'

        # Step 2: AI uses schema to write a valid query
        query_outcome = await test_mcp_server.execute_tool(
            'execute_sales_query',
            {
                'query_type': 'custom',
                'store_id': store,
                'query': 'SELECT COUNT(*) as product_count FROM retail.products',
            },
        )
        assert query_outcome['success'] is True

    async def test_multi_store_isolation(self, test_mcp_server, db_connection):
        """Each store must see exactly one customer when isolated test data exists."""
        store_ids = ('test_seattle', 'test_redmond', 'test_bellevue')

        # Seed exactly one customer per store.
        for store in store_ids:
            await TestDataHelper.create_test_store(db_connection, store)
            await TestDataHelper.create_test_customer(db_connection, store)

        # Every store context must count only its own customer.
        for store in store_ids:
            outcome = await test_mcp_server.execute_tool(
                'execute_sales_query',
                {
                    'query_type': 'custom',
                    'store_id': store,
                    'query': 'SELECT COUNT(*) as customer_count FROM retail.customers',
                },
            )
            assert outcome['success'] is True
            assert outcome['data'][0]['customer_count'] == 1

Step 5: Performance tests

# tests/test_performance.py
import asyncio
import time
import statistics

class TestPerformance:

    async def test_concurrent_tool_execution(self, test_mcp_server, db_connection):
        """20 concurrent tool calls should all complete under 5 seconds each."""
        store = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, store)

        async def timed_call():
            # Measure wall-clock latency of a single tool invocation.
            started = time.time()
            outcome = await test_mcp_server.execute_tool(
                'execute_sales_query',
                {'query_type': 'custom', 'store_id': store,
                 'query': 'SELECT COUNT(*) as count FROM retail.customers'}
            )
            return {'success': outcome['success'], 'execution_time': time.time() - started}

        outcomes = await asyncio.gather(*(timed_call() for _ in range(20)))
        successes = [o for o in outcomes if o['success']]
        durations = [o['execution_time'] for o in successes]

        assert len(successes) == 20
        assert statistics.mean(durations) < 1.0   # Average under 1 second
        assert max(durations) < 5.0               # No individual call over 5 seconds

        print(f"Mean: {statistics.mean(durations):.3f}s, Max: {max(durations):.3f}s")

    async def test_query_performance_scaling(self, test_mcp_server, db_connection):
        """COUNT queries must stay under 1 second even with large datasets."""
        store = 'test_seattle'
        await TestDataHelper.create_test_store(db_connection, store)

        # Bulk-seed 1000 customers so the COUNT has real work to do.
        for _ in range(1000):
            await TestDataHelper.create_test_customer(db_connection, store)

        started = time.time()
        outcome = await test_mcp_server.execute_tool(
            'execute_sales_query',
            {'query_type': 'custom', 'store_id': store,
             'query': 'SELECT COUNT(*) FROM retail.customers'}
        )
        elapsed = time.time() - started

        assert outcome['success'] is True
        assert elapsed < 1.0

Step 6: Debugging tools

# mcp_server/debugging/debug_tools.py
import traceback
import time
from contextlib import asynccontextmanager

class MCPDebugger:
    """Comprehensive debugging utilities for MCP server.

    Keeps a bounded in-memory log of finished operation traces
    (``debug_logs``) and a map of currently running traces
    (``active_traces``) keyed by a unique trace id.
    """

    def __init__(self, server_instance):
        """
        Args:
            server_instance: The MCP server being debugged (held for
                context; not otherwise used by this class).
        """
        self.server = server_instance
        self.debug_logs = []      # finished trace records, newest last (bounded)
        self.active_traces = {}   # trace_id -> in-flight trace record
        self._trace_seq = 0       # monotonically increasing trace-id suffix

    @asynccontextmanager
    async def trace_execution(self, operation_name: str, context=None):
        """Trace an operation with timing and error capture.

        Yields the mutable trace record so the caller can attach extra
        fields. On exit the record gets a final ``status``,
        ``execution_time``, and — on failure — ``error``/``traceback``;
        it is appended to ``debug_logs`` and removed from
        ``active_traces``. Exceptions are always re-raised.
        """
        # BUG FIX: a millisecond timestamp alone is not unique — two traces
        # of the same operation starting within the same ms collided in
        # active_traces, and the second cleanup raised KeyError. A
        # per-instance sequence number makes every trace id distinct.
        self._trace_seq += 1
        start_time = time.time()            # wall clock, recorded for humans
        start_perf = time.perf_counter()    # monotonic, used for durations
        trace_id = f"{operation_name}_{int(start_time * 1000)}_{self._trace_seq}"
        trace_info = {
            'trace_id': trace_id,
            'operation': operation_name,
            'start_time': start_time,
            'context': context or {},
            'status': 'running'
        }
        self.active_traces[trace_id] = trace_info

        try:
            yield trace_info
            trace_info.update({
                'status': 'completed',
                'execution_time': time.perf_counter() - start_perf
            })
        except Exception as e:
            trace_info.update({
                'status': 'error',
                'execution_time': time.perf_counter() - start_perf,
                'error': str(e),
                'traceback': traceback.format_exc()
            })
            raise
        finally:
            self.debug_logs.append(trace_info.copy())
            # pop() rather than del so cleanup can never raise.
            self.active_traces.pop(trace_id, None)
            # Bound memory: past 1000 entries keep only the newest 500.
            if len(self.debug_logs) > 1000:
                self.debug_logs = self.debug_logs[-500:]

    def get_debug_summary(self):
        """Return totals plus a compact view of the 50 most recent operations."""
        recent = self.debug_logs[-50:]
        return {
            'total_operations': len(self.debug_logs),
            'active_traces': len(self.active_traces),
            'recent_operations': [
                {'operation': entry['operation'], 'status': entry['status'],
                 'execution_time': entry.get('execution_time', 0)}
                for entry in recent
            ]
        }

Running the test suite

# Install test dependencies
pip install pytest pytest-asyncio pytest-cov

# Run all tests
pytest tests/ -v

# Run with coverage
pytest tests/ --cov=mcp_server --cov-report=html

# Run only unit tests
pytest tests/test_tools.py tests/test_database.py -v

# Run performance tests (slow — excluded by default)
pytest tests/test_performance.py -v -m slow

Key takeaways

  • Transaction-scoped fixtures keep tests isolated — each test rolls back its own data
  • Mock embedding managers prevent real API calls during unit and integration tests
  • RLS tests must explicitly verify that cross-store data insertion and reads are blocked
  • Query validator tests should include both safe queries and known injection patterns
  • Performance tests measure concurrent execution to catch connection pool bottlenecks early
  • Debug traces capture execution time, errors, and context for post-mortem analysis

Next: Lab 9 — VS Code Integration

Configure VS Code MCP settings, use AI Chat for natural language database queries, and debug server connections.

Build docs developers (and LLMs) love