Skip to main content

Objectives

By the end of this lab you will be able to:
  • Implement enterprise-grade Row Level Security for multi-tenant data isolation
  • Design secure authentication and authorization patterns with Azure Entra ID
  • Configure comprehensive audit logging for compliance requirements
  • Apply defense-in-depth security strategies across all application layers
  • Validate security implementations through systematic testing
  • Monitor security events and respond to potential threats

Prerequisites

  • Completed Lab 1: Core Architecture
  • Basic understanding of JWT tokens and OAuth 2.0
  • Familiarity with PostgreSQL roles and permissions

Security architecture overview

The system implements defense-in-depth with multiple security layers:
┌─────────────────────────────────────────┐
│           Azure Front Door              │  ← WAF, DDoS Protection
├─────────────────────────────────────────┤
│          Application Gateway            │  ← SSL Termination, Rate Limiting
├─────────────────────────────────────────┤
│               MCP Server                │  ← Authentication, Authorization
│  ┌──────────────────────────────────────┤
│  │        Connection Layer              │  ← Connection Pooling, Circuit Breakers
│  ├──────────────────────────────────────┤
│  │      Business Logic Layer           │  ← Input Validation, Business Rules
│  ├──────────────────────────────────────┤
│  │        Data Access Layer            │  ← Query Sanitization, RLS Context
│  └──────────────────────────────────────┤
├─────────────────────────────────────────┤
│           PostgreSQL RLS               │  ← Row Level Security, Audit Triggers
└─────────────────────────────────────────┘
The shared database, shared schema multi-tenancy model is used throughout. Row Level Security enforces tenant isolation at the database level, so even if application code has a bug it cannot leak cross-tenant data.

Row Level Security implementation

Step 1: Enable RLS and create the application role

-- Enable RLS on all multi-tenant tables
ALTER TABLE retail.customers ENABLE ROW LEVEL SECURITY;
ALTER TABLE retail.products ENABLE ROW LEVEL SECURITY;
ALTER TABLE retail.sales_transactions ENABLE ROW LEVEL SECURITY;
ALTER TABLE retail.sales_transaction_items ENABLE ROW LEVEL SECURITY;
ALTER TABLE retail.product_embeddings ENABLE ROW LEVEL SECURITY;

-- Create application role for MCP server
CREATE ROLE mcp_user LOGIN;
GRANT USAGE ON SCHEMA retail TO mcp_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA retail TO mcp_user;

Step 2: Create the store context function

This function validates the store exists and is active before setting the session variable that drives RLS.
CREATE OR REPLACE FUNCTION retail.set_store_context(store_id_param VARCHAR(50))
RETURNS void
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = retail, pg_temp
AS $$
DECLARE
    user_info RECORD;
BEGIN
    SELECT store_id, store_name, is_active
    INTO user_info
    FROM retail.stores
    WHERE store_id = store_id_param;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'Store not found: %', store_id_param
            USING ERRCODE = 'invalid_parameter_value',
                  HINT = 'Verify store ID and ensure it exists in the system';
    END IF;

    IF NOT user_info.is_active THEN
        RAISE EXCEPTION 'Store is inactive: %', store_id_param
            USING ERRCODE = 'insufficient_privilege',
                  HINT = 'Contact administrator to activate store';
    END IF;

    -- Set the secure context for RLS
    PERFORM set_config('app.current_store_id', store_id_param, false);
    PERFORM set_config('app.store_name', user_info.store_name, false);
    PERFORM set_config('app.context_set_at', extract(epoch from current_timestamp)::text, false);

    -- Log context change for audit
    INSERT INTO retail.security_audit_log (
        event_type, user_name, store_id, ip_address, details, severity
    ) VALUES (
        'store_context_set',
        current_user,
        store_id_param,
        inet_client_addr()::text,
        jsonb_build_object('store_name', user_info.store_name, 'timestamp', current_timestamp),
        'INFO'
    );
END;
$$;

GRANT EXECUTE ON FUNCTION retail.set_store_context TO mcp_user;

Step 3: Create RLS policies for each table

