AutoGen Core implements an event-driven architecture where agents react to messages asynchronously. Message handlers are decorated methods that process specific message types.
Handler Decorators
AutoGen Core provides three decorators for defining message handlers:
@event One-way messages (no response)
@rpc Request-response messages
@message_handler Generic handler (both event and RPC)
@event Decorator
Use @event for handlers that process messages without returning a response (fire-and-forget).
Basic Usage
from autogen_core import RoutedAgent, event, MessageContext
from dataclasses import dataclass
@dataclass
class LogEntry :
level: str
message: str
@dataclass
class StatusUpdate :
status: str
progress: float
class Logger ( RoutedAgent ):
def __init__ ( self ):
super (). __init__ ( "Logger agent" )
self .logs = []
@event
async def handle_log ( self , message : LogEntry, ctx : MessageContext) -> None :
"""Event handlers must return None."""
self .logs.append((message.level, message.message))
print ( f "[ { message.level } ] { message.message } " )
@event
async def handle_status ( self , message : StatusUpdate, ctx : MessageContext) -> None :
print ( f "Status: { message.status } ( { message.progress * 100 } %)" )
Event handlers must have return type None. They’re called when ctx.is_rpc == False.
Event Handler Signature
@event
async def handler_name (
self ,
message : MessageType, # Type-hinted message parameter
ctx : MessageContext # Context parameter
) -> None : # Must return None
# Handler implementation
pass
Requirements:
Must be an async method
Must have exactly 3 parameters: self, message, ctx
message must be type-hinted with the message type to handle
ctx must be type MessageContext
Return type must be None
Event Parameters
If True, raises exception for type mismatches. If False, logs warnings.
match
Callable[[MessageType, MessageContext], bool]
default: "None"
Secondary routing function. Applied in alphabetical order of handlers. First matching handler is called.
With Match Function
@dataclass
class Alert :
severity: str
message: str
class AlertHandler ( RoutedAgent ):
@event ( match = lambda msg , ctx : msg.severity == "critical" )
async def handle_critical ( self , message : Alert, ctx : MessageContext) -> None :
print ( f "CRITICAL: { message.message } " )
# Send notification
@event ( match = lambda msg , ctx : msg.severity == "warning" )
async def handle_warning ( self , message : Alert, ctx : MessageContext) -> None :
print ( f "Warning: { message.message } " )
@event # Catch-all (evaluated last alphabetically)
async def handle_info ( self , message : Alert, ctx : MessageContext) -> None :
print ( f "Info: { message.message } " )
Match functions are evaluated in alphabetical order of handler names. Only the first matching handler is called. Remaining handlers are skipped.
@rpc Decorator
Use @rpc for handlers that process requests and return responses.
Basic Usage
from autogen_core import RoutedAgent, rpc, MessageContext
from dataclasses import dataclass
@dataclass
class CalculateRequest :
operation: str
a: float
b: float
@dataclass
class CalculateResponse :
result: float
@dataclass
class ErrorResponse :
error: str
class Calculator ( RoutedAgent ):
def __init__ ( self ):
super (). __init__ ( "Calculator agent" )
@rpc
async def calculate (
self ,
message : CalculateRequest,
ctx : MessageContext
) -> CalculateResponse | ErrorResponse:
"""RPC handlers must return a value."""
try :
if message.operation == "add" :
result = message.a + message.b
elif message.operation == "multiply" :
result = message.a * message.b
elif message.operation == "divide" :
if message.b == 0 :
return ErrorResponse( error = "Division by zero" )
result = message.a / message.b
else :
return ErrorResponse( error = f "Unknown operation: { message.operation } " )
return CalculateResponse( result = result)
except Exception as e:
return ErrorResponse( error = str (e))
RPC handlers must return a value (not None). They’re called when ctx.is_rpc == True.
RPC Handler Signature
@rpc
async def handler_name (
self ,
message : RequestType, # Type-hinted request parameter
ctx : MessageContext # Context parameter
) -> ResponseType: # Type-hinted return type (not None)
# Handler implementation
return response
Requirements:
Must be an async method
Must have exactly 3 parameters: self, message, ctx
message must be type-hinted with the request type to handle
ctx must be type MessageContext
Return type must be type-hinted (not None)
Must return a value
RPC Parameters
If True, raises exception for type mismatches. If False, logs warnings.
match
Callable[[MessageType, MessageContext], bool]
default: "None"
Secondary routing function for selecting between multiple RPC handlers.
With Match Function
@dataclass
class Query :
query_type: str
params: dict
@dataclass
class Result :
data: list
class QueryHandler ( RoutedAgent ):
@rpc ( match = lambda msg , ctx : msg.query_type == "user" )
async def query_users ( self , message : Query, ctx : MessageContext) -> Result:
# Handle user queries
users = await self .db.query_users(message.params)
return Result( data = users)
@rpc ( match = lambda msg , ctx : msg.query_type == "product" )
async def query_products ( self , message : Query, ctx : MessageContext) -> Result:
# Handle product queries
products = await self .db.query_products(message.params)
return Result( data = products)
@rpc # Default handler
async def query_default ( self , message : Query, ctx : MessageContext) -> Result:
return Result( data = [])
@message_handler Decorator
Use @message_handler for generic handlers that can process both events and RPCs.
Basic Usage
from autogen_core import RoutedAgent, message_handler, MessageContext
from dataclasses import dataclass
from typing import Any
@dataclass
class Command :
action: str
params: dict
@dataclass
class CommandResult :
success: bool
output: str
class Executor ( RoutedAgent ):
def __init__ ( self ):
super (). __init__ ( "Executor agent" )
@message_handler
async def execute (
self ,
message : Command,
ctx : MessageContext
) -> CommandResult | None :
"""Handles both RPC and event messages."""
result = await self .run_command(message.action, message.params)
# Return result for RPC, None for events
if ctx.is_rpc:
return CommandResult( success = True , output = result)
else :
# Event - just log
print ( f "Command { message.action } executed: { result } " )
return None
async def run_command ( self , action : str , params : dict ) -> str :
# Execute command
return f "Executed { action } "
Message Handler Signature
@message_handler
async def handler_name (
self ,
message : MessageType, # Type-hinted message parameter
ctx : MessageContext # Context parameter
) -> ResponseType | None : # Can return value or None
# Handler implementation
if ctx.is_rpc:
return response
else :
return None
Message Handler Parameters
If True, raises exception for type mismatches. If False, logs warnings.
match
Callable[[MessageType, MessageContext], bool]
default: "None"
Secondary routing function.
Message Routing
Type-Based Routing
Messages are routed to handlers based on the type hint:
from typing import Union
@dataclass
class MessageA :
data: str
@dataclass
class MessageB :
value: int
@dataclass
class MessageC :
items: list
class MultiHandler ( RoutedAgent ):
@event
async def handle_a ( self , message : MessageA, ctx : MessageContext) -> None :
print ( f "Got A: { message.data } " )
@event
async def handle_b ( self , message : MessageB, ctx : MessageContext) -> None :
print ( f "Got B: { message.value } " )
@event
async def handle_ab ( self , message : MessageA | MessageB, ctx : MessageContext) -> None :
"""Handles both MessageA and MessageB."""
if isinstance (message, MessageA):
print ( f "A or B (A): { message.data } " )
else :
print ( f "A or B (B): { message.value } " )
If multiple handlers match the same message type, they’re evaluated in alphabetical order. Only the first matching handler (including match function) is called.
Handler Selection Order
Type match : Handler’s type annotation must match message type
Alphabetical order : Handlers are sorted alphabetically by method name
Match function : If provided, must return True
First match wins : First handler that passes all checks is called
class OrderDemo ( RoutedAgent ):
@event ( match = lambda msg , ctx : msg.priority > 5 )
async def handle_high_priority ( self , message : Task, ctx : MessageContext) -> None :
print ( "High priority" )
@event ( match = lambda msg , ctx : msg.priority <= 5 )
async def handle_low_priority ( self , message : Task, ctx : MessageContext) -> None :
print ( "Low priority" )
@event # Catch-all (evaluated last)
async def handle_task ( self , message : Task, ctx : MessageContext) -> None :
print ( "Default handler" )
Unhandled Messages
class MyAgent ( RoutedAgent ):
@event
async def handle_known ( self , message : KnownMessage, ctx : MessageContext) -> None :
print ( "Handled" )
async def on_unhandled_message ( self , message : Any, ctx : MessageContext) -> None :
"""Called when no handler matches."""
print ( f "Unhandled message type: { type (message). __name__ } " )
# Log, raise exception, or handle gracefully
By default, on_unhandled_message logs an info message. Override it to customize behavior.
Advanced Patterns
Chained Handlers
class ChainedAgent ( RoutedAgent ):
@rpc
async def process_step1 ( self , message : Step1Request, ctx : MessageContext) -> Step2Request:
# Process step 1
result = await self .do_step1(message.data)
# Forward to step 2 handler
return await self .send_message(
Step2Request( data = result),
recipient = AgentId( "processor" , "step2" )
)
Publish from Handler
class Publisher ( RoutedAgent ):
@rpc
async def handle_request ( self , message : Request, ctx : MessageContext) -> Response:
# Process request
result = await self .process(message)
# Notify subscribers
await self .publish_message(
ProcessedEvent( request_id = message.id, result = result),
TopicId( type = "processed" , source = message.id)
)
return Response( status = "done" )
Error Handling
class RobustAgent ( RoutedAgent ):
@rpc
async def handle_with_errors (
self ,
message : Request,
ctx : MessageContext
) -> Response | ErrorResponse:
try :
# Check cancellation
if ctx.cancellation_token.is_cancelled():
return ErrorResponse( error = "Cancelled" )
# Process
result = await self .process(message)
return Response( result = result)
except ValueError as e:
return ErrorResponse( error = f "Validation error: { e } " )
except Exception as e:
# Log error
print ( f "Unexpected error: { e } " )
return ErrorResponse( error = "Internal error" )
State Management in Handlers
class StatefulAgent ( RoutedAgent ):
def __init__ ( self ):
super (). __init__ ( "Stateful agent" )
self .counter = 0
self .history = []
@event
async def increment ( self , message : IncrementEvent, ctx : MessageContext) -> None :
self .counter += message.amount
self .history.append((message.amount, self .counter))
@rpc
async def get_count ( self , message : GetCountRequest, ctx : MessageContext) -> CountResponse:
return CountResponse( count = self .counter)
async def save_state ( self ) -> dict :
return {
"counter" : self .counter,
"history" : self .history
}
async def load_state ( self , state : dict ) -> None :
self .counter = state[ "counter" ]
self .history = state[ "history" ]
Complete Example
import asyncio
from dataclasses import dataclass
from autogen_core import (
SingleThreadedAgentRuntime,
RoutedAgent,
MessageContext,
AgentId,
TopicId,
TypeSubscription,
event,
rpc
)
@dataclass
class TaskRequest :
task_id: str
operation: str
data: dict
@dataclass
class TaskResponse :
task_id: str
result: str
success: bool
@dataclass
class TaskEvent :
task_id: str
status: str
timestamp: float
class Worker ( RoutedAgent ):
def __init__ ( self ):
super (). __init__ ( "Worker" )
self .completed_tasks = set ()
@rpc
async def handle_task ( self , message : TaskRequest, ctx : MessageContext) -> TaskResponse:
# Publish start event
await self .publish_message(
TaskEvent( task_id = message.task_id, status = "started" , timestamp = asyncio.get_event_loop().time()),
TopicId( type = "task.events" , source = message.task_id)
)
# Process
result = f "Processed { message.operation } with { message.data } "
self .completed_tasks.add(message.task_id)
# Publish completion event
await self .publish_message(
TaskEvent( task_id = message.task_id, status = "completed" , timestamp = asyncio.get_event_loop().time()),
TopicId( type = "task.events" , source = message.task_id)
)
return TaskResponse( task_id = message.task_id, result = result, success = True )
@rpc
async def get_status ( self , message : TaskStatusRequest, ctx : MessageContext) -> TaskStatusResponse:
completed = message.task_id in self .completed_tasks
return TaskStatusResponse( task_id = message.task_id, completed = completed)
@dataclass
class TaskStatusRequest :
task_id: str
@dataclass
class TaskStatusResponse :
task_id: str
completed: bool
class Monitor ( RoutedAgent ):
def __init__ ( self ):
super (). __init__ ( "Monitor" )
self .events = []
@event
async def track_event ( self , message : TaskEvent, ctx : MessageContext) -> None :
self .events.append((message.task_id, message.status, message.timestamp))
print ( f "[ { message.timestamp :.2f} ] Task { message.task_id } : { message.status } " )
async def main ():
runtime = SingleThreadedAgentRuntime()
# Register agents
await Worker.register(runtime, "worker" , lambda : Worker())
await Monitor.register(runtime, "monitor" , lambda : Monitor())
# Subscribe monitor to events
await runtime.add_subscription(
TypeSubscription( topic_type = "task.events" , agent_type = "monitor" )
)
runtime.start()
# Send task
response = await runtime.send_message(
TaskRequest( task_id = "task-1" , operation = "analyze" , data = { "key" : "value" }),
recipient = AgentId( "worker" , "default" )
)
print ( f "Result: { response.result } " )
# Check status
status = await runtime.send_message(
TaskStatusRequest( task_id = "task-1" ),
recipient = AgentId( "worker" , "default" )
)
print ( f "Completed: { status.completed } " )
await asyncio.sleep( 0.1 )
await runtime.stop_when_idle()
if __name__ == "__main__" :
asyncio.run(main())
Best Practices
Use @event for notifications
Use @event when the sender doesn’t need a response. Perfect for logging, notifications, and status updates.
Use @rpc when the sender needs a response. Perfect for queries, calculations, and operations that produce results.
Always type-hint message parameters. This enables type checking and automatic routing.
RPC handlers should return error responses rather than raising exceptions when possible. This provides better error information to callers.
For long-running handlers, periodically check ctx.cancellation_token.is_cancelled().
Next Steps
Message Passing Learn about messages, contexts, and subscriptions
Distributed Runtime Scale event-driven agents across processes
Agent Runtime Understand runtime operations
Core Overview Return to Core API overview