Overview
Microsoft Agent Framework provides multiple orchestration patterns for coordinating agents and executors. Choose the pattern that best fits your use case:
Sequential: Process steps in order, one after another
Concurrent: Run multiple agents/executors in parallel
Conditional: Route based on message content or state
Loops: Iterate until a condition is met
Complex graphs: Combine patterns for sophisticated workflows
Sequential Orchestration
Sequential workflows execute executors in a linear pipeline, where each step processes the output of the previous step.
from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureOpenAIResponsesClient
client = AzureOpenAIResponsesClient( ... )
researcher = client.as_agent(
name = "researcher" ,
instructions = "Research the topic and provide insights."
)
writer = client.as_agent(
name = "writer" ,
instructions = "Write content based on research."
)
editor = client.as_agent(
name = "editor" ,
instructions = "Edit and polish the content."
)
workflow = (
WorkflowBuilder( start_executor = researcher)
.add_edge(researcher, writer)
.add_edge(writer, editor)
.build()
)
events = await workflow.run( "Write an article about electric vehicles" )
Concurrent Orchestration
Concurrent workflows fan out work to multiple executors in parallel, then aggregate results.
High-Level API
Use ConcurrentBuilder for agent-based concurrent workflows:
from agent_framework.orchestrations import ConcurrentBuilder
researcher = client.as_agent(
name = "researcher" ,
instructions = "Provide market research insights."
)
marketer = client.as_agent(
name = "marketer" ,
instructions = "Create marketing strategy."
)
legal = client.as_agent(
name = "legal" ,
instructions = "Review for compliance concerns."
)
# Build concurrent workflow - fans out to all agents
workflow = ConcurrentBuilder(
participants = [researcher, marketer, legal]
).build()
events = await workflow.run( "Launch a new electric bike for urban commuters" )
# Get aggregated messages from all agents
outputs = events.get_outputs()
for output in outputs:
messages = output # list[Message]
for msg in messages:
print ( f " { msg.author_name } : { msg.text } " )
Custom Aggregation
Define custom aggregation logic for concurrent results:
from agent_framework import Message
def custom_aggregator(results: "list[list[Message]]") -> "list[Message]":
    """Custom aggregation: concatenate all messages.

    Args:
        results: One list of messages per participating agent.

    Returns:
        A single flat list with every agent's messages, in participant order.
    """
    all_messages = []
    for agent_messages in results:
        all_messages.extend(agent_messages)
    return all_messages
# The aggregator is invoked once every participant has produced its results.
workflow = ConcurrentBuilder(
    participants=[agent1, agent2, agent3],
    aggregator=custom_aggregator,
).build()
Low-Level Fan-Out/Fan-In
For custom concurrent patterns, use add_fan_out_edge and add_fan_in_edge:
from typing import Never

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
class Dispatcher(Executor):
    """Fans an incoming task out to every connected worker."""

    @handler
    async def dispatch(self, task: dict, ctx: WorkflowContext[dict]) -> None:
        # The fan-out edge delivers this one message to each worker in parallel.
        await ctx.send_message(task)
class Worker(Executor):
    """Processes a single task and forwards the result downstream."""

    def __init__(self, id: str, name: str):
        super().__init__(id=id)
        # Human-readable label embedded in each result dict.
        self.name = name

    @handler
    async def work(self, task: dict, ctx: WorkflowContext[dict]) -> None:
        result = {"worker": self.name, "result": f"Processed {task}"}
        await ctx.send_message(result)
class Aggregator(Executor):
    """Collects one result per worker, then yields the combined list.

    ``expected`` generalizes the previously hard-coded worker count (3);
    it must match the number of fan-in sources.
    """

    def __init__(self, id: str, expected: int = 3):
        super().__init__(id=id)
        # Number of worker results to wait for before yielding output.
        self.expected = expected

    @handler
    async def aggregate(self, result: dict, ctx: WorkflowContext[Never, list]) -> None:
        # Accumulate results in workflow state across invocations.
        results = ctx.get_state("results", [])
        results.append(result)
        ctx.set_state("results", results)
        # When every worker has reported, yield the aggregated output.
        if len(results) >= self.expected:
            await ctx.yield_output(results)
dispatcher = Dispatcher(id="dispatcher")
workers = [Worker(id=f"worker_{i}", name=f"Worker {i}") for i in range(3)]
aggregator = Aggregator(id="aggregator")

