GraphFlow Orchestration
GraphFlow orchestration uses directed graphs to define complex agent workflows with conditional logic, parallel execution, and sophisticated control flow.

What You’ll Learn

  • How to design graph-based workflows
  • Conditional agent transitions
  • Parallel agent execution
  • State management across agents
  • Building complex orchestration patterns

Prerequisites

Step 1 — Install AutoGen:

pip install -U "autogen-agentchat" "autogen-ext[openai]"

Step 2 — Set your OpenAI API key:

export OPENAI_API_KEY="sk-..."

What is GraphFlow?

GraphFlow is an orchestration pattern where:
  • Agents are nodes in a directed graph
  • Edges define possible transitions between agents
  • Conditions determine which path to take
  • Multiple paths can execute in parallel
  • State flows through the graph

Basic Graph Workflow

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import BaseGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient


class WorkflowGraph(BaseGroupChat):
    """Graph-driven team: agents are nodes, transitions are edges.

    ``workflow_graph`` maps an agent name to the list of agents it may
    hand off to; the special key ``"start"`` names the entry agent.
    """

    def __init__(self, participants, model_client, workflow_graph):
        super().__init__(participants)
        self._model_client = model_client
        self._graph = workflow_graph
        self._current_agent = None  # None until the first speaker is chosen

    async def select_speaker(self, thread):
        """Pick the next speaker by walking the workflow graph.

        Returns the chosen agent name, or ``None`` when the current
        node has no outgoing edges (end of workflow).
        """
        if self._current_agent is None:
            # First turn: enter the graph at its designated start node.
            self._current_agent = self._graph["start"]
            return self._current_agent

        outgoing = self._graph.get(self._current_agent, [])
        if not outgoing:
            return None  # no edges left: the workflow terminates

        latest = thread[-1] if thread else None
        self._current_agent = await self._select_transition(latest, outgoing)
        return self._current_agent

    async def _select_transition(self, message, transitions):
        """Choose one of *transitions* given the latest *message*.

        Placeholder policy: always take the first edge. A real
        implementation would consult the model client here.
        """
        return transitions[0]  # Simplified


async def main() -> None:
    """Drive a linear analyze -> design -> implement -> test pipeline."""
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # One agent per pipeline stage, keyed by stage name.
    stage_prompts = {
        "analyzer": "Analyze the problem and identify key requirements.",
        "designer": "Design a solution based on requirements.",
        "implementer": "Implement the designed solution.",
        "tester": "Test the implementation and report issues.",
    }
    agents = {
        name: AssistantAgent(name, model_client=model_client, system_message=prompt)
        for name, prompt in stage_prompts.items()
    }

    # Edges of the workflow: each stage hands off to the next; the
    # tester may loop back to the implementer or end the run (None).
    workflow = {
        "start": "analyzer",
        "analyzer": ["designer"],
        "designer": ["implementer"],
        "implementer": ["tester"],
        "tester": ["implementer", None],
    }

    team = WorkflowGraph(
        participants=list(agents.values()),
        model_client=model_client,
        workflow_graph=workflow,
    )

    await Console(
        team.run_stream(task="Build a function to validate email addresses")
    )

    await model_client.close()


asyncio.run(main())

Conditional Workflow

This example shows conditional branching based on agent output:
import asyncio
from typing import Dict, List, Optional
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import BaseGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient


class ConditionalWorkflow(BaseGroupChat):
    """Team whose speaker order follows a transition table.

    Each entry in ``transitions`` maps a node name either to the next
    node name, or to a callable that receives the latest message text
    and returns the next node name (``None`` ends the workflow).
    """

    def __init__(self, participants, transitions, model_client):
        super().__init__(participants)
        self._transitions = transitions
        self._model_client = model_client
        self._current = "start"  # virtual entry node

    async def select_speaker(self, thread):
        """Advance one step through the transition table."""
        try:
            step = self._transitions[self._current]
        except KeyError:
            return None  # no entry for this node: workflow is done

        if callable(step):
            # Route dynamically on the text of the most recent message.
            latest_text = thread[-1].content if thread else ""
            self._current = step(latest_text)
        else:
            self._current = step

        return self._current


async def main() -> None:
    """Classify an incoming request, then route it to a specialist."""
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Front door: decides which kind of request this is.
    classifier = AssistantAgent(
        "classifier",
        model_client=model_client,
        system_message="""Classify the request type: 'bug', 'feature', or 'question'.
        Respond with just the classification.""",
    )

    # One specialist per request type.
    handler_prompts = {
        "bug_handler": "You handle bug reports. Analyze and provide fix suggestions.",
        "feature_handler": "You handle feature requests. Analyze feasibility and design.",
        "qa_handler": "You answer questions. Provide clear, helpful information.",
    }
    handlers = {
        name: AssistantAgent(name, model_client=model_client, system_message=prompt)
        for name, prompt in handler_prompts.items()
    }

    def route_by_classification(message: str) -> Optional[str]:
        """Map the classifier's verdict onto a handler agent name."""
        text = message.lower()
        for keyword, target in (
            ("bug", "bug_handler"),
            ("feature", "feature_handler"),
            ("question", "qa_handler"),
        ):
            if keyword in text:
                return target
        return None  # unrecognized classification: end the workflow

    transitions = {
        "start": "classifier",
        "classifier": route_by_classification,  # dynamic branch
        "bug_handler": None,
        "feature_handler": None,
        "qa_handler": None,
    }

    team = ConditionalWorkflow(
        participants=[classifier, *handlers.values()],
        transitions=transitions,
        model_client=model_client,
    )

    await Console(
        team.run_stream(
            task="I found a bug where the login page crashes on mobile devices"
        )
    )

    await model_client.close()


