Source Architecture
All data sources inherit from BaseSource and implement a standardized interface:
backend/app/sources/base.py
class BaseSource:
    """Abstract interface that every data source implements."""

    # Short identifier used to tag results from this source.
    channel_name: str

    async def fetch(self, domain: str, query: str, search_depth: str) -> SourceResult:
        """Return a SourceResult for *domain*; concrete sources must override."""
        raise NotImplementedError
SourceResult Structure
@dataclass
class SourceResult:
    """Standardized envelope returned by every source's fetch()."""

    channel: str  # Source identifier (e.g., "google_search")
    domain: str  # Company domain being researched
    query: str  # Search query executed
    evidences: List[Evidence]  # List of evidence found
    ok: bool  # Success/failure status
    error: Optional[str]  # Error message if ok=False
Building a Custom Source
Step 1: Create Source Class
Let’s build a custom source for LinkedIn company pages.
Define the Source Class
Create a new file
backend/app/sources/linkedin_search.py:linkedin_search.py
import asyncio
import os
from typing import List
import httpx
from dotenv import load_dotenv
from app.decorators import rate_limited
from app.models.response import Evidence
from app.sources.base import BaseSource, SourceResult
from app.db import redis_client
load_dotenv()
class LinkedInSearchSource(BaseSource):
    """Custom source for LinkedIn company intelligence."""

    channel_name = "linkedin_search"

    def __init__(self):
        # Fail fast at construction time when the key is absent or empty.
        key = os.getenv("LINKEDIN_API_KEY")
        if key is None or key == "":
            raise ValueError("LINKEDIN_API_KEY environment variable is required")
        self.api_key = key
        self.base_url = "https://api.linkedin.com/v2"
Implement the Fetch Method
Add rate limiting and fetch logic:
linkedin_search.py
@rate_limited("linkedin")  # Apply rate limiting decorator
async def fetch(
    self, domain: str, query: str, search_depth: str
) -> SourceResult:
    """Fetch company data from LinkedIn API.

    Args:
        domain: Company domain being researched (e.g. "stripe.com").
        query: Search query executed (echoed back in the result).
        search_depth: Depth hint from the pipeline (unused here).

    Returns:
        SourceResult with evidences on success, or ok=False plus an
        error message on HTTP or unexpected failures.
    """
    try:
        # Extract company name from domain ("stripe.com" -> "stripe")
        company_name = domain.split('.')[0]
        # Build API request
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "X-Restli-Protocol-Version": "2.0.0",
        }
        # Bound the request: without a timeout a slow API call would
        # stall the pipeline's parallel search slots indefinitely.
        async with httpx.AsyncClient(timeout=10.0) as client:
            # Search for company
            response = await client.get(
                f"{self.base_url}/companies",
                headers=headers,
                params={
                    "q": "universalName",
                    "universalName": company_name,
                },
            )
            if response.status_code != 200:
                return SourceResult(
                    channel=self.channel_name,
                    domain=domain,
                    query=query,
                    evidences=[],
                    ok=False,
                    error=f"LinkedIn API returned {response.status_code}",
                )
            data = response.json()
            evidences = await self._process_company_data(data, domain)
            return SourceResult(
                channel=self.channel_name,
                domain=domain,
                query=query,
                evidences=evidences,
                ok=True,
            )
    except Exception as e:
        # Any failure is reported in-band so the pipeline keeps running.
        return SourceResult(
            channel=self.channel_name,
            domain=domain,
            query=query,
            evidences=[],
            ok=False,
            error=f"LinkedIn search failed: {str(e)}",
        )
