Component Model
All AutoGen extensions implement the Component interface, which provides:
- Configuration serialization: Save and load component state
- Lifecycle management: Start, stop, and restart components
- Type safety: Pydantic-based configuration validation
Creating a Custom Model Client
Model clients implement the ChatCompletionClient protocol.
Basic Structure
from typing import AsyncGenerator, List, Sequence
from autogen_core import Component, CancellationToken
from autogen_core.models import (
ChatCompletionClient,
CreateResult,
LLMMessage,
ModelCapabilities,
RequestUsage,
)
from pydantic import BaseModel
from typing_extensions import Self
class MyModelConfig(BaseModel):
    """Configuration for MyModelClient."""

    model: str  # Model identifier sent to the backing API.
    api_key: str  # NOTE(review): consider SecretStr to avoid leaking in repr.
    temperature: float = 0.7  # Sampling temperature.
    max_tokens: int = 1024  # Budget used by remaining_tokens().
class MyModelClient(ChatCompletionClient, Component[MyModelConfig]):
    """Custom model client implementation.

    Implements the ChatCompletionClient protocol and the Component interface,
    so instances can be serialized to and restored from a MyModelConfig.
    """

    # Pydantic model the component framework uses to (de)serialize config.
    component_config_schema = MyModelConfig
    # Fully qualified provider string recorded in serialized configurations.
    component_provider_override = "mypackage.MyModelClient"

    def __init__(
        self,
        model: str,
        api_key: str,
        temperature: float = 0.7,
        max_tokens: int = 1024,
    ):
        """Store connection and sampling settings for later API calls."""
        self._model = model
        self._api_key = api_key
        self._temperature = temperature
        self._max_tokens = max_tokens

    async def create(
        self,
        messages: Sequence[LLMMessage],
        cancellation_token: CancellationToken,
        **kwargs,
    ) -> CreateResult:
        """Generate a completion.

        Args:
            messages: Conversation history to send to the model.
            cancellation_token: Token callers may use to abort the request.
            **kwargs: Extra provider-specific options.

        Returns:
            A CreateResult with the model output and token usage.
        """
        # Implement your API call here
        response = await self._call_api(messages)
        return CreateResult(
            content=response["content"],
            usage=RequestUsage(
                prompt_tokens=response["usage"]["prompt_tokens"],
                completion_tokens=response["usage"]["completion_tokens"],
            ),
            finish_reason="stop",
            cached=False,
        )

    async def create_stream(
        self,
        messages: Sequence[LLMMessage],
        cancellation_token: CancellationToken,
        **kwargs,
    ) -> AsyncGenerator[str | CreateResult, None]:
        """Generate a streaming completion.

        Yields content chunks as str, then one final CreateResult carrying
        usage statistics once the stream reports completion.
        """
        async for chunk in self._stream_api(messages):
            if chunk["type"] == "content":
                yield chunk["data"]
            elif chunk["type"] == "done":
                # NOTE(review): the final result carries empty content;
                # callers are expected to accumulate the streamed chunks.
                yield CreateResult(
                    content="",
                    usage=RequestUsage(
                        prompt_tokens=chunk["usage"]["prompt_tokens"],
                        completion_tokens=chunk["usage"]["completion_tokens"],
                    ),
                    finish_reason="stop",
                    cached=False,
                )

    @property
    def capabilities(self) -> ModelCapabilities:
        """Return model capabilities."""
        return ModelCapabilities(
            vision=False,
            function_calling=True,
            json_output=True,
        )

    def count_tokens(self, messages: Sequence[LLMMessage]) -> int:
        """Count tokens in messages.

        This whitespace-split count is a rough placeholder; replace it with
        the tokenizer that matches your model.
        """
        # Implement token counting
        return sum(len(str(msg.content).split()) for msg in messages)

    def remaining_tokens(self, messages: Sequence[LLMMessage]) -> int:
        """Calculate remaining tokens within the configured max_tokens."""
        return self._max_tokens - self.count_tokens(messages)

    @property
    def model(self) -> str:
        """Return model name."""
        return self._model

    def _to_config(self) -> MyModelConfig:
        """Export configuration."""
        # NOTE(review): serializes the raw API key; consider SecretStr.
        return MyModelConfig(
            model=self._model,
            api_key=self._api_key,
            temperature=self._temperature,
            max_tokens=self._max_tokens,
        )

    @classmethod
    def _from_config(cls, config: MyModelConfig) -> Self:
        """Create from configuration."""
        return cls(
            model=config.model,
            api_key=config.api_key,
            temperature=config.temperature,
            max_tokens=config.max_tokens,
        )

    async def _call_api(self, messages: Sequence[LLMMessage]) -> dict:
        """Make API call to your LLM service."""
        # Implement your API logic
        raise NotImplementedError

    async def _stream_api(self, messages: Sequence[LLMMessage]):
        """Stream API responses."""
        # Implement your streaming logic
        raise NotImplementedError
