Skip to main content
The GTM Research Engine supports pluggable data sources. This guide shows how to create custom sources for your proprietary APIs or third-party services.

Source Architecture

All data sources inherit from BaseSource and implement a standardized interface:
backend/app/sources/base.py
class BaseSource:
    """Common interface that every pluggable data source implements."""

    # Identifier for the source; subclasses set it (e.g. "linkedin_search").
    channel_name: str

    async def fetch(
        self, domain: str, query: str, search_depth: str
    ) -> SourceResult:
        """Run one search for ``domain`` and return its ``SourceResult``.

        The base implementation always raises ``NotImplementedError``;
        concrete sources override this method.
        """
        raise NotImplementedError

SourceResult Structure

@dataclass
class SourceResult:
    """Outcome of a single source fetch for one company domain."""

    channel: str              # Source identifier (e.g., "google_search")
    domain: str               # Company domain being researched
    query: str                # Search query executed
    evidences: List[Evidence] # List of evidence found
    ok: bool                  # Success/failure status
    # Defaults to None so that successful results can be built without an
    # error argument; without the default, omitting it raises TypeError.
    error: Optional[str] = None  # Error message if ok=False

Building a Custom Source

Step 1: Create Source Class

Let’s build a custom source for LinkedIn company pages:
1

Define the Source Class

Create a new file backend/app/sources/linkedin_search.py:
linkedin_search.py
import asyncio
import os
from typing import List

import httpx
from dotenv import load_dotenv

from app.decorators import rate_limited
from app.models.response import Evidence
from app.sources.base import BaseSource, SourceResult
from app.db import redis_client

load_dotenv()

class LinkedInSearchSource(BaseSource):
    """Data source that pulls company intelligence from LinkedIn."""

    channel_name = "linkedin_search"

    def __init__(self):
        """Load the API key from the environment and set the API base URL.

        Raises:
            ValueError: If ``LINKEDIN_API_KEY`` is unset or empty.
        """
        key = os.environ.get("LINKEDIN_API_KEY")
        if key:
            self.api_key = key
            self.base_url = "https://api.linkedin.com/v2"
            return

        # Fail fast at construction time rather than on the first request.
        raise ValueError("LINKEDIN_API_KEY environment variable is required")
2

Implement the Fetch Method

Add the rate-limited fetch method to the LinkedInSearchSource class (indent it inside the class body):
linkedin_search.py
@rate_limited("linkedin")  # Apply rate limiting decorator
async def fetch(
    self, domain: str, query: str, search_depth: str
) -> SourceResult:
    """Fetch company data from the LinkedIn API.

    Args:
        domain: Company domain being researched (e.g. "stripe.com").
        query: Search query; recorded on the result, not sent to LinkedIn.
        search_depth: Accepted for interface compatibility; unused here.

    Returns:
        A SourceResult with ok=True and the processed evidences, or one with
        ok=False and an error message for non-200 responses and exceptions.
    """
    try:
        # Extract company name from domain. Drop a leading "www." first so
        # "www.stripe.com" resolves to "stripe" rather than "www"; the dot
        # count keeps a bare two-label domain untouched.
        host = domain
        if host.startswith("www.") and host.count(".") >= 2:
            host = host[len("www."):]
        company_name = host.split('.')[0]

        # Build API request
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "X-Restli-Protocol-Version": "2.0.0"
        }

        async with httpx.AsyncClient() as client:
            # Search for company
            response = await client.get(
                f"{self.base_url}/companies",
                headers=headers,
                params={
                    "q": "universalName",
                    "universalName": company_name
                }
            )

            if response.status_code != 200:
                return SourceResult(
                    channel=self.channel_name,
                    domain=domain,
                    query=query,
                    evidences=[],
                    ok=False,
                    error=f"LinkedIn API returned {response.status_code}"
                )

            data = response.json()
            evidences = await self._process_company_data(data, domain)

            return SourceResult(
                channel=self.channel_name,
                domain=domain,
                query=query,
                evidences=evidences,
                ok=True,
                # Passed explicitly: SourceResult declares `error` as a field,
                # so omitting it would raise if the field has no default.
                error=None
            )

    except Exception as e:
        # Deliberate catch-all: a failing source is reported on the result
        # instead of raising into the caller.
        return SourceResult(
            channel=self.channel_name,
            domain=domain,
            query=query,
            evidences=[],
            ok=False,
            error=f"LinkedIn search failed: {str(e)}"
        )
