All pipeline state lives in a single PipelineData dataclass passed through each step. No intermediate database writes—only the final email is persisted.
Why a Dataclass Instead of a Pydantic Model?
- Lighter weight: No validation overhead during pipeline execution
- Faster instantiation: Direct attribute access without property validation
- Validation at boundaries: Pydantic is used only at API request/response boundaries
pipeline/models/core.py
```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class PipelineData:
    """In-memory state - not persisted to database."""

    # Input data
    task_id: str
    user_id: str
    email_template: str
    recipient_name: str
    recipient_interest: str

    # Step outputs (populated during execution)
    search_terms: List[str] = field(default_factory=list)
    scraped_content: str = ""
    arxiv_papers: List[Dict] = field(default_factory=list)
    final_email: str = ""

    # Metadata for final DB write
    metadata: Dict[str, Any] = field(default_factory=dict)

    # Transient data (Logfire only, not persisted)
    step_timings: Dict[str, float] = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)

    def add_timing(self, step_name: str, duration: float) -> None:
        """Record a step's duration (called by the step runner)."""
        self.step_timings[step_name] = duration

    def add_error(self, step_name: str, message: str) -> None:
        """Record a step error without raising (called by the step runner)."""
        self.errors.append(f"{step_name}: {message}")
```
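Creating the state object is a plain constructor call with direct attribute access and no validation pass. A minimal sketch using a trimmed copy of `PipelineData` (the field values are illustrative):

```python
from dataclasses import dataclass, field
from typing import Dict, List

# Trimmed copy of PipelineData, enough to show instantiation
@dataclass
class PipelineData:
    task_id: str
    user_id: str
    email_template: str
    recipient_name: str
    recipient_interest: str
    search_terms: List[str] = field(default_factory=list)
    step_timings: Dict[str, float] = field(default_factory=dict)

data = PipelineData(
    task_id="task-123",
    user_id="user-42",
    email_template="outreach-v1",
    recipient_name="Ada",
    recipient_interest="distributed systems",
)

# Direct attribute access - no validators or property machinery in the way
data.search_terms = ["distributed systems", "consensus"]
print(data.search_terms)
```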
Benefits of Stateless Design
- Performance: No I/O between steps; all operations happen in RAM (10x faster than DB writes)
- Simplicity: Only one database write per pipeline execution (a single atomic transaction)
- Scalability: Workers scale horizontally with no database-connection bottleneck
- Observability: Logfire captures the full execution history without DB overhead
- Memory efficiency: Runs on 512MB RAM with careful memory management
All pipeline steps inherit from an abstract base class that provides:
- Automatic Logfire instrumentation
- Consistent error handling
- Timing metrics collection
- Progress callback integration
pipeline/core/runner.py
```python
from abc import ABC, abstractmethod
from typing import Callable, Optional
import time

import logfire

from pipeline.models.core import PipelineData, StepResult
# ValidationError and StepExecutionError are project-defined exceptions

class BasePipelineStep(ABC):
    """
    Abstract base class for all pipeline steps.

    Each step must implement:
    - _execute_step(): Core business logic
    - Optionally: _validate_input(): Input validation
    """

    def __init__(self, step_name: str):
        self.step_name = step_name

    async def execute(
        self,
        pipeline_data: PipelineData,
        progress_callback: Optional[Callable] = None
    ) -> StepResult:
        """Execute step with full observability."""
        start_time = time.perf_counter()

        # Create Logfire span for distributed tracing
        with logfire.span(
            f"pipeline.{self.step_name}",
            task_id=pipeline_data.task_id,
            step=self.step_name
        ):
            try:
                logfire.info(
                    f"{self.step_name} started",
                    task_id=pipeline_data.task_id
                )

                # Notify progress callback (for Celery state updates)
                if progress_callback:
                    await progress_callback(self.step_name, "started")

                # Validate input prerequisites
                validation_error = await self._validate_input(pipeline_data)
                if validation_error:
                    raise ValidationError(f"Input validation failed: {validation_error}")

                # Execute step-specific logic
                result = await self._execute_step(pipeline_data)

                # Calculate and record timing
                duration = time.perf_counter() - start_time
                pipeline_data.add_timing(self.step_name, duration)

                logfire.info(
                    f"{self.step_name} completed",
                    task_id=pipeline_data.task_id,
                    duration=duration,
                    success=result.success
                )

                return result

            except Exception as e:
                duration = time.perf_counter() - start_time

                logfire.error(
                    f"{self.step_name} failed",
                    task_id=pipeline_data.task_id,
                    error=str(e),
                    error_type=type(e).__name__,
                    duration=duration,
                    exc_info=True  # Include stack trace
                )

                # Record error in pipeline data
                pipeline_data.add_error(self.step_name, str(e))

                raise StepExecutionError(self.step_name, e) from e

    @abstractmethod
    async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
        """Step-specific business logic - MUST BE IMPLEMENTED."""
        pass

    async def _validate_input(self, pipeline_data: PipelineData) -> Optional[str]:
        """Optional input validation - override to check prerequisites."""
        return None  # Default: no validation
```
Design Pattern: Template Method

The `execute()` method is the template that defines the overall algorithm (logging, timing, error handling); each step supplies its custom logic by implementing `_execute_step()`.
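To illustrate how a concrete step plugs into this template, here is a minimal sketch. `SearchTermStep` and its logic are hypothetical, and small stand-ins replace the real `PipelineData`/`StepResult` so the example is self-contained:

```python
import asyncio
from dataclasses import dataclass, field
from typing import List

# Minimal stand-ins for the real pipeline classes
@dataclass
class PipelineData:
    task_id: str
    recipient_interest: str = ""
    search_terms: List[str] = field(default_factory=list)

@dataclass
class StepResult:
    success: bool
    step_name: str

class SearchTermStep:
    """Hypothetical concrete step: derives search terms from the recipient's interest."""
    step_name = "search_terms"

    async def _execute_step(self, data: PipelineData) -> StepResult:
        # Only the custom logic lives here - logging, timing, and error
        # handling come from the base class template method
        data.search_terms = [w.lower() for w in data.recipient_interest.split()]
        return StepResult(success=True, step_name=self.step_name)

data = PipelineData(task_id="t1", recipient_interest="Graph Neural Networks")
result = asyncio.run(SearchTermStep()._execute_step(data))
print(result.success, data.search_terms)
```

In the real pipeline the subclass would inherit from `BasePipelineStep` and be invoked through `execute()`, never by calling `_execute_step()` directly.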
Each step returns a StepResult object that encapsulates success/failure state.
pipeline/models/core.py
```python
@dataclass
class StepResult:
    """Result of a pipeline step execution."""

    success: bool
    """Whether the step completed successfully"""

    step_name: str
    """Name of the step that produced this result"""

    error: Optional[str] = None
    """Error message if success=False"""

    metadata: Optional[Dict[str, Any]] = None
    """
    Optional metadata about execution:
    - duration: float (seconds)
    - output_size: int (bytes/chars)
    - api_calls_made: int
    - retries_attempted: int
    """

    warnings: List[str] = field(default_factory=list)
    """Non-fatal warnings (e.g., 'some URLs failed to scrape')"""

    def __post_init__(self):
        """Validation: if success=False, error must be set."""
        if not self.success and not self.error:
            raise ValueError("StepResult with success=False must have error message")
```
Fatal vs Non-Fatal Errors

Return `StepResult(success=False, error="...")` for fatal errors that should stop the pipeline. Return `StepResult(success=True, warnings=[...])` for non-fatal issues that allow the pipeline to continue.
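A quick sketch of both paths, using a trimmed-down `StepResult` mirroring the dataclass above:

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Trimmed-down StepResult with the same __post_init__ contract as above
@dataclass
class StepResult:
    success: bool
    step_name: str
    error: Optional[str] = None
    warnings: List[str] = field(default_factory=list)

    def __post_init__(self):
        if not self.success and not self.error:
            raise ValueError("StepResult with success=False must have error message")

# Non-fatal: the pipeline continues, warnings are surfaced in logs
partial = StepResult(
    success=True,
    step_name="web_scraper",
    warnings=["2 of 5 URLs failed to scrape"],
)

# Fatal: constructing a failure without a message is rejected up front
try:
    StepResult(success=False, step_name="web_scraper")
except ValueError:
    rejected = True
```

The `__post_init__` check makes the fatal/non-fatal contract impossible to violate silently: a failed result without an error message never enters the pipeline.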
Only one Playwright browser instance is active at a time, to limit memory usage.
pipeline/steps/web_scraper/main.py
```python
# Process URLs sequentially - one page in memory at a time
for url in urls:
    content = await self._scrape_with_playwright(url)
    all_pages.append(content)

# NOT: await asyncio.gather(*[scrape(url) for url in urls])
```
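If strictly sequential scraping ever proves too slow, a middle ground (not what this pipeline does) is to cap concurrency with a semaphore rather than fan out unboundedly. A sketch with a stub scraper standing in for `_scrape_with_playwright`:

```python
import asyncio
from typing import List

async def scrape(url: str) -> str:
    """Stub standing in for the real Playwright scraper."""
    await asyncio.sleep(0.01)
    return f"content from {url}"

async def scrape_bounded(urls: List[str], limit: int = 2) -> List[str]:
    # At most `limit` pages are in flight (and in memory) at once
    sem = asyncio.Semaphore(limit)

    async def one(url: str) -> str:
        async with sem:
            return await scrape(url)

    # Unlike an unbounded gather, peak memory is capped by `limit`
    return await asyncio.gather(*[one(u) for u in urls])

pages = asyncio.run(scrape_bounded([f"https://example.com/{i}" for i in range(5)]))
print(len(pages))
```

With `limit=1` this degenerates to the sequential loop above, which is the right choice under the single-browser, 512MB constraint.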
Smart Chunking
Split large content into chunks at sentence boundaries (never mid-sentence), so each chunk stays within the summarizer's limits without truncating text.
pipeline/steps/web_scraper/summarizer.py
```python
from typing import List

def _smart_chunk(self, content: str, chunk_size: int = 30_000) -> List[str]:
    """Split content at sentence boundaries to prevent mid-sentence truncation."""
    sentence_endings = ['\n\n', '. ', '.\n', '! ', '!\n', '? ', '?\n']
    chunks: List[str] = []
    current_pos = 0

    while current_pos < len(content):
        end_pos = min(current_pos + chunk_size, len(content))

        # For all but the final chunk, look backward (max 500 chars) for the
        # nearest sentence boundary and cut there instead of mid-sentence
        if end_pos < len(content):
            found = False
            for i in range(end_pos, max(end_pos - 500, current_pos), -1):
                for ending in sentence_endings:
                    if content[i:i + len(ending)] == ending:
                        end_pos = i + len(ending)
                        found = True
                        break
                if found:
                    # Stop at the first (i.e. latest) boundary found
                    break

        chunks.append(content[current_pos:end_pos])
        current_pos = end_pos

    return chunks
```
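A quick check of the chunker's behavior, with the method above rebound as a free function and a deliberately tiny `chunk_size` for illustration:

```python
from typing import List

def smart_chunk(content: str, chunk_size: int = 30_000) -> List[str]:
    """Module-level copy of _smart_chunk above, for demonstration."""
    sentence_endings = ['\n\n', '. ', '.\n', '! ', '!\n', '? ', '?\n']
    chunks: List[str] = []
    current_pos = 0
    while current_pos < len(content):
        end_pos = min(current_pos + chunk_size, len(content))
        if end_pos < len(content):
            found = False
            for i in range(end_pos, max(end_pos - 500, current_pos), -1):
                for ending in sentence_endings:
                    if content[i:i + len(ending)] == ending:
                        end_pos = i + len(ending)
                        found = True
                        break
                if found:
                    break
        chunks.append(content[current_pos:end_pos])
        current_pos = end_pos
    return chunks

text = "First sentence. Second sentence. Third sentence."
chunks = smart_chunk(text, chunk_size=20)
print(chunks)
```

Every cut lands after a sentence ending, and joining the chunks reconstructs the original content exactly, so nothing is lost between summarizer calls.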
Content Limits
Enforce maximum content sizes at each step:
- Web scraper summarized content: 3,000 chars (prevents context overflow)
- ArXiv papers: top 5 most relevant (limited by API quota)
- Final email: 10,000 chars max (a reasonable email length)
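These caps can be enforced with a small helper applied at each step boundary. `truncate_at_limit` is a hypothetical name, not from the codebase:

```python
def truncate_at_limit(text: str, limit: int) -> str:
    """Clamp text to `limit` chars, cutting back to the last sentence end when possible."""
    if len(text) <= limit:
        return text
    cut = text.rfind(". ", 0, limit)
    # Fall back to a hard cut if no sentence boundary is found in range
    return text[: cut + 1] if cut != -1 else text[:limit]

summary = truncate_at_limit("A. " * 2000, 3_000)  # web scraper cap
print(len(summary) <= 3_000)
```

Enforcing the limit at the step boundary, rather than inside each step, keeps the caps in one place and guarantees downstream steps never see oversized input.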