The Scribe pipeline is built on a stateless, in-memory architecture with a focus on observability, performance, and horizontal scalability.

Core Design Principles

1. Stateless In-Memory Processing

All pipeline state lives in a single PipelineData dataclass passed through each step. No intermediate database writes—only the final email is persisted.
Why a plain dataclass instead of Pydantic:
  • Lighter weight: no validation overhead during pipeline execution
  • Faster instantiation: direct attribute access without property validation
  • Validation at boundaries: Pydantic is used only at API request/response boundaries
pipeline/models/core.py
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class PipelineData:
    """In-memory state - not persisted to database."""
    
    # Input data
    task_id: str
    user_id: str
    email_template: str
    recipient_name: str
    recipient_interest: str
    template_type: Optional["TemplateType"] = None  # enum defined elsewhere
    
    # Step outputs (populated during execution)
    search_terms: List[str] = field(default_factory=list)
    scraped_content: str = ""
    arxiv_papers: List[Dict] = field(default_factory=list)
    final_email: str = ""
    
    # Metadata for final DB write
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    # Transient data (Logfire only, not persisted)
    step_timings: Dict[str, float] = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)
    
    # Helpers used by BasePipelineStep and PipelineRunner
    def add_timing(self, step_name: str, duration: float) -> None:
        self.step_timings[step_name] = duration
    
    def add_error(self, step_name: str, error: str) -> None:
        self.errors.append(f"{step_name}: {error}")
    
    def total_duration(self) -> float:
        return sum(self.step_timings.values())
Benefits:
  • Performance: no I/O between steps; all operations in RAM (10x faster than DB writes)
  • Simplicity: only 1 database write per pipeline execution (atomic transaction)
  • Scalability: workers scale horizontally with no database connection bottleneck
  • Observability: Logfire captures full execution history without DB overhead
  • Memory efficiency: runs on 512MB RAM with careful memory management

2. BasePipelineStep Pattern

All pipeline steps inherit from an abstract base class that provides:
  • Automatic Logfire instrumentation
  • Consistent error handling
  • Timing metrics collection
  • Progress callback integration
pipeline/core/runner.py
from abc import ABC, abstractmethod
from typing import Callable, Optional
import logfire
import time

class BasePipelineStep(ABC):
    """
    Abstract base class for all pipeline steps.
    
    Each step must implement:
    - _execute_step(): Core business logic
    - Optionally: _validate_input(): Input validation
    """
    
    def __init__(self, step_name: str):
        self.step_name = step_name
    
    async def execute(
        self,
        pipeline_data: PipelineData,
        progress_callback: Optional[Callable] = None
    ) -> StepResult:
        """Execute step with full observability."""
        start_time = time.perf_counter()
        
        # Create Logfire span for distributed tracing
        with logfire.span(
            f"pipeline.{self.step_name}",
            task_id=pipeline_data.task_id,
            step=self.step_name
        ):
            try:
                logfire.info(
                    f"{self.step_name} started",
                    task_id=pipeline_data.task_id
                )
                
                # Notify progress callback (for Celery state updates)
                if progress_callback:
                    await progress_callback(self.step_name, "started")
                
                # Validate input prerequisites
                validation_error = await self._validate_input(pipeline_data)
                if validation_error:
                    raise ValidationError(f"Input validation failed: {validation_error}")
                
                # Execute step-specific logic
                result = await self._execute_step(pipeline_data)
                
                # Calculate and record timing
                duration = time.perf_counter() - start_time
                pipeline_data.add_timing(self.step_name, duration)
                
                logfire.info(
                    f"{self.step_name} completed",
                    task_id=pipeline_data.task_id,
                    duration=duration,
                    success=result.success
                )
                
                return result
                
            except Exception as e:
                duration = time.perf_counter() - start_time
                
                logfire.error(
                    f"{self.step_name} failed",
                    task_id=pipeline_data.task_id,
                    error=str(e),
                    error_type=type(e).__name__,
                    duration=duration,
                    exc_info=True  # Include stack trace
                )
                
                # Record error in pipeline data
                pipeline_data.add_error(self.step_name, str(e))
                
                raise StepExecutionError(self.step_name, e) from e
    
    @abstractmethod
    async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
        """Step-specific business logic - MUST BE IMPLEMENTED."""
        pass
    
    async def _validate_input(self, pipeline_data: PipelineData) -> Optional[str]:
        """Optional input validation - override to check prerequisites."""
        return None  # Default: no validation
Design Pattern: Template Method
The execute() method is the template that defines the overall algorithm (logging, timing, error handling). Each step supplies its custom logic via an _execute_step() implementation.
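As a toy illustration of the pattern (simplified, synchronous, and using plain dicts rather than the real PipelineData and Logfire calls), the base class owns timing and error capture while subclasses supply only _execute_step():

```python
from abc import ABC, abstractmethod
import time

class Step(ABC):
    """Toy Template Method: execute() is the fixed algorithm."""

    def __init__(self, name: str):
        self.name = name

    def execute(self, data: dict) -> dict:
        start = time.perf_counter()
        try:
            result = self._execute_step(data)  # the one variable part
            data.setdefault("timings", {})[self.name] = time.perf_counter() - start
            return result
        except Exception as exc:
            # Record the failure, then let the caller decide how to handle it
            data.setdefault("errors", []).append(f"{self.name}: {exc}")
            raise

    @abstractmethod
    def _execute_step(self, data: dict) -> dict:
        """Step-specific logic supplied by subclasses."""

class UppercaseStep(Step):
    def _execute_step(self, data: dict) -> dict:
        data["text"] = data["text"].upper()
        return data

data = {"text": "hello"}
UppercaseStep("uppercase").execute(data)
```

Every step gets timing and error capture for free; none of that logic is repeated in subclasses.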

3. PipelineRunner Orchestration

The PipelineRunner coordinates sequential execution of all registered steps.
pipeline/core/runner.py
class PipelineRunner:
    """
    Orchestrates sequential execution of all pipeline steps.
    
    Responsibilities:
    - Register steps in execution order
    - Execute steps sequentially (not parallel)
    - Handle step failures
    - Track overall progress
    - Return final result (email_id)
    """
    
    def __init__(self, steps: Optional[List[BasePipelineStep]] = None):
        self.steps = steps or []
    
    def register_step(self, step: BasePipelineStep) -> None:
        """Add a step to the pipeline (executed in registration order)."""
        self.steps.append(step)
    
    async def run(
        self,
        pipeline_data: PipelineData,
        progress_callback: Optional[Callable] = None
    ) -> str:
        """Run all pipeline steps sequentially."""
        
        # Create overall pipeline span
        with logfire.span(
            "pipeline.full_run",
            task_id=pipeline_data.task_id,
            user_id=pipeline_data.user_id,
            template_type=pipeline_data.template_type.value if pipeline_data.template_type else None
        ):
            logfire.info(
                "Pipeline execution started",
                task_id=pipeline_data.task_id,
                total_steps=len(self.steps)
            )
            
            # Execute each step sequentially
            for i, step in enumerate(self.steps):
                progress_pct = int(((i + 1) / len(self.steps)) * 100)
                
                logfire.info(
                    f"Executing step {i+1}/{len(self.steps)}",
                    step=step.step_name,
                    progress_pct=progress_pct
                )
                
                # Execute step (will raise StepExecutionError on failure)
                result = await step.execute(pipeline_data, progress_callback)
                
                # Check for failure
                if not result.success:
                    raise StepExecutionError(
                        step.step_name,
                        Exception(result.error or "Unknown error")
                    )
            
            # Verify email_id was set by final step
            email_id = pipeline_data.metadata.get("email_id")
            if not email_id:
                raise ValueError(
                    "Pipeline completed but email_id not set. "
                    "EmailComposer step must set pipeline_data.metadata['email_id']"
                )
            
            # Log completion
            total_duration = pipeline_data.total_duration()
            logfire.info(
                "Pipeline execution completed",
                task_id=pipeline_data.task_id,
                email_id=email_id,
                total_duration=total_duration,
                step_timings=pipeline_data.step_timings
            )
            
            return email_id
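The create_email_pipeline() factory referenced in the Celery task is not shown in this section; its shape can be sketched with a trimmed synchronous runner and lambda stand-ins for the four steps (step names taken from the timing metrics, outputs purely illustrative):

```python
from typing import Callable, Dict, List

PipelineStep = Callable[[Dict], Dict]

class MiniRunner:
    """Trimmed stand-in for PipelineRunner: registration order is execution order."""

    def __init__(self) -> None:
        self.steps: List[PipelineStep] = []

    def register_step(self, step: PipelineStep) -> None:
        self.steps.append(step)

    def run(self, data: Dict) -> Dict:
        for step in self.steps:  # strictly sequential, never parallel
            data = step(data)
        return data

def create_email_pipeline() -> MiniRunner:
    """Hypothetical factory wiring: the four steps in execution order."""
    runner = MiniRunner()
    runner.register_step(lambda d: {**d, "search_terms": ["quantum ml"]})  # template_parser
    runner.register_step(lambda d: {**d, "scraped_content": "..."})        # web_scraper
    runner.register_step(lambda d: {**d, "arxiv_papers": []})              # arxiv_enricher
    runner.register_step(lambda d: {**d, "final_email": "Dear Dr. ..."})   # email_composer
    return runner

result = create_email_pipeline().run({"task_id": "t-1"})
```

Because later steps read what earlier steps wrote, the registration order in the factory is a load-bearing part of the design.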

Celery Integration

The pipeline is orchestrated by a Celery task that manages the full execution lifecycle.
tasks/email_tasks.py
@celery_app.task(bind=True, max_retries=1)
def generate_email_task(
    self,
    *,
    queue_item_id: Optional[str] = None,
    user_id: str,
    email_template: str,
    recipient_name: str,
    recipient_interest: str,
    template_type: str | TemplateType | None = None,
) -> Dict[str, Any]:
    """Celery entrypoint for the 4-step email generation pipeline."""
    
    # Create in-memory pipeline data
    pipeline_data = PipelineData(
        task_id=self.request.id,
        user_id=user_id,
        email_template=email_template,
        recipient_name=recipient_name,
        recipient_interest=recipient_interest,
        template_type=template_type,
    )
    
    # Create pipeline runner and register steps
    runner = create_email_pipeline()  # Factory function
    
    # Progress callback for Celery state updates
    async def progress_callback(step_name: str, step_status: str) -> None:
        self.update_state(
            state='STARTED',
            meta={
                'current_step': step_name,
                'step_status': step_status,
                'step_timings': pipeline_data.step_timings
            }
        )
        # Also update database queue status
        update_queue_status(queue_item_id, status="PROCESSING", current_step=step_name)
    
    # Execute pipeline
    with logfire.span("tasks.generate_email", task_id=pipeline_data.task_id):
        try:
            # Celery task bodies are synchronous; asyncio.run drives the async runner
            import asyncio
            email_id = asyncio.run(runner.run(pipeline_data, progress_callback=progress_callback))
            
            # Update final status
            self.update_state(
                state='SUCCESS',
                meta={'email_id': str(email_id), 'total_duration': pipeline_data.total_duration()}
            )
            update_queue_status(queue_item_id, status="COMPLETED", email_id=str(email_id))
            
            return {
                "task_id": pipeline_data.task_id,
                "email_id": str(email_id),
                "metadata": {
                    "step_timings": pipeline_data.step_timings,
                    "total_duration": pipeline_data.total_duration()
                }
            }
        except Exception as e:
            logfire.error("Pipeline execution failed", error=str(e), exc_info=True)
            update_queue_status(queue_item_id, status="FAILED", error_message=str(e))
            raise

Step Result Pattern

Each step returns a StepResult object that encapsulates success/failure state.
pipeline/models/core.py
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class StepResult:
    """Result of a pipeline step execution."""
    
    success: bool
    """Whether the step completed successfully"""
    
    step_name: str
    """Name of the step that produced this result"""
    
    error: Optional[str] = None
    """Error message if success=False"""
    
    metadata: Optional[Dict[str, Any]] = None
    """
    Optional metadata about execution:
    - duration: float (seconds)
    - output_size: int (bytes/chars)
    - api_calls_made: int
    - retries_attempted: int
    """
    
    warnings: List[str] = field(default_factory=list)
    """Non-fatal warnings (e.g., 'some URLs failed to scrape')"""
    
    def __post_init__(self):
        """Validation: if success=False, error must be set"""
        if not self.success and not self.error:
            raise ValueError("StepResult with success=False must have error message")
Fatal vs Non-Fatal Errors
Return StepResult(success=False, error="...") for fatal errors that should stop the pipeline. Return StepResult(success=True, warnings=[...]) for non-fatal issues that allow the pipeline to continue.
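Both cases can be exercised against a trimmed copy of the dataclass above (same fields minus metadata; the warning and error strings are hypothetical):

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class StepResult:
    """Trimmed copy of StepResult, enough to show the contract."""
    success: bool
    step_name: str
    error: Optional[str] = None
    warnings: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Enforce the invariant: a failure must explain itself
        if not self.success and not self.error:
            raise ValueError("StepResult with success=False must have error message")

# Non-fatal: the step succeeded overall; warnings surface in logs, pipeline continues
partial = StepResult(success=True, step_name="web_scraper",
                     warnings=["2 of 5 URLs failed to scrape"])

# Fatal: stops the pipeline; the error message is mandatory
fatal = StepResult(success=False, step_name="web_scraper",
                   error="all URLs failed to scrape")
```

Constructing a failed result without an error message raises ValueError at creation time, so malformed results never propagate through the runner.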

Memory Management

The pipeline is optimized to run on 512MB RAM with careful memory management:
Base Python + FastAPI:        ~80MB
Playwright Chromium:          ~150MB (when active)
Pipeline overhead:            ~50MB
LLM response caching:         ~30MB
──────────────────────────────────
Total peak usage:             ~310MB (safe margin)
Available buffer:             ~200MB
Optimization Strategies:
Only 1 Playwright browser instance active at a time to limit memory usage.
pipeline/steps/web_scraper/main.py
# Process URLs sequentially
for url in urls:
    content = await self._scrape_with_playwright(url)
    all_pages.append(content)
# NOT: await asyncio.gather(*[scrape(url) for url in urls])
Split large content at sentence boundaries (not mid-sentence) so it can be summarized in bounded-size chunks without truncating mid-thought.
pipeline/steps/web_scraper/summarizer.py
def _smart_chunk(self, content: str, chunk_size: int = 30_000) -> List[str]:
    """Split content at sentence boundaries to prevent truncation."""
    sentence_endings = ['\n\n', '. ', '.\n', '! ', '!\n', '? ', '?\n']
    chunks: List[str] = []
    current_pos = 0
    
    while current_pos < len(content):
        end_pos = min(current_pos + chunk_size, len(content))
        
        if end_pos < len(content):
            # Walk backward (at most 500 chars) to the latest sentence boundary
            for i in range(end_pos, max(end_pos - 500, current_pos), -1):
                match = next(
                    (e for e in sentence_endings if content[i:i + len(e)] == e),
                    None,
                )
                if match:
                    end_pos = i + len(match)
                    break
        
        chunks.append(content[current_pos:end_pos])
        current_pos = end_pos
    
    return chunks
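The boundary behavior can be demonstrated with a standalone copy of the method (self dropped, and a deliberately tiny chunk_size so a short string splits): chunks end only at sentence endings and concatenate back to the original input.

```python
from typing import List

def smart_chunk(content: str, chunk_size: int = 30_000) -> List[str]:
    """Standalone copy of _smart_chunk, for illustration only."""
    sentence_endings = ['\n\n', '. ', '.\n', '! ', '!\n', '? ', '?\n']
    chunks: List[str] = []
    current_pos = 0
    while current_pos < len(content):
        end_pos = min(current_pos + chunk_size, len(content))
        if end_pos < len(content):
            # Walk backward (at most 500 chars) to the latest sentence boundary
            for i in range(end_pos, max(end_pos - 500, current_pos), -1):
                match = next(
                    (e for e in sentence_endings if content[i:i + len(e)] == e),
                    None,
                )
                if match:
                    end_pos = i + len(match)
                    break
        chunks.append(content[current_pos:end_pos])
        current_pos = end_pos
    return chunks

text = "Alpha one. Bravo two. Charlie three. Delta four."
chunks = smart_chunk(text, chunk_size=20)
# Each chunk ends at a '. ' boundary; joining the chunks recovers the input
```

Note that a chunk may slightly exceed chunk_size when the boundary's trailing whitespace is included; lossless reassembly is the invariant, not an exact size cap.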
Enforce maximum content sizes at each step:
  • Web scraper summarized content: 3,000 chars (prevents context overflow)
  • ArXiv papers: Top 5 most relevant (limited by API quota)
  • Final email: 10,000 chars max (reasonable email length)

Observability

Every pipeline execution is fully traced with Logfire.
Automatic instrumentation:
  • Full execution trace with nested spans
  • Step-by-step timing breakdown
  • Error stack traces with context
  • API call telemetry (Anthropic, Google, ArXiv)
Custom Metrics:
# Timing metrics
pipeline_data.step_timings = {
    "template_parser": 1.2,
    "web_scraper": 5.3,
    "arxiv_enricher": 0.8,
    "email_composer": 3.1
}

# Error tracking
pipeline_data.errors = [
    "web_scraper: Failed to scrape 2 URLs",
    "email_composer: Validation failed on attempt 1"
]
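Assuming total_duration() simply sums the per-step timings (an assumption; the method body is not shown in this section), the timings above give a pipeline total of roughly 10.4 seconds:

```python
step_timings = {
    "template_parser": 1.2,
    "web_scraper": 5.3,
    "arxiv_enricher": 0.8,
    "email_composer": 3.1,
}

# Sum of per-step timings; floating-point, so compare with a tolerance
total_duration = sum(step_timings.values())
```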

Next Steps

Data Flow

Learn how PipelineData flows through steps and what each step reads/writes

Step 1: Template Parser

Deep dive into the first step: analyzing templates and extracting search terms
