Skip to main content
This example demonstrates how to build a complete data analysis system that can load data, perform statistical analysis, create visualizations, and generate reports.

What You’ll Learn

  • Automated data analysis workflows
  • Safe code execution for data processing
  • Visualization generation
  • Automated report creation
  • Error handling and data validation

Prerequisites

1

Install AutoGen with Docker support

pip install -U "autogen-agentchat" "autogen-ext[openai,docker]"
2

Install Docker

Install Docker Desktop from docker.com
3

Set your OpenAI API key

export OPENAI_API_KEY="sk-..."

Architecture

The data analysis system uses:
  • Data Analyst: Plans analysis approach
  • Code Writer: Generates Python analysis code
  • Code Executor: Runs code safely in Docker
  • Report Writer: Synthesizes findings into reports

Code Example

import asyncio
from typing import Optional
from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.code_executors import DockerCommandLineCodeExecutor
from autogen_ext.models.openai import OpenAIChatCompletionClient


class DataAnalysisSystem:
    """Automated multi-agent data analysis system.

    Wires four agents (analyst, code writer, Docker code executor, and
    report writer) into a round-robin team that loads data, runs generated
    Python inside a container, and produces a written report.

    Lifecycle: construct -> await setup() -> await analyze(...) -> await cleanup().
    """

    def __init__(self, model_client, work_dir: str = "./analysis_workspace"):
        """
        Args:
            model_client: Chat-completion client shared by all LLM agents.
            work_dir: Host directory the Docker executor uses for code/files.
        """
        self.model_client = model_client
        self.work_dir = work_dir
        # name -> agent, populated by setup()
        self.agents = {}
        # Created in setup(); kept as None so cleanup() is safe to call
        # even when setup() never ran or failed before the executor existed.
        self.executor = None

    async def setup(self):
        """Initialize the Docker executor and all agents.

        Must be awaited before analyze(). Starts a container, so always
        pair it with cleanup().
        """

        # Create Docker executor with data science packages
        self.executor = DockerCommandLineCodeExecutor(
            image="python:3.11-slim",
            work_dir=self.work_dir,
            timeout=120,
        )
        # Enter the executor's async context manually (starts the container);
        # cleanup() performs the matching __aexit__.
        await self.executor.__aenter__()

        # Install required packages in container
        await self._install_packages()

        # Data analyst - plans the analysis
        self.agents["analyst"] = AssistantAgent(
            "data_analyst",
            model_client=self.model_client,
            system_message="""You are a data analyst.
            
            When given a data analysis task:
            1. Understand the data and objectives
            2. Plan the analysis approach
            3. Specify what code needs to be written
            4. Interpret results and provide insights
            
            Be thorough and methodical.""",
        )

        # Code writer - generates analysis code
        self.agents["coder"] = AssistantAgent(
            "code_writer",
            model_client=self.model_client,
            system_message="""You are a Python data analysis programmer.
            
            Write clean, well-documented code using:
            - pandas for data manipulation
            - numpy for numerical operations
            - matplotlib/seaborn for visualizations
            - scipy/statsmodels for statistics
            
            Always:
            - Add error handling
            - Save visualizations as files
            - Print key findings
            - Use descriptive variable names
            """,
        )

        # Code executor - runs the code
        self.agents["executor"] = CodeExecutorAgent(
            "code_executor",
            code_executor=self.executor,
        )

        # Report writer - creates final report
        self.agents["reporter"] = AssistantAgent(
            "report_writer",
            model_client=self.model_client,
            system_message="""You are a data analysis report writer.
            
            Create comprehensive reports with:
            1. Executive Summary
            2. Data Overview
            3. Analysis Methodology
            4. Key Findings (with statistics)
            5. Visualizations (reference saved files)
            6. Conclusions and Recommendations
            
            Use clear headings, bullet points, and professional language.""",
        )

    async def _install_packages(self):
        """Install data science packages in Docker container."""
        install_code = '''
import subprocess
import sys

packages = [
    "pandas",
    "numpy", 
    "matplotlib",
    "seaborn",
    "scipy",
    "scikit-learn",
]

for package in packages:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])

print("Packages installed successfully")
'''
        # NOTE(review): recent autogen-ext versions expect CodeBlock objects
        # plus a CancellationToken here rather than plain dicts -- confirm
        # against the installed version.
        await self.executor.execute_code_blocks([
            {"code": install_code, "language": "python"}
        ])

    async def analyze(self, data_path: str, analysis_task: str) -> str:
        """Perform automated data analysis.

        Args:
            data_path: Path (inside the executor work dir) of the data file.
            analysis_task: Natural-language description of the analysis goal.

        Returns:
            The content of the final message from the team run (the report).
        """

        # Round-robin so each agent gets a turn in order:
        # plan -> write code -> execute -> report.
        team = RoundRobinGroupChat(
            participants=[
                self.agents["analyst"],
                self.agents["coder"],
                self.agents["executor"],
                self.agents["reporter"],
            ],
            max_turns=20,  # hard cap so a confused team cannot loop forever
        )

        # Run analysis
        task = f"""Analyze the data file: {data_path}
        
        Analysis objective: {analysis_task}
        
        Steps:
        1. Load and explore the data
        2. Perform requested analysis
        3. Create relevant visualizations
        4. Generate a comprehensive report
        """

        # Console streams the conversation to stdout while the team runs.
        result = await Console(team.run_stream(task=task))

        return result.messages[-1].content

    async def cleanup(self):
        """Release the Docker executor (no-op if setup() never created it)."""
        if self.executor is not None:
            await self.executor.__aexit__(None, None, None)
            self.executor = None


