Overview
TheGraphWorkflow class represents a workflow graph where each node is an agent. It provides sophisticated capabilities for building, executing, and visualizing Directed Acyclic Graph (DAG) workflows with automatic parallel execution, topological sorting, and compilation optimization.
Key Features
- DAG-Based Workflows: Build complex directed acyclic graphs of agent execution
- Automatic Parallelization: Automatically execute independent agents in parallel
- Topological Execution: Agents execute in topologically sorted layers
- Compilation Optimization: Pre-compute execution plans for faster multi-loop execution
- Multiple Graph Backends: Support for NetworkX and Rustworkx backends
- Visualization: Generate visual representations of workflows using Graphviz
- Fan-Out/Fan-In Patterns: Easy creation of parallel processing patterns
- JSON Serialization: Save and load workflows to/from JSON
- Async Support: Asynchronous execution for non-blocking operations
- Cycle Detection: Automatic detection and reporting of cycles
Installation
Class Definition
Parameters
Unique identifier for the workflow. Auto-generated UUID if not provided.
Human-readable name for the workflow
Description of the workflow’s purpose
Dictionary of nodes in the graph, where the key is the node ID and the value is the Node object
List of edges in the graph, where each edge is represented by an Edge object
List of node IDs that serve as entry points to the graph. Auto-detected if not provided.
List of node IDs that serve as end points of the graph. Auto-detected if not provided.
Maximum number of times to execute the workflow
The task to be executed by the workflow
Whether to automatically compile the graph for optimization
Whether to enable verbose logging
Graph backend to use. Options: “networkx”, “rustworkx” (Rustworkx is faster for large graphs)
Methods
Graph Construction
add_node(agent, **kwargs)
Add an agent node to the workflow graph.
The agent to add as a node
Additional keyword arguments for the node
add_nodes(agents, batch_size=10, **kwargs)
Add multiple agents to the workflow graph concurrently in batches.
List of agents to add
Number of agents to add concurrently in a batch
add_edge(edge_or_source, target=None, **kwargs)
Add an edge by Edge object or by passing node objects/ids.
Either an Edge object or the source node/id
Target node/id (required if edge_or_source is not an Edge)
add_edges_from_source(source, targets, **kwargs)
Add multiple edges from a single source to multiple targets (fan-out pattern).
Source node/id that will send output to multiple targets
List of target node/ids that will receive the source output in parallel
List of created Edge objects
add_edges_to_target(sources, target, **kwargs)
Add multiple edges from multiple sources to a single target (fan-in pattern).
List of source node/ids that will send output to the target
Target node/id that will receive all source outputs
List of created Edge objects
add_parallel_chain(sources, targets, **kwargs)
Create a parallel processing chain (full mesh connection).
List of source node/ids
List of target node/ids
List of created Edge objects
Execution
run(task=None, img=None, *args, **kwargs)
Run the workflow graph with optimized parallel agent execution.
Task to execute. Uses self.task if not provided.
Optional image path for multimodal tasks
Execution results from all nodes
arun(task=None, *args, **kwargs)
Async version of run for better performance with I/O bound operations.
Task to execute. Uses self.task if not provided.
Execution results from all nodes
Compilation and Optimization
compile()
Pre-compute expensive operations for faster execution. Results are cached.
get_compilation_status()
Get detailed compilation status information.
Compilation status including cache state, timestamps, and performance metrics
Entry/Exit Points
set_entry_points(entry_points)
Set the entry points for the workflow.
List of node IDs to serve as entry points
set_end_points(end_points)
Set the end points for the workflow.
List of node IDs to serve as end points
auto_set_entry_points()
Automatically set entry points to nodes with no incoming edges.
auto_set_end_points()
Automatically set end points to nodes with no outgoing edges.
Visualization
visualize(format="png", view=True, engine="dot", show_summary=False)
Visualize the workflow graph using Graphviz.
Output format: ‘png’, ‘svg’, ‘pdf’, ‘dot’
Whether to open the visualization after creation
Graphviz layout engine: ‘dot’, ‘neato’, ‘fdp’, ‘sfdp’, ‘twopi’, ‘circo’
Whether to print parallel processing summary
Path to the generated visualization file
visualize_simple()
Simple text-based visualization for environments without Graphviz.
Text representation of the workflow
Serialization
to_json(fast=True, include_conversation=False, include_runtime_state=False)
Serialize the workflow to JSON.
Whether to use fast JSON serialization
Whether to include conversation history
Whether to include runtime state like compilation info
JSON representation of the workflow
from_json(json_str, restore_runtime_state=False) (classmethod)
Deserialize a workflow from JSON.
JSON string representation of the workflow
Whether to restore runtime state
A new GraphWorkflow instance
save_to_file(filepath, include_conversation=False, include_runtime_state=False, overwrite=False)
Save the workflow to a JSON file.
Path to save the JSON file
Whether to overwrite existing files
Path to the saved file
load_from_file(filepath, restore_runtime_state=False) (classmethod)
Load a workflow from a JSON file.
Path to the JSON file
Loaded workflow instance
Validation
validate(auto_fix=False)
Validate the workflow structure.
Whether to automatically fix simple issues
Dictionary containing validation results, including validity, warnings and errors
Factory Method
from_spec(agents, edges, entry_points=None, end_points=None, task=None, **kwargs) (classmethod)
Construct a workflow from a list of agents and connections.
List of agents or Node objects
List of edges or edge tuples. Supports fan-out, fan-in, and parallel chain patterns.
List of entry point node IDs
List of end point node IDs
Task to be executed by the workflow
A new GraphWorkflow instance
Usage Examples
Basic Sequential Graph
Fan-Out Pattern (Parallel Specialists)
Complex DAG Workflow
Using from_spec for Quick Construction
Advanced Edge Patterns in from_spec
Async Execution
Save and Load Workflows
Using Rustworkx Backend
Workflow Validation
Compilation for Multi-Loop Execution
Custom Visualization
Node and Edge Classes
Node
Edge
Best Practices
- Compilation: Always compile workflows before execution for optimal performance
- Visualization: Use
visualize()to understand your workflow structure - Validation: Validate workflows before production deployment
- Entry/Exit Points: Let the system auto-detect entry/exit points or set them explicitly
- Backend Selection: Use Rustworkx for large graphs (>100 nodes)
- Error Handling: Wrap execution in try-except blocks
- Batching: Use
add_nodes()with appropriate batch sizes for many agents - Caching: Enable
auto_compilefor multi-loop workflows - Save Progress: Save complex workflows to JSON for reuse
Performance Characteristics
- Parallel Execution: Independent agents in the same layer run concurrently
- CPU Utilization: Uses ~95% of available CPU cores
- Compilation: Pre-computes execution plan, cached for multi-loop runs
- Memory: Each agent maintains its own context
- Graph Backend: Rustworkx is faster for large graphs (>100 nodes)
Common Patterns
Research Pipeline
Ensemble Decision Making
Multi-Stage Review
Related Classes
- SequentialWorkflow: For simple sequential execution
- ConcurrentWorkflow: For parallel execution on same task
- AgentRearrange: For flexible flow-based orchestration
- Agent: The base agent class used in workflows