BasePipelineStep Architecture
All pipeline steps inherit from `BasePipelineStep`, which provides:
- Automatic Logfire instrumentation
- Error handling and logging
- Timing metrics collection
- Progress callbacks
- Input validation framework
pipeline/core/runner.py:17-169:
class BasePipelineStep(ABC):
    """
    Abstract base class for all pipeline steps.

    Each step must implement:
    - _execute_step(): Core business logic
    - Optionally: _validate_input(): Input validation

    The execute() method wraps step execution with:
    - Logfire observability spans
    - Error handling and logging
    - Timing metrics
    - Result validation
    """

    def __init__(self, step_name: str):
        """
        Initialize pipeline step.

        Args:
            step_name: Unique identifier for this step (used in logs)
        """
        # Used as the span-name suffix, the timing key, and the error key.
        self.step_name = step_name

    async def execute(
        self,
        pipeline_data: PipelineData,
        progress_callback: Optional[Callable[[str, str], Awaitable[None]]] = None
    ) -> StepResult:
        """
        Execute the pipeline step with full observability.

        This is the public interface - it wraps the step-specific
        _execute_step() method with error handling and logging.

        Args:
            pipeline_data: Shared data object; timings and errors are
                recorded on it in-place.
            progress_callback: Optional async callback.
                NOTE(review): accepted but never invoked in this excerpt —
                confirm against the full runner.py.

        Returns:
            The StepResult produced by _execute_step().

        Raises:
            StepExecutionError: Wraps any exception raised during validation
                or execution (including the internal ValidationError),
                chained via ``from e``.
        """
        start_time = time.perf_counter()
        with logfire.span(
            f"pipeline.{self.step_name}",
            task_id=pipeline_data.task_id,
            step=self.step_name
        ):
            try:
                # Validate input prerequisites
                validation_error = await self._validate_input(pipeline_data)
                if validation_error:
                    raise ValidationError(f"Input validation failed: {validation_error}")
                # Execute the step-specific logic
                result = await self._execute_step(pipeline_data)
                # Calculate duration and update metadata
                # NOTE: timing is recorded only on the success path; failed
                # steps do not get an add_timing() entry here.
                duration = time.perf_counter() - start_time
                pipeline_data.add_timing(self.step_name, duration)
                return result
            except Exception as e:
                logfire.error(
                    f"{self.step_name} failed",
                    task_id=pipeline_data.task_id,
                    error=str(e),
                    error_type=type(e).__name__,
                    exc_info=True
                )
                # Record the failure on the shared data object, then re-raise
                # wrapped so callers see one uniform exception type.
                pipeline_data.add_error(self.step_name, str(e))
                raise StepExecutionError(self.step_name, e) from e

    async def _validate_input(self, pipeline_data: PipelineData) -> Optional[str]:
        """
        Validate that prerequisites for this step are met.

        Override this method to check:
        - Required fields are populated
        - Data is in expected format
        - Dependencies from previous steps exist

        Returns:
            Error message if validation fails, None if valid
        """
        # Default implementation accepts every input.
        return None

    @abstractmethod
    async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
        """
        Execute step-specific business logic.

        MUST BE IMPLEMENTED by each step.

        Args:
            pipeline_data: Shared data object (modify in-place)

        Returns:
            StepResult with success=True/False
        """
        pass