Message Conversion
Convert AutoGen messages to your API format:
from autogen_core.models import (
SystemMessage,
UserMessage,
AssistantMessage,
FunctionExecutionResultMessage,
)
def convert_messages(messages: Sequence[LLMMessage]) -> List[dict]:
    """Convert AutoGen messages to API format.

    Args:
        messages: AutoGen LLMMessage objects to translate.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts in the wire format
        expected by the backing API.

    Raises:
        ValueError: If a message is not one of the known LLM message types.
            (Previously such messages were silently dropped.)
    """
    result: List[dict] = []
    for msg in messages:
        if isinstance(msg, SystemMessage):
            result.append({
                "role": "system",
                "content": msg.content,
            })
        elif isinstance(msg, UserMessage):
            result.append({
                "role": "user",
                "content": msg.content,
            })
        elif isinstance(msg, AssistantMessage):
            result.append({
                "role": "assistant",
                "content": msg.content,
                # NOTE(review): assumes AssistantMessage exposes
                # ``function_calls``; confirm against the autogen version used.
                "tool_calls": msg.function_calls if msg.function_calls else None,
            })
        elif isinstance(msg, FunctionExecutionResultMessage):
            result.append({
                "role": "tool",
                "content": msg.content,
            })
        else:
            # Fail loudly instead of silently losing conversation messages.
            raise ValueError(f"Unsupported message type: {type(msg).__name__}")
    return result
Tool Support
Handle function calling:
from autogen_core import FunctionCall
from autogen_core.tools import Tool, ToolSchema
async def create(
    self,
    messages: Sequence[LLMMessage],
    cancellation_token: CancellationToken,
    tools: Sequence[Tool] | None = None,
    **kwargs,
) -> CreateResult:
    """Generate a completion, advertising the given tools to the model.

    Args:
        messages: Conversation history to send to the model.
        cancellation_token: Token callers may use to abort the request.
        tools: Optional tools the model may call.

    Returns:
        A CreateResult; finish_reason is "tool_calls" when the model
        requested tool execution, otherwise "stop".
    """
    # Convert tools to API format
    api_tools = None
    if tools:
        api_tools = [self._convert_tool(tool) for tool in tools]
    response = await self._call_api(messages, tools=api_tools)
    # Handle tool calls in response
    function_calls = []
    if "tool_calls" in response:
        for tool_call in response["tool_calls"]:
            function_calls.append(
                FunctionCall(
                    id=tool_call["id"],
                    name=tool_call["function"]["name"],
                    arguments=tool_call["function"]["arguments"],
                )
            )
    # NOTE(review): recent autogen versions carry function calls in
    # CreateResult.content rather than a ``function_calls`` kwarg —
    # confirm against the installed autogen_core version.
    return CreateResult(
        content=response["content"],
        usage=RequestUsage(
            prompt_tokens=response["usage"]["prompt_tokens"],
            completion_tokens=response["usage"]["completion_tokens"],
        ),
        finish_reason="tool_calls" if function_calls else "stop",
        cached=False,
        function_calls=function_calls,
    )
def _convert_tool(self, tool: Tool) -> dict:
    """Translate an AutoGen tool into the wire format the API expects."""
    tool_schema = tool.schema
    # Description and parameters are optional in a ToolSchema; default to
    # an empty string / empty parameter object when absent.
    function_spec = {
        "name": tool_schema["name"],
        "description": tool_schema.get("description", ""),
        "parameters": tool_schema.get("parameters", {}),
    }
    return {"type": "function", "function": function_spec}
