Skip to main content

Overview

Kortix provides a flexible framework for creating custom tools that extend agent capabilities. This guide covers everything from basic tool creation to advanced patterns and best practices.

Tool Structure

All custom tools inherit from the Tool base class and use decorators to define metadata and schemas.

Basic Tool Template

from typing import Optional, Dict, Any
from core.agentpress.tool import Tool, ToolResult, openapi_schema, tool_metadata
from core.utils.logger import logger

@tool_metadata(
    display_name="My Custom Tool",
    description="Brief description of what this tool does",
    icon="Wrench",
    color="bg-blue-100 dark:bg-blue-800/50",
    is_core=False,
    weight=50,
    visible=True,
    usage_guide="""## My Custom Tool
    
    Detailed usage instructions for the AI agent.
    Include examples, best practices, and important notes.
    """
)
class MyCustomTool(Tool):
    def __init__(self):
        super().__init__()
        # Initialize any required resources
        self.api_client = None
    
    @openapi_schema({
        "type": "function",
        "function": {
            "name": "my_method",
            "description": "Clear description of what this method does",
            "parameters": {
                "type": "object",
                "properties": {
                    "input_param": {
                        "type": "string",
                        "description": "Description of this parameter"
                    },
                    "optional_param": {
                        "type": "integer",
                        "description": "Optional parameter with default"
                    }
                },
                "required": ["input_param"],
                "additionalProperties": False
            }
        }
    })
    async def my_method(
        self, 
        input_param: str,
        optional_param: int = 10
    ) -> ToolResult:
        """Method implementation."""
        try:
            # Your implementation here
            result = await self._process(input_param, optional_param)
            return self.success_response(result)
        except Exception as e:
            logger.error(f"Error in my_method: {e}")
            return self.fail_response(f"Failed to process: {str(e)}")
    
    async def _process(self, param: str, count: int) -> Dict[str, Any]:
        """Private helper method."""
        # Implementation
        return {"result": "data"}

Step-by-Step Guide

Step 1: Import Required Components

from typing import Optional, Dict, Any, List
from core.agentpress.tool import (
    Tool, 
    ToolResult, 
    openapi_schema, 
    tool_metadata,
    method_metadata  # Optional, for method-level metadata
)
from core.utils.logger import logger

Step 2: Define Tool Metadata

Use the @tool_metadata decorator to provide information about your tool:
@tool_metadata(
    display_name="Database Query Tool",
    description="Execute database queries and return results",
    icon="Database",
    color="bg-purple-100 dark:bg-purple-800/50",
    is_core=False,
    weight=60,
    visible=True,
    usage_guide="""## Database Query Tool
    
    Execute SQL queries against the configured database.
    
    ### Available Methods
    - query: Execute SELECT queries
    - execute: Execute INSERT/UPDATE/DELETE
    - transaction: Execute multiple queries in a transaction
    
    ### Best Practices
    - Always use parameterized queries to prevent SQL injection
    - Limit result sets with LIMIT clause
    - Use transactions for data modifications
    """
)
class DatabaseQueryTool(Tool):
    pass
Metadata Parameters:
  • display_name - Human-readable name shown in UI
  • description - Short description (1-2 sentences)
  • icon - Icon name from your icon library
  • color - Tailwind CSS color classes for UI
  • is_core - Whether tool is always enabled
  • weight - Sort priority (lower = higher priority)
  • visible - Whether shown in frontend UI
  • usage_guide - Detailed markdown documentation for AI

Step 3: Initialize Tool Resources

class DatabaseQueryTool(Tool):
    def __init__(self, connection_string: str):
        super().__init__()
        self.connection_string = connection_string
        self.connection_pool = None
        self._initialize_connection()
    
    def _initialize_connection(self):
        """Initialize database connection pool."""
        try:
            # Setup connection pool
            self.connection_pool = create_pool(self.connection_string)
            logger.info("Database connection pool initialized")
        except Exception as e:
            logger.error(f"Failed to initialize database: {e}")
            raise

Step 4: Define Tool Methods with OpenAPI Schemas