asyncio.run(main())

Parallel Execution

Execute multiple agents in parallel:
import asyncio
from typing import List
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def parallel_agent_execution(
    agents: List[AssistantAgent],
    task: str
) -> List[TextMessage]:
    """Run every agent on *task* concurrently.

    Returns the final message from each agent's run, in the same
    order as *agents*.
    """
    runs = await asyncio.gather(*(agent.run(task=task) for agent in agents))
    return [run.messages[-1] for run in runs]


async def main() -> None:
    """Fan one question out to three analysts, then synthesize."""
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Three independent perspectives on the same question.
    analyst_specs = [
        ("security_analyst", "Analyze security implications."),
        ("performance_analyst", "Analyze performance implications."),
        ("cost_analyst", "Analyze cost implications."),
    ]
    analysts = [
        AssistantAgent(name, model_client=model_client, system_message=prompt)
        for name, prompt in analyst_specs
    ]

    task = "We're considering migrating to a microservices architecture."

    print("Running parallel analysis...")
    results = await parallel_agent_execution(analysts, task)

    print("\nResults:")
    for message in results:
        print(f"\n{message.source}: {message.content}")

    # A final agent merges the three analyses into one recommendation.
    synthesizer = AssistantAgent(
        "synthesizer",
        model_client=model_client,
        system_message="Synthesize multiple analyses into recommendations.",
    )

    synthesis_task = f"""Synthesize these analyses:
    
    Security: {results[0].content}
    Performance: {results[1].content}
    Cost: {results[2].content}
    
    Provide unified recommendations."""

    final = await synthesizer.run(task=synthesis_task)
    print(f"\nFinal Recommendation:\n{final.messages[-1].content}")

    await model_client.close()


asyncio.run(main())

Key Concepts

Graph Structure

Defines the workflow as nodes (agents) and edges (transitions).

Conditional Routing

Choose next agent based on current state or output.

Parallel Execution

Run multiple agents simultaneously for efficiency.

State Management

Maintain context as it flows through the graph.

Workflow Patterns

Sequential Pipeline

Input → Agent A → Agent B → Agent C → Output

Conditional Branching

Input → Classifier → Bug Handler
                   → Feature Handler
                   → QA Handler

Loop with Exit

Develop → Test → [Pass: Exit, Fail: Develop]

Parallel-Merge

        → Agent A →
Input →→ Agent B →→ Synthesizer → Output
        → Agent C →

Best Practices

  1. Clear Graph Design: Document the workflow before implementation
  2. Error Handling: Define what happens when agents fail
  3. Termination Conditions: Prevent infinite loops
  4. State Validation: Ensure state is valid at each transition
  5. Monitoring: Log transitions for debugging

Visualization

Visualize your workflow:
import graphviz

def visualize_workflow(graph: dict, filename: str = "workflow"):
    """Render *graph* as a PNG diagram via Graphviz.

    *graph* maps a node name to either a list of successor names
    (falsy entries such as ``None`` are skipped), a single successor
    name, or a falsy value meaning "no outgoing edge".
    """
    dot = graphviz.Digraph(comment='Agent Workflow')

    for source, targets in graph.items():
        dot.node(source, source)
        # Normalize: a scalar target behaves like a one-element list.
        successors = targets if isinstance(targets, list) else [targets]
        for target in successors:
            if target:
                dot.edge(source, target)

    dot.render(filename, format='png', view=True)

# Usage
# Same develop/test loop as the basic example: the tester either
# loops back to the implementer or ends the workflow (None entry).
workflow = {
    "analyzer": ["designer"],
    "designer": ["implementer"],
    "implementer": ["tester"],
    "tester": ["implementer", None],
}

visualize_workflow(workflow)  # writes workflow.png and opens the viewer

Troubleshooting

Infinite Loops

Add max iteration limit:
class WorkflowGraph(BaseGroupChat):
    """Graph workflow with a hard cap on turns to stop runaway loops."""

    def __init__(self, *args, max_iterations=50, **kwargs):
        super().__init__(*args, **kwargs)
        # Maximum number of speaker selections before the run is forced to end.
        self._max_iterations = max_iterations
        self._iterations = 0
    
    async def select_speaker(self, thread):
        # Count every selection; once the cap is reached, returning None
        # terminates the workflow regardless of the graph's edges.
        self._iterations += 1
        if self._iterations >= self._max_iterations:
            return None
        # ... rest of selection logic

State Loss

Preserve state between transitions:
class StatefulWorkflow(BaseGroupChat):
    """Workflow carrying a shared key/value state across transitions."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._state = {}  # shared scratchpad that survives agent hand-offs

    def update_state(self, key, value):
        """Store *value* under *key* in the shared workflow state."""
        self._state[key] = value

    def get_state(self, key):
        """Return the value stored under *key*, or ``None`` if absent."""
        return self._state.get(key)

Next Steps

Research Assistant

Build a research assistant with complex workflows

Data Analysis

Create data analysis pipelines

Continue with the Research Assistant and Data Analysis guides above to apply these orchestration patterns to larger projects.