Workflows enable you to orchestrate multiple agents and executors in a graph-based structure. They provide control flow, state management, and coordination for complex multi-step processes.

Core Concepts

Executors

Executors are the nodes in a workflow graph. They process inputs and produce outputs:
  • Function Executors: Standalone functions decorated with @executor
  • Agent Executors: Agents that can be used as workflow nodes
  • Custom Executors: Classes that inherit from Executor

Edges

Edges connect executors and define the flow of data:
  • Single Edge: One-to-one connection
  • Fan-Out: One executor sends to multiple downstream executors
  • Fan-In: Multiple executors send to one downstream executor
  • Conditional: Branching based on runtime conditions

Workflow Context

The WorkflowContext object provides methods to:
  • Send messages to downstream nodes
  • Yield workflow outputs
  • Access workflow state
  • Manage execution flow

Creating Executors

Function Executor with @executor

from agent_framework import executor, WorkflowContext
from typing_extensions import Never

@executor(id="process_text")
async def process_text(
    text: str,
    ctx: WorkflowContext[str]  # Sends str to downstream
) -> None:
    """Process text and forward result."""
    result = text.upper()
    await ctx.send_message(result)

@executor(id="save_result")
async def save_result(
    text: str,
    ctx: WorkflowContext[Never, str]  # Yields str as workflow output
) -> None:
    """Save result and yield output."""
    # Save to database
    await db.save(text)
    # Yield as workflow output
    await ctx.yield_output(text)

Class-Based Executor

from agent_framework import Executor, handler, WorkflowContext

class DataProcessor(Executor):
    def __init__(self, id: str, config: dict):
        super().__init__(id=id)
        self.config = config
    
    @handler
    async def process(
        self,
        data: dict,
        ctx: WorkflowContext[dict]
    ) -> None:
        """Process data according to configuration."""
        # Use self.config
        processed = {**data, "processed": True}
        await ctx.send_message(processed)

Agent Executor

Use agents directly as workflow nodes:
from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential

client = AzureOpenAIResponsesClient(credential=AzureCliCredential())

writer = client.as_agent(
    name="Writer",
    instructions="You are a content writer."
)

reviewer = client.as_agent(
    name="Reviewer",
    instructions="You are a content reviewer. Provide feedback."
)

# Agents are executors and can be connected with edges
workflow = WorkflowBuilder(start_executor=writer) \
    .add_edge(writer, reviewer) \
    .build()

Building Workflows

Simple Linear Workflow

from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="step1")
async def step1(input: str, ctx: WorkflowContext[str]) -> None:
    result = input.upper()
    await ctx.send_message(result)

@executor(id="step2")
async def step2(input: str, ctx: WorkflowContext[str]) -> None:
    result = f"{input}!!!"
    await ctx.send_message(result)

@executor(id="step3")
async def step3(input: str, ctx: WorkflowContext[Never, str]) -> None:
    reversed_text = input[::-1]
    await ctx.yield_output(reversed_text)

# Build the workflow
workflow = WorkflowBuilder(start_executor=step1) \
    .add_edge(step1, step2) \
    .add_edge(step2, step3) \
    .build()

# Run the workflow
events = await workflow.run("hello")
print(events.get_outputs())  # ['!!!OLLEH']

Workflow with Agents

from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential
import os

client = AzureOpenAIResponsesClient(
    project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
    deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
    credential=AzureCliCredential()
)

writer = client.as_agent(
    instructions="You are a content writer. Create content based on requests.",
    name="Writer"
)

reviewer = client.as_agent(
    instructions="You are a content reviewer. Provide concise, actionable feedback.",
    name="Reviewer"
)

workflow = WorkflowBuilder(start_executor=writer) \
    .add_edge(writer, reviewer) \
    .build()

events = await workflow.run(
    "Create a slogan for an affordable electric SUV"
)

for output in events.get_outputs():
    print(f"{output.messages[0].author_name}: {output.text}")