Process API Response
Convert API data to Evidence objects:
linkedin_search.py
async def _process_company_data(
    self, data: dict, domain: str
) -> List[Evidence]:
    """Convert LinkedIn company data to Evidence objects.

    Evidence already present in the Redis cache for this domain is
    skipped; new evidence is cached before being returned.
    """
    collected: List[Evidence] = []
    for entry in data.get("elements", []):
        display_name = entry.get("localizedName", "")
        about = entry.get("description", {}).get("localized", {}).get("en_US", "")
        industry_list = entry.get("industries", [])
        specialty_list = entry.get("specialties", [])
        site = entry.get("websiteUrl", "")
        fallback_url = f"https://linkedin.com/company/{domain.split('.')[0]}"
        item = Evidence(
            url=site or fallback_url,
            title=f"{display_name} - LinkedIn Company Profile",
            snippet=(
                f"{about}\n\nIndustries: {', '.join(industry_list)}"
                f"\nSpecialties: {', '.join(specialty_list)}"
            ),
            source_name=self.channel_name,
        )
        # Skip evidence that is already recorded for this domain.
        if await asyncio.to_thread(redis_client.is_evidence_cached, domain, item):
            continue
        await asyncio.to_thread(redis_client.add_evidence_to_cache, domain, item)
        collected.append(item)
    return collected
Step 2: Register the Source
Add your custom source to the pipeline:
from app.sources.base import BaseSource, SourceResult
from app.sources.google_search import GoogleSearchSource
from app.sources.jobs_search import JobsSearchSource
from app.sources.news_search import NewsSearchSource
from app.sources.linkedin_search import LinkedInSearchSource # Add import
# Public API of the sources package.
__all__ = [
    "BaseSource",
    "SourceResult",
    "GoogleSearchSource",
    "JobsSearchSource",
    "NewsSearchSource",
    "LinkedInSearchSource",  # Export custom source
]
Step 3: Configure Rate Limiting
Add rate limit configuration for your custom source:
@dataclass(frozen=True)
class Settings:
    """Immutable application configuration."""

    max_parallel_searches: int  # Upper bound on concurrent source fetches
    circuit_breaker_failures: int  # Consecutive failures before the breaker opens
    circuit_breaker_reset_seconds: float  # Cool-down before the breaker resets
    # API Rate Limits (requests per minute)
    tavily_rpm: int
    gemini_rpm: int
    newsapi_rpm: int
    linkedin_rpm: int  # Add new rate limit
@lru_cache(maxsize=1)
def get_settings() -> Settings:
    """Build the process-wide Settings object (cached after first call)."""
    config = Settings(
        max_parallel_searches=20,
        circuit_breaker_failures=5,
        circuit_breaker_reset_seconds=30.0,
        tavily_rpm=500,
        gemini_rpm=2000,
        newsapi_rpm=300,
        linkedin_rpm=100,  # LinkedIn API limit
    )
    return config
