import asyncio
from typing import Optional
from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.code_executors import DockerCommandLineCodeExecutor
from autogen_ext.models.openai import OpenAIChatCompletionClient
class DataAnalysisSystem:
    """Automated data analysis system.

    Wires four agents (data analyst, code writer, code executor and report
    writer) into a ``RoundRobinGroupChat``. The code executor agent is backed
    by a ``DockerCommandLineCodeExecutor``.

    Lifecycle: ``await setup()`` once, ``await analyze(...)`` as often as
    needed, then ``await cleanup()``. ``cleanup()`` is safe to call at any
    point, including when ``setup()`` never ran or failed part-way.
    """

    # Packages installed into the container by ``_install_packages``.
    # ``statsmodels`` is listed because the code writer's system prompt tells
    # it to use scipy/statsmodels.
    REQUIRED_PACKAGES = (
        "pandas",
        "numpy",
        "matplotlib",
        "seaborn",
        "scipy",
        "scikit-learn",
        "statsmodels",
    )

    # Speaking order of the team; also the keys of ``self.agents``.
    _TEAM_ORDER = ("analyst", "coder", "executor", "reporter")

    _ANALYST_PROMPT = (
        "You are a data analyst.\n"
        "When given a data analysis task:\n"
        "1. Understand the data and objectives\n"
        "2. Plan the analysis approach\n"
        "3. Specify what code needs to be written\n"
        "4. Interpret results and provide insights\n"
        "Be thorough and methodical."
    )

    _CODER_PROMPT = (
        "You are a Python data analysis programmer.\n"
        "Write clean, well-documented code using:\n"
        "- pandas for data manipulation\n"
        "- numpy for numerical operations\n"
        "- matplotlib/seaborn for visualizations\n"
        "- scipy/statsmodels for statistics\n"
        "Always:\n"
        "- Add error handling\n"
        "- Save visualizations as files\n"
        "- Print key findings\n"
        "- Use descriptive variable names\n"
    )

    _REPORTER_PROMPT = (
        "You are a data analysis report writer.\n"
        "Create comprehensive reports with:\n"
        "1. Executive Summary\n"
        "2. Data Overview\n"
        "3. Analysis Methodology\n"
        "4. Key Findings (with statistics)\n"
        "5. Visualizations (reference saved files)\n"
        "6. Conclusions and Recommendations\n"
        "Use clear headings, bullet points, and professional language."
    )

    def __init__(
        self,
        model_client,
        work_dir: str = "./analysis_workspace",
        timeout: int = 120,
    ):
        """Store configuration; no container or agent is created here.

        Args:
            model_client: Chat-completion client handed to every
                ``AssistantAgent``.
            work_dir: Directory passed to the Docker executor as its
                ``work_dir``.
            timeout: Value passed to the Docker executor as its ``timeout``.
                The package installation goes through the same executor, so
                it is subject to this value as well.
        """
        self.model_client = model_client
        self.work_dir = work_dir
        self.timeout = timeout
        self.agents = {}
        # Set by setup(); staying None lets cleanup() run safely when setup()
        # was never called or failed before the container started.
        self.executor: Optional[DockerCommandLineCodeExecutor] = None

    async def setup(self):
        """Start the Docker executor, install packages and create all agents.

        Calling this while the system is already set up is a no-op. If a step
        after the container start fails, the container is shut down again
        before the exception propagates.

        Raises:
            RuntimeError: If the package installation reports a non-zero
                exit code.
        """
        if self.executor is not None:
            return

        executor = DockerCommandLineCodeExecutor(
            image="python:3.11-slim",
            work_dir=self.work_dir,
            timeout=self.timeout,
        )
        await executor.__aenter__()
        # Publish the executor only once it has been entered, so cleanup()
        # never exits an executor that was not entered.
        self.executor = executor

        try:
            await self._install_packages()
            self.agents.update(self._build_agents())
        except BaseException:
            # Do not leave a container behind on failure or cancellation.
            await self.cleanup()
            raise

    def _build_agents(self):
        """Create the four team members, keyed by role name."""
        return {
            # Data analyst - plans the analysis
            "analyst": AssistantAgent(
                "data_analyst",
                model_client=self.model_client,
                system_message=self._ANALYST_PROMPT,
            ),
            # Code writer - generates analysis code
            "coder": AssistantAgent(
                "code_writer",
                model_client=self.model_client,
                system_message=self._CODER_PROMPT,
            ),
            # Code executor - runs the code
            "executor": CodeExecutorAgent(
                "code_executor",
                code_executor=self.executor,
            ),
            # Report writer - creates final report
            "reporter": AssistantAgent(
                "report_writer",
                model_client=self.model_client,
                system_message=self._REPORTER_PROMPT,
            ),
        }

    async def _install_packages(self):
        """Install data science packages in Docker container.

        Raises:
            RuntimeError: If the executor has not been started, or if the
                install script exits with a non-zero code.
        """
        if self.executor is None:
            raise RuntimeError(
                "Docker executor is not running; call setup() first"
            )

        # A single pip invocation resolves all packages in one run.
        install_code = (
            "import subprocess\n"
            "import sys\n"
            f"packages = {list(self.REQUIRED_PACKAGES)!r}\n"
            "subprocess.check_call(\n"
            '    [sys.executable, "-m", "pip", "install", "-q", *packages]\n'
            ")\n"
            'print("Packages installed successfully")\n'
        )
        # NOTE(review): the autogen-ext API reference documents
        # execute_code_blocks(code_blocks: List[CodeBlock], cancellation_token).
        # This call passes plain dicts and no token - confirm it works against
        # the installed version; switching needs imports from autogen_core.
        result = await self.executor.execute_code_blocks([
            {"code": install_code, "language": "python"}
        ])
        # Surface a failed install here instead of letting later generated
        # code fail on a missing import.
        if result.exit_code != 0:
            raise RuntimeError(
                f"Package installation failed (exit code {result.exit_code}): "
                f"{result.output}"
            )

    async def analyze(self, data_path: str, analysis_task: str) -> str:
        """Perform automated data analysis.

        Args:
            data_path: Path of the data file. It is only interpolated into
                the task prompt; this method does not copy or mount the file,
                so it has to be reachable by the code the agents run.
            analysis_task: Free-text description of the analysis objective.

        Returns:
            The content of the last message produced by the team, converted
            with ``str()`` if it is not already a string.

        Raises:
            RuntimeError: If ``setup()`` has not completed, or if the run
                produced no messages.
        """
        missing = [role for role in self._TEAM_ORDER if role not in self.agents]
        if missing:
            raise RuntimeError(
                "setup() must complete before analyze(); missing agents: "
                + ", ".join(missing)
            )

        # Create team. Assuming strict round-robin order, 20 turns over four
        # participants end on the report writer.
        team = RoundRobinGroupChat(
            participants=[self.agents[role] for role in self._TEAM_ORDER],
            max_turns=20,
        )

        # Run analysis
        task = (
            f"Analyze the data file: {data_path}\n"
            f"Analysis objective: {analysis_task}\n"
            "Steps:\n"
            "1. Load and explore the data\n"
            "2. Perform requested analysis\n"
            "3. Create relevant visualizations\n"
            "4. Generate a comprehensive report\n"
        )
        result = await Console(team.run_stream(task=task))

        if not result.messages:
            raise RuntimeError("Analysis finished without producing any messages")
        content = result.messages[-1].content
        return content if isinstance(content, str) else str(content)

    async def cleanup(self):
        """Clean up resources.

        Exits the Docker executor if one is held and forgets the agents that
        were bound to it. Does nothing when there is no executor, so it can be
        called from a ``finally`` block regardless of how far ``setup()`` got.
        """
        # Detach first so a second call (or a failure in __aexit__) cannot
        # exit the same executor twice.
        executor, self.executor = self.executor, None
        self.agents.clear()
        if executor is not None:
            await executor.__aexit__(None, None, None)
async def main() -> None:
    """Run the example sales analysis end to end and print the report.

    Creates the model client and a ``DataAnalysisSystem``, analyzes
    ``sales_data.csv`` and prints the returned report between banner lines.
    The analysis system is cleaned up and the model client is closed even
    when a step fails.
    """
    # Initialize
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    try:
        analysis_system = DataAnalysisSystem(model_client)
        try:
            await analysis_system.setup()

            # Example: Analyze sales data
            report = await analysis_system.analyze(
                data_path="sales_data.csv",
                analysis_task=(
                    "Analyze sales performance:\n"
                    "- Calculate total revenue by product category\n"
                    "- Identify top 10 products\n"
                    "- Show monthly sales trends\n"
                    "- Analyze regional performance\n"
                    "- Create visualizations for key metrics\n"
                ),
            )

            print("\n" + "=" * 80)
            print("ANALYSIS REPORT")
            print("=" * 80)
            print(report)
        finally:
            await analysis_system.cleanup()
    finally:
        # Separate finally: the client must be closed even if cleanup() raises.
        await model_client.close()
# Script entry point: only runs the example when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())