3

Process API Response

Convert API data to Evidence objects:
linkedin_search.py
async def _process_company_data(
    self, data: dict, domain: str
) -> List[Evidence]:
    """Convert LinkedIn company data to Evidence objects.

    Args:
        data: Decoded JSON response; companies are read from its
            "elements" list.
        domain: Company domain, used for the fallback profile URL and as
            the deduplication cache key.

    Returns:
        Evidence objects that were not already in the cache. Each returned
        evidence is added to the cache before being appended.
    """
    evidences = []

    # The fallback profile URL is the same for every element; build it once.
    fallback_url = f"https://linkedin.com/company/{domain.split('.')[0]}"

    # `or` fallbacks guard against keys that are present but null, which
    # `.get(key, default)` does not cover.
    for company in data.get("elements") or []:
        # Extract company information
        name = company.get("localizedName") or ""
        localized = (company.get("description") or {}).get("localized") or {}
        description = localized.get("en_US") or ""
        # map(str, ...) keeps join() from failing on non-string items.
        industries = ", ".join(map(str, company.get("industries") or []))
        specialties = ", ".join(map(str, company.get("specialties") or []))
        company_url = company.get("websiteUrl") or ""

        # Create evidence from company profile
        evidence = Evidence(
            url=company_url or fallback_url,
            title=f"{name} - LinkedIn Company Profile",
            snippet=f"{description}\n\nIndustries: {industries}\nSpecialties: {specialties}",
            source_name=self.channel_name
        )

        # Check cache to avoid duplicates. The cache helpers are called via
        # to_thread, i.e. they are treated as blocking calls.
        is_cached = await asyncio.to_thread(
            redis_client.is_evidence_cached, domain, evidence
        )
        if is_cached:
            continue

        # Add to cache
        await asyncio.to_thread(
            redis_client.add_evidence_to_cache, domain, evidence
        )

        evidences.append(evidence)

    return evidences

Step 2: Register the Source

Export your custom source from the sources package (and register it in the self.sources dict in pipeline.py):
from app.sources.base import BaseSource, SourceResult
from app.sources.google_search import GoogleSearchSource
from app.sources.jobs_search import JobsSearchSource
from app.sources.news_search import NewsSearchSource
from app.sources.linkedin_search import LinkedInSearchSource  # Add import

# Names exported by a star-import of this module; add each new source class here.
__all__ = [
    "BaseSource",
    "SourceResult",
    "GoogleSearchSource",
    "JobsSearchSource",
    "NewsSearchSource",
    "LinkedInSearchSource",  # Export custom source
]

Step 3: Configure Rate Limiting

Add rate limit configuration for your custom source:
@dataclass(frozen=True)
class Settings:
    """Immutable settings: search concurrency, circuit breaker, and API rate limits."""

    max_parallel_searches: int
    circuit_breaker_failures: int
    circuit_breaker_reset_seconds: float
    
    # API Rate Limits
    # NOTE(review): the "_rpm" suffix presumably means requests per minute — confirm.
    tavily_rpm: int
    gemini_rpm: int
    newsapi_rpm: int
    linkedin_rpm: int  # Add new rate limit

@lru_cache(maxsize=1)
def get_settings() -> Settings:
    """Build the settings object once; later calls return the cached instance."""
    rate_limits = {
        "tavily_rpm": 500,
        "gemini_rpm": 2000,
        "newsapi_rpm": 300,
        "linkedin_rpm": 100,  # LinkedIn API limit
    }
    return Settings(
        max_parallel_searches=20,
        circuit_breaker_failures=5,
        circuit_breaker_reset_seconds=30.0,
        **rate_limits,
    )

Real-World Examples

Example 1: Crunchbase API Source

Integrate funding and acquisition data:
crunchbase_search.py
import asyncio
import os
from typing import List

import httpx
from dotenv import load_dotenv