Creating a Custom Step
Step 1: Define Your Step Class
Create a new file in `pipeline/steps/your_step/main.py`:
import logfire
from typing import Optional
from pipeline.core.runner import BasePipelineStep
from pipeline.models.core import PipelineData, StepResult
class CustomAnalyzerStep(BasePipelineStep):
    """Custom step that analyzes recipient information."""

    def __init__(self, analysis_depth: str = "basic"):
        # The unique step name identifies this step in logs and timings;
        # always pass it up to the base class.
        super().__init__(step_name="custom_analyzer")
        self.analysis_depth = analysis_depth

    async def _validate_input(self, pipeline_data: PipelineData) -> Optional[str]:
        """
        Check prerequisites produced by earlier steps.

        Runs before _execute_step() and blocks execution when a required
        field is missing. Returns an error message, or None when valid.
        """
        # Checked in order; the first missing field wins.
        prerequisites = (
            (pipeline_data.recipient_name, "recipient_name is missing"),
            (pipeline_data.scraped_content,
             "scraped_content is missing (WebScraper must run first)"),
        )
        for value, message in prerequisites:
            if not value:
                return message
        return None

    async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
        """
        Run the analysis and publish results through pipeline_data.

        Results are written to pipeline_data.metadata so later steps can
        read them without coupling to this class.
        """
        try:
            logfire.info(
                "Starting custom analysis",
                task_id=pipeline_data.task_id,
                analysis_depth=self.analysis_depth
            )
            score = await self._analyze_sentiment(pipeline_data.scraped_content)

            # Later steps read these metadata keys.
            pipeline_data.metadata["sentiment_score"] = score
            pipeline_data.metadata["analysis_depth"] = self.analysis_depth

            logfire.info(
                "Custom analysis completed",
                task_id=pipeline_data.task_id,
                sentiment_score=score
            )
            return StepResult(
                success=True,
                step_name=self.step_name,
                metadata={
                    "sentiment_score": score,
                    "content_length": len(pipeline_data.scraped_content)
                }
            )
        except Exception as e:
            # Base class also logs; this adds step-local context.
            logfire.error(
                "Custom analysis failed",
                task_id=pipeline_data.task_id,
                error=str(e),
                error_type=type(e).__name__
            )
            # Signal a non-fatal failure instead of raising.
            return StepResult(
                success=False,
                step_name=self.step_name,
                error=f"Analysis failed: {str(e)}"
            )

    async def _analyze_sentiment(self, content: str) -> float:
        """Placeholder scorer: returns a value in [-1.0, 1.0]."""
        return 0.75
Step 2: Register Your Step
Add your step to the pipeline in `pipeline/__init__.py`:
from pipeline.core.runner import PipelineRunner
from pipeline.steps.template_parser import TemplateParserStep
from pipeline.steps.web_scraper import WebScraperStep
from pipeline.steps.custom_analyzer import CustomAnalyzerStep # Import your step
from pipeline.steps.arxiv_helper import ArxivHelperStep
from pipeline.steps.email_composer import EmailComposerStep
def create_email_pipeline() -> PipelineRunner:
    """Factory function to create default email generation pipeline."""
    runner = PipelineRunner()
    # Registration order is execution order.
    steps = (
        TemplateParserStep(),
        WebScraperStep(),
        CustomAnalyzerStep(analysis_depth="detailed"),  # Custom step slots in here
        ArxivHelperStep(),
        EmailComposerStep(),
    )
    for step in steps:
        runner.register_step(step)
    return runner
Step 3: Use Results in Later Steps
Access your step's output in subsequent steps:

class EmailComposerStep(BasePipelineStep):
    async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
        """Compose the email, adapting tone to the analyzer's sentiment score.

        NOTE: elided example — the actual composition and return are omitted.
        """
        # Access sentiment score from custom analyzer
        # (0.0 default keeps this step working even if the analyzer was skipped)
        sentiment_score = pipeline_data.metadata.get("sentiment_score", 0.0)
        # Adjust email tone based on sentiment
        if sentiment_score > 0.5:
            tone = "enthusiastic"
        elif sentiment_score < -0.5:
            tone = "professional"
        else:
            tone = "neutral"
        # Use in email composition prompt...
Modifying Existing Steps
Extending a Step with Custom Behavior
You can subclass existing steps to modify their behavior:

from pipeline.steps.web_scraper import WebScraperStep
class EnhancedWebScraperStep(WebScraperStep):
    """Extended web scraper with additional data sources."""

    def __init__(self):
        super().__init__()
        # Toggle for the extra LinkedIn lookup performed after base scraping.
        self.enable_linkedin = True

    async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
        """Run the base scraper, then optionally enrich with LinkedIn data."""
        base_result = await super()._execute_step(pipeline_data)
        if not base_result.success:
            # Nothing to enrich if the base scrape already failed.
            return base_result

        if self.enable_linkedin:
            linkedin_data = await self._scrape_linkedin(
                pipeline_data.recipient_name
            )
            if linkedin_data:
                # NOTE(review): this interpolates the whole dict into the
                # content string — confirm a formatted summary isn't intended.
                pipeline_data.scraped_content += f"\n\nLinkedIn: {linkedin_data}"
                pipeline_data.scraped_urls.append(linkedin_data["url"])

        return base_result

    async def _scrape_linkedin(self, name: str) -> Optional[dict]:
        """Scrape LinkedIn profile."""
        # Your LinkedIn scraping logic
        pass