workflow = (
    WorkflowBuilder(start_executor=dispatcher)
    .add_fan_out_edge(dispatcher, workers)
    .add_fan_in_edge(workers, aggregator)
    .build()
)
Conditional Routing
Route messages to different executors based on conditions.
Conditional Edges
class Router(Executor):
    """Forwards incoming data; conditional edges pick the destination."""

    @handler
    async def route(self, data: dict, ctx: WorkflowContext[dict]) -> None:
        # Each outgoing edge's condition is evaluated against this message.
        await ctx.send_message(data)
def is_urgent(data: dict) -> bool:
    """Edge condition: route to the urgent handler."""
    return data.get("priority") == "urgent"


def is_normal(data: dict) -> bool:
    """Edge condition: route to the normal handler."""
    return data.get("priority") == "normal"
workflow = (
    WorkflowBuilder(start_executor=router)
    # Conditions are evaluated per edge; a message may match several edges.
    .add_edge(router, urgent_handler, condition=is_urgent)
    .add_edge(router, normal_handler, condition=is_normal)
    .add_edge(router, default_handler, condition=lambda d: True)  # Catch-all
    .build()
)
Switch-Case Routing
For mutually exclusive routing (exactly one target):
from agent_framework import Case, Default
def is_high_value(order: dict) -> bool:
    """Case condition: orders strictly above 1000 get premium handling."""
    return order.get("total", 0) > 1000


def is_medium_value(order: dict) -> bool:
    """Case condition: orders in (100, 1000] get standard handling."""
    total = order.get("total", 0)
    return 100 < total <= 1000
workflow = (
    WorkflowBuilder(start_executor=order_classifier)
    .add_switch_case_edge_group(
        order_classifier,
        [
            # Cases are evaluated in order; the first match wins.
            Case(condition=is_high_value, target=premium_handler),
            Case(condition=is_medium_value, target=standard_handler),
            Default(target=basic_handler),  # Fallback
        ],
    )
    .build()
)
Switch-case evaluates cases in order. The first matching condition routes the message. Always include a Default case.
Loop Patterns
Create feedback loops for iterative refinement.
Simple Loop
from enum import Enum
class Signal(Enum):
    """Control messages exchanged between reviewer and generator."""

    CONTINUE = "continue"  # another generation attempt is needed
    DONE = "done"  # content approved; stop looping
class Generator(Executor):
    """Produces content, yielding final output after max_attempts tries."""

    def __init__(self, id: str, max_attempts: int = 3):
        super().__init__(id=id)
        self.attempts = 0  # attempts made so far
        self.max_attempts = max_attempts  # loop termination bound

    @handler
    async def generate(
        self,
        signal: Signal,
        ctx: WorkflowContext[str, str],
    ) -> None:
        self.attempts += 1
        content = f"Attempt {self.attempts}: Generated content"
        if self.attempts >= self.max_attempts:
            # Bound reached: terminate the loop by yielding the final output.
            await ctx.yield_output(content)
        else:
            # Otherwise send to the reviewer, which loops back with a Signal.
            await ctx.send_message(content)
class Reviewer(Executor):
    """Approves content or requests another generation attempt."""

    @handler
    async def review(self, content: str, ctx: WorkflowContext[Signal]) -> None:
        # Toy review criterion; a real reviewer would use an agent.
        if "good" in content.lower():
            await ctx.send_message(Signal.DONE)
        else:
            await ctx.send_message(Signal.CONTINUE)
generator = Generator( id = "generator" , max_attempts = 5 )
reviewer = Reviewer( id = "reviewer" )
workflow = (
WorkflowBuilder( start_executor = generator)
.add_edge(generator, reviewer)
.add_edge(reviewer, generator) # Loop back
.build()
)
events = await workflow.run(Signal. CONTINUE )
Agent Feedback Loop
Real-world example with agents:
writer = client.as_agent(
    name="writer",
    instructions="Write content based on feedback.",
)
reviewer = client.as_agent(
    name="reviewer",
    instructions="Review content. Reply 'approved' or provide feedback.",
)
class FeedbackRouter(Executor):
    """Yields the final text on approval, otherwise loops back to the writer."""

    @handler
    async def route(
        self,
        response: AgentResponse,
        ctx: WorkflowContext[AgentExecutorRequest | str, str],
    ) -> None:
        if "approved" in response.text.lower():
            # Reviewer approved: finish the workflow with the final content.
            await ctx.yield_output(response.text)
        else:
            # Send feedback back to writer
            await ctx.send_message(
                AgentExecutorRequest(
                    messages=[Message("user", text=f"Revise based on: {response.text}")],
                    should_respond=True,
                ),
                target_id="writer",
            )
feedback_router = FeedbackRouter(id="feedback_router")

