
Discovery Pipeline

The Discovery Pipeline is a multi-stage system that automatically finds, evaluates, and onboards high-quality creators across multiple social media platforms.

Architecture Overview

The pipeline uses a queue-based architecture with three main stages:
┌──────────────────┐
│  Discovery Jobs  │  → Trending topics, categories, creator refresh
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│   Evaluation     │  → Quality scoring, duplicate detection
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│   Aggregation    │  → Data enrichment, composite scoring
└──────────────────┘

Core Components

The pipeline consists of several interconnected services:
  • QueueConfig: Manages BullMQ queues and workers
  • DiscoveryScheduler: Schedules recurring discovery jobs
  • TrendingDiscovery: Searches for creators by trending topics
  • CreatorEvaluator: Evaluates creator quality and suitability
  • DuplicateDetector: Prevents duplicate creator entries
  • DataAggregationEngine: Enriches creator profiles with cross-platform data
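One way these components could be wired together is a job router that maps each discovery job type to its processing method. The sketch below is illustrative: the names `JobType`, `jobRouter`, and `processDiscoveryJob` echo identifiers used elsewhere in this page, but the signatures are assumptions, not the actual DiscoveryPipeline API.

```typescript
// Hypothetical sketch: route each discovery job type to one handler.
type JobType = "DISCOVER_TRENDING" | "EXPLORE_CATEGORY" | "REFRESH_EXISTING";
type JobData = Record<string, unknown>;
type JobHandler = (data: JobData) => string;

// Each job type dispatches to one processing method (bodies are stand-ins).
const jobRouter: Record<JobType, JobHandler> = {
  DISCOVER_TRENDING: (data) => `trending:${data.topic}`,
  EXPLORE_CATEGORY: (data) => `category:${data.category}`,
  REFRESH_EXISTING: (data) => `refresh:${data.creatorId}`,
};

function processDiscoveryJob(type: JobType, data: JobData): string {
  const handler = jobRouter[type];
  if (!handler) throw new Error(`Unknown job type: ${type}`);
  return handler(data);
}
```

Adding a new discovery source then amounts to adding a `JobType` value and registering its handler in the router.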

Queue System

The pipeline uses three specialized queues, each with concurrency settings tuned to its workload:

Creator Discovery Queue

Concurrency: 5 workers
Job Types:
  • DISCOVER_TRENDING - Find creators from trending topics
  • EXPLORE_CATEGORY - Explore specific categories across platforms
  • REFRESH_EXISTING - Update existing creator data
// Example: Adding a discovery job
await queueConfig.addJob(
  'creator-discovery',
  JobType.DISCOVER_TRENDING,
  {
    platform: 'tiktok',
    topic: 'tech reviews',
    mode: 'comprehensive'
  },
  JobPriority.HIGH
);

Creator Evaluation Queue

Concurrency: 10 workers
Purpose: Evaluates discovered creators based on:
  • Follower count and growth rate
  • Engagement rate metrics
  • Content quality indicators
  • Audience demographics
  • Brand safety factors
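The factors above can be combined into a weighted composite score. The sketch below is illustrative only: the weights, normalization, and thresholds are assumptions for demonstration, not the actual CreatorEvaluator values.

```typescript
// Illustrative composite scoring; weights and thresholds are assumptions.
interface CreatorMetrics {
  followerCount: number;
  growthRate: number;       // % per month
  engagementRate: number;   // %
  brandSafetyScore: number; // 0..1
}

type Recommendation = "add" | "monitor" | "reject";

function scoreCreator(m: CreatorMetrics): { score: number; recommendation: Recommendation } {
  // Log-scale followers so mega-accounts don't dominate the score.
  const followerScore = Math.min(Math.log10(Math.max(m.followerCount, 1)) / 7, 1);
  const engagementScore = Math.min(m.engagementRate / 10, 1);
  const growthScore = Math.min(Math.max(m.growthRate, 0) / 20, 1);

  const score =
    0.3 * followerScore +
    0.35 * engagementScore +
    0.2 * growthScore +
    0.15 * m.brandSafetyScore;

  // Hypothetical cutoffs: high scores are added, mid-range monitored.
  const recommendation: Recommendation =
    score >= 0.7 ? "add" : score >= 0.4 ? "monitor" : "reject";
  return { score, recommendation };
}
```

The recommendation then drives the outcome actions shown below.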
// Evaluation outcome actions
if (evaluation.recommendation === 'add') {
  // Add to database and queue for aggregation
  await addCreatorToDatabase(creatorData, evaluation);
} else if (evaluation.recommendation === 'monitor') {
  // Add to monitoring list for future consideration
  await addToMonitoringList(creatorData, evaluation);
} else {
  // Reject with reasons logged
}

Creator Aggregation Queue

Concurrency: 3 workers
Purpose: Aggregates cross-platform data and calculates composite scores
const aggregatedData = await aggregationEngine.aggregateCreatorData(
  creatorId,
  { 
    analyzeContentThemes: true,
    includeHistoricalData: false 
  }
);

Discovery Methods

Trending Topic Discovery

Searches for creators based on viral topics and hashtags:
const trendingTopic: TrendingTopic = {
  topic: 'AI tutorials',
  platform: 'tiktok',
  volume: 150000,
  growth: 45.2,
  relatedHashtags: ['#AITips', '#TechTutorials'],
  timestamp: new Date()
};

const creators = await trendingDiscovery.searchCreatorsByTopic(
  trendingTopic,
  50  // limit
);
Location: lib/discovery/discovery-pipeline.ts:131-177
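When many topics are trending at once, it can help to rank them before queueing discovery jobs. The helper below is a hypothetical prioritization heuristic using the `volume` and `growth` fields of `TrendingTopic`; the formula is an assumption for illustration, not part of TrendingDiscovery.

```typescript
// Hypothetical ranking heuristic: log-scale volume so huge topics don't
// drown out fast-growing niches, then boost by growth percentage.
function trendingScore(volume: number, growthPct: number): number {
  return Math.log10(Math.max(volume, 1)) * (1 + growthPct / 100);
}
```

Topics could be sorted by this score and the top entries dispatched with JobPriority.HIGH.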

Category Exploration

Systematically explores content categories across all platforms:
// Searches Instagram, TikTok, and Twitter for category
const result = await processCategoryExploration({
  category: 'sustainable living',
  limit: 20
});
Location: lib/discovery/discovery-pipeline.ts:182-231

Creator Refresh

Updates existing creator profiles with latest metrics:
const refreshResult = await processCreatorRefresh({
  creatorId: 'abc123',
  username: '@techreviewer',
  platform: 'tiktok'
});

console.log(`New score: ${refreshResult.newScore}`);
Location: lib/discovery/discovery-pipeline.ts:236-259

Pipeline Control

Starting the Pipeline

import { DiscoveryPipeline } from './lib/discovery/discovery-pipeline';
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();
const pipeline = new DiscoveryPipeline(prisma);

await pipeline.start();
console.log('Pipeline is running');

Monitoring Pipeline Status

const status = await pipeline.getStatus();

console.log(`Running: ${status.isRunning}`);
console.log('Queue Metrics:', status.queues);
console.log('Scheduled Jobs:', status.scheduledJobs);
Location: lib/discovery/discovery-pipeline.ts:373-392

Stopping the Pipeline

await pipeline.stop();
// Gracefully shuts down all queues and workers

Duplicate Detection

Before queuing creators for evaluation, the pipeline checks for duplicates:
const duplicateCheck = await duplicateDetector.checkDuplicate(creator);

if (!duplicateCheck.isDuplicate) {
  // Safe to add to evaluation queue
  await queueConfig.addJob('creator-evaluation', ...);
}
Duplicate detection uses:
  • Username matching across platforms
  • Profile URL comparison
  • Fuzzy matching for display names
  • Cross-platform identifier correlation
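A minimal sketch of the last two techniques, assuming a normalize-then-compare approach (this is not the actual DuplicateDetector implementation): usernames are normalized before exact comparison, and display names fall back to an edit-distance similarity ratio.

```typescript
// Strip the leading @ and case so "@TechReviewer" and "techreviewer" match.
function normalizeUsername(username: string): string {
  return username.toLowerCase().replace(/^@/, "").trim();
}

// Classic Levenshtein edit distance between two strings.
function editDistance(a: string, b: string): number {
  const dp: number[][] = Array.from({ length: a.length + 1 }, () =>
    new Array(b.length + 1).fill(0)
  );
  for (let i = 0; i <= a.length; i++) dp[i][0] = i;
  for (let j = 0; j <= b.length; j++) dp[0][j] = j;
  for (let i = 1; i <= a.length; i++) {
    for (let j = 1; j <= b.length; j++) {
      const cost = a[i - 1] === b[j - 1] ? 0 : 1;
      dp[i][j] = Math.min(dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost);
    }
  }
  return dp[a.length][b.length];
}

// Fuzzy match: exact after normalization, else similarity above a threshold.
function isLikelyDuplicate(nameA: string, nameB: string, threshold = 0.85): boolean {
  const a = normalizeUsername(nameA);
  const b = normalizeUsername(nameB);
  if (a === b) return true;
  const maxLen = Math.max(a.length, b.length);
  if (maxLen === 0) return true;
  const similarity = 1 - editDistance(a, b) / maxLen;
  return similarity >= threshold;
}
```

The 0.85 threshold is a tunable assumption: higher values reduce false positives but let more near-duplicate display names through.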

Discovery Reports

Generate comprehensive reports on discovery performance:
const report = await pipeline.generateReport(
  new Date('2026-03-01'),
  new Date('2026-03-06')
);

console.log(`Creators discovered: ${report.stats.creatorsDiscovered}`);
console.log(`Acceptance rate: ${report.trends.acceptanceRate}%`);
console.log(`Discovery rate: ${report.trends.discoveryRate}/day`);
console.log('Top sources:', report.stats.topSources);
Location: lib/discovery/discovery-pipeline.ts:397-480

Report Structure

interface DiscoveryReport {
  period: { start: Date; end: Date };
  stats: {
    creatorsDiscovered: number;
    creatorsAdded: number;
    creatorsRejected: number;
    platformBreakdown: Record<string, number>;
    topSources: Array<{ source: string; count: number }>;
    averageQualityScore: number;
  };
  trends: {
    discoveryRate: number;      // creators per day
    acceptanceRate: number;     // percentage accepted
    growthRate: number;         // vs previous period
  };
}
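The trend fields follow directly from the raw stats and the period length. The derivation below is inferred from the field comments in the interface above, not taken from the actual report generator:

```typescript
// Derive the trends block from raw stats (formulas inferred, not official).
interface ReportStats {
  creatorsDiscovered: number;
  creatorsAdded: number;
}

function computeTrends(stats: ReportStats, periodDays: number, previousDiscovered: number) {
  // Creators found per day over the reporting window.
  const discoveryRate = stats.creatorsDiscovered / periodDays;
  // Percentage of discovered creators that were accepted.
  const acceptanceRate =
    stats.creatorsDiscovered === 0
      ? 0
      : (stats.creatorsAdded / stats.creatorsDiscovered) * 100;
  // Percentage change vs the previous period's discoveries.
  const growthRate =
    previousDiscovered === 0
      ? 0
      : ((stats.creatorsDiscovered - previousDiscovered) / previousDiscovered) * 100;
  return { discoveryRate, acceptanceRate, growthRate };
}
```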

Configuration Options

Queue Concurrency

Adjust worker concurrency based on your infrastructure:
// In queue setup
this.queueConfig.createWorker(
  'creator-discovery',
  this.processDiscoveryJob.bind(this),
  { concurrency: 10 }  // Increase for more parallelism
);

Job Priorities

enum JobPriority {
  LOW = 1,
  NORMAL = 5,
  HIGH = 10,
  CRITICAL = 20
}

// High priority for trending topics
await queueConfig.addJob(
  'creator-discovery',
  JobType.DISCOVER_TRENDING,
  data,
  JobPriority.HIGH
);

Scheduler Configuration

The DiscoveryScheduler manages recurring jobs:
// Configure in scheduler
await scheduler.initialize();
scheduler.start();

// Schedule daily trending discovery
scheduler.scheduleRecurring(
  'daily-trending',
  '0 2 * * *',  // 2 AM daily
  { type: JobType.DISCOVER_TRENDING }
);

Performance Optimization

Batch Processing

Process multiple creators in batches to reduce database queries:
// Process in batches of 50
for (let i = 0; i < creators.length; i += 50) {
  const batch = creators.slice(i, i + 50);
  await Promise.all(
    batch.map(creator => 
      queueConfig.addJob('creator-evaluation', ...)
    )
  );
}
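The batching loop above can be factored into a small generic helper (illustrative; not part of the pipeline's public API):

```typescript
// Split an array into consecutive batches of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (size <= 0) throw new Error("chunk size must be positive");
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}
```

The loop then becomes `for (const batch of chunk(creators, 50)) { ... }`, which keeps the batch-size constant in one place.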

Queue Metrics

Monitor queue health and performance:
const metrics = await queueConfig.getQueueMetrics('creator-discovery');

console.log('Waiting:', metrics.waiting);
console.log('Active:', metrics.active);
console.log('Completed:', metrics.completed);
console.log('Failed:', metrics.failed);
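These metrics can feed a simple backlog alert. The heuristic below is an assumption for illustration (not part of QueueConfig): flag a queue as backed up when waiting jobs greatly outnumber active workers.

```typescript
// Hypothetical health check: waiting jobs vs active workers, with a
// tunable factor controlling how much backlog is tolerated.
function isBackedUp(metrics: { waiting: number; active: number }, factor = 10): boolean {
  return metrics.waiting > Math.max(metrics.active, 1) * factor;
}
```

When this fires, increasing worker concurrency (see Configuration Options) is the usual first response.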

Error Handling

The pipeline includes comprehensive error handling:
try {
  const evaluation = await this.evaluator.evaluateCreator(creatorData);
} catch (error) {
  logger.error(`Failed to evaluate creator ${creatorData.identifier}`, error);
  // Job will be retried based on queue configuration
  throw error;
}

Retry Strategy

Configure job retries in queue settings:
const jobOptions = {
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 2000
  }
};
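With these settings, retries are spaced out exponentially. Assuming the common formula delay * 2^(attempt - 1) (verify the exact strategy against your queue library's documentation), the waits would be 2s, 4s, then 8s:

```typescript
// Sketch of an exponential backoff schedule: baseDelay * 2^(attempt - 1).
// Mirrors the common exponential strategy; check your queue library's
// docs for its exact semantics.
function backoffDelay(attempt: number, baseDelayMs = 2000): number {
  return baseDelayMs * Math.pow(2, attempt - 1);
}
```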

Database Models

The pipeline interacts with these key models:

CreatorProfile

Stores basic creator information and discovery metadata:
model CreatorProfile {
  id                String   @id @default(uuid())
  platform          String
  username          String
  followerCount     Int
  engagementRate    Float
  profileData       Json     // Includes discoverySource, evaluationScore
  createdAt         DateTime @default(now())
}

Platform-Specific Metrics

  • TiktokProfile - TikTok engagement metrics
  • InstagramProfile - Instagram profile data
  • TwitterMetrics - Twitter analytics
  • YoutubeMetrics - YouTube channel stats
FAQ

Q: How often should I run discovery jobs?
A: Trending discovery should run 2-4 times daily. Category exploration can run weekly. Creator refresh depends on your data freshness requirements - typically daily for active creators.

Q: What determines if a creator is added vs monitored?
A: The CreatorEvaluator uses a quality score based on engagement rate, follower count, content consistency, and audience quality. Scores above the threshold result in "add", mid-range scores get "monitor", and low scores are rejected.

Q: How do I prevent duplicate creators across platforms?
A: The DuplicateDetector checks profile URLs, usernames, and cross-platform identifiers. It also uses fuzzy matching on display names to catch variations.

Q: Can I customize evaluation criteria?
A: Yes - modify the CreatorEvaluator class to adjust scoring weights, minimum thresholds, and quality factors based on your requirements.

Q: What happens if a queue gets backed up?
A: Monitor queue metrics and increase worker concurrency if needed. You can also implement job prioritization to process high-value discoveries first.

Q: How do I add a new discovery source?
A: Add a new JobType enum value, create a processing method in DiscoveryPipeline, and register it in the job router (the processDiscoveryJob method).
