The AutoGen Core API provides a foundation for building distributed, event-driven multi-agent systems. It implements the Actor model, where agents are independent entities that communicate through asynchronous message passing.
Architecture Principles
AutoGen Core is built on three fundamental concepts:
Actor Model : Each agent is an independent actor with its own state and behavior
Message Passing : Agents communicate exclusively through messages (no shared state)
Event-Driven : Asynchronous, non-blocking message processing
Core Components
Agent Types
AutoGen Core provides three base classes for building agents:
Agent Protocol defining the agent interface
BaseAgent Abstract base class with lifecycle management
RoutedAgent Handler-based routing with decorators
Runtime Environments
SingleThreadedAgentRuntime Development runtime with in-process message queue
Distributed Runtime Production runtime with cross-process/language support
Agent Protocol
The Agent protocol defines the minimal interface all agents must implement:
from autogen_core import Agent, AgentId, AgentMetadata, MessageContext
from typing import Any, Mapping
class Agent ( Protocol ):
@ property
def metadata ( self ) -> AgentMetadata:
"""Metadata of the agent."""
...
@ property
def id ( self ) -> AgentId:
"""ID of the agent."""
...
async def bind_id_and_runtime ( self , id : AgentId, runtime : AgentRuntime) -> None :
"""Bind agent to runtime and assign ID."""
...
async def on_message ( self , message : Any, ctx : MessageContext) -> Any:
"""Message handler called by the runtime."""
...
async def save_state ( self ) -> Mapping[ str , Any]:
"""Save agent state (must be JSON serializable)."""
...
async def load_state ( self , state : Mapping[ str , Any]) -> None :
"""Load agent state."""
...
async def close ( self ) -> None :
"""Cleanup when runtime is closed."""
...
BaseAgent
BaseAgent is an abstract base class that provides:
Agent lifecycle management (initialization, registration, cleanup)
Runtime binding and ID assignment
Message sending and publishing utilities
State persistence hooks
from autogen_core import BaseAgent, MessageContext, AgentId
from typing import Any
class MyAgent ( BaseAgent ):
def __init__ ( self , description : str = "My custom agent" ):
super (). __init__ (description)
self .state = {}
async def on_message_impl ( self , message : Any, ctx : MessageContext) -> Any:
# Implement your message handling logic
if isinstance (message, MyMessage):
return await self .handle_my_message(message, ctx)
return None
async def handle_my_message ( self , message : MyMessage, ctx : MessageContext) -> Response:
# Handle specific message type
result = process(message)
return Response( result = result)
Key methods:
on_message_impl(): Abstract method to implement message handling
send_message(): Send a direct message to another agent
publish_message(): Broadcast message to topic subscribers
save_state() / load_state(): Persist and restore agent state
RoutedAgent
RoutedAgent extends BaseAgent with decorator-based message routing:
from autogen_core import RoutedAgent, event, rpc, MessageContext
from dataclasses import dataclass
@dataclass
class TaskRequest :
task_id: str
data: dict
@dataclass
class TaskResponse :
task_id: str
result: str
@dataclass
class StatusUpdate :
status: str
class TaskAgent ( RoutedAgent ):
def __init__ ( self ):
super (). __init__ ( "Task processing agent" )
self .tasks = {}
@rpc
async def handle_task_request ( self , message : TaskRequest, ctx : MessageContext) -> TaskResponse:
"""RPC handler - expects a response."""
result = await self .process_task(message.task_id, message.data)
return TaskResponse( task_id = message.task_id, result = result)
@event
async def handle_status_update ( self , message : StatusUpdate, ctx : MessageContext) -> None :
"""Event handler - fire and forget."""
print ( f "Status update: { message.status } " )
# No return value
async def process_task ( self , task_id : str , data : dict ) -> str :
# Task processing logic
return f "Processed { task_id } "
Key features:
@event: Handler for one-way messages (no response expected)
@rpc: Handler for request-response messages (returns a value)
@message_handler: Generic handler for both event and RPC messages
Automatic routing based on message type
Optional match function for secondary routing
Agent Identification
Every agent has a unique AgentId composed of:
from autogen_core import AgentId
# Create an agent ID
agent_id = AgentId( type = "worker" , key = "instance-1" )
print (agent_id.type) # "worker" - associates with factory
print (agent_id.key) # "instance-1" - unique instance identifier
print ( str (agent_id)) # "worker/instance-1"
# Parse from string
agent_id = AgentId.from_str( "worker/instance-1" )
AgentId components:
Agent type that associates with a factory function. Must match pattern: ^[\w\-\.]+$
Unique instance identifier within the agent type
Message Flow
AutoGen Core supports two message patterns:
Direct Messaging (RPC)
from autogen_core import AgentRuntime, AgentId
# Send message to specific agent and await response
response = await runtime.send_message(
message = TaskRequest( task_id = "123" , data = {}),
recipient = AgentId( "worker" , "instance-1" )
)
Publish-Subscribe (Events)
from autogen_core import TopicId
# Publish message to all subscribers
await runtime.publish_message(
message = StatusUpdate( status = "processing" ),
topic_id = TopicId( type = "status" , source = "system" )
)
Registration Patterns
Agents can be registered using factories or instances:
Factory Registration
from autogen_core import SingleThreadedAgentRuntime, AgentType
runtime = SingleThreadedAgentRuntime()
# Register agent factory
await TaskAgent.register(
runtime = runtime,
type = "task_agent" ,
factory = lambda : TaskAgent()
)
# Runtime creates instances on-demand
Instance Registration
from autogen_core import AgentId
# Create and register specific instance
agent = TaskAgent()
await agent.register_instance(
runtime = runtime,
agent_id = AgentId( "task_agent" , "instance-1" )
)
State Management
Agents can persist and restore state:
class StatefulAgent ( BaseAgent ):
def __init__ ( self ):
super (). __init__ ( "Stateful agent" )
self .counter = 0
self .data = {}
async def save_state ( self ) -> Mapping[ str , Any]:
return {
"counter" : self .counter,
"data" : self .data
}
async def load_state ( self , state : Mapping[ str , Any]) -> None :
self .counter = state[ "counter" ]
self .data = state[ "data" ]
# Save all agent states
state = await runtime.save_state()
# Later, restore state
await runtime.load_state(state)
Complete Example
import asyncio
from dataclasses import dataclass
from autogen_core import (
SingleThreadedAgentRuntime,
RoutedAgent,
MessageContext,
AgentId,
rpc,
event
)
@dataclass
class CalculateRequest :
operation: str
a: float
b: float
@dataclass
class CalculateResponse :
result: float
@dataclass
class LogEvent :
message: str
class CalculatorAgent ( RoutedAgent ):
def __init__ ( self ):
super (). __init__ ( "Calculator agent" )
@rpc
async def calculate ( self , message : CalculateRequest, ctx : MessageContext) -> CalculateResponse:
if message.operation == "add" :
result = message.a + message.b
elif message.operation == "multiply" :
result = message.a * message.b
else :
raise ValueError ( f "Unknown operation: { message.operation } " )
# Log the calculation
if ctx.topic_id:
await self .publish_message(
LogEvent( f "Calculated { message.operation } : { result } " ),
ctx.topic_id
)
return CalculateResponse( result = result)
class LoggerAgent ( RoutedAgent ):
def __init__ ( self ):
super (). __init__ ( "Logger agent" )
@event
async def log ( self , message : LogEvent, ctx : MessageContext) -> None :
print ( f "[LOG] { message.message } " )
async def main ():
# Create runtime
runtime = SingleThreadedAgentRuntime()
# Register agents
await CalculatorAgent.register(runtime, "calculator" , lambda : CalculatorAgent())
await LoggerAgent.register(runtime, "logger" , lambda : LoggerAgent())
# Start runtime
runtime.start()
# Send calculation request
response = await runtime.send_message(
CalculateRequest( operation = "add" , a = 5 , b = 3 ),
recipient = AgentId( "calculator" , "default" )
)
print ( f "Result: { response.result } " ) # Output: Result: 8.0
# Stop runtime
await runtime.stop_when_idle()
if __name__ == "__main__" :
asyncio.run(main())
Next Steps
Agent Runtime Learn about runtime environments and lifecycle
Message Passing Deep dive into messages, contexts, and subscriptions
Event-Driven Architecture Explore event handlers and message routing
Distributed Runtime Scale across processes and languages