Project Structure
Recommended Layout
my-ai-agent/
├── .env # API keys (never commit!)
├── .env.example # Template for .env
├── .gitignore # Git ignore rules
├── pyproject.toml # Dependencies and metadata
├── README.md # Project documentation
├── main.py # Entry point
├── src/
│ ├── __init__.py
│ ├── agents.py # Agent definitions
│ ├── tools.py # Custom tools
│ ├── config.py # Configuration
│ ├── prompts.py # System prompts
│ └── utils.py # Helper functions
├── tests/
│ ├── __init__.py
│ ├── test_agents.py
│ └── test_tools.py
└── tmp/ # Temporary files (gitignored)
└── .gitkeep
Error Handling
Graceful API Failures
from typing import Optional
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
logger = logging.getLogger(__name__)
# ✅ Good: Retry with exponential backoff
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def call_llm_with_retry(prompt: str) -> str:
    """Send *prompt* to the LLM; tenacity re-invokes this on failure.

    Retries up to 3 attempts with exponential backoff (2s-10s waits).
    The exception is logged, then re-raised so the decorator can retry.
    """
    try:
        completion = client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
    except Exception as e:
        logger.error(f"LLM call failed: {e}")
        raise
    return completion.choices[0].message.content
# ✅ Good: Fallback to simpler model
def call_llm_with_fallback(prompt: str) -> Optional[str]:
    """Answer *prompt* with the primary model, degrading gracefully.

    Returns the primary model's answer when it succeeds, otherwise the
    fallback model's answer, otherwise None (both failures are logged).
    """
    try:
        return call_primary_model(prompt)
    except Exception as e:
        logger.warning(f"Primary model failed: {e}, using fallback")
    # Primary failed — attempt the secondary model before giving up.
    try:
        return call_fallback_model(prompt)
    except Exception as e2:
        logger.error(f"Fallback model also failed: {e2}")
        return None
# ❌ Bad: No error handling
def call_llm(prompt: str) -> str:
    # Deliberately fragile counter-example: any network/API error propagates
    # straight to the caller — no retries, no logging, no fallback.
    response = client.chat.completions.create(...) # Can crash
    return response.choices[0].message.content
Validate Tool Outputs
from pydantic import BaseModel, ValidationError
from typing import Optional
class SearchResult(BaseModel):
    """Validated shape of a single web-search hit."""
    # Page title as reported by the search API
    title: str
    # URL of the result page
    url: str
    # Short text excerpt from the page
    snippet: str
# ✅ Good: Validate and sanitize
def web_search(query: str) -> Optional[list[SearchResult]]:
    """Run a web search, keeping only results that validate cleanly.

    Malformed individual results are logged and skipped; a failure of the
    search call itself yields None.
    """
    try:
        raw_results = search_api.search(query)
        validated: list[SearchResult] = []
        for item in raw_results:
            try:
                validated.append(SearchResult(**item))
            except ValidationError as e:
                # Skip just this result; the rest are still usable.
                logger.warning(f"Invalid search result: {e}")
        return validated
    except Exception as e:
        logger.error(f"Search failed: {e}")
        return None
# ❌ Bad: Assume API returns valid data
def web_search(query: str):
    # Deliberately bad counter-example: no validation, no error handling,
    # no return type — callers get whatever the API happens to emit.
    return search_api.search(query) # Could return malformed data
