The Scribe pipeline implements a sophisticated error handling system that distinguishes between fatal errors (stop execution) and non-fatal errors (continue with degraded service).
StepResult Pattern
Every pipeline step returns a StepResult object that encapsulates success/failure state, error messages, and warnings.
StepResult Structure
@dataclass
class StepResult:
    """
    Result of a pipeline step execution.
    Returned by BasePipelineStep.execute() to indicate success/failure.
    """
    success: bool
    """Whether the step completed successfully"""

    step_name: str
    """Name of the step that produced this result"""

    error: Optional[str] = None
    """Error message if success=False"""

    metadata: Optional[Dict[str, Any]] = None
    """
    Optional metadata about execution:
    - duration: float (seconds)
    - output_size: int (bytes/chars)
    - api_calls_made: int
    - retries_attempted: int
    """

    warnings: List[str] = field(default_factory=list)
    """Non-fatal warnings (e.g., 'some URLs failed to scrape')"""
The StepResult pattern provides a clean separation between success with warnings (continue pipeline) and failure (stop pipeline).
Example: Successful Step
# Email composer successfully generates email
return StepResult(
    success=True,
    step_name="email_composer",
    metadata={
        "email_id": str(email_id),
        "word_count": len(email_text.split()),
        "duration": 3.1
    }
)
Example: Success with Warnings
# Web scraper succeeds but some URLs failed
return StepResult(
    success=True,
    step_name="web_scraper",
    warnings=["2 out of 5 URLs failed to scrape"],
    metadata={
        "citation_count": 3,
        "failed_urls": ["https://example.com/timeout"]
    }
)
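Downstream, the runner inspects only the success flag, so a result that carries warnings still advances the pipeline. A minimal consumer sketch (plain dicts stand in for StepResult; handle_result is a hypothetical helper, not part of the codebase):

```python
# Hypothetical consumer sketch: only `success` halts the pipeline; warnings
# are surfaced (e.g. to Logfire) and execution continues.
def handle_result(result: dict) -> None:
    if not result["success"]:
        raise RuntimeError(f"Step '{result['step_name']}' failed: {result['error']}")
    for warning in result.get("warnings", []):
        print(f"[{result['step_name']}] warning: {warning}")

handle_result({
    "success": True,
    "step_name": "web_scraper",
    "warnings": ["2 out of 5 URLs failed to scrape"],
})  # prints the warning, does not raise
```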
Example: Fatal Failure
# Template parser failed - cannot continue
return StepResult(
    success=False,
    step_name="template_parser",
    error="Failed to classify template type: LLM timeout"
)
If success=False, the error field must be populated. The StepResult.__post_init__ method validates this:

def __post_init__(self):
    """Validation: if success=False, error must be set"""
    if not self.success and not self.error:
        raise ValueError("StepResult with success=False must have error message")
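The check can be exercised in isolation. The sketch below reproduces the dataclass fields and __post_init__ as defined above (a trimmed stand-in, not the actual module):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class StepResult:
    success: bool
    step_name: str
    error: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
    warnings: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Validation: a failed result must explain the failure
        if not self.success and not self.error:
            raise ValueError("StepResult with success=False must have error message")

# A failure without an error message is rejected at construction time
try:
    StepResult(success=False, step_name="template_parser")
except ValueError as e:
    print(e)  # StepResult with success=False must have error message
```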
Error Categories
Fatal Errors (Pipeline Stops)
Fatal errors indicate the pipeline cannot continue and should immediately terminate.
When to use:
- Template parser fails (no search terms to proceed)
- Email composer database write fails
- Missing required input fields
- Invalid user authentication
Implementation in runner.py:247-254:
# Execute step
result = await step.execute(pipeline_data, progress_callback)

# Check for failure
if not result.success:
    raise StepExecutionError(
        step.step_name,
        Exception(result.error or "Unknown error")
    )
Non-Fatal Errors (Pipeline Continues)
Non-fatal errors are logged but don’t stop the pipeline. The step returns success=True with warnings.
When to use:
- Some web scraping URLs timeout (use successful ones)
- ArXiv API unavailable (skip academic papers)
- Email validation warnings (still persist email)
- JSON parsing fallback (use plain text)
Example from email_composer/main.py:86-99:
try:
    parsed = json.loads(response_text)
    email_text = parsed["email"]
    is_confident = parsed.get("is_confident", False)
except (json.JSONDecodeError, KeyError) as e:
    logfire.warning(
        "Failed to parse JSON response, falling back to plain text",
        error=str(e),
        response_preview=response_text[:200]
    )
    # Fallback: treat entire response as email text
    email_text = response_text
    is_confident = False  # Continue with degraded confidence
Non-fatal errors should still be logged to Logfire for monitoring and debugging. Use logfire.warning() to track degraded service patterns.
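The fallback is easy to reproduce without Logfire; parse_email_response below is a hypothetical standalone version of that try/except:

```python
import json
from typing import Tuple

def parse_email_response(response_text: str) -> Tuple[str, bool]:
    """Return (email_text, is_confident), degrading to plain text on bad JSON."""
    try:
        parsed = json.loads(response_text)
        return parsed["email"], parsed.get("is_confident", False)
    except (json.JSONDecodeError, KeyError):
        # Fallback: treat the entire response as email text, with low confidence
        return response_text, False

print(parse_email_response('{"email": "Hi!", "is_confident": true}'))  # ('Hi!', True)
print(parse_email_response("not json at all"))  # ('not json at all', False)
```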
Custom Exceptions
The pipeline defines a hierarchy of custom exceptions for intelligent error handling and retry logic.
Exception Hierarchy
class PipelineExecutionError(Exception):
    """
    Base exception for pipeline execution failures.
    All step-specific exceptions inherit from this.
    Celery can catch this for retry logic.
    """
    pass


class StepExecutionError(PipelineExecutionError):
    """
    Raised when a pipeline step fails.

    Attributes:
        step_name: Name of the failed step
        original_error: The underlying exception
    """
    def __init__(self, step_name: str, original_error: Exception):
        self.step_name = step_name
        self.original_error = original_error
        # Embed step_name in message for Celery serialization
        error_message = f"Step '{step_name}' failed: {str(original_error)}"
        super().__init__(error_message)


class ValidationError(PipelineExecutionError):
    """
    Raised when step input/output validation fails.
    Example: Required field missing from previous step
    """
    pass


class ExternalAPIError(PipelineExecutionError):
    """
    Raised when external API calls fail (Anthropic, Exa, ArXiv).
    This is a retriable error - Celery should retry with exponential backoff.
    """
    pass
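One payoff of the shared base class is that callers can branch on exception type. The sketch below redeclares the hierarchy and adds a hypothetical is_retriable helper (not in the codebase) of the kind a Celery task could use:

```python
# Minimal re-declaration of the hierarchy above, plus a hypothetical helper
# for deciding whether a failure is worth retrying.
class PipelineExecutionError(Exception):
    pass

class StepExecutionError(PipelineExecutionError):
    def __init__(self, step_name: str, original_error: Exception):
        self.step_name = step_name
        self.original_error = original_error
        super().__init__(f"Step '{step_name}' failed: {original_error}")

class ValidationError(PipelineExecutionError):
    pass

class ExternalAPIError(PipelineExecutionError):
    pass

def is_retriable(exc: Exception) -> bool:
    """External API failures are transient; validation failures would fail again."""
    return isinstance(exc, ExternalAPIError)

print(is_retriable(ExternalAPIError("Exa timeout")))   # True
print(is_retriable(ValidationError("missing field")))  # False
```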
Using Custom Exceptions
In step implementation (web_scraper/main.py:124-132):
try:
    result = await self.exa_client.dual_answer(
        background_query=background_query,
        publications_query=publications_query,
        timeout=45.0
    )
except TimeoutError as e:
    logfire.error("Exa timeout", error=str(e), recipient=pipeline_data.recipient_name)
    raise ExternalAPIError(f"Exa search timed out: {e}")
except ConnectionError as e:
    logfire.error("Exa connection error", error=str(e))
    raise ExternalAPIError(f"Failed to connect to Exa API: {e}")
except Exception as e:
    logfire.error("Exa failed", error_type=type(e).__name__, error=str(e))
    raise ExternalAPIError(f"Exa search failed ({type(e).__name__}): {e}")
In BasePipelineStep (runner.py:114-136):
except Exception as e:
    # Calculate duration even on failure
    duration = time.perf_counter() - start_time

    # Log error with full context
    logfire.error(
        f"{self.step_name} failed",
        task_id=pipeline_data.task_id,
        error=str(e),
        error_type=type(e).__name__,
        duration=duration,
        exc_info=True  # Include stack trace
    )

    # Record error in pipeline data
    pipeline_data.add_error(self.step_name, str(e))

    # Wrap exception for clarity
    raise StepExecutionError(self.step_name, e) from e
Error Propagation
Errors propagate through multiple layers: Step → Runner → Celery Task → Queue System.
Layer 1: Step Execution
In BasePipelineStep.execute() (runner.py:66-136):
with logfire.span(
    f"pipeline.{self.step_name}",
    task_id=pipeline_data.task_id,
    step=self.step_name
):
    try:
        # Execute the step-specific logic
        result = await self._execute_step(pipeline_data)

        # Log success
        logfire.info(
            f"{self.step_name} completed",
            task_id=pipeline_data.task_id,
            success=result.success
        )
        return result
    except Exception as e:
        # Log error with context
        logfire.error(
            f"{self.step_name} failed",
            error=str(e),
            error_type=type(e).__name__,
            exc_info=True
        )

        # Record in pipeline data for debugging
        pipeline_data.add_error(self.step_name, str(e))

        # Wrap and re-raise
        raise StepExecutionError(self.step_name, e) from e
Layer 2: Pipeline Runner
In PipelineRunner.run() (runner.py:223-274):
with logfire.span(
    "pipeline.full_run",
    task_id=pipeline_data.task_id,
    user_id=pipeline_data.user_id
):
    # Execute each step sequentially
    for i, step in enumerate(self.steps):
        # Execute step
        result = await step.execute(pipeline_data, progress_callback)

        # Check for failure
        if not result.success:
            raise StepExecutionError(
                step.step_name,
                Exception(result.error or "Unknown error")
            )

    # Verify email_id was set
    email_id = pipeline_data.metadata.get("email_id")
    if not email_id:
        raise ValueError(
            "Pipeline completed but email_id not set. "
            "EmailComposer step must set pipeline_data.metadata['email_id']"
        )

    return email_id
Layer 3: Celery Task
In generate_email_task() (tasks/email_tasks.py:238-281):
try:
    email_id = asyncio.run(_execute_pipeline())
except (StepExecutionError, PipelineExecutionError) as exc:
    failed_step = getattr(exc, "step_name", None)
    error_message = str(exc)
    is_timeout = "timed out" in error_message.lower()
    is_final_attempt = self.request.retries >= self.max_retries

    # Retry logic for timeouts
    if is_timeout and not is_final_attempt:
        logfire.warning(
            "Pipeline timed out, scheduling retry",
            task_id=public_task_id,
            failed_step=failed_step,
            attempt=self.request.retries + 1
        )
        reset_queue_item_for_retry()
        raise self.retry(exc=exc)

    # Final failure
    logfire.error(
        "Pipeline execution failed",
        error=error_message,
        failed_step=failed_step
    )

    # Update queue status
    update_queue_status(QueueStatus.FAILED, error_message=error_message)

    # Prevent Celery from overwriting FAILURE state
    raise Ignore()
Layer 4: Queue System
Error truncation for database storage (email_tasks.py:100-107):
if error_message:
    # Truncate error message to 1000 characters to prevent database bloat
    # Stack traces can be very large and degrade query performance
    MAX_ERROR_LENGTH = 1000
    if len(error_message) > MAX_ERROR_LENGTH:
        item.error_message = error_message[:MAX_ERROR_LENGTH] + "... [truncated]"
    else:
        item.error_message = error_message
Error messages stored in the database are truncated to 1000 characters. Full stack traces are available in Logfire for debugging.
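The truncation rule is small enough to extract into a pure function; truncate_error below is a hypothetical standalone version of the snippet above:

```python
MAX_ERROR_LENGTH = 1000  # cap stored messages; full stack traces live in Logfire

def truncate_error(error_message: str) -> str:
    """Return the message capped at MAX_ERROR_LENGTH with a truncation marker."""
    if len(error_message) > MAX_ERROR_LENGTH:
        return error_message[:MAX_ERROR_LENGTH] + "... [truncated]"
    return error_message

print(len(truncate_error("x" * 5000)))  # 1015: 1000 chars plus the 15-char marker
```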
Retry Strategies
Celery Task Retries
Configuration in email_tasks.py:30:
@celery_app.task(bind=True, max_retries=1)
def generate_email_task(self, *, queue_item_id: Optional[str] = None, ...):
    # Task implementation
Retry conditions:
- External API timeouts (Anthropic, Exa, ArXiv)
- Database connection errors
- Network failures
Non-retriable errors:
- Invalid input data (would fail again)
- User not found in database
- Validation errors
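The timeout-retry guard from Layer 3 can be isolated from Celery for testing; should_retry below is a hypothetical extraction of that decision logic:

```python
# Hypothetical sketch of the Layer 3 retry decision, isolated from Celery
# so the guard conditions can be exercised directly.
def should_retry(error_message: str, retries: int, max_retries: int = 1) -> bool:
    """Retry only timeouts, and only while attempts remain."""
    is_timeout = "timed out" in error_message.lower()
    is_final_attempt = retries >= max_retries
    return is_timeout and not is_final_attempt

print(should_retry("Step 'web_scraper' failed: Exa search timed out", 0))  # True
print(should_retry("Step 'web_scraper' failed: Exa search timed out", 1))  # False
print(should_retry("Validation failed: missing field", 0))                 # False
```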
LLM Agent Retries
Built into agent creation (email_composer/main.py:30-38):
self.composition_agent = create_agent(
    model=self.model,
    system_prompt=SYSTEM_PROMPT,
    temperature=self.temperature,
    max_tokens=self.max_tokens,
    retries=3,  # 3 automatic retries for LLM calls
    timeout=90.0  # 90 second timeout per attempt
)
Exa Search Timeout
Per-request timeout in web_scraper/main.py:64-68:
result = await self.exa_client.dual_answer(
    background_query=background_query,
    publications_query=publications_query,
    timeout=45.0  # 45 second timeout
)
Error Recording in PipelineData
Non-fatal errors are tracked in the PipelineData.errors list for debugging and monitoring.
Helper method in core.py:148-150:
def add_error(self, step_name: str, error_message: str) -> None:
    """Record non-fatal error"""
    self.errors.append(f"{step_name}: {error_message}")
Usage example:
# In step implementation
try:
    await scrape_url(url)
except TimeoutError as e:
    # Record but continue
    pipeline_data.add_error("web_scraper", f"URL timeout: {url}")
    logfire.warning("URL scraping timeout", url=url, error=str(e))
    # Continue with other URLs
Final logging in runner.py:266-272:
logfire.info(
    "Pipeline execution completed",
    task_id=pipeline_data.task_id,
    email_id=email_id,
    total_duration=total_duration,
    step_timings=pipeline_data.step_timings,
    errors=pipeline_data.errors  # Include all non-fatal errors
)
Best Practices
1. Always Return StepResult
# ✅ Good: Explicit success/failure
if validation_failed:
    return StepResult(
        success=False,
        step_name=self.step_name,
        error="Validation failed: missing required field"
    )

return StepResult(
    success=True,
    step_name=self.step_name,
    metadata={"items_processed": count}
)

# ❌ Bad: Raising exception for non-fatal issues
if some_urls_failed:
    raise Exception("Some URLs failed")  # Stops pipeline unnecessarily
2. Use Appropriate Exception Types
# ✅ Good: Use ExternalAPIError for retriable failures
try:
    response = await external_api_call()
except TimeoutError as e:
    raise ExternalAPIError(f"API timeout: {e}")  # Celery can retry

# ❌ Bad: Generic exception loses retry information
try:
    response = await external_api_call()
except TimeoutError as e:
    raise Exception(f"API failed: {e}")  # Celery doesn't know if retriable
3. Log Errors with Context
# ✅ Good: Rich context for debugging
logfire.error(
    "Email composer failed",
    task_id=pipeline_data.task_id,
    user_id=pipeline_data.user_id,
    error=str(e),
    error_type=type(e).__name__,
    recipient=pipeline_data.recipient_name,
    exc_info=True  # Include stack trace
)

# ❌ Bad: Missing context
logfire.error("Failed", error=str(e))
4. Validate Inputs Early
# ✅ Good: Fail fast with clear error
async def _validate_input(self, pipeline_data: PipelineData) -> Optional[str]:
    if not pipeline_data.scraped_content:
        return "scraped_content is missing (Step 2 must run first)"
    if not pipeline_data.user_id:
        return "user_id is missing (required for database write)"
    return None
5. Track Both Fatal and Non-Fatal Errors
# ✅ Good: Distinguish error severities
warnings = []
for url in urls:
    try:
        content = await scrape(url)
        results.append(content)
    except TimeoutError:
        warnings.append(f"URL timeout: {url}")
        pipeline_data.add_error("web_scraper", f"Timeout: {url}")
        # Continue with other URLs

if not results:
    # No successful scrapes - fatal
    return StepResult(
        success=False,
        step_name=self.step_name,
        error="All scraping attempts failed"
    )

return StepResult(
    success=True,
    step_name=self.step_name,
    warnings=warnings  # Non-fatal issues logged
)