Explicit Type Parameters

Specify input/output types explicitly on @handler:
class FlexibleProcessor(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
    
    @handler(input=str | int, output=str)
    async def process(self, message, ctx) -> None:  # type: ignore
        """Process either string or int input.
        
        The input and output types are explicitly declared on @handler,
        so we don't need type hints on the function parameters.
        """
        result = f"Processed: {message}"
        await ctx.send_message(result)  # type: ignore

Running Workflows

Non-Streaming Execution

# Run and wait for completion
events = await workflow.run("initial input")

# Get all outputs
outputs = events.get_outputs()
print(outputs)

# Get final state
final_state = events.get_final_state()
print(final_state)  # WorkflowRunState.IDLE or COMPLETED

Streaming Execution

# Stream events as they occur
async for event in workflow.run("initial input", stream=True):
    if event.type == "executor_start":
        print(f"Starting: {event.executor_id}")
    elif event.type == "executor_output":
        print(f"Output from {event.executor_id}: {event.data}")
    elif event.type == "workflow_output":
        print(f"Final output: {event.data}")

Advanced Control Flow

Conditional Branching

from agent_framework import WorkflowBuilder, EdgeCondition, executor, WorkflowContext
from typing_extensions import Never

@executor(id="classifier")
async def classify(text: str, ctx: WorkflowContext[dict]) -> None:
    """Classify text as urgent or normal."""
    is_urgent = "urgent" in text.lower()
    await ctx.send_message({
        "text": text,
        "is_urgent": is_urgent
    })

@executor(id="urgent_handler")
async def handle_urgent(data: dict, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"URGENT: {data['text']}")

@executor(id="normal_handler")
async def handle_normal(data: dict, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Normal: {data['text']}")

def is_urgent(data: dict) -> bool:
    return data.get("is_urgent", False)

workflow = WorkflowBuilder(start_executor=classify) \
    .add_edge(classify, handle_urgent, condition=is_urgent) \
    .add_edge(classify, handle_normal, condition=lambda d: not is_urgent(d)) \
    .build()

Fan-Out (Parallel Processing)

from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="broadcast")
async def broadcast(text: str, ctx: WorkflowContext[str]) -> None:
    """Send input to multiple downstream executors."""
    await ctx.send_message(text)

@executor(id="process_a")
async def process_a(text: str, ctx: WorkflowContext[str]) -> None:
    result = f"A: {text.upper()}"
    await ctx.send_message(result)

@executor(id="process_b")
async def process_b(text: str, ctx: WorkflowContext[str]) -> None:
    result = f"B: {text.lower()}"
    await ctx.send_message(result)

@executor(id="collect")
async def collect(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Collect results from multiple sources."""
    await ctx.yield_output(text)

# Fan-out: broadcast sends to both process_a and process_b
# Fan-in: both send to collect
workflow = WorkflowBuilder(start_executor=broadcast) \
    .add_edge(broadcast, process_a) \
    .add_edge(broadcast, process_b) \
    .add_edge(process_a, collect) \
    .add_edge(process_b, collect) \
    .build()

events = await workflow.run("hello")
print(events.get_outputs())  # ['A: HELLO', 'B: hello']

State Management

Workflow State

Workflows maintain state across executions:
from agent_framework import Executor, handler, WorkflowContext

class Counter(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        self.count = 0
    
    @handler
    async def increment(
        self,
        message: str,
        ctx: WorkflowContext[dict]
    ) -> None:
        self.count += 1
        await ctx.send_message({
            "message": message,
            "count": self.count
        })

State Isolation

Create fresh workflows for each run:
def create_workflow():
    """Factory function ensures fresh state per workflow instance."""
    step1 = Step1Executor(id="step1")
    step2 = Step2Executor(id="step2")
    
    return WorkflowBuilder(start_executor=step1) \
        .add_edge(step1, step2) \
        .build()

# Each call gets a fresh workflow with clean state
workflow1 = create_workflow()
workflow2 = create_workflow()

await workflow1.run("input1")
await workflow2.run("input2")  # Independent state

Checkpointing

Save and resume workflow state:
from agent_framework import (
    WorkflowBuilder,
    FileCheckpointStorage,
    InMemoryCheckpointStorage
)

# File-based checkpointing
checkpoint_storage = FileCheckpointStorage("./checkpoints")

# Or in-memory
# checkpoint_storage = InMemoryCheckpointStorage()

workflow = WorkflowBuilder(start_executor=step1) \
    .add_edge(step1, step2) \
    .add_edge(step2, step3) \
    .set_checkpoint_storage(checkpoint_storage) \
    .build()

# Run with checkpointing
events = await workflow.run(
    "input",
    checkpoint_id="run_123"
)

# Resume from checkpoint
events = await workflow.resume(
    checkpoint_id="run_123"
)

Orchestration Patterns

Sequential Orchestration

from agent_framework import WorkflowBuilder

# Tasks execute in sequence: A -> B -> C
workflow = WorkflowBuilder(start_executor=task_a) \
    .add_edge(task_a, task_b) \
    .add_edge(task_b, task_c) \
    .build()

Concurrent Orchestration

# Tasks execute in parallel, then merge
workflow = WorkflowBuilder(start_executor=broadcast) \
    .add_edge(broadcast, task_a) \
    .add_edge(broadcast, task_b) \
    .add_edge(broadcast, task_c) \
    .add_edge(task_a, merge) \
    .add_edge(task_b, merge) \
    .add_edge(task_c, merge) \
    .build()

Human-in-the-Loop

from agent_framework import executor, WorkflowContext
from typing_extensions import Never

@executor(id="request_approval")
async def request_approval(
    data: dict,
    ctx: WorkflowContext[dict]
) -> None:
    """Request human approval before proceeding."""
    # Send approval request
    approval_needed = True
    
    if approval_needed:
        # Store state and wait
        ctx.session.state["pending_approval"] = data
        await ctx.send_message({"status": "awaiting_approval", "data": data})
    else:
        await ctx.send_message({"status": "approved", "data": data})

@executor(id="execute_action")
async def execute_action(
    data: dict,
    ctx: WorkflowContext[Never, str]
) -> None:
    if data["status"] == "approved":
        # Execute the action
        result = perform_action(data["data"])
        await ctx.yield_output(result)

Sub-Workflows

Embed workflows within other workflows:
from agent_framework import WorkflowExecutor

# Define a sub-workflow
sub_workflow = WorkflowBuilder(start_executor=sub_step1) \
    .add_edge(sub_step1, sub_step2) \
    .build()

# Wrap it in a WorkflowExecutor
sub_executor = WorkflowExecutor(
    id="sub_workflow",
    workflow=sub_workflow
)

# Use in parent workflow
parent_workflow = WorkflowBuilder(start_executor=main_step1) \
    .add_edge(main_step1, sub_executor) \
    .add_edge(sub_executor, main_step2) \
    .build()

Declarative Workflows

Define workflows using YAML:
workflow:
  name: content_pipeline
  start: writer
  
  executors:
    - id: writer
      type: agent
      instructions: You are a content writer.
      
    - id: reviewer
      type: agent
      instructions: You are a content reviewer.
  
  edges:
    - from: writer
      to: reviewer
Load and run:
from agent_framework.declarative import load_workflow_from_yaml

workflow = load_workflow_from_yaml("workflow.yaml", client=client)
events = await workflow.run("Create a blog post")

Observability

Event Types

async for event in workflow.run("input", stream=True):
    match event.type:
        case "workflow_start":
            print("Workflow started")
        case "executor_start":
            print(f"Executor {event.executor_id} started")
        case "executor_output":
            print(f"Output: {event.data}")
        case "executor_complete":
            print(f"Executor {event.executor_id} completed")
        case "workflow_output":
            print(f"Final output: {event.data}")
        case "workflow_complete":
            print("Workflow completed")
        case "error":
            print(f"Error: {event.error}")

Telemetry

Workflows automatically emit OpenTelemetry traces and metrics:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

# Configure tracing
trace.set_tracer_provider(TracerProvider())

# Workflows will emit spans for each executor
events = await workflow.run("input")

Visualization

Visualize workflow graphs:
from agent_framework import WorkflowViz

# Create visualization
viz = WorkflowViz(workflow)

# Generate Mermaid diagram
mermaid_code = viz.to_mermaid()
print(mermaid_code)

# Save to file
viz.save_diagram("workflow.png")

Best Practices

Always wrap workflow creation in a function to ensure fresh state:
def create_workflow():
    return WorkflowBuilder(...).build()

# Each call gets fresh instances
workflow1 = create_workflow()
workflow2 = create_workflow()
Implement error handling in executors:
@executor(id="safe_processor")
async def process(data: dict, ctx: WorkflowContext[dict]) -> None:
    try:
        result = await risky_operation(data)
        await ctx.send_message(result)
    except Exception as e:
        # Log error and send error state
        await ctx.send_message({"error": str(e)})
Enable checkpointing for workflows that may need to resume:
workflow = WorkflowBuilder(...) \
    .set_checkpoint_storage(FileCheckpointStorage("./checkpoints")) \
    .build()
Identify independent tasks that can run concurrently to improve performance.
Each executor should have a single, well-defined responsibility. This makes workflows easier to understand and maintain.

API Reference

WorkflowBuilder

class WorkflowBuilder:
    def __init__(self, start_executor: Executor | AgentExecutor) -> None:
        """Initialize with the starting executor."""
    
    def add_edge(
        self,
        from_executor: Executor,
        to_executor: Executor,
        condition: EdgeCondition | None = None
    ) -> WorkflowBuilder:
        """Add an edge between executors."""
    
    def set_checkpoint_storage(
        self,
        storage: CheckpointStorage
    ) -> WorkflowBuilder:
        """Set checkpoint storage for resume capability."""
    
    def build(self) -> Workflow:
        """Build and validate the workflow."""

WorkflowContext

class WorkflowContext[T_Out, T_W_Out]:
    async def send_message(self, message: T_Out) -> None:
        """Send a message to downstream executors."""
    
    async def yield_output(self, output: T_W_Out) -> None:
        """Yield an output from the workflow."""
    
    @property
    def session(self) -> SessionContext:
        """Access workflow session data."""

Workflow

class Workflow:
    async def run(
        self,
        input: Any,
        *,
        stream: bool = False,
        checkpoint_id: str | None = None
    ) -> WorkflowRunResult | AsyncIterable[WorkflowEvent]:
        """Run the workflow."""
    
    async def resume(
        self,
        checkpoint_id: str
    ) -> WorkflowRunResult:
        """Resume from a checkpoint."""

Examples

Content Creation Pipeline

from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential

client = AzureOpenAIResponsesClient(credential=AzureCliCredential())

# Create specialized agents
researcher = client.as_agent(
    name="Researcher",
    instructions="Research topics and gather information."
)

writer = client.as_agent(
    name="Writer",
    instructions="Write engaging content based on research."
)

editor = client.as_agent(
    name="Editor",
    instructions="Edit and improve content for clarity and style."
)

reviewer = client.as_agent(
    name="Reviewer",
    instructions="Review content and provide final approval."
)

# Build the pipeline
workflow = WorkflowBuilder(start_executor=researcher) \
    .add_edge(researcher, writer) \
    .add_edge(writer, editor) \
    .add_edge(editor, reviewer) \
    .build()

# Run the pipeline
events = await workflow.run(
    "Create a blog post about quantum computing"
)

for output in events.get_outputs():
    print(output.text)
Related Topics
  • Agents - Using agents in workflows
  • Middleware - Workflow-level middleware
  • Tools - Tools available to workflow agents