Real-World Examples
Example 1: Crunchbase API Source
Integrate funding and acquisition data:
Crunchbase Source Implementation
crunchbase_search.py
import asyncio
import os
from typing import List
import httpx
from dotenv import load_dotenv
from app.decorators import rate_limited
from app.models.response import Evidence
from app.sources.base import BaseSource, SourceResult
from app.db import redis_client
load_dotenv()
class CrunchbaseSearchSource(BaseSource):
    """Crunchbase funding and company data source."""

    channel_name = "crunchbase_search"

    def __init__(self):
        api_key = os.getenv("CRUNCHBASE_API_KEY")
        if not api_key:
            raise ValueError("CRUNCHBASE_API_KEY required")
        self.api_key = api_key
        self.base_url = "https://api.crunchbase.com/api/v4"

    @rate_limited("crunchbase")
    async def fetch(
        self, domain: str, query: str, search_depth: str
    ) -> SourceResult:
        """Search Crunchbase for the organization whose website matches *domain*.

        Returns a SourceResult with evidences on success, or ok=False and
        an error message on HTTP or unexpected failures.
        """
        try:
            # Bound the request so a slow API cannot stall the pipeline.
            async with httpx.AsyncClient(timeout=10.0) as client:
                # Search for organization
                response = await client.post(
                    f"{self.base_url}/searches/organizations",
                    headers={"X-cb-user-key": self.api_key},
                    json={
                        "field_ids": [
                            "name", "short_description", "website",
                            "funding_total", "last_funding_type",
                            "categories", "investor_identifiers"
                        ],
                        "query": [{"type": "predicate",
                                   "field_id": "website_url",
                                   "operator_id": "includes",
                                   "values": [domain]}]
                    }
                )
                if response.status_code != 200:
                    return SourceResult(
                        channel=self.channel_name,
                        domain=domain,
                        query=query,
                        evidences=[],
                        ok=False,
                        error=f"Crunchbase API error: {response.status_code}"
                    )
                data = response.json()
                evidences = await self._process_crunchbase_data(data, domain)
                return SourceResult(
                    channel=self.channel_name,
                    domain=domain,
                    query=query,
                    evidences=evidences,
                    ok=True
                )
        except Exception as e:
            return SourceResult(
                channel=self.channel_name,
                domain=domain,
                query=query,
                evidences=[],
                ok=False,
                error=str(e)
            )

    async def _process_crunchbase_data(
        self, data: dict, domain: str
    ) -> List[Evidence]:
        """Convert Crunchbase entities into deduplicated Evidence objects."""
        evidences = []
        for entity in data.get("entities", []):
            properties = entity.get("properties", {})
            name = properties.get("name", "")
            description = properties.get("short_description", "")
            # "funding_total"/"website" can be present but null in the API
            # response, so `.get(key, {})` alone is not enough — guard with
            # `or {}` / `or 0` to avoid AttributeError / broken formatting.
            funding = (properties.get("funding_total") or {}).get("value_usd", 0) or 0
            categories = [
                cat.get("value")
                for cat in (properties.get("categories") or [])
                if cat.get("value")  # drop null values that would break join()
            ]
            evidence = Evidence(
                url=(properties.get("website") or {}).get("value", ""),
                title=f"{name} - Crunchbase Profile",
                snippet=f"{description}\n\nTotal Funding: ${funding:,}\nCategories: {', '.join(categories)}",
                source_name=self.channel_name
            )
            # Deduplication check
            is_cached = await asyncio.to_thread(
                redis_client.is_evidence_cached, domain, evidence
            )
            if not is_cached:
                await asyncio.to_thread(
                    redis_client.add_evidence_to_cache, domain, evidence
                )
                evidences.append(evidence)
        return evidences
Example 2: Custom Database Source
Query internal company databases:
Internal Database Source
internal_db_search.py
import asyncio
from typing import List
import asyncpg
from app.models.response import Evidence
from app.sources.base import BaseSource, SourceResult
class InternalDBSearchSource(BaseSource):
    """Query internal PostgreSQL database for company intelligence."""

    channel_name = "internal_db_search"

    def __init__(self):
        # Pool is created lazily on the first fetch (no event loop here).
        self.db_pool = None

    async def _get_pool(self):
        """Lazy initialize connection pool.

        NOTE(review): nothing guards this check, so two concurrent first
        fetches could each create a pool — confirm the pipeline serializes
        the first call or protect this with an asyncio.Lock.
        """
        if self.db_pool is None:
            import os  # local import keeps the snippet self-contained

            # Credentials come from the environment; the defaults preserve
            # the previous hard-coded values for local development only.
            # Never ship real credentials embedded in source code.
            self.db_pool = await asyncpg.create_pool(
                host=os.getenv("INTERNAL_DB_HOST", "localhost"),
                database=os.getenv("INTERNAL_DB_NAME", "company_intel"),
                user=os.getenv("INTERNAL_DB_USER", "postgres"),
                password=os.getenv("INTERNAL_DB_PASSWORD", "password"),
                min_size=5,
                max_size=20
            )
        return self.db_pool

    async def fetch(
        self, domain: str, query: str, search_depth: str
    ) -> SourceResult:
        """Return the 10 most recent CRM interactions recorded for *domain*."""
        try:
            pool = await self._get_pool()
            async with pool.acquire() as conn:
                # Parameterized query ($1) — never interpolate the domain
                # into the SQL string.
                rows = await conn.fetch("""
                    SELECT
                        c.name,
                        c.website,
                        i.interaction_type,
                        i.notes,
                        i.created_at
                    FROM companies c
                    JOIN interactions i ON c.id = i.company_id
                    WHERE c.domain = $1
                    ORDER BY i.created_at DESC
                    LIMIT 10
                """, domain)
            evidences = []
            for row in rows:
                evidence = Evidence(
                    url=row['website'],  # assumes website is non-null — TODO confirm schema
                    title=f"{row['name']} - {row['interaction_type']}",
                    snippet=f"{row['notes']} (Date: {row['created_at']})",
                    source_name=self.channel_name
                )
                evidences.append(evidence)
            return SourceResult(
                channel=self.channel_name,
                domain=domain,
                query=query,
                evidences=evidences,
                ok=True
            )
        except Exception as e:
            return SourceResult(
                channel=self.channel_name,
                domain=domain,
                query=query,
                evidences=[],
                ok=False,
                error=str(e)
            )