Prompt Engineering
Structured Prompts
from textwrap import dedent
# ✅ Good: Clear, structured prompts
# The backslash after the opening quotes suppresses the leading newline;
# the prompt text stays flush-left so it reaches the model unindented.
SYSTEM_PROMPT = dedent("""\
You are a helpful research assistant specialized in technical documentation.
Your capabilities:
- Search and analyze technical documents
- Extract key information and summarize findings
- Provide accurate citations and sources
Guidelines:
1. Always cite sources with [Source: URL]
2. Admit uncertainty rather than guessing
3. Format code examples with proper syntax highlighting
4. Break down complex topics into digestible sections
Response format:
- Use markdown for formatting
- Include bullet points for lists
- Add code blocks for technical content
""")
# ✅ Good: Parametrized prompts
def create_analysis_prompt(topic: str, context: str) -> str:
    """Render the structured analysis prompt for *topic* over *context*.

    The template lines stay flush-left: dedent runs after f-string
    interpolation, and interpolated text may itself be unindented.
    """
    prompt = dedent(f"""\
Analyze the following topic using the provided context:
Topic: {topic}
Context:
{context}
Provide:
1. Key findings (3-5 bullet points)
2. Important details
3. Recommendations
4. Sources cited
""")
    return prompt
# ❌ Bad: Vague, unstructured prompts
# Counter-example: no capabilities, guidelines, or output format specified.
SYSTEM_PROMPT = "You are a helpful assistant."
user_prompt = f"Tell me about {topic}" # Too vague
Few-Shot Examples
# ✅ Good: Include examples in prompt
# Two worked input/output pairs steer the model toward the desired JSON shape.
FEW_SHOT_EXAMPLES = dedent("""\
Example 1:
Query: "Extract financial data from Q4 report"
Output:
{
"revenue": "$125M",
"profit": "$23M",
"growth": "15%"
}
Example 2:
Query: "Summarize security incident"
Output:
{
"severity": "high",
"affected_systems": ["api", "database"],
"resolution_time": "2 hours"
}
""")
def create_extraction_prompt(text: str) -> str:
    """Build an extraction prompt: few-shot examples followed by *text*.

    NOTE: dedent runs AFTER interpolation; since the interpolated examples
    are flush-left, the template lines must stay flush-left too, or the
    common margin would be zero and indentation would leak into the output.
    """
    return dedent(f"""\
Extract structured data from the text.
{FEW_SHOT_EXAMPLES}
Now extract from:
{text}
""")
Configuration Management
Environment-Based Config
import os
from enum import Enum
from pydantic import BaseModel
from dotenv import load_dotenv
load_dotenv()  # Populate os.environ from the local .env file before config is read
class Environment(str, Enum):
    """Deployment environment names (str-valued for easy comparison/serialization)."""
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"
class Config(BaseModel):
    """Application configuration.

    Build instances with :meth:`from_env`; field defaults apply whenever
    the corresponding environment variable is unset.
    """
    # Environment
    env: Environment = Environment.DEVELOPMENT
    debug: bool = False
    # API Keys
    openai_api_key: str
    nebius_api_key: str
    # Model Settings
    default_model: str = "gpt-4-turbo"
    temperature: float = 0.7
    max_tokens: int = 2000
    # Rate Limiting
    max_requests_per_minute: int = 60
    max_tokens_per_day: int = 100000
    # Logging
    log_level: str = "INFO"
    log_file: str = "tmp/app.log"
    @classmethod
    def from_env(cls) -> "Config":
        """Load configuration from environment variables.

        NOTE(review): rate-limit and log-file fields are never read from
        the environment here — they always keep the class defaults.
        Confirm that is intended.
        """
        env = os.getenv("ENVIRONMENT", "development")
        return cls(
            env=Environment(env),
            debug=os.getenv("DEBUG", "false").lower() == "true",
            openai_api_key=os.getenv("OPENAI_API_KEY", ""),
            nebius_api_key=os.getenv("NEBIUS_API_KEY", ""),
            default_model=os.getenv("DEFAULT_MODEL", "gpt-4-turbo"),
            temperature=float(os.getenv("TEMPERATURE", "0.7")),
            max_tokens=int(os.getenv("MAX_TOKENS", "2000")),
            log_level=os.getenv("LOG_LEVEL", "INFO"),
        )
# Usage
config = Config.from_env()
if config.env == Environment.PRODUCTION:
    # Production-specific overrides: force-disable debug and quiet the logs.
    config.debug = False
    config.log_level = "WARNING"