Overriding Validation Logic
from pipeline.steps.email_composer import EmailComposerStep
class StrictEmailComposerStep(EmailComposerStep):
    """Email composer with stricter validation."""

    async def _validate_input(self, pipeline_data: PipelineData) -> Optional[str]:
        """Layer extra checks on top of the parent step's validation."""
        inherited = await super()._validate_input(pipeline_data)
        if inherited:
            return inherited

        # RESEARCH templates must arrive with ArXiv results attached.
        is_research = pipeline_data.template_type == TemplateType.RESEARCH
        if is_research and not pipeline_data.arxiv_papers:
            return "ArXiv papers required for RESEARCH template type"

        # Enforce a floor on how much scraped material the composer gets.
        if len(pipeline_data.scraped_content) < 500:
            return "Insufficient scraped content (minimum 500 chars required)"

        return None
Adding New Fields to PipelineData
To share custom data between steps, extendPipelineData:
# In pipeline/models/core.py
@dataclass
class PipelineData:
    # ... existing fields ...

    # Add your custom fields
    sentiment_score: float = 0.0  # presumably in [-1.0, 1.0] — confirm with analyzer
    linkedin_profile: Optional[dict] = None  # raw profile dict; schema set by the scraper
    custom_metadata: Dict[str, Any] = field(default_factory=dict)  # free-form cross-step data
Changing `PipelineData` requires updating all places where it's instantiated. Consider using the metadata dict for new fields to avoid breaking changes:

# ✅ Good: Use metadata dict
pipeline_data.metadata["sentiment_score"] = 0.75
# ❌ Avoid: Adding new fields requires code changes everywhere
pipeline_data.sentiment_score = 0.75
Creating Custom Pipelines
Alternative Pipeline for Different Use Cases
You can create specialized pipelines for different email types:

# In pipeline/__init__.py
def create_quick_email_pipeline() -> PipelineRunner:
    """
    Fast pipeline for simple emails (skips ArXiv, uses cached scraping).
    """
    runner = PipelineRunner()
    runner.register_step(TemplateParserStep())
    # Cached scraper avoids live fetches; ArxivHelperStep is deliberately
    # omitted to keep this pipeline fast.
    runner.register_step(CachedWebScraperStep())
    # Cheaper/faster composer model (Haiku instead of Sonnet).
    runner.register_step(FastEmailComposerStep())
    return runner
def create_research_pipeline() -> PipelineRunner:
    """
    Research-focused pipeline with deep analysis.
    """
    runner = PipelineRunner()
    # Registration order is execution order: scrape first, then augment with
    # scholarly sources, then analyze, then compose.
    for step in (
        TemplateParserStep(),
        WebScraperStep(),
        ScholarSearchStep(),      # Google Scholar scraping
        ArxivHelperStep(),
        PublicationAnalyzerStep(),  # Analyze papers
        EmailComposerStep(),
    ):
        runner.register_step(step)
    return runner
Conditional Step Execution
You can implement conditional logic in steps:

class ConditionalArxivStep(BasePipelineStep):
"""Only run ArXiv search for RESEARCH templates."""
async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
# Skip if not RESEARCH template
if pipeline_data.template_type != TemplateType.RESEARCH:
logfire.info(
"Skipping ArXiv search (not RESEARCH template)",
task_id=pipeline_data.task_id,
template_type=pipeline_data.template_type.value
)
return StepResult(
success=True,
step_name=self.step_name,
warnings=["Skipped: template type not RESEARCH"]
)
# Execute ArXiv search
papers = await self._search_arxiv(pipeline_data.recipient_name)
pipeline_data.arxiv_papers = papers
return StepResult(
success=True,
step_name=self.step_name,
metadata={"papers_found": len(papers)}
)
Advanced Patterns
Parallel Step Execution
For independent steps that can run concurrently:

