Skip to main content

Overview

The BaseSwarm class is an abstract base class (ABC) that provides the foundation for all multi-agent systems in the Swarms framework. It defines the core interface and common functionality for orchestrating multiple agents to work together on complex tasks.

Import

from swarms.structs import BaseSwarm

Key Features

  • Agent Management: Add, remove, and query agents in the swarm
  • Task Distribution: Assign tasks to specific agents or broadcast to all
  • Lifecycle Management: Pause, resume, stop, and restart agents
  • Scaling: Dynamic agent scaling (scale up/down/to specific size)
  • State Management: Save and load swarm state
  • Batch Operations: Run tasks across multiple agents concurrently
  • Async Support: Full async/await support for concurrent execution
  • Conversation Management: Built-in conversation tracking

Initialization

name
str
Name of the swarm for identification and logging
description
str
Description of the swarm’s purpose and capabilities
agents
List[Union[Agent, Callable]]
List of Agent instances or callables to include in the swarm
models
List[Any]
List of language models to use across agents
max_loops
int
default:"200"
Maximum number of execution loops for the swarm
callbacks
Sequence[callable]
Callback functions to execute during swarm operations
autosave
bool
default:"false"
Automatically save swarm state to file
logging
bool
default:"false"
Enable detailed logging for debugging
return_metadata
bool
default:"false"
Include metadata in return values
metadata_filename
str
default:"multiagent_structure_metadata.json"
Filename for saving swarm metadata
stopping_function
Callable
Function that determines when the swarm should stop execution
stopping_condition
str
default:"stop"
Condition string that triggers swarm termination
stopping_condition_args
Dict
Arguments passed to the stopping condition function
speaker_selection_func
Callable
Function to select which agent should speak next in multi-agent conversations
rules
str
Rules that govern swarm behavior and agent interactions
collective_memory_system
Any
default:"false"
Shared memory system accessible by all agents
agent_ops_on
bool
default:"false"
Enable AgentOps tracking for all agents in the swarm
output_schema
BaseModel
Pydantic model defining the expected output structure

Core Methods

run

Execute the swarm’s main task loop.
def run(
    task: str = None,
    *args,
    **kwargs
) -> Any
task
str
The task for the swarm to execute
return
Any
Result of swarm execution (implementation-specific)

call

Alternative syntax for running the swarm (calls run internally).
def __call__(
    task: str,
    *args,
    **kwargs
) -> Any

Agent Management

add_agent

Add a single agent to the swarm.
def add_agent(
    agent: AgentType
) -> None
agent
AgentType
The agent instance to add

add_agents

Add multiple agents to the swarm.
def add_agents(
    agents: List[AgentType]
) -> None
agents
List[AgentType]
List of agent instances to add

remove_agent

Remove an agent from the swarm.
def remove_agent(
    agent: AgentType
) -> None

get_agent_by_name

Retrieve an agent by its name.
def get_agent_by_name(
    name: str
) -> AgentType
name
str
Name of the agent to retrieve
return
AgentType
The agent instance with matching name, or None if not found

get_agent_by_id

Retrieve an agent by its ID.
def get_agent_by_id(
    id: str
) -> AgentType
id
str
ID of the agent to retrieve
return
AgentType
The agent instance with matching ID, or None if not found

agent_exists

Check if an agent exists in the swarm.
def agent_exists(
    name: str
) -> bool
name
str
Name of the agent to check
return
bool
True if agent exists, False otherwise

Task Management

assign_task

Assign a specific task to an agent.
def assign_task(
    agent: AgentType,
    task: Any
) -> Dict
agent
AgentType
The agent to assign the task to
task
Any
The task to assign
return
Dict
Task execution result

task_assignment_by_name

Assign a task to an agent by name.
def task_assignment_by_name(
    task: str,
    agent_name: str,
    *args,
    **kwargs
) -> Any

broadcast

Broadcast a message to all agents in the swarm.
def broadcast(
    message: str,
    sender: Optional[AgentType] = None
) -> None
message
str
Message to broadcast
sender
AgentType
Optional sender agent

direct_message

Send a direct message from one agent to another.
def direct_message(
    message: str,
    sender: AgentType,
    recipient: AgentType
) -> None

Batch Operations

batched_run

Run multiple tasks in batch mode.
def batched_run(
    tasks: List[Any],
    *args,
    **kwargs
) -> List[Any]
tasks
List[Any]
List of tasks to execute
return
List[Any]
List of results for each task

run_batch

Alias for batched_run.
def run_batch(
    tasks: List[str],
    *args,
    **kwargs
) -> List[Any]

concurrent_run

Run a task concurrently across all agents.
def concurrent_run(
    task: str
) -> List[str]
task
str
Task to run on all agents
return
List[str]
List of responses from each agent

run_all

Run a task on all agents sequentially.
def run_all(
    task: str = None,
    *args,
    **kwargs
) -> List[Any]

run_on_all_agents

Run a task on all agents using ThreadPoolExecutor.
def run_on_all_agents(
    task: str = None,
    *args,
    **kwargs
) -> List[Any]

Async Operations

arun

Run the swarm asynchronously.
async def arun(
    task: Optional[str] = None,
    *args,
    **kwargs
) -> Any

abatch_run

Run multiple tasks asynchronously in batch.
async def abatch_run(
    tasks: List[str],
    *args,
    **kwargs
) -> List[Any]

run_async

Run the swarm asynchronously (synchronous wrapper).
def run_async(
    task: Optional[str] = None,
    *args,
    **kwargs
) -> Any

