Skip to main content

Overview

Kortix provides comprehensive support for the Model Context Protocol (MCP), enabling agents to interact with external tools and services through standardized MCP servers. The MCP integration supports multiple connection types and includes advanced features like schema caching, JIT loading, and connection pooling.

MCP Architecture

Kortix’s MCP system consists of several key components:

MCP Registry

The MCPRegistry (core/agentpress/mcp_registry.py) manages all MCP tool discovery and activation:
class MCPRegistry:
    """Central registry for MCP tools with status tracking and caching."""
    
    def __init__(self):
        self._tools: Dict[str, MCPToolInfo] = {}
        self._toolkit_mapping: Dict[str, Set[str]] = {}
        self._status_index: Dict[MCPToolStatus, Set[str]] = {}
        self._schema_cache: Dict[str, Dict[str, Any]] = {}

Tool Status Lifecycle

class MCPToolStatus(Enum):
    DISCOVERED = "discovered"  # Tool found but not loaded
    LOADING = "loading"        # Currently being loaded
    ACTIVE = "active"          # Ready to use
    FAILED = "failed"          # Loading/execution failed
    DISABLED = "disabled"      # Manually disabled

MCP Tool Wrapper

The MCPToolWrapper (core/tools/mcp_tool_wrapper.py) provides the interface for using MCP tools:
@tool_metadata(
    display_name="MCP Tool Wrapper",
    description="Internal wrapper for MCP external tool integration",
    visible=False
)
class MCPToolWrapper(Tool):
    def __init__(self, mcp_configs: Optional[List[Dict[str, Any]]] = None, 
                 use_cache: bool = True, account_id: str = None):
        self.mcp_manager = mcp_service
        self.mcp_configs = mcp_configs or []
        self.connection_manager = MCPConnectionManager()
        self.custom_handler = CustomMCPHandler(self.connection_manager, account_id)

Supported MCP Types

Kortix supports four types of MCP server connections:

1. SSE (Server-Sent Events)

For HTTP-based streaming connections:
{
    "name": "my_sse_server",
    "isCustom": True,
    "customType": "sse",
    "config": {
        "url": "https://mcp.example.com/sse",
        "headers": {  # Optional
            "Authorization": "Bearer token"
        }
    },
    "enabledTools": ["tool1", "tool2"]  # Optional filter
}
Implementation (custom_mcp_handler.py):
async def _initialize_sse_mcp(self, server_name: str, 
                               server_config: Dict[str, Any], 
                               enabled_tools: List[str]):
    from mcp.client.sse import sse_client
    from mcp import ClientSession
    
    url = server_config.get('url')
    headers = server_config.get('headers', {})
    
    async with sse_client(url, headers=headers) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            tools_result = await session.list_tools()
            tools = tools_result.tools
            
            self._register_custom_tools(tools, server_name, enabled_tools, 'sse', server_config)

2. HTTP (Streamable HTTP)

For standard HTTP-based MCP servers:
{
    "name": "my_http_server",
    "isCustom": True,
    "customType": "http",
    "config": {
        "url": "https://mcp.example.com"
    },
    "enabledTools": []  # Empty = all tools
}
Implementation:
async def _initialize_http_mcp(self, server_name: str,
                                server_config: Dict[str, Any],
                                enabled_tools: List[str]):
    from mcp.client.streamable_http import streamablehttp_client
    from mcp import ClientSession
    
    url = server_config.get('url')
    
    async with streamablehttp_client(url) as (read_stream, write_stream, _):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            tools_result = await session.list_tools()
            tools = tools_result.tools
            
            self._register_custom_tools(tools, server_name, enabled_tools, 'http', server_config)

3. JSON/stdio (Process-based)

For local MCP servers running as processes:
{
    "name": "my_stdio_server",
    "isCustom": True,
    "customType": "json",
    "config": {
        "command": "node",
        "args": ["/path/to/server.js"],
        "env": {  # Optional environment variables
            "API_KEY": "xxx"
        }
    }
}
Implementation:
async def _initialize_json_mcp(self, server_name: str,
                                server_config: Dict[str, Any],
                                enabled_tools: List[str]):
    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client
    
    command = server_config.get('command')
    args = server_config.get('args', [])
    env = server_config.get('env', {})
    
    server_params = StdioServerParameters(
        command=command,
        args=args,
        env=env
    )
    
    async with stdio_client(server_params) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            tools_result = await session.list_tools()
            tools = tools_result.tools
            
            self._register_custom_tools(tools, server_name, enabled_tools, 'json', server_config)

