Overview
The Email Composer is the final step in the email generation pipeline. It uses Claude Sonnet (the most powerful Claude model) to generate a personalized email based on all the information gathered in previous steps, then writes the result to the PostgreSQL database.
Purpose:
Synthesize all pipeline data into a personalized email
Fill template placeholders with specific information
Reference papers, books, or background details naturally
Persist the final email to the database
Increment user’s generation count
Timing: ~3.1 seconds (single attempt), up to ~9 seconds (with retries)
Model: Claude Sonnet (configurable, currently Kimi K2p5)
Temperature: 0.4 (balanced creativity and consistency)
Database Write: Atomic transaction with user count increment
The Email Composer requires data from all previous steps:
From Step 1 (Template Parser)
Original template with placeholders to be filled Example: Hi {{name}},
I read your paper on {{research_topic}} and was particularly
interested in {{specific_finding}}...
Placeholder analysis from Step 1 Usage: Ensures all placeholders are filled in the final email
From Step 2 (Web Scraper)
Summarized background and publications information Max Length: 3,000 characters
Usage: Primary context for personalization
From Step 3 (ArXiv Helper)
Array of academic papers (may be empty) Usage: Enables specific paper references in RESEARCH templates
From API Request
UUID of the user (from JWT token) Usage: Database write and generation count increment
Name of the email recipient Usage: Database record and email validation
Research interest or topic area Usage: Database record for filtering/search
Output Schema
The Email Composer updates PipelineData and writes to database:
The generated email ready to send Characteristics:
All placeholders filled
Natural references to recipient’s work
Professional tone
Typically 150-400 words
Example: Dear Dr. Smith,
I came across your recent paper "Deep Learning for Medical Image
Segmentation" (2023) on ArXiv and was particularly impressed by your
approach to handling class imbalance in medical datasets...
[personalized content based on scraped_content]
Best regards,
[Student name]
Whether the LLM is confident in the generated email quality Values:
true - LLM found sufficient information for personalization
false - LLM had to use generic language due to limited context
Usage: Frontend can warn users about low-confidence emails
Metadata about the email generation: {
"email_id" : "550e8400-e29b-41d4-a716-446655440000" ,
"word_count" : 237 ,
"model" : "openrouter:moonshot/kimi-k2p5" ,
"temperature" : 0.4 ,
"is_confident" : true
}
Database Record
The step creates a record in the emails table:
INSERT INTO emails (
id, -- Auto-generated UUID
user_id, -- From JWT token
recipient_name, -- From request
recipient_interest, -- From request
email_message, -- Generated content
template_type, -- RESEARCH/BOOK/GENERAL
metadata, -- JSONB with full pipeline context
is_confident, -- Boolean from LLM
created_at -- Auto-generated timestamp
) VALUES (...);
Metadata JSONB Structure:
{
"search_terms" : [ "Dr. Smith machine learning" ],
"template_type" : "RESEARCH" ,
"scraped_urls" : [ "https://..." ],
"scraping_metadata" : { "citation_count" : 3 },
"arxiv_papers" : [
{
"title" : "..." ,
"arxiv_url" : "https://arxiv.org/abs/..." ,
"year" : "2023"
}
],
"step_timings" : {
"template_parser" : 1.2 ,
"web_scraper" : 5.3 ,
"arxiv_helper" : 0.8 ,
"email_composer" : 3.1
},
"generation_metadata" : {
"model" : "openrouter:moonshot/kimi-k2p5" ,
"temperature" : 0.4
}
}
Source: pipeline/steps/email_composer/main.py:118-136
Implementation Details
LLM Agent Configuration
The step uses a Pydantic-AI agent optimized for the selected model:
self .composition_agent = create_agent(
model = self .model, # Currently: openrouter:moonshot/kimi-k2p5
system_prompt = SYSTEM_PROMPT ,
temperature = 0.4 , # Balanced for Kimi K2p5
max_tokens = 10000 , # Generous for long emails
retries = 3 ,
timeout = 90.0
)
Why Kimi K2p5?
Cost-effective compared to Claude Sonnet (0.35 / M v s 0.35/M vs 0.35/ M v s 3/M tokens)
Excellent at structured output and literal interpretation
Strong performance on creative writing tasks
Temperature 0.3-0.5 recommended for consistent quality
Source: pipeline/steps/email_composer/main.py:29-38
Composition Prompt
The prompt combines all pipeline context:
user_prompt = create_composition_prompt(
email_template = pipeline_data.email_template,
recipient_name = pipeline_data.recipient_name,
recipient_interest = pipeline_data.recipient_interest,
scraped_content = pipeline_data.scraped_content,
arxiv_papers = pipeline_data.arxiv_papers or [],
template_analysis = pipeline_data.template_analysis
)
Prompt Structure:
Task Definition - “Generate a personalized email…”
Template - Original template with placeholders
Recipient Context - Name, interest, background summary
Research Context - Papers (if RESEARCH template)
Instructions - Fill placeholders, maintain tone, be specific
Output Format - JSON with email and is_confident fields
Source: pipeline/steps/email_composer/main.py:66-73 and pipeline/steps/email_composer/prompts.py
JSON Response Parsing
The LLM returns structured JSON:
result = await self .composition_agent.run(user_prompt)
response_text = result.output.strip()
# Parse JSON response
try :
parsed = json.loads(response_text)
email_text = parsed[ "email" ]
is_confident = parsed.get( "is_confident" , False )
except (json.JSONDecodeError, KeyError ) as e:
# Fallback: treat entire response as email text
logfire.warning(
"Failed to parse JSON response, falling back to plain text" ,
error = str (e)
)
email_text = response_text
is_confident = False
Why Fallback?
Some models may return text without JSON wrapper
Ensures pipeline always completes (graceful degradation)
User still gets an email, even if metadata is missing
Source: pipeline/steps/email_composer/main.py:82-99
Execution Flow
Validate Input - Check all required fields from previous steps
Prepare Prompt - Combine template, context, and instructions
Generate Email - Call LLM agent (with automatic retries)
Parse Response - Extract email text and confidence flag
Create Email Object - Wrap in ComposedEmail model
Prepare Metadata - Aggregate all pipeline data for JSONB
Write to Database - Atomic transaction (email + user count)
Update Pipeline Data - Store email_id and metadata
Return Success - With email_id and word count
async def _execute_step ( self , pipeline_data : PipelineData) -> StepResult:
# Step 1-2: Prepare prompt
user_prompt = create_composition_prompt( ... )
# Step 3-4: Generate and parse
result = await self .composition_agent.run(user_prompt)
parsed = json.loads(result.output)
# Step 5: Create composed email
composed_email = ComposedEmail(
email_content = parsed[ "email" ],
is_confident = parsed.get( "is_confident" , False ),
generation_metadata = { "model" : self .model}
)
# Step 6: Prepare database metadata
database_metadata = {
"search_terms" : pipeline_data.search_terms,
"template_type" : pipeline_data.template_type.value,
"scraped_urls" : pipeline_data.scraped_urls,
"arxiv_papers" : [{
"title" : p.get( "title" ),
"arxiv_url" : p.get( "arxiv_url" ),
"year" : p.get( "year" )
} for p in (pipeline_data.arxiv_papers or [])],
"step_timings" : pipeline_data.step_timings,
"model" : self .model,
"temperature" : self .temperature
}
# Step 7: Write to database
email_id = await write_email_to_db(
user_id = pipeline_data.user_id,
recipient_name = pipeline_data.recipient_name,
recipient_interest = pipeline_data.recipient_interest,
email_content = composed_email.email_content,
template_type = pipeline_data.template_type,
metadata = database_metadata,
is_confident = composed_email.is_confident
)
# Step 8: Increment user count (non-critical)
await increment_user_generation_count( user_id = pipeline_data.user_id)
# Step 9: Update and return
pipeline_data.final_email = composed_email.email_content
pipeline_data.is_confident = composed_email.is_confident
pipeline_data.metadata[ "email_id" ] = email_id
return StepResult(
success = True ,
metadata = { "email_id" : str (email_id), "word_count" : len (email_text.split())}
)
Source: pipeline/steps/email_composer/main.py:62-187
Database Write Implementation
Atomic Transaction
The database write uses an async transaction:
async def write_email_to_db (
user_id : str ,
recipient_name : str ,
recipient_interest : str ,
email_content : str ,
template_type : TemplateType,
metadata : dict ,
is_confident : bool
) -> uuid. UUID :
"""Write email to database with atomic transaction."""
async with get_async_session() as session:
# Create email record
email = Email(
user_id = uuid.UUID(user_id),
recipient_name = recipient_name,
recipient_interest = recipient_interest,
email_message = email_content,
template_type = template_type.value,
metadata = metadata,
is_confident = is_confident
)
session.add(email)
await session.commit()
await session.refresh(email)
return email.id
Source: pipeline/steps/email_composer/db_utils.py
User Count Increment
Separate transaction (non-critical, fails silently):
async def increment_user_generation_count ( user_id : str ) -> None :
"""Increment user's generation count (non-critical)."""
try :
async with get_async_session() as session:
user = await session.get(User, uuid.UUID(user_id))
if user:
user.generation_count += 1
await session.commit()
except Exception as e:
logfire.warning(
"Failed to increment user generation count" ,
user_id = user_id,
error = str (e)
)
# Don't fail pipeline if count increment fails
Why Separate Transaction?
Email write is critical (must succeed)
Count increment is analytics (nice to have)
If count fails, email is still saved
Source: pipeline/steps/email_composer/db_utils.py
Error Handling
Fatal Errors (Pipeline Stops)
These errors will halt the pipeline:
LLM Generation Failed - API errors, timeouts, or invalid responses after 3 retries
Database Write Failed - Connection errors, constraint violations
Missing Required Fields - user_id, email_template, scraped_content, etc.
except Exception as e:
logfire.error(
"Error in email composer" ,
error = str (e),
error_type = type (e). __name__
)
return StepResult(
success = False ,
step_name = self .step_name,
error = f "Email composer error: { str (e) } "
)
Source: pipeline/steps/email_composer/main.py:189-200
Retry Strategy
The Pydantic-AI agent automatically retries on:
API connection errors
Timeout errors (90s timeout per attempt)
Transient failures
Configuration:
Max retries: 3
Timeout: 90 seconds per attempt
Total max time: ~270 seconds (4.5 minutes)
Source: pipeline/steps/email_composer/main.py:37
Logging & Observability
The step emits detailed logs throughout execution:
Pre-Generation
logfire.info(
"Generating email with LLM" ,
model = self .model,
temperature = self .temperature,
recipient_name = pipeline_data.recipient_name
)
Post-Generation
logfire.info(
"Email generated successfully" ,
word_count = len (email_text.split()),
length = len (email_text),
is_confident = is_confident
)
Post-Database Write
logfire.info(
"Email written to database" ,
email_id = str (email_id),
word_count = len (composed_email.email_content.split())
)
Source: pipeline/steps/email_composer/main.py:75-160
Tracked Metrics
Model used
Temperature setting
Email word count
Email character length
Confidence flag
Email ID (UUID)
Generation duration
Database write duration
Average Execution Time: 3.1 seconds
Breakdown:
LLM API call: ~2.5s
JSON parsing: ~0.05s
Database write: ~0.3s
User count increment: ~0.2s
Overhead: ~0.05s
Variance Factors:
Email length (longer = more tokens = slower)
Model load (API queue time)
Database connection pool availability
Network latency
With Retries:
1 retry: +3s
2 retries: +6s
3 retries (max): +9s
Configuration
The step is highly configurable via environment variables:
# Model selection (hot-swappable)
EMAIL_COMPOSER_MODEL = openrouter:moonshot/kimi-k2p5
# Alternative models:
# EMAIL_COMPOSER_MODEL=anthropic:claude-sonnet-4-5
# EMAIL_COMPOSER_MODEL=openai:gpt-4o
# EMAIL_COMPOSER_MODEL=openrouter:anthropic/claude-3.5-sonnet
# Generation parameters (model-specific)
EMAIL_COMPOSER_TEMPERATURE = 0.4 # 0.3-0.5 for Kimi K2p5
EMAIL_COMPOSER_MAX_TOKENS = 10000 # Generous for long emails
EMAIL_COMPOSER_TIMEOUT = 90.0 # Per-attempt timeout (seconds)
Source: config/settings.py
Model Comparison
Model Avg Time Cost/Email Quality Notes Kimi K2p5 2.5s $0.007 High Current default, cost-effective Claude Sonnet 4.5 3.5s $0.023 Highest Best quality, more expensive GPT-4o 2.8s $0.015 High Good balance Claude Haiku 4.5 1.8s $0.004 Medium Fast but less creative
Recommendation: Kimi K2p5 for production (best cost/quality ratio)
Pipeline Completion
After the Email Composer completes:
Email is persisted in the emails table with full metadata
User count is incremented (if successful)
Pipeline data is complete with final_email and email_id
Celery task returns the email_id to the API
Frontend fetches the completed email via GET /api/emails/{email_id}
Pipeline Overview View the complete pipeline architecture and data flow