This guide covers common issues and their solutions based on the actual implementation.

Common Errors

API Authentication Errors

Error Message:
ValueError: GEMINI_API_KEY environment variable is required
ValueError: TAVILY_API_KEY environment variable is required
ValueError: NEWS_API_KEY environment variable is required
Solution:
1. Create .env File

Add all required API keys:
.env
GEMINI_API_KEY=your_gemini_api_key_here
TAVILY_API_KEY=your_tavily_api_key_here
NEWS_API_KEY=your_news_api_key_here
2. Verify Environment Loading

Check that dotenv is loading:
import os
from dotenv import load_dotenv

load_dotenv()
print(os.getenv("GEMINI_API_KEY"))  # Should not be None
3. Get API Keys

Generate keys from each provider's dashboard: Google AI Studio (Gemini), Tavily, and NewsAPI.

Invalid API Key Errors

Error Message:
Tavily search failed: 401 Unauthorized
NewsAPI search failed: 401 Unauthorized
Diagnosis:
# Test each API key individually
from tavily import TavilyClient
from newsapi import NewsApiClient
import google.generativeai as genai

# Test Tavily
try:
    client = TavilyClient(api_key="your_key")
    result = client.search("test")
    print("✓ Tavily key valid")
except Exception as e:
    print(f"✗ Tavily error: {e}")

# Test NewsAPI
try:
    client = NewsApiClient(api_key="your_key")
    result = client.get_everything(q="test", page_size=1)
    print("✓ NewsAPI key valid")
except Exception as e:
    print(f"✗ NewsAPI error: {e}")

# Test Gemini
try:
    genai.configure(api_key="your_key")
    model = genai.GenerativeModel('gemini-2.5-flash-lite')
    response = model.generate_content("test")
    print("✓ Gemini key valid")
except Exception as e:
    print(f"✗ Gemini error: {e}")
Solution: Regenerate invalid API keys from provider dashboards.
Rate Limit Errors

Error Message:
Rate limit hit for tavily API call. Attempt 1/3. Retrying in 1.00s
Rate limit hit for newsapi API call. Attempt 2/3. Retrying in 2.00s
How Rate Limiting Works: The engine uses token bucket rate limiting with automatic retries:
backend/app/decorators/api_rate_limiter.py
async def with_retry(self, operation: Callable, context: str) -> Any:
    attempt = 1
    while attempt <= config.max_retry_attempts:  # 3 attempts
        try:
            if self._is_rate_limited():
                wait_time = self._calculate_wait_time()
                await self._delay(wait_time)
            
            return await operation()
        
        except Exception as error:
            if self._is_rate_limit_error(error):
                delay = self._calculate_delay(attempt)  # Exponential backoff
                await self._delay(delay)
                attempt += 1
                continue
            raise error
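The `_calculate_delay` step above can be sketched as plain exponential backoff. This is a hypothetical reconstruction (the base and cap values are assumptions), but the doubling matches the 1.00s and 2.00s delays in the log messages:

```python
def calculate_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff: attempt 1 -> 1.0s, attempt 2 -> 2.0s, attempt 3 -> 4.0s."""
    return min(cap, base * 2 ** (attempt - 1))
```

The cap keeps a long retry chain from producing multi-minute waits.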
Solutions:
Lower max_parallel_searches in request:
{
  "max_parallel_searches": 10,  // Reduced from 20
  "search_depth": "quick"
}
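The token-bucket half of the limiter (the `_is_rate_limited` / `_calculate_wait_time` checks above) can be sketched like this. It is a minimal illustration, not the actual `api_rate_limiter.py` code; the class and parameter names are hypothetical:

```python
import time

class TokenBucket:
    """Allow bursts up to `capacity` calls, refilling `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()

    def try_acquire(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

A caller that gets `False` back waits (as `_delay` does in the retry wrapper) before trying again, which is why lowering `max_parallel_searches` reduces how often that wait is hit.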

Circuit Breaker Issues

Understanding Circuit States

Circuit breakers protect against cascading failures:
backend/app/core/circuit_breaker.py
class CircuitState(str, Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Source failing, blocking requests
    HALF_OPEN = "half_open" # Testing if source recovered
Symptom: Source returns error="circuit open" for all requests
Diagnosis:
# Check circuit breaker states
from app.services.pipeline import ResearchPipeline

for channel, breaker in pipeline.breakers.items():
    print(f"{channel}: {breaker.state} (failures: {breaker._failure_count})")

# Output:
# google_search: closed (failures: 0)
# news_search: open (failures: 5)  ← Problem!
# jobs_search: closed (failures: 1)
Root Causes:
  1. API Service Down: External API is experiencing outages
  2. Rate Limiting: API rejecting too many requests
  3. Network Issues: Timeouts or connection errors
  4. Invalid Configuration: Wrong API endpoint or credentials
Solutions:
1. Wait for Reset

Circuit automatically transitions to HALF_OPEN after reset_timeout_seconds:
if (time.monotonic() - self._opened_at) >= self.reset_timeout_seconds:
    self._state = CircuitState.HALF_OPEN  # Allow test request
Default timeout: 30 seconds
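Putting the reset check above together with the failure counting, the whole lifecycle can be sketched as follows. This is an illustrative reconstruction, not the actual backend/app/core/circuit_breaker.py; method names like `record_failure` are assumptions:

```python
import time
from enum import Enum

class CircuitState(str, Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class SketchBreaker:
    """Minimal circuit-breaker state machine (hypothetical sketch)."""

    def __init__(self, failure_threshold: int = 5, reset_timeout_seconds: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout_seconds = reset_timeout_seconds
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._opened_at = 0.0

    @property
    def state(self) -> CircuitState:
        # OPEN -> HALF_OPEN once the reset timeout has elapsed
        if (self._state is CircuitState.OPEN
                and time.monotonic() - self._opened_at >= self.reset_timeout_seconds):
            self._state = CircuitState.HALF_OPEN
        return self._state

    def record_success(self) -> None:
        # Any success (including the HALF_OPEN probe) closes the circuit
        self._state = CircuitState.CLOSED
        self._failure_count = 0

    def record_failure(self) -> None:
        self._failure_count += 1
        if self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN
            self._opened_at = time.monotonic()
```

The key property: a single success in HALF_OPEN fully closes the circuit, while a failure re-opens it and restarts the timeout.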
2. Raise Failure Threshold

Make circuit breaker less sensitive:
backend/app/core/config.py
return Settings(
    circuit_breaker_failures=10,  # Increased from 5
    circuit_breaker_reset_seconds=15.0,  # Reduced from 30
)
3. Fix the Underlying Issue

Check source-specific logs:
# Add detailed error logging
logger.error(f"Source {channel} failed: {result.error}")
Symptom: Circuits constantly opening and closing
Solution: Increase the failure threshold or improve error handling:
# Option 1: Increase threshold
circuit_breaker_failures=10  # More tolerant

# Option 2: Improve source error handling
try:
    response = await self.client.search(query)
except httpx.TimeoutException:
    # Don't count timeouts as failures
    return SourceResult(..., ok=True, evidences=[])
except httpx.HTTPStatusError as e:
    if e.response.status_code in [429, 503]:  # Temporary errors
        # Don't fail circuit for rate limits
        return SourceResult(..., ok=True, evidences=[])
    raise  # Fail circuit for other errors

Redis Connection Issues

Error Message:
redis.exceptions.ConnectionError: Error connecting to localhost:6379
Solutions:
1. Start Redis Server

# macOS
brew services start redis

# Linux
sudo systemctl start redis

# Docker
docker run -d -p 6379:6379 redis:latest
2. Verify Redis is Running

redis-cli ping
# Expected output: PONG
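If `redis-cli` is not installed, a stdlib-only reachability check works too. This is a sketch: it only verifies that something accepts TCP connections on the Redis port, not that Redis answers PING:

```python
import socket

def redis_reachable(host: str = "localhost", port: int = 6379, timeout: float = 2.0) -> bool:
    """Return True if something is listening on the Redis port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

print("✓ reachable" if redis_reachable() else "✗ not reachable")
```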
3. Check Redis Configuration

backend/app/db/redis_cache.py
def __init__(self):
    self.client = redis.Redis(
        host='localhost',  # Change if Redis is remote
        port=6379,
        db=0,
        decode_responses=True
    )
Symptom: Duplicate evidence appearing despite Redis caching
Diagnosis:
# Check cache contents
redis-cli
> KEYS *
> SMEMBERS urls:stripe.com
> SMEMBERS titles:stripe.com
Common Causes:
  1. Cache Cleared Too Frequently:
# This clears cache before EVERY domain analysis
await asyncio.to_thread(redis_client.clear_cache, domain)
  2. Evidence Object Mismatch:
# URL must be exactly the same for cache hit
evidence1 = Evidence(url="https://stripe.com")
evidence2 = Evidence(url="https://stripe.com/")  # Trailing slash!
# These won't match in cache
Solution: Normalize URLs before caching:
def normalize_url(url: str) -> str:
    return url.rstrip('/').lower()

evidence = Evidence(
    url=normalize_url(result.get("url")),
    title=result.get("title"),
    snippet=result.get("snippet")
)
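If trailing slashes are not the only source of mismatches, a slightly more careful version built on `urllib.parse` also normalizes scheme and host case while leaving the path's case intact (paths can be case-sensitive). This is a sketch; whether to drop query strings as well depends on your sources:

```python
from urllib.parse import urlsplit, urlunsplit

def normalize_url(url: str) -> str:
    """Lowercase scheme and host, strip trailing slash and fragment."""
    parts = urlsplit(url.strip())
    return urlunsplit((
        parts.scheme.lower(),
        parts.netloc.lower(),
        parts.path.rstrip("/"),
        parts.query,
        "",  # drop the #fragment: it never changes the fetched page
    ))
```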

Performance Issues

Symptoms: Research taking >10 seconds for small batches
Diagnostic Steps:
1. Add Timing Instrumentation

import time

async def _execute_one(self, domain, strategy, search_depth):
    start = time.time()
    result = await source.fetch(domain, query, search_depth)
    elapsed = time.time() - start
    
    logger.info(f"{strategy.channel} for {domain}: {elapsed:.2f}s")
    return domain, result
2. Identify Slow Sources

Check output for bottlenecks:
google_search for stripe.com: 0.83s
jobs_search for stripe.com: 4.21s  ← Bottleneck!
news_search for stripe.com: 1.12s
3. Optimize Slow Sources

For TF-IDF job matching (often slowest):
backend/app/sources/jobs_search.py
# Run CPU-intensive TF-IDF in thread pool
return await asyncio.to_thread(_compute_similarities)

# Reduce max_features for faster computation
self._tfidf_vectorizer = TfidfVectorizer(
    max_features=1000,  # Reduced from 2000
    ngram_range=(1, 2),  # Reduced from (1, 3)
)
Performance Targets:
| Batch Size | Search Depth  | Target Time   |
|------------|---------------|---------------|
| 10 domains | quick         | 2-4 seconds   |
| 10 domains | standard      | 3-5 seconds   |
| 50 domains | standard      | 8-12 seconds  |
| 50 domains | comprehensive | 15-20 seconds |
Symptom: Python process using >2GB RAM
Common Causes:
  1. TF-IDF Vectorizer Cache: Vectorizer stores large matrices in memory
  2. Redis Connection Pool: Many connections open simultaneously
  3. Evidence Accumulation: Thousands of evidence objects in memory
Solutions:
# 1. Limit evidence per domain
MAX_EVIDENCE_PER_DOMAIN = 100
if len(domain_to_evidence[domain]) >= MAX_EVIDENCE_PER_DOMAIN:
    continue  # Skip additional evidence

# 2. Clear TF-IDF cache periodically
self._tfidf_vectorizer = None  # Force recreation

# 3. Process domains in batches
BATCH_SIZE = 10
for i in range(0, len(domains), BATCH_SIZE):
    batch = domains[i:i+BATCH_SIZE]
    results = await process_batch(batch)
    # Clear memory between batches
    gc.collect()
Symptom: Warning messages about slow async operations
WARNING: Executing <Task> took 2.341 seconds
Root Cause: Blocking operations in async functions
Bad Examples:
# ✗ Blocking sync API call
response = self.sync_client.search(query)  # Blocks event loop!

# ✗ CPU-intensive computation
tfidf_matrix = vectorizer.fit_transform(texts)  # Blocks event loop!

# ✗ Sync Redis call
redis_client.sadd(f"urls:{domain}", url)  # Blocks event loop!
Good Examples:
# ✓ Run in thread pool
response = await asyncio.to_thread(
    self.sync_client.search, query
)

# ✓ CPU-intensive in thread pool
tfidf_matrix = await asyncio.to_thread(
    vectorizer.fit_transform, texts
)

# ✓ Async Redis wrapper
await asyncio.to_thread(
    redis_client.sadd, f"urls:{domain}", url
)

LLM Analysis Issues

Symptom: All companies returning confidence_score < 0.3
Root Causes:
  1. Vague Research Goal: LLM can’t generate relevant keywords
  2. Poor Evidence Quality: Not enough relevant data from sources
  3. LLM Prompt Issues: Extraction strategy too restrictive
Solutions:
Before (Vague)
{
  "research_goal": "Find tech companies"
}
After (Specific)
{
  "research_goal": "Find B2B SaaS companies using AI/ML for fraud detection in fintech, with real-time transaction monitoring capabilities"
}
Error Messages:
Strategy generation failed: 429 Resource Exhausted
Evidence extraction failed: 500 Internal Server Error
Solutions:
1. Check Quota

Visit Google AI Studio to check your quota.
2. Reduce Gemini Calls

The extractor makes two kinds of LLM calls per research run:
  1. Generate extraction strategy (cached)
  2. Extract technologies from evidence (per domain)
Enable caching:
backend/app/services/extractor.py
# Strategy is cached by research_goal
if research_goal in self._strategy_cache:
    return self._strategy_cache[research_goal]
3. Use a Smaller Model

Switch to a faster, cheaper model:
self.keyword_model = genai.GenerativeModel(
    model_name='gemini-1.5-flash',  # Cheaper alternative
    system_instruction=keyword_system_prompt
)
Error Message:
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Cause: LLM returning malformed JSON
Solution: The extractor has built-in JSON cleaning:
backend/app/services/extractor.py
response_text = response.text.strip()

# Clean up JSON response
if "```json" in response_text:
    response_text = response_text.split("```json")[1].split("```")[0]
elif "```" in response_text:
    response_text = response_text.split("```")[1].split("```")[0]

result = json.loads(response_text)
If errors persist, add more robust parsing:
import re

# Extract JSON from markdown or text
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
if json_match:
    result = json.loads(json_match.group())

Debugging Tools

Enable Debug Logging

backend/app/server.py
import logging

logging.basicConfig(
    level=logging.DEBUG,  # Changed from INFO
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Add loggers for specific modules
logging.getLogger('app.sources').setLevel(logging.DEBUG)
logging.getLogger('app.services').setLevel(logging.DEBUG)
logging.getLogger('app.decorators').setLevel(logging.DEBUG)

Monitor Pipeline Execution

from app.core import RunMetrics
import time

metrics = RunMetrics(start_time=time.perf_counter())

# After pipeline execution
metrics_dict = metrics.to_dict()
logger.info(f"Pipeline metrics: {metrics_dict}")
# Output: Pipeline metrics: {'queries_per_second': 45.2, 'failed_requests': 3}

Test Individual Components

test_sources.py
import asyncio
from app.sources import GoogleSearchSource, NewsSearchSource, JobsSearchSource

async def test_source(source, domain, query):
    result = await source.fetch(
        domain=domain,
        query=query,
        search_depth="quick"
    )
    
    print(f"\n{source.channel_name} Results:")
    print(f"  Success: {result.ok}")
    print(f"  Evidences: {len(result.evidences)}")
    if result.error:
        print(f"  Error: {result.error}")
    for evidence in result.evidences[:2]:
        print(f"  - {evidence.title}")

async def main():
    domain = "stripe.com"
    query = "payment processing API"
    
    await test_source(GoogleSearchSource(), domain, query)
    await test_source(NewsSearchSource(), domain, query)
    await test_source(JobsSearchSource(), domain, query)

if __name__ == "__main__":
    asyncio.run(main())

Getting Help

- GitHub Issues: Report bugs and request features
- Performance Tuning: Optimize for better performance
- Custom Sources: Extend with custom data sources
- API Reference: Complete API documentation

Error Code Reference

HTTP Status Codes:

| Code | Meaning               | Common Cause         | Solution                     |
|------|-----------------------|----------------------|------------------------------|
| 401  | Unauthorized          | Invalid API key      | Regenerate API key           |
| 429  | Too Many Requests     | Rate limit exceeded  | Reduce max_parallel_searches |
| 500  | Internal Server Error | API service issue    | Retry or contact API support |
| 503  | Service Unavailable   | API temporarily down | Wait and retry               |
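As a rule of thumb from the table above, only the transient codes are worth retrying; a 401 means the key itself is bad, so retrying never helps. A tiny helper (hypothetical, not part of the codebase) makes the distinction explicit:

```python
# Transient HTTP errors worth a backoff-and-retry
RETRYABLE_STATUS = {429, 500, 503}

def should_retry(status_code: int) -> bool:
    """True for errors that may clear on their own; False for auth failures."""
    return status_code in RETRYABLE_STATUS
```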
Circuit Breaker Errors:

| Error             | State     | Action                         |
|-------------------|-----------|--------------------------------|
| circuit open      | OPEN      | Wait for reset_timeout_seconds |
| unknown channel   | N/A       | Check source is registered     |
| circuit half_open | HALF_OPEN | Testing recovery, retry        |
