Skip to main content
The autogen_core package provides the foundational components for building distributed, event-driven multi-agent systems.

Agent Classes

Base class for implementing agents with message handling capabilities.
from autogen_core import BaseAgent, AgentId, AgentRuntime, MessageContext

class MyAgent(BaseAgent):
    def __init__(self, agent_id: AgentId, runtime: AgentRuntime):
        super().__init__(agent_id, runtime)
agent_id
AgentId
required
Unique identifier for the agent instance
runtime
AgentRuntime
required
Runtime environment for message passing and agent coordination
id
AgentId
Returns the agent’s unique identifier
Decorator-based agent with automatic message routing.
from autogen_core import RoutedAgent, message_handler, rpc
from dataclasses import dataclass

@dataclass
class TaskMessage:
    task: str

class WorkerAgent(RoutedAgent):
    @message_handler
    async def handle_task(self, message: TaskMessage, ctx: MessageContext) -> str:
        return f"Completed: {message.task}"

    @rpc
    async def get_status(self) -> dict:
        return {"status": "ready"}
description
str
Optional description of the agent’s capabilities

Decorators

@message_handler
decorator
Marks a method as a handler for published messages of the parameter type
@rpc
decorator
Marks a method as an RPC endpoint for direct invocation
@event
decorator
Marks a method as an event handler
Agent implementation using closure functions for simple message handling.
from autogen_core import ClosureAgent, ClosureContext

async def my_handler(ctx: ClosureContext, message: str) -> str:
    await ctx.send_message("processed", recipient_id)
    return "Done"

agent = ClosureAgent("my_agent", my_handler, runtime)
name
str
required
Agent name identifier
handler
Callable
required
Async function to handle incoming messages
runtime
AgentRuntime
required
Runtime environment

Runtime & Messaging

Protocol defining the runtime environment for agent execution and communication.
from autogen_core import AgentRuntime, AgentId, TopicId

async def example(runtime: AgentRuntime):
    # Send direct message
    response = await runtime.send_message(
        message="Hello",
        recipient=AgentId("worker", "default")
    )

    # Publish to topic
    await runtime.publish_message(
        message={"event": "started"},
        topic_id=TopicId("events", "default")
    )

Methods

send_message
async method
Send a message to a specific agent and await response
message
Any
required
The message payload to send
recipient
AgentId
required
Target agent identifier
sender
AgentId | None
Sender agent identifier (None for external)
cancellation_token
CancellationToken | None
Token to cancel the operation
message_id
str | None
Unique message identifier
publish_message
async method
Publish a message to all subscribers of a topic
message
Any
required
The message payload
topic_id
TopicId
required
Topic to publish to
sender
AgentId | None
Sender agent identifier
register_factory
async method
Register an agent factory for dynamic agent creation
type
str | AgentType
required
Unique agent type identifier
agent_factory
Callable
required
Factory function that creates agent instances
expected_class
type[Agent] | None
Expected agent class for type checking
In-process runtime for single-threaded agent execution.
from autogen_core import SingleThreadedAgentRuntime, TypeSubscription

runtime = SingleThreadedAgentRuntime()

# Register agent type
await runtime.register_factory(
    type="worker",
    agent_factory=lambda: WorkerAgent(),
    expected_class=WorkerAgent
)

# Add subscription
await runtime.add_subscription(
    TypeSubscription(topic_type="tasks", agent_type="worker")
)

# Start runtime
runtime.start()
start
method
Start the runtime message processing loop
stop
async method
Gracefully stop the runtime
Context information for message handling.
from autogen_core import MessageContext

@message_handler
async def handle(self, message: str, ctx: MessageContext) -> None:
    print(f"From: {ctx.sender}")
    print(f"Topic: {ctx.topic_id}")
    print(f"Message ID: {ctx.message_id}")
sender
AgentId | None
Identifier of the message sender
topic_id
TopicId | None
Topic the message was published to
message_id
str
Unique message identifier
cancellation_token
CancellationToken
Token for cancelling operations

Identity & Types

Unique identifier for an agent instance.
from autogen_core import AgentId

# Create agent ID
agent_id = AgentId(type="worker", key="instance_1")

# Access components
print(agent_id.type)  # "worker"
print(agent_id.key)   # "instance_1"
type
str
required
Agent type name (alphanumeric and underscores)
key
str
required
Instance key (ASCII 32-126 characters)
Agent type definition for factory registration.
from autogen_core import AgentType

worker_type = AgentType("worker")
name
str
required
Type name identifier
Topic identifier for pub/sub messaging.
from autogen_core import TopicId

topic = TopicId(type="events", source="system")
type
str
required
Topic type
source
str
required
Topic source namespace
Proxy for remote agent communication.
from autogen_core import AgentProxy

proxy = AgentProxy(agent_id, runtime)
result = await proxy.send("Hello")

Subscriptions