Creating a Custom Code Executor
Code executors implement the CodeExecutor protocol.
Basic Structure
from pathlib import Path
from typing import List, Optional
from autogen_core import Component, CancellationToken
from autogen_core.code_executor import CodeBlock, CodeExecutor, CodeResult
from pydantic import BaseModel
from typing_extensions import Self
class MyExecutorConfig(BaseModel):
    """Configuration for MyExecutor."""

    timeout: int = 60  # Per-block execution timeout in seconds.
    work_dir: Optional[str] = None  # Working directory; None means cwd.
class MyCodeResult(BaseModel):
    """Execution result."""

    exit_code: int  # 0 on success; first failing block's code otherwise.
    output: str  # Combined output text of the executed blocks.
    code_file: Optional[str] = None  # Path to the code file, if persisted.
class MyCodeExecutor(CodeExecutor, Component[MyExecutorConfig]):
    """Custom code executor implementation.

    Runs code blocks sequentially and stops at the first failing block.
    """

    component_config_schema = MyExecutorConfig
    component_provider_override = "mypackage.MyCodeExecutor"

    # Languages this executor accepts.
    SUPPORTED_LANGUAGES = ["python", "bash"]

    def __init__(
        self,
        timeout: int = 60,
        work_dir: Optional[Path] = None,
    ):
        """Configure timeout (seconds) and working directory (default: cwd)."""
        self._timeout = timeout
        self._work_dir = work_dir or Path.cwd()
        self._started = False  # Guards execute_code_blocks until start().

    async def execute_code_blocks(
        self,
        code_blocks: List[CodeBlock],
        cancellation_token: CancellationToken,
    ) -> MyCodeResult:
        """Execute code blocks.

        Blocks run in order; execution stops at the first non-zero exit code.

        Raises:
            RuntimeError: If start() has not been called.
        """
        if not self._started:
            raise RuntimeError("Executor not started")
        outputs = []
        exit_code = 0
        for block in code_blocks:
            if block.language not in self.SUPPORTED_LANGUAGES:
                # Report unsupported languages as a failed result rather
                # than raising, so callers get a uniform MyCodeResult.
                return MyCodeResult(
                    exit_code=1,
                    output=f"Unsupported language: {block.language}",
                )
            result = await self._execute_block(block, cancellation_token)
            outputs.append(result["output"])
            if result["exit_code"] != 0:
                exit_code = result["exit_code"]
                break
        return MyCodeResult(
            exit_code=exit_code,
            output="\n".join(outputs),
        )

    async def _execute_block(
        self,
        block: CodeBlock,
        cancellation_token: CancellationToken,
    ) -> dict:
        """Execute a single code block.

        Returns:
            A dict with "exit_code" (int) and "output" (str) keys.
        """
        # Implement your execution logic
        raise NotImplementedError

    async def start(self) -> None:
        """Start the executor."""
        self._started = True
        # Initialize resources

    async def stop(self) -> None:
        """Stop the executor."""
        self._started = False
        # Clean up resources

    async def restart(self) -> None:
        """Restart the executor."""
        await self.stop()
        await self.start()

    def _to_config(self) -> MyExecutorConfig:
        """Export configuration."""
        return MyExecutorConfig(
            timeout=self._timeout,
            work_dir=str(self._work_dir),
        )

    @classmethod
    def _from_config(cls, config: MyExecutorConfig) -> Self:
        """Create from configuration."""
        return cls(
            timeout=config.timeout,
            work_dir=Path(config.work_dir) if config.work_dir else None,
        )