async def main() -> None:
    """Run a sample sales analysis end-to-end and print the report."""
    # Initialize
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    analysis_system = DataAnalysisSystem(model_client)

    try:
        await analysis_system.setup()

        # Example: Analyze sales data
        report = await analysis_system.analyze(
            data_path="sales_data.csv",
            analysis_task="""Analyze sales performance:
            - Calculate total revenue by product category
            - Identify top 10 products
            - Show monthly sales trends
            - Analyze regional performance
            - Create visualizations for key metrics
            """,
        )

        print("\n" + "=" * 80)
        print("ANALYSIS REPORT")
        print("=" * 80)
        print(report)

    finally:
        # Guard: if setup() failed before the executor was created, calling
        # cleanup() unconditionally would raise AttributeError and mask the
        # real error. Only clean up an executor that actually exists.
        if getattr(analysis_system, "executor", None) is not None:
            await analysis_system.cleanup()
        await model_client.close()


# Script entry point: run the whole analysis workflow once.
if __name__ == "__main__":
    asyncio.run(main())

Run the Example

First, create sample data:
# create_sample_data.py
"""Generate a reproducible synthetic sales dataset for the analysis demo."""
from pathlib import Path

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Generate sample sales data (fixed seed -> identical data on every run)
np.random.seed(42)

products = ['Widget A', 'Widget B', 'Gadget X', 'Gadget Y', 'Tool Z']
categories = ['Widgets', 'Widgets', 'Gadgets', 'Gadgets', 'Tools']
regions = ['North', 'South', 'East', 'West']

data = []
start_date = datetime(2024, 1, 1)

for i in range(1000):
    # Draw the product ONCE and derive its category from that same draw.
    # (The original called np.random.choice(products) twice, so the recorded
    # category usually belonged to a different product than the row's.)
    product = np.random.choice(products)
    data.append({
        'date': start_date + timedelta(days=int(np.random.randint(0, 365))),
        'product': product,
        'category': categories[products.index(product)],
        'region': np.random.choice(regions),
        'quantity': np.random.randint(1, 50),
        'price': np.random.uniform(10, 500),
    })

df = pd.DataFrame(data)
df['revenue'] = df['quantity'] * df['price']

