Messages, MessageContext, TopicId, and Subscription patterns in AutoGen Core
Message passing is the foundation of communication in AutoGen Core. Agents communicate exclusively through asynchronous messages, following the Actor model with no shared state.
Every message handler receives a MessageContext with metadata about the message:
from dataclasses import dataclass

# NOTE: MessageContext is imported here and then re-declared below, so the
# class definition shadows the imported name. The definition is shown only to
# document the fields a handler can read.
from autogen_core import MessageContext, AgentId, TopicId, CancellationToken


@dataclass
class MessageContext:
    """Metadata about a message, passed to every message handler."""

    sender: AgentId | None
    """The agent that sent the message, or None if sent externally."""

    topic_id: TopicId | None
    """The topic this message was published to, or None for direct messages."""

    is_rpc: bool
    """True if this is an RPC (expects response), False for events."""

    cancellation_token: CancellationToken
    """Token to check if operation was cancelled."""

    message_id: str
    """Unique identifier for this message."""
import asyncio

from autogen_core import RoutedAgent, MessageContext, event, rpc

# Request, Response and StatusEvent are message types that are not defined in
# this snippet; they must be declared by the application.


class MyAgent(RoutedAgent):
    """Example agent with one RPC handler and one event handler."""

    @rpc
    async def handle_request(self, message: Request, ctx: MessageContext) -> Response:
        """Handle a Request and reply with a Response.

        Raises:
            asyncio.CancelledError: If the cancellation token is already
                cancelled when the handler runs.
        """
        # Check who sent the message (sender is None for external senders)
        if ctx.sender:
            print(f"Request from: {ctx.sender.type}/{ctx.sender.key}")

        # Check if operation was cancelled
        if ctx.cancellation_token.is_cancelled():
            raise asyncio.CancelledError()

        # Reply with response
        return Response(data="processed")

    @event
    async def handle_event(self, message: StatusEvent, ctx: MessageContext) -> None:
        """Handle a StatusEvent; event handlers return nothing."""
        # Check if published to topic (topic_id is None for direct messages)
        if ctx.topic_id:
            print(f"Event on topic: {ctx.topic_id.type}/{ctx.topic_id.source}")

        # Events don't return responses
        assert not ctx.is_rpc
DefaultTopicId is a predefined topic for simple pub-sub:
from autogen_core import DefaultTopicId# Use default topicawait runtime.publish_message( message=StatusEvent(status="ready"), topic_id=DefaultTopicId())
TypePrefixSubscription matches topics by type prefix (internal use):
from autogen_core import TypePrefixSubscription

# Subscribe to all topics starting with "task."
subscription = TypePrefixSubscription(
    topic_type_prefix="task.",  # Must end with separator
    agent_type="task_handler",
)
TypePrefixSubscription is mainly used internally for direct message routing. For application-level subscriptions, use TypeSubscription or DefaultSubscription.
# TypeSubscription creates separate agent per sourceawait runtime.add_subscription( TypeSubscription(topic_type="user.action", agent_type="user_handler"))# Each user gets their own handler instanceawait runtime.publish_message( UserAction(action="login"), TopicId(type="user.action", source="user-123")) # Routes to AgentId("user_handler", "user-123")await runtime.publish_message( UserAction(action="logout"), TopicId(type="user.action", source="user-456")) # Routes to AgentId("user_handler", "user-456")
import asyncio

from autogen_core import MessageContext, RoutedAgent, rpc

# Request and Response are message types that are not defined in this
# snippet; they must be declared by the application.


class Worker(RoutedAgent):
    """Example agent whose RPC handler polls for cancellation while working."""

    @rpc
    async def long_task(self, message: Request, ctx: MessageContext) -> Response:
        """Run 100 short work steps, stopping early if cancelled.

        Raises:
            asyncio.CancelledError: If the cancellation token is cancelled
                before one of the steps starts.
        """
        for _ in range(100):
            # Check if cancelled before starting the next step
            if ctx.cancellation_token.is_cancelled():
                raise asyncio.CancelledError()

            # Do work
            await asyncio.sleep(0.1)

        return Response(result="completed")