Overview

This example demonstrates how to build a data analysis swarm that processes complex datasets through parallel and sequential workflows. The swarm handles data collection, cleaning, analysis, visualization, and reporting in an orchestrated manner.

Business Value

  • Faster Insights: Reduce analysis time from days to hours
  • Comprehensive Analysis: Multiple analytical perspectives on the same data
  • Automated Reporting: Generate executive-ready reports automatically
  • Error Reduction: Built-in validation and quality checks
  • Scalability: Handle multiple datasets and analysis requests concurrently

Architecture Choice

We use ConcurrentWorkflow for parallel analysis combined with sequential reporting because:
  • Multiple analytical approaches can run simultaneously
  • Different analysis types (statistical, trend, anomaly) are independent
  • Parallel execution dramatically reduces time-to-insight
  • Final reporting stage synthesizes all parallel results

Complete Implementation

from swarms import Agent, ConcurrentWorkflow, SequentialWorkflow
import os

# Configure your LLM (the agents read OPENAI_API_KEY from the environment)
api_key = os.getenv("OPENAI_API_KEY")

# Define data analysis agents
data_collector = Agent(
    agent_name="Data-Collector",
    system_prompt="""
    You are a data collection specialist.
    Your role is to:
    - Identify relevant data sources for the analysis
    - Extract and gather required datasets
    - Document data provenance and metadata
    - Flag data quality issues
    - Structure data for analysis
    
    Provide clean, well-documented datasets ready for analysis.
    """,
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
)

data_cleaner = Agent(
    agent_name="Data-Cleaner",
    system_prompt="""
    You are a data cleaning and validation expert.
    Your role is to:
    - Identify missing, duplicate, or invalid data
    - Apply appropriate cleaning strategies
    - Normalize and standardize data formats
    - Document all transformations
    - Validate data integrity
    
    Ensure data quality before analysis begins.
    """,
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
)

statistical_analyst = Agent(
    agent_name="Statistical-Analyst",
    system_prompt="""
    You are a statistical analyst specializing in quantitative analysis.
    Your role is to:
    - Calculate descriptive statistics (mean, median, variance, etc.)
    - Perform hypothesis testing and significance tests
    - Identify correlations and relationships
    - Assess statistical confidence and margins of error
    - Interpret statistical findings in business context
    
    Provide rigorous statistical insights.
    """,
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
)

trend_analyst = Agent(
    agent_name="Trend-Analyst",
    system_prompt="""
    You are a trend analysis expert.
    Your role is to:
    - Identify patterns and trends over time
    - Perform time-series analysis
    - Detect seasonality and cyclical patterns
    - Project future trends based on historical data
    - Assess trend strength and reliability
    
    Uncover temporal patterns and forecast future states.
    """,
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
)

anomaly_detector = Agent(
    agent_name="Anomaly-Detector",
    system_prompt="""
    You are an anomaly detection specialist.
    Your role is to:
    - Identify outliers and unusual patterns
    - Distinguish between noise and significant anomalies
    - Investigate potential causes of anomalies
    - Assess impact and urgency of anomalies
    - Recommend investigation priorities
    
    Find the unexpected insights hidden in data.
    """,
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
)

visualization_expert = Agent(
    agent_name="Visualization-Expert",
    system_prompt="""
    You are a data visualization expert.
    Your role is to:
    - Design effective charts and graphs for findings
    - Choose appropriate visualization types for data
    - Create visual narratives that tell the story
    - Ensure visualizations are accessible and clear
    - Provide specifications for implementation
    
    Transform complex data into compelling visuals.
    """,
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
)

report_synthesizer = Agent(
    agent_name="Report-Synthesizer",
    system_prompt="""
    You are a data analyst who synthesizes findings into executive reports.
    Your role is to:
    - Combine insights from all analysis streams
    - Identify key findings and actionable insights
    - Structure information for executive audience
    - Highlight business implications
    - Provide clear recommendations
    
    Create comprehensive, actionable analysis reports.
    """,
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
)

# Create the data analysis workflow
# First: Sequential data preparation
prep_workflow = SequentialWorkflow(
    name="Data-Preparation",
    agents=[data_collector, data_cleaner],
    max_loops=1,
)

# Second: Concurrent analysis of different aspects
analysis_workflow = ConcurrentWorkflow(
    name="Parallel-Analysis",
    agents=[statistical_analyst, trend_analyst, anomaly_detector],
    max_loops=1,
)

# Third: Sequential visualization and reporting
reporting_workflow = SequentialWorkflow(
    name="Reporting-Pipeline",
    agents=[visualization_expert, report_synthesizer],
    max_loops=1,
)

# Execute complete data analysis
if __name__ == "__main__":
    analysis_request = """
    Analyze the following dataset:
    
    Dataset: Quarterly sales data for the past 3 years
    Columns: Date, Region, Product, Revenue, Units, Customer_Segment
    
    Analysis objectives:
    1. Identify revenue trends and growth patterns
    2. Detect any anomalies or unusual performance
    3. Compare performance across regions and segments
    4. Forecast next quarter's performance
    5. Recommend strategic actions based on findings
    
    Provide comprehensive analysis with visualizations and executive summary.
    """
    
    # Stage 1: Prepare the data
    print("Stage 1: Data preparation...")
    cleaned_data = prep_workflow.run(analysis_request)
    
    # Stage 2: Run parallel analyses
    print("Stage 2: Running parallel analyses...")
    analysis_results = analysis_workflow.run(cleaned_data)
    
    # Stage 3: Visualize and synthesize report
    print("Stage 3: Creating visualizations and final report...")
    final_report = reporting_workflow.run(analysis_results)
    
    # Save the final report (coerce to str in case the workflow returns a structured object)
    with open("sales_analysis_report.md", "w") as f:
        f.write(str(final_report))
    
    print("Analysis complete! Report saved to sales_analysis_report.md")

How It Works

Stage 1: Data Preparation (Sequential)

  1. Data Collector gathers and structures the dataset
  2. Data Cleaner validates and cleans the data

Stage 2: Analysis (Concurrent)

Three analysts work in parallel:
  • Statistical Analyst performs quantitative analysis
  • Trend Analyst identifies patterns and forecasts
  • Anomaly Detector finds outliers and unusual patterns

Stage 3: Reporting (Sequential)

  1. Visualization Expert designs charts and graphs
  2. Report Synthesizer combines all insights into final report
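The three-stage hand-off above can be wrapped in a single helper so the data flow between workflows is explicit (a minimal sketch; `run_full_analysis` is an illustrative name, not part of the swarms API):

```python
def run_full_analysis(prep_workflow, analysis_workflow, reporting_workflow, request):
    """Chain the three stages, passing each stage's output to the next."""
    cleaned_data = prep_workflow.run(request)               # Stage 1: sequential preparation
    analysis_results = analysis_workflow.run(cleaned_data)  # Stage 2: parallel analysis
    return reporting_workflow.run(analysis_results)         # Stage 3: sequential reporting
```

Any object exposing a `run(task)` method works here, so the same helper serves swapped-in workflows without change.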

Customization Tips

Add Domain-Specific Analysts

marketing_analyst = Agent(
    agent_name="Marketing-Analyst",
    system_prompt="""
    You analyze marketing-specific metrics:
    - Customer acquisition cost (CAC)
    - Lifetime value (LTV)
    - Conversion rates and funnel analysis
    - Campaign performance and ROI
    - Channel attribution
    """,
    model_name="gpt-4o",
    max_loops=1,
)

# Add to the parallel analysis stage
analysis_workflow = ConcurrentWorkflow(
    name="Parallel-Analysis",
    agents=[statistical_analyst, trend_analyst, anomaly_detector, marketing_analyst],
    max_loops=1,
)

Add Predictive Modeling

ml_modeler = Agent(
    agent_name="ML-Modeler",
    system_prompt="""
    You are a machine learning specialist.
    Your role is to:
    - Recommend appropriate ML models for the data
    - Define features and target variables
    - Assess model performance and accuracy
    - Interpret model predictions
    - Identify key predictive factors
    """,
    model_name="gpt-4o",
    max_loops=1,
)

Add Comparative Analysis

comparative_analyst = Agent(
    agent_name="Comparative-Analyst",
    system_prompt="""
    You perform comparative analysis:
    - Benchmark against industry standards
    - Compare performance across segments
    - Identify best and worst performers
    - Analyze competitive positioning
    - Highlight performance gaps
    """,
    model_name="gpt-4o",
    max_loops=1,
)

Configure for Real-Time Analysis

from swarms import Agent

real_time_monitor = Agent(
    agent_name="Real-Time-Monitor",
    system_prompt="Monitor streaming data for immediate insights...",
    model_name="gpt-4o",
    max_loops="auto",  # Continuous monitoring
    stopping_condition="critical_threshold_reached",
)
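In swarms, `stopping_condition` is typically a callable that inspects the latest response rather than a string. A minimal predicate sketch (the "CRITICAL" marker is a convention we assume the monitor's system prompt asks it to emit, not a swarms built-in):

```python
def critical_threshold_reached(response: str) -> bool:
    """Stop the monitoring loop once the agent reports a critical event.

    Checks for an assumed "CRITICAL" marker in the response text;
    adapt the keyword to your own alerting vocabulary.
    """
    return "CRITICAL" in response.upper()
```

Pass the function itself, e.g. `stopping_condition=critical_threshold_reached`, so the agent can evaluate it after each loop.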

Add Data Quality Scoring

data_cleaner = Agent(
    agent_name="Data-Cleaner",
    system_prompt="""
    Clean data and provide quality score:
    
    Quality Dimensions:
    - Completeness: % of non-null values
    - Accuracy: % of valid values
    - Consistency: % of standardized formats
    - Timeliness: Age of data
    
    Overall Quality Score: (0-100)
    
    Flag if quality score < 80.
    """,
    model_name="gpt-4o",
    max_loops=1,
)
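Because the score arrives as free text, the pipeline has to parse it before it can gate on the < 80 threshold. A sketch, assuming the cleaner emits a line like "Overall Quality Score: 87" (the helper names are ours, not swarms API):

```python
import re

def extract_quality_score(cleaner_output):
    """Pull the numeric quality score out of the cleaner's free-text report."""
    match = re.search(r"Overall Quality Score:\s*\(?(\d{1,3})\)?", cleaner_output)
    return int(match.group(1)) if match else None

def passes_quality_gate(cleaner_output, threshold=80):
    """Gate the pipeline: proceed only when a score is found and meets the bar."""
    score = extract_quality_score(cleaner_output)
    return score is not None and score >= threshold
```

Run this on the prep stage's output before launching the analysis workflow, and route low-scoring data back to the cleaner instead.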

Real-World Applications

  • Sales Analytics: Revenue analysis, pipeline forecasting, quota tracking
  • Customer Analytics: Churn prediction, segmentation, lifetime value
  • Operations Analytics: Efficiency metrics, bottleneck identification
  • Financial Analytics: P&L analysis, budget variance, financial forecasting
  • Product Analytics: Usage patterns, feature adoption, user engagement
  • Supply Chain Analytics: Inventory optimization, demand forecasting

Performance Optimization

Parallel Processing for Large Datasets

# Split dataset into chunks for parallel processing
chunk_analysts = [
    Agent(agent_name=f"Chunk-Analyst-{i}", system_prompt="Analyze one shard of the dataset and report shard-level findings.", model_name="gpt-4o", max_loops=1)
    for i in range(4)  # 4 parallel processors
]

parallel_processing = ConcurrentWorkflow(
    agents=chunk_analysts,
    max_loops=1,
)
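ConcurrentWorkflow.run sends the same task to every agent, so the chunking itself has to happen before dispatch. A stdlib sketch that splits the rows and fans each chunk out to its own analyst via `agent.run` (helper names are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_chunks(rows, n_chunks):
    """Divide rows into n_chunks roughly equal slices (last slice may be shorter)."""
    size = -(-len(rows) // n_chunks)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]

def fan_out(agents, chunks):
    """Run each agent on its own chunk concurrently and collect the results in order."""
    with ThreadPoolExecutor(max_workers=len(agents)) as pool:
        futures = [pool.submit(agent.run, str(chunk)) for agent, chunk in zip(agents, chunks)]
        return [f.result() for f in futures]
```

Usage with the analysts above would be `fan_out(chunk_analysts, split_into_chunks(rows, 4))`, with a downstream agent merging the shard-level findings.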

Incremental Analysis

incremental_analyst = Agent(
    agent_name="Incremental-Analyst",
    system_prompt="""
    Perform incremental analysis on new data:
    - Compare to previous analysis
    - Identify changes and deltas
    - Update trends and forecasts
    - Flag significant changes
    """,
    model_name="gpt-4o",
    max_loops=1,
)
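The incremental analyst needs both the previous findings and the new batch in a single task string. A small prompt-builder sketch (the section labels are our own convention, not a swarms requirement):

```python
def build_incremental_task(previous_summary, new_data):
    """Combine the prior analysis and the new batch into one incremental task."""
    return (
        "Previous analysis summary:\n"
        f"{previous_summary}\n\n"
        "New data since last run:\n"
        f"{new_data}\n\n"
        "Compare the new data to the previous analysis, report deltas, "
        "update trends and forecasts, and flag significant changes."
    )
```

Then each run becomes `incremental_analyst.run(build_incremental_task(last_report, latest_batch))`, with `last_report` persisted between runs.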

Caching for Repeated Analyses

from swarms.memory import ChromaDB

analysis_memory = ChromaDB(
    output_dir="analysis_cache",
    n_results=3,
)

statistical_analyst = Agent(
    agent_name="Statistical-Analyst",
    long_term_memory=analysis_memory,  # Cache common analyses
    # ... other params
)

Output Examples

The final report includes:
# Sales Analysis Report - Q4 2025

## Executive Summary
- Revenue up 23% YoY, driven by Enterprise segment
- Anomaly detected: 40% spike in returns in APAC region (investigate)
- Forecast: Q1 2026 revenue projected at $4.2M (±8%)

## Statistical Analysis
[Detailed statistics...]

## Trend Analysis
[Trend charts and forecasts...]

## Anomalies & Alerts
[Unusual patterns requiring attention...]

## Recommendations
1. Investigate APAC returns spike
2. Increase investment in Enterprise sales
3. ...