import asyncio
from typing import List
class ParallelPipelineRunner(PipelineRunner):
    """Pipeline runner that can execute steps in parallel."""

    async def run_parallel(
        self,
        pipeline_data: PipelineData,
        parallel_groups: List[List[BasePipelineStep]]
    ) -> str:
        """
        Execute groups of steps in parallel.

        Steps within a group run concurrently; the groups themselves run one
        after another, so group N+1 may rely on group N's outputs.
        NOTE(review): steps in one group all mutate the shared pipeline_data
        concurrently — confirm they touch disjoint fields.

        Args:
            pipeline_data: Shared data object passed to every step.
            parallel_groups: List of step groups. Steps within a group
                run in parallel, groups run sequentially.

        Example:
            parallel_groups = [
                [TemplateParserStep()],                    # Group 1: runs first
                [WebScraperStep(), TwitterScraperStep()],  # Group 2: parallel
                [EmailComposerStep()]                      # Group 3: after group 2
            ]
        """
        for group_idx, step_group in enumerate(parallel_groups):
            logfire.info(
                f"Executing parallel group {group_idx + 1}/{len(parallel_groups)}",
                steps=[s.step_name for s in step_group]
            )
            # return_exceptions=True lets every step in the group finish
            # before failures are inspected below.
            outcomes = await asyncio.gather(
                *(step.execute(pipeline_data) for step in step_group),
                return_exceptions=True,
            )
            for step, outcome in zip(step_group, outcomes):
                if isinstance(outcome, Exception):
                    raise outcome
                if not outcome.success:
                    raise StepExecutionError(step.step_name, Exception(outcome.error))
        return pipeline_data.metadata["email_id"]
Retry Logic for Specific Steps
class RetryableWebScraperStep(WebScraperStep):
    """Web scraper with built-in retry logic."""

    def __init__(self, max_retries: int = 3):
        super().__init__()
        # Upper bound on attempts; 0 means the scraper never runs.
        self.max_retries = max_retries

    async def _execute_step(self, pipeline_data: PipelineData) -> StepResult:
        """Retry the parent scraper with exponential backoff between attempts."""
        failure_reason = None
        attempt = 0
        while attempt < self.max_retries:
            attempt += 1
            try:
                logfire.info(
                    f"Web scraping attempt {attempt}/{self.max_retries}",
                    task_id=pipeline_data.task_id
                )
                outcome = await super()._execute_step(pipeline_data)
                if outcome.success:
                    return outcome
                # Unsuccessful StepResult: remember why, then retry.
                failure_reason = outcome.error
            except Exception as e:
                failure_reason = str(e)
                logfire.warning(
                    f"Attempt {attempt} failed",
                    error=str(e),
                    task_id=pipeline_data.task_id
                )
            if attempt < self.max_retries:
                # Exponential backoff: 2, 4, 8, ... seconds between attempts.
                await asyncio.sleep(2 ** attempt)
        # All retries exhausted
        return StepResult(
            success=False,
            step_name=self.step_name,
            error=f"Failed after {self.max_retries} attempts: {failure_reason}"
        )
Step with External Configuration
from config.settings import settings
class ConfigurableEmailComposerStep(EmailComposerStep):
    """Email composer with hot-swappable configuration."""

    def __init__(self):
        super().__init__()
        # Load from environment variables via the settings object, so
        # deployments can retune the composer without a code change.
        self.model = settings.email_composer_model
        self.temperature = settings.email_composer_temperature
        self.max_tokens = settings.email_composer_max_tokens
        # Log the resolved configuration once at construction time so the
        # active model/params are visible in traces.
        logfire.info(
            "Email composer initialized",
            model=self.model,
            temperature=self.temperature,
            max_tokens=self.max_tokens
        )
# .env
EMAIL_COMPOSER_MODEL=anthropic:claude-sonnet-4-5
EMAIL_COMPOSER_TEMPERATURE=0.4
EMAIL_COMPOSER_MAX_TOKENS=10000
Testing Custom Steps
Unit Test Example
import pytest
from pipeline.models.core import PipelineData
from pipeline.steps.custom_analyzer import CustomAnalyzerStep
@pytest.mark.asyncio
async def test_custom_analyzer_basic():
    """Test custom analyzer with valid input."""
    analyzer = CustomAnalyzerStep(analysis_depth="basic")
    data = PipelineData(
        task_id="test-123",
        user_id="user-456",
        email_template="Hello {{name}}",
        recipient_name="Dr. Smith",
        recipient_interest="AI",
        scraped_content="Dr. Smith is a professor of AI at MIT."
    )

    result = await analyzer.execute(data)

    # The step must succeed and publish a bounded score for later steps.
    assert result.success
    assert "sentiment_score" in data.metadata
    assert -1.0 <= data.metadata["sentiment_score"] <= 1.0