Example 3: Web Scraping Source
Scrape company websites with Playwright:
Web Scraping Source
web_scraping_source.py
import asyncio
from typing import List
from playwright.async_api import async_playwright
from app.models.response import Evidence
from app.sources.base import BaseSource, SourceResult
from app.db import redis_client
class WebScrapingSource(BaseSource):
    """Scrape company websites for technology signals."""

    channel_name = "web_scraping"

    async def fetch(
        self, domain: str, query: str, search_depth: str
    ) -> SourceResult:
        """Scrape https://{domain} and report detected technology signals."""
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                try:
                    page = await browser.new_page()
                    # Navigate to company homepage (10s budget)
                    await page.goto(f"https://{domain}", timeout=10000)
                    # Extract meta tags and page content
                    title = await page.title()
                    meta_description = await page.get_attribute(
                        'meta[name="description"]', 'content'
                    ) or ""
                    # Look for technology indicators in page source
                    html_content = await page.content()
                    tech_signals = self._detect_technologies(html_content)
                finally:
                    # Always release the browser: the original leaked it when
                    # goto()/content() raised before reaching close().
                    await browser.close()
            # Create evidence from scraped data
            evidence = Evidence(
                url=f"https://{domain}",
                title=title,
                snippet=f"{meta_description}\n\nDetected Technologies: {', '.join(tech_signals)}",
                source_name=self.channel_name
            )
            # Redis-backed deduplication: only emit evidence not seen before.
            is_cached = await asyncio.to_thread(
                redis_client.is_evidence_cached, domain, evidence
            )
            evidences = []
            if not is_cached:
                await asyncio.to_thread(
                    redis_client.add_evidence_to_cache, domain, evidence
                )
                evidences.append(evidence)
            return SourceResult(
                channel=self.channel_name,
                domain=domain,
                query=query,
                evidences=evidences,
                ok=True
            )
        except Exception as e:
            return SourceResult(
                channel=self.channel_name,
                domain=domain,
                query=query,
                evidences=[],
                ok=False,
                error=f"Scraping failed: {str(e)}"
            )

    def _detect_technologies(self, html: str) -> List[str]:
        """Detect technologies via case-insensitive substring signatures."""
        signals = []
        tech_patterns = {
            "React": ["react", "_react"],
            "Vue.js": ["vue.js", "__vue__"],
            "Angular": ["ng-", "angular"],
            "WordPress": ["wp-content", "wordpress"],
            "Shopify": ["cdn.shopify.com"],
            "Google Analytics": ["google-analytics.com", "gtag"],
        }
        html_lower = html.lower()
        for tech, patterns in tech_patterns.items():
            if any(pattern in html_lower for pattern in patterns):
                signals.append(tech)
        return signals