from app.decorators import rate_limited
from app.models.response import Evidence
from app.sources.base import BaseSource, SourceResult
from app.db import redis_client

load_dotenv()

class CrunchbaseSearchSource(BaseSource):
    """Crunchbase funding and company data source."""

    channel_name = "crunchbase_search"

    def __init__(self):
        """Read the API key from the environment.

        Raises:
            ValueError: If ``CRUNCHBASE_API_KEY`` is unset or empty.
        """
        api_key = os.getenv("CRUNCHBASE_API_KEY")
        if not api_key:
            raise ValueError("CRUNCHBASE_API_KEY required")

        self.api_key = api_key
        self.base_url = "https://api.crunchbase.com/api/v4"

    @rate_limited("crunchbase")
    async def fetch(
        self, domain: str, query: str, search_depth: str
    ) -> SourceResult:
        """Search Crunchbase for organizations whose website includes ``domain``.

        Args:
            domain: Company domain; used as the search predicate value.
            query: Recorded on the result; not sent to Crunchbase.
            search_depth: Accepted for interface compatibility; unused here.

        Returns:
            A SourceResult with ok=True and the processed evidences, or one
            with ok=False and an error message for non-200 responses and
            exceptions.
        """
        try:
            async with httpx.AsyncClient() as client:
                # Search for organization
                response = await client.post(
                    f"{self.base_url}/searches/organizations",
                    headers={"X-cb-user-key": self.api_key},
                    json={
                        "field_ids": [
                            "name", "short_description", "website",
                            "funding_total", "last_funding_type",
                            "categories", "investor_identifiers"
                        ],
                        "query": [{"type": "predicate",
                                  "field_id": "website_url",
                                  "operator_id": "includes",
                                  "values": [domain]}]
                    }
                )

                if response.status_code != 200:
                    return SourceResult(
                        channel=self.channel_name,
                        domain=domain,
                        query=query,
                        evidences=[],
                        ok=False,
                        error=f"Crunchbase API error: {response.status_code}"
                    )

                data = response.json()
                evidences = await self._process_crunchbase_data(data, domain)

                return SourceResult(
                    channel=self.channel_name,
                    domain=domain,
                    query=query,
                    evidences=evidences,
                    ok=True,
                    # Passed explicitly: SourceResult declares `error` as a
                    # field, so omitting it would raise if it has no default.
                    error=None
                )

        except Exception as e:
            # Deliberate catch-all: a failing source is reported on the
            # result instead of raising into the caller.
            return SourceResult(
                channel=self.channel_name,
                domain=domain,
                query=query,
                evidences=[],
                ok=False,
                error=str(e)
            )

    async def _process_crunchbase_data(
        self, data: dict, domain: str
    ) -> List[Evidence]:
        """Convert the search response's "entities" into Evidence objects.

        Args:
            data: Decoded JSON response.
            domain: Company domain; used as the deduplication cache key.

        Returns:
            Evidence objects that were not already in the cache. Each
            returned evidence is added to the cache first.
        """
        evidences = []

        # `or` fallbacks guard against keys that are present but null, which
        # `.get(key, default)` does not cover.
        for entity in data.get("entities") or []:
            properties = entity.get("properties") or {}

            name = properties.get("name") or ""
            description = properties.get("short_description") or ""
            # A null funding_total or value_usd would break the ":," format
            # spec below, so both fall back to 0.
            funding = (properties.get("funding_total") or {}).get("value_usd") or 0
            # Skip categories without a value; join() rejects None.
            categories = [
                str(cat["value"])
                for cat in properties.get("categories") or []
                if cat.get("value")
            ]

            evidence = Evidence(
                url=(properties.get("website") or {}).get("value") or "",
                title=f"{name} - Crunchbase Profile",
                snippet=f"{description}\n\nTotal Funding: ${funding:,}\nCategories: {', '.join(categories)}",
                source_name=self.channel_name
            )

            # Deduplication check
            is_cached = await asyncio.to_thread(
                redis_client.is_evidence_cached, domain, evidence
            )
            if not is_cached:
                await asyncio.to_thread(
                    redis_client.add_evidence_to_cache, domain, evidence
                )
                evidences.append(evidence)

        return evidences

