Skip to main content
Learn how to create custom extensions that integrate seamlessly with AutoGen’s architecture.

Component Model

All AutoGen extensions implement the Component interface, which provides:
  • Configuration serialization: Save and load component state
  • Lifecycle management: Start, stop, and restart components
  • Type safety: Pydantic-based configuration validation

Creating a Custom Model Client

Model clients implement the ChatCompletionClient protocol.

Basic Structure

from typing import AsyncGenerator, List, Sequence
from autogen_core import Component, CancellationToken
from autogen_core.models import (
    ChatCompletionClient,
    CreateResult,
    LLMMessage,
    ModelCapabilities,
    RequestUsage,
)
from pydantic import BaseModel
from typing_extensions import Self

class MyModelConfig(BaseModel):
    """Configuration for MyModelClient.

    Serialized and validated by the Component machinery when the client
    is saved or loaded (see _to_config / _from_config on MyModelClient).
    """
    model: str  # name of the backend model to call
    api_key: str  # credential for the backing API (plain text in this example)
    temperature: float = 0.7  # sampling temperature forwarded to the API
    max_tokens: int = 1024  # generation budget; also used by remaining_tokens()

class MyModelClient(ChatCompletionClient, Component[MyModelConfig]):
    """Custom model client implementation.

    Implements the ChatCompletionClient protocol on top of a backend API
    whose transport lives in _call_api / _stream_api (left as
    NotImplementedError stubs for the integrator to fill in), plus the
    Component protocol so the client round-trips through MyModelConfig.
    """

    # Component wiring: config schema used for (de)serialization, and the
    # provider string recorded in serialized component configs.
    component_config_schema = MyModelConfig
    component_provider_override = "mypackage.MyModelClient"

    def __init__(
        self,
        model: str,
        api_key: str,
        temperature: float = 0.7,
        max_tokens: int = 1024,
    ):
        """Store model/credential/sampling settings; performs no I/O."""
        self._model = model
        self._api_key = api_key
        self._temperature = temperature
        self._max_tokens = max_tokens

    async def create(
        self,
        messages: Sequence[LLMMessage],
        cancellation_token: CancellationToken,
        **kwargs,
    ) -> CreateResult:
        """Generate a single (non-streaming) completion.

        Delegates to _call_api and repackages the raw response dict
        (expects "content" and "usage" keys) into a CreateResult.
        NOTE(review): cancellation_token is accepted but never linked to
        the API call here — confirm whether cancellation should abort it.
        """
        # Implement your API call here
        response = await self._call_api(messages)

        return CreateResult(
            content=response["content"],
            usage=RequestUsage(
                prompt_tokens=response["usage"]["prompt_tokens"],
                completion_tokens=response["usage"]["completion_tokens"],
            ),
            finish_reason="stop",
            cached=False,
        )

    async def create_stream(
        self,
        messages: Sequence[LLMMessage],
        cancellation_token: CancellationToken,
        **kwargs,
    ) -> AsyncGenerator[str | CreateResult, None]:
        """Generate a streaming completion.

        Yields incremental content strings for "content" chunks, then a
        final CreateResult (carrying usage) for the "done" chunk. The
        final result's content is empty — callers are expected to
        accumulate the streamed string chunks themselves.
        """
        async for chunk in self._stream_api(messages):
            if chunk["type"] == "content":
                yield chunk["data"]
            elif chunk["type"] == "done":
                yield CreateResult(
                    content="",
                    usage=RequestUsage(
                        prompt_tokens=chunk["usage"]["prompt_tokens"],
                        completion_tokens=chunk["usage"]["completion_tokens"],
                    ),
                    finish_reason="stop",
                    cached=False,
                )

    @property
    def capabilities(self) -> ModelCapabilities:
        """Advertise model capabilities (hard-coded for this example)."""
        return ModelCapabilities(
            vision=False,
            function_calling=True,
            json_output=True,
        )

    def count_tokens(self, messages: Sequence[LLMMessage]) -> int:
        """Count tokens in messages.

        Rough whitespace-split approximation — swap in the backend's
        real tokenizer for accurate budgeting.
        """
        # Implement token counting
        return sum(len(str(msg.content).split()) for msg in messages)

    def remaining_tokens(self, messages: Sequence[LLMMessage]) -> int:
        """Tokens left in the max_tokens budget after these messages.

        Can go negative when the (approximate) count exceeds the budget.
        """
        return self._max_tokens - self.count_tokens(messages)

    @property
    def model(self) -> str:
        """Return model name."""
        return self._model

    def _to_config(self) -> MyModelConfig:
        """Export constructor arguments as a serializable config."""
        return MyModelConfig(
            model=self._model,
            api_key=self._api_key,
            temperature=self._temperature,
            max_tokens=self._max_tokens,
        )

    @classmethod
    def _from_config(cls, config: MyModelConfig) -> Self:
        """Rebuild a client from a previously exported config."""
        return cls(
            model=config.model,
            api_key=config.api_key,
            temperature=config.temperature,
            max_tokens=config.max_tokens,
        )

    async def _call_api(self, messages: Sequence[LLMMessage]) -> dict:
        """Make the non-streaming API call; must return a dict with
        "content" and "usage" keys as consumed by create()."""
        # Implement your API logic
        raise NotImplementedError

    async def _stream_api(self, messages: Sequence[LLMMessage]):
        """Async-iterate raw chunks ({"type": "content"|"done", ...}) as
        consumed by create_stream()."""
        # Implement your streaming logic
        raise NotImplementedError

