Skip to main content

Overview

Microsoft Agent Framework provides multiple orchestration patterns for coordinating agents and executors. Choose the pattern that best fits your use case:
  • Sequential: Process steps in order, one after another
  • Concurrent: Run multiple agents/executors in parallel
  • Conditional: Route based on message content or state
  • Loops: Iterate until a condition is met
  • Complex graphs: Combine patterns for sophisticated workflows

Sequential Orchestration

Sequential workflows execute executors in a linear pipeline, where each step processes the output of the previous step.
from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureOpenAIResponsesClient

client = AzureOpenAIResponsesClient(...)

researcher = client.as_agent(
    name="researcher",
    instructions="Research the topic and provide insights."
)

writer = client.as_agent(
    name="writer",
    instructions="Write content based on research."
)

editor = client.as_agent(
    name="editor",
    instructions="Edit and polish the content."
)

workflow = (
    WorkflowBuilder(start_executor=researcher)
    .add_edge(researcher, writer)
    .add_edge(writer, editor)
    .build()
)

events = await workflow.run("Write an article about electric vehicles")

Concurrent Orchestration

Concurrent workflows fan out work to multiple executors in parallel, then aggregate results.

High-Level API

Use ConcurrentBuilder for agent-based concurrent workflows:
from agent_framework.orchestrations import ConcurrentBuilder

researcher = client.as_agent(
    name="researcher",
    instructions="Provide market research insights."
)

marketer = client.as_agent(
    name="marketer",
    instructions="Create marketing strategy."
)

legal = client.as_agent(
    name="legal",
    instructions="Review for compliance concerns."
)

# Build concurrent workflow - fans out to all agents
workflow = ConcurrentBuilder(
    participants=[researcher, marketer, legal]
).build()

events = await workflow.run("Launch a new electric bike for urban commuters")

# Get aggregated messages from all agents
outputs = events.get_outputs()
for output in outputs:
    messages = output  # list[Message]
    for msg in messages:
        print(f"{msg.author_name}: {msg.text}")

Custom Aggregation

Define custom aggregation logic for concurrent results:
Python
from agent_framework import Message

def custom_aggregator(results: list[list[Message]]) -> list[Message]:
    """Custom aggregation: concatenate all messages."""
    all_messages = []
    for agent_messages in results:
        all_messages.extend(agent_messages)
    return all_messages

workflow = ConcurrentBuilder(
    participants=[agent1, agent2, agent3],
    aggregator=custom_aggregator
).build()

Low-Level Fan-Out/Fan-In

For custom concurrent patterns, use add_fan_out_edge and add_fan_in_edge:
Python
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

class Dispatcher(Executor):
    @handler
    async def dispatch(self, task: dict, ctx: WorkflowContext[dict]) -> None:
        await ctx.send_message(task)

class Worker(Executor):
    def __init__(self, id: str, name: str):
        super().__init__(id=id)
        self.name = name
    
    @handler
    async def work(self, task: dict, ctx: WorkflowContext[dict]) -> None:
        result = {"worker": self.name, "result": f"Processed {task}"}
        await ctx.send_message(result)

class Aggregator(Executor):
    @handler
    async def aggregate(self, result: dict, ctx: WorkflowContext[Never, list]) -> None:
        # Collect results from all workers
        results = ctx.get_state("results", [])
        results.append(result)
        ctx.set_state("results", results)
        
        # When all workers complete, yield output
        if len(results) == 3:
            await ctx.yield_output(results)

dispatcher = Dispatcher(id="dispatcher")
workers = [Worker(id=f"worker_{i}", name=f"Worker {i}") for i in range(3)]
aggregator = Aggregator(id="aggregator")

workflow = (
    WorkflowBuilder(start_executor=dispatcher)
    .add_fan_out_edge(dispatcher, workers)
    .add_fan_in_edge(workers, aggregator)
    .build()
)

Conditional Routing

Route messages to different executors based on conditions.

Conditional Edges