-- Customers: store-level isolation
CREATE POLICY customers_store_isolation ON retail.customers
    FOR ALL TO mcp_user
    USING (
        store_id = current_setting('app.current_store_id', true)
        AND current_setting('app.current_store_id', true) IS NOT NULL
        AND current_setting('app.current_store_id', true) != ''
    )
    WITH CHECK (
        store_id = current_setting('app.current_store_id', true)
        AND current_setting('app.current_store_id', true) IS NOT NULL
        AND current_setting('app.current_store_id', true) != ''
    );

-- Products: store isolation + only active products readable
CREATE POLICY products_store_isolation ON retail.products
    FOR ALL TO mcp_user
    USING (
        store_id = current_setting('app.current_store_id', true)
        AND current_setting('app.current_store_id', true) IS NOT NULL
        AND current_setting('app.current_store_id', true) != ''
        AND is_active = TRUE
    );

-- Sales transactions: store-level isolation
CREATE POLICY sales_transactions_store_isolation ON retail.sales_transactions
    FOR ALL TO mcp_user
    USING (
        store_id = current_setting('app.current_store_id', true)
        AND current_setting('app.current_store_id', true) IS NOT NULL
        AND current_setting('app.current_store_id', true) != ''
    );

-- Transaction items: isolation via join with transactions
CREATE POLICY sales_transaction_items_store_isolation ON retail.sales_transaction_items
    FOR ALL TO mcp_user
    USING (
        transaction_id IN (
            SELECT transaction_id
            FROM retail.sales_transactions
            WHERE store_id = current_setting('app.current_store_id', true)
        )
    );

Step 4: Test RLS isolation

DO $$
DECLARE
    customer_count INTEGER;
    product_count INTEGER;
BEGIN
    -- Test Seattle store context
    PERFORM retail.set_store_context('seattle');

    SELECT COUNT(*) INTO customer_count FROM retail.customers;
    SELECT COUNT(*) INTO product_count FROM retail.products;
    RAISE NOTICE 'Seattle store - Customers: %, Products: %', customer_count, product_count;

    -- Test Redmond store context
    PERFORM retail.set_store_context('redmond');

    SELECT COUNT(*) INTO customer_count FROM retail.customers;
    SELECT COUNT(*) INTO product_count FROM retail.products;
    RAISE NOTICE 'Redmond store - Customers: %, Products: %', customer_count, product_count;

    IF customer_count > 0 AND product_count > 0 THEN
        RAISE NOTICE 'RLS policies are working correctly';
    ELSE
        RAISE WARNING 'RLS policies may not be configured correctly';
    END IF;
END;
$$;

Azure Entra ID authentication

Token validation

# mcp_server/security/authentication.py
import jwt
import aiohttp
from azure.identity.aio import DefaultAzureCredential
from azure.keyvault.secrets.aio import SecretClient

class AzureAuthenticator:
    """Handle Azure Entra ID authentication and token validation."""

    def __init__(self):
        self.tenant_id = os.getenv('AZURE_TENANT_ID')
        self.client_id = os.getenv('AZURE_CLIENT_ID')
        self.audience = os.getenv('AZURE_AUDIENCE', self.client_id)
        self.issuer = f"https://login.microsoftonline.com/{self.tenant_id}/v2.0"
        self._jwks_cache = None
        self._jwks_cache_expiry = None

    async def validate_token(self, token: str) -> Dict:
        """Validate JWT token from Azure Entra ID."""
        try:
            signing_keys = await self._get_signing_keys()
            unverified_header = jwt.get_unverified_header(token)
            key_id = unverified_header.get('kid')

            signing_key = next(
                (jwt.algorithms.RSAAlgorithm.from_jwk(k) for k in signing_keys if k['kid'] == key_id),
                None
            )
            if not signing_key:
                raise ValueError(f"Unable to find signing key for kid: {key_id}")

            payload = jwt.decode(
                token,
                signing_key,
                algorithms=['RS256'],
                audience=self.audience,
                issuer=self.issuer,
                options={'verify_exp': True, 'verify_aud': True, 'verify_iss': True}
            )
            return self._extract_user_info(payload)

        except jwt.ExpiredSignatureError:
            raise ValueError("Token has expired")
        except jwt.InvalidAudienceError:
            raise ValueError("Invalid token audience")
        except jwt.InvalidIssuerError:
            raise ValueError("Invalid token issuer")
        except Exception as e:
            raise ValueError(f"Token validation failed: {str(e)}")

    def _extract_user_info(self, payload: Dict) -> Dict:
        return {
            'user_id': payload.get('oid') or payload.get('sub'),
            'email': payload.get('email') or payload.get('preferred_username'),
            'name': payload.get('name'),
            'tenant_id': payload.get('tid'),
            'roles': payload.get('roles', []),
            'scope': payload.get('scp', '').split() if payload.get('scp') else [],
            'expires_at': datetime.fromtimestamp(payload['exp'], timezone.utc),
        }