Message Conversion

Convert AutoGen messages to your API format:
from autogen_core.models import (
    SystemMessage,
    UserMessage,
    AssistantMessage,
    FunctionExecutionResultMessage,
)

def convert_messages(messages: Sequence[LLMMessage]) -> List[dict]:
    """Translate a sequence of AutoGen messages into API-style role dicts.

    Messages of any type not listed below are silently skipped.
    """
    converted: List[dict] = []

    for message in messages:
        if isinstance(message, SystemMessage):
            entry = {"role": "system", "content": message.content}
        elif isinstance(message, UserMessage):
            entry = {"role": "user", "content": message.content}
        elif isinstance(message, AssistantMessage):
            entry = {
                "role": "assistant",
                "content": message.content,
                # Forward tool calls when present; normalize empty to None.
                "tool_calls": message.function_calls if message.function_calls else None,
            }
        elif isinstance(message, FunctionExecutionResultMessage):
            entry = {"role": "tool", "content": message.content}
        else:
            continue  # unknown message type: skip, matching prior behavior
        converted.append(entry)

    return converted

Tool Support

Handle function calling:
from autogen_core import FunctionCall
from autogen_core.tools import Tool, ToolSchema

async def create(
    self,
    messages: Sequence[LLMMessage],
    cancellation_token: CancellationToken,
    tools: Sequence[Tool] | None = None,
    **kwargs,
) -> CreateResult:
    """Generate a completion, forwarding any tools to the backend API.

    Tool invocations reported by the API are surfaced as FunctionCall
    entries and reflected in the finish_reason.
    """
    # Translate AutoGen tool objects into the wire format, if any were given.
    api_tools = [self._convert_tool(tool) for tool in tools] if tools else None

    response = await self._call_api(messages, tools=api_tools)

    # Lift any tool calls out of the raw response dict.
    function_calls = []
    for raw_call in response["tool_calls"] if "tool_calls" in response else []:
        function_calls.append(
            FunctionCall(
                id=raw_call["id"],
                name=raw_call["function"]["name"],
                arguments=raw_call["function"]["arguments"],
            )
        )

    usage = RequestUsage(
        prompt_tokens=response["usage"]["prompt_tokens"],
        completion_tokens=response["usage"]["completion_tokens"],
    )
    return CreateResult(
        content=response["content"],
        usage=usage,
        finish_reason="tool_calls" if function_calls else "stop",
        cached=False,
        function_calls=function_calls,
    )

def _convert_tool(self, tool: Tool) -> dict:
    """Translate an AutoGen tool schema into the API's function format."""
    spec = tool.schema
    # Description and parameters are optional in the source schema.
    function_spec = {
        "name": spec["name"],
        "description": spec.get("description", ""),
        "parameters": spec.get("parameters", {}),
    }
    return {"type": "function", "function": function_spec}

Creating a Custom Code Executor

Code executors implement the CodeExecutor protocol.

Basic Structure

from pathlib import Path
from typing import List, Optional
from autogen_core import Component, CancellationToken
from autogen_core.code_executor import CodeBlock, CodeExecutor, CodeResult
from pydantic import BaseModel
from typing_extensions import Self