Best Practices
Error Handling
Error Handling
Always return a
SourceResult with ok=False on errors:
try:
# API call logic
response = await client.get(...)
except httpx.TimeoutException as e:
return SourceResult(
channel=self.channel_name,
domain=domain,
query=query,
evidences=[],
ok=False,
error=f"Request timeout: {str(e)}"
)
except Exception as e:
return SourceResult(
channel=self.channel_name,
domain=domain,
query=query,
evidences=[],
ok=False,
error=f"Unexpected error: {str(e)}"
)
Rate Limiting
Rate Limiting
Always use the @rate_limited decorator:
@rate_limited("custom_api")  # Match your API config
async def fetch(self, domain: str, query: str, search_depth: str):
    # Automatic rate limiting and retry logic
    pass
Benefits:
- Token bucket smoothing
- Automatic retries on rate limit errors
- Exponential backoff
Caching
Caching
Use Redis deduplication for all evidence:
# Check cache before adding
is_cached = await asyncio.to_thread(
redis_client.is_evidence_cached, domain, evidence
)
if is_cached:
continue
# Add to cache
await asyncio.to_thread(
redis_client.add_evidence_to_cache, domain, evidence
)
Async I/O
Async I/O
Use
asyncio.to_thread() for blocking operations:
# For sync API clients
response = await asyncio.to_thread(
self.sync_client.search,
query=query
)
# For CPU-intensive tasks
results = await asyncio.to_thread(
self._process_large_dataset,
data
)
Testing Custom Sources
Unit Tests
test_linkedin_search.py
import pytest
from app.sources.linkedin_search import LinkedInSearchSource
@pytest.mark.asyncio
async def test_linkedin_fetch_success():
    """Happy path: fetching a known company returns evidence.

    NOTE(review): this hits the live LinkedIn API — it requires
    LINKEDIN_API_KEY and network access; consider mocking httpx for CI.
    """
    source = LinkedInSearchSource()
    result = await source.fetch(
        domain="stripe.com",
        query="fintech payment processing",
        search_depth="standard"
    )
    # A successful run is tagged with the source's channel name.
    assert result.ok is True
    assert result.channel == "linkedin_search"
    assert len(result.evidences) > 0
@pytest.mark.asyncio
async def test_linkedin_fetch_error_handling():
    """Failures must surface as ok=False results, never as exceptions."""
    source = LinkedInSearchSource()
    # Test with invalid domain
    result = await source.fetch(
        domain="invalid-domain-12345.com",
        query="test",
        search_depth="quick"
    )
    # Should not crash, should return error result
    assert result.ok is False
    assert result.error is not None
Integration Tests
@pytest.mark.asyncio
async def test_custom_source_in_pipeline():
    """Test custom source integrates with pipeline."""
    from app.services.pipeline import ResearchPipeline
    from app.core import RunMetrics
    import time

    # NOTE(review): QueryStrategy is referenced below but never imported
    # in this snippet — add the appropriate import before running.
    pipeline = ResearchPipeline(
        run_id="test-run",
        research_goal="Find fintech companies",
        search_depth="quick",
        company_domains=["stripe.com"],
        strategies=[
            QueryStrategy(channel="linkedin_search", strategy_type="company_profile")
        ],
        max_parallel_searches=5,
        confidence_threshold=0.7,
        metrics=RunMetrics(start_time=time.perf_counter())
    )
    results, total_planned = await pipeline.run()
    # One domain in, one per-domain result out.
    assert len(results) == 1
    assert results[0].domain == "stripe.com"
    # Verify LinkedIn source contributed evidence
    linkedin_evidence = [
        e for e in results[0].findings.evidence
        if e.source_name == "linkedin_search"
    ]
    assert len(linkedin_evidence) > 0
Troubleshooting
Common Issues:
- Source not found: Verify source is registered in
pipeline.py's self.sources dict
- Rate limiting not working: Check
api_type in @rate_limited() matches config
- Circuit breaker always open: Lower
circuit_breaker_failures threshold or check API errors
- No evidence returned: Add debug logging to
_process_* methods
Next Steps
Performance Tuning
Optimize your custom source for high throughput
API Reference
Complete API documentation for BaseSource