Overview
Kortix provides a flexible framework for creating custom tools that extend agent capabilities. This guide covers everything from basic tool creation to advanced patterns and best practices.Tool Structure
All custom tools inherit from theTool base class and use decorators to define metadata and schemas.
Basic Tool Template
from typing import Optional, Dict, Any
from core.agentpress.tool import Tool, ToolResult, openapi_schema, tool_metadata
from core.utils.logger import logger
@tool_metadata(
display_name="My Custom Tool",
description="Brief description of what this tool does",
icon="Wrench",
color="bg-blue-100 dark:bg-blue-800/50",
is_core=False,
weight=50,
visible=True,
usage_guide="""## My Custom Tool
Detailed usage instructions for the AI agent.
Include examples, best practices, and important notes.
"""
)
class MyCustomTool(Tool):
def __init__(self):
super().__init__()
# Initialize any required resources
self.api_client = None
@openapi_schema({
"type": "function",
"function": {
"name": "my_method",
"description": "Clear description of what this method does",
"parameters": {
"type": "object",
"properties": {
"input_param": {
"type": "string",
"description": "Description of this parameter"
},
"optional_param": {
"type": "integer",
"description": "Optional parameter with default"
}
},
"required": ["input_param"],
"additionalProperties": False
}
}
})
async def my_method(
self,
input_param: str,
optional_param: int = 10
) -> ToolResult:
"""Method implementation."""
try:
# Your implementation here
result = await self._process(input_param, optional_param)
return self.success_response(result)
except Exception as e:
logger.error(f"Error in my_method: {e}")
return self.fail_response(f"Failed to process: {str(e)}")
async def _process(self, param: str, count: int) -> Dict[str, Any]:
"""Private helper method."""
# Implementation
return {"result": "data"}
Step-by-Step Guide
Step 1: Import Required Components
from typing import Optional, Dict, Any, List
from core.agentpress.tool import (
Tool,
ToolResult,
openapi_schema,
tool_metadata,
method_metadata # Optional, for method-level metadata
)
from core.utils.logger import logger
Step 2: Define Tool Metadata
Use the@tool_metadata decorator to provide information about your tool:
@tool_metadata(
display_name="Database Query Tool",
description="Execute database queries and return results",
icon="Database",
color="bg-purple-100 dark:bg-purple-800/50",
is_core=False,
weight=60,
visible=True,
usage_guide="""## Database Query Tool
Execute SQL queries against the configured database.
### Available Methods
- query: Execute SELECT queries
- execute: Execute INSERT/UPDATE/DELETE
- transaction: Execute multiple queries in a transaction
### Best Practices
- Always use parameterized queries to prevent SQL injection
- Limit result sets with LIMIT clause
- Use transactions for data modifications
"""
)
class DatabaseQueryTool(Tool):
pass
display_name- Human-readable name shown in UIdescription- Short description (1-2 sentences)icon- Icon name from your icon librarycolor- Tailwind CSS color classes for UIis_core- Whether tool is always enabledweight- Sort priority (lower = higher priority)visible- Whether shown in frontend UIusage_guide- Detailed markdown documentation for AI
Step 3: Initialize Tool Resources
class DatabaseQueryTool(Tool):
def __init__(self, connection_string: str):
super().__init__()
self.connection_string = connection_string
self.connection_pool = None
self._initialize_connection()
def _initialize_connection(self):
"""Initialize database connection pool."""
try:
# Setup connection pool
self.connection_pool = create_pool(self.connection_string)
logger.info("Database connection pool initialized")
except Exception as e:
logger.error(f"Failed to initialize database: {e}")
raise
Step 4: Define Tool Methods with OpenAPI Schemas
@openapi_schema({
"type": "function",
"function": {
"name": "query",
"description": "Execute a SELECT query and return results",
"parameters": {
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "SQL SELECT query to execute"
},
"params": {
"type": "object",
"description": "Query parameters for prepared statement",
"additionalProperties": True
},
"limit": {
"type": "integer",
"description": "Maximum number of rows to return",
"default": 100
}
},
"required": ["sql"],
"additionalProperties": False
}
}
})
async def query(
self,
sql: str,
params: Optional[Dict[str, Any]] = None,
limit: int = 100
) -> ToolResult:
"""Execute a SELECT query."""
try:
# Validate query
if not sql.strip().upper().startswith('SELECT'):
return self.fail_response("Only SELECT queries allowed")
# Execute query
async with self.connection_pool.acquire() as conn:
rows = await conn.fetch(sql, **(params or {}), limit=limit)
# Format results
results = [dict(row) for row in rows]
return self.success_response({
"rows": results,
"count": len(results)
})
except Exception as e:
logger.error(f"Query failed: {e}")
return self.fail_response(f"Database query failed: {str(e)}")
Step 5: Add Multiple Methods
@openapi_schema({
"type": "function",
"function": {
"name": "execute",
"description": "Execute INSERT, UPDATE, or DELETE statement",
"parameters": {
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "SQL statement to execute"
},
"params": {
"type": "object",
"description": "Statement parameters"
}
},
"required": ["sql"]
}
}
})
async def execute(
self,
sql: str,
params: Optional[Dict[str, Any]] = None
) -> ToolResult:
"""Execute a data modification statement."""
try:
async with self.connection_pool.acquire() as conn:
result = await conn.execute(sql, **(params or {}))
return self.success_response({
"affected_rows": result.split()[1] if result else 0,
"status": "success"
})
except Exception as e:
logger.error(f"Execute failed: {e}")
return self.fail_response(f"Failed to execute: {str(e)}")
Real-World Example: API Integration Tool
Here’s a complete example integrating with an external API:import httpx
from typing import Optional, Dict, Any, List
from core.agentpress.tool import Tool, ToolResult, openapi_schema, tool_metadata
from core.utils.logger import logger
@tool_metadata(
display_name="GitHub API",
description="Interact with GitHub repositories, issues, and pull requests",
icon="Github",
color="bg-gray-100 dark:bg-gray-800/50",
weight=40,
visible=True,
usage_guide="""## GitHub API Tool
Access GitHub repositories, issues, PRs, and more.
### Authentication
Requires GITHUB_TOKEN environment variable with repo scope.
### Available Methods
- search_repositories: Search for repos
- get_repository: Get repo details
- create_issue: Create an issue
- list_pull_requests: List PRs for a repo
### Rate Limits
- Authenticated: 5,000 requests/hour
- Unauthenticated: 60 requests/hour
"""
)
class GitHubTool(Tool):
def __init__(self, github_token: Optional[str] = None):
super().__init__()
self.token = github_token or os.getenv('GITHUB_TOKEN')
self.base_url = "https://api.github.com"
self.headers = {
"Accept": "application/vnd.github.v3+json",
"Authorization": f"token {self.token}" if self.token else None
}
@openapi_schema({
"type": "function",
"function": {
"name": "search_repositories",
"description": "Search GitHub repositories",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query (e.g., 'machine learning language:python')"
},
"sort": {
"type": "string",
"enum": ["stars", "forks", "updated"],
"description": "Sort results by",
"default": "stars"
},
"limit": {
"type": "integer",
"description": "Max results to return",
"default": 10
}
},
"required": ["query"]
}
}
})
async def search_repositories(
self,
query: str,
sort: str = "stars",
limit: int = 10
) -> ToolResult:
"""Search GitHub repositories."""
try:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/search/repositories",
params={"q": query, "sort": sort, "per_page": limit},
headers=self.headers
)
response.raise_for_status()
data = response.json()
repos = [{
"name": item["full_name"],
"description": item["description"],
"stars": item["stargazers_count"],
"forks": item["forks_count"],
"url": item["html_url"]
} for item in data["items"]]
return self.success_response({
"repositories": repos,
"total_count": data["total_count"]
})
except httpx.HTTPError as e:
logger.error(f"GitHub API error: {e}")
return self.fail_response(f"Failed to search repositories: {str(e)}")
@openapi_schema({
"type": "function",
"function": {
"name": "create_issue",
"description": "Create an issue in a repository",
"parameters": {
"type": "object",
"properties": {
"repo": {
"type": "string",
"description": "Repository (owner/repo format)"
},
"title": {
"type": "string",
"description": "Issue title"
},
"body": {
"type": "string",
"description": "Issue description"
},
"labels": {
"type": "array",
"items": {"type": "string"},
"description": "Labels to add"
}
},
"required": ["repo", "title"]
}
}
})
async def create_issue(
self,
repo: str,
title: str,
body: Optional[str] = None,
labels: Optional[List[str]] = None
) -> ToolResult:
"""Create a GitHub issue."""
try:
payload = {
"title": title,
"body": body or "",
"labels": labels or []
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/repos/{repo}/issues",
json=payload,
headers=self.headers
)
response.raise_for_status()
issue = response.json()
return self.success_response({
"issue_number": issue["number"],
"url": issue["html_url"],
"state": issue["state"]
})
except httpx.HTTPError as e:
logger.error(f"Failed to create issue: {e}")
return self.fail_response(f"Issue creation failed: {str(e)}")
Advanced Patterns
1. Sandbox-Aware Tools
For tools that need access to the sandbox environment:from core.sandbox.tool_base import SandboxToolsBase
from core.agentpress.thread_manager import ThreadManager
@tool_metadata(
display_name="File Processor",
description="Process files in the sandbox"
)
class FileProcessorTool(SandboxToolsBase):
def __init__(self, project_id: str, thread_manager: ThreadManager):
super().__init__(project_id, thread_manager)
@openapi_schema({...})
async def process_file(self, filepath: str) -> ToolResult:
# Access sandbox
await self._ensure_sandbox()
# Read file from sandbox
content = await self.sandbox.fs.read_file(filepath)
# Process and return
result = self._process_content(content)
return self.success_response(result)
2. Stateful Tools
Tools that maintain state across calls:@tool_metadata(
display_name="Conversation Memory",
description="Store and retrieve conversation context"
)
class ConversationMemoryTool(Tool):
def __init__(self):
super().__init__()
self.memory: Dict[str, List[Dict]] = {}
@openapi_schema({...})
async def store_memory(
self,
conversation_id: str,
key: str,
value: Any
) -> ToolResult:
"""Store information in conversation memory."""
if conversation_id not in self.memory:
self.memory[conversation_id] = []
self.memory[conversation_id].append({
"key": key,
"value": value,
"timestamp": time.time()
})
return self.success_response({"stored": True})
@openapi_schema({...})
async def recall_memory(
self,
conversation_id: str,
key: Optional[str] = None
) -> ToolResult:
"""Retrieve stored information."""
if conversation_id not in self.memory:
return self.success_response({"memories": []})
memories = self.memory[conversation_id]
if key:
memories = [m for m in memories if m["key"] == key]
return self.success_response({"memories": memories})
3. Tools with Method-Level Metadata
from core.agentpress.tool import method_metadata
@tool_metadata(...)
class AdvancedTool(Tool):
@method_metadata(
display_name="Advanced Search",
description="Search with advanced filters",
is_core=False,
visible=True
)
@openapi_schema({...})
async def advanced_search(self, query: str) -> ToolResult:
pass
@method_metadata(
display_name="Internal Helper",
description="Internal method not shown in UI",
visible=False # Hidden from UI
)
@openapi_schema({...})
async def internal_helper(self, data: Any) -> ToolResult:
pass
Registering Custom Tools
Add to Tool Registry
Editcore/tools/tool_registry.py:
CUSTOM_TOOLS = [
('github_tool', 'core.tools.github_tool', 'GitHubTool'),
('database_tool', 'core.tools.database_tool', 'DatabaseQueryTool'),
]
ALL_TOOLS = CORE_TOOLS + SANDBOX_TOOLS + SEARCH_TOOLS + UTILITY_TOOLS + CUSTOM_TOOLS
Dynamic Registration
from core.tools.tool_registry import get_all_tools
# Get tool class
tool_class = get_tool_class('core.tools.github_tool', 'GitHubTool')
# Instantiate
github_tool = tool_class(github_token='xxx')
# Register with thread manager
thread_manager.tool_registry.register_tool('github', github_tool)
Testing Custom Tools
import pytest
from core.tools.github_tool import GitHubTool
@pytest.mark.asyncio
async def test_search_repositories():
tool = GitHubTool(github_token="test_token")
result = await tool.search_repositories(
query="python machine learning",
sort="stars",
limit=5
)
assert result.success is True
data = json.loads(result.output)
assert "repositories" in data
assert len(data["repositories"]) <= 5
@pytest.mark.asyncio
async def test_create_issue():
tool = GitHubTool(github_token="test_token")
result = await tool.create_issue(
repo="owner/repo",
title="Test Issue",
body="Test description"
)
assert result.success is True
data = json.loads(result.output)
assert "issue_number" in data
Best Practices
1. Error Handling
try:
# Operation
result = await operation()
return self.success_response(result)
except SpecificException as e:
logger.error(f"Specific error: {e}")
return self.fail_response(f"Specific error message: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)
return self.fail_response(f"Unexpected error: {str(e)}")
2. Input Validation
def _validate_input(self, param: str) -> bool:
if not param or len(param) < 3:
return False
if not re.match(r'^[a-zA-Z0-9_]+$', param):
return False
return True
async def my_method(self, param: str) -> ToolResult:
if not self._validate_input(param):
return self.fail_response("Invalid input format")
# Continue...
3. Async Operations
# Good - proper async usage
async def fetch_data(self, url: str) -> ToolResult:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return self.success_response(response.json())
# Bad - blocking operation
def fetch_data_blocking(self, url: str) -> ToolResult:
response = requests.get(url) # Blocks event loop!
return self.success_response(response.json())
4. Resource Cleanup
class MyTool(Tool):
def __init__(self):
super().__init__()
self.connection = None
async def cleanup(self):
"""Clean up resources."""
if self.connection:
await self.connection.close()
self.connection = None
5. Comprehensive Documentation
usage_guide="""## Tool Name
Brief overview of what this tool does.
### When to Use
- Use case 1
- Use case 2
### When NOT to Use
- Scenario where another tool is better
### Methods
#### method_name
Detailed description of the method.
**Parameters:**
- `param1` - What it does
- `param2` - What it does
**Returns:**
{"key": "description of return value"}
**Example:**
### Best Practices
1. Practice 1
2. Practice 2
### Common Pitfalls
- Pitfall 1 and how to avoid it
- Pitfall 2 and how to avoid it
"""
Next Steps
- MCP Integration - Integrate external MCP servers
- Built-in Tools - Learn from existing tools
- Composio Integration - Use Composio platform