Graph Workflow

The GraphWorkflow orchestrates agents using a Directed Acyclic Graph (DAG) structure, enabling complex workflows with dependencies, parallel branches, and convergence points. Ideal for sophisticated pipelines with intricate task relationships.

When to Use

  • Complex dependencies: Tasks with intricate dependency graphs
  • Parallel branches: Multiple independent paths that converge
  • Pipeline optimization: Maximize parallelism while respecting dependencies
  • Software builds: Compile, test, and deploy pipelines
  • Data processing: ETL workflows with multiple stages

Key Features

  • DAG-based execution with topological sorting
  • Automatic parallelization of independent nodes
  • Support for NetworkX and Rustworkx backends
  • Fan-out and fan-in patterns
  • Entry and exit point management
  • Graph visualization (with Graphviz)
  • Auto-compilation for performance
  • Graph validation and cycle detection

Basic Example

from swarms import Agent
from swarms.structs.graph_workflow import GraphWorkflow, Node, Edge

# Define agents
data_collector = Agent(
    agent_name="DataCollector",
    system_prompt="Collect and validate data from sources.",
    model_name="gpt-4o-mini",
)

processor = Agent(
    agent_name="Processor",
    system_prompt="Process and clean collected data.",
    model_name="gpt-4o-mini",
)

analyzer = Agent(
    agent_name="Analyzer",
    system_prompt="Analyze processed data for insights.",
    model_name="gpt-4o-mini",
)

# Create graph workflow
workflow = GraphWorkflow(
    name="Data-Pipeline",
    description="ETL workflow for data processing",
)

# Add agents as nodes
workflow.add_node(data_collector)
workflow.add_node(processor)
workflow.add_node(analyzer)

# Define dependencies
workflow.add_edge("DataCollector", "Processor")  # Collector -> Processor
workflow.add_edge("Processor", "Analyzer")       # Processor -> Analyzer

# Compile and run
workflow.compile()
result = workflow.run("Process Q4 sales data")
print(result)

Creating from Spec

Simplified workflow creation:
from swarms.structs.graph_workflow import GraphWorkflow

# Define agents
# Agents defined as in the Basic Example, plus a reporter
agents = [collector, processor, analyzer, reporter]

# Define edges (dependencies)
edges = [
    ("DataCollector", "Processor"),
    ("Processor", "Analyzer"),
    ("Analyzer", "Reporter"),
]

# Create workflow
workflow = GraphWorkflow.from_spec(
    agents=agents,
    edges=edges,
    task="Process data",
)

result = workflow.run()

Advanced Edge Patterns

Fan-Out Pattern

One node distributes to multiple nodes:
# Single edge to multiple targets
workflow.add_edges_from_source(
    source="DataCollector",
    targets=["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"]
)

# Equivalent to:
# DataCollector -> TechnicalAnalyst
# DataCollector -> FundamentalAnalyst
# DataCollector -> SentimentAnalyst
# (All three analysts run in parallel)

Fan-In Pattern

Multiple nodes converge to one:
# Multiple sources to single target
workflow.add_edges_to_target(
    sources=["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"],
    target="SynthesisAgent"
)

# Equivalent to:
# TechnicalAnalyst -> SynthesisAgent
# FundamentalAnalyst -> SynthesisAgent
# SentimentAnalyst -> SynthesisAgent
# (Synthesis waits for all three)

Parallel Chain Pattern

Full mesh connection:
# Multiple sources to multiple targets
workflow.add_parallel_chain(
    sources=["Collector1", "Collector2"],
    targets=["Analyst1", "Analyst2", "Analyst3"]
)

# Creates:
# Collector1 -> Analyst1, Analyst2, Analyst3
# Collector2 -> Analyst1, Analyst2, Analyst3

Tuple-Based Patterns

Simplified edge definitions:
edges = [
    # Simple edge
    ("agent1", "agent2"),
    
    # Fan-out
    ("agent1", ["agent2", "agent3"]),
    
    # Fan-in
    (["agent1", "agent2"], "agent3"),
    
    # Parallel chain
    (["agent1", "agent2"], ["agent3", "agent4"]),
]

workflow = GraphWorkflow.from_spec(
    agents=agents,
    edges=edges,
)

Key Parameters

  • name (str): Name for the workflow
  • description (str): Description of the workflow's purpose
  • nodes (Dict[str, Node]): Dictionary of nodes (agents)
  • edges (List[Edge]): List of edges (dependencies)
  • entry_points (List[str]): Node IDs with no predecessors (auto-detected if not set)
  • end_points (List[str]): Node IDs with no successors (auto-detected if not set)
  • max_loops (int, default 1): Maximum number of execution loops
  • auto_compile (bool, default True): Automatically compile on initialization
  • backend (str, default "networkx"): Graph backend ("networkx" or "rustworkx")
  • verbose (bool, default False): Enable verbose logging

Methods

add_node()

Add an agent to the graph:
workflow.add_node(
    agent=my_agent,
    metadata={"priority": "high"}  # Optional metadata
)

add_nodes()

Add multiple agents concurrently:
workflow.add_nodes(
    agents=[agent1, agent2, agent3],
    batch_size=10,
)

add_edge()

Add a dependency between nodes:
# Using agent objects
workflow.add_edge(source_agent, target_agent)

# Using node IDs
workflow.add_edge("Agent1", "Agent2")

# Using Edge object
from swarms.structs.graph_workflow import Edge
edge = Edge(source="Agent1", target="Agent2", metadata={"type": "data"})
workflow.add_edge(edge)

compile()

Pre-compute expensive operations:
# Manual compilation
workflow.compile()

# Auto-compilation (default)
workflow = GraphWorkflow(auto_compile=True)
Compilation:
  • Auto-sets entry/exit points
  • Computes topological layers
  • Caches for performance
  • Validates graph structure

run()

Execute the workflow:
result = workflow.run(
    task="Process data pipeline",
    img=None,  # Optional image input
)

Use Cases

Software Build Pipeline

compile_agent = Agent(agent_name="Compiler", ...)
test_agent = Agent(agent_name="Tester", ...)
lint_agent = Agent(agent_name="Linter", ...)
security_agent = Agent(agent_name="SecurityScanner", ...)
package_agent = Agent(agent_name="Packager", ...)
deploy_agent = Agent(agent_name="Deployer", ...)

build_pipeline = GraphWorkflow(name="CI-CD-Pipeline")

# Add all nodes
for agent in [compile_agent, test_agent, lint_agent, security_agent, package_agent, deploy_agent]:
    build_pipeline.add_node(agent)

# Define dependencies
build_pipeline.add_edge("Compiler", "Tester")
build_pipeline.add_edges_from_source("Compiler", ["Linter", "SecurityScanner"])
build_pipeline.add_edges_to_target(["Tester", "Linter", "SecurityScanner"], "Packager")
build_pipeline.add_edge("Packager", "Deployer")

build_pipeline.compile()
result = build_pipeline.run("Build and deploy version 2.0")

Data Science Pipeline

edges = [
    # Data collection phase
    ("APICollector", "DataValidator"),
    ("DatabaseCollector", "DataValidator"),
    
    # Parallel processing
    ("DataValidator", ["Cleaner", "Transformer", "FeatureEngineer"]),
    
    # Convergence for modeling
    (["Cleaner", "Transformer", "FeatureEngineer"], "ModelTrainer"),
    
    # Evaluation and deployment
    ("ModelTrainer", "ModelEvaluator"),
    ("ModelEvaluator", "Deployer"),
]

ml_pipeline = GraphWorkflow.from_spec(
    agents=[api_collector, db_collector, validator, cleaner, transformer, 
            feature_eng, trainer, evaluator, deployer],
    edges=edges,
)

result = ml_pipeline.run("Train and deploy customer churn model")

Content Creation Workflow

# Diamond pattern: Research -> (Write, Design) -> Review
edges = [
    ("Researcher", ["Writer", "Designer"]),
    (["Writer", "Designer"], "Reviewer"),
    ("Reviewer", "Publisher"),
]

content_workflow = GraphWorkflow.from_spec(
    agents=[researcher, writer, designer, reviewer, publisher],
    edges=edges,
)

result = content_workflow.run("Create marketing campaign for product launch")

Entry and Exit Points

Auto-Detection

workflow = GraphWorkflow()
workflow.add_node(agent1)
workflow.add_node(agent2)
workflow.add_node(agent3)
workflow.add_edge("agent1", "agent2")
workflow.add_edge("agent2", "agent3")

workflow.compile()

# Automatically detects:
# - entry_points: ["agent1"] (no incoming edges)
# - end_points: ["agent3"] (no outgoing edges)
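Auto-detection amounts to finding boundary nodes of the DAG. A minimal, dependency-free sketch of the idea (not the library's internal code):

```python
def detect_boundary_nodes(nodes, edges):
    """Return (entry_points, end_points) for a DAG given as node IDs and
    (source, target) edges: entries have no incoming edge, ends no outgoing."""
    has_incoming = {target for _, target in edges}
    has_outgoing = {source for source, _ in edges}
    entry_points = [n for n in nodes if n not in has_incoming]
    end_points = [n for n in nodes if n not in has_outgoing]
    return entry_points, end_points

nodes = ["agent1", "agent2", "agent3"]
edges = [("agent1", "agent2"), ("agent2", "agent3")]
print(detect_boundary_nodes(nodes, edges))  # → (['agent1'], ['agent3'])
```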

Manual Setting

workflow.set_entry_points(["StartAgent"])
workflow.set_end_points(["FinalAgent"])

Backend Selection

NetworkX (Default)

workflow = GraphWorkflow(backend="networkx")
Benefits:
  • Pure Python
  • Rich ecosystem
  • Extensive algorithms
  • Easy debugging

Rustworkx (Performance)

workflow = GraphWorkflow(backend="rustworkx")
Benefits:
  • Rust-based performance
  • Faster graph operations
  • Lower memory usage
  • Better for large graphs
Requires: pip install rustworkx

Topological Execution

The workflow executes in topological layers:
# Graph:
# A -> B, C
# B -> D
# C -> D

# Execution layers:
# Layer 0: [A]
# Layer 1: [B, C]  (parallel)
# Layer 2: [D]
Nodes within a layer are executed in parallel using a ThreadPoolExecutor.
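The layering above can be computed with Kahn's algorithm, peeling off all zero-in-degree nodes at once; each layer then runs concurrently. A self-contained sketch of that scheme (the library's internals may differ):

```python
from concurrent.futures import ThreadPoolExecutor

def topological_layers(nodes, edges):
    """Group nodes into layers where each layer depends only on earlier
    layers (Kahn's algorithm, removing all ready nodes per round)."""
    in_degree = {n: 0 for n in nodes}
    successors = {n: [] for n in nodes}
    for src, dst in edges:
        in_degree[dst] += 1
        successors[src].append(dst)
    layers = []
    ready = [n for n in nodes if in_degree[n] == 0]
    while ready:
        layers.append(ready)
        next_ready = []
        for node in ready:
            for succ in successors[node]:
                in_degree[succ] -= 1
                if in_degree[succ] == 0:
                    next_ready.append(succ)
        ready = next_ready
    return layers

nodes = ["A", "B", "C", "D"]
edges = [("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")]
for layer in topological_layers(nodes, edges):
    # Nodes in the same layer have no mutual dependencies,
    # so they can run concurrently in a thread pool:
    with ThreadPoolExecutor() as pool:
        list(pool.map(lambda n: f"ran {n}", layer))

print(topological_layers(nodes, edges))  # → [['A'], ['B', 'C'], ['D']]
```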

Graph Validation

Cycle Detection

try:
    workflow.add_edge("A", "B")
    workflow.add_edge("B", "C")
    workflow.add_edge("C", "A")  # Creates cycle
    workflow.compile()
except ValueError as e:
    print("Cycle detected!")

Dependency Validation

# Validates:
# - All referenced nodes exist
# - No self-loops
# - DAG structure (no cycles)
# - Entry points exist
# - End points exist
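Cycle detection itself is a standard depth-first search with three node colors. A sketch of the technique, independent of the library's implementation:

```python
def has_cycle(nodes, edges):
    """Detect a cycle in a directed graph via depth-first search, marking
    nodes white/gray/black (unvisited / in progress / finished)."""
    successors = {n: [] for n in nodes}
    for src, dst in edges:
        successors[src].append(dst)
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {n: WHITE for n in nodes}

    def visit(node):
        color[node] = GRAY
        for succ in successors[node]:
            if color[succ] == GRAY:  # back edge to an in-progress node: cycle
                return True
            if color[succ] == WHITE and visit(succ):
                return True
        color[node] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in nodes)

print(has_cycle(["A", "B", "C"], [("A", "B"), ("B", "C")]))              # → False
print(has_cycle(["A", "B", "C"], [("A", "B"), ("B", "C"), ("C", "A")]))  # → True
```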

Performance Optimization

Compilation Caching

# First run: compiles
result1 = workflow.run("Task 1")

# Subsequent runs: uses cache
result2 = workflow.run("Task 2")
result3 = workflow.run("Task 3")

# Manual recompilation (if graph changed)
workflow.compile()

Concurrent Node Addition

# Add many nodes efficiently
workflow.add_nodes(
    agents=agent_list,
    batch_size=10,  # Process in batches
)

Best Practices

  1. Graph Design: Keep graphs acyclic; use multiple workflows for iterative processes
  2. Clear Dependencies: Only add edges for true dependencies
  3. Maximize Parallelism: Let independent nodes run concurrently
  4. Compilation: Always compile before running
  5. Entry/Exit Points: Let auto-detection work unless you need specific control
  6. Backend Choice: Use Rustworkx for large graphs (>100 nodes)

Note: Graph compilation is cached; manually recompile if the graph structure changes after the initial compilation.

Error Handling

try:
    workflow.add_edge("NonExistentAgent", "TargetAgent")
except ValueError as e:
    print(f"Invalid edge: {e}")
    # Source or target node doesn't exist

try:
    workflow.compile()
except Exception as e:
    print(f"Compilation failed: {e}")
    # Cycle detected or invalid graph structure

Visualization

With Graphviz installed:
# Requires: pip install graphviz

# Visualize workflow structure
# (Implementation depends on GraphWorkflow visualization methods)
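In the absence of a documented visualization method, one portable approach is to emit Graphviz DOT source for the workflow's edge list yourself; the text can then be rendered with the `dot` CLI or the `graphviz` Python package. A dependency-free sketch:

```python
def to_dot(name, edges):
    """Render a workflow's (source, target) edge list as Graphviz DOT source."""
    lines = [f'digraph "{name}" {{', "  rankdir=LR;"]
    for src, dst in edges:
        lines.append(f'  "{src}" -> "{dst}";')
    lines.append("}")
    return "\n".join(lines)

dot_source = to_dot("Data-Pipeline", [
    ("DataCollector", "Processor"),
    ("Processor", "Analyzer"),
])
print(dot_source)
# Save as pipeline.dot, then: dot -Tpng pipeline.dot -o pipeline.png
```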