Example 2: Custom Database Source

Query internal company databases:
internal_db_search.py
import asyncio
from typing import List

import asyncpg

from app.models.response import Evidence
from app.sources.base import BaseSource, SourceResult

class InternalDBSearchSource(BaseSource):
    """Query internal PostgreSQL database for company intelligence."""

    channel_name = "internal_db_search"

    def __init__(self):
        self.db_pool = None
        # Guards pool creation; built lazily in _get_pool so it is created
        # while the event loop that will use it is running.
        self._pool_lock = None

    async def _get_pool(self):
        """Lazily create the shared asyncpg connection pool and return it.

        Connection parameters come from the INTERNAL_DB_HOST,
        INTERNAL_DB_NAME, INTERNAL_DB_USER and INTERNAL_DB_PASSWORD
        environment variables. The fallbacks are the values this example
        previously hard-coded; they are development placeholders only.
        """
        import os  # local import: this module's import block has no `os`

        if self.db_pool is not None:
            return self.db_pool

        # No await between this check and the assignment, so two coroutines
        # cannot both create a lock.
        if getattr(self, "_pool_lock", None) is None:
            self._pool_lock = asyncio.Lock()

        async with self._pool_lock:
            # Re-check: another fetch may have created the pool while this
            # one waited. Without the lock, concurrent first calls would
            # each create a pool and all but one would leak.
            if self.db_pool is None:
                self.db_pool = await asyncpg.create_pool(
                    host=os.getenv("INTERNAL_DB_HOST", "localhost"),
                    database=os.getenv("INTERNAL_DB_NAME", "company_intel"),
                    user=os.getenv("INTERNAL_DB_USER", "postgres"),
                    password=os.getenv("INTERNAL_DB_PASSWORD", "password"),
                    min_size=5,
                    max_size=20
                )
        return self.db_pool

    async def fetch(
        self, domain: str, query: str, search_depth: str
    ) -> SourceResult:
        """Return the 10 most recent CRM interactions for ``domain`` as evidence.

        Args:
            domain: Company domain; matched against ``companies.domain``.
            query: Recorded on the result; not used in the SQL.
            search_depth: Accepted for interface compatibility; unused here.

        Returns:
            A SourceResult with ok=True and one Evidence per row, or one
            with ok=False and the exception text on failure.
        """
        try:
            pool = await self._get_pool()

            async with pool.acquire() as conn:
                # Query company interactions from CRM. The domain is bound
                # as the $1 parameter, never interpolated into the SQL.
                rows = await conn.fetch("""
                    SELECT 
                        c.name,
                        c.website,
                        i.interaction_type,
                        i.notes,
                        i.created_at
                    FROM companies c
                    JOIN interactions i ON c.id = i.company_id
                    WHERE c.domain = $1
                    ORDER BY i.created_at DESC
                    LIMIT 10
                """, domain)

                evidences = [
                    Evidence(
                        url=row['website'],
                        title=f"{row['name']} - {row['interaction_type']}",
                        snippet=f"{row['notes']} (Date: {row['created_at']})",
                        source_name=self.channel_name
                    )
                    for row in rows
                ]

                return SourceResult(
                    channel=self.channel_name,
                    domain=domain,
                    query=query,
                    evidences=evidences,
                    ok=True,
                    # Passed explicitly: SourceResult declares `error` as a
                    # field, so omitting it would raise if it has no default.
                    error=None
                )

        except Exception as e:
            # Deliberate catch-all: a failing source is reported on the
            # result instead of raising into the caller.
            return SourceResult(
                channel=self.channel_name,
                domain=domain,
                query=query,
                evidences=[],
                ok=False,
                error=str(e)
            )

Example 3: Web Scraping Source

Scrape company websites with Playwright:
web_scraping_source.py
import asyncio
from typing import List

from playwright.async_api import async_playwright

from app.models.response import Evidence
from app.sources.base import BaseSource, SourceResult
from app.db import redis_client

