Skip to main content

Overview

Swarms provides powerful utilities for scaling agent execution horizontally through concurrent and parallel processing. This guide covers the different scaling patterns, performance optimization techniques, and best practices for running agents at scale.

Scaling Patterns

Swarms offers multiple execution patterns for different scaling scenarios:
PatternUse CaseMax WorkersExecution
ConcurrentI/O-bound tasks, API calls95% of CPU coresThread-based
AsyncHigh-throughput async operationsConfigurableEvent loop
BatchLarge-scale processingConfigurableBatched
GridDifferent tasks per agent90% of CPU coresThread-based
UVLoopUltra-high performance async95% of CPU coresOptimized event loop

Concurrent Execution

Basic Concurrent Execution

Run multiple agents on the same task concurrently using ThreadPoolExecutor:
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently

# Create multiple agents
agents = [
    Agent(
        agent_name=f"Worker-{i}",
        model_name="gpt-4o-mini",
        max_loops=1,
    )
    for i in range(5)
]

# Run all agents concurrently on the same task
results = run_agents_concurrently(
    agents=agents,
    task="Analyze the potential impact of AI on healthcare",
    max_workers=None,  # Uses 95% of CPU cores by default
)

# Results is a list of outputs in completion order
for i, result in enumerate(results):
    print(f"Agent {i+1} result: {result}")

Concurrent with Dictionary Output

Get results as a dictionary mapping agent names to outputs:
results_dict = run_agents_concurrently(
    agents=agents,
    task="Generate a market analysis report",
    return_agent_output_dict=True,  # Return as dict
)

# Results preserve agent order
for agent_name, output in results_dict.items():
    print(f"Result from {agent_name}:")
    print(output)
    print("-" * 50)

Concurrent with Images

from swarms.structs.multi_agent_exec import run_agents_concurrently

# Process image across multiple agents
results = run_agents_concurrently(
    agents=vision_agents,
    task="Analyze this medical scan",
    img="path/to/scan.jpg",
    max_workers=5,
)

Asynchronous Execution

Basic Async Execution

import asyncio
from swarms.structs.multi_agent_exec import (
    run_agent_async,
    run_agents_concurrently_async,
)

async def process_tasks():
    agents = [
        Agent(agent_name=f"Async-Agent-{i}", model_name="gpt-4o-mini")
        for i in range(10)
    ]
    
    # Run all agents asynchronously
    results = await run_agents_concurrently_async(
        agents=agents,
        task="Process this data"
    )
    
    return results

# Run the async function
results = asyncio.run(process_tasks())

High-Performance with UVLoop

For maximum async performance, use uvloop (Linux/macOS) or winloop (Windows):
from swarms.structs.multi_agent_exec import run_agents_concurrently_uvloop

# Automatically uses uvloop on Unix, winloop on Windows
results = run_agents_concurrently_uvloop(
    agents=agents,
    task="High-throughput task processing",
    max_workers=None,  # Uses 95% of CPU cores
)

# Also supports different tasks per agent
results = run_agents_with_tasks_uvloop(
    agents=agents,
    tasks=[f"Task {i}" for i in range(len(agents))],
    max_workers=10,
)

Batch Processing

Batched Concurrent Execution

Process agents in batches to avoid resource exhaustion:
from swarms.structs.multi_agent_exec import (
    run_agents_concurrently_multiprocess
)
import os

# Process large number of agents in batches
agents = [Agent(agent_name=f"Agent-{i}") for i in range(100)]

results = run_agents_concurrently_multiprocess(
    agents=agents,
    task="Process this task",
    batch_size=os.cpu_count(),  # Process in CPU-sized batches
)

Grid Execution (Different Tasks)

Run different tasks across different agents:
from swarms.structs.multi_agent_exec import batched_grid_agent_execution

# Create specialized agents
agents = [
    Agent(agent_name="Researcher", system_prompt="Research expert"),
    Agent(agent_name="Writer", system_prompt="Content writer"),
    Agent(agent_name="Analyst", system_prompt="Data analyst"),
]

# Different task for each agent
tasks = [
    "Research AI trends",
    "Write a blog post",
    "Analyze market data",
]

# Execute in parallel
results = batched_grid_agent_execution(
    agents=agents,
    tasks=tasks,
    max_workers=None,  # Uses 90% of CPU cores
)