run_batch_async

Run batch tasks asynchronously (synchronous wrapper).
def run_batch_async(
    tasks: List[str],
    *args,
    **kwargs
) -> Any

Lifecycle Management

pause_agent

Pause an agent’s execution.
def pause_agent(
    agent: AgentType,
    agent_id: str
) -> None

resume_agent

Resume a paused agent.
def resume_agent(
    agent: AgentType,
    agent_id: str
) -> None

stop_agent

Stop an agent’s execution.
def stop_agent(
    agent: AgentType,
    agent_id: str
) -> None

restart_agent

Restart an agent.
def restart_agent(
    agent: AgentType
) -> None

reset_all_agents

Reset the state of all agents.
def reset_all_agents() -> None

Scaling

scale_up

Increase the number of agents.
def scale_up(
    num_agent: int
) -> None
num_agent
int
Number of agents to add

scale_down

Decrease the number of agents.
def scale_down(
    num_agent: int
) -> None
num_agent
int
Number of agents to remove

scale_to

Scale to a specific number of agents.
def scale_to(
    num_agent: int
) -> None
num_agent
int
Target number of agents

State Management

save_to_json

Save swarm state to JSON file.
def save_to_json(
    filename: str
) -> None
filename
str
Path to save JSON file

load_from_json

Load swarm state from JSON file.
def load_from_json(
    filename: str
) -> None
filename
str
Path to JSON file to load

save_to_yaml

Save swarm state to YAML file.
def save_to_yaml(
    filename: str
) -> None

load_from_yaml

Load swarm state from YAML file.
def load_from_yaml(
    filename: str
) -> None

metadata

Get swarm metadata.
def metadata() -> dict
return
dict
Dictionary containing swarm metadata (agents, callbacks, autosave, logging, conversation)

Examples

Basic Swarm Implementation

from swarms import Agent, BaseSwarm

class SimpleSwarm(BaseSwarm):
    def run(self, task: str, *args, **kwargs):
        """Run task on all agents and aggregate results."""
        results = []
        for agent in self.agents:
            result = agent.run(task)
            results.append(result)
        return results

# Create agents
agent1 = Agent(agent_name="Agent-1", model_name="gpt-4o")
agent2 = Agent(agent_name="Agent-2", model_name="gpt-4o")

# Create swarm
swarm = SimpleSwarm(
    name="Research-Swarm",
    description="A swarm for research tasks",
    agents=[agent1, agent2]
)

# Run task
results = swarm.run("Analyze market trends")

Dynamic Agent Management

from swarms import Agent, BaseSwarm

class DynamicSwarm(BaseSwarm):
    def run(self, task: str, *args, **kwargs):
        return self.run_all(task)

swarm = DynamicSwarm(
    name="Dynamic-Swarm",
    agents=[]
)

# Add agents dynamically
for i in range(5):
    agent = Agent(
        agent_name=f"Agent-{i}",
        model_name="gpt-4o"
    )
    swarm.add_agent(agent)

# Check if agent exists
if swarm.agent_exists("Agent-0"):
    print("Agent-0 is in the swarm")

# Get agent by name
agent = swarm.get_agent_by_name("Agent-2")

# Scale the swarm
swarm.scale_up(3)  # Add 3 more agents
swarm.scale_down(2)  # Remove 2 agents

Batch Task Processing

class BatchSwarm(BaseSwarm):
    def run(self, task: str, *args, **kwargs):
        # Implementation
        pass

swarm = BatchSwarm(
    name="Batch-Processor",
    agents=[agent1, agent2, agent3]
)

# Run multiple tasks
tasks = [
    "Task 1: Analyze data",
    "Task 2: Generate report",
    "Task 3: Send summary"
]

results = swarm.batched_run(tasks)

Async Swarm Execution

import asyncio

class AsyncSwarm(BaseSwarm):
    async def run(self, task: str, *args, **kwargs):
        # Async implementation
        pass

swarm = AsyncSwarm(
    name="Async-Swarm",
    agents=[agent1, agent2]
)

# Run asynchronously
result = await swarm.arun("Process async task")

# Run batch async
tasks = ["Task 1", "Task 2", "Task 3"]
results = await swarm.abatch_run(tasks)

Swarm with State Persistence

class PersistentSwarm(BaseSwarm):
    def run(self, task: str, *args, **kwargs):
        results = self.run_all(task)
        
        # Auto-save after execution
        if self.autosave:
            self.save_to_json(self.metadata_filename)
        
        return results

swarm = PersistentSwarm(
    name="Persistent-Swarm",
    agents=[agent1, agent2],
    autosave=True,
    metadata_filename="swarm_state.json"
)

result = swarm.run("Execute task")

# Load saved state later
swarm.load_from_json("swarm_state.json")

Properties

BaseSwarm provides several built-in properties:
  • agents_dict: Dictionary mapping agent names to agent instances
  • conversation: Conversation object for tracking agent interactions

Best Practices

  1. Always validate agents: Ensure agents list is not empty and all agents are valid
  2. Implement abstract methods: Override run(), communicate(), and other abstract methods
  3. Use async for I/O-bound tasks: Leverage async methods for better performance
  4. Enable autosave for long-running swarms: Prevent state loss
  5. Set appropriate max_loops: Prevent infinite loops
  6. Use callbacks for monitoring: Track swarm execution progress
  7. Implement proper error handling: Handle agent failures gracefully
  8. Scale dynamically based on load: Use scaling methods to optimize resources

Build docs developers (and LLMs) love