Sandboxing
Implement safe execution:
import asyncio
import subprocess
from pathlib import Path
async def _execute_block(
    self,
    block: CodeBlock,
    cancellation_token: CancellationToken,
) -> dict:
    """Execute code block safely in a subprocess with a timeout.

    Args:
        block: Code block to run (this example always invokes ``python``).
        cancellation_token: Linked to the subprocess so callers can abort.

    Returns:
        A dict with "exit_code" (int) and "output" (str) keys; exit code
        124 signals a timeout, matching the shell ``timeout`` convention.
    """
    # Write the code to a scratch file in the working directory.
    # NOTE: hash() is salted per interpreter run — the name only avoids
    # collisions within this process, it is not stable across runs.
    code_file = self._work_dir / f"code_{hash(block.code)}.py"
    code_file.write_text(block.code)
    try:
        # Execute with timeout
        process = await asyncio.create_subprocess_exec(
            "python",
            str(code_file),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=self._work_dir,
        )
        # Link cancellation token so callers can abort the wait.
        cancellation_token.link_future(asyncio.create_task(process.wait()))
        stdout, stderr = await asyncio.wait_for(
            process.communicate(),
            timeout=self._timeout,
        )
        return {
            "exit_code": process.returncode or 0,
            "output": stdout.decode() + stderr.decode(),
        }
    except asyncio.TimeoutError:
        process.kill()
        # Reap the killed process: without this wait the child lingers as a
        # zombie and asyncio may warn about an un-awaited transport.
        await process.wait()
        return {
            "exit_code": 124,
            "output": "Execution timed out",
        }
    finally:
        # Remove the scratch file even when execution failed.
        code_file.unlink(missing_ok=True)
Creating a Custom Tool
Tools implement the BaseTool interface.
Basic Structure
from typing import Any
from autogen_core import CancellationToken, Component
from autogen_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing_extensions import Self
class MyToolArgs(BaseModel):
    """Arguments for MyTool."""

    # Field descriptions surface in the generated tool schema.
    query: str = Field(..., description="Search query")
    max_results: int = Field(10, description="Maximum results to return")
class MyToolResult(BaseModel):
    """Result from MyTool."""

    results: list[dict]  # One dict per hit.
    count: int  # len(results), surfaced for convenience.
class MyToolConfig(BaseModel):
    """Configuration for MyTool."""

    api_key: str  # NOTE(review): consider SecretStr for serialized configs.
    timeout: float = 30.0  # Request timeout in seconds.
class MyTool(BaseTool[MyToolArgs, MyToolResult], Component[MyToolConfig]):
    """Custom tool implementation."""

    component_config_schema = MyToolConfig
    component_provider_override = "mypackage.MyTool"

    def __init__(self, api_key: str, timeout: float = 30.0):
        """Initialize the tool.

        Args:
            api_key: Credential for the backing search API.
            timeout: Per-request timeout in seconds.
        """
        # BaseTool requires the argument/return models plus a name and
        # description; without this call the tool exposes no schema.
        super().__init__(
            args_type=MyToolArgs,
            return_type=MyToolResult,
            name="my_tool",
            description="Search for results matching a query.",
        )
        self._api_key = api_key
        self._timeout = timeout

    async def run(self, args: MyToolArgs, cancellation_token: CancellationToken) -> MyToolResult:
        """Execute the tool.

        Args:
            args: Validated arguments (query and max_results).
            cancellation_token: Token callers may use to abort the call.

        Returns:
            The search hits and their count.
        """
        # Implement your tool logic
        results = await self._search(args.query, args.max_results)
        return MyToolResult(
            results=results,
            count=len(results),
        )

    async def _search(self, query: str, max_results: int) -> list[dict]:
        """Perform search."""
        # Implement your search logic
        raise NotImplementedError

    def _to_config(self) -> MyToolConfig:
        """Export configuration."""
        return MyToolConfig(
            api_key=self._api_key,
            timeout=self._timeout,
        )

    @classmethod
    def _from_config(cls, config: MyToolConfig) -> Self:
        """Create from configuration."""
        return cls(
            api_key=config.api_key,
            timeout=config.timeout,
        )
Tool Schema
The schema is automatically generated from your Args model:
tool = MyTool(api_key="...")
schema = tool.schema  # dict generated from the MyToolArgs model
# Schema includes:
# - name: Tool name
# - description: Tool description
# - parameters: JSON schema from MyToolArgs
Using with Agents
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
tool = MyTool(api_key="...")
agent = AssistantAgent(
    name="assistant",
    model_client=OpenAIChatCompletionClient(model="gpt-4o"),
    tools=[tool],  # the custom tool is made available to the agent
)
Best Practices
Type Safety
Use Pydantic models for configuration:
from pydantic import BaseModel, Field, SecretStr
class MyConfig(BaseModel):
    """Example config showing secure fields and validated defaults."""

    api_key: SecretStr  # Secure string handling
    timeout: float = Field(30.0, gt=0, le=300)  # Validation
    max_retries: int = Field(3, ge=0)  # Non-negative retry count.
