Overview
TheConcurrentWorkflow class provides a framework for executing multiple agents concurrently on the same task, with optional dashboard monitoring, streaming callbacks, and various output formatting options. It uses ThreadPoolExecutor to manage concurrent execution and provides real-time status tracking for each agent.
Key Features
- Concurrent Execution: Run multiple agents simultaneously on the same task
- Real-time Dashboard: Monitor agent status and outputs in real-time
- Streaming Callbacks: Get real-time updates as agents generate outputs
- Flexible Output Formatting: Multiple output format options
- Auto-save Support: Automatically save conversation history
- Error Handling: Graceful error handling with status tracking
- Batch Processing: Process multiple tasks sequentially with concurrent agents
Installation
Class Definition
Parameters
Unique identifier for the workflow instance. Auto-generated if not provided.
Human-readable name for the workflow
Description of the workflow’s purpose
List of agents to execute concurrently. Must not be None or empty.
Whether to automatically save workflow metadata
Format for output formatting. Options include “dict-all-except-first”, “dict”, “list”, “str”
Maximum number of execution loops (currently unused in concurrent execution)
Whether to enable automatic prompt engineering for all agents
Whether to display real-time dashboard during execution
Whether to automatically save conversation history to workspace
Whether to enable verbose logging
Methods
run(task, img=None, imgs=None, streaming_callback=None)
Execute all agents concurrently on the given task.
The task to be executed by all agents
Single image path for agents that support image input
List of image paths for agents that support multiple images
Callback function for streaming updates. Called with (agent_name, chunk, is_final) parameters.
Formatted conversation history based on output_type
batch_run(tasks, imgs=None, streaming_callback=None)
Execute workflow on multiple tasks sequentially.
List of tasks to be executed
List of image paths corresponding to each task
Callback function for streaming updates
List of results for each task
run_with_dashboard(task, img=None, imgs=None, streaming_callback=None)
Execute agents with real-time dashboard monitoring.
The task to be executed by all agents
Single image path for agents that support image input
List of image paths for agents that support multiple images
Callback function for streaming updates
Formatted conversation history based on output_type
cleanup()
Clean up resources and connections. Called automatically after each run.
reliability_check()
Validate workflow configuration.
Raises:
ValueError: If no agents are provided or agents list is empty
activate_auto_prompt_engineering()
Enable automatic prompt engineering for all agents in the workflow.
display_agent_dashboard(title="ConcurrentWorkflow Dashboard", is_final=False)
Display real-time dashboard showing agent status and outputs.
Title to display for the dashboard
Whether this is the final dashboard display
Attributes
| Attribute | Type | Description |
|---|---|---|
id | str | Unique identifier for the workflow instance |
name | str | Human-readable name for the workflow |
description | str | Description of the workflow’s purpose |
agents | List[Union[Agent, Callable]] | List of agents to execute concurrently |
agent_statuses | dict | Dictionary tracking status and output of each agent |
conversation | Conversation | Conversation object for storing agent interactions |
metadata_output_path | str | Path for saving workflow metadata |
Usage Examples
Basic Concurrent Workflow
With Real-Time Dashboard
With Streaming Callbacks
Batch Processing
With Image Input
Custom Output Types
With Autosave
Auto Prompt Engineering
Error Handling
Dashboard Output Example
Whenshow_dashboard=True, you’ll see real-time updates like:
Best Practices
- Agent Diversity: Use agents with different perspectives for richer analysis
- Dashboard for Monitoring: Enable dashboard during development to monitor agent progress
- Streaming Callbacks: Use streaming callbacks for real-time feedback in production
- Error Handling: Always wrap concurrent execution in try-except blocks
- Resource Management: Be mindful of API rate limits when running many agents
- Task Design: Ensure the task benefits from multiple concurrent perspectives
- Autosave: Enable autosave for important analyses
Common Use Cases
- Multi-Perspective Analysis: Get technical, business, and user perspectives simultaneously
- Consensus Building: Run multiple agents and aggregate their outputs
- Parallel Research: Research different aspects of a topic concurrently
- Voting Systems: Multiple agents vote on decisions
- Quality Assurance: Multiple agents review the same content
- Competitive Analysis: Different agents analyze competing solutions
Performance Considerations
- CPU Utilization: Uses ~95% of available CPU cores by default
- Concurrent Execution: All agents run truly concurrently using ThreadPoolExecutor
- Memory Usage: Each agent maintains its own context, consider memory for many agents
- API Rate Limits: Be aware of rate limits when using cloud-based LLMs
Related Classes
- SequentialWorkflow: For sequential agent execution
- AgentRearrange: For complex orchestration patterns
- GraphWorkflow: For DAG-based workflows
- Agent: The base agent class used in workflows