Logging
Structured Logging
import json
import logging
import os
from datetime import datetime, timezone
from typing import Any, Dict
class StructuredLogger:
    """Logger with structured JSON output.

    Writes human-readable records to the console and JSON lines to
    ``tmp/app.json.log``. Handlers are attached only once per logger
    name: ``logging.getLogger`` returns a shared instance, so the
    original code duplicated handlers (and thus every log line) each
    time a StructuredLogger with the same name was created.
    """
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        # Only configure handlers the first time this name is seen —
        # prevents duplicate console/file output on re-instantiation.
        if not self.logger.handlers:
            # Console handler
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(
                logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            )
            self.logger.addHandler(console_handler)
            # File handler for JSON logs; create tmp/ so FileHandler
            # does not raise FileNotFoundError on a fresh checkout.
            os.makedirs('tmp', exist_ok=True)
            file_handler = logging.FileHandler('tmp/app.json.log')
            self.logger.addHandler(file_handler)
    def log(self, level: str, message: str, **kwargs):
        """Log *message* at *level*, folding **kwargs into the JSON record."""
        log_data = {
            # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+).
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": level,
            "message": message,
            **kwargs
        }
        log_method = getattr(self.logger, level.lower())
        log_method(json.dumps(log_data))
    def info(self, message: str, **kwargs):
        self.log("INFO", message, **kwargs)
    def error(self, message: str, **kwargs):
        self.log("ERROR", message, **kwargs)
    def warning(self, message: str, **kwargs):
        self.log("WARNING", message, **kwargs)
# Usage
logger = StructuredLogger(__name__)
# Arbitrary keyword arguments become fields of the JSON log record.
logger.info(
    "Agent executed successfully",
    agent="ResearchAgent",
    query="AI frameworks comparison",
    duration_ms=1234,
    tokens_used=567
)
logger.error(
    "API call failed",
    error="RateLimitError",
    retry_count=3,
    model="gpt-4-turbo"
)
Testing
Unit Tests
import pytest
from unittest.mock import Mock, patch
from src.agents import ResearchAgent
@pytest.fixture
def mock_llm():
    """Fake LLM client whose completions always contain "Test response"."""
    fake_message = Mock(content="Test response")
    fake_client = Mock()
    fake_client.chat.completions.create.return_value = Mock(
        choices=[Mock(message=fake_message)]
    )
    return fake_client
def test_research_agent_basic(mock_llm):
    """Happy path: the agent surfaces the mocked completion text."""
    agent = ResearchAgent(client=mock_llm)
    answer = agent.run("Test query")
    assert mock_llm.chat.completions.create.called
    assert answer is not None
    assert "Test response" in answer
def test_research_agent_error_handling(mock_llm):
    """A raising client must not crash the agent — it returns None instead."""
    mock_llm.chat.completions.create.side_effect = Exception("API Error")
    agent = ResearchAgent(client=mock_llm)
    assert agent.run("Test query") is None  # Should return None on error
@patch('src.tools.web_search')
def test_agent_with_tools(mock_search, mock_llm):
    """The agent should invoke its tool and still produce a result."""
    fake_hits = [{"title": "Result", "url": "https://example.com"}]
    mock_search.return_value = fake_hits
    agent = ResearchAgent(client=mock_llm, tools=[mock_search])
    result = agent.run("Search for AI frameworks")
    assert result is not None
    assert mock_search.called
Integration Tests
import pytest
from src.agents import ResearchWorkflow
import os
@pytest.mark.integration
@pytest.mark.skipif(
    not os.getenv("OPENAI_API_KEY"),
    reason="Requires OPENAI_API_KEY"
)
def test_full_research_workflow():
    """Integration test with real API."""
    workflow = ResearchWorkflow()
    result = workflow.run(
        topic="Python async programming",
        max_sources=3
    )
    # Sanity-check the result object rather than exact content, since the
    # live API response is nondeterministic.
    assert result is not None
    assert result.sources
    assert result.summary != ""
    assert result.confidence_score > 0.5