4. Composio MCP

For Composio-managed MCP servers (see Composio Integration):
{
    "name": "gmail_integration",
    "isCustom": True,
    "customType": "composio",
    "config": {
        "profile_id": "prof_xxx"
    }
}

Schema Caching

Kortix implements Redis-based schema caching for fast MCP tool loading:

Cache Configuration

class MCPRegistry:
    SCHEMA_CACHE_TTL_HOURS = 24
    SCHEMA_CACHE_KEY_PREFIX = "mcp_schema:"

Cache Flow

async def _load_schemas_from_mcp(self, tool_names: List[str], 
                                  account_id: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
    # 1. Check cache first
    cached_schemas = await self._get_cached_toolkit_schemas(toolkit_slug)
    if cached_schemas:
        logger.info(f"⚡ Cache HIT for {toolkit_slug}")
        return cached_schemas
    
    # 2. Load from MCP server
    toolkit_schemas = await self._load_custom_mcp_schemas(custom_type, config)
    
    # 3. Cache the result
    await self._cache_toolkit_schemas(toolkit_slug, toolkit_schemas)
    
    return toolkit_schemas

Benefits

  • Instant startup: Cached schemas load in milliseconds
  • Reduced API calls: No need to reconnect to MCP servers
  • Improved reliability: Works even if MCP server is temporarily down
  • Lower latency: Cached schemas reduce tool activation time

JIT (Just-In-Time) Tool Loading

MCP tools use JIT loading to minimize startup time:

Tool Discovery

# Tools are discovered but not loaded initially
async def get_discovery_info(self, filter_pattern: Optional[str] = None,
                             load_schemas: bool = True) -> Dict[str, Any]:
    available_tools = {}
    tools_needing_schemas = []
    
    # Return cached schemas immediately
    for tool_name, tool_info in self._tools.items():
        if tool_info.schema:
            available_tools[tool_name] = tool_info.schema
        else:
            tools_needing_schemas.append(tool_name)
    
    # Optionally load missing schemas
    if load_schemas and tools_needing_schemas:
        schemas_loaded = await self._load_schemas_from_mcp(tools_needing_schemas)
        available_tools.update(schemas_loaded)
    
    return {"available_tools": available_tools, "total_count": len(available_tools)}

Auto-Activation

async def execute_tool(self, tool_name: str, args: Dict[str, Any],
                       context: MCPExecutionContext) -> ToolResult:
    # Check if tool is already active
    if not self.is_tool_active(tool_name):
        logger.info(f"🔄 Auto-activating {tool_name}")
        success = await self._auto_activate_tool(tool_name, context)
        if not success:
            return self._fail_response(f"Failed to activate {tool_name}")
    
    # Execute the tool
    tool_info = self._tools[tool_name]
    method = getattr(tool_info.instance, tool_name)
    result = await method(**args)
    
    # Update statistics
    tool_info.call_count += 1
    tool_info.last_used_ms = time.time() * 1000
    
    return result

Using MCP Tools

Initialize MCP Wrapper

from core.tools.mcp_tool_wrapper import MCPToolWrapper

mcp_configs = [
    {
        "name": "github_mcp",
        "isCustom": True,
        "customType": "http",
        "config": {"url": "https://mcp.github.com"}
    },
    {
        "name": "notion_mcp",
        "isCustom": True,
        "customType": "sse",
        "config": {
            "url": "https://mcp.notion.com/sse",
            "headers": {"Authorization": f"Bearer {notion_token}"}
        }
    }
]

mcp_wrapper = MCPToolWrapper(
    mcp_configs=mcp_configs,
    use_cache=True,
    account_id=user_account_id
)

await mcp_wrapper.initialize_and_register_tools()

Dynamic Method Access

# MCP tools become methods on the wrapper
result = await mcp_wrapper.github_search_repositories(
    query="machine learning",
    limit=10
)

# Or use execute method
result = await mcp_wrapper._execute_mcp_tool(
    tool_name="github_search_repositories",
    arguments={"query": "machine learning", "limit": 10}
)

Tool Registry Integration

from core.agentpress.mcp_registry import get_mcp_registry, init_mcp_registry_from_loader

# Initialize registry from loader
init_mcp_registry_from_loader(mcp_loader)

# Get registry instance
registry = get_mcp_registry()

# Check tool availability
if registry.is_tool_available('github_create_issue'):
    tool_info = registry.get_tool_info('github_create_issue')
    print(f"Status: {tool_info.status}")
    print(f"Toolkit: {tool_info.toolkit_slug}")

Connection Management

The MCPConnectionManager handles connection pooling:
class MCPConnectionManager:
    def __init__(self):
        self.connections: Dict[str, Any] = {}
        self.sessions: Dict[str, Any] = {}
    
    async def connect_sse_server(self, server_name: str, 
                                  config: Dict[str, Any]) -> Dict[str, Any]:
        # Establish SSE connection
        # Store in connection pool
        # Return server info with tools
    
    async def connect_http_server(self, server_name: str,
                                   config: Dict[str, Any]) -> Dict[str, Any]:
        # Establish HTTP connection
        # Store in connection pool
        # Return server info with tools

Error Handling

Failed Tool Loading

async def _initialize_single_custom_mcp(self, config: Dict[str, Any]):
    try:
        config_name = config.get('name', 'Unknown')
        logger.debug(f"Initializing custom MCP: {config_name}")
        
        await self.custom_handler._initialize_single_custom_mcp(config)
        
        return {'tools': custom_tools, 'type': 'custom', 'success': True}
    except Exception as e:
        logger.error(f"✗ Failed to initialize custom MCP {config_name}: {e}")
        # Return error info instead of raising - allows execution to continue
        return {
            'tools': {}, 
            'type': 'custom', 
            'success': False, 
            'error': str(e), 
            'config_name': config_name
        }

Execution Errors

async def execute_tool(self, tool_name: str, args: Dict[str, Any],
                       context: MCPExecutionContext) -> ToolResult:
    try:
        # Execute tool
        result = await method(**args)
        return result
    except Exception as e:
        # Track error
        tool_info = self._tools.get(tool_name)
        if tool_info:
            tool_info.last_error = str(e)
            tool_info.error_count += 1
            self._update_tool_status(tool_name, MCPToolStatus.FAILED)
        
        logger.error(f"❌ {tool_name} failed: {e}")
        return self._fail_response(f"MCP tool execution error: {str(e)}")

Performance Optimization

Parallel Initialization

async def _initialize_servers(self):
    initialization_tasks = []
    
    # Queue all initialization tasks
    for config in self.mcp_configs:
        task = self._initialize_single_custom_mcp(config)
        initialization_tasks.append(('custom', config, task))
    
    # Execute in parallel
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    logger.debug(f"⚡ Initialized {successful} servers in {elapsed_time:.2f}s")

Schema Pre-warming

async def prewarm_schemas(self, account_id: Optional[str] = None) -> int:
    """Pre-load schemas from cache at startup."""
    toolkits = self.get_available_toolkits()
    
    warmed_count = 0
    for toolkit_slug in toolkits:
        cached = await self._get_cached_toolkit_schemas(toolkit_slug)
        if cached:
            for tool_name, schema in cached.items():
                if tool_name in self._tools:
                    self._tools[tool_name].schema = schema
                    warmed_count += 1
    
    logger.info(f"✅ Pre-warmed {warmed_count} schemas from Redis")
    return warmed_count

Monitoring and Statistics

# Get registry statistics
stats = registry.get_registry_stats()
print(f"Total tools: {stats['total_tools']}")
print(f"Active tools: {stats['active_tools']}")
print(f"Failed tools: {stats['failed_tools']}")
print(f"Toolkits: {stats['toolkits']}")
print(f"Status breakdown: {stats['status_breakdown']}")

# Get tool-specific info
tool_info = registry.get_tool_info('github_create_issue')
print(f"Call count: {tool_info.call_count}")
print(f"Last used: {tool_info.last_used_ms}")
print(f"Error count: {tool_info.error_count}")

Best Practices

  1. Enable Caching: Always use use_cache=True for production
  2. Filter Tools: Use enabledTools to load only needed tools
  3. Handle Failures: MCP servers may be unavailable - design for resilience
  4. Monitor Performance: Track activation times and cache hit rates
  5. Batch Operations: Initialize multiple MCP servers in parallel
  6. Error Recovery: Implement retry logic for transient failures

Next Steps

Build docs developers (and LLMs) love