# Make sure the workspace directory exists before writing into it.
Path('analysis_workspace').mkdir(exist_ok=True)
df.to_csv('analysis_workspace/sales_data.csv', index=False)
print("Sample data created: analysis_workspace/sales_data.csv")
Run the analysis:
python create_sample_data.py
python data_analysis.py

Expected Output

---------- data_analyst ----------
I'll analyze the sales data focusing on:
1. Revenue by product category
2. Top performing products
3. Temporal trends
4. Regional distribution

---------- code_writer ----------
```python
# Representative analysis script that the code_writer agent is expected
# to emit (shown here as part of the example transcript).
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Load data
df = pd.read_csv('sales_data.csv')
df['date'] = pd.to_datetime(df['date'])

# 1. Revenue by category
category_revenue = df.groupby('category')['revenue'].sum().sort_values(ascending=False)
print("Revenue by Category:")
print(category_revenue)

# 2. Top 10 products
top_products = df.groupby('product')['revenue'].sum().nlargest(10)
print("\nTop 10 Products:")
print(top_products)

# 3. Monthly trends
df['month'] = df['date'].dt.to_period('M')
monthly_sales = df.groupby('month')['revenue'].sum()

# Create a 2x2 grid of visualizations
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Category revenue pie chart
axes[0, 0].pie(category_revenue, labels=category_revenue.index, autopct='%1.1f%%')
axes[0, 0].set_title('Revenue by Category')

# Top products bar chart
top_products.plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('Top 10 Products by Revenue')
axes[0, 1].tick_params(axis='x', rotation=45)

# Monthly trend line chart
monthly_sales.plot(ax=axes[1, 0])
axes[1, 0].set_title('Monthly Sales Trend')
axes[1, 0].set_xlabel('Month')
axes[1, 0].set_ylabel('Revenue')

# Regional distribution
regional_revenue = df.groupby('region')['revenue'].sum()
regional_revenue.plot(kind='bar', ax=axes[1, 1])
axes[1, 1].set_title('Revenue by Region')

plt.tight_layout()
plt.savefig('sales_analysis.png', dpi=300)
print("\nVisualization saved: sales_analysis.png")
---------- code_executor ----------
Code executed successfully.
Revenue by Category:
category
Widgets    $245,678
Gadgets    $198,432
Tools $87,234
Top 10 Products: [… output …] Visualization saved: sales_analysis.png ---------- report_writer ----------

SALES ANALYSIS REPORT

Executive Summary

Total revenue analyzed: $531,344 across 1,000 transactions
Top category: Widgets (46.2% of revenue)
Highest performing product: Widget A ($128,456)

Key Findings

  • Widgets category dominates with nearly half of total revenue
  • Strong seasonality observed with peaks in Q2 and Q4
  • Regional distribution relatively balanced
  • Top 3 products account for 52% of total revenue

Recommendations

  1. Increase inventory for Widget products during peak seasons
  2. Investigate underperformance in Tools category
  3. Expand successful product lines
See sales_analysis.png for detailed visualizations.

## Advanced Features

### Statistical Analysis

```python
# Add statistical analyst agent.
# Drop this agent into the RoundRobinGroupChat participants list to extend
# the pipeline with rigorous statistics. NOTE(review): assumes a
# `model_client` is already defined in the surrounding scope.
statistical_analyst = AssistantAgent(
    "statistician",
    model_client=model_client,
    system_message="""You are a statistical analyst.
    
    Perform rigorous statistical analysis:
    - Hypothesis testing
    - Correlation analysis
    - Regression modeling
    - Outlier detection
    - Confidence intervals
    
    Always report p-values and confidence levels.""",
)

Automated Insights

# Add insights agent.
# Optional team member that turns raw analysis output into business-facing
# takeaways. NOTE(review): assumes a `model_client` is already defined in
# the surrounding scope.
insights_agent = AssistantAgent(
    "insights_generator",
    model_client=model_client,
    system_message="""You are an insights specialist.
    
    Identify:
    - Anomalies and outliers
    - Trends and patterns  
    - Correlations
    - Actionable recommendations
    
    Focus on business value.""",
)