# Results maintain order of input agents
for i, result in enumerate(results):
    print(f"{agents[i].agent_name} completed: {tasks[i]}")
    print(f"Result: {result}")

Batch with Agent-Task Pairs

from swarms.structs.multi_agent_exec import run_agents_with_different_tasks

# Create agent-task pairs
agent_task_pairs = [
    (researcher, "Research quantum computing"),
    (writer, "Write about AI ethics"),
    (analyst, "Analyze stock trends"),
    (editor, "Edit research paper"),
    # ... hundreds more pairs
]

# Process in batches
results = run_agents_with_different_tasks(
    agent_task_pairs=agent_task_pairs,
    batch_size=10,  # Process 10 at a time
    max_workers=5,  # Use 5 workers per batch
)

Queue-Based Scaling

For production deployments, use AOP’s queue-based execution:
from swarms import Agent
from swarms.structs.aop import AOP

# Create agents
agents = [
    Agent(agent_name=f"Worker-{i}", model_name="gpt-4o")
    for i in range(10)
]

# Deploy with queue-based execution
deployer = AOP(
    server_name="ScalableCluster",
    agents=agents,
    queue_enabled=True,
    max_workers_per_agent=5,  # 5 workers per agent
    max_queue_size_per_agent=1000,  # Large queue capacity
)

deployer.run()

Queue Configuration for Scale

deployer = AOP(
    server_name="HighThroughputCluster",
    queue_enabled=True,
    
    # Worker configuration
    max_workers_per_agent=10,  # More workers = more throughput
    
    # Queue configuration
    max_queue_size_per_agent=10000,  # Large queue
    processing_timeout=60,  # Adjust based on task complexity
    retry_delay=0.5,  # Quick retries
    
    # Performance tuning
    verbose=False,  # Reduce logging overhead
)

Performance Optimization

Worker Configuration

import os

# Calculate optimal worker count
num_cores = os.cpu_count()

# For I/O-bound tasks (API calls, network)
io_bound_workers = int(num_cores * 2)  # 2x cores

# For CPU-bound tasks
cpu_bound_workers = num_cores  # 1x cores

# For mixed workloads
mixed_workers = int(num_cores * 1.5)  # 1.5x cores

results = run_agents_concurrently(
    agents=agents,
    task="Task",
    max_workers=io_bound_workers,
)

Dynamic Context Window

Optimize token usage with dynamic context windows:
agent = Agent(
    agent_name="Optimized-Agent",
    model_name="gpt-4o",
    dynamic_context_window=True,  # Auto-manage context
    context_length=8000,
)

Memory Management

# Truncate history to prevent memory bloat
agent = Agent(
    agent_name="Memory-Efficient-Agent",
    model_name="gpt-4o",
    context_length=4000,
)

# Manual truncation after processing
for task in large_task_list:
    result = agent.run(task)
    agent.truncate_history(max_messages=20)  # Keep only recent messages

Batch Size Tuning

def find_optimal_batch_size(agents, test_task):
    """
    Find optimal batch size through testing.
    """
    import time
    
    batch_sizes = [5, 10, 20, 50, 100]
    results = {}
    
    for batch_size in batch_sizes:
        start = time.time()
        run_agents_concurrently_multiprocess(
            agents[:batch_size],
            test_task,
            batch_size=batch_size,
        )
        duration = time.time() - start
        results[batch_size] = duration
        print(f"Batch size {batch_size}: {duration:.2f}s")
    
    optimal = min(results.items(), key=lambda x: x[1])
    print(f"Optimal batch size: {optimal[0]}")
    return optimal[0]

Load Balancing

Round-Robin Distribution

class LoadBalancer:
    def __init__(self, agents):
        self.agents = agents
        self.current_index = 0
    
    def get_next_agent(self):
        agent = self.agents[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.agents)
        return agent
    
    def process_tasks(self, tasks):
        results = []
        for task in tasks:
            agent = self.get_next_agent()
            result = agent.run(task)
            results.append(result)
        return results

# Usage
balancer = LoadBalancer(agents)
results = balancer.process_tasks(large_task_list)

Priority-Based Distribution

import heapq

class PriorityLoadBalancer:
    def __init__(self, agents):
        # Track agent load (priority queue)
        self.agent_load = [(0, agent) for agent in agents]
        heapq.heapify(self.agent_load)
    
    def assign_task(self, task, priority=0):
        # Get least loaded agent
        load, agent = heapq.heappop(self.agent_load)
        
        # Process task
        result = agent.run(task)
        
        # Update load and re-add to queue
        heapq.heappush(self.agent_load, (load + 1 - priority, agent))
        
        return result

