The Scribe pipeline uses Logfire for comprehensive observability: structured logging, distributed tracing, LLM call tracking, and real-time monitoring.

Logfire Configuration

Initialization

Logfire is initialized once at application startup with automatic instrumentation for Pydantic AI agents. From observability/logfire_config.py:28-57:
class LogfireConfig:
    """
    Logfire configuration singleton.
    
    Ensures Logfire is initialized only once and fails fast
    when no project token is provided.
    """
    
    _initialized = False
    
    @classmethod
    def initialize(cls, token: Optional[str] = None) -> None:
        """
        Initialize Logfire with project token.
        
        Args:
            token: Logfire project token (or set LOGFIRE_TOKEN env var)
        """
        if cls._initialized:
            return
        
        token = token or os.getenv("LOGFIRE_TOKEN")
        if not token:
            raise ValueError("LOGFIRE_TOKEN environment variable must be provided.")
        
        logfire.configure(
            token=token,
            service_name="scribe-pipeline",
            send_to_logfire=True,
        )
        
        # Enable automatic instrumentation for pydantic-ai agents
        # This logs all LLM calls with inputs, outputs, tokens, cost, and latency
        logfire.instrument_pydantic_ai()
        
        cls._initialized = True
Environment variables:
# Required
LOGFIRE_TOKEN=your_logfire_project_token

# Optional (defaults)
ENVIRONMENT=production  # Used for filtering in Logfire dashboard

Pydantic AI Instrumentation

The logfire.instrument_pydantic_ai() call automatically logs:
  • All LLM API calls (Anthropic, OpenAI, etc.)
  • Request parameters (model, temperature, max_tokens)
  • Input prompts and output completions
  • Token usage (prompt tokens, completion tokens, total)
  • Estimated cost per call
  • Latency (time to first token, total duration)
  • Retries and errors
You don't need to manually log LLM calls; Pydantic AI instrumentation handles it automatically for all agents created with create_agent().

Distributed Tracing

Span Hierarchy

The pipeline creates nested spans that trace execution from Celery task → Pipeline → Steps → LLM calls.
tasks.generate_email (root span)
├─ pipeline.full_run
│  ├─ pipeline.template_parser
│  │  ├─ pydantic_ai.agent.run
│  │  │  └─ anthropic.completion (auto-instrumented)
│  │  └─ validation
│  ├─ pipeline.web_scraper
│  │  ├─ exa.dual_answer
│  │  └─ formatting
│  ├─ pipeline.arxiv_helper
│  │  └─ arxiv.search
│  └─ pipeline.email_composer
│     ├─ pydantic_ai.agent.run
│     │  └─ anthropic.completion (auto-instrumented)
│     └─ database.write
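The parent-child nesting above falls out of ordinary context-manager scoping: entering a span while another is open makes it a child. A toy tracer (not Logfire's API, just an illustration of the mechanism) reproduces the shape of the tree:

```python
from contextlib import contextmanager

_stack: list = []
trace: list = []  # (depth, name) pairs, in start order


@contextmanager
def span(name: str):
    # Depth is simply how many spans are currently open.
    trace.append((len(_stack), name))
    _stack.append(name)
    try:
        yield
    finally:
        _stack.pop()


with span("tasks.generate_email"):
    with span("pipeline.full_run"):
        with span("pipeline.template_parser"):
            with span("pydantic_ai.agent.run"):
                pass
        with span("pipeline.email_composer"):
            pass
```

Real `logfire.span()` context managers behave the same way structurally, with the parent span propagated via the active context rather than an explicit stack.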

Creating Spans

Root Span (Celery Task)

From tasks/email_tasks.py:215-226:
with logfire.span(
    "tasks.generate_email",
    task_id=public_task_id,
    celery_id=celery_request_id,
    user_id=user_id,
):
    logfire.info(
        "Email generation task started",
        task_id=public_task_id,
        celery_id=celery_request_id,
        user_id=user_id,
    )
    # Execute pipeline...

Pipeline Runner Span

From runner.py:224-229:
with logfire.span(
    "pipeline.full_run",
    task_id=pipeline_data.task_id,
    user_id=pipeline_data.user_id,
    template_type=pipeline_data.template_type.value if pipeline_data.template_type else None
):
    logfire.info(
        "Pipeline execution started",
        task_id=pipeline_data.task_id,
        total_steps=len(self.steps)
    )
    # Execute steps...

Step Execution Span

From runner.py:66-76:
with logfire.span(
    f"pipeline.{self.step_name}",
    task_id=pipeline_data.task_id,
    step=self.step_name
):
    try:
        logfire.info(
            f"{self.step_name} started",
            task_id=pipeline_data.task_id
        )
        # Execute step logic...
Spans automatically inherit context from parent spans, enabling distributed tracing across the entire pipeline execution.

Structured Logging

Log Levels

Use appropriate log levels to categorize events:
  • logfire.debug() - Verbose diagnostic information
  • logfire.info() - Normal operational events
  • logfire.warning() - Non-fatal issues that should be monitored
  • logfire.error() - Fatal errors that stop execution

Logging Best Practices

1. Include Task Context

# ✅ Good: Always include task_id for correlation
logfire.info(
    "Template parser completed",
    task_id=pipeline_data.task_id,
    template_type=pipeline_data.template_type.value,
    search_terms_count=len(pipeline_data.search_terms)
)
# ❌ Bad: Missing context
logfire.info("Template parser completed")

2. Log Step Progress

From runner.py:238-244:
for i, step in enumerate(self.steps):
    # Log progress
    progress_pct = int(((i + 1) / len(self.steps)) * 100)
    logfire.info(
        f"Executing step {i+1}/{len(self.steps)}",
        step=step.step_name,
        progress_pct=progress_pct
    )
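Applied to the pipeline's four steps, the progress formula above yields evenly spaced percentages, with the final step always reporting 100:

```python
# The same progress formula as the runner loop, for a four-step pipeline.
steps = ["template_parser", "web_scraper", "arxiv_helper", "email_composer"]

progress = [int(((i + 1) / len(steps)) * 100) for i in range(len(steps))]
```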

3. Log Timing Metrics

From runner.py:90-104:
# Calculate duration
duration = time.perf_counter() - start_time
pipeline_data.add_timing(self.step_name, duration)

# Update result metadata
if result.metadata is None:
    result.metadata = {}
result.metadata["duration"] = duration

# Log success with timing
logfire.info(
    f"{self.step_name} completed",
    task_id=pipeline_data.task_id,
    duration=duration,
    success=result.success
)
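The measure-then-record pattern above can be factored into a small helper. This is a sketch (the `timed` wrapper is not part of the codebase), but it follows the same convention: `time.perf_counter()` for the measurement and a `step_timings` dict for the record:

```python
import time

step_timings: dict[str, float] = {}


def timed(step_name: str, fn):
    """Run fn(), record its wall-clock duration under step_name."""
    start = time.perf_counter()
    result = fn()
    step_timings[step_name] = time.perf_counter() - start
    return result


value = timed("web_scraper", lambda: sum(range(1000)))
```

`time.perf_counter()` is preferred over `time.time()` here because it is monotonic and unaffected by system clock adjustments.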

4. Log Errors with Full Context

From runner.py:114-126:
except Exception as e:
    duration = time.perf_counter() - start_time
    
    logfire.error(
        f"{self.step_name} failed",
        task_id=pipeline_data.task_id,
        error=str(e),
        error_type=type(e).__name__,
        duration=duration,
        exc_info=True  # Include full stack trace
    )
    
    pipeline_data.add_error(self.step_name, str(e))
    raise StepExecutionError(self.step_name, e) from e
Always include exc_info=True when logging exceptions to capture full stack traces in Logfire.

LLM Call Tracking

Pydantic AI instrumentation automatically tracks all LLM interactions. You can add additional context:

Template Parser Example

From email_composer/main.py:75-83:
logfire.info(
    "Generating email with LLM",
    model=self.model,
    temperature=self.temperature,
    recipient_name=pipeline_data.recipient_name
)

# Agent run is auto-instrumented
result = await self.composition_agent.run(user_prompt)

logfire.info(
    "Email generated successfully",
    word_count=len(email_text.split()),
    length=len(email_text),
    is_confident=is_confident
)
Automatic instrumentation captures:
  • Model used (e.g., anthropic:claude-sonnet-4-5)
  • Temperature and max_tokens settings
  • Full prompt text
  • Full completion text
  • Tokens: {"prompt": 1234, "completion": 567, "total": 1801}
  • Cost: $0.0225 (estimated based on model pricing)
  • Latency: 3.1s
  • Any retries or errors

Viewing LLM Calls in Logfire

In the Logfire dashboard:
  1. Filter by span name: pydantic_ai.agent.run or anthropic.completion
  2. View token usage: Check the tokens field
  3. Analyze costs: Use the cost field for budget tracking
  4. Debug prompts: Inspect input and output fields
  5. Identify slow calls: Sort by duration

Pipeline Completion Logging

Final Summary

From runner.py:265-272:
total_duration = pipeline_data.total_duration()
logfire.info(
    "Pipeline execution completed",
    task_id=pipeline_data.task_id,
    email_id=email_id,
    total_duration=total_duration,
    step_timings=pipeline_data.step_timings
)
Example log output:
{
  "message": "Pipeline execution completed",
  "task_id": "abc123-def456",
  "email_id": "550e8400-e29b-41d4-a716-446655440000",
  "total_duration": 10.4,
  "step_timings": {
    "template_parser": 1.2,
    "web_scraper": 5.3,
    "arxiv_helper": 0.8,
    "email_composer": 3.1
  }
}
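A quick consistency check on the example payload: the per-step timings should account for (nearly all of) `total_duration`. In this example they sum to it exactly:

```python
# Step timings from the example log output above.
step_timings = {
    "template_parser": 1.2,
    "web_scraper": 5.3,
    "arxiv_helper": 0.8,
    "email_composer": 3.1,
}
accounted = round(sum(step_timings.values()), 1)  # compare against total_duration
```

In practice a small gap between the sum and `total_duration` is expected; it represents runner overhead between steps.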

Performance Monitoring

Step Timings

Each step records its execution time in PipelineData.step_timings. Helper method from core.py:144-146:
def add_timing(self, step_name: str, duration: float) -> None:
    """Record step timing"""
    self.step_timings[step_name] = duration
Usage:
start_time = time.perf_counter()
result = await self._execute_step(pipeline_data)
duration = time.perf_counter() - start_time
pipeline_data.add_timing(self.step_name, duration)

Total Pipeline Duration

Helper method from core.py:140-142:
def total_duration(self) -> float:
    """Calculate total pipeline execution time in seconds"""
    return (datetime.utcnow() - self.started_at).total_seconds()
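Note that `datetime.utcnow()` returns a naive datetime, so `started_at` must be naive as well for the subtraction to work. A timezone-aware equivalent (a sketch, not the project's code) would look like this:

```python
from datetime import datetime, timedelta, timezone

# Simulate a pipeline that started ~10.4 seconds ago, using aware datetimes.
started_at = datetime.now(timezone.utc) - timedelta(seconds=10.4)
total = (datetime.now(timezone.utc) - started_at).total_seconds()
```

Mixing naive and aware datetimes in the subtraction raises `TypeError`, so whichever convention the model uses must be applied consistently.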

Metrics to Monitor

Metric                   Source             Threshold
Step duration            step_timings       >10s per step
Total pipeline duration  total_duration()   >30s total
LLM token usage          Auto-instrumented  >5000 tokens/email
LLM cost per email       Auto-instrumented  >$0.05/email
Error rate               Error spans        >5% failure rate
Retry rate               Celery retries     >10% retry rate
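The per-step budget from the table can be applied directly to `step_timings`. A hypothetical check (the budget constant and step names are illustrative):

```python
STEP_BUDGET_S = 10.0  # per-step threshold from the monitoring table

step_timings = {
    "template_parser": 1.2,
    "web_scraper": 12.7,   # over budget
    "email_composer": 3.1,
}
slow_steps = [name for name, d in step_timings.items() if d > STEP_BUDGET_S]
```

In production this comparison would typically live in a Logfire alert rather than application code, but the logic is the same.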

Queue Status Updates

The pipeline updates queue status throughout execution for real-time progress tracking. From email_tasks.py:62-122:
def update_queue_status(
    status: str,
    current_step: Optional[str] = None,
    email_id: Optional[str] = None,
    error_message: Optional[str] = None,
) -> None:
    """Update queue item status in the database."""
    if not queue_item_id:
        return
    
    try:
        with get_db_context() as db:
            item = db.query(QueueItem).filter(
                QueueItem.id == queue_item_id
            ).first()
            
            if not item:
                logfire.warning(
                    "Queue item not found for status update",
                    queue_item_id=queue_item_id
                )
                return
            
            item.status = status
            if current_step:
                item.current_step = current_step
            
            # Set started_at when transitioning to PROCESSING
            if status == QueueStatus.PROCESSING and not item.started_at:
                item.started_at = datetime.now(timezone.utc)
            
            # Set completed_at when finishing
            if status in (QueueStatus.COMPLETED, QueueStatus.FAILED):
                item.completed_at = datetime.now(timezone.utc)
            
            if email_id:
                item.email_id = email_id
            
            if error_message:
                # Truncate error message to 1000 characters
                MAX_ERROR_LENGTH = 1000
                if len(error_message) > MAX_ERROR_LENGTH:
                    item.error_message = error_message[:MAX_ERROR_LENGTH] + "... [truncated]"
                else:
                    item.error_message = error_message
            
            db.commit()
            
            logfire.debug(
                "Queue item status updated",
                queue_item_id=queue_item_id,
                status=status,
                current_step=current_step
            )
    except Exception as e:
        logfire.error(
            "Failed to update queue item status",
            queue_item_id=queue_item_id,
            error=str(e)
        )
Status transitions:
PENDING → PROCESSING → COMPLETED
                     ↘ FAILED
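The transition diagram can be encoded as a lookup table, which is handy for validating status updates before committing them. A sketch (state names mirror the QueueStatus values used above; the helper itself is hypothetical):

```python
# Allowed queue-status transitions; terminal states allow none.
ALLOWED = {
    "PENDING": {"PROCESSING"},
    "PROCESSING": {"COMPLETED", "FAILED"},
    "COMPLETED": set(),
    "FAILED": set(),
}


def can_transition(old: str, new: str) -> bool:
    return new in ALLOWED.get(old, set())
```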

Progress Callbacks

The pipeline supports optional progress callbacks for real-time status updates. From runner.py:200-210:
async def run(
    self,
    pipeline_data: PipelineData,
    progress_callback: Optional[Callable[[str, str], Awaitable[None]]] = None
) -> str:
    """
    Run all pipeline steps sequentially.
    
    Args:
        pipeline_data: Shared data object
        progress_callback: Optional callback for progress updates
                          Signature: callback(step_name, status)
    """
Callback usage in steps (runner.py:78-80, 107-110):
# Notify progress callback (if provided)
if progress_callback:
    await progress_callback(self.step_name, "started")

# ... execute step ...

if progress_callback:
    status = "completed" if result.success else "failed"
    await progress_callback(self.step_name, status)
Celery integration (email_tasks.py:200-210):
async def progress_callback(step_name: str, step_status: str) -> None:
    _update_status(
        JobStatus.RUNNING,
        {
            "current_step": step_name,
            "step_status": step_status,
            "step_timings": pipeline_data.step_timings,
        },
    )
    # Also update database queue status
    update_queue_status(QueueStatus.PROCESSING, current_step=step_name)
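The callback contract is small: an async callable awaited with `(step_name, status)` around each step. A self-contained sketch of that contract (the recording list and `run_step` helper are illustrative, not project code):

```python
import asyncio

events: list[tuple[str, str]] = []


async def progress_callback(step_name: str, status: str) -> None:
    # A real callback would update Celery state / the queue table here.
    events.append((step_name, status))


async def run_step(name: str) -> None:
    await progress_callback(name, "started")
    # ... step work would happen here ...
    await progress_callback(name, "completed")


asyncio.run(run_step("web_scraper"))
```

Because the callback is `async`, it can perform I/O (database writes, Redis updates) without blocking the step that invokes it.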

Debugging with Logfire

Finding a Specific Execution

  1. Search by task_id:
    • Filter: task_id = "abc123-def456"
    • View entire trace tree from Celery task to completion
  2. Search by user_id:
    • Filter: user_id = "550e8400-..."
    • View all pipeline executions for a user
  3. Search by error:
    • Filter: level = "error" or error_type = "StepExecutionError"
    • Find failed executions

Analyzing Performance Issues

  1. Find slow steps:
    Filter: span_name LIKE "pipeline.%" AND duration > 10
    Sort by: duration DESC
    
  2. Find expensive LLM calls:
    Filter: span_name = "anthropic.completion"
    Sort by: cost DESC
    
  3. Find token-heavy calls:
    Filter: tokens.total > 5000
    Sort by: tokens.total DESC
    

Viewing Pipeline Metrics

In Logfire dashboard:
  1. Create a chart for avg pipeline duration:
    • Metric: avg(duration) where span_name = "pipeline.full_run"
    • Group by: hour or day
  2. Track error rates:
    • Metric: count(*) where level = "error"
    • Group by: step field
  3. Monitor LLM costs:
    • Metric: sum(cost) where span_name LIKE "anthropic.%"
    • Group by: model

Best Practices

1. Always Include task_id

# ✅ Good: Enables distributed tracing
logfire.info(
    "Processing started",
    task_id=pipeline_data.task_id,
    step=self.step_name
)
# ❌ Bad: Cannot correlate with other logs
logfire.info("Processing started")

2. Use Spans for Long Operations

# ✅ Good: Measure specific operation
with logfire.span(
    "database.write",
    task_id=pipeline_data.task_id,
    email_id=email_id
):
    await write_email_to_db(...)

3. Log Both Success and Failure

# ✅ Good: Complete audit trail
logfire.info("Starting email composition", task_id=task_id)

try:
    email = await compose_email(...)
    logfire.info(
        "Email composition succeeded",
        task_id=task_id,
        word_count=len(email.split())
    )
except Exception as e:
    logfire.error(
        "Email composition failed",
        task_id=task_id,
        error=str(e),
        exc_info=True
    )
    raise

4. Use Structured Fields

# ✅ Good: Structured fields for filtering
logfire.info(
    "Step completed",
    step_name="web_scraper",
    duration=5.3,
    urls_scraped=3,
    urls_failed=2
)
# ❌ Bad: Unstructured message
logfire.info("web_scraper completed in 5.3s, scraped 3 URLs, 2 failed")

5. Log Metadata Changes

# ✅ Good: Track state transitions
logfire.debug(
    "Queue item status updated",
    queue_item_id=queue_item_id,
    old_status="PENDING",
    new_status="PROCESSING",
    current_step="web_scraper"
)