Performance Optimization
Caching
from functools import lru_cache
import hashlib
import json
import os
from pathlib import Path
class DiskCache:
"""Simple disk-based cache for LLM responses."""
def __init__(self, cache_dir: str = "tmp/cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
def _get_key(self, prompt: str, model: str) -> str:
"""Generate cache key from prompt and model."""
content = f"{model}:{prompt}"
return hashlib.sha256(content.encode()).hexdigest()
def get(self, prompt: str, model: str) -> str | None:
"""Get cached response."""
key = self._get_key(prompt, model)
cache_file = self.cache_dir / f"{key}.json"
if cache_file.exists():
with open(cache_file, 'r') as f:
data = json.load(f)
return data['response']
return None
def set(self, prompt: str, model: str, response: str):
"""Cache response."""
key = self._get_key(prompt, model)
cache_file = self.cache_dir / f"{key}.json"
with open(cache_file, 'w') as f:
json.dump({
'prompt': prompt,
'model': model,
'response': response
}, f)
# Usage
cache = DiskCache()
def call_llm(prompt: str, model: str = "gpt-4-turbo") -> str:
    """Return the LLM response for *prompt*, serving repeats from disk cache."""
    # Check cache first. Compare against None, not truthiness: an empty
    # string is a valid cached response, and the original `if cached:`
    # would re-call the API for it on every invocation.
    cached = cache.get(prompt, model)
    if cached is not None:
        logger.info("Cache hit")
        return cached
    # Call API
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    result = response.choices[0].message.content
    # Cache response
    cache.set(prompt, model, result)
    return result
Async Operations
import asyncio
from typing import List
# ✅ Good: Parallel execution
async def process_queries_parallel(queries: List[str]) -> List[str]:
    """Fan out all queries concurrently; results come back in input order."""
    return await asyncio.gather(*(call_llm_async(q) for q in queries))
# Usage
queries = [
    "Explain async programming",
    "What is RAG?",
    "How do agents work?"
]
# asyncio.run drives the event loop until every gathered task completes.
results = asyncio.run(process_queries_parallel(queries))
# Much faster than sequential processing
# ❌ Bad: Sequential execution
def process_queries_sequential(queries: List[str]) -> List[str]:
    # Deliberately slow counter-example: each call blocks until the
    # previous one finishes, so total latency is the sum of all calls.
    results = []
    for query in queries:
        result = call_llm(query) # Slow: waits for each
        results.append(result)
    return results
Security
Input Sanitization
import re
from typing import Optional
# Word-boundary regex: matches the SQL keywords only as whole words, so
# benign inputs like "dropdown" or "updated" are no longer blocked (the
# original substring check produced those false positives).
_SQL_KEYWORD_RE = re.compile(r'\b(?:drop|delete|update|insert)\b', re.IGNORECASE)

def sanitize_user_input(user_input: str) -> Optional[str]:
    """Sanitize user input before handing it to an agent.

    Returns the cleaned string, or None when the input is rejected.

    NOTE(review): a keyword blocklist is a weak defense — real SQL safety
    comes from parameterized queries, and prompt-injection defense needs
    more than string filtering. Kept here as illustrative policy.
    """
    if _SQL_KEYWORD_RE.search(user_input):
        logger.warning(f"Blocked potentially malicious input: {user_input}")
        return None
    # Collapse runs of whitespace and trim the ends.
    sanitized = re.sub(r'\s+', ' ', user_input).strip()
    # Cap length to bound downstream token usage.
    max_length = 1000
    if len(sanitized) > max_length:
        sanitized = sanitized[:max_length]
    return sanitized
# Usage
# Reject the request outright when sanitization flags the input.
user_query = sanitize_user_input(request.query)
if user_query is None:
    raise ValueError("Invalid input")
result = agent.run(user_query)
Rate Limiting
from datetime import datetime, timedelta
from collections import deque
class RateLimiter:
    """Sliding-window rate limiter.

    Records a timestamp for every admitted request and admits a new one
    only while fewer than ``max_requests`` timestamps fall inside the
    trailing ``time_window`` seconds. (The original docstring called this
    a "token bucket", but the implementation is a sliding-window log —
    there is no refill rate or bucket capacity.)

    NOTE(review): not thread-safe; guard ``allow_request`` with a lock if
    the limiter is shared across threads.
    """
    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window # seconds
        self.requests = deque()  # timestamps of admitted requests, oldest first
    def allow_request(self) -> bool:
        """Return True and record the request if under the limit."""
        now = datetime.now()
        cutoff = now - timedelta(seconds=self.time_window)
        # Drop timestamps that have aged out of the window.
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()
        # Check limit
        if len(self.requests) >= self.max_requests:
            return False
        self.requests.append(now)
        return True
# Usage
rate_limiter = RateLimiter(max_requests=10, time_window=60) # 10 req/min
def handle_request(query: str):
    """Run the agent for *query*, refusing when over the rate limit."""
    if not rate_limiter.allow_request():
        raise ValueError("Rate limit exceeded. Try again later.")
    return agent.run(query)
Code Quality
Type Hints
from typing import List, Dict, Optional, Union
from pydantic import BaseModel
# ✅ Good: Full type hints
def process_documents(
    documents: List[str],
    config: Dict[str, Union[str, int]],
    max_tokens: Optional[int] = None
) -> List[Dict[str, str]]:
    """Analyze each document with the shared config; results in input order."""
    return [analyze_document(doc, config, max_tokens) for doc in documents]
# ❌ Bad: No type hints
def process_documents(documents, config, max_tokens=None):
    # Deliberately untyped counter-example to the annotated version above.
    results = []
    for doc in documents:
        result = analyze_document(doc, config, max_tokens)
        results.append(result)
    return results
Documentation
def research_agent(
    query: str,
    max_sources: int = 5,
    search_depth: str = "basic"
) -> Dict[str, Any]:
    """
    Run a research query via web search and analysis.

    Args:
        query: Research question or topic. Must be non-empty.
        max_sources: Upper bound on sources retrieved (default: 5).
        search_depth: Either "basic" or "advanced" (default: "basic").

    Returns:
        Dictionary with keys:
        - summary (str): research summary
        - sources (List[Dict]): source documents with metadata
        - confidence (float): confidence score in [0.0, 1.0]

    Raises:
        ValueError: If query is empty or whitespace-only.
        APIError: If external API calls fail.

    Example:
        >>> result = research_agent(
        ...     query="AI agent frameworks",
        ...     max_sources=3,
        ...     search_depth="advanced"
        ... )
        >>> print(result['summary'])
        'Top AI agent frameworks include...'
    """
    # Guard clause: reject None/empty/whitespace-only queries up front.
    stripped = query.strip() if query else ""
    if not stripped:
        raise ValueError("Query cannot be empty")
    # Implementation
    ...
Deployment Checklist
- Environment variables configured (`.env` file)
- API keys validated at startup
- Error handling for all external API calls
- Retry logic with exponential backoff
- Logging configured (file + console)
- Input sanitization implemented
- Rate limiting enabled
- Caching for expensive operations
- Unit tests passing
- Integration tests passing
- Type hints on all functions
- Documentation complete
- Dependencies pinned in lock file
- `.gitignore` includes `.env`, `.venv`, `__pycache__`
- README with setup instructions
- Cost monitoring enabled
Next Steps
Environment Setup
Set up development environment
API Keys
Configure credentials securely
Multi-Agent Patterns
Apply best practices to multi-agent systems
RAG Workflows
Implement production RAG systems