Overview

The Email Composer is the final step in the email generation pipeline. It uses a configurable LLM (currently Kimi K2p5 via OpenRouter) to generate a personalized email from all the information gathered in previous steps, then writes the result to the PostgreSQL database.
Purpose:
  • Synthesize all pipeline data into a personalized email
  • Fill template placeholders with specific information
  • Reference papers, books, or background details naturally
  • Persist the final email to the database
  • Increment user’s generation count
Timing: ~3.1 seconds (single attempt), up to ~12 seconds with the maximum three retries
Model: Configurable (currently Kimi K2p5; Claude Sonnet and others supported)
Temperature: 0.4 (balanced creativity and consistency)
Database Write: Atomic transaction with user count increment

Input Schema

The Email Composer requires data from all previous steps:

From Step 1 (Template Parser)

email_template
string
required
Original template with placeholders to be filled.
Example:
Hi {{name}},

I read your paper on {{research_topic}} and was particularly 
interested in {{specific_finding}}...
template_analysis
object
required
Placeholder analysis from Step 1.
Usage: Ensures all placeholders are filled in the final email

From Step 2 (Web Scraper)

scraped_content
string
required
Summarized background and publications information.
Max Length: 3,000 characters
Usage: Primary context for personalization

From Step 3 (ArXiv Helper)

arxiv_papers
object[]
required
Array of academic papers (may be empty).
Usage: Enables specific paper references in RESEARCH templates

From API Request

user_id
string
required
UUID of the user (from the JWT token).
Usage: Database write and generation count increment
recipient_name
string
required
Name of the email recipient.
Usage: Database record and email validation
recipient_interest
string
required
Research interest or topic area.
Usage: Database record for filtering/search
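Taken together, the required inputs can be sketched as a single Pydantic model. This is illustrative only: the field names follow this page, but `ComposerInput` is a hypothetical name, not the pipeline's actual `PipelineData` class.

```python
from typing import Any

from pydantic import BaseModel, Field


class ComposerInput(BaseModel):
    """Hypothetical sketch of the Email Composer's required inputs."""

    # From Step 1 (Template Parser)
    email_template: str
    template_analysis: dict[str, Any]
    # From Step 2 (Web Scraper), summarized to at most 3,000 characters
    scraped_content: str = Field(max_length=3000)
    # From Step 3 (ArXiv Helper); may be an empty list
    arxiv_papers: list[dict[str, Any]]
    # From the API request / JWT token
    user_id: str
    recipient_name: str
    recipient_interest: str


data = ComposerInput(
    email_template="Hi {{name}}, ...",
    template_analysis={"placeholders": ["name"]},
    scraped_content="Dr. Smith researches medical image segmentation...",
    arxiv_papers=[],
    user_id="550e8400-e29b-41d4-a716-446655440000",
    recipient_name="Dr. Smith",
    recipient_interest="machine learning",
)
```

Validation at this boundary would surface a missing field from any earlier step before the LLM call is made.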

Output Schema

The Email Composer updates PipelineData and writes to the database:
final_email
string
The generated email, ready to send.
Characteristics:
  • All placeholders filled
  • Natural references to recipient’s work
  • Professional tone
  • Typically 150-400 words
Example:
Dear Dr. Smith,

I came across your recent paper "Deep Learning for Medical Image 
Segmentation" (2023) on ArXiv and was particularly impressed by your 
approach to handling class imbalance in medical datasets...

[personalized content based on scraped_content]

Best regards,
[Student name]
is_confident
boolean
Whether the LLM is confident in the generated email's quality.
Values:
  • true - LLM found sufficient information for personalization
  • false - LLM had to use generic language due to limited context
Usage: Frontend can warn users about low-confidence emails
composition_metadata
object
Metadata about the email generation:
{
  "email_id": "550e8400-e29b-41d4-a716-446655440000",
  "word_count": 237,
  "model": "openrouter:moonshot/kimi-k2p5",
  "temperature": 0.4,
  "is_confident": true
}
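The output fields above can likewise be sketched as Pydantic models. `ComposerOutput` and `CompositionMetadata` are illustrative names; the real models live in the pipeline package.

```python
from pydantic import BaseModel


class CompositionMetadata(BaseModel):
    """Mirrors the composition_metadata structure shown above."""

    email_id: str
    word_count: int
    model: str
    temperature: float
    is_confident: bool


class ComposerOutput(BaseModel):
    final_email: str
    is_confident: bool
    composition_metadata: CompositionMetadata


meta = CompositionMetadata(
    email_id="550e8400-e29b-41d4-a716-446655440000",
    word_count=237,
    model="openrouter:moonshot/kimi-k2p5",
    temperature=0.4,
    is_confident=True,
)
```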

Database Record

The step creates a record in the emails table:
INSERT INTO emails (
  id,                    -- Auto-generated UUID
  user_id,               -- From JWT token
  recipient_name,        -- From request
  recipient_interest,    -- From request
  email_message,         -- Generated content
  template_type,         -- RESEARCH/BOOK/GENERAL
  metadata,              -- JSONB with full pipeline context
  is_confident,          -- Boolean from LLM
  created_at             -- Auto-generated timestamp
) VALUES (...);
Metadata JSONB Structure:
{
  "search_terms": ["Dr. Smith machine learning"],
  "template_type": "RESEARCH",
  "scraped_urls": ["https://..."],
  "scraping_metadata": { "citation_count": 3 },
  "arxiv_papers": [
    {
      "title": "...",
      "arxiv_url": "https://arxiv.org/abs/...",
      "year": "2023"
    }
  ],
  "step_timings": {
    "template_parser": 1.2,
    "web_scraper": 5.3,
    "arxiv_helper": 0.8,
    "email_composer": 3.1
  },
  "generation_metadata": {
    "model": "openrouter:moonshot/kimi-k2p5",
    "temperature": 0.4
  }
}
Source: pipeline/steps/email_composer/main.py:118-136
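A hypothetical SQLAlchemy mapping behind the INSERT above might look like the following. Column names follow this page; the actual model lives in the app's models module. Note the Python attribute is named `metadata_` because `metadata` is reserved on SQLAlchemy declarative classes.

```python
import uuid

from sqlalchemy import Boolean, Column, DateTime, String, Text, func
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Email(Base):
    """Illustrative ORM mapping for the emails table."""

    __tablename__ = "emails"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), nullable=False)
    recipient_name = Column(String, nullable=False)
    recipient_interest = Column(String, nullable=False)
    email_message = Column(Text, nullable=False)
    template_type = Column(String, nullable=False)  # RESEARCH / BOOK / GENERAL
    # "metadata" is reserved on declarative classes, so the Python
    # attribute carries a trailing underscore while the column keeps its name.
    metadata_ = Column("metadata", JSONB)
    is_confident = Column(Boolean, default=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
```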

Implementation Details

LLM Agent Configuration

The step uses a Pydantic-AI agent optimized for the selected model:
self.composition_agent = create_agent(
    model=self.model,  # Currently: openrouter:moonshot/kimi-k2p5
    system_prompt=SYSTEM_PROMPT,
    temperature=0.4,   # Balanced for Kimi K2p5
    max_tokens=10000,  # Generous for long emails
    retries=3,
    timeout=90.0
)
Why Kimi K2p5?
  • Cost-effective compared to Claude Sonnet ($0.35/M vs $3/M tokens)
  • Excellent at structured output and literal interpretation
  • Strong performance on creative writing tasks
  • Temperature 0.3-0.5 recommended for consistent quality
Source: pipeline/steps/email_composer/main.py:29-38

Composition Prompt

The prompt combines all pipeline context:
user_prompt = create_composition_prompt(
    email_template=pipeline_data.email_template,
    recipient_name=pipeline_data.recipient_name,
    recipient_interest=pipeline_data.recipient_interest,
    scraped_content=pipeline_data.scraped_content,
    arxiv_papers=pipeline_data.arxiv_papers or [],
    template_analysis=pipeline_data.template_analysis
)
Prompt Structure:
  1. Task Definition - “Generate a personalized email…”
  2. Template - Original template with placeholders
  3. Recipient Context - Name, interest, background summary
  4. Research Context - Papers (if RESEARCH template)
  5. Instructions - Fill placeholders, maintain tone, be specific
  6. Output Format - JSON with email and is_confident fields
Source: pipeline/steps/email_composer/main.py:66-73 and pipeline/steps/email_composer/prompts.py
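A minimal sketch of how `create_composition_prompt` could assemble the six sections listed above; the actual implementation in `prompts.py` will differ in wording and detail.

```python
def create_composition_prompt(
    email_template: str,
    recipient_name: str,
    recipient_interest: str,
    scraped_content: str,
    arxiv_papers: list[dict],
    template_analysis: dict,
) -> str:
    """Illustrative assembly of the six prompt sections."""
    # Section 4: research context (papers may be absent)
    papers = "\n".join(
        f"- {p.get('title')} ({p.get('year')}): {p.get('arxiv_url')}"
        for p in arxiv_papers
    ) or "No papers found."
    # template_analysis could drive extra placeholder instructions;
    # it is unused in this simplified sketch.
    return "\n\n".join([
        "Generate a personalized email by filling every placeholder in the template.",
        f"TEMPLATE:\n{email_template}",
        f"RECIPIENT: {recipient_name} ({recipient_interest})\n"
        f"BACKGROUND:\n{scraped_content}",
        f"RESEARCH CONTEXT:\n{papers}",
        "Fill all placeholders, maintain the template's tone, and be specific.",
        'Respond with JSON: {"email": "<text>", "is_confident": <bool>}',
    ])


prompt = create_composition_prompt(
    "Hi {{name}},", "Dr. Smith", "ML", "Background summary...", [], {},
)
```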

JSON Response Parsing

The LLM returns structured JSON:
result = await self.composition_agent.run(user_prompt)
response_text = result.output.strip()

# Parse JSON response
try:
    parsed = json.loads(response_text)
    email_text = parsed["email"]
    is_confident = parsed.get("is_confident", False)
except (json.JSONDecodeError, KeyError) as e:
    # Fallback: treat entire response as email text
    logfire.warning(
        "Failed to parse JSON response, falling back to plain text",
        error=str(e)
    )
    email_text = response_text
    is_confident = False
Why Fallback?
  • Some models may return text without a JSON wrapper
  • Ensures pipeline always completes (graceful degradation)
  • User still gets an email, even if metadata is missing
Source: pipeline/steps/email_composer/main.py:82-99
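A related failure mode not handled above is a model wrapping its JSON in a Markdown code fence. An illustrative helper (not in the codebase) that strips such a fence before falling back to plain text:

```python
import json


def parse_composer_response(text: str) -> tuple[str, bool]:
    """Parse the LLM response, tolerating a ```json fence around the payload."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Remove surrounding backticks and an optional "json" language tag.
        cleaned = cleaned.strip("`")
        if cleaned.startswith("json"):
            cleaned = cleaned[4:]
        cleaned = cleaned.strip()
    try:
        parsed = json.loads(cleaned)
        return parsed["email"], bool(parsed.get("is_confident", False))
    except (json.JSONDecodeError, KeyError):
        # Graceful degradation: treat the whole response as the email body.
        return text, False


email, confident = parse_composer_response(
    '{"email": "Hi Dr. Smith", "is_confident": true}'
)
```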

Execution Flow

  1. Validate Input - Check all required fields from previous steps
  2. Prepare Prompt - Combine template, context, and instructions
  3. Generate Email - Call LLM agent (with automatic retries)
  4. Parse Response - Extract email text and confidence flag
  5. Create Email Object - Wrap in ComposedEmail model
  6. Prepare Metadata - Aggregate all pipeline data for JSONB
  7. Write to Database - Atomic transaction (email + user count)
  8. Update Pipeline Data - Store email_id and metadata
  9. Return Success - With email_id and word count
async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
    # Step 1-2: Prepare prompt
    user_prompt = create_composition_prompt(...)
    
    # Step 3-4: Generate and parse
    result = await self.composition_agent.run(user_prompt)
    parsed = json.loads(result.output)
    
    # Step 5: Create composed email
    composed_email = ComposedEmail(
        email_content=parsed["email"],
        is_confident=parsed.get("is_confident", False),
        generation_metadata={"model": self.model}
    )
    
    # Step 6: Prepare database metadata
    database_metadata = {
        "search_terms": pipeline_data.search_terms,
        "template_type": pipeline_data.template_type.value,
        "scraped_urls": pipeline_data.scraped_urls,
        "arxiv_papers": [{
            "title": p.get("title"),
            "arxiv_url": p.get("arxiv_url"),
            "year": p.get("year")
        } for p in (pipeline_data.arxiv_papers or [])],
        "step_timings": pipeline_data.step_timings,
        "model": self.model,
        "temperature": self.temperature
    }
    
    # Step 7: Write to database
    email_id = await write_email_to_db(
        user_id=pipeline_data.user_id,
        recipient_name=pipeline_data.recipient_name,
        recipient_interest=pipeline_data.recipient_interest,
        email_content=composed_email.email_content,
        template_type=pipeline_data.template_type,
        metadata=database_metadata,
        is_confident=composed_email.is_confident
    )
    
    # Step 8: Increment user count (non-critical)
    await increment_user_generation_count(user_id=pipeline_data.user_id)
    
    # Step 9: Update and return
    pipeline_data.final_email = composed_email.email_content
    pipeline_data.is_confident = composed_email.is_confident
    pipeline_data.metadata["email_id"] = email_id
    
    return StepResult(
        success=True,
        metadata={"email_id": str(email_id), "word_count": len(composed_email.email_content.split())}
    )
Source: pipeline/steps/email_composer/main.py:62-187

Database Write Implementation

Atomic Transaction

The database write uses an async transaction:
async def write_email_to_db(
    user_id: str,
    recipient_name: str,
    recipient_interest: str,
    email_content: str,
    template_type: TemplateType,
    metadata: dict,
    is_confident: bool
) -> uuid.UUID:
    """Write email to database with atomic transaction."""
    
    async with get_async_session() as session:
        # Create email record
        email = Email(
            user_id=uuid.UUID(user_id),
            recipient_name=recipient_name,
            recipient_interest=recipient_interest,
            email_message=email_content,
            template_type=template_type.value,
            metadata=metadata,
            is_confident=is_confident
        )
        
        session.add(email)
        await session.commit()
        await session.refresh(email)
        
        return email.id
Source: pipeline/steps/email_composer/db_utils.py

User Count Increment

Separate transaction (non-critical, fails silently):
async def increment_user_generation_count(user_id: str) -> None:
    """Increment user's generation count (non-critical)."""
    try:
        async with get_async_session() as session:
            user = await session.get(User, uuid.UUID(user_id))
            if user:
                user.generation_count += 1
                await session.commit()
    except Exception as e:
        logfire.warning(
            "Failed to increment user generation count",
            user_id=user_id,
            error=str(e)
        )
        # Don't fail pipeline if count increment fails
Why Separate Transaction?
  • Email write is critical (must succeed)
  • Count increment is analytics (nice to have)
  • If count fails, email is still saved
Source: pipeline/steps/email_composer/db_utils.py

Error Handling

Fatal Errors (Pipeline Stops)

These errors will halt the pipeline:
  • LLM Generation Failed - API errors, timeouts, or invalid responses after 3 retries
  • Database Write Failed - Connection errors, constraint violations
  • Missing Required Fields - user_id, email_template, scraped_content, etc.
except Exception as e:
    logfire.error(
        "Error in email composer",
        error=str(e),
        error_type=type(e).__name__
    )
    
    return StepResult(
        success=False,
        step_name=self.step_name,
        error=f"Email composer error: {str(e)}"
    )
Source: pipeline/steps/email_composer/main.py:189-200

Retry Strategy

The Pydantic-AI agent automatically retries on:
  • API connection errors
  • Timeout errors (90s timeout per attempt)
  • Transient failures
Configuration:
  • Max retries: 3
  • Timeout: 90 seconds per attempt
  • Total max time: ~270 seconds (4.5 minutes; three 90-second attempts)
Source: pipeline/steps/email_composer/main.py:37

Logging & Observability

The step emits detailed logs throughout execution:

Pre-Generation

logfire.info(
    "Generating email with LLM",
    model=self.model,
    temperature=self.temperature,
    recipient_name=pipeline_data.recipient_name
)

Post-Generation

logfire.info(
    "Email generated successfully",
    word_count=len(email_text.split()),
    length=len(email_text),
    is_confident=is_confident
)

Post-Database Write

logfire.info(
    "Email written to database",
    email_id=str(email_id),
    word_count=len(composed_email.email_content.split())
)
Source: pipeline/steps/email_composer/main.py:75-160

Tracked Metrics

  • Model used
  • Temperature setting
  • Email word count
  • Email character length
  • Confidence flag
  • Email ID (UUID)
  • Generation duration
  • Database write duration

Performance Characteristics

Average Execution Time: ~3.1 seconds
Breakdown:
  • LLM API call: ~2.5s
  • JSON parsing: ~0.05s
  • Database write: ~0.3s
  • User count increment: ~0.2s
  • Overhead: ~0.05s
Variance Factors:
  • Email length (longer = more tokens = slower)
  • Model load (API queue time)
  • Database connection pool availability
  • Network latency
With Retries:
  • 1 retry: +3s
  • 2 retries: +6s
  • 3 retries (max): +9s

Configuration

The step is highly configurable via environment variables:
# Model selection (hot-swappable)
EMAIL_COMPOSER_MODEL=openrouter:moonshot/kimi-k2p5

# Alternative models:
# EMAIL_COMPOSER_MODEL=anthropic:claude-sonnet-4-5
# EMAIL_COMPOSER_MODEL=openai:gpt-4o
# EMAIL_COMPOSER_MODEL=openrouter:anthropic/claude-3.5-sonnet

# Generation parameters (model-specific)
EMAIL_COMPOSER_TEMPERATURE=0.4      # 0.3-0.5 for Kimi K2p5
EMAIL_COMPOSER_MAX_TOKENS=10000     # Generous for long emails
EMAIL_COMPOSER_TIMEOUT=90.0         # Per-attempt timeout (seconds)
Source: config/settings.py
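A minimal sketch of reading these variables with the standard library; the actual settings module may use a typed settings framework instead. The defaults here simply mirror the documented values.

```python
import os

# Illustrative environment-variable loading for the Email Composer.
# Defaults mirror the documented values above.
EMAIL_COMPOSER_MODEL = os.environ.get(
    "EMAIL_COMPOSER_MODEL", "openrouter:moonshot/kimi-k2p5"
)
EMAIL_COMPOSER_TEMPERATURE = float(os.environ.get("EMAIL_COMPOSER_TEMPERATURE", "0.4"))
EMAIL_COMPOSER_MAX_TOKENS = int(os.environ.get("EMAIL_COMPOSER_MAX_TOKENS", "10000"))
EMAIL_COMPOSER_TIMEOUT = float(os.environ.get("EMAIL_COMPOSER_TIMEOUT", "90.0"))
```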

Model Comparison

| Model             | Avg Time | Cost/Email | Quality | Notes                           |
|-------------------|----------|------------|---------|---------------------------------|
| Kimi K2p5         | 2.5s     | $0.007     | High    | Current default, cost-effective |
| Claude Sonnet 4.5 | 3.5s     | $0.023     | Highest | Best quality, more expensive    |
| GPT-4o            | 2.8s     | $0.015     | High    | Good balance                    |
| Claude Haiku 4.5  | 1.8s     | $0.004     | Medium  | Fast but less creative          |
Recommendation: Kimi K2p5 for production (best cost/quality ratio)

Pipeline Completion

After the Email Composer completes:
  1. Email is persisted in the emails table with full metadata
  2. User count is incremented (if successful)
  3. Pipeline data is complete with final_email and email_id
  4. Celery task returns the email_id to the API
  5. Frontend fetches the completed email via GET /api/emails/{email_id}
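The final fetch from step 5 could look like the following Python client sketch. Only the endpoint path comes from this page; `API_BASE` and the bearer-token auth scheme are assumptions.

```python
import json
import urllib.request

API_BASE = "https://api.example.com"  # assumed base URL


def email_url(email_id: str) -> str:
    # Endpoint documented above: GET /api/emails/{email_id}
    return f"{API_BASE}/api/emails/{email_id}"


def fetch_email(email_id: str, token: str) -> dict:
    """Hypothetical client call; auth assumed to be a JWT bearer token."""
    req = urllib.request.Request(
        email_url(email_id),
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```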

Pipeline Overview

View the complete pipeline architecture and data flow
