Extending the Scribe Pipeline
The Scribe pipeline is designed for extensibility. You can add custom steps, modify existing ones, or create entirely new pipelines.

BasePipelineStep Architecture

All pipeline steps inherit from BasePipelineStep, which provides:
  • Automatic Logfire instrumentation
  • Error handling and logging
  • Timing metrics collection
  • Progress callbacks
  • Input validation framework
From pipeline/core/runner.py:17-169:
class BasePipelineStep(ABC):
    """
    Abstract base class for all pipeline steps.

    Subclasses implement:
    - _execute_step(): the step's business logic (required)
    - _validate_input(): prerequisite checks (optional)

    The public execute() method surrounds step execution with:
    - a Logfire observability span
    - error handling and structured logging
    - per-step timing metrics
    - input validation
    """

    def __init__(self, step_name: str):
        """
        Initialize pipeline step.

        Args:
            step_name: Unique identifier for this step (used in logs)
        """
        self.step_name = step_name

    async def execute(
        self,
        pipeline_data: PipelineData,
        progress_callback: Optional[Callable[[str, str], Awaitable[None]]] = None
    ) -> StepResult:
        """
        Execute the pipeline step with full observability.

        Public entry point: validates prerequisites, then delegates to the
        subclass's _execute_step(), wrapping everything in a Logfire span
        with error handling and timing.
        """
        t0 = time.perf_counter()

        span = logfire.span(
            f"pipeline.{self.step_name}",
            task_id=pipeline_data.task_id,
            step=self.step_name
        )
        with span:
            try:
                # Fail fast when prerequisites from earlier steps are missing.
                problem = await self._validate_input(pipeline_data)
                if problem:
                    raise ValidationError(f"Input validation failed: {problem}")

                # Run the subclass-specific logic.
                outcome = await self._execute_step(pipeline_data)

                # Record how long this step took (success path only).
                pipeline_data.add_timing(self.step_name, time.perf_counter() - t0)

                return outcome

            except Exception as e:
                logfire.error(
                    f"{self.step_name} failed",
                    task_id=pipeline_data.task_id,
                    error=str(e),
                    error_type=type(e).__name__,
                    exc_info=True
                )
                # Record the failure on the shared data object, then wrap the
                # original exception so callers see a uniform error type.
                pipeline_data.add_error(self.step_name, str(e))
                raise StepExecutionError(self.step_name, e) from e

    async def _validate_input(self, pipeline_data: PipelineData) -> Optional[str]:
        """
        Check that this step's prerequisites are satisfied.

        Override to verify required fields, expected data formats, or
        outputs produced by earlier steps.

        Returns:
            An error message when validation fails, otherwise None.
        """
        return None

    @abstractmethod
    async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
        """
        Run the step-specific business logic.

        Must be implemented by every concrete step.

        Args:
            pipeline_data: Shared data object (modify in-place)

        Returns:
            StepResult with success=True/False
        """
        pass

Creating a Custom Step

Step 1: Define Your Step Class

Create a new file in pipeline/steps/your_step/main.py:
import logfire
from typing import Optional
from pipeline.core.runner import BasePipelineStep
from pipeline.models.core import PipelineData, StepResult