class MyExecutorConfig(BaseModel):
    """Serializable configuration for MyCodeExecutor (Component round-trip)."""
    timeout: int = 60  # per-execution wall-clock limit, in seconds
    work_dir: Optional[str] = None  # working directory; None means use the cwd

class MyCodeResult(BaseModel):
    """Result of executing one or more code blocks."""
    exit_code: int  # 0 on success; the first non-zero block exit aborts the batch
    output: str  # newline-joined output text from the executed blocks
    code_file: Optional[str] = None  # path to the written code file, if retained

class MyCodeExecutor(CodeExecutor, Component[MyExecutorConfig]):
    """Custom code executor implementation.

    Runs a list of code blocks sequentially through _execute_block (left
    as a stub for the integrator) and aggregates their output. Also
    implements the Component protocol so the executor round-trips
    through MyExecutorConfig.
    """

    component_config_schema = MyExecutorConfig
    component_provider_override = "mypackage.MyCodeExecutor"

    # Languages execute_code_blocks will accept; anything else fails fast.
    SUPPORTED_LANGUAGES = ["python", "bash"]

    def __init__(
        self,
        timeout: int = 60,
        work_dir: Optional[Path] = None,
    ):
        """Record settings; the executor is unusable until start() runs."""
        self._timeout = timeout
        self._work_dir = work_dir or Path.cwd()  # default to current directory
        self._started = False

    async def execute_code_blocks(
        self,
        code_blocks: List[CodeBlock],
        cancellation_token: CancellationToken,
    ) -> MyCodeResult:
        """Execute code blocks in order, stopping at the first failure.

        Raises RuntimeError if start() has not been called. Returns a
        MyCodeResult whose output joins per-block outputs with newlines.
        A block in an unsupported language aborts immediately with exit
        code 1 (output from earlier blocks is discarded in that case).
        """
        if not self._started:
            raise RuntimeError("Executor not started")

        outputs = []
        exit_code = 0

        for block in code_blocks:
            if block.language not in self.SUPPORTED_LANGUAGES:
                return MyCodeResult(
                    exit_code=1,
                    output=f"Unsupported language: {block.language}",
                )

            result = await self._execute_block(block, cancellation_token)
            outputs.append(result["output"])

            # Stop at the first failing block, preserving its exit code.
            if result["exit_code"] != 0:
                exit_code = result["exit_code"]
                break

        return MyCodeResult(
            exit_code=exit_code,
            output="\n".join(outputs),
        )

    async def _execute_block(
        self,
        block: CodeBlock,
        cancellation_token: CancellationToken,
    ) -> dict:
        """Execute a single code block; must return a dict with
        "exit_code" and "output" keys."""
        # Implement your execution logic
        raise NotImplementedError

    async def start(self) -> None:
        """Mark the executor as started and set up any resources."""
        self._started = True
        # Initialize resources

    async def stop(self) -> None:
        """Mark the executor as stopped and release any resources."""
        self._started = False
        # Clean up resources

    async def restart(self) -> None:
        """Restart the executor (stop, then start)."""
        await self.stop()
        await self.start()

    def _to_config(self) -> MyExecutorConfig:
        """Export settings as a serializable config (work_dir as a string)."""
        return MyExecutorConfig(
            timeout=self._timeout,
            work_dir=str(self._work_dir),
        )

    @classmethod
    def _from_config(cls, config: MyExecutorConfig) -> Self:
        """Rebuild an executor from an exported config."""
        return cls(
            timeout=config.timeout,
            work_dir=Path(config.work_dir) if config.work_dir else None,
        )

Sandboxing

Implement safe execution:
import asyncio
import subprocess
import sys
from pathlib import Path

