Overview

The ConcurrentWorkflow class provides a framework for executing multiple agents concurrently on the same task, with optional dashboard monitoring, streaming callbacks, and various output formatting options. It uses ThreadPoolExecutor to manage concurrent execution and provides real-time status tracking for each agent.
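The scheduling details are internal to swarms, but the fan-out mechanism the class relies on can be sketched with the standard library alone. In this sketch, plain callables stand in for `Agent` instances; the function and variable names are illustrative, not part of the swarms API:

```python
from concurrent.futures import ThreadPoolExecutor

def run_agents_concurrently(agents, task):
    """Fan the same task out to every agent and collect results by name."""
    with ThreadPoolExecutor(max_workers=len(agents)) as executor:
        futures = {name: executor.submit(fn, task) for name, fn in agents.items()}
        # .result() blocks until each agent finishes, so all agents run in parallel
        return {name: future.result() for name, future in futures.items()}

# Placeholder "agents": plain callables standing in for Agent instances
agents = {
    "upper": lambda task: task.upper(),
    "length": lambda task: len(task),
}
results = run_agents_concurrently(agents, "analyze this")
```

Each submitted future runs on its own thread, which is why every agent receives the same task at (roughly) the same time rather than in sequence.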

Key Features

  • Concurrent Execution: Run multiple agents simultaneously on the same task
  • Real-time Dashboard: Monitor agent status and outputs in real-time
  • Streaming Callbacks: Get real-time updates as agents generate outputs
  • Flexible Output Formatting: Multiple output format options
  • Auto-save Support: Automatically save conversation history
  • Error Handling: Graceful error handling with status tracking
  • Batch Processing: Process multiple tasks sequentially with concurrent agents

Installation

pip install -U swarms

Class Definition

class ConcurrentWorkflow:
    def __init__(
        self,
        id: str = None,
        name: str = "ConcurrentWorkflow",
        description: str = "Execution of multiple agents concurrently",
        agents: List[Union[Agent, Callable]] = None,
        auto_save: bool = True,
        output_type: str = "dict-all-except-first",
        max_loops: int = 1,
        auto_generate_prompts: bool = False,
        show_dashboard: bool = False,
        autosave: bool = True,
        verbose: bool = False,
    )

Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `id` | `str` | auto-generated | Unique identifier for the workflow instance. Auto-generated if not provided. |
| `name` | `str` | `"ConcurrentWorkflow"` | Human-readable name for the workflow. |
| `description` | `str` | `"Execution of multiple agents concurrently"` | Description of the workflow's purpose. |
| `agents` | `List[Union[Agent, Callable]]` | required | List of agents to execute concurrently. Must not be None or empty. |
| `auto_save` | `bool` | `True` | Whether to automatically save workflow metadata. |
| `output_type` | `str` | `"dict-all-except-first"` | Output format. Options include `"dict-all-except-first"`, `"dict"`, `"list"`, and `"str"`. |
| `max_loops` | `int` | `1` | Maximum number of execution loops (currently unused in concurrent execution). |
| `auto_generate_prompts` | `bool` | `False` | Whether to enable automatic prompt engineering for all agents. |
| `show_dashboard` | `bool` | `False` | Whether to display the real-time dashboard during execution. |
| `autosave` | `bool` | `True` | Whether to automatically save conversation history to the workspace. |
| `verbose` | `bool` | `False` | Whether to enable verbose logging. |

Methods

run(task, img=None, imgs=None, streaming_callback=None)

Execute all agents concurrently on the given task.
Parameters:

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `task` | `str` | required | The task to be executed by all agents. |
| `img` | `Optional[str]` | `None` | Single image path for agents that support image input. |
| `imgs` | `Optional[List[str]]` | `None` | List of image paths for agents that support multiple images. |
| `streaming_callback` | `Optional[Callable[[str, str, bool], None]]` | `None` | Callback for streaming updates, called with `(agent_name, chunk, is_final)`. |

Returns:

| Type | Description |
|------|-------------|
| `Union[Dict, List, str]` | Formatted conversation history based on `output_type`. |
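The `streaming_callback` contract can be exercised without swarms at all. A minimal sketch that accumulates chunks per agent (treating the final call as a completion signal with an empty chunk is an assumption, not documented behavior):

```python
from collections import defaultdict

buffers = defaultdict(list)   # agent_name -> list of streamed chunks
completed = set()             # agents that have finished

def streaming_callback(agent_name: str, chunk: str, is_final: bool) -> None:
    """Accumulate streamed chunks per agent; mark the agent done on the final call."""
    if is_final:
        completed.add(agent_name)
    else:
        buffers[agent_name].append(chunk)

# Simulated stream from one agent
streaming_callback("Technical-Analyst", "Analyzing ", False)
streaming_callback("Technical-Analyst", "trends.", False)
streaming_callback("Technical-Analyst", "", True)

print("".join(buffers["Technical-Analyst"]))  # Analyzing trends.
```

Because the callback may be invoked from multiple agent threads, a production version would guard the shared buffers with a lock or a thread-safe queue.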

batch_run(tasks, imgs=None, streaming_callback=None)

Execute workflow on multiple tasks sequentially.
Parameters:

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `tasks` | `List[str]` | required | List of tasks to be executed. |
| `imgs` | `Optional[List[str]]` | `None` | List of image paths corresponding to each task. |
| `streaming_callback` | `Optional[Callable[[str, str, bool], None]]` | `None` | Callback function for streaming updates. |

Returns:

| Type | Description |
|------|-------------|
| `List[Union[Dict, List, str]]` | List of results, one per task. |
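The sequencing `batch_run` describes (sequential over tasks, concurrent over agents within each task) can be sketched with the stdlib; placeholder callables stand in for `Agent` instances, and the helper names are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor

def run_one_task(agents, task):
    """Fan a single task out to all agents concurrently."""
    with ThreadPoolExecutor(max_workers=len(agents)) as pool:
        futures = {name: pool.submit(fn, task) for name, fn in agents.items()}
        return {name: future.result() for name, future in futures.items()}

def batch_run(agents, tasks):
    """Tasks run one after another; agents run concurrently within each task."""
    return [run_one_task(agents, task) for task in tasks]

# Placeholder callables stand in for Agent instances
agents = {"echo": lambda t: t, "upper": lambda t: t.upper()}
results = batch_run(agents, ["alpha", "beta"])
```

Note that only the agents are parallelized; task N+1 does not start until every agent has finished task N.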

run_with_dashboard(task, img=None, imgs=None, streaming_callback=None)

Execute agents with real-time dashboard monitoring.
Parameters:

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `task` | `str` | required | The task to be executed by all agents. |
| `img` | `Optional[str]` | `None` | Single image path for agents that support image input. |
| `imgs` | `Optional[List[str]]` | `None` | List of image paths for agents that support multiple images. |
| `streaming_callback` | `Optional[Callable[[str, str, bool], None]]` | `None` | Callback function for streaming updates. |

Returns:

| Type | Description |
|------|-------------|
| `Union[Dict, List, str]` | Formatted conversation history based on `output_type`. |

cleanup()

Clean up resources and connections. Called automatically after each run.

reliability_check()

Validate workflow configuration. Raises:
  • ValueError: If no agents are provided or agents list is empty
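The documented validation can be mirrored with a stdlib-only sketch (the error message is illustrative, not the library's exact wording):

```python
def reliability_check(agents):
    """Mirror the documented validation: reject a missing or empty agent list."""
    if agents is None or len(agents) == 0:
        raise ValueError("ConcurrentWorkflow requires at least one agent")

try:
    reliability_check([])
except ValueError as e:
    print(f"Configuration error: {e}")
```

Running this check at construction time (rather than at `run()` time) surfaces misconfiguration before any API calls are made.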

activate_auto_prompt_engineering()

Enable automatic prompt engineering for all agents in the workflow.

display_agent_dashboard(title="ConcurrentWorkflow Dashboard", is_final=False)

Display real-time dashboard showing agent status and outputs.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `title` | `str` | `"ConcurrentWorkflow Dashboard"` | Title to display for the dashboard. |
| `is_final` | `bool` | `False` | Whether this is the final dashboard display. |

Attributes

| Attribute | Type | Description |
|-----------|------|-------------|
| `id` | `str` | Unique identifier for the workflow instance |
| `name` | `str` | Human-readable name for the workflow |
| `description` | `str` | Description of the workflow's purpose |
| `agents` | `List[Union[Agent, Callable]]` | List of agents to execute concurrently |
| `agent_statuses` | `dict` | Dictionary tracking status and output of each agent |
| `conversation` | `Conversation` | Conversation object for storing agent interactions |
| `metadata_output_path` | `str` | Path for saving workflow metadata |

Usage Examples

Basic Concurrent Workflow

from swarms import Agent, ConcurrentWorkflow
from swarms.models import OpenAIChat

# Initialize the language model
llm = OpenAIChat(
    model_name="gpt-4o",
    temperature=0.5,
)

# Create specialized agents with different perspectives
technical_analyst = Agent(
    agent_name="Technical-Analyst",
    llm=llm,
    max_loops=1,
    system_prompt="You are a technical analyst. Analyze from a technical perspective."
)

business_analyst = Agent(
    agent_name="Business-Analyst",
    llm=llm,
    max_loops=1,
    system_prompt="You are a business analyst. Analyze from a business perspective."
)

user_researcher = Agent(
    agent_name="User-Researcher",
    llm=llm,
    max_loops=1,
    system_prompt="You are a user researcher. Analyze from a user experience perspective."
)

# Create concurrent workflow
workflow = ConcurrentWorkflow(
    name="Multi-Perspective Analysis",
    description="Analyze from multiple perspectives simultaneously",
    agents=[technical_analyst, business_analyst, user_researcher],
    verbose=True
)

# Run all agents concurrently on the same task
result = workflow.run("Analyze the potential of implementing AI chatbots in customer service")
print(result)

With Real-Time Dashboard

# Create workflow with dashboard enabled
workflow = ConcurrentWorkflow(
    name="Analysis with Dashboard",
    agents=[technical_analyst, business_analyst, user_researcher],
    show_dashboard=True,  # Enable real-time dashboard
    verbose=True
)

# Run and watch the dashboard update in real-time
result = workflow.run("Evaluate the impact of remote work on productivity")

With Streaming Callbacks

def handle_stream(agent_name: str, chunk: str, is_final: bool):
    """Custom streaming callback to handle real-time updates"""
    if is_final:
        print(f"\n[{agent_name}] Completed!")
    else:
        print(f"[{agent_name}] {chunk}", end="", flush=True)

workflow = ConcurrentWorkflow(
    name="Streaming Analysis",
    agents=[technical_analyst, business_analyst],
)

# Get real-time streaming updates
result = workflow.run(
    "Analyze cybersecurity trends",
    streaming_callback=handle_stream
)

Batch Processing

# Process multiple tasks sequentially, with agents running concurrently on each
tasks = [
    "Analyze the impact of AI on healthcare",
    "Evaluate renewable energy solutions",
    "Assess blockchain technology trends"
]

workflow = ConcurrentWorkflow(
    name="Batch Analysis",
    agents=[technical_analyst, business_analyst, user_researcher],
)

results = workflow.batch_run(tasks)

for i, (task, result) in enumerate(zip(tasks, results), 1):
    print(f"\nTask {i}: {task}")
    print(f"Result: {result}")
    print("-" * 80)

With Image Input

from swarms import Agent
from swarms.models import GPT4VisionAPI

# Create vision-capable agents
vision_llm = GPT4VisionAPI()

image_analyst1 = Agent(
    agent_name="Image-Analyst-1",
    llm=vision_llm,
    system_prompt="Analyze images for technical details"
)

image_analyst2 = Agent(
    agent_name="Image-Analyst-2",
    llm=vision_llm,
    system_prompt="Analyze images for aesthetic quality"
)

workflow = ConcurrentWorkflow(
    agents=[image_analyst1, image_analyst2],
)

# Analyze image with multiple agents concurrently
result = workflow.run(
    "Analyze this product image",
    img="path/to/product.jpg"
)

Custom Output Types

# Different output formats
workflow_dict = ConcurrentWorkflow(
    agents=[technical_analyst, business_analyst],
    output_type="dict"  # Returns dict with all agent outputs
)

workflow_list = ConcurrentWorkflow(
    agents=[technical_analyst, business_analyst],
    output_type="list"  # Returns list of outputs
)

workflow_dict_except_first = ConcurrentWorkflow(
    agents=[technical_analyst, business_analyst],
    output_type="dict-all-except-first"  # Skip first agent in output dict
)

With Autosave

import os

# Set workspace directory
os.environ["WORKSPACE_DIR"] = "./analysis_workspace"

workflow = ConcurrentWorkflow(
    name="Saved-Analysis",
    agents=[technical_analyst, business_analyst],
    autosave=True,  # Save conversation history
    verbose=True
)

result = workflow.run("Analyze market trends")
# Conversation saved to ./analysis_workspace/swarms/ConcurrentWorkflow/

Auto Prompt Engineering

# Enable automatic prompt engineering
workflow = ConcurrentWorkflow(
    agents=[technical_analyst, business_analyst],
    auto_generate_prompts=True  # Agents will auto-optimize their prompts
)

workflow.activate_auto_prompt_engineering()
result = workflow.run("Complex analysis task")

Error Handling

try:
    workflow = ConcurrentWorkflow(
        agents=[technical_analyst, business_analyst],
        show_dashboard=True
    )
    result = workflow.run("Analyze this topic")
except ValueError as e:
    print(f"Configuration error: {e}")
except Exception as e:
    print(f"Execution error: {e}")
finally:
    # Cleanup is called automatically
    pass

Dashboard Output Example

When show_dashboard=True, you’ll see real-time updates like:
╔═══════════════════════════════════════════════════════════════╗
║           ConcurrentWorkflow Dashboard                         ║
╠═══════════════════════════════════════════════════════════════╣
║ Agent: Technical-Analyst                                       ║
║ Status: running                                                ║
║ Output: Analyzing technical aspects...                         ║
╠═══════════════════════════════════════════════════════════════╣
║ Agent: Business-Analyst                                        ║
║ Status: completed                                              ║
║ Output: The business impact is significant...                  ║
╠═══════════════════════════════════════════════════════════════╣
║ Agent: User-Researcher                                         ║
║ Status: pending                                                ║
║ Output:                                                        ║
╚═══════════════════════════════════════════════════════════════╝

Best Practices

  1. Agent Diversity: Use agents with different perspectives for richer analysis
  2. Dashboard for Monitoring: Enable dashboard during development to monitor agent progress
  3. Streaming Callbacks: Use streaming callbacks for real-time feedback in production
  4. Error Handling: Always wrap concurrent execution in try-except blocks
  5. Resource Management: Be mindful of API rate limits when running many agents
  6. Task Design: Ensure the task benefits from multiple concurrent perspectives
  7. Autosave: Enable autosave for important analyses

Common Use Cases

  • Multi-Perspective Analysis: Get technical, business, and user perspectives simultaneously
  • Consensus Building: Run multiple agents and aggregate their outputs
  • Parallel Research: Research different aspects of a topic concurrently
  • Voting Systems: Multiple agents vote on decisions
  • Quality Assurance: Multiple agents review the same content
  • Competitive Analysis: Different agents analyze competing solutions

Performance Considerations

  • CPU Utilization: Uses ~95% of available CPU cores by default
  • Concurrent Execution: All agents run truly concurrently using ThreadPoolExecutor
  • Memory Usage: Each agent maintains its own context; budget memory accordingly when running many agents
  • API Rate Limits: Be aware of rate limits when using cloud-based LLMs
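The exact worker-count formula is internal to swarms; as a rough sketch of how a ~95% core-utilization default might be derived with the stdlib (the formula and function name here are assumptions for illustration):

```python
import os

def default_worker_count(utilization: float = 0.95) -> int:
    """Scale the thread pool to a fraction of available cores, never below 1."""
    # os.cpu_count() can return None in unusual environments, so fall back to 1
    return max(1, int((os.cpu_count() or 1) * utilization))

print(default_worker_count())
```

For I/O-bound LLM calls, worker count matters less than API rate limits, since threads spend most of their time waiting on network responses.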