class WebScrapingSource(BaseSource):
    """Scrape company websites for technology signals."""

    channel_name = "web_scraping"

    # Technology name -> lower-case substrings whose presence in the page
    # HTML counts as a signal. Defined once instead of on every call.
    _TECH_PATTERNS = {
        "React": ["react", "_react"],
        "Vue.js": ["vue.js", "__vue__"],
        "Angular": ["ng-", "angular"],
        "WordPress": ["wp-content", "wordpress"],
        "Shopify": ["cdn.shopify.com"],
        "Google Analytics": ["google-analytics.com", "gtag"],
    }

    async def fetch(
        self, domain: str, query: str, search_depth: str
    ) -> SourceResult:
        """Load ``https://{domain}`` in headless Chromium and build one Evidence.

        Args:
            domain: Company domain whose homepage is scraped; also the
                deduplication cache key.
            query: Recorded on the result; not used for scraping.
            search_depth: Accepted for interface compatibility; unused here.

        Returns:
            A SourceResult with ok=True and zero or one evidence (zero when
            it was already cached), or one with ok=False and an error
            message on failure.
        """
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                try:
                    page = await browser.new_page()

                    # Navigate to company homepage
                    await page.goto(f"https://{domain}", timeout=10000)

                    # Extract meta tags and page content
                    title = await page.title()
                    # query_selector returns None at once when nothing
                    # matches. page.get_attribute(selector, ...) instead
                    # waits for the element and times out on pages that have
                    # no description meta tag.
                    meta = await page.query_selector('meta[name="description"]')
                    meta_description = (
                        await meta.get_attribute("content") if meta else None
                    ) or ""

                    html_content = await page.content()
                finally:
                    # Close the browser on the error path too (e.g. when
                    # goto times out).
                    await browser.close()

                # Look for technology indicators in page source
                tech_signals = self._detect_technologies(html_content)

                # Create evidence from scraped data
                evidence = Evidence(
                    url=f"https://{domain}",
                    title=title,
                    snippet=f"{meta_description}\n\nDetected Technologies: {', '.join(tech_signals)}",
                    source_name=self.channel_name
                )

                # Check cache
                is_cached = await asyncio.to_thread(
                    redis_client.is_evidence_cached, domain, evidence
                )

                evidences = []
                if not is_cached:
                    await asyncio.to_thread(
                        redis_client.add_evidence_to_cache, domain, evidence
                    )
                    evidences.append(evidence)

                return SourceResult(
                    channel=self.channel_name,
                    domain=domain,
                    query=query,
                    evidences=evidences,
                    ok=True,
                    # Passed explicitly: SourceResult declares `error` as a
                    # field, so omitting it would raise if it has no default.
                    error=None
                )

        except Exception as e:
            # Deliberate catch-all: a failing source is reported on the
            # result instead of raising into the caller.
            return SourceResult(
                channel=self.channel_name,
                domain=domain,
                query=query,
                evidences=[],
                ok=False,
                error=f"Scraping failed: {str(e)}"
            )

    def _detect_technologies(self, html: str) -> List[str]:
        """Return the technologies whose patterns occur in ``html``.

        Matching is a case-insensitive substring test, so it is a heuristic:
        short patterns such as "ng-" or "react" can match unrelated text.
        Results keep the order of ``_TECH_PATTERNS``.
        """
        html_lower = html.lower()
        return [
            tech
            for tech, patterns in self._TECH_PATTERNS.items()
            if any(pattern in html_lower for pattern in patterns)
        ]

Best Practices

Always return a SourceResult with ok=False on errors:
try:
    # API call logic
    response = await client.get(...)
except httpx.TimeoutException as e:
    # Specific handler first: timeouts get their own error message.
    return SourceResult(
        channel=self.channel_name,
        domain=domain,
        query=query,
        evidences=[],
        ok=False,
        error=f"Request timeout: {str(e)}"
    )
except Exception as e:
    # Catch-all: any other failure is reported on the result, not raised.
    return SourceResult(
        channel=self.channel_name,
        domain=domain,
        query=query,
        evidences=[],
        ok=False,
        error=f"Unexpected error: {str(e)}"
    )