async def _execute_block(
    self,
    block: CodeBlock,
    cancellation_token: CancellationToken,
) -> dict:
    """Execute a single code block in a subprocess, safely.

    Returns a dict with "exit_code" (int) and "output" (combined
    stdout + stderr, decoded with replacement for undecodable bytes).
    A timeout yields exit code 124, matching GNU `timeout`.
    """
    # Pick the interpreter (and file suffix) from the block's language.
    # The original always ran `python`, even for bash blocks; default to
    # the current interpreter so Python callers keep working unchanged.
    interpreters = {
        "python": (sys.executable, ".py"),
        "bash": ("bash", ".sh"),
    }
    command, suffix = interpreters.get(block.language, (sys.executable, ".py"))

    # Name the file by content hash so identical blocks reuse one path.
    code_file = self._work_dir / f"code_{hash(block.code)}{suffix}"
    code_file.write_text(block.code)

    try:
        # Execute with timeout
        process = await asyncio.create_subprocess_exec(
            command,
            str(code_file),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=self._work_dir,
        )

        # Let the cancellation token cancel the wait if requested.
        cancellation_token.link_future(asyncio.create_task(process.wait()))

        stdout, stderr = await asyncio.wait_for(
            process.communicate(),
            timeout=self._timeout,
        )

        return {
            "exit_code": process.returncode or 0,
            "output": stdout.decode(errors="replace") + stderr.decode(errors="replace"),
        }
    except asyncio.TimeoutError:
        process.kill()
        # Reap the killed process so it does not linger as a zombie.
        await process.wait()
        return {
            "exit_code": 124,
            "output": "Execution timed out",
        }
    finally:
        # Always remove the temp file, even on error or timeout.
        code_file.unlink(missing_ok=True)

Creating a Custom Tool

Tools implement the BaseTool interface.

Basic Structure

from typing import Any
from autogen_core import CancellationToken, Component
from autogen_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing_extensions import Self

class MyToolArgs(BaseModel):
    """Input arguments for MyTool; also defines its JSON parameter schema."""
    query: str = Field(..., description="Search query")  # required field
    max_results: int = Field(10, description="Maximum results to return")

class MyToolResult(BaseModel):
    """Structured result returned by MyTool.run()."""
    results: list[dict]  # raw result records from the search backend
    count: int  # len(results), precomputed for convenience

class MyToolConfig(BaseModel):
    """Serializable configuration for MyTool (Component round-trip)."""
    api_key: str  # NOTE(review): consider SecretStr to avoid plain-text dumps
    timeout: float = 30.0  # request timeout in seconds

class MyTool(BaseTool[MyToolArgs, MyToolResult], Component[MyToolConfig]):
    """Custom tool implementation.

    BaseTool derives the tool's public schema (name, description, JSON
    parameters) from the metadata passed to super().__init__() below.
    """

    component_config_schema = MyToolConfig
    component_provider_override = "mypackage.MyTool"

    def __init__(self, api_key: str, timeout: float = 30.0):
        # BaseTool requires the args/return models plus a name and
        # description; without this call, `tool.schema` has no metadata
        # to generate the JSON schema from.
        super().__init__(
            args_type=MyToolArgs,
            return_type=MyToolResult,
            name="my_tool",
            description="Search and return up to max_results matches for a query.",
        )
        self._api_key = api_key
        self._timeout = timeout

    async def run(self, args: MyToolArgs, cancellation_token: CancellationToken) -> MyToolResult:
        """Execute the tool: run the search and wrap results with a count."""
        # Implement your tool logic
        results = await self._search(args.query, args.max_results)

        return MyToolResult(
            results=results,
            count=len(results),
        )

    async def _search(self, query: str, max_results: int) -> list[dict]:
        """Perform the actual search; must return a list of result dicts."""
        # Implement your search logic
        raise NotImplementedError

    def _to_config(self) -> MyToolConfig:
        """Export constructor arguments as a serializable config."""
        return MyToolConfig(
            api_key=self._api_key,
            timeout=self._timeout,
        )

    @classmethod
    def _from_config(cls, config: MyToolConfig) -> Self:
        """Rebuild the tool from an exported config."""
        return cls(
            api_key=config.api_key,
            timeout=config.timeout,
        )

Tool Schema

The schema is automatically generated from your Args model:
tool = MyTool(api_key="...")
schema = tool.schema

# Schema includes:
# - name: Tool name
# - description: Tool description
# - parameters: JSON schema from MyToolArgs

Using with Agents

from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

tool = MyTool(api_key="...")

agent = AssistantAgent(
    name="assistant",
    model_client=OpenAIChatCompletionClient(model="gpt-4o"),
    tools=[tool],
)

Best Practices

Type Safety

Use Pydantic models for configuration:
from pydantic import BaseModel, Field, SecretStr

class MyConfig(BaseModel):
    """Example config demonstrating validated and secured fields."""
    api_key: SecretStr  # Secure string handling: masked in repr/str and dumps
    timeout: float = Field(30.0, gt=0, le=300)  # Validation: 0 < timeout <= 300
    max_retries: int = Field(3, ge=0)  # must be non-negative