Python
class Router(Executor):
    @handler
    async def route(self, data: dict, ctx: WorkflowContext[dict]) -> None:
        await ctx.send_message(data)

def is_urgent(data: dict) -> bool:
    return data.get("priority") == "urgent"

def is_normal(data: dict) -> bool:
    return data.get("priority") == "normal"

workflow = (
    WorkflowBuilder(start_executor=router)
    .add_edge(router, urgent_handler, condition=is_urgent)
    .add_edge(router, normal_handler, condition=is_normal)
    .add_edge(router, default_handler, condition=lambda d: True)  # Catch-all
    .build()
)

Switch-Case Routing

For mutually exclusive routing (exactly one target):
Python
from agent_framework import Case, Default

def is_high_value(order: dict) -> bool:
    return order.get("total", 0) > 1000

def is_medium_value(order: dict) -> bool:
    total = order.get("total", 0)
    return 100 < total <= 1000

workflow = (
    WorkflowBuilder(start_executor=order_classifier)
    .add_switch_case_edge_group(
        order_classifier,
        [
            Case(condition=is_high_value, target=premium_handler),
            Case(condition=is_medium_value, target=standard_handler),
            Default(target=basic_handler),  # Fallback
        ]
    )
    .build()
)
Switch-case evaluates cases in order. The first matching condition routes the message. Always include a Default case.

Loop Patterns

Create feedback loops for iterative refinement.

Simple Loop

Python
from enum import Enum

class Signal(Enum):
    CONTINUE = "continue"
    DONE = "done"

class Generator(Executor):
    def __init__(self, id: str, max_attempts: int = 3):
        super().__init__(id=id)
        self.attempts = 0
        self.max_attempts = max_attempts
    
    @handler
    async def generate(
        self, 
        signal: Signal, 
        ctx: WorkflowContext[str, str]
    ) -> None:
        self.attempts += 1
        content = f"Attempt {self.attempts}: Generated content"
        
        if self.attempts >= self.max_attempts:
            await ctx.yield_output(content)
        else:
            await ctx.send_message(content)

class Reviewer(Executor):
    @handler
    async def review(self, content: str, ctx: WorkflowContext[Signal]) -> None:
        # Simple review logic
        if "good" in content.lower():
            await ctx.send_message(Signal.DONE)
        else:
            await ctx.send_message(Signal.CONTINUE)

generator = Generator(id="generator", max_attempts=5)
reviewer = Reviewer(id="reviewer")

workflow = (
    WorkflowBuilder(start_executor=generator)
    .add_edge(generator, reviewer)
    .add_edge(reviewer, generator)  # Loop back
    .build()
)

events = await workflow.run(Signal.CONTINUE)

Agent Feedback Loop

Real-world example with agents:
Python
writer = client.as_agent(
    name="writer",
    instructions="Write content based on feedback."
)

reviewer = client.as_agent(
    name="reviewer",
    instructions="Review content. Reply 'approved' or provide feedback."
)

class FeedbackRouter(Executor):
    @handler
    async def route(
        self, 
        response: AgentResponse, 
        ctx: WorkflowContext[AgentExecutorRequest | str, str]
    ) -> None:
        if "approved" in response.text.lower():
            await ctx.yield_output(response.text)
        else:
            # Send feedback back to writer
            await ctx.send_message(
                AgentExecutorRequest(
                    messages=[Message("user", text=f"Revise based on: {response.text}")],
                    should_respond=True
                ),
                target_id="writer"
            )

feedback_router = FeedbackRouter(id="feedback_router")

workflow = (
    WorkflowBuilder(start_executor=writer, max_iterations=10)
    .add_edge(writer, reviewer)
    .add_edge(reviewer, feedback_router)
    .add_edge(feedback_router, writer)  # Loop back
    .build()
)

Complex Graph Patterns

Combine patterns for sophisticated orchestration.

Multi-Stage Pipeline with Fan-Out

Python
# Stage 1: Initial processing
preprocessor = Preprocessor(id="preprocessor")