Always use the @rate_limited decorator:
@rate_limited("custom_api")  # Match your API config
async def fetch(self, domain: str, query: str, search_depth: str):
    """Placeholder fetch showing where the decorator is applied."""
    # Automatic rate limiting and retry logic
    # (supplied by the decorator; the body here is intentionally empty)
    pass
Benefits:
  • Token bucket smoothing
  • Automatic retries on rate limit errors
  • Exponential backoff
Use Redis deduplication for all evidence:
# Check cache before adding
# (this fragment belongs inside the loop over candidate evidence: `continue`
# skips an item that is already cached)
is_cached = await asyncio.to_thread(
    redis_client.is_evidence_cached, domain, evidence
)
if is_cached:
    continue

# Add to cache
# NOTE(review): check and add are two separate calls, so two concurrent
# fetches could both pass the check — confirm whether that matters here.
await asyncio.to_thread(
    redis_client.add_evidence_to_cache, domain, evidence
)
Use asyncio.to_thread() for blocking operations:
# For sync API clients
# (to_thread runs the callable in a worker thread and forwards the
# remaining positional and keyword arguments to it)
response = await asyncio.to_thread(
    self.sync_client.search,
    query=query
)

# For CPU-intensive tasks
# (this keeps the event loop responsive while the work runs; the work
# itself still runs in a thread, not a separate process)
results = await asyncio.to_thread(
    self._process_large_dataset,
    data
)

Testing Custom Sources

Unit Tests

test_linkedin_search.py
import pytest
from app.sources.linkedin_search import LinkedInSearchSource

@pytest.mark.asyncio
async def test_linkedin_fetch_success():
    """A fetch for a known company succeeds and returns LinkedIn evidence."""
    request = {
        "domain": "stripe.com",
        "query": "fintech payment processing",
        "search_depth": "standard",
    }

    outcome = await LinkedInSearchSource().fetch(**request)

    assert outcome.ok is True
    assert outcome.channel == "linkedin_search"
    assert len(outcome.evidences) > 0

@pytest.mark.asyncio
async def test_linkedin_fetch_error_handling():
    """A fetch for an invalid domain returns an error result instead of raising."""
    # Test with invalid domain
    request = {
        "domain": "invalid-domain-12345.com",
        "query": "test",
        "search_depth": "quick",
    }

    outcome = await LinkedInSearchSource().fetch(**request)

    # Should not crash, should return error result
    assert outcome.ok is False
    assert outcome.error is not None

Integration Tests

@pytest.mark.asyncio
async def test_custom_source_in_pipeline():
    """Run a one-domain pipeline and check the LinkedIn source contributed evidence."""
    from app.services.pipeline import ResearchPipeline
    from app.core import RunMetrics
    import time
    
    pipeline = ResearchPipeline(
        run_id="test-run",
        research_goal="Find fintech companies",
        search_depth="quick",
        company_domains=["stripe.com"],
        strategies=[
            # NOTE(review): QueryStrategy is not imported anywhere in this
            # snippet; import it from its defining module, otherwise this
            # line raises NameError.
            QueryStrategy(channel="linkedin_search", strategy_type="company_profile")
        ],
        max_parallel_searches=5,
        confidence_threshold=0.7,
        metrics=RunMetrics(start_time=time.perf_counter())
    )
    
    # run() returns a pair; total_planned is not checked by this test.
    results, total_planned = await pipeline.run()
    
    # One result is expected because one company domain was passed in.
    assert len(results) == 1
    assert results[0].domain == "stripe.com"
    # Verify LinkedIn source contributed evidence
    linkedin_evidence = [
        e for e in results[0].findings.evidence 
        if e.source_name == "linkedin_search"
    ]
    assert len(linkedin_evidence) > 0

Troubleshooting

Common Issues:
  1. Source not found: Verify source is registered in pipeline.py self.sources dict
  2. Rate limiting not working: Check api_type in @rate_limited() matches config
  3. Circuit breaker always open: Raise the circuit_breaker_failures threshold or check API errors
  4. No evidence returned: Add debug logging to _process_* methods

Next Steps

Performance Tuning

Optimize your custom source for high throughput

API Reference

Complete API documentation for BaseSource

Build docs developers (and LLMs) love