Overview
The AgentRearrange class enables complex workflows where multiple agents can work sequentially or concurrently based on a defined flow pattern. It supports both sequential execution (using ->) and concurrent execution (using ,) within the same workflow, providing maximum flexibility for agent orchestration.
Key Features
- Flexible Flow Syntax: Define sequential (
->) and concurrent (,) agent execution in one flow
- Custom Flow Patterns: Mix sequential and concurrent execution patterns
- Team Awareness: Agents can be aware of their position in the workflow
- Human-in-the-Loop: Integrate human review at any point in the workflow (using
H)
- Memory System Support: Optional persistent memory across agent interactions
- Batch Processing: Process multiple tasks with the same flow
- Concurrent Execution: Run multiple tasks in parallel
- Async Support: Asynchronous execution for non-blocking operations
Installation
Class Definition
class AgentRearrange:
def __init__(
self,
id: str = swarm_id(),
name: str = "AgentRearrange",
description: str = "A swarm of agents for rearranging tasks.",
agents: List[Union[Agent, Callable]] = None,
flow: str = None,
max_loops: int = 1,
verbose: bool = True,
memory_system: Any = None,
human_in_the_loop: bool = False,
custom_human_in_the_loop: Optional[Callable[[str], str]] = None,
output_type: OutputType = "all",
autosave: bool = True,
rules: str = None,
team_awareness: bool = False,
time_enabled: bool = False,
message_id_on: bool = False,
)
Parameters
id
str
default:"auto-generated"
Unique identifier for the agent rearrange system. Auto-generated if not provided.
name
str
default:"AgentRearrange"
Human-readable name for the system
description
str
default:"A swarm of agents for rearranging tasks."
Description of the system’s purpose
agents
List[Union[Agent, Callable]]
required
List of agents to include in the system. Can be Agent objects or callable functions.
Flow pattern defining agent execution order. Uses -> for sequential and , for concurrent execution.
Example: "agent1 -> agent2, agent3 -> agent4"
Maximum number of execution loops. Must be greater than 0.
Whether to enable verbose logging
Optional memory system for persistence across interactions
Whether to enable human interaction points in the workflow
custom_human_in_the_loop
Optional[Callable[[str], str]]
default:"None"
Custom function for handling human interaction. Takes input string, returns response.
Format for output results. Options: “all”, “final”, “list”, “dict”
Whether to automatically save execution data
System rules and constraints to add to conversation
Whether agents should be aware of team structure and sequential flow
Whether to track timestamps in conversations
Whether to include message IDs in conversations
Flow Syntax
The flow pattern defines how agents execute:
- Sequential:
agent1 -> agent2 -> agent3 (agents run one after another)
- Concurrent:
agent1, agent2, agent3 (agents run simultaneously)
- Mixed:
agent1 -> agent2, agent3 -> agent4 (agent1 first, then agent2 and agent3 concurrently, then agent4)
- Human-in-Loop:
agent1 -> H -> agent2 (agent1, then human input, then agent2)
Methods
run(task, img=None, *args, **kwargs)
Execute the agent rearrangement task.
The task to execute through the agent workflow
img
Optional[str]
default:"None"
Path to input image if required by any agents
result
Union[str, List[str], Dict[str, str]]
The processed output in the format specified by output_type
batch_run(tasks, img=None, batch_size=10, *args, **kwargs)
Process multiple tasks in batches.
List of tasks to process through the agent workflow
img
Optional[List[str]]
default:"None"
Optional list of images corresponding to tasks
Number of tasks to process simultaneously in each batch
List of results corresponding to input tasks
concurrent_run(tasks, img=None, max_workers=None, *args, **kwargs)
Process multiple tasks concurrently using ThreadPoolExecutor.
List of tasks to process through the agent workflow
img
Optional[List[str]]
default:"None"
Optional list of images corresponding to tasks
max_workers
Optional[int]
default:"None"
Maximum number of worker threads. Uses default ThreadPoolExecutor behavior if None.
List of results corresponding to input tasks
run_async(task, img=None, *args, **kwargs)
Asynchronously execute a task.
The task to be executed through the agent workflow
img
Optional[str]
default:"None"
Optional image input for the task
The result of the task execution
set_custom_flow(flow)
Set a custom flow pattern for agent execution.
The new flow pattern to use for agent execution
add_agent(agent)
Add an agent to the swarm.
remove_agent(agent_name)
Remove an agent from the swarm.
The name of the agent to be removed
validate_flow()
Validate the flow pattern.
True if the flow pattern is valid
Raises:
ValueError: If the flow pattern is incorrectly formatted or contains unregistered agents
to_dict()
Convert all attributes to a dictionary for serialization.
Dictionary representation of all class attributes
Attributes
| Attribute | Type | Description |
|---|
id | str | Unique identifier for the system |
name | str | Human-readable name |
description | str | Description of the system’s purpose |
agents | Dict[str, Agent] | Dictionary mapping agent names to Agent objects |
flow | str | Flow pattern defining agent execution order |
conversation | Conversation | Conversation history management |
Usage Examples
Sequential Flow
from swarms import Agent, AgentRearrange
from swarms.models import OpenAIChat
# Initialize LLM
llm = OpenAIChat(model_name="gpt-4o")
# Create agents
researcher = Agent(
agent_name="researcher",
llm=llm,
system_prompt="You are a research specialist."
)
writer = Agent(
agent_name="writer",
llm=llm,
system_prompt="You are a content writer."
)
editor = Agent(
agent_name="editor",
llm=llm,
system_prompt="You are an editor."
)
# Define sequential flow
flow = "researcher -> writer -> editor"
# Create rearrange system
system = AgentRearrange(
agents=[researcher, writer, editor],
flow=flow,
max_loops=1
)
# Execute task
result = system.run("Write a blog post about AI")
print(result)
Concurrent Flow
# Define concurrent flow - all agents run simultaneously
flow = "researcher, writer, editor"
system = AgentRearrange(
agents=[researcher, writer, editor],
flow=flow
)
result = system.run("Analyze this topic from different angles")
Mixed Sequential and Concurrent Flow
# Create more agents
data_collector = Agent(
agent_name="data_collector",
llm=llm,
system_prompt="You collect and organize data."
)
technical_analyst = Agent(
agent_name="technical_analyst",
llm=llm,
system_prompt="You analyze technical aspects."
)
business_analyst = Agent(
agent_name="business_analyst",
llm=llm,
system_prompt="You analyze business aspects."
)
synthesizer = Agent(
agent_name="synthesizer",
llm=llm,
system_prompt="You synthesize multiple perspectives."
)
# Mixed flow: collect data, then analyze concurrently, then synthesize
flow = "data_collector -> technical_analyst, business_analyst -> synthesizer"
system = AgentRearrange(
agents=[data_collector, technical_analyst, business_analyst, synthesizer],
flow=flow,
team_awareness=True # Agents know about each other
)
result = system.run("Analyze the market opportunity for AI assistants")
Human-in-the-Loop
# Include human review in the flow
flow = "researcher -> writer -> H -> editor"
system = AgentRearrange(
agents=[researcher, writer, editor],
flow=flow,
human_in_the_loop=True
)
result = system.run("Create a marketing campaign")
# System will pause for human input after the writer and before the editor
Custom Human-in-the-Loop Handler
def custom_human_handler(agent_output: str) -> str:
"""Custom function to handle human interaction"""
print(f"Agent output: {agent_output}")
feedback = input("Your feedback: ")
return feedback
flow = "agent1 -> H -> agent2"
system = AgentRearrange(
agents=[agent1, agent2],
flow=flow,
human_in_the_loop=True,
custom_human_in_the_loop=custom_human_handler
)
Batch Processing
# Process multiple tasks through the same flow
tasks = [
"Analyze healthcare AI trends",
"Analyze education AI trends",
"Analyze finance AI trends"
]
system = AgentRearrange(
agents=[researcher, writer, editor],
flow="researcher -> writer -> editor"
)
results = system.batch_run(tasks, batch_size=2)
for task, result in zip(tasks, results):
print(f"Task: {task}")
print(f"Result: {result}")
print("-" * 80)
Concurrent Task Execution
tasks = [
"Research AI in healthcare",
"Research AI in education",
"Research AI in finance"
]
# Run all tasks in parallel
results = system.concurrent_run(tasks, max_workers=3)
print(f"Processed {len(results)} tasks concurrently")
Async Execution
import asyncio
async def process_async():
result = await system.run_async("Analyze quantum computing trends")
return result
result = asyncio.run(process_async())
Dynamic Flow Modification
# Create system with initial flow
system = AgentRearrange(
agents=[researcher, writer, editor],
flow="researcher -> writer -> editor"
)
# Run with initial flow
result1 = system.run("Task 1")
# Change flow dynamically
system.set_custom_flow("researcher -> editor -> writer")
# Run with new flow
result2 = system.run("Task 2")
With Memory System
from swarms.memory import ChromaDB
# Create memory system
memory = ChromaDB(
output_dir="./agent_memory",
n_results=5
)
system = AgentRearrange(
agents=[researcher, writer],
flow="researcher -> writer",
memory_system=memory # Persist conversation across runs
)
result = system.run("Analyze AI trends")
With Team Awareness
# Agents will know about their position in the workflow
system = AgentRearrange(
agents=[researcher, writer, editor],
flow="researcher -> writer -> editor",
team_awareness=True, # Enable team awareness
verbose=True
)
result = system.run("Create comprehensive analysis")
# Each agent will receive information about agents ahead and behind
With Rules
rules = """
All agents must:
1. Cite sources
2. Be objective
3. Flag uncertainties
4. Respect data privacy
"""
system = AgentRearrange(
agents=[researcher, writer, editor],
flow="researcher -> writer -> editor",
rules=rules # Add system-wide rules
)
Different Output Types
# Return all outputs
system_all = AgentRearrange(
agents=[researcher, writer],
flow="researcher -> writer",
output_type="all" # Return all agent outputs
)
# Return only final output
system_final = AgentRearrange(
agents=[researcher, writer],
flow="researcher -> writer",
output_type="final" # Return only last agent's output
)
# Return as list
system_list = AgentRearrange(
agents=[researcher, writer],
flow="researcher -> writer",
output_type="list" # Return list of outputs
)
# Return as dict
system_dict = AgentRearrange(
agents=[researcher, writer],
flow="researcher -> writer",
output_type="dict" # Return dict mapping agent names to outputs
)
Convenience Function
The rearrange() function provides a quick way to create and execute:
from swarms import rearrange
result = rearrange(
name="Quick Analysis",
agents=[researcher, writer, editor],
flow="researcher -> writer -> editor",
task="Analyze AI trends in 2024"
)
Error Handling
try:
system = AgentRearrange(
agents=[researcher, writer],
flow="researcher -> writer"
)
result = system.run("Process this task")
except ValueError as e:
print(f"Configuration error: {e}")
except Exception as e:
print(f"Execution error: {e}")
Best Practices
- Flow Design: Carefully design your flow to match your task requirements
- Team Awareness: Enable for complex flows where context matters
- Human Review: Add
H at critical decision points
- Error Handling: Always wrap execution in try-except blocks
- Logging: Enable verbose mode during development
- Memory Systems: Use for tasks requiring persistent context
- Flow Validation: Always validate flows before production use
- Agent Naming: Use clear, descriptive agent names in flows
Common Flow Patterns
Fan-Out Pattern
# One agent feeds multiple agents
flow = "collector -> analyst1, analyst2, analyst3 -> synthesizer"
Pipeline Pattern
# Linear processing chain
flow = "stage1 -> stage2 -> stage3 -> stage4"
Review Pattern
# Create, review, revise
flow = "creator -> reviewer -> H -> reviser"
Ensemble Pattern
# Multiple independent analyses
flow = "agent1, agent2, agent3, agent4 -> aggregator"