Objectives
By the end of this lab you will be able to:- Implement enterprise-grade Row Level Security for multi-tenant data isolation
- Design secure authentication and authorization patterns with Azure Entra ID
- Configure comprehensive audit logging for compliance requirements
- Apply defense-in-depth security strategies across all application layers
- Validate security implementations through systematic testing
- Monitor security events and respond to potential threats
Prerequisites
- Completed Lab 1: Core Architecture
- Basic understanding of JWT tokens and OAuth 2.0
- Familiarity with PostgreSQL roles and permissions
Security architecture overview
The system implements defense-in-depth with multiple security layers:┌─────────────────────────────────────────┐
│ Azure Front Door │ ← WAF, DDoS Protection
├─────────────────────────────────────────┤
│ Application Gateway │ ← SSL Termination, Rate Limiting
├─────────────────────────────────────────┤
│ MCP Server │ ← Authentication, Authorization
│ ┌──────────────────────────────────────┤
│ │ Connection Layer │ ← Connection Pooling, Circuit Breakers
│ ├──────────────────────────────────────┤
│ │ Business Logic Layer │ ← Input Validation, Business Rules
│ ├──────────────────────────────────────┤
│ │ Data Access Layer │ ← Query Sanitization, RLS Context
│ └──────────────────────────────────────┤
├─────────────────────────────────────────┤
│ PostgreSQL RLS │ ← Row Level Security, Audit Triggers
└─────────────────────────────────────────┘
The shared database, shared schema multi-tenancy model is used throughout. Row Level Security enforces tenant isolation at the database level, so even if application code has a bug it cannot leak cross-tenant data.
Row Level Security implementation
Step 1: Enable RLS and create the application role
-- Enable RLS on all multi-tenant tables
ALTER TABLE retail.customers ENABLE ROW LEVEL SECURITY;
ALTER TABLE retail.products ENABLE ROW LEVEL SECURITY;
ALTER TABLE retail.sales_transactions ENABLE ROW LEVEL SECURITY;
ALTER TABLE retail.sales_transaction_items ENABLE ROW LEVEL SECURITY;
ALTER TABLE retail.product_embeddings ENABLE ROW LEVEL SECURITY;
-- Create application role for MCP server
CREATE ROLE mcp_user LOGIN;
GRANT USAGE ON SCHEMA retail TO mcp_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA retail TO mcp_user;
Step 2: Create the store context function
This function validates the store exists and is active before setting the session variable that drives RLS.CREATE OR REPLACE FUNCTION retail.set_store_context(store_id_param VARCHAR(50))
RETURNS void
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = retail, pg_temp
AS $$
DECLARE
user_info RECORD;
BEGIN
SELECT store_id, store_name, is_active
INTO user_info
FROM retail.stores
WHERE store_id = store_id_param;
IF NOT FOUND THEN
RAISE EXCEPTION 'Store not found: %', store_id_param
USING ERRCODE = 'invalid_parameter_value',
HINT = 'Verify store ID and ensure it exists in the system';
END IF;
IF NOT user_info.is_active THEN
RAISE EXCEPTION 'Store is inactive: %', store_id_param
USING ERRCODE = 'insufficient_privilege',
HINT = 'Contact administrator to activate store';
END IF;
-- Set the secure context for RLS
PERFORM set_config('app.current_store_id', store_id_param, false);
PERFORM set_config('app.store_name', user_info.store_name, false);
PERFORM set_config('app.context_set_at', extract(epoch from current_timestamp)::text, false);
-- Log context change for audit
INSERT INTO retail.security_audit_log (
event_type, user_name, store_id, ip_address, details, severity
) VALUES (
'store_context_set',
current_user,
store_id_param,
inet_client_addr()::text,
jsonb_build_object('store_name', user_info.store_name, 'timestamp', current_timestamp),
'INFO'
);
END;
$$;
GRANT EXECUTE ON FUNCTION retail.set_store_context TO mcp_user;
Step 3: Create RLS policies for each table
-- Customers: store-level isolation
CREATE POLICY customers_store_isolation ON retail.customers
FOR ALL TO mcp_user
USING (
store_id = current_setting('app.current_store_id', true)
AND current_setting('app.current_store_id', true) IS NOT NULL
AND current_setting('app.current_store_id', true) != ''
)
WITH CHECK (
store_id = current_setting('app.current_store_id', true)
AND current_setting('app.current_store_id', true) IS NOT NULL
AND current_setting('app.current_store_id', true) != ''
);
-- Products: store isolation + only active products readable
CREATE POLICY products_store_isolation ON retail.products
FOR ALL TO mcp_user
USING (
store_id = current_setting('app.current_store_id', true)
AND current_setting('app.current_store_id', true) IS NOT NULL
AND current_setting('app.current_store_id', true) != ''
AND is_active = TRUE
);
-- Sales transactions: store-level isolation
CREATE POLICY sales_transactions_store_isolation ON retail.sales_transactions
FOR ALL TO mcp_user
USING (
store_id = current_setting('app.current_store_id', true)
AND current_setting('app.current_store_id', true) IS NOT NULL
AND current_setting('app.current_store_id', true) != ''
);
-- Transaction items: isolation via join with transactions
CREATE POLICY sales_transaction_items_store_isolation ON retail.sales_transaction_items
FOR ALL TO mcp_user
USING (
transaction_id IN (
SELECT transaction_id
FROM retail.sales_transactions
WHERE store_id = current_setting('app.current_store_id', true)
)
);
Step 4: Test RLS isolation
DO $$
DECLARE
customer_count INTEGER;
product_count INTEGER;
BEGIN
-- Test Seattle store context
PERFORM retail.set_store_context('seattle');
SELECT COUNT(*) INTO customer_count FROM retail.customers;
SELECT COUNT(*) INTO product_count FROM retail.products;
RAISE NOTICE 'Seattle store - Customers: %, Products: %', customer_count, product_count;
-- Test Redmond store context
PERFORM retail.set_store_context('redmond');
SELECT COUNT(*) INTO customer_count FROM retail.customers;
SELECT COUNT(*) INTO product_count FROM retail.products;
RAISE NOTICE 'Redmond store - Customers: %, Products: %', customer_count, product_count;
IF customer_count > 0 AND product_count > 0 THEN
RAISE NOTICE 'RLS policies are working correctly';
ELSE
RAISE WARNING 'RLS policies may not be configured correctly';
END IF;
END;
$$;
Azure Entra ID authentication
Token validation
# mcp_server/security/authentication.py
import jwt
import aiohttp
from azure.identity.aio import DefaultAzureCredential
from azure.keyvault.secrets.aio import SecretClient
class AzureAuthenticator:
"""Handle Azure Entra ID authentication and token validation."""
def __init__(self):
self.tenant_id = os.getenv('AZURE_TENANT_ID')
self.client_id = os.getenv('AZURE_CLIENT_ID')
self.audience = os.getenv('AZURE_AUDIENCE', self.client_id)
self.issuer = f"https://login.microsoftonline.com/{self.tenant_id}/v2.0"
self._jwks_cache = None
self._jwks_cache_expiry = None
async def validate_token(self, token: str) -> Dict:
"""Validate JWT token from Azure Entra ID."""
try:
signing_keys = await self._get_signing_keys()
unverified_header = jwt.get_unverified_header(token)
key_id = unverified_header.get('kid')
signing_key = next(
(jwt.algorithms.RSAAlgorithm.from_jwk(k) for k in signing_keys if k['kid'] == key_id),
None
)
if not signing_key:
raise ValueError(f"Unable to find signing key for kid: {key_id}")
payload = jwt.decode(
token,
signing_key,
algorithms=['RS256'],
audience=self.audience,
issuer=self.issuer,
options={'verify_exp': True, 'verify_aud': True, 'verify_iss': True}
)
return self._extract_user_info(payload)
except jwt.ExpiredSignatureError:
raise ValueError("Token has expired")
except jwt.InvalidAudienceError:
raise ValueError("Invalid token audience")
except jwt.InvalidIssuerError:
raise ValueError("Invalid token issuer")
except Exception as e:
raise ValueError(f"Token validation failed: {str(e)}")
def _extract_user_info(self, payload: Dict) -> Dict:
return {
'user_id': payload.get('oid') or payload.get('sub'),
'email': payload.get('email') or payload.get('preferred_username'),
'name': payload.get('name'),
'tenant_id': payload.get('tid'),
'roles': payload.get('roles', []),
'scope': payload.get('scp', '').split() if payload.get('scp') else [],
'expires_at': datetime.fromtimestamp(payload['exp'], timezone.utc),
}
Role-based authorization
# mcp_server/security/authorization.py
class RoleBasedAuth:
"""Role-based access control implementation."""
ROLE_HIERARCHY = {
'store_admin': ['store_manager', 'store_user', 'store_readonly'],
'store_manager': ['store_user', 'store_readonly'],
'store_user': ['store_readonly'],
'store_readonly': []
}
ROLE_PERMISSIONS = {
'store_admin': ['read_all', 'write_all', 'delete_all', 'manage_users'],
'store_manager': ['read_all', 'write_transactions', 'write_inventory', 'read_reports'],
'store_user': ['read_products', 'read_customers', 'write_transactions'],
'store_readonly': ['read_products', 'read_basic_reports']
}
@classmethod
def has_permission(cls, user_roles: List[str], required_permission: str) -> bool:
"""Check if user has required permission, including inherited permissions."""
user_permissions = set()
for role in user_roles:
user_permissions.update(cls.ROLE_PERMISSIONS.get(role, []))
for inherited_role in cls.ROLE_HIERARCHY.get(role, []):
user_permissions.update(cls.ROLE_PERMISSIONS.get(inherited_role, []))
return required_permission in user_permissions
Security audit logging
Audit log table
CREATE TABLE retail.security_audit_log (
log_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
event_type VARCHAR(100) NOT NULL,
user_name VARCHAR(100) NOT NULL,
user_id VARCHAR(100),
store_id VARCHAR(50),
ip_address INET,
action VARCHAR(50) NOT NULL,
success BOOLEAN NOT NULL DEFAULT TRUE,
failure_reason TEXT,
details JSONB,
severity VARCHAR(20) DEFAULT 'INFO',
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT valid_severity CHECK (severity IN ('DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL'))
);
-- Indexes for security query performance
CREATE INDEX idx_security_audit_event_type ON retail.security_audit_log(event_type);
CREATE INDEX idx_security_audit_user_name ON retail.security_audit_log(user_name);
CREATE INDEX idx_security_audit_store_id ON retail.security_audit_log(store_id);
CREATE INDEX idx_security_audit_created_at ON retail.security_audit_log(created_at);
CREATE INDEX idx_security_audit_success ON retail.security_audit_log(success);
CREATE INDEX idx_security_audit_details ON retail.security_audit_log USING GIN(details);
Security monitoring views
-- Failed authentication attempts (3+ failures in last 24h)
CREATE VIEW retail.security_failed_auth AS
SELECT
event_type,
user_name,
ip_address,
COUNT(*) as attempt_count,
MIN(created_at) as first_attempt,
MAX(created_at) as last_attempt,
ARRAY_AGG(DISTINCT failure_reason) as failure_reasons
FROM retail.security_audit_log
WHERE success = FALSE
AND event_type IN ('authentication_failed', 'token_validation_failed')
AND created_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY event_type, user_name, ip_address
HAVING COUNT(*) >= 3
ORDER BY attempt_count DESC;
-- Suspicious access patterns (multiple IPs or stores in 1 hour)
CREATE VIEW retail.security_suspicious_access AS
SELECT
user_name,
user_id,
COUNT(DISTINCT ip_address) as ip_count,
COUNT(DISTINCT store_id) as store_count,
ARRAY_AGG(DISTINCT ip_address::TEXT) as ip_addresses,
ARRAY_AGG(DISTINCT store_id) as stores_accessed
FROM retail.security_audit_log
WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
AND success = TRUE
GROUP BY user_name, user_id
HAVING COUNT(DISTINCT ip_address) > 3 OR COUNT(DISTINCT store_id) > 2;
Security monitoring in Python
# mcp_server/security/monitoring.py
class SecurityMonitor:
"""Monitor security events and generate alerts."""
def __init__(self, db_connection_string: str):
self.db_connection_string = db_connection_string
self.thresholds = {
'failed_auth_attempts': 5,
'multiple_ip_access': 3,
'excessive_data_access': 1000,
'privilege_escalation': 1,
'unauthorized_store_access': 1
}
async def start_monitoring(self):
"""Start security monitoring loop."""
logger.info("Starting security monitoring")
while True:
try:
await self._check_security_events()
await asyncio.sleep(300) # Check every 5 minutes
except Exception as e:
logger.error(f"Security monitoring error: {e}")
await asyncio.sleep(60)
async def _check_failed_auth(self, conn):
"""Check for excessive failed authentication attempts."""
query = """
SELECT user_name, ip_address, COUNT(*) as attempt_count
FROM retail.security_audit_log
WHERE success = FALSE
AND event_type IN ('authentication_failed', 'token_validation_failed')
AND created_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY user_name, ip_address
HAVING COUNT(*) >= $1
"""
results = await conn.fetch(query, self.thresholds['failed_auth_attempts'])
for record in results:
await self._send_alert(SecurityAlert(
alert_type='failed_authentication',
severity='HIGH',
message=f"Excessive failed logins for {record['user_name']}",
details=dict(record),
timestamp=datetime.now()
))
Automated security tests
# tests/security/test_security.py
class TestRowLevelSecurity:
async def test_store_context_isolation(self, db_connection):
"""Test that RLS properly isolates data by store."""
await db_connection.execute("SELECT retail.set_store_context('seattle')")
seattle_customers = await db_connection.fetchval(
"SELECT COUNT(*) FROM retail.customers"
)
await db_connection.execute("SELECT retail.set_store_context('redmond')")
redmond_customers = await db_connection.fetchval(
"SELECT COUNT(*) FROM retail.customers"
)
# Counts must differ (each store sees only its own data)
assert seattle_customers != redmond_customers or (
seattle_customers == 0 and redmond_customers == 0
)
async def test_unauthorized_store_access(self, db_connection):
"""Test that invalid store access is blocked."""
with pytest.raises(Exception) as exc_info:
await db_connection.execute("SELECT retail.set_store_context('invalid_store')")
assert "Store not found" in str(exc_info.value)
async def test_cross_store_data_leakage(self, db_connection):
"""Test that users cannot insert data for other stores."""
await db_connection.execute("SELECT retail.set_store_context('seattle')")
with pytest.raises(Exception):
await db_connection.execute("""
INSERT INTO retail.customers (store_id, first_name, last_name, email)
VALUES ('redmond', 'Test', 'User', '[email protected]')
""")
Penetration testing checklist
# security-test-checklist.yml
penetration_testing:
authentication_bypass:
tests:
- "Missing Authorization header"
- "Malformed JWT tokens"
- "Replay attack with expired tokens"
- "Token signature manipulation"
- "Audience/issuer manipulation"
authorization_escalation:
tests:
- "Role manipulation in token"
- "Store access boundary testing"
- "Cross-tenant data access attempts"
- "Administrative function access"
sql_injection:
tests:
- "Parameter injection in search queries"
- "Store ID manipulation"
- "JSON parameter injection"
- "Union-based injection attempts"
rate_limiting:
tests:
- "Authentication endpoint flooding"
- "API endpoint rate limits"
- "Resource exhaustion attempts"
- "Connection pool exhaustion"
Key takeaways
- Shared schema + RLS provides cost-effective multi-tenancy with strong isolation
- SECURITY DEFINER functions validate context before setting session variables
- Azure Entra ID with JWKS caching provides scalable token validation
- Role hierarchy with inherited permissions simplifies access management
- Audit logging at the database level captures all security-relevant events
- Automated tests should verify both positive access and cross-store leakage prevention
Next: Lab 3 — Environment Setup
Configure the development environment, deploy Azure resources, and start Docker containers.