@openapi_schema({
    "type": "function",
    "function": {
        "name": "query",
        "description": "Execute a SELECT query and return results",
        "parameters": {
            "type": "object",
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "SQL SELECT query to execute"
                },
                "params": {
                    "type": "object",
                    "description": "Query parameters for prepared statement",
                    "additionalProperties": True
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of rows to return",
                    "default": 100
                }
            },
            "required": ["sql"],
            "additionalProperties": False
        }
    }
})
async def query(
    self,
    sql: str,
    params: Optional[Dict[str, Any]] = None,
    limit: int = 100
) -> ToolResult:
    """Execute a SELECT query."""
    try:
        # Validate query
        if not sql.strip().upper().startswith('SELECT'):
            return self.fail_response("Only SELECT queries allowed")
        
        # Execute query
        async with self.connection_pool.acquire() as conn:
            rows = await conn.fetch(sql, **(params or {}), limit=limit)
        
        # Format results
        results = [dict(row) for row in rows]
        
        return self.success_response({
            "rows": results,
            "count": len(results)
        })
    except Exception as e:
        logger.error(f"Query failed: {e}")
        return self.fail_response(f"Database query failed: {str(e)}")

Step 5: Add Multiple Methods

@openapi_schema({
    "type": "function",
    "function": {
        "name": "execute",
        "description": "Execute INSERT, UPDATE, or DELETE statement",
        "parameters": {
            "type": "object",
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "SQL statement to execute"
                },
                "params": {
                    "type": "object",
                    "description": "Statement parameters"
                }
            },
            "required": ["sql"]
        }
    }
})
async def execute(
    self,
    sql: str,
    params: Optional[Dict[str, Any]] = None
) -> ToolResult:
    """Execute a data modification statement."""
    try:
        async with self.connection_pool.acquire() as conn:
            result = await conn.execute(sql, **(params or {}))
        
        return self.success_response({
            "affected_rows": result.split()[1] if result else 0,
            "status": "success"
        })
    except Exception as e:
        logger.error(f"Execute failed: {e}")
        return self.fail_response(f"Failed to execute: {str(e)}")

Real-World Example: API Integration Tool

Here’s a complete example integrating with an external API:
import httpx
from typing import Optional, Dict, Any, List
from core.agentpress.tool import Tool, ToolResult, openapi_schema, tool_metadata
from core.utils.logger import logger