Subscribe to messages by exact type match.
from autogen_core import TypeSubscription

subscription = TypeSubscription(
    topic_type="tasks",
    agent_type="worker"
)
await runtime.add_subscription(subscription)
topic_type
str
required
Topic type to subscribe to
agent_type
str
required
Agent type that will receive messages
Subscribe to messages matching a type prefix.
from autogen_core import TypePrefixSubscription

subscription = TypePrefixSubscription(
    topic_type_prefix="task.",
    agent_type="worker"
)
topic_type_prefix
str
required
Topic type prefix to match
agent_type
str
required
Agent type to receive matching messages
Subscribe to all messages in a namespace.
from autogen_core import DefaultSubscription

subscription = DefaultSubscription(
    agent_type="logger"
)

Serialization & Components

Protocol for custom message serialization.
from autogen_core import MessageSerializer

class MySerializer(MessageSerializer):
    def serialize(self, message: Any) -> bytes:
        return json.dumps(message).encode()

    def deserialize(self, data: bytes, type_name: str) -> Any:
        return json.loads(data.decode())
Declarative component configuration.
from autogen_core import Component, ComponentModel

@Component
class MyComponent:
    def __init__(self, config: dict):
        self.value = config["value"]

    def to_config(self) -> ComponentModel:
        return ComponentModel(
            component_type=self.__class__.__name__,
            config={"value": self.value}
        )
Component
decorator
Marks a class as a configurable component
ComponentModel
class
Serializable component configuration
component_type
str
required
Component type identifier
config
dict
required
Component configuration data
Image data container for multimodal messages.
from autogen_core import Image

# From bytes
image = Image.from_bytes(image_bytes, format="png")

# From file
image = Image.from_file("photo.jpg")

# From URL
image = Image.from_url("https://example.com/image.png")
from_bytes
classmethod
Create image from byte data
from_file
classmethod
Create image from file path
from_url
classmethod
Create image from URL

State Management

Protocol for agent state persistence.
from autogen_core import CacheStore

class CustomStore(CacheStore):
    async def get(self, key: str) -> Any:
        # Retrieve value
        pass

    async def set(self, key: str, value: Any) -> None:
        # Store value
        pass
In-memory cache store implementation.
from autogen_core import InMemoryStore

store = InMemoryStore()
await store.set("key", "value")
result = await store.get("key")

Intervention & Control

Protocol for intercepting and modifying messages.
from autogen_core import InterventionHandler, MessageContext

class ApprovalHandler(InterventionHandler):
    async def on_send(self, message: Any, ctx: MessageContext) -> Any:
        # Review and modify message before sending
        if needs_approval(message):
            approved = await get_approval(message)
            if not approved:
                from autogen_core import DropMessage
                raise DropMessage("Not approved")
        return message
Exception to prevent message delivery.
from autogen_core import DropMessage

raise DropMessage("Message rejected by policy")
Token for cancelling long-running operations.
from autogen_core import CancellationToken

token = CancellationToken()

# Cancel after timeout
token.cancel_after(10.0)

# Check if cancelled
if token.is_cancelled:
    return

Telemetry

OpenTelemetry integration for distributed tracing.
from autogen_core import (
    trace_create_agent_span,
    trace_invoke_agent_span,
    trace_tool_span
)

# Trace agent creation
with trace_create_agent_span(agent_type="worker"):
    agent = WorkerAgent()

# Trace agent invocation
with trace_invoke_agent_span(agent_id, message_type="Task"):
    result = await agent.handle(task)

# Trace tool execution
with trace_tool_span(tool_name="calculator"):
    result = calculator.add(1, 2)
Structured logging configuration.
from autogen_core import ROOT_LOGGER_NAME, EVENT_LOGGER_NAME, TRACE_LOGGER_NAME
import logging

# Configure root logger
logging.getLogger(ROOT_LOGGER_NAME).setLevel(logging.INFO)

# Configure event logger for structured events
event_logger = logging.getLogger(EVENT_LOGGER_NAME)

# Configure trace logger for development debugging
trace_logger = logging.getLogger(TRACE_LOGGER_NAME)
ROOT_LOGGER_NAME
str
Name of the root logger: "autogen_core"
EVENT_LOGGER_NAME
str
Name for structured event logging
TRACE_LOGGER_NAME
str
Name for development trace logging

Types & Protocols

Represents a tool or function call.
from autogen_core import FunctionCall

call = FunctionCall(
    id="call_123",
    name="calculator",
    arguments={"a": 1, "b": 2}
)
id
str
required
Unique call identifier
name
str
required
Function or tool name
arguments
dict
required
Function arguments
Represents a message with unknown type.
from autogen_core import UnknownPayload

# Raised when deserializer encounters unknown type

Constants

JSON_DATA_CONTENT_TYPE = "application/json"
PROTOBUF_DATA_CONTENT_TYPE = "application/protobuf"

See Also

Build docs developers (and LLMs) love