The distributed runtime extends AutoGen Core’s capabilities beyond a single process, enabling agents to communicate across process boundaries, machines, and even programming languages.
The distributed runtime implements the same AgentRuntime protocol as SingleThreadedAgentRuntime, making it easy to scale from development to production.
Architecture Overview
The distributed runtime uses a message broker to route messages between agent instances running in different processes or on different machines.
Key features:
- Process isolation: agents run in separate processes for fault tolerance
- Horizontal scaling: add more agent instances to handle load
- Cross-language: Python agents can communicate with .NET agents
- Location transparency: agents don't know or care where other agents are running
Core Concepts
Message Broker
The distributed runtime uses a message broker (like gRPC, Redis, or RabbitMQ) to route messages between agent processes.
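Conceptually, the broker's job is to map topics to subscribers and fan serialized messages out to them. A toy in-memory sketch of that routing logic (illustrative only; `ToyBroker` is hypothetical and not part of AutoGen):

```python
from collections import defaultdict


class ToyBroker:
    """Illustrative in-memory stand-in for a real broker (gRPC, Redis, RabbitMQ)."""

    def __init__(self):
        # topic -> list of subscriber callbacks
        self._subscribers = defaultdict(list)

    def subscribe(self, topic: str, handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: bytes) -> None:
        # Fan the serialized message out to every subscriber of the topic
        for handler in self._subscribers[topic]:
            handler(message)


# Usage: two "processes" subscribe to the same topic
broker = ToyBroker()
received = []
broker.subscribe("tasks", lambda m: received.append(("worker-1", m)))
broker.subscribe("tasks", lambda m: received.append(("worker-2", m)))
broker.publish("tasks", b'{"task_id": "task-1"}')
```

A real broker does the same fan-out across process and machine boundaries, with persistence and delivery guarantees layered on top.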
Agent Registration
Each agent process registers its agent types and subscriptions with the runtime. The runtime coordinates with the broker to route messages correctly.
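One way to picture registration: each process keeps a table from agent type to factory, and instantiates an agent lazily per (type, key) pair the first time a message arrives for it. A toy sketch of that bookkeeping (hypothetical names, not AutoGen's internals):

```python
class ToyRegistry:
    """Illustrative agent registry: type -> factory, lazy per-key instantiation."""

    def __init__(self):
        self._factories = {}   # agent type -> factory callable
        self._instances = {}   # (type, key) -> agent instance

    def register(self, agent_type: str, factory) -> None:
        self._factories[agent_type] = factory

    def get_agent(self, agent_type: str, key: str = "default"):
        # One agent instance per (type, key) pair, created on first use
        agent_id = (agent_type, key)
        if agent_id not in self._instances:
            self._instances[agent_id] = self._factories[agent_type]()
        return self._instances[agent_id]


# Usage
registry = ToyRegistry()
registry.register("worker", lambda: {"name": "worker instance"})
a = registry.get_agent("worker")
b = registry.get_agent("worker")            # same instance for the same key
c = registry.get_agent("worker", "other")   # distinct instance per key
```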
Serialization
Messages are serialized when sent across process boundaries. AutoGen Core supports JSON and Protocol Buffers.
```python
from autogen_core import JSON_DATA_CONTENT_TYPE, PROTOBUF_DATA_CONTENT_TYPE

print(JSON_DATA_CONTENT_TYPE)      # "application/json"
print(PROTOBUF_DATA_CONTENT_TYPE)  # "application/protobuf"
```
Setting Up Distributed Runtime
The exact distributed runtime implementation may vary based on your deployment. This section covers general patterns. Refer to the AutoGen distributed runtime documentation for specific implementation details.
Example: Multi-Process Setup
Process 1: Worker Agent
```python
import asyncio
from dataclasses import dataclass

from autogen_core import RoutedAgent, MessageContext, rpc

# Assume a distributed runtime factory is available (implementation-specific)
from autogen_core.distributed import create_distributed_runtime


@dataclass
class WorkRequest:
    task_id: str
    data: dict


@dataclass
class WorkResponse:
    task_id: str
    result: str


class Worker(RoutedAgent):
    def __init__(self):
        super().__init__("Worker agent")

    @rpc
    async def process_work(self, message: WorkRequest, ctx: MessageContext) -> WorkResponse:
        # Process the work
        result = f"Processed task {message.task_id}"
        return WorkResponse(task_id=message.task_id, result=result)


async def main():
    # Create a distributed runtime connected to the broker
    runtime = await create_distributed_runtime(
        broker_url="grpc://localhost:50051",
        instance_id="worker-process-1",
    )

    # Register the worker agent
    await Worker.register(runtime, "worker", lambda: Worker())

    # Start the runtime (listens for messages)
    await runtime.start()

    # Keep running
    await runtime.wait_until_stopped()


if __name__ == "__main__":
    asyncio.run(main())
```
Process 2: Coordinator Agent
```python
import asyncio
from dataclasses import dataclass

from autogen_core import (
    RoutedAgent,
    MessageContext,
    AgentId,
    TopicId,
    TypeSubscription,
    event,
)
from autogen_core.distributed import create_distributed_runtime


# Shared message types (must match the worker process's definitions)
@dataclass
class WorkRequest:
    task_id: str
    data: dict


@dataclass
class WorkResponse:
    task_id: str
    result: str


@dataclass
class StartTask:
    task_id: str
    data: dict


class Coordinator(RoutedAgent):
    def __init__(self):
        super().__init__("Coordinator agent")

    @event
    async def handle_start(self, message: StartTask, ctx: MessageContext) -> None:
        # Send work to the worker agent (in a different process)
        response = await self.send_message(
            WorkRequest(task_id=message.task_id, data=message.data),
            recipient=AgentId("worker", "default"),
        )
        print(f"Worker completed: {response.result}")


async def main():
    # Connect to the same broker
    runtime = await create_distributed_runtime(
        broker_url="grpc://localhost:50051",
        instance_id="coordinator-process-1",
    )

    # Register the coordinator and subscribe it to the "tasks" topic
    await Coordinator.register(runtime, "coordinator", lambda: Coordinator())
    await runtime.add_subscription(
        TypeSubscription(topic_type="tasks", agent_type="coordinator")
    )

    await runtime.start()

    # Send the initial task
    await runtime.publish_message(
        StartTask(task_id="task-1", data={"key": "value"}),
        topic_id=TopicId(type="tasks", source="system"),
    )

    await runtime.wait_until_stopped()


if __name__ == "__main__":
    asyncio.run(main())
```
Cross-Language Communication
The distributed runtime supports agents written in different languages communicating through the message broker.
Python to .NET Example
Python Agent
```python
from dataclasses import dataclass

from autogen_core import RoutedAgent, MessageContext, AgentId, rpc


@dataclass
class AnalyticsRequest:
    data_points: list[float]


@dataclass
class AnalyticsResponse:
    mean: float
    std_dev: float


@dataclass
class CollectRequest:
    source: str  # fields are illustrative


@dataclass
class CollectResponse:
    mean: float
    std_dev: float


class DataCollector(RoutedAgent):
    def __init__(self):
        super().__init__("Data collector")

    @rpc
    async def analyze(self, message: CollectRequest, ctx: MessageContext) -> CollectResponse:
        # Collect data
        data = await self.collect_data()
        # Send to the .NET analytics agent
        result = await self.send_message(
            AnalyticsRequest(data_points=data),
            recipient=AgentId("analytics", "default"),  # .NET agent
        )
        return CollectResponse(mean=result.mean, std_dev=result.std_dev)

    async def collect_data(self) -> list[float]:
        # Placeholder data source; replace with real collection logic
        return [1.0, 2.0, 3.0]
```
C# (.NET) Agent
```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using AutoGen.Core;

public record AnalyticsRequest(float[] DataPoints);
public record AnalyticsResponse(float Mean, float StdDev);

public class AnalyticsAgent : RoutedAgent
{
    public AnalyticsAgent() : base("Analytics agent") { }

    [RpcMethod]
    public async Task<AnalyticsResponse> Analyze(
        AnalyticsRequest message,
        MessageContext ctx)
    {
        // Calculate statistics
        var mean = message.DataPoints.Average();
        var variance = message.DataPoints.Select(x => Math.Pow(x - mean, 2)).Average();
        var stdDev = Math.Sqrt(variance);
        return new AnalyticsResponse(mean, (float)stdDev);
    }
}

// Registration
var runtime = await DistributedRuntime.CreateAsync(
    brokerUrl: "grpc://localhost:50051",
    instanceId: "analytics-process-1"
);
await AnalyticsAgent.RegisterAsync(
    runtime,
    "analytics",
    () => new AnalyticsAgent()
);
await runtime.StartAsync();
```
Message types must be compatible across languages. Use simple types (strings, numbers, lists, dictionaries) or shared Protocol Buffer definitions.
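For example, a message built from JSON-friendly primitives round-trips cleanly between languages, while language-specific types (sets, datetimes, custom classes) do not. A quick sketch:

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class AnalyticsRequest:
    # Only simple, JSON-representable types: both the Python and .NET
    # sides can (de)serialize this without sharing any code.
    data_points: list[float]


# Serialize on the Python side, as it would go over the wire
wire = json.dumps(asdict(AnalyticsRequest(data_points=[1.0, 2.5])))

# Any language can reconstruct the message from the same JSON
restored = AnalyticsRequest(**json.loads(wire))
```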
Message Serialization
Custom Serializers for Distributed Runtime
```python
import json
from dataclasses import dataclass

from autogen_core import MessageSerializer


@dataclass
class MyMessage:
    field1: str
    field2: int


class DistributedMessageSerializer(MessageSerializer[MyMessage]):
    @property
    def data_content_type(self) -> str:
        return "application/json"

    @property
    def type_name(self) -> str:
        return "my_message_v1"  # Versioned type name

    def serialize(self, message: MyMessage) -> bytes:
        data = {
            "version": 1,
            "field1": message.field1,
            "field2": message.field2,
        }
        return json.dumps(data).encode("utf-8")

    def deserialize(self, data: bytes) -> MyMessage:
        obj = json.loads(data.decode("utf-8"))
        if obj["version"] != 1:
            raise ValueError(f"Unsupported version: {obj['version']}")
        return MyMessage(field1=obj["field1"], field2=obj["field2"])


# Register with the runtime
runtime.add_message_serializer(DistributedMessageSerializer())
```
Protocol Buffers
For efficient binary serialization:
```protobuf
// messages.proto
syntax = "proto3";

message TaskRequest {
  string task_id = 1;
  string data = 2;
}

message TaskResponse {
  string task_id = 1;
  string result = 2;
}
```
```python
# Python
from dataclasses import dataclass

from autogen_core import MessageSerializer

import messages_pb2  # generated by protoc from messages.proto


@dataclass
class TaskRequest:
    task_id: str
    data: str


class TaskRequestSerializer(MessageSerializer[TaskRequest]):
    @property
    def data_content_type(self) -> str:
        return "application/protobuf"

    @property
    def type_name(self) -> str:
        return "TaskRequest"

    def serialize(self, message: TaskRequest) -> bytes:
        proto = messages_pb2.TaskRequest(
            task_id=message.task_id,
            data=message.data,
        )
        return proto.SerializeToString()

    def deserialize(self, data: bytes) -> TaskRequest:
        proto = messages_pb2.TaskRequest()
        proto.ParseFromString(data)
        return TaskRequest(task_id=proto.task_id, data=proto.data)
```
Fault Tolerance
Agent Restart Handling
Agents in different processes can fail and restart independently:
```python
class ResilientAgent(RoutedAgent):
    def __init__(self):
        super().__init__("Resilient agent")
        self.state = {}

    async def save_state(self) -> dict:
        """Called periodically by the runtime."""
        return self.state

    async def load_state(self, state: dict) -> None:
        """Called when the agent restarts."""
        self.state = state
        print(f"Agent restored with state: {state}")


# The runtime persists and restores state automatically
```
Timeout Handling
```python
import asyncio
from dataclasses import dataclass

from autogen_core import AgentId, CancellationToken, MessageContext, RoutedAgent, rpc


# Message types are illustrative
@dataclass
class Request:
    data: str


@dataclass
class SubRequest:
    data: str


@dataclass
class Response:
    result: str


@dataclass
class ErrorResponse:
    error: str


class TimeoutHandler(RoutedAgent):
    def __init__(self):
        super().__init__("Timeout handler")

    @rpc
    async def call_with_timeout(
        self,
        message: Request,
        ctx: MessageContext,
    ) -> Response | ErrorResponse:
        token = CancellationToken()
        try:
            # Set a timeout on the cross-process call
            response = await asyncio.wait_for(
                self.send_message(
                    SubRequest(data=message.data),
                    recipient=AgentId("worker", "default"),
                    cancellation_token=token,
                ),
                timeout=5.0,
            )
            return Response(result=response.result)
        except asyncio.TimeoutError:
            token.cancel()
            return ErrorResponse(error="Request timed out")
```
Distributed Subscriptions
Subscriptions work the same way in the distributed runtime:

```python
from dataclasses import dataclass

from autogen_core import MessageContext, RoutedAgent, TopicId, TypeSubscription, event


@dataclass
class TriggerEvent:
    pass


@dataclass
class DataEvent:
    data: str


# Process 1: Publisher
class EventPublisher(RoutedAgent):
    def __init__(self):
        super().__init__("Event publisher")

    @event
    async def publish_event(self, message: TriggerEvent, ctx: MessageContext) -> None:
        await self.publish_message(
            DataEvent(data="something happened"),
            TopicId(type="data.events", source="publisher"),
        )


# Process 2: Subscriber
class EventSubscriber(RoutedAgent):
    def __init__(self):
        super().__init__("Event subscriber")

    @event
    async def handle_event(self, message: DataEvent, ctx: MessageContext) -> None:
        print(f"Received event: {message.data}")


# Each process registers and subscribes
await EventPublisher.register(runtime1, "publisher", lambda: EventPublisher())
await EventSubscriber.register(runtime2, "subscriber", lambda: EventSubscriber())
await runtime2.add_subscription(
    TypeSubscription(topic_type="data.events", agent_type="subscriber")
)
```
The broker ensures published messages reach all subscribers across processes.
Load Balancing
Multiple Worker Instances
```python
# Process 1
await Worker.register(runtime1, "worker", lambda: Worker())

# Process 2
await Worker.register(runtime2, "worker", lambda: Worker())

# Process 3
await Worker.register(runtime3, "worker", lambda: Worker())

# When a message is sent to AgentId("worker", "instance-1"),
# the runtime/broker determines which process handles it.
```
The distributed runtime can load-balance messages across multiple instances of the same agent type running in different processes.
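One common delivery policy is round-robin across the processes hosting an agent type. A toy sketch of that policy (the real strategy is broker-specific; `RoundRobinDispatcher` is hypothetical):

```python
from itertools import cycle


class RoundRobinDispatcher:
    """Toy load balancer: rotates through the processes hosting an agent type."""

    def __init__(self, instances: list[str]):
        self._instances = cycle(instances)

    def pick(self) -> str:
        # Each call assigns the next message to the next process in turn
        return next(self._instances)


dispatcher = RoundRobinDispatcher(
    ["worker-process-1", "worker-process-2", "worker-process-3"]
)
assignments = [dispatcher.pick() for _ in range(6)]
```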
Monitoring and Observability
OpenTelemetry Integration
```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Set up tracing
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

# Create the runtime with tracing enabled
runtime = await create_distributed_runtime(
    broker_url="grpc://localhost:50051",
    tracer_provider=provider,
)
```
Traces span across processes, showing the complete message flow.
Best Practices
Assume agents can fail and restart. Use state persistence and idempotent message handlers.
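Idempotency can be as simple as recording processed task IDs so that a redelivered message becomes a no-op. A minimal sketch, outside any runtime (hypothetical class):

```python
class IdempotentProcessor:
    """Toy handler that performs each task's side effect at most once."""

    def __init__(self):
        self.processed: set[str] = set()
        self.side_effects = 0

    def handle(self, task_id: str) -> str:
        # A redelivered message with an already-seen task_id changes nothing
        if task_id in self.processed:
            return f"duplicate: {task_id}"
        self.processed.add(task_id)
        self.side_effects += 1
        return f"processed: {task_id}"


p = IdempotentProcessor()
first = p.handle("task-1")
second = p.handle("task-1")  # redelivery after a restart
```

In a real agent the processed-ID set would live in persisted state (see `save_state`/`load_state` above) so it survives restarts.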
Always use timeouts for cross-process RPC calls to prevent indefinite blocking.
Include version information in message serialization to support rolling updates.
Track message queue depths and processing latencies to detect bottlenecks.
Large messages increase serialization overhead and network latency. Pass references instead of large data.
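For instance, instead of embedding a large payload in the message, send a key that the receiver uses to fetch the data from shared storage (object store, database, cache). A sketch of the two message shapes (types and the storage key are illustrative):

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class InlinePayload:
    data: list[float]   # the whole payload crosses the broker


@dataclass
class PayloadReference:
    blob_key: str       # the receiver fetches the data from shared storage


big = InlinePayload(data=[0.0] * 10_000)
ref = PayloadReference(blob_key="s3://bucket/task-1/input")

# Compare what actually goes over the wire
inline_size = len(json.dumps(asdict(big)))
ref_size = len(json.dumps(asdict(ref)))
```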
Use Protocol Buffers for performance
For high-throughput scenarios, Protocol Buffers provide better performance than JSON.
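The difference is easy to see from message size alone; here `struct` stands in for protobuf's packed binary encoding (a rough proxy, not protobuf itself):

```python
import json
import struct

values = [float(i) for i in range(1000)]

# Text encoding: digits, commas, and key names for every element
json_bytes = json.dumps({"data_points": values}).encode("utf-8")

# Packed binary encoding: a fixed 4 bytes per float32
binary_bytes = struct.pack(f"<{len(values)}f", *values)

# The binary form is smaller, and skips string parsing entirely;
# protobuf adds only modest framing on top of a similar representation.
```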
Deployment Patterns
Docker Compose Example
```yaml
# docker-compose.yml
version: '3.8'

services:
  broker:
    image: autogen-broker:latest
    ports:
      - "50051:50051"

  worker:
    image: my-worker-agent:latest
    environment:
      - BROKER_URL=grpc://broker:50051
      # All replicas share this value; derive unique IDs at startup if needed
      - INSTANCE_ID=worker-1
    depends_on:
      - broker
    deploy:
      replicas: 3

  coordinator:
    image: my-coordinator-agent:latest
    environment:
      - BROKER_URL=grpc://broker:50051
      - INSTANCE_ID=coordinator-1
    depends_on:
      - broker
```
Kubernetes Example
```yaml
# worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: worker-agent
spec:
  replicas: 5
  selector:
    matchLabels:
      app: worker
  template:
    metadata:
      labels:
        app: worker
    spec:
      containers:
        - name: worker
          image: my-worker-agent:latest
          env:
            - name: BROKER_URL
              value: "grpc://broker-service:50051"
            - name: INSTANCE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name  # unique per pod
```
Limitations and Considerations
- Network latency: cross-process communication is slower than in-process calls
- Serialization overhead: every message must be serialized and deserialized at process boundaries
- Broker dependency: the message broker is a single point of failure unless deployed in a highly available configuration
- Eventual consistency: state spread across agents may only be eventually consistent
Next Steps
- Core Overview: return to the Core API overview
- Agent Runtime: learn about runtime operations
- Message Passing: understand message contexts and subscriptions
- Event-Driven Architecture: master event handlers and routing