The AutoGen Core API provides a foundation for building distributed, event-driven multi-agent systems. It implements the Actor model, where agents are independent entities that communicate through asynchronous message passing.

Architecture Principles

AutoGen Core is built on three fundamental concepts:
  1. Actor Model: Each agent is an independent actor with its own state and behavior
  2. Message Passing: Agents communicate exclusively through messages (no shared state)
  3. Event-Driven: Asynchronous, non-blocking message processing
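
The three principles above can be illustrated without any framework. The following is a minimal sketch using only asyncio; `CounterActor` and its message names are hypothetical and are not part of AutoGen Core. It shows an actor that owns its state and changes it only in response to messages taken from its mailbox.

```python
import asyncio


class CounterActor:
    """Hypothetical actor: private state, changed only by messages in its mailbox."""

    def __init__(self) -> None:
        self.mailbox: asyncio.Queue = asyncio.Queue()
        self._count = 0  # never touched from outside the actor

    async def run(self) -> None:
        # Process one message at a time until a "stop" message arrives.
        while True:
            message, reply = await self.mailbox.get()
            if message == "stop":
                break
            if message == "increment":
                self._count += 1
            elif message == "get" and reply is not None:
                reply.set_result(self._count)


async def demo() -> int:
    actor = CounterActor()
    task = asyncio.create_task(actor.run())

    # Senders only enqueue messages; they never read or write actor state.
    for _ in range(3):
        await actor.mailbox.put(("increment", None))

    reply = asyncio.get_running_loop().create_future()
    await actor.mailbox.put(("get", reply))
    count = await reply

    await actor.mailbox.put(("stop", None))
    await task
    return count
```

Calling `asyncio.run(demo())` returns 3: the three increments are applied in mailbox order before the "get" message is answered.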

Core Components

Agent Types

AutoGen Core provides one protocol and two base classes for building agents:
  • Agent: Protocol defining the agent interface
  • BaseAgent: Abstract base class with lifecycle management
  • RoutedAgent: Handler-based routing with decorators

Runtime Environments

Two runtime environments are available:
  • SingleThreadedAgentRuntime: Development runtime with in-process message queue
  • Distributed Runtime: Production runtime with cross-process/language support

Agent Protocol

The Agent protocol defines the minimal interface all agents must implement:
from typing import Any, Mapping, Protocol

from autogen_core import AgentId, AgentMetadata, AgentRuntime, MessageContext

class Agent(Protocol):
    @property
    def metadata(self) -> AgentMetadata:
        """Metadata of the agent."""
        ...
    
    @property
    def id(self) -> AgentId:
        """ID of the agent."""
        ...
    
    async def bind_id_and_runtime(self, id: AgentId, runtime: AgentRuntime) -> None:
        """Bind agent to runtime and assign ID."""
        ...
    
    async def on_message(self, message: Any, ctx: MessageContext) -> Any:
        """Message handler called by the runtime."""
        ...
    
    async def save_state(self) -> Mapping[str, Any]:
        """Save agent state (must be JSON serializable)."""
        ...
    
    async def load_state(self, state: Mapping[str, Any]) -> None:
        """Load agent state."""
        ...
    
    async def close(self) -> None:
        """Cleanup when runtime is closed."""
        ...
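
Because the interface is a protocol, a class can satisfy it by shape alone. The sketch below uses a trimmed-down, hypothetical `MiniAgent` protocol (not the real `Agent` definition) to show structural conformance with the standard `typing` module.

```python
from typing import Any, Mapping, Protocol, runtime_checkable


@runtime_checkable
class MiniAgent(Protocol):
    """Hypothetical, trimmed-down stand-in for the Agent protocol."""

    async def on_message(self, message: Any) -> Any: ...
    async def save_state(self) -> Mapping[str, Any]: ...
    async def load_state(self, state: Mapping[str, Any]) -> None: ...


class EchoAgent:
    """Satisfies MiniAgent structurally; note there is no inheritance."""

    def __init__(self) -> None:
        self.seen = 0

    async def on_message(self, message: Any) -> Any:
        self.seen += 1
        return message

    async def save_state(self) -> Mapping[str, Any]:
        return {"seen": self.seen}

    async def load_state(self, state: Mapping[str, Any]) -> None:
        self.seen = state["seen"]


class NotAnAgent:
    pass


# isinstance() on a runtime_checkable protocol only checks that the
# methods exist, not their signatures.
is_agent = isinstance(EchoAgent(), MiniAgent)
is_not_agent = isinstance(NotAnAgent(), MiniAgent)
```

Here `is_agent` is True and `is_not_agent` is False. A static type checker would additionally compare the method signatures.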

BaseAgent

BaseAgent is an abstract base class that provides:
  • Agent lifecycle management (initialization, registration, cleanup)
  • Runtime binding and ID assignment
  • Message sending and publishing utilities
  • State persistence hooks
from dataclasses import dataclass
from typing import Any

from autogen_core import BaseAgent, MessageContext

# Placeholder message types for this example
@dataclass
class MyMessage:
    content: str

@dataclass
class Response:
    result: str

class MyAgent(BaseAgent):
    def __init__(self, description: str = "My custom agent"):
        super().__init__(description)
        self.state = {}

    async def on_message_impl(self, message: Any, ctx: MessageContext) -> Any:
        # Dispatch on message type; unrecognized messages return None
        if isinstance(message, MyMessage):
            return await self.handle_my_message(message, ctx)
        return None

    async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> Response:
        # Placeholder processing; replace with real logic
        result = message.content.upper()
        return Response(result=result)
Key methods:
  • on_message_impl(): Abstract method to implement message handling
  • send_message(): Send a direct message to another agent
  • publish_message(): Broadcast message to topic subscribers
  • save_state() / load_state(): Persist and restore agent state

RoutedAgent

RoutedAgent extends BaseAgent with decorator-based message routing:
from autogen_core import RoutedAgent, event, rpc, MessageContext
from dataclasses import dataclass

@dataclass
class TaskRequest:
    task_id: str
    data: dict

@dataclass
class TaskResponse:
    task_id: str
    result: str

@dataclass
class StatusUpdate:
    status: str

class TaskAgent(RoutedAgent):
    def __init__(self):
        super().__init__("Task processing agent")
        self.tasks = {}
    
    @rpc
    async def handle_task_request(self, message: TaskRequest, ctx: MessageContext) -> TaskResponse:
        """RPC handler - expects a response."""
        result = await self.process_task(message.task_id, message.data)
        return TaskResponse(task_id=message.task_id, result=result)
    
    @event
    async def handle_status_update(self, message: StatusUpdate, ctx: MessageContext) -> None:
        """Event handler - fire and forget."""
        print(f"Status update: {message.status}")
        # No return value
    
    async def process_task(self, task_id: str, data: dict) -> str:
        # Task processing logic
        return f"Processed {task_id}"
Key features:
  • @event: Handler for one-way messages (no response expected)
  • @rpc: Handler for request-response messages (returns a value)
  • @message_handler: Generic handler for both event and RPC messages
  • Automatic routing based on message type
  • Optional match function for secondary routing
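
To make type-based routing and the optional match function concrete, here is a conceptual sketch written with the standard library only. The `handler` decorator and `MiniRouter` class are hypothetical and are not how RoutedAgent is implemented. In this sketch handlers are tried in definition order, which is an assumption of the sketch and not a statement about AutoGen Core.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, get_type_hints


def handler(match: Optional[Callable[[Any], bool]] = None):
    """Hypothetical decorator: tag a method with its message type and match function."""

    def decorate(func):
        func._message_type = get_type_hints(func)["message"]
        func._match = match
        return func

    return decorate


class MiniRouter:
    """Dispatches to the first handler (in definition order) that accepts the message."""

    async def on_message(self, message: Any) -> Any:
        for func in vars(type(self)).values():
            # Primary routing: the annotated message type must match exactly.
            if getattr(func, "_message_type", None) is not type(message):
                continue
            # Secondary routing: the optional match function must accept it.
            if func._match is None or func._match(message):
                return await func(self, message)
        raise LookupError(f"No handler for {type(message).__name__}")


@dataclass
class Task:
    name: str
    priority: int


class TaskRouter(MiniRouter):
    @handler(match=lambda m: m.priority >= 5)
    async def handle_urgent(self, message: Task) -> str:
        return f"urgent:{message.name}"

    @handler()  # no match function: accepts every remaining Task
    async def handle_other(self, message: Task) -> str:
        return f"normal:{message.name}"


async def demo() -> List[str]:
    router = TaskRouter()
    return [
        await router.on_message(Task("deploy", priority=9)),
        await router.on_message(Task("cleanup", priority=1)),
    ]
```

Running `asyncio.run(demo())` returns `["urgent:deploy", "normal:cleanup"]`. Both messages have the same type, so the match function alone decides which handler runs.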

Agent Identification

Every agent has a unique AgentId composed of a type and a key:
from autogen_core import AgentId

# Create an agent ID
agent_id = AgentId(type="worker", key="instance-1")

print(agent_id.type)  # "worker" - associates with factory
print(agent_id.key)   # "instance-1" - unique instance identifier
print(str(agent_id))  # "worker/instance-1"

# Parse from string
agent_id = AgentId.from_str("worker/instance-1")
AgentId components:
  • type (string, required): Agent type that associates with a factory function. Must match the pattern ^[\w\-\.]+$
  • key (string, required): Unique instance identifier within the agent type
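
The type pattern can be checked with the standard `re` module. The sketch below is illustrative only: `is_valid_type` and `split_agent_id` are hypothetical helpers, and splitting on the first "/" is an assumption based on the "type/key" string form shown above.

```python
import re
from typing import Tuple

# Pattern quoted in the component description above.
TYPE_PATTERN = re.compile(r"^[\w\-\.]+$")


def is_valid_type(agent_type: str) -> bool:
    """Return True if the string is an acceptable agent type."""
    return TYPE_PATTERN.match(agent_type) is not None


def split_agent_id(text: str) -> Tuple[str, str]:
    """Hypothetical parser for the "type/key" string form."""
    agent_type, sep, key = text.partition("/")
    if not sep or not is_valid_type(agent_type):
        raise ValueError(f"Invalid agent id: {text!r}")
    return agent_type, key
```

With this pattern, "worker" and "task_agent.v2-beta" are accepted, while "bad type" (contains a space) and the empty string are rejected.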

Message Flow

AutoGen Core supports two message patterns:

Direct Messaging (RPC)

from autogen_core import AgentId

# Send a message to a specific agent and await its response
# (assumes `runtime` is a running AgentRuntime)
response = await runtime.send_message(
    message=TaskRequest(task_id="123", data={}),
    recipient=AgentId("worker", "instance-1")
)

Publish-Subscribe (Events)

from autogen_core import TopicId

# Publish message to all subscribers
await runtime.publish_message(
    message=StatusUpdate(status="processing"),
    topic_id=TopicId(type="status", source="system")
)
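
The difference between the two patterns is who receives the message and whether the sender gets a result back. The following sketch contrasts them with a hypothetical in-process `MiniBus`; it borrows the method names `send_message` and `publish_message` for readability but is not the AutoGen runtime and uses plain strings in place of AgentId and TopicId.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Any], Awaitable[Any]]


class MiniBus:
    """Hypothetical in-process bus contrasting the two delivery patterns."""

    def __init__(self) -> None:
        self._agents: Dict[str, Handler] = {}
        self._subscribers: Dict[str, List[Handler]] = defaultdict(list)

    def register(self, name: str, handler: Handler) -> None:
        self._agents[name] = handler

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    async def send_message(self, message: Any, recipient: str) -> Any:
        # Direct messaging: exactly one recipient, and the caller gets its reply.
        return await self._agents[recipient](message)

    async def publish_message(self, message: Any, topic: str) -> None:
        # Publish-subscribe: every subscriber is called; results are discarded.
        await asyncio.gather(*(h(message) for h in self._subscribers[topic]))


async def demo():
    bus = MiniBus()
    received: List[str] = []

    async def worker(message: str) -> str:
        return message.upper()

    async def logger_a(message: str) -> None:
        received.append(f"a:{message}")

    async def logger_b(message: str) -> None:
        received.append(f"b:{message}")

    bus.register("worker", worker)
    bus.subscribe("status", logger_a)
    bus.subscribe("status", logger_b)

    reply = await bus.send_message("ping", recipient="worker")
    published = await bus.publish_message("processing", topic="status")
    return reply, published, sorted(received)
```

In `demo()`, the direct send returns the worker's reply, while the publish call returns None and both subscribers receive the message.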

Registration Patterns

Agents can be registered using factories or instances:

Factory Registration

from autogen_core import SingleThreadedAgentRuntime

runtime = SingleThreadedAgentRuntime()

# Register agent factory
await TaskAgent.register(
    runtime=runtime,
    type="task_agent",
    factory=lambda: TaskAgent()
)

# Runtime creates instances on-demand

Instance Registration

from autogen_core import AgentId

# Create and register specific instance
agent = TaskAgent()
await agent.register_instance(
    runtime=runtime,
    agent_id=AgentId("task_agent", "instance-1")
)
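
The practical difference between the two registration patterns is when the agent object is built. A minimal sketch of that idea follows; `MiniRegistry` and `Worker` are hypothetical, and the caching behavior shown is an assumption made for illustration, not a description of the runtime's internals.

```python
from typing import Callable, Dict, Tuple


class MiniRegistry:
    """Hypothetical registry: one factory per agent type, one instance per (type, key)."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], object]] = {}
        self._instances: Dict[Tuple[str, str], object] = {}

    def register_factory(self, agent_type: str, factory: Callable[[], object]) -> None:
        if agent_type in self._factories:
            raise ValueError(f"Agent type already registered: {agent_type}")
        self._factories[agent_type] = factory

    def register_instance(self, agent_type: str, key: str, agent: object) -> None:
        # Instance registration stores a pre-built agent under an explicit ID.
        self._instances[(agent_type, key)] = agent

    def get(self, agent_type: str, key: str) -> object:
        # Factory registration defers construction until the ID is first used.
        agent_id = (agent_type, key)
        if agent_id not in self._instances:
            self._instances[agent_id] = self._factories[agent_type]()
        return self._instances[agent_id]


class Worker:
    created = 0  # counts how many Worker objects have been constructed

    def __init__(self) -> None:
        Worker.created += 1


registry = MiniRegistry()
registry.register_factory("worker", Worker)
created_before_use = Worker.created           # nothing built yet

first = registry.get("worker", "instance-1")
again = registry.get("worker", "instance-1")  # cached, not rebuilt
other = registry.get("worker", "instance-2")  # new key, new instance
```

After this runs, `created_before_use` is 0 and `Worker.created` is 2: registering the factory builds nothing, and each distinct key triggers exactly one construction.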

State Management

Agents can persist and restore state:
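from typing import Any, Mapping

from autogen_core import BaseAgent, MessageContext

class StatefulAgent(BaseAgent):
    def __init__(self):
        super().__init__("Stateful agent")
        self.counter = 0
        self.data = {}

    async def on_message_impl(self, message: Any, ctx: MessageContext) -> Any:
        # Required by BaseAgent; here it counts each message received
        self.counter += 1
        return None

    async def save_state(self) -> Mapping[str, Any]:
        # The returned mapping must be JSON serializable
        return {
            "counter": self.counter,
            "data": self.data
        }

    async def load_state(self, state: Mapping[str, Any]) -> None:
        self.counter = state["counter"]
        self.data = state["data"]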

# Save all agent states
state = await runtime.save_state()

# Later, restore state
await runtime.load_state(state)
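
Since saved state must be JSON serializable, it can help to round-trip a state mapping through the standard `json` module before relying on it. The helper below, `check_state`, is a hypothetical convenience for testing and is not part of AutoGen Core.

```python
import json
from typing import Any, Mapping


def check_state(state: Mapping[str, Any]) -> Mapping[str, Any]:
    """Round-trip a state mapping through JSON; raises TypeError if it cannot be encoded."""
    return json.loads(json.dumps(state))


good_state = {"counter": 3, "data": {"tags": ["a", "b"]}}
restored = check_state(good_state)

# Sets are not JSON serializable, so this state would fail to persist.
bad_state = {"counter": 3, "data": {"tags": {"a", "b"}}}
try:
    check_state(bad_state)
    bad_state_ok = True
except TypeError:
    bad_state_ok = False
```

Here `restored` equals `good_state`, and `bad_state_ok` ends up False because `json.dumps` rejects the set. Converting such values to lists inside `save_state()` avoids the problem.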

Complete Example

import asyncio
from dataclasses import dataclass
from autogen_core import (
    SingleThreadedAgentRuntime,
    RoutedAgent,
    MessageContext,
    AgentId,
    rpc,
    event
)

@dataclass
class CalculateRequest:
    operation: str
    a: float
    b: float

@dataclass
class CalculateResponse:
    result: float

@dataclass
class LogEvent:
    message: str

class CalculatorAgent(RoutedAgent):
    def __init__(self):
        super().__init__("Calculator agent")
    
    @rpc
    async def calculate(self, message: CalculateRequest, ctx: MessageContext) -> CalculateResponse:
        if message.operation == "add":
            result = message.a + message.b
        elif message.operation == "multiply":
            result = message.a * message.b
        else:
            raise ValueError(f"Unknown operation: {message.operation}")
        
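        # Log the calculation. ctx.topic_id is None for messages delivered with
        # send_message(), so this branch is skipped when main() runs.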
        if ctx.topic_id:
            await self.publish_message(
                LogEvent(f"Calculated {message.operation}: {result}"),
                ctx.topic_id
            )
        
        return CalculateResponse(result=result)

class LoggerAgent(RoutedAgent):
    def __init__(self):
        super().__init__("Logger agent")
    
    @event
    async def log(self, message: LogEvent, ctx: MessageContext) -> None:
        print(f"[LOG] {message.message}")

async def main():
    # Create runtime
    runtime = SingleThreadedAgentRuntime()
    
    # Register agents
    await CalculatorAgent.register(runtime, "calculator", lambda: CalculatorAgent())
    await LoggerAgent.register(runtime, "logger", lambda: LoggerAgent())
    
    # Start runtime
    runtime.start()
    
    # Send calculation request
    response = await runtime.send_message(
        CalculateRequest(operation="add", a=5.0, b=3.0),
        recipient=AgentId("calculator", "default")
    )
    print(f"Result: {response.result}")  # Output: Result: 8.0
    
    # Stop runtime
    await runtime.stop_when_idle()

if __name__ == "__main__":
    asyncio.run(main())

Next Steps

  • Agent Runtime: Learn about runtime environments and lifecycle
  • Message Passing: Deep dive into messages, contexts, and subscriptions
  • Event-Driven Architecture: Explore event handlers and message routing
  • Distributed Runtime: Scale across processes and languages