class CustomAnalyzerStep(BasePipelineStep):
    """Custom step that analyzes recipient information."""

    def __init__(self, analysis_depth: str = "basic"):
        # The unique step name feeds logs and timing metrics.
        super().__init__(step_name="custom_analyzer")
        self.analysis_depth = analysis_depth

    async def _validate_input(self, pipeline_data: PipelineData) -> Optional[str]:
        """
        Check prerequisites produced by earlier steps.

        Runs before _execute_step() and blocks execution when any
        required field is absent.
        """
        required = (
            (pipeline_data.recipient_name, "recipient_name is missing"),
            (pipeline_data.scraped_content,
             "scraped_content is missing (WebScraper must run first)"),
        )
        for value, message in required:
            if not value:
                return message
        return None

    async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
        """
        Run the custom analysis and publish results.

        Writes results onto pipeline_data in-place so later steps can
        read them.
        """
        try:
            logfire.info(
                "Starting custom analysis",
                task_id=pipeline_data.task_id,
                analysis_depth=self.analysis_depth
            )

            score = await self._analyze_sentiment(pipeline_data.scraped_content)

            # Share findings with downstream steps via the metadata dict.
            pipeline_data.metadata["sentiment_score"] = score
            pipeline_data.metadata["analysis_depth"] = self.analysis_depth

            logfire.info(
                "Custom analysis completed",
                task_id=pipeline_data.task_id,
                sentiment_score=score
            )

            return StepResult(
                success=True,
                step_name=self.step_name,
                metadata={
                    "sentiment_score": score,
                    "content_length": len(pipeline_data.scraped_content)
                }
            )

        except Exception as e:
            # BasePipelineStep logs failures too; this adds step-local context.
            logfire.error(
                "Custom analysis failed",
                task_id=pipeline_data.task_id,
                error=str(e),
                error_type=type(e).__name__
            )

            return StepResult(
                success=False,
                step_name=self.step_name,
                error=f"Analysis failed: {str(e)}"
            )

    async def _analyze_sentiment(self, content: str) -> float:
        """Helper method for sentiment analysis (-1.0 negative .. 1.0 positive)."""
        # Placeholder: plug a real sentiment model in here.
        return 0.75

Step 2: Register Your Step

Add your step to the pipeline in pipeline/__init__.py:
from pipeline.core.runner import PipelineRunner
from pipeline.steps.template_parser import TemplateParserStep
from pipeline.steps.web_scraper import WebScraperStep
from pipeline.steps.custom_analyzer import CustomAnalyzerStep  # Import your step
from pipeline.steps.arxiv_helper import ArxivHelperStep
from pipeline.steps.email_composer import EmailComposerStep


def create_email_pipeline() -> PipelineRunner:
    """Factory function to create default email generation pipeline."""
    runner = PipelineRunner()

    # Steps are registered in the order they will execute.
    for step in (
        TemplateParserStep(),
        WebScraperStep(),
        CustomAnalyzerStep(analysis_depth="detailed"),
        ArxivHelperStep(),
        EmailComposerStep(),
    ):
        runner.register_step(step)

    return runner

Step 3: Use Results in Later Steps

Access your step’s output in subsequent steps:
class EmailComposerStep(BasePipelineStep):
    """Composes the final email; reads the custom analyzer's output from metadata."""

    async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
        """Derive an email tone from the upstream sentiment score (doc excerpt)."""
        # Access sentiment score from custom analyzer
        # .get() with a default keeps this step working even when the
        # analyzer was not registered in the pipeline.
        sentiment_score = pipeline_data.metadata.get("sentiment_score", 0.0)
        
        # Adjust email tone based on sentiment
        if sentiment_score > 0.5:
            tone = "enthusiastic"
        elif sentiment_score < -0.5:
            tone = "professional"
        else:
            tone = "neutral"
        
        # Use in email composition prompt...

Modifying Existing Steps

Extending a Step with Custom Behavior

You can subclass existing steps to modify their behavior:
from pipeline.steps.web_scraper import WebScraperStep


class EnhancedWebScraperStep(WebScraperStep):
    """Extended web scraper with additional data sources."""

    def __init__(self):
        super().__init__()
        # Toggle for the extra LinkedIn lookup performed below.
        self.enable_linkedin = True

    async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
        # Run the stock scraper first; bail out early on failure.
        result = await super()._execute_step(pipeline_data)
        if not result.success:
            return result

        if self.enable_linkedin:
            profile = await self._scrape_linkedin(pipeline_data.recipient_name)

            if profile:
                # Fold the LinkedIn findings into the shared scrape output.
                pipeline_data.scraped_content += f"\n\nLinkedIn: {profile}"
                pipeline_data.scraped_urls.append(profile["url"])

        return result

    async def _scrape_linkedin(self, name: str) -> Optional[dict]:
        """Scrape LinkedIn profile."""
        # Your LinkedIn scraping logic
        pass

Overriding Validation Logic

from pipeline.steps.email_composer import EmailComposerStep


