Skip to main content

Concurrent Workflow

The ConcurrentWorkflow runs multiple agents simultaneously on the same task, enabling parallel execution and high-throughput processing. This architecture is ideal when you need multiple perspectives or rapid parallel analysis.

When to Use

  • High-throughput tasks: Process large volumes simultaneously
  • Multiple perspectives: Get diverse viewpoints on the same input
  • Parallel analysis: Market, financial, and risk analysis at once
  • Time-critical operations: Minimize total execution time
  • Independent processing: Tasks with no dependencies

Key Features

  • True parallel execution with ThreadPoolExecutor
  • Real-time dashboard monitoring (optional)
  • Agent status tracking
  • Streaming callbacks support
  • Automatic CPU core utilization
  • Conversation history aggregation

Basic Example

from swarms import Agent, ConcurrentWorkflow

# Create specialized analysts
market_analyst = Agent(
    agent_name="Market-Analyst",
    system_prompt="Analyze market trends and opportunities.",
    model_name="gpt-4o-mini",
)

financial_analyst = Agent(
    agent_name="Financial-Analyst",
    system_prompt="Provide financial analysis and projections.",
    model_name="gpt-4o-mini",
)

risk_analyst = Agent(
    agent_name="Risk-Analyst",
    system_prompt="Assess and quantify potential risks.",
    model_name="gpt-4o-mini",
)

# Create concurrent workflow
workflow = ConcurrentWorkflow(
    agents=[market_analyst, financial_analyst, risk_analyst],
    max_loops=1,
)

# All agents run simultaneously
results = workflow.run(
    "Analyze the potential impact of AI on the healthcare industry"
)
print(results)

With Dashboard Monitoring

workflow = ConcurrentWorkflow(
    name="AnalysisTeam",
    description="Concurrent analysis team",
    agents=[market_analyst, financial_analyst, risk_analyst],
    show_dashboard=True,  # Enable real-time dashboard
    max_loops=1,
)

results = workflow.run("Evaluate cryptocurrency market trends")

With Streaming Callbacks

def streaming_callback(agent_name: str, chunk: str, is_final: bool):
    """Receive real-time updates from agents"""
    if is_final:
        print(f"\n[{agent_name}] Complete!")
    else:
        print(f"[{agent_name}]: {chunk}", end="", flush=True)

results = workflow.run(
    task="Analyze Q4 performance",
    streaming_callback=streaming_callback,
)

Key Parameters

name
str
default:"ConcurrentWorkflow"
Name identifier for the workflow
agents
List[Agent]
required
List of agents to execute concurrently
max_loops
int
default:1
Maximum number of execution loops
show_dashboard
bool
default:false
Enable real-time dashboard display
output_type
str
default:"dict-all-except-first"
Output format for results
auto_generate_prompts
bool
default:false
Enable automatic prompt engineering
autosave
bool
default:true
Automatically save conversation history

Methods

run()

Execute all agents concurrently on a task.
result = workflow.run(
    task="Analyze market conditions",
    img=None,  # Optional image input
    streaming_callback=None,  # Optional callback
)

batch_run()

Process multiple tasks sequentially (each task runs agents concurrently).
tasks = [
    "Analyze tech sector",
    "Analyze healthcare sector",
    "Analyze energy sector"
]

results = workflow.batch_run(tasks)

Dashboard Features

When show_dashboard=True, you get:
  • Real-time Status: See each agent’s current state (pending, running, completed)
  • Output Preview: Monitor agent outputs as they generate
  • Progress Tracking: Visual progress indicators
  • Error Detection: Immediate error visibility
  • Completion Summary: Final dashboard with all results

Use Cases

Multi-Perspective Analysis

# Get technical, fundamental, and sentiment analysis simultaneously
technical_agent = Agent(agent_name="Technical-Analyst", ...)
fundamental_agent = Agent(agent_name="Fundamental-Analyst", ...)
sentiment_agent = Agent(agent_name="Sentiment-Analyst", ...)

analysis_team = ConcurrentWorkflow(
    agents=[technical_agent, fundamental_agent, sentiment_agent]
)

insights = analysis_team.run("NVIDIA stock analysis")

Parallel Research

# Research multiple aspects simultaneously
literature_researcher = Agent(agent_name="Literature-Researcher", ...)
market_researcher = Agent(agent_name="Market-Researcher", ...)
competitor_researcher = Agent(agent_name="Competitor-Researcher", ...)

research_team = ConcurrentWorkflow(
    agents=[literature_researcher, market_researcher, competitor_researcher]
)

research = research_team.run("Electric vehicle market landscape")

Batch Document Processing

# Process documents with multiple analyzers
summary_agent = Agent(agent_name="Summarizer", ...)
sentiment_agent = Agent(agent_name="Sentiment-Analyzer", ...)
key_points_agent = Agent(agent_name="Key-Points-Extractor", ...)

processing_team = ConcurrentWorkflow(
    agents=[summary_agent, sentiment_agent, key_points_agent]
)

documents = ["doc1.txt", "doc2.txt", "doc3.txt"]
results = processing_team.batch_run(documents)

Performance Optimization

CPU Core Utilization

The workflow automatically uses 95% of available CPU cores:
import os

# Automatically calculated
max_workers = int(os.cpu_count() * 0.95)

Agent Configuration for Concurrency

# Disable printing for dashboard mode
if show_dashboard:
    for agent in agents:
        agent.print_on = False

Advanced Features

Agent Status Tracking

workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    show_dashboard=True,
)

# Internal status tracking for each agent
# - status: "pending" | "running" | "completed" | "error"
# - output: Current agent output
print(workflow.agent_statuses)

Conversation Aggregation

result = workflow.run("Task")

# Access aggregated conversation
history = workflow.conversation.conversation_history

# Format:
# [{"role": "User", "content": "Task"},
#  {"role": "Agent1", "content": "Response1"},
#  {"role": "Agent2", "content": "Response2"}]

Output Types

Supported output formats:
  • "dict-all-except-first": Dictionary excluding initial user message
  • "dict": Complete conversation dictionary
  • "str": Concatenated string output
  • "list": List of all messages
workflow = ConcurrentWorkflow(
    agents=agents,
    output_type="dict",
)

Best Practices

Performance Tip: Use concurrent workflow when agents can truly work independently
  1. Independent Agents: Ensure agents don’t need each other’s outputs
  2. Appropriate Size: 3-8 agents typically optimal for most systems
  3. Dashboard Usage: Enable for debugging, disable for production
  4. Resource Management: Monitor CPU/memory with large agent counts
  5. Error Handling: One agent failure doesn’t stop others
Concurrent execution means agents run simultaneously - ensure your LLM API can handle parallel requests

Error Handling

try:
    results = workflow.run("Analyze market")
except Exception as e:
    print(f"Workflow error: {e}")
    # Individual agent errors are captured in results

Build docs developers (and LLMs) love