# Concurrent Workflow
The ConcurrentWorkflow runs multiple agents simultaneously on the same task, enabling parallel execution and high-throughput processing. This architecture is ideal when you need multiple perspectives or rapid parallel analysis.
## When to Use
- High-throughput tasks: Process large volumes simultaneously
- Multiple perspectives: Get diverse viewpoints on the same input
- Parallel analysis: Market, financial, and risk analysis at once
- Time-critical operations: Minimize total execution time
- Independent processing: Tasks with no dependencies
## Key Features
- True parallel execution with ThreadPoolExecutor
- Real-time dashboard monitoring (optional)
- Agent status tracking
- Streaming callbacks support
- Automatic CPU core utilization
- Conversation history aggregation
## Basic Example
```python
from swarms import Agent, ConcurrentWorkflow

# Create specialized analysts
market_analyst = Agent(
    agent_name="Market-Analyst",
    system_prompt="Analyze market trends and opportunities.",
    model_name="gpt-4o-mini",
)

financial_analyst = Agent(
    agent_name="Financial-Analyst",
    system_prompt="Provide financial analysis and projections.",
    model_name="gpt-4o-mini",
)

risk_analyst = Agent(
    agent_name="Risk-Analyst",
    system_prompt="Assess and quantify potential risks.",
    model_name="gpt-4o-mini",
)

# Create concurrent workflow
workflow = ConcurrentWorkflow(
    agents=[market_analyst, financial_analyst, risk_analyst],
    max_loops=1,
)

# All agents run simultaneously
results = workflow.run(
    "Analyze the potential impact of AI on the healthcare industry"
)
print(results)
```
## With Dashboard Monitoring
```python
workflow = ConcurrentWorkflow(
    name="AnalysisTeam",
    description="Concurrent analysis team",
    agents=[market_analyst, financial_analyst, risk_analyst],
    show_dashboard=True,  # Enable real-time dashboard
    max_loops=1,
)

results = workflow.run("Evaluate cryptocurrency market trends")
```
## With Streaming Callbacks
```python
def streaming_callback(agent_name: str, chunk: str, is_final: bool):
    """Receive real-time updates from agents."""
    if is_final:
        print(f"\n[{agent_name}] Complete!")
    else:
        print(f"[{agent_name}]: {chunk}", end="", flush=True)

results = workflow.run(
    task="Analyze Q4 performance",
    streaming_callback=streaming_callback,
)
```
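The callback contract can be exercised without a live model. The stand-in `fake_agent_stream` below is a hypothetical helper (not part of swarms) that drives a callback with the same `(agent_name, chunk, is_final)` signature an agent would use:

```python
from typing import Callable, List

def fake_agent_stream(
    agent_name: str,
    chunks: List[str],
    callback: Callable[[str, str, bool], None],
) -> None:
    """Drive a streaming callback the way an agent would:
    one call per chunk, then a final call with is_final=True."""
    for chunk in chunks:
        callback(agent_name, chunk, False)
    callback(agent_name, "", True)

received = []
fake_agent_stream(
    "Market-Analyst",
    ["Market ", "looks ", "strong."],
    lambda name, chunk, final: received.append((name, chunk, final)),
)
print(received[-1])  # final sentinel: ("Market-Analyst", "", True)
```

Accumulating chunks per agent like this is a common way to rebuild full outputs while still showing progress live.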
## Key Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | `"ConcurrentWorkflow"` | Name identifier for the workflow |
| `agents` | `List[Agent]` | required | List of agents to execute concurrently |
| `max_loops` | `int` | `1` | Maximum number of execution loops |
| `show_dashboard` | `bool` | `False` | Enable real-time dashboard display |
| `output_type` | `str` | `"dict-all-except-first"` | Output format for results |
| `auto_generate_prompts` | `bool` | `False` | Enable automatic prompt engineering |
| `autosave` | `bool` | `True` | Automatically save conversation history |
## Methods

### run()
Execute all agents concurrently on a task.
```python
result = workflow.run(
    task="Analyze market conditions",
    img=None,                  # Optional image input
    streaming_callback=None,   # Optional callback
)
```
### batch_run()
Process multiple tasks sequentially (each task runs agents concurrently).
```python
tasks = [
    "Analyze tech sector",
    "Analyze healthcare sector",
    "Analyze energy sector",
]
results = workflow.batch_run(tasks)
```
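These semantics (tasks run one after another, agents run in parallel within each task) can be sketched with the standard library. The `run_agents` and `batch_run` functions here are illustrative stand-ins, not the swarms implementation:

```python
from concurrent.futures import ThreadPoolExecutor

def run_agents(agents, task):
    """Fan one task out to every agent concurrently."""
    with ThreadPoolExecutor(max_workers=len(agents)) as pool:
        futures = {name: pool.submit(fn, task) for name, fn in agents.items()}
        return {name: f.result() for name, f in futures.items()}

def batch_run(agents, tasks):
    """Tasks run sequentially; agents run in parallel within each task."""
    return [run_agents(agents, t) for t in tasks]

# Toy "agents" standing in for LLM-backed Agent objects
agents = {
    "Summarizer": lambda t: f"summary of {t}",
    "Sentiment": lambda t: f"sentiment of {t}",
}
results = batch_run(agents, ["tech sector", "energy sector"])
print(results[0]["Summarizer"])  # summary of tech sector
```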
## Dashboard Features

When `show_dashboard=True`, you get:
- Real-time Status: See each agent’s current state (pending, running, completed)
- Output Preview: Monitor agent outputs as they generate
- Progress Tracking: Visual progress indicators
- Error Detection: Immediate error visibility
- Completion Summary: Final dashboard with all results
## Use Cases

### Multi-Perspective Analysis
```python
# Get technical, fundamental, and sentiment analysis simultaneously
technical_agent = Agent(agent_name="Technical-Analyst", ...)
fundamental_agent = Agent(agent_name="Fundamental-Analyst", ...)
sentiment_agent = Agent(agent_name="Sentiment-Analyst", ...)

analysis_team = ConcurrentWorkflow(
    agents=[technical_agent, fundamental_agent, sentiment_agent]
)
insights = analysis_team.run("NVIDIA stock analysis")
```
### Parallel Research
```python
# Research multiple aspects simultaneously
literature_researcher = Agent(agent_name="Literature-Researcher", ...)
market_researcher = Agent(agent_name="Market-Researcher", ...)
competitor_researcher = Agent(agent_name="Competitor-Researcher", ...)

research_team = ConcurrentWorkflow(
    agents=[literature_researcher, market_researcher, competitor_researcher]
)
research = research_team.run("Electric vehicle market landscape")
```
### Batch Document Processing
```python
# Process documents with multiple analyzers
summary_agent = Agent(agent_name="Summarizer", ...)
sentiment_agent = Agent(agent_name="Sentiment-Analyzer", ...)
key_points_agent = Agent(agent_name="Key-Points-Extractor", ...)

processing_team = ConcurrentWorkflow(
    agents=[summary_agent, sentiment_agent, key_points_agent]
)

documents = ["doc1.txt", "doc2.txt", "doc3.txt"]
results = processing_team.batch_run(documents)
```
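Note that each batch item is handed to the agents as the task string itself, so to analyze file contents (rather than filenames) you would typically load the documents first. A minimal loader sketch; the temporary file exists only to make the demo self-contained:

```python
import tempfile
from pathlib import Path

def load_documents(paths):
    """Read each document so agents receive text, not filenames."""
    return [Path(p).read_text(encoding="utf-8") for p in paths]

# Demo with a throwaway file standing in for a real document
with tempfile.TemporaryDirectory() as d:
    sample = Path(d) / "doc1.txt"
    sample.write_text("hello report", encoding="utf-8")
    texts = load_documents([sample])
print(texts)  # ['hello report']
```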
## CPU Core Utilization

The workflow automatically sizes its worker pool to 95% of the available CPU core count:
```python
import os

# Automatically calculated
max_workers = int(os.cpu_count() * 0.95)
```
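The same heuristic is easy to reproduce with the standard library. The executor below is a sketch of the pattern, not the swarms internals; it also guards against `os.cpu_count()` returning `None` or the 95% calculation rounding to zero:

```python
import os
from concurrent.futures import ThreadPoolExecutor

# Mirror the 95%-of-cores heuristic, but never drop below one worker
max_workers = max(1, int((os.cpu_count() or 1) * 0.95))

with ThreadPoolExecutor(max_workers=max_workers) as pool:
    # Each submitted call runs on its own thread, up to max_workers at once
    squares = list(pool.map(lambda n: n * n, range(8)))
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49]
```

Because agent work is dominated by network-bound LLM calls, thread counts near the core count are a reasonable default rather than a hard ceiling.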
## Agent Configuration for Concurrency
```python
# Disable printing for dashboard mode
if show_dashboard:
    for agent in agents:
        agent.print_on = False
```
## Advanced Features

### Agent Status Tracking
```python
workflow = ConcurrentWorkflow(
    agents=[agent1, agent2, agent3],
    show_dashboard=True,
)

# Internal status tracking for each agent:
# - status: "pending" | "running" | "completed" | "error"
# - output: current agent output
print(workflow.agent_statuses)
```
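The status lifecycle can be modeled with a plain dict guarded by a lock. This tracker is an illustrative sketch of the pending → running → completed transitions, not swarms' internal data structure:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

statuses = {name: {"status": "pending", "output": ""} for name in ["A1", "A2"]}
lock = threading.Lock()

def run_tracked(name: str, task: str) -> None:
    """Run a (stand-in) agent while updating its shared status entry."""
    with lock:
        statuses[name]["status"] = "running"
    try:
        output = f"{name} analyzed: {task}"  # stand-in for the real agent call
        with lock:
            statuses[name].update(status="completed", output=output)
    except Exception:
        with lock:
            statuses[name]["status"] = "error"

with ThreadPoolExecutor() as pool:
    for name in statuses:
        pool.submit(run_tracked, name, "market conditions")
print(statuses["A1"]["status"])  # completed
```

A dashboard thread can then render the dict on an interval, since every mutation happens under the lock.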
### Conversation Aggregation
```python
result = workflow.run("Task")

# Access the aggregated conversation
history = workflow.conversation.conversation_history
# Format:
# [{"role": "User", "content": "Task"},
#  {"role": "Agent1", "content": "Response1"},
#  {"role": "Agent2", "content": "Response2"}]
```
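Given the history format above, per-agent outputs can be pulled out with a simple grouping; the sample list mirrors the documented shape:

```python
sample_history = [
    {"role": "User", "content": "Task"},
    {"role": "Agent1", "content": "Response1"},
    {"role": "Agent2", "content": "Response2"},
]

def outputs_by_agent(history):
    """Map each non-user role to its message content."""
    return {m["role"]: m["content"] for m in history if m["role"] != "User"}

print(outputs_by_agent(sample_history))
# {'Agent1': 'Response1', 'Agent2': 'Response2'}
```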
## Output Types

Supported output formats:

- `"dict-all-except-first"`: Dictionary excluding the initial user message
- `"dict"`: Complete conversation dictionary
- `"str"`: Concatenated string output
- `"list"`: List of all messages
```python
workflow = ConcurrentWorkflow(
    agents=agents,
    output_type="dict",
)
```
## Best Practices

**Performance Tip:** Use a concurrent workflow when agents can truly work independently.
- Independent Agents: Ensure agents don’t need each other’s outputs
- Appropriate Size: 3-8 agents typically optimal for most systems
- Dashboard Usage: Enable for debugging, disable for production
- Resource Management: Monitor CPU/memory with large agent counts
- Error Handling: One agent failure doesn’t stop others
**Note:** Concurrent execution means all agents call the model at the same time; make sure your LLM API rate limits can handle parallel requests.
## Error Handling
```python
try:
    results = workflow.run("Analyze market")
except Exception as e:
    print(f"Workflow error: {e}")

# Individual agent errors are captured in results
```
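The isolation behavior (one failing agent not stopping the rest) can be sketched by catching exceptions per future; `bad_agent` here is a contrived stand-in for a model call that fails:

```python
from concurrent.futures import ThreadPoolExecutor

def good_agent(task: str) -> str:
    return f"ok: {task}"

def bad_agent(task: str) -> str:
    raise RuntimeError("model timeout")  # contrived failure

agents = {"good": good_agent, "bad": bad_agent}
results = {}
with ThreadPoolExecutor() as pool:
    futures = {name: pool.submit(fn, "Analyze market") for name, fn in agents.items()}
    for name, future in futures.items():
        try:
            results[name] = future.result()
        except Exception as exc:
            # Record the failure in place of the output; other agents are unaffected
            results[name] = f"error: {exc}"
print(results["good"])  # ok: Analyze market
```

Because each future is resolved independently, the exception surfaces only when `result()` is called for the failed agent, leaving the other outputs intact.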