class StrictEmailComposerStep(EmailComposerStep):
    """Email composer with stricter validation."""

    async def _validate_input(self, pipeline_data: PipelineData) -> Optional[str]:
        # Defer to the base class checks before layering on our own.
        inherited_error = await super()._validate_input(pipeline_data)
        if inherited_error:
            return inherited_error

        # RESEARCH templates must arrive with ArXiv papers attached.
        if (pipeline_data.template_type == TemplateType.RESEARCH
                and not pipeline_data.arxiv_papers):
            return "ArXiv papers required for RESEARCH template type"

        # Reject thin scrapes outright.
        if len(pipeline_data.scraped_content) < 500:
            return "Insufficient scraped content (minimum 500 chars required)"

        return None

Adding New Fields to PipelineData

To share custom data between steps, extend PipelineData:
# In pipeline/models/core.py

@dataclass
class PipelineData:
    # ... existing fields ...
    
    # Add your custom fields
    # Latest sentiment result — presumably in [-1.0, 1.0] per the analyzer
    # example; confirm against the step that writes it.
    sentiment_score: float = 0.0
    # Raw LinkedIn scrape payload; stays None until a scraper populates it.
    linkedin_profile: Optional[dict] = None
    # Free-form extension point for ad-hoc per-step data.
    custom_metadata: Dict[str, Any] = field(default_factory=dict)
Changing PipelineData requires updating all places where it’s instantiated. Consider using the metadata dict for new fields to avoid breaking changes:
# ✅ Good: Use metadata dict
pipeline_data.metadata["sentiment_score"] = 0.75

# ❌ Avoid: Adding new fields requires code changes everywhere
pipeline_data.sentiment_score = 0.75

Creating Custom Pipelines

Alternative Pipeline for Different Use Cases

You can create specialized pipelines for different email types:
# In pipeline/__init__.py

def create_quick_email_pipeline() -> PipelineRunner:
    """
    Fast pipeline for simple emails (skips ArXiv, uses cached scraping).
    """
    runner = PipelineRunner()

    # Trimmed step list: cached scraper, no ArxivHelperStep, and the
    # lighter composer (Haiku instead of Sonnet) for speed.
    quick_steps = (
        TemplateParserStep(),
        CachedWebScraperStep(),
        FastEmailComposerStep(),
    )
    for step in quick_steps:
        runner.register_step(step)

    return runner


def create_research_pipeline() -> PipelineRunner:
    """
    Research-focused pipeline with deep analysis.
    """
    runner = PipelineRunner()

    # Full research stack: web scraping plus Scholar search, ArXiv
    # lookup and publication analysis before composing.
    research_steps = (
        TemplateParserStep(),
        WebScraperStep(),
        ScholarSearchStep(),        # Google Scholar scraping
        ArxivHelperStep(),
        PublicationAnalyzerStep(),  # Analyze papers
        EmailComposerStep(),
    )
    for step in research_steps:
        runner.register_step(step)

    return runner

Conditional Step Execution

You can implement conditional logic in steps:
class ConditionalArxivStep(BasePipelineStep):
    """Only run ArXiv search for RESEARCH templates."""

    async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
        if pipeline_data.template_type == TemplateType.RESEARCH:
            # Execute the ArXiv search and stash papers for later steps.
            papers = await self._search_arxiv(pipeline_data.recipient_name)
            pipeline_data.arxiv_papers = papers

            return StepResult(
                success=True,
                step_name=self.step_name,
                metadata={"papers_found": len(papers)}
            )

        # Any other template type: succeed as a no-op, but record why.
        logfire.info(
            "Skipping ArXiv search (not RESEARCH template)",
            task_id=pipeline_data.task_id,
            template_type=pipeline_data.template_type.value
        )
        return StepResult(
            success=True,
            step_name=self.step_name,
            warnings=["Skipped: template type not RESEARCH"]
        )

Advanced Patterns

Parallel Step Execution

For independent steps that can run concurrently:
import asyncio
from typing import List