Error Handling
Provide clear error messages:
async def create(self, messages, cancellation_token, **kwargs):
    try:
        # Keep the try body minimal: only the call that can fail.
        return await self._call_api(messages)
    except ConnectionError as e:
        # Re-raise with actionable guidance, chaining the original cause.
        raise RuntimeError(
            f"Failed to connect to API: {e}. "
            "Check your network connection and API endpoint."
        ) from e
    except TimeoutError as e:
        raise RuntimeError(
            f"API request timed out after {self._timeout}s. "
            "Consider increasing the timeout value."
        ) from e
Logging
Use AutoGen’s logging infrastructure:
import logging
from autogen_core import EVENT_LOGGER_NAME, TRACE_LOGGER_NAME
# Event channel named by autogen_core's EVENT_LOGGER_NAME constant.
logger = logging.getLogger(EVENT_LOGGER_NAME)
# Separate channel for verbose debug traces.
trace_logger = logging.getLogger(TRACE_LOGGER_NAME)
class MyClient:
    """Sketch showing event vs. trace logging in a model client."""

    async def create(self, messages, cancellation_token):
        # High-level events go to the event logger...
        logger.info(f"Creating completion with {len(messages)} messages")
        # ...full payloads only to the debug-level trace logger.
        trace_logger.debug(f"Messages: {messages}")
        result = await self._call_api(messages)
        logger.info(f"Completion created: {result.usage}")
        return result
Resource Management
Implement proper lifecycle methods:
class MyExecutor:
    def __init__(self):
        # Resources are created lazily in start(), not here.
        self._process = None
        self._temp_dir = None
    async def start(self) -> None:
        """Initialize resources: a temp directory and a worker process."""
        self._temp_dir = tempfile.mkdtemp()
        self._process = await self._start_process()
    async def stop(self) -> None:
        """Clean up resources."""
        if self._process:
            self._process.terminate()
            # Wait so the terminated process is reaped before cleanup.
            await self._process.wait()
        if self._temp_dir:
            shutil.rmtree(self._temp_dir)
    async def __aenter__(self):
        # Enables "async with MyExecutor() as ex:" usage.
        await self.start()
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Guarantees cleanup even when the body raises.
        await self.stop()
Testing
Write comprehensive tests:
import pytest
from autogen_core import CancellationToken
@pytest.mark.asyncio
async def test_my_client():
    """create() should return content, usage, and a stop finish reason."""
    client = MyModelClient(model="test", api_key="test")
    messages = [UserMessage(content="Hello", source="user")]
    result = await client.create(messages, CancellationToken())
    assert result.content
    assert result.usage.prompt_tokens > 0
    assert result.finish_reason == "stop"
@pytest.mark.asyncio
async def test_my_executor():
    """A trivial python block should run successfully and echo its output."""
    executor = MyCodeExecutor()
    await executor.start()
    try:
        result = await executor.execute_code_blocks(
            [CodeBlock(language="python", code="print('test')")],
            CancellationToken(),
        )
        assert result.exit_code == 0
        assert "test" in result.output
    finally:
        # Always release executor resources, even when assertions fail.
        await executor.stop()
Publishing Extensions
Package Structure
my-autogen-extension/
├── pyproject.toml
├── README.md
├── src/
│ └── autogen_my_ext/
│ ├── __init__.py
│ ├── models/
│ │ └── my_client.py
│ └── tools/
│ └── my_tool.py
└── tests/
├── test_client.py
└── test_tool.py
pyproject.toml
[project]
name = "autogen-my-ext"
version = "0.1.0"
description = "My AutoGen extension"
requires-python = ">=3.10"
dependencies = [
"autogen-core>=0.4.0",
"autogen-ext>=0.4.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0",
"pytest-asyncio>=0.21",
]
Next Steps
Model Clients
See existing model client implementations
Code Executors
See existing code executor implementations
Tools
See existing tool implementations
Extensions Overview
Back to extensions overview