@tool_metadata(
    display_name="GitHub API",
    description="Interact with GitHub repositories, issues, and pull requests",
    icon="Github",
    color="bg-gray-100 dark:bg-gray-800/50",
    weight=40,
    visible=True,
    usage_guide="""## GitHub API Tool
    
    Access GitHub repositories, issues, PRs, and more.
    
    ### Authentication
    Requires GITHUB_TOKEN environment variable with repo scope.
    
    ### Available Methods
    - search_repositories: Search for repos
    - get_repository: Get repo details
    - create_issue: Create an issue
    - list_pull_requests: List PRs for a repo
    
    ### Rate Limits
    - Authenticated: 5,000 requests/hour
    - Unauthenticated: 60 requests/hour
    """
)
class GitHubTool(Tool):
    def __init__(self, github_token: Optional[str] = None):
        super().__init__()
        self.token = github_token or os.getenv('GITHUB_TOKEN')
        self.base_url = "https://api.github.com"
        self.headers = {
            "Accept": "application/vnd.github.v3+json",
            "Authorization": f"token {self.token}" if self.token else None
        }
    
    @openapi_schema({
        "type": "function",
        "function": {
            "name": "search_repositories",
            "description": "Search GitHub repositories",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query (e.g., 'machine learning language:python')"
                    },
                    "sort": {
                        "type": "string",
                        "enum": ["stars", "forks", "updated"],
                        "description": "Sort results by",
                        "default": "stars"
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Max results to return",
                        "default": 10
                    }
                },
                "required": ["query"]
            }
        }
    })
    async def search_repositories(
        self,
        query: str,
        sort: str = "stars",
        limit: int = 10
    ) -> ToolResult:
        """Search GitHub repositories."""
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{self.base_url}/search/repositories",
                    params={"q": query, "sort": sort, "per_page": limit},
                    headers=self.headers
                )
                response.raise_for_status()
                data = response.json()
            
            repos = [{
                "name": item["full_name"],
                "description": item["description"],
                "stars": item["stargazers_count"],
                "forks": item["forks_count"],
                "url": item["html_url"]
            } for item in data["items"]]
            
            return self.success_response({
                "repositories": repos,
                "total_count": data["total_count"]
            })
        except httpx.HTTPError as e:
            logger.error(f"GitHub API error: {e}")
            return self.fail_response(f"Failed to search repositories: {str(e)}")
    
    @openapi_schema({
        "type": "function",
        "function": {
            "name": "create_issue",
            "description": "Create an issue in a repository",
            "parameters": {
                "type": "object",
                "properties": {
                    "repo": {
                        "type": "string",
                        "description": "Repository (owner/repo format)"
                    },
                    "title": {
                        "type": "string",
                        "description": "Issue title"
                    },
                    "body": {
                        "type": "string",
                        "description": "Issue description"
                    },
                    "labels": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Labels to add"
                    }
                },
                "required": ["repo", "title"]
            }
        }
    })
    async def create_issue(
        self,
        repo: str,
        title: str,
        body: Optional[str] = None,
        labels: Optional[List[str]] = None
    ) -> ToolResult:
        """Create a GitHub issue."""
        try:
            payload = {
                "title": title,
                "body": body or "",
                "labels": labels or []
            }
            
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{self.base_url}/repos/{repo}/issues",
                    json=payload,
                    headers=self.headers
                )
                response.raise_for_status()
                issue = response.json()
            
            return self.success_response({
                "issue_number": issue["number"],
                "url": issue["html_url"],
                "state": issue["state"]
            })
        except httpx.HTTPError as e:
            logger.error(f"Failed to create issue: {e}")
            return self.fail_response(f"Issue creation failed: {str(e)}")

Advanced Patterns

1. Sandbox-Aware Tools

For tools that need access to the sandbox environment:
from core.sandbox.tool_base import SandboxToolsBase
from core.agentpress.thread_manager import ThreadManager

@tool_metadata(
    display_name="File Processor",
    description="Process files in the sandbox"
)
class FileProcessorTool(SandboxToolsBase):
    def __init__(self, project_id: str, thread_manager: ThreadManager):
        super().__init__(project_id, thread_manager)
    
    @openapi_schema({...})
    async def process_file(self, filepath: str) -> ToolResult:
        # Access sandbox
        await self._ensure_sandbox()
        
        # Read file from sandbox
        content = await self.sandbox.fs.read_file(filepath)
        
        # Process and return
        result = self._process_content(content)
        return self.success_response(result)

2. Stateful Tools

Tools that maintain state across calls:
@tool_metadata(
    display_name="Conversation Memory",
    description="Store and retrieve conversation context"
)
class ConversationMemoryTool(Tool):
    def __init__(self):
        super().__init__()
        self.memory: Dict[str, List[Dict]] = {}
    
    @openapi_schema({...})
    async def store_memory(
        self,
        conversation_id: str,
        key: str,
        value: Any
    ) -> ToolResult:
        """Store information in conversation memory."""
        if conversation_id not in self.memory:
            self.memory[conversation_id] = []
        
        self.memory[conversation_id].append({
            "key": key,
            "value": value,
            "timestamp": time.time()
        })
        
        return self.success_response({"stored": True})
    
    @openapi_schema({...})
    async def recall_memory(
        self,
        conversation_id: str,
        key: Optional[str] = None
    ) -> ToolResult:
        """Retrieve stored information."""
        if conversation_id not in self.memory:
            return self.success_response({"memories": []})
        
        memories = self.memory[conversation_id]
        if key:
            memories = [m for m in memories if m["key"] == key]
        
        return self.success_response({"memories": memories})

3. Tools with Method-Level Metadata

from core.agentpress.tool import method_metadata