# Stage 2: Parallel analysis
sentiment_analyzer = SentimentAnalyzer(id="sentiment")
entity_extractor = EntityExtractor(id="entities")
topic_classifier = TopicClassifier(id="topics")

# Stage 3: Synthesis
synthesizer = Synthesizer(id="synthesizer")

workflow = (
    WorkflowBuilder(start_executor=preprocessor)
    # Fan out to parallel analyzers
    .add_fan_out_edge(
        preprocessor,
        [sentiment_analyzer, entity_extractor, topic_classifier]
    )
    # Fan in to synthesizer
    .add_fan_in_edge(
        [sentiment_analyzer, entity_extractor, topic_classifier],
        synthesizer
    )
    .build()
)

Conditional Fan-Out with Dynamic Routing

Python
def select_analyzers(task: dict, available: list[str]) -> list[str]:
    """Dynamically select analyzers based on task type."""
    task_type = task.get("type", "")
    
    if task_type == "text":
        return ["sentiment", "topics"]
    elif task_type == "image":
        return ["vision", "ocr"]
    return available  # Use all

workflow = (
    WorkflowBuilder(start_executor=dispatcher)
    .add_fan_out_edge(
        dispatcher,
        [sentiment, topics, vision, ocr],
        selection_func=select_analyzers
    )
    .add_fan_in_edge([sentiment, topics, vision, ocr], aggregator)
    .build()
)

Decision Tree Pattern

Python
from agent_framework import Case, Default

# Root decision
classifier = Classifier(id="classifier")

# Category-specific sub-workflows
category_a_handler = CategoryAHandler(id="category_a")
category_b_handler = CategoryBHandler(id="category_b")
category_c_handler = CategoryCHandler(id="category_c")

# Sub-handlers for category A
a_priority_high = APriorityHigh(id="a_high")
a_priority_low = APriorityLow(id="a_low")

def is_category_a(data: dict) -> bool:
    return data.get("category") == "A"

def is_category_b(data: dict) -> bool:
    return data.get("category") == "B"

def is_high_priority(data: dict) -> bool:
    return data.get("priority") == "high"

workflow = (
    WorkflowBuilder(start_executor=classifier)
    # First level: category routing
    .add_switch_case_edge_group(
        classifier,
        [
            Case(condition=is_category_a, target=category_a_handler),
            Case(condition=is_category_b, target=category_b_handler),
            Default(target=category_c_handler),
        ]
    )
    # Second level: priority routing for category A
    .add_switch_case_edge_group(
        category_a_handler,
        [
            Case(condition=is_high_priority, target=a_priority_high),
            Default(target=a_priority_low),
        ]
    )
    .build()
)

Streaming and Events

Monitor orchestration progress in real-time:
Python
async for event in workflow.run(initial_input, stream=True):
    if event.type == "executor_invoked":
        print(f"→ {event.executor_id} started")
    elif event.type == "executor_completed":
        print(f"✓ {event.executor_id} completed")
    elif event.type == "superstep_completed":
        print(f"Super step {event.iteration} completed")
    elif event.type == "output":
        print(f"Output: {event.data}")

Best Practices

  • Use sequential for linear pipelines where order matters
  • Use concurrent when tasks are independent and can run in parallel
  • Use conditional when routing depends on message content
  • Use loops for iterative refinement with feedback
  • Always include a Default case in switch-case routing
  • Set max_iterations to prevent infinite loops
  • Add error handling in executors to gracefully handle failures
  • Use type hints for better error detection
  • Use concurrent execution when possible (parallelism)
  • Minimize state size - only store what’s needed
  • Use selective fan-out to reduce unnecessary work
  • Stream results for long-running workflows
  • Use descriptive executor IDs
  • Log important state transitions
  • Monitor workflow events for debugging
  • Use structured data for messages

Next Steps

Checkpoints

Save and restore workflow state for fault tolerance

Human-in-the-Loop

Integrate human decision points into workflows

Build docs developers (and LLMs) love