Overview

The GraphWorkflow class represents a workflow graph where each node is an agent. It provides sophisticated capabilities for building, executing, and visualizing Directed Acyclic Graph (DAG) workflows with automatic parallel execution, topological sorting, and compilation optimization.

Key Features

  • DAG-Based Workflows: Build complex directed acyclic graphs of agent execution
  • Automatic Parallelization: Automatically execute independent agents in parallel
  • Topological Execution: Agents execute in topologically sorted layers
  • Compilation Optimization: Pre-compute execution plans for faster multi-loop execution
  • Multiple Graph Backends: Support for NetworkX and Rustworkx backends
  • Visualization: Generate visual representations of workflows using Graphviz
  • Fan-Out/Fan-In Patterns: Easy creation of parallel processing patterns
  • JSON Serialization: Save and load workflows to/from JSON
  • Async Support: Asynchronous execution for non-blocking operations
  • Cycle Detection: Automatic detection and reporting of cycles
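
The layered execution model behind these features can be sketched with the standard library's `graphlib`. This is an illustrative sketch of how a DAG resolves into parallel-safe layers, not GraphWorkflow's internal implementation:

```python
from graphlib import TopologicalSorter

# Dependencies: each node maps to the set of nodes it depends on.
graph = {
    "Analyzer": {"Researcher"},
    "Critic": {"Researcher"},
    "Writer": {"Analyzer", "Critic"},
}

def topological_layers(graph):
    """Group nodes into layers; nodes in the same layer have no
    dependencies on each other and could run in parallel."""
    ts = TopologicalSorter(graph)
    ts.prepare()
    layers = []
    while ts.is_active():
        ready = list(ts.get_ready())
        layers.append(sorted(ready))
        ts.done(*ready)
    return layers

print(topological_layers(graph))
# → [['Researcher'], ['Analyzer', 'Critic'], ['Writer']]
```

Nodes in the same inner list have no dependency on each other, which is exactly what makes automatic parallelization safe.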

Installation

pip install -U swarms

# For visualization support
pip install graphviz

# For Rustworkx backend (optional, faster for large graphs)
pip install rustworkx

Class Definition

class GraphWorkflow:
    def __init__(
        self,
        id: Optional[str] = str(uuid.uuid4()),
        name: Optional[str] = "Graph-Workflow-01",
        description: Optional[str] = "A customizable workflow system for orchestrating and coordinating multiple agents.",
        nodes: Optional[Dict[str, Node]] = None,
        edges: Optional[List[Edge]] = None,
        entry_points: Optional[List[str]] = None,
        end_points: Optional[List[str]] = None,
        max_loops: int = 1,
        task: Optional[str] = None,
        auto_compile: bool = True,
        verbose: bool = False,
        backend: str = "networkx",
    )

Parameters

  • id (Optional[str], default: auto-generated UUID): Unique identifier for the workflow
  • name (Optional[str], default: "Graph-Workflow-01"): Human-readable name for the workflow
  • description (Optional[str], default: "A customizable workflow system..."): Description of the workflow's purpose
  • nodes (Optional[Dict[str, Node]], default: None): Dictionary of nodes in the graph, keyed by node ID with Node objects as values
  • edges (Optional[List[Edge]], default: None): List of Edge objects in the graph
  • entry_points (Optional[List[str]], default: None): Node IDs that serve as entry points to the graph; auto-detected if not provided
  • end_points (Optional[List[str]], default: None): Node IDs that serve as end points of the graph; auto-detected if not provided
  • max_loops (int, default: 1): Maximum number of times to execute the workflow
  • task (Optional[str], default: None): The task to be executed by the workflow
  • auto_compile (bool, default: True): Whether to automatically compile the graph for optimization
  • verbose (bool, default: False): Whether to enable verbose logging
  • backend (str, default: "networkx"): Graph backend to use; "networkx" or "rustworkx" (Rustworkx is faster for large graphs)

Methods

Graph Construction

add_node(agent, **kwargs)

Add an agent node to the workflow graph.

  • agent (Agent, required): The agent to add as a node
  • **kwargs (Any): Additional keyword arguments for the node

add_nodes(agents, batch_size=10, **kwargs)

Add multiple agents to the workflow graph concurrently, in batches.

  • agents (List[Agent], required): List of agents to add
  • batch_size (int, default: 10): Number of agents to add concurrently per batch

add_edge(edge_or_source, target=None, **kwargs)

Add an edge, either as an Edge object or by passing source and target node objects/IDs.

  • edge_or_source (Union[Edge, Node, Agent, str], required): Either an Edge object or the source node/ID
  • target (Optional[Union[Node, Agent, str]], default: None): Target node/ID; required if edge_or_source is not an Edge

add_edges_from_source(source, targets, **kwargs)

Add multiple edges from a single source to multiple targets (fan-out pattern).

  • source (Union[Node, Agent, str], required): Source node/ID that sends its output to multiple targets
  • targets (List[Union[Node, Agent, str]], required): Target nodes/IDs that receive the source output in parallel

Returns edges (List[Edge]): List of created Edge objects

add_edges_to_target(sources, target, **kwargs)

Add multiple edges from multiple sources to a single target (fan-in pattern).

  • sources (List[Union[Node, Agent, str]], required): Source nodes/IDs that send their output to the target
  • target (Union[Node, Agent, str], required): Target node/ID that receives all source outputs

Returns edges (List[Edge]): List of created Edge objects

add_parallel_chain(sources, targets, **kwargs)

Create a parallel processing chain by connecting every source to every target (full mesh).

  • sources (List[Union[Node, Agent, str]], required): List of source nodes/IDs
  • targets (List[Union[Node, Agent, str]], required): List of target nodes/IDs

Returns edges (List[Edge]): List of created Edge objects
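
A full mesh between sources and targets is just the Cartesian product of the two lists. A stdlib sketch of the edge set add_parallel_chain creates (illustrative only, not the library's code):

```python
from itertools import product

sources = ["Extractor1", "Extractor2"]
targets = ["Summarizer", "Indexer"]

# Every source connects to every target: len(sources) * len(targets) edges.
mesh_edges = list(product(sources, targets))
print(mesh_edges)
# → [('Extractor1', 'Summarizer'), ('Extractor1', 'Indexer'),
#    ('Extractor2', 'Summarizer'), ('Extractor2', 'Indexer')]
```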

Execution

run(task=None, img=None, *args, **kwargs)

Run the workflow graph with optimized parallel agent execution.

  • task (Optional[str]): Task to execute; uses self.task if not provided
  • img (Optional[str]): Optional image path for multimodal tasks

Returns results (Dict[str, Any]): Execution results from all nodes

arun(task=None, *args, **kwargs)

Async version of run, for better performance with I/O-bound operations.

  • task (Optional[str]): Task to execute; uses self.task if not provided

Returns results (Dict[str, Any]): Execution results from all nodes

Compilation and Optimization

compile()

Pre-compute expensive operations for faster execution. Results are cached.

get_compilation_status()

Get detailed compilation status information.

Returns status (Dict[str, Any]): Compilation status, including cache state, timestamps, and performance metrics

Entry/Exit Points

set_entry_points(entry_points)

Set the entry points for the workflow.

  • entry_points (List[str], required): Node IDs to serve as entry points

set_end_points(end_points)

Set the end points for the workflow.

  • end_points (List[str], required): Node IDs to serve as end points

auto_set_entry_points()

Automatically set entry points to nodes with no incoming edges.

auto_set_end_points()

Automatically set end points to nodes with no outgoing edges.

Visualization

visualize(format="png", view=True, engine="dot", show_summary=False)

Visualize the workflow graph using Graphviz.

  • format (str, default: "png"): Output format: 'png', 'svg', 'pdf', or 'dot'
  • view (bool, default: True): Whether to open the visualization after creation
  • engine (str, default: "dot"): Graphviz layout engine: 'dot', 'neato', 'fdp', 'sfdp', 'twopi', or 'circo'
  • show_summary (bool, default: False): Whether to print a parallel processing summary

Returns filepath (str): Path to the generated visualization file

visualize_simple()

Simple text-based visualization for environments without Graphviz.

Returns visualization (str): Text representation of the workflow

Serialization

to_json(fast=True, include_conversation=False, include_runtime_state=False)

Serialize the workflow to JSON.

  • fast (bool, default: True): Whether to use fast JSON serialization
  • include_conversation (bool, default: False): Whether to include conversation history
  • include_runtime_state (bool, default: False): Whether to include runtime state such as compilation info

Returns json_string (str): JSON representation of the workflow

from_json(json_str, restore_runtime_state=False) (classmethod)

Deserialize a workflow from JSON.

  • json_str (str, required): JSON string representation of the workflow
  • restore_runtime_state (bool, default: False): Whether to restore runtime state

Returns workflow (GraphWorkflow): A new GraphWorkflow instance

save_to_file(filepath, include_conversation=False, include_runtime_state=False, overwrite=False)

Save the workflow to a JSON file.

  • filepath (str, required): Path to save the JSON file
  • include_conversation (bool, default: False): Whether to include conversation history
  • include_runtime_state (bool, default: False): Whether to include runtime state
  • overwrite (bool, default: False): Whether to overwrite an existing file

Returns filepath (str): Path to the saved file

load_from_file(filepath, restore_runtime_state=False) (classmethod)

Load a workflow from a JSON file.

  • filepath (str, required): Path to the JSON file
  • restore_runtime_state (bool, default: False): Whether to restore runtime state

Returns workflow (GraphWorkflow): Loaded workflow instance

Validation

validate(auto_fix=False)

Validate the workflow structure.

  • auto_fix (bool, default: False): Whether to automatically fix simple issues

Returns validation_result (Dict[str, Any]): Validation results, including validity, warnings, and errors
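
Cycle detection is one of the checks a DAG validator performs. Conceptually it looks like this `graphlib` sketch (an illustration of the idea, not GraphWorkflow's validator):

```python
from graphlib import TopologicalSorter, CycleError

def find_cycle(graph):
    """Return the offending cycle as a list of nodes, or None for a valid DAG."""
    try:
        TopologicalSorter(graph).prepare()
        return None
    except CycleError as err:
        # err.args[1] holds the cycle; its first and last nodes are the same.
        return err.args[1]

acyclic = {"B": {"A"}, "C": {"B"}}
cyclic = {"A": {"C"}, "B": {"A"}, "C": {"B"}}

print(find_cycle(acyclic))  # → None
print(find_cycle(cyclic))   # the cycle, e.g. ['A', 'C', 'B', 'A']
```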

Factory Method

from_spec(agents, edges, entry_points=None, end_points=None, task=None, **kwargs) (classmethod)

Construct a workflow from a list of agents and connections.

  • agents (List[Union[Agent, Node]], required): List of agents or Node objects
  • edges (List[Union[Edge, Tuple[Any, Any]]], required): List of edges or edge tuples; supports fan-out, fan-in, and parallel chain patterns
  • entry_points (Optional[List[str]]): Entry point node IDs
  • end_points (Optional[List[str]]): End point node IDs
  • task (Optional[str]): Task to be executed by the workflow

Returns workflow (GraphWorkflow): A new GraphWorkflow instance

Usage Examples

Basic Sequential Graph

from swarms import Agent, GraphWorkflow
from swarms.models import OpenAIChat

# Initialize LLM
llm = OpenAIChat(model_name="gpt-4o")

# Create agents
researcher = Agent(
    agent_name="Researcher",
    llm=llm,
    system_prompt="Research and gather information"
)

analyzer = Agent(
    agent_name="Analyzer",
    llm=llm,
    system_prompt="Analyze the research findings"
)

writer = Agent(
    agent_name="Writer",
    llm=llm,
    system_prompt="Write based on analysis"
)

# Create workflow
workflow = GraphWorkflow(
    name="Research-Analyze-Write",
    description="Sequential research workflow"
)

# Add nodes
workflow.add_node(researcher)
workflow.add_node(analyzer)
workflow.add_node(writer)

# Add edges
workflow.add_edge("Researcher", "Analyzer")
workflow.add_edge("Analyzer", "Writer")

# Compile and run
workflow.compile()
results = workflow.run("Research AI trends in healthcare")

Fan-Out Pattern (Parallel Specialists)

# Create multiple specialist agents
technical = Agent(agent_name="Technical", llm=llm, system_prompt="Technical analysis")
business = Agent(agent_name="Business", llm=llm, system_prompt="Business analysis")
user_exp = Agent(agent_name="UX", llm=llm, system_prompt="User experience analysis")
synthesizer = Agent(agent_name="Synthesizer", llm=llm, system_prompt="Synthesize all analyses")

workflow = GraphWorkflow(name="Multi-Perspective")

# Add all nodes (reuses the researcher agent defined in the previous example)
for agent in [researcher, technical, business, user_exp, synthesizer]:
    workflow.add_node(agent)

# Fan-out from researcher to specialists
workflow.add_edges_from_source(
    "Researcher",
    ["Technical", "Business", "UX"]
)

# Fan-in to synthesizer
workflow.add_edges_to_target(
    ["Technical", "Business", "UX"],
    "Synthesizer"
)

results = workflow.run("Analyze AI chatbot implementation")

Complex DAG Workflow

# Create a complex multi-stage workflow
data_collector = Agent(agent_name="DataCollector", llm=llm)
preprocessor = Agent(agent_name="Preprocessor", llm=llm)
model_a = Agent(agent_name="ModelA", llm=llm)
model_b = Agent(agent_name="ModelB", llm=llm)
model_c = Agent(agent_name="ModelC", llm=llm)
ensemble = Agent(agent_name="Ensemble", llm=llm)
validator = Agent(agent_name="Validator", llm=llm)

workflow = GraphWorkflow(name="ML-Pipeline")

# Add all nodes
agents = [data_collector, preprocessor, model_a, model_b, model_c, ensemble, validator]
workflow.add_nodes(agents, batch_size=5)

# Build DAG
workflow.add_edge("DataCollector", "Preprocessor")
workflow.add_edges_from_source("Preprocessor", ["ModelA", "ModelB", "ModelC"])
workflow.add_edges_to_target(["ModelA", "ModelB", "ModelC"], "Ensemble")
workflow.add_edge("Ensemble", "Validator")

# Visualize before running
workflow.visualize(show_summary=True)

# Execute
results = workflow.run("Predict customer churn")

Using from_spec for Quick Construction

# Quick workflow construction using from_spec
agents = [researcher, analyzer, writer]

edges = [
    ("Researcher", "Analyzer"),
    ("Analyzer", "Writer")
]

workflow = GraphWorkflow.from_spec(
    agents=agents,
    edges=edges,
    name="Quick-Workflow",
    task="Analyze quantum computing trends"
)

results = workflow.run()  # Uses task from initialization

Advanced Edge Patterns in from_spec

agents = [collector, analyst1, analyst2, analyst3, synthesizer]

edges = [
    # Simple edge
    ("Collector", "Analyst1"),
    
    # Fan-out
    ("Collector", ["Analyst2", "Analyst3"]),
    
    # Fan-in
    (["Analyst1", "Analyst2", "Analyst3"], "Synthesizer"),
]

workflow = GraphWorkflow.from_spec(
    agents=agents,
    edges=edges,
    verbose=True
)
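
Under the hood, each tuple expands into one or more simple (source, target) edges. A hedged sketch of that expansion (the actual from_spec logic may differ):

```python
def expand_edge_specs(edge_specs):
    """Expand simple / fan-out / fan-in tuples into (source, target) pairs."""
    pairs = []
    for source, target in edge_specs:
        sources = source if isinstance(source, list) else [source]
        targets = target if isinstance(target, list) else [target]
        for s in sources:
            for t in targets:
                pairs.append((s, t))
    return pairs

edges = [
    ("Collector", "Analyst1"),                              # simple edge
    ("Collector", ["Analyst2", "Analyst3"]),                # fan-out
    (["Analyst1", "Analyst2", "Analyst3"], "Synthesizer"),  # fan-in
]
print(expand_edge_specs(edges))
```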

Async Execution

import asyncio

async def run_workflow():
    workflow = GraphWorkflow(name="Async-Workflow")
    workflow.add_nodes([agent1, agent2, agent3])
    workflow.add_edge("agent1", "agent2")
    workflow.add_edge("agent2", "agent3")
    
    results = await workflow.arun("Process this task asynchronously")
    return results

results = asyncio.run(run_workflow())

Save and Load Workflows

# Save workflow
workflow.save_to_file(
    "my_workflow.json",
    include_conversation=True,
    include_runtime_state=True
)

# Load workflow
loaded_workflow = GraphWorkflow.load_from_file(
    "my_workflow.json",
    restore_runtime_state=True
)

# Continue execution
results = loaded_workflow.run("New task")

Using Rustworkx Backend

# Use Rustworkx for better performance on large graphs
workflow = GraphWorkflow(
    name="Large-Workflow",
    backend="rustworkx",  # Faster for large graphs
    verbose=True
)

# Add many nodes
workflow.add_nodes(list_of_100_agents, batch_size=20)

# Build complex graph
# ...

results = workflow.run("Complex task")

Workflow Validation

# Validate workflow before execution
validation = workflow.validate(auto_fix=True)

if validation["is_valid"]:
    print("Workflow is valid!")
    results = workflow.run(task)
else:
    print("Errors:", validation["errors"])
    print("Warnings:", validation["warnings"])

Compilation for Multi-Loop Execution

workflow = GraphWorkflow(
    name="Multi-Loop-Workflow",
    max_loops=10,  # Will run 10 times
    auto_compile=True  # Compilation cached for all loops
)

workflow.add_nodes([agent1, agent2, agent3])
workflow.add_edges_from_source("agent1", ["agent2", "agent3"])

# Compilation is cached and reused across all loops
results = workflow.run("Task to repeat 10 times")

# Check compilation status
status = workflow.get_compilation_status()
print(f"Compiled: {status['is_compiled']}")
print(f"Layers: {status['cached_layers_count']}")

Custom Visualization

# Generate different visualization formats
workflow.visualize(
    format="svg",
    engine="fdp",  # Force-directed layout
    view=False,
    show_summary=True
)

# Simple text visualization (no Graphviz needed)
text_viz = workflow.visualize_simple()
print(text_viz)

Node and Edge Classes

Node

from swarms.structs.graph_workflow import Node, NodeType

# Create node from agent
node = Node.from_agent(agent)

# Create node manually
node = Node(
    id="my-node",
    type=NodeType.AGENT,
    agent=agent,
    metadata={"custom_key": "custom_value"}
)

Edge

from swarms.structs.graph_workflow import Edge

# Create edge from nodes
edge = Edge.from_nodes(source_agent, target_agent)

# Create edge from IDs
edge = Edge(source="agent1", target="agent2")

# With metadata
edge = Edge(
    source="agent1",
    target="agent2",
    metadata={"weight": 1.0}
)

Best Practices

  1. Compilation: Always compile workflows before execution for optimal performance
  2. Visualization: Use visualize() to understand your workflow structure
  3. Validation: Validate workflows before production deployment
  4. Entry/Exit Points: Let the system auto-detect entry/exit points or set them explicitly
  5. Backend Selection: Use Rustworkx for large graphs (>100 nodes)
  6. Error Handling: Wrap execution in try-except blocks
  7. Batching: Use add_nodes() with appropriate batch sizes for many agents
  8. Caching: Enable auto_compile for multi-loop workflows
  9. Save Progress: Save complex workflows to JSON for reuse
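
For best practice 6, a small retry wrapper illustrates the idea. Here run_with_retries and flaky_run are hypothetical stand-ins (flaky_run plays the role of workflow.run), not part of the swarms API:

```python
import time

def run_with_retries(run_fn, task, retries=3, delay=0.0):
    """Call run_fn(task); retry on failure, re-raising the last error."""
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            return run_fn(task)
        except Exception as err:  # in real code, catch narrower exception types
            last_error = err
            time.sleep(delay)
    raise last_error

# Stand-in for workflow.run that fails twice before succeeding.
calls = {"n": 0}
def flaky_run(task):
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return {"Writer": f"done: {task}"}

print(run_with_retries(flaky_run, "draft report"))
# → {'Writer': 'done: draft report'}
```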

Performance Characteristics

  • Parallel Execution: Independent agents in the same layer run concurrently
  • CPU Utilization: Uses ~95% of available CPU cores
  • Compilation: Pre-computes execution plan, cached for multi-loop runs
  • Memory: Each agent maintains its own context
  • Graph Backend: Rustworkx is faster for large graphs (>100 nodes)
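
A hedged sketch of layer-parallel execution using only the standard library: run every node in a layer concurrently, then move on to the next layer. This is illustrative (fake_agent stands in for an agent call); GraphWorkflow's runner is more involved:

```python
from concurrent.futures import ThreadPoolExecutor

layers = [["Researcher"], ["Technical", "Business", "UX"], ["Synthesizer"]]

def fake_agent(name, task):
    # Stand-in for an agent call (e.g. an LLM request, which is I/O-bound).
    return f"{name} handled: {task}"

def run_layers(layers, task):
    results = {}
    with ThreadPoolExecutor() as pool:
        for layer in layers:
            # Nodes within a layer are independent, so they run concurrently;
            # the next layer starts only after the whole layer has finished.
            futures = {name: pool.submit(fake_agent, name, task) for name in layer}
            for name, fut in futures.items():
                results[name] = fut.result()
    return results

out = run_layers(layers, "audit the proposal")
print(out["Synthesizer"])
# → Synthesizer handled: audit the proposal
```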

Common Patterns

Research Pipeline

edges = [
    ("DataCollector", "Researcher"),
    ("Researcher", ["Analyst1", "Analyst2", "Analyst3"]),
    (["Analyst1", "Analyst2", "Analyst3"], "Synthesizer")
]

Ensemble Decision Making

edges = [
    ("Coordinator", ["Expert1", "Expert2", "Expert3", "Expert4"]),
    (["Expert1", "Expert2", "Expert3", "Expert4"], "Aggregator"),
    ("Aggregator", "DecisionMaker")
]

Multi-Stage Review

edges = [
    ("Creator", "TechnicalReviewer"),
    ("TechnicalReviewer", "BusinessReviewer"),
    ("BusinessReviewer", "LegalReviewer"),
    ("LegalReviewer", "FinalApprover")
]