@tool_metadata(...)
class AdvancedTool(Tool):
    @method_metadata(
        display_name="Advanced Search",
        description="Search with advanced filters",
        is_core=False,
        visible=True
    )
    @openapi_schema({...})
    async def advanced_search(self, query: str) -> ToolResult:
        pass
    
    @method_metadata(
        display_name="Internal Helper",
        description="Internal method not shown in UI",
        visible=False  # Hidden from UI
    )
    @openapi_schema({...})
    async def internal_helper(self, data: Any) -> ToolResult:
        pass

Registering Custom Tools

Add to Tool Registry

Edit core/tools/tool_registry.py:
CUSTOM_TOOLS = [
    ('github_tool', 'core.tools.github_tool', 'GitHubTool'),
    ('database_tool', 'core.tools.database_tool', 'DatabaseQueryTool'),
]

ALL_TOOLS = CORE_TOOLS + SANDBOX_TOOLS + SEARCH_TOOLS + UTILITY_TOOLS + CUSTOM_TOOLS

Dynamic Registration

from core.tools.tool_registry import get_all_tools

# Get tool class
tool_class = get_tool_class('core.tools.github_tool', 'GitHubTool')

# Instantiate
github_tool = tool_class(github_token='xxx')

# Register with thread manager
thread_manager.tool_registry.register_tool('github', github_tool)

Testing Custom Tools

import pytest
from core.tools.github_tool import GitHubTool

@pytest.mark.asyncio
async def test_search_repositories():
    tool = GitHubTool(github_token="test_token")
    
    result = await tool.search_repositories(
        query="python machine learning",
        sort="stars",
        limit=5
    )
    
    assert result.success is True
    data = json.loads(result.output)
    assert "repositories" in data
    assert len(data["repositories"]) <= 5

@pytest.mark.asyncio
async def test_create_issue():
    tool = GitHubTool(github_token="test_token")
    
    result = await tool.create_issue(
        repo="owner/repo",
        title="Test Issue",
        body="Test description"
    )
    
    assert result.success is True
    data = json.loads(result.output)
    assert "issue_number" in data

Best Practices

1. Error Handling

try:
    # Operation
    result = await operation()
    return self.success_response(result)
except SpecificException as e:
    logger.error(f"Specific error: {e}")
    return self.fail_response(f"Specific error message: {str(e)}")
except Exception as e:
    logger.error(f"Unexpected error: {e}", exc_info=True)
    return self.fail_response(f"Unexpected error: {str(e)}")

2. Input Validation

def _validate_input(self, param: str) -> bool:
    if not param or len(param) < 3:
        return False
    if not re.match(r'^[a-zA-Z0-9_]+$', param):
        return False
    return True

async def my_method(self, param: str) -> ToolResult:
    if not self._validate_input(param):
        return self.fail_response("Invalid input format")
    # Continue...

3. Async Operations

# Good - proper async usage
async def fetch_data(self, url: str) -> ToolResult:
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
    return self.success_response(response.json())

# Bad - blocking operation
def fetch_data_blocking(self, url: str) -> ToolResult:
    response = requests.get(url)  # Blocks event loop!
    return self.success_response(response.json())

4. Resource Cleanup

class MyTool(Tool):
    def __init__(self):
        super().__init__()
        self.connection = None
    
    async def cleanup(self):
        """Clean up resources."""
        if self.connection:
            await self.connection.close()
            self.connection = None

5. Comprehensive Documentation

usage_guide="""## Tool Name

Brief overview of what this tool does.

### When to Use
- Use case 1
- Use case 2

### When NOT to Use
- Scenario where another tool is better

### Methods

#### method_name
Detailed description of the method.

**Parameters:**
- `param1` - What it does
- `param2` - What it does

**Returns:**
{"key": "description of return value"}

**Example:**
method_name(param1=“value”, param2=123)

### Best Practices
1. Practice 1
2. Practice 2

### Common Pitfalls
- Pitfall 1 and how to avoid it
- Pitfall 2 and how to avoid it
"""

Next Steps

Build docs developers (and LLMs) love