Role-based authorization

# mcp_server/security/authorization.py
class RoleBasedAuth:
    """Role-based access control implementation."""

    ROLE_HIERARCHY = {
        'store_admin': ['store_manager', 'store_user', 'store_readonly'],
        'store_manager': ['store_user', 'store_readonly'],
        'store_user': ['store_readonly'],
        'store_readonly': []
    }

    ROLE_PERMISSIONS = {
        'store_admin': ['read_all', 'write_all', 'delete_all', 'manage_users'],
        'store_manager': ['read_all', 'write_transactions', 'write_inventory', 'read_reports'],
        'store_user': ['read_products', 'read_customers', 'write_transactions'],
        'store_readonly': ['read_products', 'read_basic_reports']
    }

    @classmethod
    def has_permission(cls, user_roles: List[str], required_permission: str) -> bool:
        """Check if user has required permission, including inherited permissions."""
        user_permissions = set()

        for role in user_roles:
            user_permissions.update(cls.ROLE_PERMISSIONS.get(role, []))
            for inherited_role in cls.ROLE_HIERARCHY.get(role, []):
                user_permissions.update(cls.ROLE_PERMISSIONS.get(inherited_role, []))

        return required_permission in user_permissions

Security audit logging

Audit log table

CREATE TABLE retail.security_audit_log (
    log_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    event_type VARCHAR(100) NOT NULL,
    user_name VARCHAR(100) NOT NULL,
    user_id VARCHAR(100),
    store_id VARCHAR(50),
    ip_address INET,
    action VARCHAR(50) NOT NULL,
    success BOOLEAN NOT NULL DEFAULT TRUE,
    failure_reason TEXT,
    details JSONB,
    severity VARCHAR(20) DEFAULT 'INFO',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT valid_severity CHECK (severity IN ('DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL'))
);

-- Indexes for security query performance
CREATE INDEX idx_security_audit_event_type ON retail.security_audit_log(event_type);
CREATE INDEX idx_security_audit_user_name ON retail.security_audit_log(user_name);
CREATE INDEX idx_security_audit_store_id ON retail.security_audit_log(store_id);
CREATE INDEX idx_security_audit_created_at ON retail.security_audit_log(created_at);
CREATE INDEX idx_security_audit_success ON retail.security_audit_log(success);
CREATE INDEX idx_security_audit_details ON retail.security_audit_log USING GIN(details);

Security monitoring views

-- Failed authentication attempts (3+ failures in last 24h)
CREATE VIEW retail.security_failed_auth AS
SELECT
    event_type,
    user_name,
    ip_address,
    COUNT(*) as attempt_count,
    MIN(created_at) as first_attempt,
    MAX(created_at) as last_attempt,
    ARRAY_AGG(DISTINCT failure_reason) as failure_reasons
FROM retail.security_audit_log
WHERE success = FALSE
  AND event_type IN ('authentication_failed', 'token_validation_failed')
  AND created_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY event_type, user_name, ip_address
HAVING COUNT(*) >= 3
ORDER BY attempt_count DESC;

-- Suspicious access patterns (multiple IPs or stores in 1 hour)
CREATE VIEW retail.security_suspicious_access AS
SELECT
    user_name,
    user_id,
    COUNT(DISTINCT ip_address) as ip_count,
    COUNT(DISTINCT store_id) as store_count,
    ARRAY_AGG(DISTINCT ip_address::TEXT) as ip_addresses,
    ARRAY_AGG(DISTINCT store_id) as stores_accessed
FROM retail.security_audit_log
WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
  AND success = TRUE
GROUP BY user_name, user_id
HAVING COUNT(DISTINCT ip_address) > 3 OR COUNT(DISTINCT store_id) > 2;

Security monitoring in Python