@pytest.mark.asyncio
async def test_custom_analyzer_validation_failure():
    """Test that validation catches missing prerequisites."""
    step = CustomAnalyzerStep()
    data = PipelineData(
        task_id="test-123",
        user_id="user-456",
        email_template="Hello {{name}}",
        recipient_name="Dr. Smith",
        recipient_interest="AI",
        scraped_content=""  # Empty - should fail validation
    )
    # BasePipelineStep.execute() raises ValidationError internally, but its
    # except-block re-raises it wrapped as StepExecutionError (see the
    # runner.py excerpt above), so that is the exception callers observe.
    with pytest.raises(StepExecutionError):
        await step.execute(data)
Integration Test Example
@pytest.mark.integration
@pytest.mark.asyncio
async def test_custom_pipeline_end_to_end():
    """Test complete pipeline with custom step."""
    runner = PipelineRunner()
    for step in (
        TemplateParserStep(),
        WebScraperStep(),
        CustomAnalyzerStep(),
        EmailComposerStep(),
    ):
        runner.register_step(step)

    data = PipelineData(
        task_id="integration-test",
        user_id="user-123",
        email_template="Hello {{name}}, I read about {{research}}.",
        recipient_name="Dr. Jane Smith",
        recipient_interest="machine learning"
    )

    email_id = await runner.run(data)

    # A composed email and the custom analyzer's output must both exist.
    assert email_id is not None
    assert data.final_email != ""
    assert "sentiment_score" in data.metadata
Best Practices
1. Keep Steps Focused
# ✅ Good: Single responsibility
class SentimentAnalyzerStep(BasePipelineStep):
    """Analyzes sentiment of scraped content."""

    async def _execute_step(self, data: PipelineData) -> StepResult:
        # Publish the score via metadata so later steps can read it.
        score = await analyze_sentiment(data.scraped_content)
        data.metadata["sentiment"] = score
        return StepResult(success=True, step_name=self.step_name)
# ❌ Bad: Too many responsibilities
class AnalyzeAndEmailStep(BasePipelineStep):
    """Analyzes sentiment AND composes email."""
    # Violates single responsibility principle: analysis and composition
    # belong in two separate steps so each can be tested and reused alone.
2. Validate Inputs Explicitly
# ✅ Good: Clear validation
async def _validate_input(self, data: PipelineData) -> Optional[str]:
    """Return an error message when scraped content is absent or too short."""
    content = data.scraped_content
    if not content:
        return "scraped_content missing (WebScraper must run first)"
    if len(content) < 100:
        return "scraped_content too short (minimum 100 chars)"
    return None
3. Log Meaningful Events
# ✅ Good: Structured logging with context
logfire.info(
"Sentiment analysis completed",
task_id=pipeline_data.task_id,
sentiment_score=score,
confidence=confidence,
content_length=len(content)
)
4. Handle Errors Gracefully
# ✅ Good: Distinguish fatal vs non-fatal
try:
result = await external_api_call()
except TimeoutError as e:
# Non-fatal: log warning and continue
logfire.warning("API timeout, using cached data", error=str(e))
result = get_cached_data()
except AuthenticationError as e:
# Fatal: return failure
return StepResult(
success=False,
step_name=self.step_name,
error=f"Authentication failed: {e}"
)
5. Document Configuration Options
class CustomStep(BasePipelineStep):
    """
    Custom analyzer with configurable behavior.

    Args:
        analysis_depth: Level of analysis ('basic', 'detailed', 'comprehensive')
        enable_caching: Whether to cache results (default: True)
        timeout: Max time in seconds (default: 30.0)

    Environment Variables:
        CUSTOM_ANALYZER_API_KEY: Required API key for analysis service
    """

    def __init__(
        self,
        analysis_depth: str = "basic",
        enable_caching: bool = True,
        timeout: float = 30.0
    ):
        # Registers under a fixed step name; the constructor arguments vary
        # the behavior, not the identity, of the step.
        super().__init__(step_name="custom_analyzer")
        self.analysis_depth = analysis_depth
        self.enable_caching = enable_caching
        self.timeout = timeout
Related Documentation
- Error Handling - StepResult patterns and error propagation
- Observability - Logging and tracing for custom steps
- Pipeline Architecture - Overall system design
- Core Models - PipelineData and StepResult structures
