Skip to main content

What are Workflows?

Workflows in Microsoft Agent Framework enable you to build complex, graph-based orchestrations of agents and executors. Unlike simple sequential chains, workflows provide:
  • Graph-based execution: Define nodes (executors) and edges to create flexible, non-linear execution flows
  • Type-safe message passing: Executors communicate via strongly-typed messages
  • Built-in orchestration: Support for sequential, concurrent, fan-out/fan-in, and conditional routing patterns
  • State management: Maintain workflow state across executions
  • Checkpointing: Save and restore workflow state for long-running processes
  • Human-in-the-loop: Pause execution to collect input and resume seamlessly

Core Concepts

Executors

Executors are the building blocks of workflows. Each executor represents a unit of work that processes input and produces output.
from agent_framework import Executor, WorkflowContext, handler

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
    
    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert input to uppercase and forward it."""
        result = text.upper()
        await ctx.send_message(result)

Edges

Edges connect executors, defining how data flows through the workflow. Edges can be:
  • Simple: Direct connections between two executors
  • Conditional: Route based on message content
  • Fan-out: Send to multiple targets in parallel
  • Fan-in: Aggregate from multiple sources
  • Switch-case: Route to one of many targets based on conditions

WorkflowBuilder

The WorkflowBuilder provides a fluent API for constructing workflows:
from agent_framework import WorkflowBuilder

workflow = (
    WorkflowBuilder(start_executor=upper_case)
    .add_edge(upper_case, reverse_text)
    .build()
)

# Run the workflow
events = await workflow.run("hello world")
print(events.get_outputs())  # ['DLROW OLLEH']

Workflow Execution Model

Workflows execute in super steps:
  1. Super step: A stage where one or more executors run in parallel
  2. Message passing: Executors send messages to downstream executors
  3. Next super step: All messages from the previous step trigger the next set of executors
  4. Completion: Workflow completes when idle (no pending work)
Workflow execution model showing super steps

Function-based Executors

For simple operations, you can define executors as functions:
from agent_framework import executor, WorkflowContext
from typing_extensions import Never

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input string and yield output."""
    result = text[::-1]
    await ctx.yield_output(result)

Type System

Workflows use a rich type system for message passing:
  • Input types: Types an executor can handle (from @handler parameter)
  • Output types: Types an executor can send via ctx.send_message()
  • Workflow output types: Types an executor can yield via ctx.yield_output()
from agent_framework import WorkflowContext

class MyExecutor(Executor):
    @handler
    async def process(
        self, 
        message: str,  # Input type: str
        ctx: WorkflowContext[int, bool]  # Output: int, Workflow output: bool
    ) -> None:
        await ctx.send_message(42)      # Send int to downstream executor
        await ctx.yield_output(True)    # Yield bool as workflow output

Agents in Workflows

Agents can be used directly as executors in workflows:
from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureOpenAIResponsesClient

client = AzureOpenAIResponsesClient(...)

writer = client.as_agent(
    name="writer",
    instructions="You create content."
)

reviewer = client.as_agent(
    name="reviewer",
    instructions="You review content."
)

workflow = (
    WorkflowBuilder(start_executor=writer)
    .add_edge(writer, reviewer)
    .build()
)

events = await workflow.run("Create a slogan for an electric SUV")

Next Steps

Building Workflows

Learn how to construct workflows with the WorkflowBuilder API

Orchestration Patterns

Explore sequential, concurrent, and conditional execution patterns

Checkpoints

Save and restore workflow state for long-running processes

Human-in-the-Loop

Integrate human decision points into your workflows

Common Patterns

Sequential Pipeline

# Process data through a series of transformations
workflow = (
    WorkflowBuilder(start_executor=fetch_data)
    .add_edge(fetch_data, validate)
    .add_edge(validate, transform)
    .add_edge(transform, store)
    .build()
)

Feedback Loop

# Iterate until a condition is met
workflow = (
    WorkflowBuilder(start_executor=generate)
    .add_edge(generate, review)
    .add_edge(review, generate)  # Loop back for revisions
    .build()
)

Parallel Processing

# Process with multiple agents concurrently
from agent_framework.orchestrations import ConcurrentBuilder

workflow = ConcurrentBuilder(
    participants=[researcher, marketer, legal]
).build()
Workflows execute until idle - when no executors have pending work. Use ctx.yield_output() to produce final results.

Build docs developers (and LLMs) love