# Graph Workflow

The GraphWorkflow orchestrates agents using a Directed Acyclic Graph (DAG) structure, enabling complex workflows with dependencies, parallel branches, and convergence points. It is ideal for sophisticated pipelines with intricate task relationships.
## When to Use
- Complex dependencies: Tasks with intricate dependency graphs
- Parallel branches: Multiple independent paths that converge
- Pipeline optimization: Maximize parallelism while respecting dependencies
- Software builds: Compile, test, and deploy pipelines
- Data processing: ETL workflows with multiple stages
## Key Features
- DAG-based execution with topological sorting
- Automatic parallelization of independent nodes
- Support for NetworkX and Rustworkx backends
- Fan-out and fan-in patterns
- Entry and exit point management
- Graph visualization (with Graphviz)
- Auto-compilation for performance
- Graph validation and cycle detection
## Basic Example

```python
from swarms import Agent
from swarms.structs.graph_workflow import GraphWorkflow

# Define agents
data_collector = Agent(
    agent_name="DataCollector",
    system_prompt="Collect and validate data from sources.",
    model_name="gpt-4o-mini",
)

processor = Agent(
    agent_name="Processor",
    system_prompt="Process and clean collected data.",
    model_name="gpt-4o-mini",
)

analyzer = Agent(
    agent_name="Analyzer",
    system_prompt="Analyze processed data for insights.",
    model_name="gpt-4o-mini",
)

# Create graph workflow
workflow = GraphWorkflow(
    name="Data-Pipeline",
    description="ETL workflow for data processing",
)

# Add agents as nodes
workflow.add_node(data_collector)
workflow.add_node(processor)
workflow.add_node(analyzer)

# Define dependencies
workflow.add_edge("DataCollector", "Processor")  # Collector -> Processor
workflow.add_edge("Processor", "Analyzer")       # Processor -> Analyzer

# Compile and run
workflow.compile()
result = workflow.run("Process Q4 sales data")
print(result)
```
## Creating from Spec

Use `GraphWorkflow.from_spec()` for simplified workflow creation:
```python
from swarms.structs.graph_workflow import GraphWorkflow

# Define agents
agents = [collector, processor, analyzer, reporter]

# Define edges (dependencies)
edges = [
    ("DataCollector", "Processor"),
    ("Processor", "Analyzer"),
    ("Analyzer", "Reporter"),
]

# Create workflow
workflow = GraphWorkflow.from_spec(
    agents=agents,
    edges=edges,
    task="Process data",
)
result = workflow.run()
```
## Advanced Edge Patterns

### Fan-Out Pattern
One node distributes to multiple nodes:
```python
# Single edge to multiple targets
workflow.add_edges_from_source(
    source="DataCollector",
    targets=["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"],
)
# Equivalent to:
# DataCollector -> TechnicalAnalyst
# DataCollector -> FundamentalAnalyst
# DataCollector -> SentimentAnalyst
# (All three analysts run in parallel)
```
### Fan-In Pattern
Multiple nodes converge to one:
```python
# Multiple sources to single target
workflow.add_edges_to_target(
    sources=["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"],
    target="SynthesisAgent",
)
# Equivalent to:
# TechnicalAnalyst -> SynthesisAgent
# FundamentalAnalyst -> SynthesisAgent
# SentimentAnalyst -> SynthesisAgent
# (Synthesis waits for all three)
```
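The fan-in synchronization can be sketched in plain Python with `concurrent.futures`, independent of GraphWorkflow: hypothetical stand-ins for the three analysts run in parallel, and synthesis starts only once all of them have finished.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the three analyst agents
def technical(task):
    return f"technical({task})"

def fundamental(task):
    return f"fundamental({task})"

def sentiment(task):
    return f"sentiment({task})"

def synthesis(results):
    # Fan-in: runs only after every upstream result is available
    return " | ".join(results)

task = "AAPL report"
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(fn, task) for fn in (technical, fundamental, sentiment)]
    results = [f.result() for f in futures]  # blocks until all analysts finish

print(synthesis(results))
```

GraphWorkflow applies the same pattern when it runs a layer of independent nodes in parallel.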
### Parallel Chain Pattern

Connect every source to every target (a full mesh):
```python
# Multiple sources to multiple targets
workflow.add_parallel_chain(
    sources=["Collector1", "Collector2"],
    targets=["Analyst1", "Analyst2", "Analyst3"],
)
# Creates:
# Collector1 -> Analyst1, Analyst2, Analyst3
# Collector2 -> Analyst1, Analyst2, Analyst3
```
### Tuple-Based Patterns

Simplified edge definitions:
```python
edges = [
    # Simple edge
    ("agent1", "agent2"),
    # Fan-out
    ("agent1", ["agent2", "agent3"]),
    # Fan-in
    (["agent1", "agent2"], "agent3"),
    # Parallel chain
    (["agent1", "agent2"], ["agent3", "agent4"]),
]

workflow = GraphWorkflow.from_spec(
    agents=agents,
    edges=edges,
)
```
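Each tuple can be thought of as shorthand for a set of concrete (source, target) pairs. A minimal sketch of that expansion (an illustration of the idea, not the library's actual implementation):

```python
from itertools import product

def expand_edge(spec):
    """Expand a (source(s), target(s)) tuple into concrete edge pairs."""
    sources, targets = spec
    if isinstance(sources, str):
        sources = [sources]
    if isinstance(targets, str):
        targets = [targets]
    return list(product(sources, targets))

edge_specs = [
    ("agent1", "agent2"),                          # simple edge -> 1 pair
    ("agent1", ["agent2", "agent3"]),              # fan-out     -> 2 pairs
    (["agent1", "agent2"], "agent3"),              # fan-in      -> 2 pairs
    (["agent1", "agent2"], ["agent3", "agent4"]),  # chain       -> 4 pairs
]
flat = [pair for spec in edge_specs for pair in expand_edge(spec)]
print(len(flat))  # 9
```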
## Key Parameters

| Parameter | Description |
|-----------|-------------|
| `description` | Description of the workflow's purpose |
| `nodes` | Dictionary of nodes (agents) |
| `edges` | List of edges (dependencies) |
| `entry_points` | Node IDs with no predecessors (auto-detected if not set) |
| `end_points` | Node IDs with no successors (auto-detected if not set) |
| `auto_compile` | Automatically compile on initialization |
| `backend` | Graph backend (`"networkx"` or `"rustworkx"`) |
## Methods

### add_node()
Add an agent to the graph:
```python
workflow.add_node(
    agent=my_agent,
    metadata={"priority": "high"},  # Optional metadata
)
```
### add_nodes()

Add multiple agents concurrently:
```python
workflow.add_nodes(
    agents=[agent1, agent2, agent3],
    batch_size=10,
)
```
### add_edge()
Add a dependency between nodes:
```python
# Using agent objects
workflow.add_edge(source_agent, target_agent)

# Using node IDs
workflow.add_edge("Agent1", "Agent2")

# Using an Edge object
from swarms.structs.graph_workflow import Edge

edge = Edge(source="Agent1", target="Agent2", metadata={"type": "data"})
workflow.add_edge(edge)
```
### compile()
Pre-compute expensive operations:
```python
# Manual compilation
workflow.compile()

# Auto-compilation (default)
workflow = GraphWorkflow(auto_compile=True)
```
Compilation:
- Auto-sets entry/exit points
- Computes topological layers
- Caches for performance
- Validates graph structure
### run()
Execute the workflow:
```python
result = workflow.run(
    task="Process data pipeline",
    img=None,  # Optional image input
)
```
## Use Cases

### Software Build Pipeline
```python
compile_agent = Agent(agent_name="Compiler", ...)
test_agent = Agent(agent_name="Tester", ...)
lint_agent = Agent(agent_name="Linter", ...)
security_agent = Agent(agent_name="SecurityScanner", ...)
package_agent = Agent(agent_name="Packager", ...)
deploy_agent = Agent(agent_name="Deployer", ...)

build_pipeline = GraphWorkflow(name="CI-CD-Pipeline")

# Add all nodes
for agent in [compile_agent, test_agent, lint_agent, security_agent, package_agent, deploy_agent]:
    build_pipeline.add_node(agent)

# Define dependencies
build_pipeline.add_edge("Compiler", "Tester")
build_pipeline.add_edges_from_source("Compiler", ["Linter", "SecurityScanner"])
build_pipeline.add_edges_to_target(["Tester", "Linter", "SecurityScanner"], "Packager")
build_pipeline.add_edge("Packager", "Deployer")

build_pipeline.compile()
result = build_pipeline.run("Build and deploy version 2.0")
```
### Data Science Pipeline
```python
edges = [
    # Data collection phase
    ("APICollector", "DataValidator"),
    ("DatabaseCollector", "DataValidator"),
    # Parallel processing
    ("DataValidator", ["Cleaner", "Transformer", "FeatureEngineer"]),
    # Convergence for modeling
    (["Cleaner", "Transformer", "FeatureEngineer"], "ModelTrainer"),
    # Evaluation and deployment
    ("ModelTrainer", "ModelEvaluator"),
    ("ModelEvaluator", "Deployer"),
]

ml_pipeline = GraphWorkflow.from_spec(
    agents=[api_collector, db_collector, validator, cleaner, transformer,
            feature_eng, trainer, evaluator, deployer],
    edges=edges,
)
result = ml_pipeline.run("Train and deploy customer churn model")
```
### Content Creation Workflow
```python
# Diamond pattern: Research -> (Write, Design) -> Review
edges = [
    ("Researcher", ["Writer", "Designer"]),
    (["Writer", "Designer"], "Reviewer"),
    ("Reviewer", "Publisher"),
]

content_workflow = GraphWorkflow.from_spec(
    agents=[researcher, writer, designer, reviewer, publisher],
    edges=edges,
)
result = content_workflow.run("Create marketing campaign for product launch")
```
## Entry and Exit Points

### Auto-Detection
```python
workflow = GraphWorkflow()
workflow.add_node(agent1)
workflow.add_node(agent2)
workflow.add_node(agent3)
workflow.add_edge("agent1", "agent2")
workflow.add_edge("agent2", "agent3")
workflow.compile()
# Automatically detects:
# - entry_points: ["agent1"] (no incoming edges)
# - end_points: ["agent3"] (no outgoing edges)
```
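The detection rule itself is simple bookkeeping over the edge list; a standalone sketch of the idea (not the library's code):

```python
def detect_endpoints(nodes, edges):
    """Entry points have no incoming edges; end points have no outgoing edges."""
    has_incoming = {target for _, target in edges}
    has_outgoing = {source for source, _ in edges}
    entry_points = [n for n in nodes if n not in has_incoming]
    end_points = [n for n in nodes if n not in has_outgoing]
    return entry_points, end_points

nodes = ["agent1", "agent2", "agent3"]
edges = [("agent1", "agent2"), ("agent2", "agent3")]
entry_points, end_points = detect_endpoints(nodes, edges)
print(entry_points, end_points)  # ['agent1'] ['agent3']
```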
### Manual Setting
```python
workflow.set_entry_points(["StartAgent"])
workflow.set_end_points(["FinalAgent"])
```
## Backend Selection

### NetworkX (Default)
```python
workflow = GraphWorkflow(backend="networkx")
```
Benefits:
- Pure Python
- Rich ecosystem
- Extensive algorithms
- Easy debugging
### Rustworkx

```python
workflow = GraphWorkflow(backend="rustworkx")
```
Benefits:
- Rust-based performance
- Faster graph operations
- Lower memory usage
- Better for large graphs
Requires: `pip install rustworkx`
## Topological Execution
The workflow executes in topological layers:
```
Graph:
  A -> B, C
  B -> D
  C -> D

Execution layers:
  Layer 0: [A]
  Layer 1: [B, C]   (parallel)
  Layer 2: [D]
```
Nodes within each layer execute in parallel using a `ThreadPoolExecutor`.
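The layer computation can be reproduced with Kahn's algorithm: repeatedly peel off every node whose remaining in-degree is zero. A dependency-free sketch for the graph above:

```python
def topological_layers(nodes, edges):
    """Group nodes into layers; each layer depends only on earlier layers."""
    indegree = {n: 0 for n in nodes}
    for _, target in edges:
        indegree[target] += 1

    layers, remaining = [], set(nodes)
    while remaining:
        # Every node whose dependencies are all satisfied joins this layer
        layer = sorted(n for n in remaining if indegree[n] == 0)
        if not layer:
            raise ValueError("Cycle detected: no zero in-degree node left")
        for n in layer:
            remaining.discard(n)
            for source, target in edges:
                if source == n:
                    indegree[target] -= 1
        layers.append(layer)
    return layers

edges = [("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")]
print(topological_layers(["A", "B", "C", "D"], edges))  # [['A'], ['B', 'C'], ['D']]
```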
## Graph Validation

### Cycle Detection
```python
try:
    workflow.add_edge("A", "B")
    workflow.add_edge("B", "C")
    workflow.add_edge("C", "A")  # Creates cycle
    workflow.compile()
except ValueError:
    print("Cycle detected!")
```
### Dependency Validation
Compilation validates that:
- All referenced nodes exist
- There are no self-loops
- The structure is a DAG (no cycles)
- Entry points exist
- End points exist
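A standalone cycle check can be sketched with in-degree peeling: if removing zero-in-degree nodes cannot consume the whole graph, a cycle remains. This illustrates the idea only; it is not GraphWorkflow's actual validator:

```python
def has_cycle(nodes, edges):
    """Kahn-style check: nodes left over after peeling indicate a cycle."""
    indegree = {n: 0 for n in nodes}
    for _, target in edges:
        indegree[target] += 1

    queue = [n for n in nodes if indegree[n] == 0]
    seen = 0
    while queue:
        node = queue.pop()
        seen += 1
        for source, target in edges:
            if source == node:
                indegree[target] -= 1
                if indegree[target] == 0:
                    queue.append(target)
    return seen != len(nodes)

print(has_cycle(["A", "B", "C"], [("A", "B"), ("B", "C"), ("C", "A")]))  # True
print(has_cycle(["A", "B", "C"], [("A", "B"), ("B", "C")]))              # False
```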
## Compilation Caching
```python
# First run: compiles
result1 = workflow.run("Task 1")

# Subsequent runs: use the cache
result2 = workflow.run("Task 2")
result3 = workflow.run("Task 3")

# Manual recompilation (if the graph changed)
workflow.compile()
```
## Concurrent Node Addition
```python
# Add many nodes efficiently
workflow.add_nodes(
    agents=agent_list,
    batch_size=10,  # Process in batches
)
```
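Batched addition amounts to slicing the agent list into fixed-size chunks; a plain-Python sketch of the chunking (the library's internal batching may differ):

```python
def batches(items, batch_size):
    """Yield successive fixed-size chunks of a list."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

agent_list = [f"agent_{i}" for i in range(25)]
chunks = list(batches(agent_list, 10))
print([len(c) for c in chunks])  # [10, 10, 5]
```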
## Best Practices

- Graph Design: Keep graphs acyclic; use multiple workflows for iterative processes
- Clear Dependencies: Only add edges for true dependencies
- Maximize Parallelism: Let independent nodes run concurrently
- Compilation: Always compile before running
- Entry/Exit Points: Let auto-detection work unless specific control needed
- Backend Choice: Use Rustworkx for large graphs (>100 nodes)
Note: graph compilation is cached. If the graph structure changes after the initial compilation, call `compile()` again manually.
## Error Handling
```python
try:
    workflow.add_edge("NonExistentAgent", "TargetAgent")
except ValueError as e:
    print(f"Invalid edge: {e}")
    # Source or target node doesn't exist

try:
    workflow.compile()
except Exception as e:
    print(f"Compilation failed: {e}")
    # Cycle detected or invalid graph structure
```
## Visualization

With Graphviz installed (`pip install graphviz`), the workflow structure can be rendered as a diagram; the exact call depends on the visualization methods exposed by your GraphWorkflow version.
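Absent a built-in helper, a Graphviz DOT description can be generated directly from the edge list; the sketch below (a hypothetical helper, not part of the library) emits DOT text that the `dot` CLI or the `graphviz` Python package can render:

```python
def to_dot(edges, name="workflow"):
    """Build a Graphviz DOT string from (source, target) edge pairs."""
    lines = [f'digraph "{name}" {{', "  rankdir=LR;"]
    for source, target in edges:
        lines.append(f'  "{source}" -> "{target}";')
    lines.append("}")
    return "\n".join(lines)

edges = [("DataCollector", "Processor"), ("Processor", "Analyzer")]
print(to_dot(edges, name="Data-Pipeline"))
```

Save the output to `pipeline.dot` and render with `dot -Tpng pipeline.dot -o pipeline.png`.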