Overview

The AgentRearrange class orchestrates multiple agents according to a flow pattern string. Within a single flow, -> chains agents sequentially and , groups agents to run concurrently, so one workflow can mix both execution modes.

Key Features

  • Flexible Flow Syntax: Define sequential (->) and concurrent (,) agent execution in one flow
  • Custom Flow Patterns: Mix sequential and concurrent execution patterns
  • Team Awareness: Agents can be aware of their position in the workflow
  • Human-in-the-Loop: Integrate human review at any point in the workflow (using H)
  • Memory System Support: Optional persistent memory across agent interactions
  • Batch Processing: Process multiple tasks with the same flow
  • Concurrent Execution: Run multiple tasks in parallel
  • Async Support: Asynchronous execution for non-blocking operations

Installation

pip install -U swarms

Class Definition

class AgentRearrange:
    def __init__(
        self,
        id: str = swarm_id(),
        name: str = "AgentRearrange",
        description: str = "A swarm of agents for rearranging tasks.",
        agents: List[Union[Agent, Callable]] = None,
        flow: str = None,
        max_loops: int = 1,
        verbose: bool = True,
        memory_system: Any = None,
        human_in_the_loop: bool = False,
        custom_human_in_the_loop: Optional[Callable[[str], str]] = None,
        output_type: OutputType = "all",
        autosave: bool = True,
        rules: str = None,
        team_awareness: bool = False,
        time_enabled: bool = False,
        message_id_on: bool = False,
    )

Parameters

id
str
default:"auto-generated"
Unique identifier for the agent rearrange system. Auto-generated if not provided.
name
str
default:"AgentRearrange"
Human-readable name for the system
description
str
default:"A swarm of agents for rearranging tasks."
Description of the system’s purpose
agents
List[Union[Agent, Callable]]
required
List of agents to include in the system. Can be Agent objects or callable functions.
flow
str
required
Flow pattern defining agent execution order. Uses -> for sequential and , for concurrent execution. Example: "agent1 -> agent2, agent3 -> agent4"
max_loops
int
default:"1"
Maximum number of execution loops. Must be greater than 0.
verbose
bool
default:"True"
Whether to enable verbose logging
memory_system
Any
default:"None"
Optional memory system for persistence across interactions
human_in_the_loop
bool
default:"False"
Whether to enable human interaction points in the workflow
custom_human_in_the_loop
Optional[Callable[[str], str]]
default:"None"
Custom function for handling human interaction. Takes input string, returns response.
output_type
OutputType
default:"all"
Format for output results. Options: "all", "final", "list", "dict"
autosave
bool
default:"True"
Whether to automatically save execution data
rules
str
default:"None"
System rules and constraints to add to conversation
team_awareness
bool
default:"False"
Whether agents should be aware of team structure and sequential flow
time_enabled
bool
default:"False"
Whether to track timestamps in conversations
message_id_on
bool
default:"False"
Whether to include message IDs in conversations

Flow Syntax

The flow pattern defines how agents execute:
  • Sequential: agent1 -> agent2 -> agent3 (agents run one after another)
  • Concurrent: agent1, agent2, agent3 (agents run simultaneously)
  • Mixed: agent1 -> agent2, agent3 -> agent4 (agent1 first, then agent2 and agent3 concurrently, then agent4)
  • Human-in-Loop: agent1 -> H -> agent2 (agent1, then human input, then agent2)
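
The syntax above can be read as a list of stages separated by ->, where each stage holds one or more comma-separated names. The following is a minimal sketch of that reading in plain Python. The parse_flow helper is hypothetical and is not part of swarms; the library's own parser may behave differently.

```python
from typing import List


def parse_flow(flow: str) -> List[List[str]]:
    """Split a flow string into ordered stages (hypothetical helper).

    Each stage is a list of names. A stage with more than one name
    stands for a group that runs concurrently.
    """
    stages = []
    for step in flow.split("->"):
        # Names inside one step are separated by commas.
        names = [name.strip() for name in step.split(",") if name.strip()]
        if not names:
            raise ValueError(f"Empty step in flow: {flow!r}")
        stages.append(names)
    return stages


stages = parse_flow("agent1 -> agent2, agent3 -> agent4")
# stages == [["agent1"], ["agent2", "agent3"], ["agent4"]]

for index, names in enumerate(stages, start=1):
    mode = "concurrent" if len(names) > 1 else "sequential"
    print(f"Stage {index} ({mode}): {names}")
```

In this reading, H is just another name occupying its own stage, which matches the agent1 -> H -> agent2 example.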

Methods

run(task, img=None, *args, **kwargs)

Execute the agent rearrangement task.
task
str
required
The task to execute through the agent workflow
img
Optional[str]
default:"None"
Path to input image if required by any agents
result
Union[str, List[str], Dict[str, str]]
The processed output in the format specified by output_type

batch_run(tasks, img=None, batch_size=10, *args, **kwargs)

Process multiple tasks in batches.
tasks
List[str]
required
List of tasks to process through the agent workflow
img
Optional[List[str]]
default:"None"
Optional list of images corresponding to tasks
batch_size
int
default:"10"
Number of tasks to process simultaneously in each batch
results
List[str]
List of results corresponding to input tasks
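
To make the role of batch_size concrete, the sketch below shows how a task list would be divided into consecutive batches. It only illustrates the chunking arithmetic; split_into_batches is a hypothetical helper, and how swarms schedules the work inside each batch is not shown here.

```python
from typing import List


def split_into_batches(tasks: List[str], batch_size: int = 10) -> List[List[str]]:
    """Group tasks into consecutive batches of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [tasks[start:start + batch_size] for start in range(0, len(tasks), batch_size)]


tasks = [
    "Analyze healthcare AI trends",
    "Analyze education AI trends",
    "Analyze finance AI trends",
]

# With batch_size=2, three tasks form one full batch and one partial batch.
batches = split_into_batches(tasks, batch_size=2)
for number, batch in enumerate(batches, start=1):
    print(f"Batch {number}: {batch}")
```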

concurrent_run(tasks, img=None, max_workers=None, *args, **kwargs)

Process multiple tasks concurrently using ThreadPoolExecutor.
tasks
List[str]
required
List of tasks to process through the agent workflow
img
Optional[List[str]]
default:"None"
Optional list of images corresponding to tasks
max_workers
Optional[int]
default:"None"
Maximum number of worker threads. Uses default ThreadPoolExecutor behavior if None.
results
List[str]
List of results corresponding to input tasks
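
The general ThreadPoolExecutor pattern this method is described as using can be sketched with the standard library alone. Here fake_workflow is a stand-in for a full workflow run and run_concurrently is a hypothetical wrapper; neither is swarms code. The max_workers=None default is passed straight through to ThreadPoolExecutor, which then picks its own worker count.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional


def run_concurrently(
    tasks: List[str],
    run_one: Callable[[str], str],
    max_workers: Optional[int] = None,
) -> List[str]:
    """Run run_one on every task in a thread pool, keeping input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in the same order as the inputs.
        return list(pool.map(run_one, tasks))


def fake_workflow(task: str) -> str:
    """Stand-in for a blocking workflow run (assumption for illustration)."""
    return f"done: {task}"


results = run_concurrently(
    ["Research AI in healthcare", "Research AI in education"],
    fake_workflow,
    max_workers=2,
)
print(results)
```

Because the results are collected with map, the i-th result corresponds to the i-th task regardless of which thread finishes first.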

run_async(task, img=None, *args, **kwargs)

Asynchronously execute a task.
task
str
required
The task to be executed through the agent workflow
img
Optional[str]
default:"None"
Optional image input for the task
result
Any
The result of the task execution
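
For readers unfamiliar with awaiting a blocking call, the sketch below shows one common way to do it with asyncio.to_thread (Python 3.9+). This is an assumption about the general technique, not a description of how run_async is implemented; blocking_workflow and run_workflow_async are hypothetical stand-ins.

```python
import asyncio
from typing import List


def blocking_workflow(task: str) -> str:
    """Stand-in for a synchronous workflow run (assumption for illustration)."""
    return f"done: {task}"


async def run_workflow_async(task: str) -> str:
    """Run the blocking call in a worker thread so the event loop stays free."""
    return await asyncio.to_thread(blocking_workflow, task)


async def main() -> List[str]:
    # gather awaits both coroutines and returns results in argument order.
    return await asyncio.gather(
        run_workflow_async("task A"),
        run_workflow_async("task B"),
    )


results = asyncio.run(main())
print(results)
```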

set_custom_flow(flow)

Set a custom flow pattern for agent execution.
flow
str
required
The new flow pattern to use for agent execution

add_agent(agent)

Add an agent to the swarm.
agent
Agent
required
The agent to be added

remove_agent(agent_name)

Remove an agent from the swarm.
agent_name
str
required
The name of the agent to be removed

validate_flow()

Validate the flow pattern.
is_valid
bool
True if the flow pattern is valid
Raises:
  • ValueError: If the flow pattern is incorrectly formatted or contains unregistered agents
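
The kind of check described above can be approximated outside the library, for example in a unit test that guards flow strings kept in configuration. The check_flow function below is a hypothetical sketch: it assumes that H is the only reserved token and that every other name must be registered. The library's actual validation rules may be stricter.

```python
from typing import Iterable

HUMAN_TOKEN = "H"  # reserved name for a human-in-the-loop step


def check_flow(flow: str, agent_names: Iterable[str]) -> bool:
    """Return True if every name in the flow is registered, else raise ValueError."""
    known = set(agent_names)
    for step in flow.split("->"):
        for raw_name in step.split(","):
            name = raw_name.strip()
            if not name:
                raise ValueError(f"Empty agent name in flow: {flow!r}")
            if name != HUMAN_TOKEN and name not in known:
                raise ValueError(f"Agent {name!r} is not registered")
    return True


registered = ["researcher", "writer", "editor"]
print(check_flow("researcher -> writer -> H -> editor", registered))

try:
    check_flow("researcher -> reviewer", registered)
except ValueError as error:
    print(f"Invalid flow: {error}")
```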

to_dict()

Convert all attributes to a dictionary for serialization.
dict_representation
Dict[str, Any]
Dictionary representation of all class attributes

Attributes

Attribute      Type              Description
id             str               Unique identifier for the system
name           str               Human-readable name
description    str               Description of the system’s purpose
agents         Dict[str, Agent]  Dictionary mapping agent names to Agent objects
flow           str               Flow pattern defining agent execution order
conversation   Conversation      Conversation history management

Usage Examples

Sequential Flow

from swarms import Agent, AgentRearrange
from swarms.models import OpenAIChat

# Initialize LLM
llm = OpenAIChat(model_name="gpt-4o")

# Create agents
researcher = Agent(
    agent_name="researcher",
    llm=llm,
    system_prompt="You are a research specialist."
)

writer = Agent(
    agent_name="writer",
    llm=llm,
    system_prompt="You are a content writer."
)

editor = Agent(
    agent_name="editor",
    llm=llm,
    system_prompt="You are an editor."
)

# Define sequential flow
flow = "researcher -> writer -> editor"

# Create rearrange system
system = AgentRearrange(
    agents=[researcher, writer, editor],
    flow=flow,
    max_loops=1
)

# Execute task
result = system.run("Write a blog post about AI")
print(result)

Concurrent Flow

# Define concurrent flow - all agents run simultaneously
flow = "researcher, writer, editor"

system = AgentRearrange(
    agents=[researcher, writer, editor],
    flow=flow
)

result = system.run("Analyze this topic from different angles")

Mixed Sequential and Concurrent Flow

# Create more agents
data_collector = Agent(
    agent_name="data_collector",
    llm=llm,
    system_prompt="You collect and organize data."
)

technical_analyst = Agent(
    agent_name="technical_analyst",
    llm=llm,
    system_prompt="You analyze technical aspects."
)

business_analyst = Agent(
    agent_name="business_analyst",
    llm=llm,
    system_prompt="You analyze business aspects."
)

synthesizer = Agent(
    agent_name="synthesizer",
    llm=llm,
    system_prompt="You synthesize multiple perspectives."
)

# Mixed flow: collect data, then analyze concurrently, then synthesize
flow = "data_collector -> technical_analyst, business_analyst -> synthesizer"

system = AgentRearrange(
    agents=[data_collector, technical_analyst, business_analyst, synthesizer],
    flow=flow,
    team_awareness=True  # Agents know about each other
)

result = system.run("Analyze the market opportunity for AI assistants")

Human-in-the-Loop

# Include human review in the flow
flow = "researcher -> writer -> H -> editor"

system = AgentRearrange(
    agents=[researcher, writer, editor],
    flow=flow,
    human_in_the_loop=True
)

result = system.run("Create a marketing campaign")
# System will pause for human input after the writer and before the editor

Custom Human-in-the-Loop Handler

def custom_human_handler(agent_output: str) -> str:
    """Custom function to handle human interaction"""
    print(f"Agent output: {agent_output}")
    feedback = input("Your feedback: ")
    return feedback

# Reuses the researcher and editor agents from the Sequential Flow example
flow = "researcher -> H -> editor"

system = AgentRearrange(
    agents=[researcher, editor],
    flow=flow,
    human_in_the_loop=True,
    custom_human_in_the_loop=custom_human_handler
)
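
Since the handler is documented as a plain function from string to string, a non-interactive version can stand in for input() in automated runs or tests. The sketch below is one hypothetical example; the review rules and the max_length threshold are illustrative assumptions, not library behavior.

```python
def auto_review_handler(agent_output: str, max_length: int = 2000) -> str:
    """Return canned feedback instead of prompting a person (hypothetical)."""
    if not agent_output.strip():
        return "The previous output was empty. Please produce a draft."
    if len(agent_output) > max_length:
        return "The draft is too long. Please shorten it."
    return "Approved. Proceed to the next step."


print(auto_review_handler("A short marketing draft."))
print(auto_review_handler("   "))
```

A function like this could be passed as custom_human_in_the_loop in place of the interactive handler, because its extra parameter has a default and it can still be called with a single string.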

Batch Processing

# Process multiple tasks through the same flow
tasks = [
    "Analyze healthcare AI trends",
    "Analyze education AI trends",
    "Analyze finance AI trends"
]

system = AgentRearrange(
    agents=[researcher, writer, editor],
    flow="researcher -> writer -> editor"
)

results = system.batch_run(tasks, batch_size=2)

for task, result in zip(tasks, results):
    print(f"Task: {task}")
    print(f"Result: {result}")
    print("-" * 80)

Concurrent Task Execution

tasks = [
    "Research AI in healthcare",
    "Research AI in education",
    "Research AI in finance"
]

# Run all tasks in parallel
results = system.concurrent_run(tasks, max_workers=3)
print(f"Processed {len(results)} tasks concurrently")

Async Execution

import asyncio

async def process_async():
    result = await system.run_async("Analyze quantum computing trends")
    return result

result = asyncio.run(process_async())

Dynamic Flow Modification

# Create system with initial flow
system = AgentRearrange(
    agents=[researcher, writer, editor],
    flow="researcher -> writer -> editor"
)

# Run with initial flow
result1 = system.run("Task 1")

# Change flow dynamically
system.set_custom_flow("researcher -> editor -> writer")

# Run with new flow
result2 = system.run("Task 2")

With Memory System

from swarms.memory import ChromaDB

# Create memory system
memory = ChromaDB(
    output_dir="./agent_memory",
    n_results=5
)

system = AgentRearrange(
    agents=[researcher, writer],
    flow="researcher -> writer",
    memory_system=memory  # Persist conversation across runs
)

result = system.run("Analyze AI trends")

With Team Awareness

# Agents will know about their position in the workflow
system = AgentRearrange(
    agents=[researcher, writer, editor],
    flow="researcher -> writer -> editor",
    team_awareness=True,  # Enable team awareness
    verbose=True
)

result = system.run("Create comprehensive analysis")
# Each agent will receive information about agents ahead and behind

With Rules

rules = """
All agents must:
1. Cite sources
2. Be objective
3. Flag uncertainties
4. Respect data privacy
"""

system = AgentRearrange(
    agents=[researcher, writer, editor],
    flow="researcher -> writer -> editor",
    rules=rules  # Add system-wide rules
)

Different Output Types

# Return all outputs
system_all = AgentRearrange(
    agents=[researcher, writer],
    flow="researcher -> writer",
    output_type="all"  # Return all agent outputs
)

# Return only final output
system_final = AgentRearrange(
    agents=[researcher, writer],
    flow="researcher -> writer",
    output_type="final"  # Return only last agent's output
)

# Return as list
system_list = AgentRearrange(
    agents=[researcher, writer],
    flow="researcher -> writer",
    output_type="list"  # Return list of outputs
)

# Return as dict
system_dict = AgentRearrange(
    agents=[researcher, writer],
    flow="researcher -> writer",
    output_type="dict"  # Return dict mapping agent names to outputs
)

Convenience Function

The rearrange() function provides a quick way to create a workflow and run a task in one call:

from swarms import rearrange

result = rearrange(
    name="Quick Analysis",
    agents=[researcher, writer, editor],
    flow="researcher -> writer -> editor",
    task="Analyze AI trends in 2024"
)

Error Handling

try:
    system = AgentRearrange(
        agents=[researcher, writer],
        flow="researcher -> writer"
    )
    result = system.run("Process this task")
except ValueError as e:
    print(f"Configuration error: {e}")
except Exception as e:
    print(f"Execution error: {e}")

Best Practices

  1. Flow Design: Carefully design your flow to match your task requirements
  2. Team Awareness: Enable for complex flows where context matters
  3. Human Review: Add H at critical decision points
  4. Error Handling: Always wrap execution in try-except blocks
  5. Logging: Enable verbose mode during development
  6. Memory Systems: Use for tasks requiring persistent context
  7. Flow Validation: Always validate flows before production use
  8. Agent Naming: Use clear, descriptive agent names in flows

Common Flow Patterns

Fan-Out Pattern

# One agent feeds multiple agents
flow = "collector -> analyst1, analyst2, analyst3 -> synthesizer"

Pipeline Pattern

# Linear processing chain
flow = "stage1 -> stage2 -> stage3 -> stage4"

Review Pattern

# Create, review, revise
flow = "creator -> reviewer -> H -> reviser"

Ensemble Pattern

# Multiple independent analyses
flow = "agent1, agent2, agent3, agent4 -> aggregator"
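
When agent names come from a list rather than a hand-written string, the patterns above can be assembled with ordinary string joins. The sequential and concurrent helpers below are hypothetical conveniences for illustration and are not part of swarms.

```python
def sequential(*steps: str) -> str:
    """Join steps so that they run one after another."""
    return " -> ".join(steps)


def concurrent(*names: str) -> str:
    """Join names so that they run as one concurrent group."""
    return ", ".join(names)


analysts = ["analyst1", "analyst2", "analyst3"]

# Fan-out: one collector feeds several analysts, then a synthesizer.
fan_out = sequential("collector", concurrent(*analysts), "synthesizer")

# Pipeline: a linear chain of stages.
pipeline = sequential("stage1", "stage2", "stage3", "stage4")

print(fan_out)
print(pipeline)
```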