Error Handling

Provide clear error messages:
async def create(self, messages, cancellation_token, **kwargs):
    """Call the backend API, translating low-level failures into
    actionable RuntimeErrors with the original exception chained."""
    try:
        result = await self._call_api(messages)
    except ConnectionError as e:
        detail = (
            f"Failed to connect to API: {e}. "
            "Check your network connection and API endpoint."
        )
        raise RuntimeError(detail) from e
    except TimeoutError as e:
        detail = (
            f"API request timed out after {self._timeout}s. "
            "Consider increasing the timeout value."
        )
        raise RuntimeError(detail) from e
    else:
        return result

Logging

Use AutoGen’s logging infrastructure:
import logging
from autogen_core import EVENT_LOGGER_NAME, TRACE_LOGGER_NAME

logger = logging.getLogger(EVENT_LOGGER_NAME)
trace_logger = logging.getLogger(TRACE_LOGGER_NAME)

class MyClient:
    """Example client showing AutoGen's two-logger convention:
    coarse events on the event logger, verbose payloads on the trace logger."""

    async def create(self, messages, cancellation_token):
        """Create a completion, logging events and verbose traces."""
        # %-style lazy formatting: the message is only rendered when the
        # corresponding log level is actually enabled.
        logger.info("Creating completion with %d messages", len(messages))
        trace_logger.debug("Messages: %s", messages)

        result = await self._call_api(messages)

        logger.info("Completion created: %s", result.usage)
        return result

Resource Management

Implement proper lifecycle methods:
class MyExecutor:
    """Example executor demonstrating proper resource lifecycle.

    stop() is idempotent: each resource is released once and its handle
    reset to None, so calling stop() twice (e.g. explicitly and again via
    __aexit__) is safe instead of re-terminating a dead process or
    removing an already-deleted directory.
    """

    def __init__(self):
        self._process = None   # running subprocess, or None when stopped
        self._temp_dir = None  # scratch directory path, or None when stopped

    async def start(self) -> None:
        """Initialize resources."""
        self._temp_dir = tempfile.mkdtemp()
        self._process = await self._start_process()

    async def stop(self) -> None:
        """Clean up resources; safe to call multiple times."""
        if self._process:
            self._process.terminate()
            await self._process.wait()
            self._process = None  # reset so a second stop() is a no-op

        if self._temp_dir:
            shutil.rmtree(self._temp_dir)
            self._temp_dir = None  # reset so a second stop() is a no-op

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.stop()

Testing

Write comprehensive tests:
import pytest
from autogen_core import CancellationToken

@pytest.mark.asyncio
async def test_my_client():
    """Smoke-test the custom client's non-streaming create() path."""
    client = MyModelClient(model="test", api_key="test")

    result = await client.create(
        [UserMessage(content="Hello", source="user")],
        CancellationToken(),
    )

    assert result.content
    assert result.usage.prompt_tokens > 0
    assert result.finish_reason == "stop"

@pytest.mark.asyncio
async def test_my_executor():
    """Run one Python block end-to-end, always stopping the executor."""
    executor = MyCodeExecutor()
    await executor.start()

    try:
        blocks = [CodeBlock(language="python", code="print('test')")]
        result = await executor.execute_code_blocks(blocks, CancellationToken())

        assert result.exit_code == 0
        assert "test" in result.output
    finally:
        await executor.stop()

Publishing Extensions

Package Structure

my-autogen-extension/
├── pyproject.toml
├── README.md
├── src/
│   └── autogen_my_ext/
│       ├── __init__.py
│       ├── models/
│       │   └── my_client.py
│       └── tools/
│           └── my_tool.py
└── tests/
    ├── test_client.py
    └── test_tool.py

pyproject.toml

[project]
name = "autogen-my-ext"
version = "0.1.0"
description = "My AutoGen extension"
requires-python = ">=3.10"
dependencies = [
    "autogen-core>=0.4.0",
    "autogen-ext>=0.4.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0",
    "pytest-asyncio>=0.21",
]

Next Steps

Model Clients

See existing model client implementations

Code Executors

See existing code executor implementations

Tools

See existing tool implementations

Extensions Overview

Back to extensions overview

Build docs developers (and LLMs) love