Overview
Secure MCP Gateway provides comprehensive OAuth 2.0 and OAuth 2.1 support for authenticating with remote MCP servers. The OAuth service handles token acquisition, caching, refresh, and revocation with full compliance to RFC specifications.Features
OAuth 2.0 & 2.1
Full spec compliance with security best practices
Client Credentials
Server-to-server authentication flow
Mutual TLS (mTLS)
Client certificate authentication (RFC 8705)
Token Caching
Automatic caching with expiration tracking
Proactive Refresh
Tokens refreshed 5 minutes before expiry
Scope Validation
Verifies tokens have requested scopes
Architecture
The OAuth service consists of several specialized components:┌─────────────────────────────────────────────────────────┐
│ OAuthService │
│ - Token acquisition with retry logic │
│ - Grant type routing (client_credentials, auth_code) │
│ - Error handling and correlation IDs │
└──────────────────┬──────────────────────────────────────┘
│
┌──────────┴──────────┐
│ │
┌───────▼──────────┐ ┌──────▼────────────┐
│ TokenManager │ │ OAuthMetrics │
│ - Token cache │ │ - Acquisitions │
│ - Expiration │ │ - Cache hits │
│ - Refresh │ │ - Latency │
└──────────────────┘ └───────────────────┘
OAuth Configuration
OAuthConfig Data Model
src/secure_mcp_gateway/services/oauth/models.py
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict, Any
class OAuthVersion(Enum):
OAUTH_2_0 = "2.0"
OAUTH_2_1 = "2.1"
class OAuthGrantType(Enum):
CLIENT_CREDENTIALS = "client_credentials"
AUTHORIZATION_CODE = "authorization_code"
REFRESH_TOKEN = "refresh_token"
@dataclass
class OAuthConfig:
# Core configuration
enabled: bool = False
version: OAuthVersion = OAuthVersion.OAUTH_2_0
grant_type: OAuthGrantType = OAuthGrantType.CLIENT_CREDENTIALS
token_url: str = ""
# Client credentials
client_id: Optional[str] = None
client_secret: Optional[str] = None
# Optional parameters
audience: Optional[str] = None
organization: Optional[str] = None
scope: Optional[str] = None
# OAuth 2.1 specific
resource: Optional[str] = None # Resource Indicator (RFC 8707)
use_pkce: bool = False # PKCE for Authorization Code flow
# Authorization Code Grant
authorization_url: Optional[str] = None
redirect_uri: Optional[str] = None
state: Optional[str] = None
code_verifier: Optional[str] = None
code_challenge: Optional[str] = None
code_challenge_method: str = "S256"
# Token management
token_expiry_buffer: int = 300 # 5 minutes
# Security settings
use_basic_auth: bool = True
enforce_https: bool = True
token_in_header_only: bool = True
# mTLS settings (RFC 8705)
use_mtls: bool = False
client_cert_path: Optional[str] = None
client_key_path: Optional[str] = None
ca_bundle_path: Optional[str] = None
# Token revocation (RFC 7009)
revocation_url: Optional[str] = None
# Scope validation
validate_scopes: bool = True
# Advanced
additional_params: Dict[str, Any] = field(default_factory=dict)
custom_headers: Dict[str, str] = field(default_factory=dict)
Server Configuration Example
{
"server_name": "remote_mcp_server",
"config": {
"url": "https://api.example.com/mcp",
"transport": "http"
},
"oauth": {
"enabled": true,
"version": "2.1",
"grant_type": "client_credentials",
"token_url": "https://auth.example.com/oauth/token",
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"audience": "https://api.example.com",
"scope": "mcp:read mcp:write",
"enforce_https": true
}
}
OAuthService Implementation
Token Acquisition
src/secure_mcp_gateway/services/oauth/oauth_service.py
class OAuthService:
async def get_access_token(
self,
server_name: str,
oauth_config: OAuthConfig,
config_id: str,
project_id: str,
force_refresh: bool = False
) -> Tuple[Optional[str], Optional[str]]:
"""
Get access token with caching and retry logic.
Returns:
Tuple of (access_token, error_message)
"""
# Validate configuration
is_valid, error_msg = oauth_config.validate()
if not is_valid:
return None, error_msg
# Check cache unless force refresh
if not force_refresh:
cached_token = await self.token_manager.get_token(
server_name, oauth_config, config_id, project_id
)
if cached_token:
self.metrics.record_cache_hit()
return cached_token.access_token, None
else:
self.metrics.record_cache_miss()
# Obtain new token
if oauth_config.grant_type == OAuthGrantType.CLIENT_CREDENTIALS:
token = await self._client_credentials_flow_with_retry(
server_name, oauth_config
)
else:
return None, f"Unsupported grant type: {oauth_config.grant_type.value}"
if token:
# Store in cache
await self.token_manager.store_token(
server_name, token, config_id, project_id
)
self.metrics.record_token_acquisition(True)
return token.access_token, None
return None, "Failed to obtain token"
Client Credentials Flow with Retry
src/secure_mcp_gateway/services/oauth/oauth_service.py
async def _client_credentials_flow_with_retry(
self, server_name: str, oauth_config: OAuthConfig
) -> Optional[OAuthToken]:
"""
Execute client credentials flow with exponential backoff retry.
Retries: 3 attempts with 2s, 4s, 8s delays
"""
correlation_id = str(uuid.uuid4())
logger.info(
f"[OAuthService] Client credentials flow started",
extra={"server_name": server_name, "correlation_id": correlation_id}
)
try:
async for attempt in AsyncRetrying(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=2, min=2, max=8),
retry=retry_if_exception_type(
(aiohttp.ClientError, asyncio.TimeoutError)
),
):
with attempt:
return await self._execute_client_credentials_flow(
server_name, oauth_config, correlation_id
)
except RetryError as e:
logger.error(
f"[OAuthService] All retry attempts failed for {server_name}",
extra={"correlation_id": correlation_id, "error": str(e)}
)
return None
Token Execution with mTLS Support
src/secure_mcp_gateway/services/oauth/oauth_service.py
async def _execute_client_credentials_flow(
self, server_name: str, oauth_config: OAuthConfig, correlation_id: str
) -> OAuthToken:
# Build request body
data = {
"grant_type": "client_credentials",
"client_id": oauth_config.client_id,
}
if not oauth_config.use_basic_auth:
data["client_secret"] = oauth_config.client_secret
if oauth_config.scope:
data["scope"] = oauth_config.scope
if oauth_config.audience:
data["audience"] = oauth_config.audience
if oauth_config.resource:
data["resource"] = oauth_config.resource
# Merge additional params
data.update(oauth_config.additional_params)
# Build headers
headers = {"Content-Type": "application/x-www-form-urlencoded"}
if oauth_config.use_basic_auth:
# RFC 6749: client_secret_basic authentication
credentials = f"{oauth_config.client_id}:{oauth_config.client_secret}"
encoded = base64.b64encode(credentials.encode()).decode()
headers["Authorization"] = f"Basic {encoded}"
headers.update(oauth_config.custom_headers)
# Configure mTLS if enabled
ssl_context = None
if oauth_config.use_mtls:
ssl_context = ssl.create_default_context(
cafile=oauth_config.ca_bundle_path
)
ssl_context.load_cert_chain(
certfile=oauth_config.client_cert_path,
keyfile=oauth_config.client_key_path
)
# Make request
timeout = aiohttp.ClientTimeout(total=self.timeout_manager.get_timeout("auth"))
async with aiohttp.ClientSession(
timeout=timeout, connector=aiohttp.TCPConnector(ssl=ssl_context)
) as session:
async with session.post(
oauth_config.token_url, data=data, headers=headers
) as response:
response_data = await response.json()
if response.status != 200:
raise AuthenticationError(
f"Token request failed: {response_data.get('error')}"
)
return OAuthToken.from_dict(response_data)
Token Management
TokenManager
src/secure_mcp_gateway/services/oauth/token_manager.py
class TokenManager:
def __init__(self):
self._token_cache: Dict[str, OAuthToken] = {}
self._lock = asyncio.Lock()
async def get_token(
self,
server_name: str,
oauth_config: OAuthConfig,
config_id: str,
project_id: str
) -> Optional[OAuthToken]:
"""Get cached token if valid."""
cache_key = self._build_cache_key(server_name, config_id, project_id)
async with self._lock:
token = self._token_cache.get(cache_key)
if token and not self.is_token_expired(
token, oauth_config.token_expiry_buffer
):
return token
# Token expired or not found
if token:
logger.debug(f"[TokenManager] Token expired for {server_name}")
return None
async def store_token(
self,
server_name: str,
token: OAuthToken,
config_id: str,
project_id: str
) -> None:
"""Store token in cache."""
cache_key = self._build_cache_key(server_name, config_id, project_id)
async with self._lock:
self._token_cache[cache_key] = token
logger.info(
f"[TokenManager] Token cached for {server_name}",
extra={"expires_in": token.expires_in}
)
def is_token_expired(
self, token: OAuthToken, buffer_seconds: int = 300
) -> bool:
"""Check if token is expired or will expire soon."""
if not token.created_at or not token.expires_in:
return True
expiry_time = token.created_at + timedelta(seconds=token.expires_in)
buffer_time = expiry_time - timedelta(seconds=buffer_seconds)
return datetime.now(timezone.utc) >= buffer_time
Client Integration
The gateway automatically injects OAuth tokens into MCP server connections:src/secure_mcp_gateway/services/oauth/integration.py
async def inject_oauth_token(
server_entry: Dict[str, Any], config_id: str
) -> Dict[str, Any]:
"""
Inject OAuth token into server connection.
For remote servers: Adds Authorization header
For local servers: Adds ENKRYPT_ACCESS_TOKEN env var
"""
oauth_config = server_entry.get("oauth", {})
if not oauth_config.get("enabled"):
return server_entry
# Get or acquire token
oauth_service = OAuthService()
access_token, error = await oauth_service.get_access_token(
server_entry["server_name"],
OAuthConfig.from_dict(oauth_config),
config_id,
project_id
)
if not access_token:
raise AuthenticationError(f"OAuth token acquisition failed: {error}")
# Inject based on transport type
if server_entry["config"].get("transport") == "http":
# Remote server - add Authorization header
if "headers" not in server_entry["config"]:
server_entry["config"]["headers"] = {}
server_entry["config"]["headers"]["Authorization"] = f"Bearer {access_token}"
else:
# Local server - add environment variable
if "env" not in server_entry["config"]:
server_entry["config"]["env"] = {}
server_entry["config"]["env"]["ENKRYPT_ACCESS_TOKEN"] = access_token
return server_entry
OAuth Metrics
The OAuth service tracks comprehensive metrics:src/secure_mcp_gateway/services/oauth/metrics.py
class OAuthMetrics:
def get_metrics(self) -> Dict[str, Any]:
return {
"token_acquisitions_total": self._acquisitions_total,
"token_acquisitions_success": self._acquisitions_success,
"token_acquisitions_failure": self._acquisitions_failure,
"token_cache_hits": self._cache_hits,
"token_cache_misses": self._cache_misses,
"token_refreshes": self._refreshes,
"cache_hit_ratio": self._cache_hits / (self._cache_hits + self._cache_misses),
"success_rate": self._acquisitions_success / self._acquisitions_total,
"avg_latency_ms": sum(self._latencies) / len(self._latencies),
"max_latency_ms": max(self._latencies),
"min_latency_ms": min(self._latencies),
"active_tokens": len(self.token_manager._token_cache)
}
Security Best Practices
HTTPS Only: OAuth 2.1 requires HTTPS for all token endpoints. Set
enforce_https: true to validate.Token Storage: Tokens are stored in memory only. For multi-instance deployments, consider external cache with encryption.
mTLS Certificates: Store client certificates securely and rotate regularly. Never commit certificates to source control.
Testing OAuth Configuration
Use the echo OAuth test server to verify configuration:bad_mcps/echo_oauth_mcp.py
# This test server echoes back the Authorization header
import os
from mcp import FastMCP
mcp = FastMCP("Echo OAuth Server")
@mcp.tool()
def echo_auth_header() -> str:
"""Returns the Authorization header value for testing."""
auth_header = os.environ.get("ENKRYPT_ACCESS_TOKEN", "No token found")
return f"Received token: {auth_header[:20]}..."