Skip to main content

Building Custom Agents

While AutoGen provides built-in agents like AssistantAgent and CodeExecutorAgent, you can create custom agents with specialized behaviors by extending the BaseChatAgent class.

Understanding BaseChatAgent

All agents in AutoGen inherit from BaseChatAgent and must implement:
  • on_messages: Defines agent behavior in response to messages
  • on_reset: Resets the agent to its initial state
  • produced_message_types: List of message types the agent can produce
Optionally, implement on_messages_stream for streaming responses.

Simple Custom Agent Example

Here’s a countdown agent that produces a stream of messages:
from typing import AsyncGenerator, List, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, TextMessage
from autogen_core import CancellationToken


class CountDownAgent(BaseChatAgent):
    """Demo agent that emits a countdown as a stream of TextMessages."""

    def __init__(self, name: str, count: int = 3):
        super().__init__(name, "A simple agent that counts down.")
        # Starting value of the countdown.
        self._count = count

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        # This agent only ever emits plain text messages.
        return (TextMessage,)

    async def on_messages(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> Response:
        """Drain the streaming variant and hand back its final Response."""
        final: Response | None = None
        stream = self.on_messages_stream(messages, cancellation_token)
        async for item in stream:
            if isinstance(item, Response):
                final = item
        assert final is not None
        return final

    async def on_messages_stream(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
        """Yield one TextMessage per count, then a summarizing Response."""
        emitted: List[BaseAgentEvent | BaseChatMessage] = []
        current = self._count
        while current > 0:
            tick = TextMessage(content=f"{current}...", source=self.name)
            emitted.append(tick)
            yield tick
            current -= 1
        # The final Response carries all intermediate messages along with
        # the closing chat message.
        yield Response(
            chat_message=TextMessage(content="Done!", source=self.name),
            inner_messages=emitted
        )

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        # No state persists between runs; nothing to clear.
        pass


# Usage
async def main():
    """Stream a 5-step countdown and print every emitted message."""
    agent = CountDownAgent("countdown", count=5)
    token = CancellationToken()
    async for event in agent.on_messages_stream([], token):
        # The Response marks the end of the stream; everything before it
        # is an intermediate countdown message.
        if isinstance(event, Response):
            print(f"Final: {event.chat_message.content}")
        else:
            print(event.content)

Arithmetic Agent Example

A more practical example - an agent that performs arithmetic operations:
from typing import Callable, List, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage
from autogen_core import CancellationToken


class ArithmeticAgent(BaseChatAgent):
    """Agent that applies a fixed integer operation to the number found in
    the most recent text message.

    Args:
        name: Agent name, used as the ``source`` of produced messages.
        description: Human-readable description passed to the base class.
        operator_func: The integer transformation this agent applies.
    """

    def __init__(
        self,
        name: str,
        description: str,
        operator_func: Callable[[int], int]
    ) -> None:
        super().__init__(name, description=description)
        self._operator_func = operator_func
        # Keep the full history: on_messages may legitimately be called
        # with an empty list on later turns, in which case the agent
        # re-reads the last message it already recorded.
        self._message_history: List[BaseChatMessage] = []

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        return (TextMessage,)

    async def on_messages(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> Response:
        # Update message history
        self._message_history.extend(messages)

        # Guard: the very first call may arrive with no messages at all,
        # which would otherwise crash with an IndexError on history[-1].
        if not self._message_history:
            raise ValueError(f"{self.name} was called before receiving any messages.")

        # Parse the number from the last message. Explicit checks are used
        # instead of ``assert`` so the errors survive ``python -O``.
        last_message = self._message_history[-1]
        if not isinstance(last_message, TextMessage):
            raise ValueError(f"{self.name} expected a TextMessage, got {type(last_message).__name__}.")
        number = int(last_message.content)

        # Apply the configured operation and record the reply in history.
        result = self._operator_func(number)
        response_message = TextMessage(content=str(result), source=self.name)
        self._message_history.append(response_message)

        return Response(chat_message=response_message)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        self._message_history.clear()


# Create specialized agents — one per arithmetic transformation.
add_agent = ArithmeticAgent("add_agent", "Adds 1 to the number", lambda n: n + 1)
multiply_agent = ArithmeticAgent("multiply_agent", "Multiplies by 2", lambda n: n * 2)
divide_agent = ArithmeticAgent("divide_agent", "Divides by 2, rounds down", lambda n: n // 2)
The on_messages method may be called with an empty list, meaning the agent was called previously without new messages. Always maintain message history internally.

Custom Agent with LLM

Create custom agents that use LLMs with custom behavior:
import asyncio
from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage
from autogen_core import CancellationToken
from autogen_core.models import ChatCompletionClient, UserMessage


class CustomLLMAgent(BaseChatAgent):
    """LLM-backed agent that prepends a configurable system prompt.

    The full conversation is kept in ``self._history`` because
    ``on_messages`` may be called with an empty message list; the agent
    must be able to answer from its own record of the dialogue.

    Args:
        name: Agent name, used as the ``source`` of produced messages.
        model_client: Chat-completion client used to generate replies.
        system_prompt: Instruction sent to the model ahead of the history.
    """

    def __init__(
        self,
        name: str,
        model_client: ChatCompletionClient,
        system_prompt: str = "You are a helpful assistant."
    ):
        super().__init__(name, "Custom LLM-powered agent")
        self._model_client = model_client
        self._system_prompt = system_prompt
        # Builtin generic (PEP 585): this snippet's import block does not
        # import typing.List, so ``List[...]`` would raise NameError here.
        self._history: list[BaseChatMessage] = []

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        return (TextMessage,)

    async def on_messages(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> Response:
        """Record incoming messages, query the model, and reply with text."""
        self._history.extend(messages)

        # Prepare messages for the model: system prompt first, then the
        # text portion of the conversation history.
        model_messages = [
            UserMessage(content=self._system_prompt, source="system")
        ]
        for msg in self._history:
            if isinstance(msg, TextMessage):
                model_messages.append(
                    UserMessage(content=msg.content, source=msg.source)
                )

        # Get model response. ``create`` may also return tool calls; this
        # simple agent only supports plain-text completions.
        result = await self._model_client.create(model_messages)
        assert isinstance(result.content, str)
        response_text = result.content

        response_message = TextMessage(content=response_text, source=self.name)
        self._history.append(response_message)

        return Response(chat_message=response_message)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        self._history.clear()


# Usage
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    """Ask the custom LLM agent one coding question and print the answer."""
    client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = CustomLLMAgent(
        "custom_assistant",
        client,
        system_prompt="You are a helpful coding assistant."
    )

    request = TextMessage(content="Write a hello world in Python", source="user")
    response = await agent.on_messages([request], CancellationToken())
    print(response.chat_message.content)

    # Release the client's underlying connections.
    await client.close()

State Management

Implement state persistence for your custom agents:
from typing import Any, Mapping

class StatefulAgent(BaseChatAgent):
    """Agent demonstrating state persistence via save_state/load_state."""

    def __init__(self, name: str, initial_count: int = 0):
        super().__init__(name, "Agent with state")
        self._count = initial_count
        self._history: List[BaseChatMessage] = []

    async def save_state(self) -> Mapping[str, Any]:
        # Serialize messages with pydantic's model_dump so the saved
        # state is a plain JSON-compatible structure.
        serialized = [m.model_dump() for m in self._history]
        return {"count": self._count, "history": serialized}

    async def load_state(self, state: Mapping[str, Any]) -> None:
        self._count = state["count"]
        # Rebuild message objects from their serialized form.
        self._history = list(map(TextMessage.model_validate, state["history"]))

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        return (TextMessage,)

    async def on_messages(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> Response:
        self._history.extend(messages)
        self._count += len(messages)

        reply = TextMessage(
            content=f"Processed {self._count} total messages",
            source=self.name
        )
        return Response(chat_message=reply)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        self._count = 0
        self._history.clear()

Using Custom Agents in Teams

Custom agents work seamlessly with AutoGen’s team patterns:
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    """Run a selector team of arithmetic agents on a single task."""
    # One specialized agent per arithmetic transformation.
    add_agent = ArithmeticAgent("add_agent", "Adds 1", lambda n: n + 1)
    multiply_agent = ArithmeticAgent("multiply_agent", "Multiplies by 2", lambda n: n * 2)
    divide_agent = ArithmeticAgent("divide_agent", "Divides by 2", lambda n: n // 2)

    # The selector team uses the model client to pick the next speaker.
    team = SelectorGroupChat(
        [add_agent, multiply_agent, divide_agent],
        model_client=OpenAIChatCompletionClient(model="gpt-4o"),
        termination_condition=MaxMessageTermination(10),
        allow_repeated_speaker=True
    )

    # Run the team on the task and report the last message produced.
    task_message = TextMessage(content="Transform 10 into 25", source="user")
    outcome = await team.run(task=[task_message])
    print(f"Final result: {outcome.messages[-1].content}")

Best Practices

1. Maintain Message History — always track message history internally, as on_messages may be called with empty lists.
2. Handle Cancellation — respect the CancellationToken for long-running operations:
async def on_messages(self, messages, cancellation_token):
    """Example of cooperatively honoring cancellation in a long loop."""
    for i in range(100):
        # Check before each unit of work so cancellation takes effect
        # promptly rather than after the whole loop finishes.
        if cancellation_token.is_cancelled():
            raise asyncio.CancelledError()
        # Do work
        await asyncio.sleep(0.1)
3. Implement Proper Reset — clear all state in on_reset to ensure clean restarts:
async def on_reset(self, cancellation_token: CancellationToken) -> None:
    """Restore the agent to a pristine state before its next run."""
    # Clear every piece of mutable state the agent accumulated.
    self._history.clear()
    self._counter = 0
    self._cache.clear()
4. Use Type Hints — properly specify message types your agent produces:
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
    # Advertise every concrete message type this agent can emit so
    # teams and runtimes can route its output correctly.
    return (TextMessage, ToolCallSummaryMessage)
Custom agents are not thread-safe or coroutine-safe. Do not share agents between multiple tasks or call methods concurrently.

Advanced: Nested Agents

Create agents that contain other agents or teams:
class NestedAgent(BaseChatAgent):
    """Wraps an inner team behind the single-agent chat interface."""

    def __init__(self, name: str, inner_team: RoundRobinGroupChat):
        super().__init__(name, "Agent with nested team")
        self._inner_team = inner_team

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        return (TextMessage,)

    async def on_messages(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> Response:
        """Delegate to the inner team; its last message becomes the reply."""
        run_result = await self._inner_team.run(
            task=messages,
            cancellation_token=cancellation_token
        )

        # Everything before the last message is surfaced as inner messages.
        *inner, final = run_result.messages
        return Response(chat_message=final, inner_messages=inner)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        # Resetting this agent resets the entire nested team.
        await self._inner_team.reset()

Build docs developers (and LLMs) love