Key Concepts

Code Generation

Agents write Python code for data analysis tasks.

Safe Execution

Docker isolation ensures safe code execution.

Automation

End-to-end automated analysis pipeline.

Visualization

Automatic generation of charts and graphs.

Best Practices

  1. Data Validation: Always validate input data before analysis
  2. Error Handling: Wrap code in try-except blocks
  3. Reproducibility: Set random seeds for consistent results
  4. Documentation: Comment code and document methodology
  5. Resource Limits: Set timeouts and memory limits
  6. Version Control: Track analysis scripts and results

Production Enhancements

Add Caching

import hashlib
import pickle

def cache_analysis(data_path: str, task: str, result: str):
    """Cache an analysis result keyed by (data_path, task).

    Args:
        data_path: Path of the analyzed data file.
        task: The analysis task description.
        result: The report text to store.
    """
    from pathlib import Path

    # md5 is fine here: the key is a cache fingerprint, not a security hash.
    cache_key = hashlib.md5(f"{data_path}{task}".encode()).hexdigest()
    # The original crashed with FileNotFoundError when cache/ was missing.
    Path("cache").mkdir(exist_ok=True)
    with open(f"cache/{cache_key}.pkl", "wb") as f:
        pickle.dump(result, f)

def get_cached_analysis(data_path: str, task: str) -> Optional[str]:
    """Return the cached report for (data_path, task), or None on a miss.

    A corrupt or truncated cache file is treated the same as a missing one
    instead of crashing the caller (the original only caught
    FileNotFoundError, so a bad pickle raised straight through).
    """
    cache_key = hashlib.md5(f"{data_path}{task}".encode()).hexdigest()
    try:
        with open(f"cache/{cache_key}.pkl", "rb") as f:
            return pickle.load(f)
    except (FileNotFoundError, EOFError, pickle.UnpicklingError):
        return None

Add Scheduling

import schedule
import time

def scheduled_analysis():
    """Run analysis on schedule."""
    # NOTE(review): relies on a module-level `analysis_system` that has
    # already been set up elsewhere; asyncio.run() creates a fresh event
    # loop for each scheduled run.
    asyncio.run(analysis_system.analyze(
        "daily_sales.csv",
        "Daily sales report"
    ))

# Run every day at 9 AM
schedule.every().day.at("09:00").do(scheduled_analysis)

# Polling loop: wakes once a minute to fire any due jobs. Blocks forever,
# so run this as a dedicated process.
while True:
    schedule.run_pending()
    time.sleep(60)

Add Notifications

import smtplib
from email.mime.text import MIMEText

def send_report_email(report: str, recipients: list):
    """Send the analysis report as a plain-text email to *recipients*.

    Uses the SMTP server running on localhost; the report text becomes the
    message body.
    """
    message = MIMEText(report)
    headers = {
        'Subject': 'Daily Analysis Report',
        'From': '[email protected]',
        'To': ', '.join(recipients),
    }
    for header_name, header_value in headers.items():
        message[header_name] = header_value

    with smtplib.SMTP('localhost') as server:
        server.send_message(message)

Troubleshooting

Package Installation Fails

Use custom Docker image with pre-installed packages:
FROM python:3.11-slim
RUN pip install pandas numpy matplotlib seaborn scipy scikit-learn

Out of Memory

Increase container memory limit:
# Cap the analysis container's RAM at 2 GB (forwarded to Docker's
# container-create options).
executor = DockerCommandLineCodeExecutor(
    container_kwargs={"mem_limit": "2g"},
)

Code Execution Timeout

Increase timeout for large datasets:
# Allow long-running analysis code to finish before the executor kills it.
executor = DockerCommandLineCodeExecutor(
    timeout=300,  # 5 minutes
)

Next Steps

Code Execution

Learn more about code execution patterns

Tools

Explore tool creation and integration

Build docs developers (and LLMs) love