# max_iterations bounds the review loop to prevent infinite cycling.
workflow = (
    WorkflowBuilder(start_executor=writer, max_iterations=10)
    .add_edge(writer, reviewer)
    .add_edge(reviewer, feedback_router)
    .add_edge(feedback_router, writer)  # Loop back
    .build()
)
Complex Graph Patterns
Combine patterns for sophisticated orchestration.
Multi-Stage Pipeline with Fan-Out
# Stage 1: Initial processing
preprocessor = Preprocessor(id="preprocessor")

# Stage 2: Parallel analysis
sentiment_analyzer = SentimentAnalyzer(id="sentiment")
entity_extractor = EntityExtractor(id="entities")
topic_classifier = TopicClassifier(id="topics")

# Stage 3: Synthesis
synthesizer = Synthesizer(id="synthesizer")

workflow = (
    WorkflowBuilder(start_executor=preprocessor)
    # Fan out to parallel analyzers
    .add_fan_out_edge(
        preprocessor,
        [sentiment_analyzer, entity_extractor, topic_classifier],
    )
    # Fan in to synthesizer
    .add_fan_in_edge(
        [sentiment_analyzer, entity_extractor, topic_classifier],
        synthesizer,
    )
    .build()
)
Conditional Fan-Out with Dynamic Routing
def select_analyzers(task: dict, available: list[str]) -> list[str]:
    """Dynamically select analyzers based on task type.

    Args:
        task: The dispatched message; ``task["type"]`` drives the routing.
        available: Executor IDs of all fan-out targets.

    Returns:
        The subset of executor IDs that should receive this task.
    """
    task_type = task.get("type", "")
    if task_type == "text":
        return ["sentiment", "topics"]
    if task_type == "image":
        return ["vision", "ocr"]
    return available  # Unknown type: fan out to every analyzer
workflow = (
    WorkflowBuilder(start_executor=dispatcher)
    .add_fan_out_edge(
        dispatcher,
        [sentiment, topics, vision, ocr],
        # selection_func narrows the fan-out targets per message.
        selection_func=select_analyzers,
    )
    .add_fan_in_edge([sentiment, topics, vision, ocr], aggregator)
    .build()
)
Decision Tree Pattern
from agent_framework import Case, Default
# Root decision
classifier = Classifier( id = "classifier" )
# Category-specific sub-workflows
category_a_handler = CategoryAHandler( id = "category_a" )
category_b_handler = CategoryBHandler( id = "category_b" )
category_c_handler = CategoryCHandler( id = "category_c" )
# Sub-handlers for category A
a_priority_high = APriorityHigh( id = "a_high" )
a_priority_low = APriorityLow( id = "a_low" )
def is_category_a(data: dict) -> bool:
    """First-level case condition: category A."""
    return data.get("category") == "A"


def is_category_b(data: dict) -> bool:
    """First-level case condition: category B."""
    return data.get("category") == "B"


def is_high_priority(data: dict) -> bool:
    """Second-level case condition: high-priority items within a category."""
    return data.get("priority") == "high"
workflow = (
    WorkflowBuilder(start_executor=classifier)
    # First level: category routing
    .add_switch_case_edge_group(
        classifier,
        [
            Case(condition=is_category_a, target=category_a_handler),
            Case(condition=is_category_b, target=category_b_handler),
            Default(target=category_c_handler),
        ],
    )
    # Second level: priority routing for category A
    .add_switch_case_edge_group(
        category_a_handler,
        [
            Case(condition=is_high_priority, target=a_priority_high),
            Default(target=a_priority_low),
        ],
    )
    .build()
)
Streaming and Events
Monitor orchestration progress in real-time:
async for event in workflow.run(initial_input, stream = True ):
if event.type == "executor_invoked" :
print ( f "→ { event.executor_id } started" )
elif event.type == "executor_completed" :
print ( f "✓ { event.executor_id } completed" )
elif event.type == "superstep_completed" :
print ( f "Super step { event.iteration } completed" )
elif event.type == "output" :
print ( f "Output: { event.data } " )
Best Practices
Use sequential for linear pipelines where order matters
Use concurrent when tasks are independent and can run in parallel
Use conditional when routing depends on message content
Use loops for iterative refinement with feedback
Always include a Default case in switch-case routing
Set max_iterations to prevent infinite loops
Add error handling in executors to gracefully handle failures
Use type hints for better error detection
Use descriptive executor IDs
Log important state transitions
Monitor workflow events for debugging
Use structured data for messages
Next Steps
Checkpoints Save and restore workflow state for fault tolerance
Human-in-the-Loop Integrate human decision points into workflows