Monitoring at Scale

Real-Time Metrics

import time
from collections import defaultdict

class ScalingMetrics:
    def __init__(self):
        self.agent_metrics = defaultdict(lambda: {
            "requests": 0,
            "successes": 0,
            "failures": 0,
            "total_duration": 0.0,
        })
    
    def record(self, agent_name, success, duration):
        metrics = self.agent_metrics[agent_name]
        metrics["requests"] += 1
        metrics["total_duration"] += duration
        if success:
            metrics["successes"] += 1
        else:
            metrics["failures"] += 1
    
    def get_summary(self):
        summary = {}
        for agent_name, metrics in self.agent_metrics.items():
            avg_duration = (
                metrics["total_duration"] / metrics["requests"]
                if metrics["requests"] > 0 else 0
            )
            success_rate = (
                metrics["successes"] / metrics["requests"]
                if metrics["requests"] > 0 else 0
            )
            summary[agent_name] = {
                "requests": metrics["requests"],
                "success_rate": f"{success_rate:.1%}",
                "avg_duration": f"{avg_duration:.2f}s",
            }
        return summary

metrics = ScalingMetrics()

def monitored_run(agent, task):
    start = time.time()
    success = False
    try:
        result = agent.run(task)
        success = True
        return result
    finally:
        duration = time.time() - start
        metrics.record(agent.agent_name, success, duration)

Throughput Monitoring

import threading
import time

class ThroughputMonitor:
    def __init__(self, window_size=60):
        self.window_size = window_size
        self.requests = []
        self.lock = threading.Lock()
    
    def record_request(self):
        with self.lock:
            now = time.time()
            self.requests.append(now)
            # Remove old requests outside window
            cutoff = now - self.window_size
            self.requests = [t for t in self.requests if t > cutoff]
    
    def get_throughput(self):
        with self.lock:
            return len(self.requests) / self.window_size
    
    def start_reporting(self, interval=10):
        def report():
            while True:
                throughput = self.get_throughput()
                print(f"Current throughput: {throughput:.2f} req/s")
                time.sleep(interval)
        
        thread = threading.Thread(target=report, daemon=True)
        thread.start()

monitor = ThroughputMonitor()
monitor.start_reporting()

# Record each request
for task in tasks:
    result = agent.run(task)
    monitor.record_request()

Best Practices

1. Choose the Right Pattern

  • I/O-bound tasks (API calls): Use run_agents_concurrently with high worker count
  • CPU-bound tasks: Use run_agents_concurrently with worker count = CPU cores
  • Async workloads: Use run_agents_concurrently_uvloop
  • Large scale: Use AOP with queue-based execution
  • Mixed tasks: Use batched_grid_agent_execution

2. Configure Workers Appropriately

import os

# Get CPU count
num_cores = os.cpu_count()

# I/O-bound: 2x cores
max_workers = num_cores * 2

# CPU-bound: 1x cores  
max_workers = num_cores

# Conservative: 90-95% of cores
max_workers = int(num_cores * 0.9)

3. Implement Graceful Degradation

def resilient_concurrent_run(agents, task, max_workers=None):
    try:
        return run_agents_concurrently(
            agents=agents,
            task=task,
            max_workers=max_workers,
        )
    except Exception as e:
        logger.error(f"Concurrent execution failed: {e}")
        logger.info("Falling back to sequential execution")
        # Fallback to sequential
        return [agent.run(task) for agent in agents]

4. Monitor Resource Usage

import psutil

def check_resources():
    cpu_percent = psutil.cpu_percent(interval=1)
    memory_percent = psutil.virtual_memory().percent
    
    if cpu_percent > 90:
        logger.warning(f"High CPU usage: {cpu_percent}%")
    if memory_percent > 85:
        logger.warning(f"High memory usage: {memory_percent}%")
    
    return {
        "cpu_percent": cpu_percent,
        "memory_percent": memory_percent,
    }

5. Use Appropriate Timeouts

agent = Agent(
    agent_name="Timeout-Agent",
    model_name="gpt-4o",
    timeout=30,  # Set based on expected task duration
)

Build docs developers (and LLMs) love