The Scribe pipeline uses Logfire for comprehensive observability: structured logging, distributed tracing, LLM call tracking, and real-time monitoring.
## Logfire Configuration

### Initialization

Logfire is initialized once at application startup with automatic instrumentation for Pydantic AI agents.

From `observability/logfire_config.py:28-57`:
```python
class LogfireConfig:
    """
    Logfire configuration singleton.

    Ensures Logfire is initialized only once and provides
    graceful degradation when Logfire is not available.
    """

    _initialized = False

    @classmethod
    def initialize(cls, token: Optional[str] = None) -> None:
        """
        Initialize Logfire with project token.

        Args:
            token: Logfire project token (or set LOGFIRE_TOKEN env var)
        """
        if cls._initialized:
            return

        token = token or os.getenv("LOGFIRE_TOKEN")
        if not token:
            raise ValueError("LOGFIRE_TOKEN environment variable must be provided.")

        logfire.configure(
            token=token,
            service_name="scribe-pipeline",
            send_to_logfire=True,
        )

        # Enable automatic instrumentation for pydantic-ai agents
        # This logs all LLM calls with inputs, outputs, tokens, cost, and latency
        logfire.instrument_pydantic_ai()

        cls._initialized = True
```
Environment variables:

```bash
# Required
LOGFIRE_TOKEN=your_logfire_project_token

# Optional (defaults)
ENVIRONMENT=production  # Used for filtering in Logfire dashboard
```
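The initialize-once guard above can be exercised without Logfire itself. This minimal sketch (the class name, token variable, and call counter are illustrative, not from the codebase) replaces `logfire.configure()` with a counter to show that repeated `initialize()` calls configure exactly once:

```python
import os

class OnceConfig:
    """Toy stand-in for LogfireConfig's initialize-once guard."""
    _initialized = False
    configure_calls = 0  # counts how often "configure" actually runs

    @classmethod
    def initialize(cls, token=None):
        if cls._initialized:
            return  # already configured; subsequent calls are no-ops
        token = token or os.getenv("EXAMPLE_TOKEN")
        if not token:
            raise ValueError("token must be provided")
        cls.configure_calls += 1  # stands in for logfire.configure(...)
        cls._initialized = True

OnceConfig.initialize(token="abc")
OnceConfig.initialize(token="abc")  # second call returns early
```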
### Pydantic AI Instrumentation

The `logfire.instrument_pydantic_ai()` call automatically logs:
- All LLM API calls (Anthropic, OpenAI, etc.)
- Request parameters (model, temperature, max_tokens)
- Input prompts and output completions
- Token usage (prompt tokens, completion tokens, total)
- Estimated cost per call
- Latency (time to first token, total duration)
- Retries and errors
You don't need to manually log LLM calls; Pydantic AI instrumentation handles this automatically for all agents created with `create_agent()`.
## Distributed Tracing

### Span Hierarchy

The pipeline creates nested spans that trace execution from Celery task → Pipeline → Steps → LLM calls.
```
tasks.generate_email (root span)
├─ pipeline.full_run
│  ├─ pipeline.template_parser
│  │  ├─ pydantic_ai.agent.run
│  │  │  └─ anthropic.completion (auto-instrumented)
│  │  └─ validation
│  ├─ pipeline.web_scraper
│  │  ├─ exa.dual_answer
│  │  └─ formatting
│  ├─ pipeline.arxiv_helper
│  │  └─ arxiv.search
│  └─ pipeline.email_composer
│     ├─ pydantic_ai.agent.run
│     │  └─ anthropic.completion (auto-instrumented)
│     └─ database.write
```
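Logfire maintains this nesting for you via context propagation. The mechanics can be illustrated with a toy tracer (entirely hypothetical, stdlib-only) that records each span's parent from a stack of currently open spans:

```python
import contextlib

class ToyTracer:
    """Hypothetical stand-in showing how nested spans record their parent."""
    def __init__(self):
        self.spans = []   # flat list of recorded span dicts
        self._stack = []  # names of currently open spans

    @contextlib.contextmanager
    def span(self, name, **attrs):
        parent = self._stack[-1] if self._stack else None
        record = {"name": name, "parent": parent, **attrs}
        self.spans.append(record)
        self._stack.append(name)
        try:
            yield record
        finally:
            self._stack.pop()

tracer = ToyTracer()
with tracer.span("tasks.generate_email", task_id="abc123"):
    with tracer.span("pipeline.full_run"):
        with tracer.span("pipeline.template_parser"):
            pass
        with tracer.span("pipeline.web_scraper"):
            pass
```

Each child span records the innermost open span as its parent, which is exactly the parent/child relationship shown in the trace tree above.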
### Creating Spans

#### Root Span (Celery Task)

From `tasks/email_tasks.py:215-226`:

```python
with logfire.span(
    "tasks.generate_email",
    task_id=public_task_id,
    celery_id=celery_request_id,
    user_id=user_id,
):
    logfire.info(
        "Email generation task started",
        task_id=public_task_id,
        celery_id=celery_request_id,
        user_id=user_id,
    )
    # Execute pipeline...
```
#### Pipeline Runner Span

From `runner.py:224-229`:

```python
with logfire.span(
    "pipeline.full_run",
    task_id=pipeline_data.task_id,
    user_id=pipeline_data.user_id,
    template_type=pipeline_data.template_type.value if pipeline_data.template_type else None,
):
    logfire.info(
        "Pipeline execution started",
        task_id=pipeline_data.task_id,
        total_steps=len(self.steps),
    )
    # Execute steps...
```
#### Step Execution Span

From `runner.py:66-76`:

```python
with logfire.span(
    f"pipeline.{self.step_name}",
    task_id=pipeline_data.task_id,
    step=self.step_name,
):
    try:
        logfire.info(
            f"{self.step_name} started",
            task_id=pipeline_data.task_id,
        )
        # Execute step logic...
```
Spans automatically inherit context from parent spans, enabling distributed tracing across the entire pipeline execution.
## Structured Logging

### Log Levels

Use appropriate log levels to categorize events:

- `logfire.debug()` - Verbose diagnostic information
- `logfire.info()` - Normal operational events
- `logfire.warning()` - Non-fatal issues that should be monitored
- `logfire.error()` - Fatal errors that stop execution
### Logging Best Practices

#### 1. Include Task Context

```python
# ✅ Good: Always include task_id for correlation
logfire.info(
    "Template parser completed",
    task_id=pipeline_data.task_id,
    template_type=pipeline_data.template_type.value,
    search_terms_count=len(pipeline_data.search_terms),
)

# ❌ Bad: Missing context
logfire.info("Template parser completed")
```
#### 2. Log Step Progress

From `runner.py:238-244`:

```python
for i, step in enumerate(self.steps):
    # Log progress
    progress_pct = int(((i + 1) / len(self.steps)) * 100)
    logfire.info(
        f"Executing step {i+1}/{len(self.steps)}",
        step=step.step_name,
        progress_pct=progress_pct,
    )
```
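For the four-step pipeline described in this document, the formula yields even 25% increments (step names assumed from the span hierarchy):

```python
steps = ["template_parser", "web_scraper", "arxiv_helper", "email_composer"]

# Same formula as the runner: percentage after each step completes
progress = [int(((i + 1) / len(steps)) * 100) for i in range(len(steps))]
print(progress)  # [25, 50, 75, 100]
```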
#### 3. Log Timing Metrics

From `runner.py:90-104`:

```python
# Calculate duration
duration = time.perf_counter() - start_time
pipeline_data.add_timing(self.step_name, duration)

# Update result metadata
if result.metadata is None:
    result.metadata = {}
result.metadata["duration"] = duration

# Log success with timing
logfire.info(
    f"{self.step_name} completed",
    task_id=pipeline_data.task_id,
    duration=duration,
    success=result.success,
)
```
#### 4. Log Errors with Full Context

From `runner.py:114-126`:

```python
except Exception as e:
    duration = time.perf_counter() - start_time
    logfire.error(
        f"{self.step_name} failed",
        task_id=pipeline_data.task_id,
        error=str(e),
        error_type=type(e).__name__,
        duration=duration,
        _exc_info=True,  # Include full stack trace
    )
    pipeline_data.add_error(self.step_name, str(e))
    raise StepExecutionError(self.step_name, e) from e
```
Always include the exception-info flag (`_exc_info=True` in Logfire's log methods) when logging exceptions to capture full stack traces in Logfire; without the leading underscore, the keyword is treated as an ordinary span attribute.
## LLM Call Tracking

Pydantic AI instrumentation automatically tracks all LLM interactions. You can add additional context around the instrumented calls:

### Email Composer Example

From `email_composer/main.py:75-83`:
```python
logfire.info(
    "Generating email with LLM",
    model=self.model,
    temperature=self.temperature,
    recipient_name=pipeline_data.recipient_name,
)

# Agent run is auto-instrumented
result = await self.composition_agent.run(user_prompt)

logfire.info(
    "Email generated successfully",
    word_count=len(email_text.split()),
    length=len(email_text),
    is_confident=is_confident,
)
```
Automatic instrumentation captures:

- Model used (e.g., `anthropic:claude-sonnet-4-5`)
- Temperature and `max_tokens` settings
- Full prompt text
- Full completion text
- Tokens: `{"prompt": 1234, "completion": 567, "total": 1801}`
- Cost: `$0.0225` (estimated based on model pricing)
- Latency: `3.1s`
- Any retries or errors
### Viewing LLM Calls in Logfire

In the Logfire dashboard:

- Filter by span name: `pydantic_ai.agent.run` or `anthropic.completion`
- View token usage: check the `tokens` field
- Analyze costs: use the `cost` field for budget tracking
- Debug prompts: inspect the `input` and `output` fields
- Identify slow calls: sort by `duration`
## Pipeline Completion Logging

### Final Summary

From `runner.py:265-272`:

```python
total_duration = pipeline_data.total_duration()

logfire.info(
    "Pipeline execution completed",
    task_id=pipeline_data.task_id,
    email_id=email_id,
    total_duration=total_duration,
    step_timings=pipeline_data.step_timings,
)
```
Example log output:

```json
{
  "message": "Pipeline execution completed",
  "task_id": "abc123-def456",
  "email_id": "550e8400-e29b-41d4-a716-446655440000",
  "total_duration": 10.4,
  "step_timings": {
    "template_parser": 1.2,
    "web_scraper": 5.3,
    "arxiv_helper": 0.8,
    "email_composer": 3.1
  }
}
```
### Step Timings

Each step records its execution time in `PipelineData.step_timings`.

Helper method from `core.py:144-146`:

```python
def add_timing(self, step_name: str, duration: float) -> None:
    """Record step timing"""
    self.step_timings[step_name] = duration
```
Usage:

```python
start_time = time.perf_counter()
result = await self._execute_step(pipeline_data)
duration = time.perf_counter() - start_time
pipeline_data.add_timing(self.step_name, duration)
```
### Total Pipeline Duration

Helper method from `core.py:140-142`:

```python
def total_duration(self) -> float:
    """Calculate total pipeline execution time in seconds"""
    return (datetime.utcnow() - self.started_at).total_seconds()
```
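The two helpers can be exercised together with a minimal stand-in for `PipelineData` (field names follow the excerpts above; everything else is illustrative). Note this sketch uses timezone-aware `datetime.now(timezone.utc)` rather than the excerpt's `datetime.utcnow()`, which is deprecated since Python 3.12:

```python
import time
from datetime import datetime, timezone

class TimingData:
    """Toy PipelineData subset: per-step timings plus total duration."""
    def __init__(self):
        self.started_at = datetime.now(timezone.utc)
        self.step_timings = {}

    def add_timing(self, step_name, duration):
        self.step_timings[step_name] = duration

    def total_duration(self):
        return (datetime.now(timezone.utc) - self.started_at).total_seconds()

data = TimingData()
start = time.perf_counter()
time.sleep(0.01)  # stands in for a step's real work
data.add_timing("template_parser", time.perf_counter() - start)
```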
## Metrics to Monitor

| Metric | Source | Threshold |
|--------|--------|-----------|
| Step duration | `step_timings` | >10s per step |
| Total pipeline duration | `total_duration()` | >30s total |
| LLM token usage | Auto-instrumented | >5000 tokens/email |
| LLM cost per email | Auto-instrumented | >$0.05/email |
| Error rate | Error spans | >5% failure rate |
| Retry rate | Celery retries | >10% retry rate |
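A simple alerting pass over `step_timings` against the per-step threshold might look like this (the helper and sample timings are illustrative, not from the codebase; the 10 s limit mirrors the table):

```python
def slow_steps(step_timings, threshold_s=10.0):
    """Return the steps whose recorded duration exceeded the threshold."""
    return {name: d for name, d in step_timings.items() if d > threshold_s}

timings = {"template_parser": 1.2, "web_scraper": 12.7, "email_composer": 3.1}
print(slow_steps(timings))  # {'web_scraper': 12.7}
```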
## Queue Status Updates

The pipeline updates queue status throughout execution for real-time progress tracking.

From `email_tasks.py:62-122`:
```python
def update_queue_status(
    status: str,
    current_step: Optional[str] = None,
    email_id: Optional[str] = None,
    error_message: Optional[str] = None,
) -> None:
    """Update queue item status in the database."""
    if not queue_item_id:
        return

    try:
        with get_db_context() as db:
            item = db.query(QueueItem).filter(
                QueueItem.id == queue_item_id
            ).first()

            if not item:
                logfire.warning(
                    "Queue item not found for status update",
                    queue_item_id=queue_item_id,
                )
                return

            item.status = status
            if current_step:
                item.current_step = current_step

            # Set started_at when transitioning to PROCESSING
            if status == QueueStatus.PROCESSING and not item.started_at:
                item.started_at = datetime.now(timezone.utc)

            # Set completed_at when finishing
            if status in (QueueStatus.COMPLETED, QueueStatus.FAILED):
                item.completed_at = datetime.now(timezone.utc)

            if email_id:
                item.email_id = email_id

            if error_message:
                # Truncate error message to 1000 characters
                MAX_ERROR_LENGTH = 1000
                if len(error_message) > MAX_ERROR_LENGTH:
                    item.error_message = error_message[:MAX_ERROR_LENGTH] + "... [truncated]"
                else:
                    item.error_message = error_message

            db.commit()

            logfire.debug(
                "Queue item status updated",
                queue_item_id=queue_item_id,
                status=status,
                current_step=current_step,
            )
    except Exception as e:
        logfire.error(
            "Failed to update queue item status",
            queue_item_id=queue_item_id,
            error=str(e),
        )
```
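The truncation logic is self-contained enough to factor into a small helper for testing (a hypothetical refactor, not present in the codebase; the 1000-character limit and suffix match the excerpt):

```python
MAX_ERROR_LENGTH = 1000

def truncate_error(message, limit=MAX_ERROR_LENGTH):
    """Cap error messages so oversized tracebacks fit the DB column."""
    if len(message) > limit:
        return message[:limit] + "... [truncated]"
    return message

short = truncate_error("boom")
long_msg = truncate_error("x" * 5000)
```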
Status transitions:

```
PENDING → PROCESSING → COMPLETED
                     ↘ FAILED
```
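The diagram implies a small state machine, so a guard could reject illegal updates before they reach the database. In this sketch, plain strings stand in for the `QueueStatus` enum and the helper is illustrative only:

```python
# Allowed next states for each status (terminal states allow none)
ALLOWED_TRANSITIONS = {
    "PENDING": {"PROCESSING"},
    "PROCESSING": {"COMPLETED", "FAILED"},
    "COMPLETED": set(),
    "FAILED": set(),
}

def can_transition(old_status, new_status):
    """True if the status change follows the transition diagram."""
    return new_status in ALLOWED_TRANSITIONS.get(old_status, set())
```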
## Progress Callbacks

The pipeline supports optional progress callbacks for real-time status updates.

From `runner.py:200-210`:

```python
async def run(
    self,
    pipeline_data: PipelineData,
    progress_callback: Optional[Callable[[str, str], Awaitable[None]]] = None,
) -> str:
    """
    Run all pipeline steps sequentially.

    Args:
        pipeline_data: Shared data object
        progress_callback: Optional callback for progress updates.
            Signature: callback(step_name, status)
    """
```
Callback usage in steps (`runner.py:78-80`, `107-110`):

```python
# Notify progress callback (if provided)
if progress_callback:
    await progress_callback(self.step_name, "started")

# ... execute step ...

if progress_callback:
    status = "completed" if result.success else "failed"
    await progress_callback(self.step_name, status)
```
Celery integration (`email_tasks.py:200-210`):

```python
async def progress_callback(step_name: str, step_status: str) -> None:
    _update_status(
        JobStatus.RUNNING,
        {
            "current_step": step_name,
            "step_status": step_status,
            "step_timings": pipeline_data.step_timings,
        },
    )
    # Also update database queue status
    update_queue_status(QueueStatus.PROCESSING, current_step=step_name)
```
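End to end, the callback contract can be demonstrated with a runnable toy (the two-step runner and the recording callback are invented for illustration; only the `callback(step_name, status)` signature comes from the docs above):

```python
import asyncio

async def run_steps(steps, progress_callback=None):
    """Minimal runner: notify started/completed around each step."""
    for name in steps:
        if progress_callback:
            await progress_callback(name, "started")
        await asyncio.sleep(0)  # stands in for the step's real work
        if progress_callback:
            await progress_callback(name, "completed")

events = []

async def record(step_name, status):
    events.append((step_name, status))

asyncio.run(run_steps(["template_parser", "web_scraper"], record))
print(events)
```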
## Debugging with Logfire

### Finding a Specific Execution

1. Search by `task_id`:
   - Filter: `task_id = "abc123-def456"`
   - View the entire trace tree from Celery task to completion
2. Search by `user_id`:
   - Filter: `user_id = "550e8400-..."`
   - View all pipeline executions for a user
3. Search by error:
   - Filter: `level = "error"` or `error_type = "StepExecutionError"`
   - Find failed executions
4. Find slow steps:
   - Filter: `span_name = "pipeline.*" AND duration > 10`
   - Sort by: `duration DESC`
5. Find expensive LLM calls:
   - Filter: `span_name = "anthropic.completion"`
   - Sort by: `cost DESC`
6. Find token-heavy calls:
   - Filter: `tokens.total > 5000`
   - Sort by: `tokens.total DESC`
### Viewing Pipeline Metrics

In the Logfire dashboard:

1. Create a chart for average pipeline duration:
   - Metric: `avg(duration) where span_name = "pipeline.full_run"`
   - Group by: `hour` or `day`
2. Track error rates:
   - Metric: `count(*) where level = "error"`
   - Group by: the `step` field
3. Monitor LLM costs:
   - Metric: `sum(cost) where span_name LIKE "anthropic.%"`
   - Group by: `model`
## Best Practices

### 1. Always Include `task_id`

```python
# ✅ Good: Enables distributed tracing
logfire.info(
    "Processing started",
    task_id=pipeline_data.task_id,
    step=self.step_name,
)

# ❌ Bad: Cannot correlate with other logs
logfire.info("Processing started")
```
### 2. Use Spans for Long Operations

```python
# ✅ Good: Measure specific operation
with logfire.span(
    "database.write",
    task_id=pipeline_data.task_id,
    email_id=email_id,
):
    await write_email_to_db(...)
```
### 3. Log Both Success and Failure

```python
# ✅ Good: Complete audit trail
logfire.info("Starting email composition", task_id=task_id)
try:
    email = await compose_email(...)
    logfire.info(
        "Email composition succeeded",
        task_id=task_id,
        word_count=len(email.split()),
    )
except Exception as e:
    logfire.error(
        "Email composition failed",
        task_id=task_id,
        error=str(e),
        _exc_info=True,
    )
    raise
```
### 4. Use Structured Fields

```python
# ✅ Good: Structured fields for filtering
logfire.info(
    "Step completed",
    step_name="web_scraper",
    duration=5.3,
    urls_scraped=3,
    urls_failed=2,
)

# ❌ Bad: Unstructured message
logfire.info("web_scraper completed in 5.3s, scraped 3 URLs, 2 failed")
```
### 5. Track State Transitions

```python
# ✅ Good: Track state transitions
logfire.debug(
    "Queue item status updated",
    queue_item_id=queue_item_id,
    old_status="PENDING",
    new_status="PROCESSING",
    current_step="web_scraper",
)
```
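Structured fields pay off at query time. Treating each log record as a dict, the failed-URL pattern above can be filtered programmatically, which a free-text message cannot support (the records here are invented sample data):

```python
records = [
    {"step_name": "web_scraper", "duration": 5.3, "urls_failed": 2},
    {"step_name": "template_parser", "duration": 1.2, "urls_failed": 0},
    {"step_name": "email_composer", "duration": 3.1, "urls_failed": 0},
]

# Field-based filtering: find steps that had failed URL fetches
with_failures = [r["step_name"] for r in records if r.get("urls_failed", 0) > 0]
print(with_failures)  # ['web_scraper']
```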