class ParallelPipelineRunner(PipelineRunner):
    """Pipeline runner that can execute steps in parallel."""
    
    async def run_parallel(
        self,
        pipeline_data: PipelineData,
        parallel_groups: List[List[BasePipelineStep]]
    ) -> str:
        """
        Execute groups of steps in parallel.
        
        Args:
            parallel_groups: List of step groups. Steps within a group
                           run in parallel, groups run sequentially.
        
        Returns:
            pipeline_data.metadata["email_id"] — presumably set by a
            composer step; raises KeyError if no step populated it.
        
        Raises:
            StepExecutionError: when a step returns an unsuccessful result.
        
        Example:
            parallel_groups = [
                [TemplateParserStep()],  # Group 1: runs first
                [WebScraperStep(), TwitterScraperStep()],  # Group 2: parallel
                [EmailComposerStep()]  # Group 3: runs after group 2
            ]
        """
        for group_idx, step_group in enumerate(parallel_groups):
            logfire.info(
                f"Executing parallel group {group_idx + 1}/{len(parallel_groups)}",
                steps=[s.step_name for s in step_group]
            )
            
            # Execute all steps in group concurrently
            # return_exceptions=True lets every sibling task run to completion
            # instead of being interrupted by the first failure.
            tasks = [
                step.execute(pipeline_data) for step in step_group
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Check for failures
            # NOTE(review): only the first exception in group order is
            # re-raised; later failures in the same group are dropped.
            for step, result in zip(step_group, results):
                if isinstance(result, Exception):
                    raise result
                if not result.success:
                    raise StepExecutionError(step.step_name, Exception(result.error))
        
        return pipeline_data.metadata["email_id"]

Retry Logic for Specific Steps

class RetryableWebScraperStep(WebScraperStep):
    """
    Web scraper with built-in retry logic.

    Args:
        max_retries: Total number of attempts before giving up (default: 3).
    """

    def __init__(self, max_retries: int = 3):
        super().__init__()
        self.max_retries = max_retries

    async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
        """
        Run the parent scraper up to max_retries times.

        Retries on both raised exceptions and unsuccessful StepResults,
        sleeping with exponential backoff between attempts. Returns the
        first successful result, or a failure result once attempts are
        exhausted.
        """
        last_error = None

        for attempt in range(1, self.max_retries + 1):
            try:
                logfire.info(
                    f"Web scraping attempt {attempt}/{self.max_retries}",
                    task_id=pipeline_data.task_id
                )

                result = await super()._execute_step(pipeline_data)

                if result.success:
                    return result

                last_error = result.error

            except Exception as e:
                last_error = str(e)
                logfire.warning(
                    f"Attempt {attempt} failed",
                    error=str(e),
                    task_id=pipeline_data.task_id
                )

            # Fix: back off before ANY non-final retry. Previously only the
            # exception path slept, so a non-raising failed StepResult
            # retried immediately with no delay.
            if attempt < self.max_retries:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

        # All retries exhausted
        return StepResult(
            success=False,
            step_name=self.step_name,
            error=f"Failed after {self.max_retries} attempts: {last_error}"
        )

Step with External Configuration

from config.settings import settings


class ConfigurableEmailComposerStep(EmailComposerStep):
    """Email composer with hot-swappable configuration."""

    def __init__(self):
        super().__init__()

        # Pull tunables from the environment-backed settings object.
        self.model = settings.email_composer_model
        self.temperature = settings.email_composer_temperature
        self.max_tokens = settings.email_composer_max_tokens

        # Record the effective configuration for traceability.
        logfire.info(
            "Email composer initialized",
            model=self.model,
            temperature=self.temperature,
            max_tokens=self.max_tokens
        )
Environment variables:
# .env
EMAIL_COMPOSER_MODEL=anthropic:claude-sonnet-4-5
EMAIL_COMPOSER_TEMPERATURE=0.4
EMAIL_COMPOSER_MAX_TOKENS=10000

Testing Custom Steps

Unit Test Example

import pytest
from pipeline.models.core import PipelineData
from pipeline.steps.custom_analyzer import CustomAnalyzerStep


@pytest.mark.asyncio
async def test_custom_analyzer_basic():
    """Test custom analyzer with valid input."""
    analyzer = CustomAnalyzerStep(analysis_depth="basic")

    pipeline_data = PipelineData(
        task_id="test-123",
        user_id="user-456",
        email_template="Hello {{name}}",
        recipient_name="Dr. Smith",
        recipient_interest="AI",
        scraped_content="Dr. Smith is a professor of AI at MIT."
    )

    outcome = await analyzer.execute(pipeline_data)

    # The step must succeed and publish a bounded sentiment score.
    assert outcome.success
    assert "sentiment_score" in pipeline_data.metadata
    assert -1.0 <= pipeline_data.metadata["sentiment_score"] <= 1.0


@pytest.mark.asyncio
async def test_custom_analyzer_validation_failure():
    """Test that validation catches missing prerequisites."""
    analyzer = CustomAnalyzerStep()

    bad_data = PipelineData(
        task_id="test-123",
        user_id="user-456",
        email_template="Hello {{name}}",
        recipient_name="Dr. Smith",
        recipient_interest="AI",
        scraped_content=""  # Empty - should fail validation
    )

    # execute() converts a failed _validate_input into ValidationError.
    with pytest.raises(ValidationError):
        await analyzer.execute(bad_data)

Integration Test Example

@pytest.mark.integration
@pytest.mark.asyncio
async def test_custom_pipeline_end_to_end():
    """Test complete pipeline with custom step."""
    runner = PipelineRunner()
    for step in (
        TemplateParserStep(),
        WebScraperStep(),
        CustomAnalyzerStep(),
        EmailComposerStep(),
    ):
        runner.register_step(step)

    pipeline_data = PipelineData(
        task_id="integration-test",
        user_id="user-123",
        email_template="Hello {{name}}, I read about {{research}}.",
        recipient_name="Dr. Jane Smith",
        recipient_interest="machine learning"
    )

    email_id = await runner.run(pipeline_data)

    # The run must produce an email and the analyzer's metadata entry.
    assert email_id is not None
    assert pipeline_data.final_email != ""
    assert "sentiment_score" in pipeline_data.metadata

Best Practices

1. Keep Steps Focused

# ✅ Good: Single responsibility
class SentimentAnalyzerStep(BasePipelineStep):
    """Analyzes sentiment of scraped content."""
    async def _execute_step(self, data: PipelineData) -> StepResult:
        # One job only: score the scrape and publish it via metadata.
        score = await analyze_sentiment(data.scraped_content)
        data.metadata["sentiment"] = score
        return StepResult(success=True, step_name=self.step_name)

# ❌ Bad: Too many responsibilities
class AnalyzeAndEmailStep(BasePipelineStep):
    """Analyzes sentiment AND composes email."""
    # Violates single responsibility principle
    # (split into a dedicated analyzer step and a composer step instead)

2. Validate Inputs Explicitly

# ✅ Good: Clear validation
async def _validate_input(self, data: PipelineData) -> Optional[str]:
    # Each check returns a specific, actionable message.
    content = data.scraped_content
    if not content:
        return "scraped_content missing (WebScraper must run first)"
    if len(content) < 100:
        return "scraped_content too short (minimum 100 chars)"
    return None

3. Log Meaningful Events

# ✅ Good: Structured logging with context
logfire.info(
    "Sentiment analysis completed",
    task_id=pipeline_data.task_id,
    sentiment_score=score,
    confidence=confidence,
    content_length=len(content)
)

4. Handle Errors Gracefully

# ✅ Good: Distinguish fatal vs non-fatal
try:
    result = await external_api_call()
except TimeoutError as e:
    # Non-fatal: log warning and continue
    # Stale cached data is preferable to failing the whole pipeline run.
    logfire.warning("API timeout, using cached data", error=str(e))
    result = get_cached_data()
except AuthenticationError as e:
    # Fatal: return failure
    # No sensible fallback without valid credentials, so surface the error.
    return StepResult(
        success=False,
        step_name=self.step_name,
        error=f"Authentication failed: {e}"
    )

5. Document Configuration Options

class CustomStep(BasePipelineStep):
    """
    Custom analyzer with configurable behavior.

    Args:
        analysis_depth: How thorough the analysis is
            ('basic', 'detailed', 'comprehensive')
        enable_caching: Cache results between runs (default: True)
        timeout: Maximum runtime in seconds (default: 30.0)

    Environment Variables:
        CUSTOM_ANALYZER_API_KEY: Required API key for analysis service
    """

    def __init__(
        self,
        analysis_depth: str = "basic",
        enable_caching: bool = True,
        timeout: float = 30.0
    ):
        super().__init__(step_name="custom_analyzer")
        # Keep the raw options on the instance for _execute_step to read.
        self.analysis_depth = analysis_depth
        self.enable_caching = enable_caching
        self.timeout = timeout

Build docs developers (and LLMs) love