# mcp_server/security/monitoring.py
class SecurityMonitor:
    """Monitor security events and generate alerts."""

    def __init__(self, db_connection_string: str):
        self.db_connection_string = db_connection_string
        self.thresholds = {
            'failed_auth_attempts': 5,
            'multiple_ip_access': 3,
            'excessive_data_access': 1000,
            'privilege_escalation': 1,
            'unauthorized_store_access': 1
        }

    async def start_monitoring(self):
        """Start security monitoring loop."""
        logger.info("Starting security monitoring")
        while True:
            try:
                await self._check_security_events()
                await asyncio.sleep(300)  # Check every 5 minutes
            except Exception as e:
                logger.error(f"Security monitoring error: {e}")
                await asyncio.sleep(60)

    async def _check_failed_auth(self, conn):
        """Check for excessive failed authentication attempts."""
        query = """
        SELECT user_name, ip_address, COUNT(*) as attempt_count
        FROM retail.security_audit_log
        WHERE success = FALSE
          AND event_type IN ('authentication_failed', 'token_validation_failed')
          AND created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
        GROUP BY user_name, ip_address
        HAVING COUNT(*) >= $1
        """
        results = await conn.fetch(query, self.thresholds['failed_auth_attempts'])
        for record in results:
            await self._send_alert(SecurityAlert(
                alert_type='failed_authentication',
                severity='HIGH',
                message=f"Excessive failed logins for {record['user_name']}",
                details=dict(record),
                timestamp=datetime.now()
            ))

Automated security tests

# tests/security/test_security.py
class TestRowLevelSecurity:

    async def test_store_context_isolation(self, db_connection):
        """Test that RLS properly isolates data by store."""
        await db_connection.execute("SELECT retail.set_store_context('seattle')")
        seattle_customers = await db_connection.fetchval(
            "SELECT COUNT(*) FROM retail.customers"
        )

        await db_connection.execute("SELECT retail.set_store_context('redmond')")
        redmond_customers = await db_connection.fetchval(
            "SELECT COUNT(*) FROM retail.customers"
        )

        # Counts must differ (each store sees only its own data)
        assert seattle_customers != redmond_customers or (
            seattle_customers == 0 and redmond_customers == 0
        )

    async def test_unauthorized_store_access(self, db_connection):
        """Test that invalid store access is blocked."""
        with pytest.raises(Exception) as exc_info:
            await db_connection.execute("SELECT retail.set_store_context('invalid_store')")
        assert "Store not found" in str(exc_info.value)

    async def test_cross_store_data_leakage(self, db_connection):
        """Test that users cannot insert data for other stores."""
        await db_connection.execute("SELECT retail.set_store_context('seattle')")
        with pytest.raises(Exception):
            await db_connection.execute("""
                INSERT INTO retail.customers (store_id, first_name, last_name, email)
                VALUES ('redmond', 'Test', 'User', '[email protected]')
            """)

Penetration testing checklist

# security-test-checklist.yml
penetration_testing:

  authentication_bypass:
    tests:
      - "Missing Authorization header"
      - "Malformed JWT tokens"
      - "Replay attack with expired tokens"
      - "Token signature manipulation"
      - "Audience/issuer manipulation"

  authorization_escalation:
    tests:
      - "Role manipulation in token"
      - "Store access boundary testing"
      - "Cross-tenant data access attempts"
      - "Administrative function access"

  sql_injection:
    tests:
      - "Parameter injection in search queries"
      - "Store ID manipulation"
      - "JSON parameter injection"
      - "Union-based injection attempts"

  rate_limiting:
    tests:
      - "Authentication endpoint flooding"
      - "API endpoint rate limits"
      - "Resource exhaustion attempts"
      - "Connection pool exhaustion"

Key takeaways

  • Shared schema + RLS provides cost-effective multi-tenancy with strong isolation
  • SECURITY DEFINER functions validate context before setting session variables
  • Azure Entra ID with JWKS caching provides scalable token validation
  • Role hierarchy with inherited permissions simplifies access management
  • Audit logging at the database level captures all security-relevant events
  • Automated tests should verify both positive access and cross-store leakage prevention

Next: Lab 3 — Environment Setup

Configure the development environment, deploy Azure resources, and start Docker containers.

Build docs developers (and LLMs) love