Overview
TechCal’s ingestion pipeline automatically collects events from multiple sources, applies quality scoring, deduplicates entries, and routes low-quality events to a moderation queue.
Cron Schedule: Hourly via Vercel cron (configured in vercel.json)
Manual Trigger: POST /api/admin/ingestion/run (requires admin auth)
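The hourly schedule might be configured in vercel.json roughly like this (the path shown mirrors the manual-trigger endpoint above; the exact entry in this repo may differ):

```json
{
  "crons": [
    {
      "path": "/api/admin/ingestion/run",
      "schedule": "0 * * * *"
    }
  ]
}
```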
Architecture
Source Types
RSS Feeds
Collector: src/services/ingestion/collectors/RssCollector.ts
Supported Formats:
- RSS 2.0
- Atom
- Custom namespaces (Dublin Core, Media RSS)
Example Source:
{
  "id": "uuid",
  "name": "TechCrunch Events",
  "url": "https://techcrunch.com/events/feed/",
  "source_type": "rss",
  "enabled": true,
  "ingestion_config": {
    "transform": "techcrunch",
    "quality_threshold": 70
  }
}
Parsing:
import Parser from 'rss-parser';

const parser = new Parser({
  customFields: {
    item: [
      ['ev:location', 'location'],
      ['ev:startdate', 'startDate'],
      ['content:encoded', 'contentEncoded']
    ]
  }
});

const feed = await parser.parseURL(source.url);
const events = feed.items.map(item => normalizeRssItem(item));
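The normalizeRssItem helper referenced above is not shown in this doc. A minimal sketch, assuming the field names produced by the customFields mapping (ev:location → location, ev:startdate → startDate), might look like:

```typescript
// Hypothetical sketch of normalizeRssItem; field names follow the
// customFields mapping in the parser config above.
interface RssItem {
  title?: string;
  content?: string;
  contentEncoded?: string;
  link?: string;
  guid?: string;
  location?: string;
  startDate?: string;
}

function normalizeRssItem(item: RssItem) {
  return {
    title: (item.title ?? '').trim(),
    // Prefer the full content:encoded body over the plain summary
    description: item.contentEncoded ?? item.content ?? '',
    startTime: item.startDate ? new Date(item.startDate).toISOString() : null,
    location: item.location ?? null,
    sourceUrl: item.link ?? '',
    sourceId: item.guid ?? item.link ?? '',
  };
}
```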
API Sources
Collector: src/services/ingestion/collectors/ApiCollector.ts
Authentication:
{
  "headers": {
    "Authorization": "Bearer ${API_KEY}",
    "Accept": "application/json"
  },
  "pagination": {
    "type": "offset", // or 'cursor', 'page'
    "param": "offset",
    "limit": 100
  }
}
Request Pattern:
async function fetchFromApi(source: EventSource): Promise<RawEvent[]> {
  const headers = source.ingestion_config.headers || {};
  const events: RawEvent[] = [];
  let offset = 0;
  let hasMore = true;
  while (hasMore) {
    const response = await fetch(
      `${source.url}?offset=${offset}&limit=100`,
      { headers }
    );
    const data = await response.json();
    events.push(...data.results);
    hasMore = data.has_more;
    offset += 100;
    // Rate limiting
    await sleep(1000);
  }
  return events;
}
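The loop above hardcodes offset pagination. A sketch that also covers the 'cursor' and 'page' types from the pagination config (the helper name and shape here are illustrative, not the actual collector API) could build each request URL like this:

```typescript
// Hypothetical helper covering the three pagination types from the
// config example ('offset', 'cursor', 'page'); names are illustrative.
interface PaginationConfig {
  type: 'offset' | 'cursor' | 'page';
  param: string; // query parameter name, e.g. 'offset' or 'cursor'
  limit: number;
}

function buildPageUrl(
  baseUrl: string,
  config: PaginationConfig,
  state: string | number // offset count, cursor token, or page number
): string {
  const url = new URL(baseUrl);
  url.searchParams.set(config.param, String(state));
  url.searchParams.set('limit', String(config.limit));
  return url.toString();
}
```

The same loop structure then works for all three types: advance `state` by the limit for offset/page sources, or set it to the cursor token returned by the previous response.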
ICS Calendar Feeds
Collector: src/services/ingestion/collectors/IcsCollector.ts
Supported:
- iCalendar (.ics) format
- Recurring events (RRULE)
- Multi-day events
- Timezone handling (VTIMEZONE)
Parsing:
import ical from 'node-ical';

// yield is only valid inside a generator, so the collector wraps the
// parse-and-normalize loop in an async generator function
async function* collectIcsEvents(source: EventSource) {
  const events = await ical.async.fromURL(source.url);
  for (const event of Object.values(events)) {
    if (event.type !== 'VEVENT') continue;
    yield {
      title: event.summary,
      description: event.description,
      startTime: event.start.toISOString(),
      endTime: event.end?.toISOString(),
      location: event.location,
      timezone: event.start.tz,
      sourceUrl: event.url || source.url,
      // Preserve the recurrence rule for later expansion
      rrule: event.rrule ? event.rrule.toString() : null
    };
  }
}
When an ICS feed includes recurring events (RRULE), the collector expands them into individual instances up to one year in the future:
if (event.rrule) {
  const rule = rrulestr(event.rrule.toString());
  const instances = rule.between(
    new Date(),
    addYears(new Date(), 1)
  );
  for (const instanceDate of instances) {
    yield createInstance(event, instanceDate);
  }
}
HTML Scrapers
Collector: src/services/ingestion/collectors/HtmlCollector.ts
Strategy:
- Fetch HTML page
- Parse with Mozilla Readability
- Extract structured data (JSON-LD, microdata)
- Fallback to heuristic extraction
Example:
import { Readability } from '@mozilla/readability';
import { JSDOM } from 'jsdom';

const response = await fetch(url);
const html = await response.text();
const dom = new JSDOM(html);

// Try JSON-LD first
const jsonLd = dom.window.document.querySelector('script[type="application/ld+json"]');
if (jsonLd) {
  const data = JSON.parse(jsonLd.textContent);
  if (data['@type'] === 'Event') {
    return normalizeJsonLd(data);
  }
}

// Fallback to Readability
const reader = new Readability(dom.window.document);
const article = reader.parse();
return extractEventFromArticle(article);
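The normalizeJsonLd helper called above is not shown in this doc. A minimal sketch mapping common schema.org/Event properties (the exact field coverage in the real collector may differ):

```typescript
// Hypothetical sketch of normalizeJsonLd, mapping common schema.org/Event
// properties onto the pipeline's normalized shape.
interface JsonLdEvent {
  '@type': string;
  name?: string;
  description?: string;
  startDate?: string;
  endDate?: string;
  url?: string;
  // schema.org allows location as a Place object or a plain string
  location?: { name?: string } | string;
  organizer?: { name?: string };
}

function normalizeJsonLd(data: JsonLdEvent) {
  const location =
    typeof data.location === 'string' ? data.location : data.location?.name;
  return {
    title: data.name ?? '',
    description: data.description ?? '',
    startTime: data.startDate ? new Date(data.startDate).toISOString() : null,
    endTime: data.endDate ? new Date(data.endDate).toISOString() : null,
    location: location ?? null,
    sourceUrl: data.url ?? '',
    organizer: data.organizer?.name ?? null,
  };
}
```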
Normalization
All sources are normalized to a common schema:
interface NormalizedEvent {
  // Required fields
  title: string;
  description: string;
  startTime: string; // ISO 8601
  sourceUrl: string;
  sourceId: string; // Unique ID from source

  // Optional fields
  endTime?: string;
  location?: string;
  timezone?: string;
  livestreamUrl?: string;
  registrationUrl?: string;
  eventType?: string;
  tags?: string[];
  organizer?: string;
  priceRange?: string;
  capacity?: number;
  difficulty?: 'beginner' | 'intermediate' | 'advanced';
  speakers?: Array<{
    name: string;
    title?: string;
    company?: string;
    bio?: string;
  }>;
}
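A collector's output can be checked against the required fields above before it reaches scoring. A minimal validation sketch (the helper name is illustrative, not part of the actual pipeline):

```typescript
// Minimal slice of the required NormalizedEvent fields, plus a hypothetical
// validator that lists which of them are missing or empty.
interface NormalizedEventRequired {
  title?: string;
  description?: string;
  startTime?: string;
  sourceUrl?: string;
  sourceId?: string;
}

function missingRequiredFields(event: NormalizedEventRequired): string[] {
  const required: Array<keyof NormalizedEventRequired> =
    ['title', 'description', 'startTime', 'sourceUrl', 'sourceId'];
  return required.filter(field => !event[field]);
}
```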
Normalization Steps:
- Clean HTML: Strip unwanted tags, sanitize content
- Parse dates: Convert to ISO 8601 UTC
- Extract location: Detect virtual vs. in-person
- Infer event type: Classify as conference, workshop, meetup, etc.
- Extract pricing: Parse free/paid indicators
function normalizeEvent(raw: RawEvent, source: EventSource): NormalizedEvent {
  return {
    title: sanitizeHtml(raw.title, { allowedTags: [] }),
    description: cleanDescription(raw.description),
    startTime: parseDate(raw.startDate, source.ingestion_config.timezone),
    endTime: raw.endDate ? parseDate(raw.endDate) : null,
    location: normalizeLocation(raw.location),
    sourceUrl: raw.url || source.url,
    sourceId: generateSourceId(raw, source),
    eventType: inferEventType(raw.title, raw.description),
    // ... more fields
  };
}
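The inferEventType call above classifies events as conference, workshop, meetup, etc. A keyword-based sketch of how such a classifier might work (the keyword lists are illustrative; the real implementation may use more signals):

```typescript
// Hypothetical keyword-based sketch of inferEventType; real classification
// could also weigh duration, capacity, and source type.
function inferEventType(title: string, description: string): string {
  const text = `${title} ${description}`.toLowerCase();
  if (/\b(conference|summit|expo)\b/.test(text)) return 'conference';
  if (/\b(workshop|bootcamp|training)\b/.test(text)) return 'workshop';
  if (/\b(meetup|mixer|networking)\b/.test(text)) return 'meetup';
  if (/\b(webinar|livestream)\b/.test(text)) return 'webinar';
  return 'other';
}
```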
Quality Scoring
Scorer: src/services/ingestion/QualityScorer.ts
Factors (0-100 scale):
1. Completeness (40 points)
How complete is the event data?
let score = 0;
if (event.title) score += 5;
if (event.description?.length > 100) score += 10;
if (event.startTime) score += 5;
if (event.endTime) score += 3;
if (event.location) score += 5;
if (event.registrationUrl) score += 5;
if (event.organizer) score += 3;
if (event.speakers?.length > 0) score += 4;
2. Content Quality (30 points)
Is the content well-written and informative?
// Description length
if (description.length > 500) score += 10;
else if (description.length > 200) score += 5;
// Readability
const sentences = description.split(/[.!?]/).length;
if (sentences >= 3) score += 5;
// Links and formatting
if (hasValidLinks(description)) score += 5;
if (hasStructuredContent(description)) score += 5;
// No spam indicators
if (!hasSpamKeywords(description)) score += 5;
3. Metadata Quality (20 points)
Is the event upcoming and timely?
const daysUntilEvent = differenceInDays(new Date(event.startTime), new Date());
if (daysUntilEvent >= 7 && daysUntilEvent <= 90) score += 10;
else if (daysUntilEvent > 0) score += 5;
else score = 0; // Past events get 0
Example Scores:
- 90-100: Complete, well-written, upcoming event with speakers and agenda
- 75-89: Good event with minor missing fields
- 50-74: Acceptable but needs review (moderation queue)
- < 50: Incomplete or low-quality (rejected)
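The hasSpamKeywords check used in the content-quality factor is not shown in this doc. A minimal sketch (the pattern list here is illustrative, not the production list):

```typescript
// Hypothetical sketch of hasSpamKeywords; the patterns below are
// illustrative examples, not the actual production list.
const SPAM_PATTERNS: RegExp[] = [
  /\bbuy now\b/i,
  /\blimited time offer\b/i,
  /\bclick here\b/i,
  /100% free/i,
];

function hasSpamKeywords(description: string): boolean {
  return SPAM_PATTERNS.some(pattern => pattern.test(description));
}
```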
Deduplication
Deduplicator: src/services/ingestion/DeduplicationService.ts
Fuzzy Matching
Strategy: Multi-stage comparison
- Exact URL match: same sourceUrl = duplicate
- Title similarity: Levenshtein distance < 10%
- Date + location match: Same day + same venue
- Content similarity: Cosine similarity of description embeddings
function findSimilarEvents(
  candidate: NormalizedEvent,
  existingEvents: Event[]
): Event[] {
  const matches: Array<{ event: Event; similarity: number }> = [];
  for (const existing of existingEvents) {
    let similarity = 0;

    // URL exact match (100%)
    if (candidate.sourceUrl === existing.sourceUrl) {
      return [existing];
    }

    // Title similarity (40% weight)
    const titleSim = levenshteinSimilarity(candidate.title, existing.title);
    similarity += titleSim * 0.4;

    // Date proximity (30% weight)
    const hoursDiff = Math.abs(
      differenceInHours(new Date(candidate.startTime), new Date(existing.startTime))
    );
    const dateSim = Math.max(0, 1 - hoursDiff / 48); // Within 48 hours
    similarity += dateSim * 0.3;

    // Location similarity (30% weight)
    const locationSim = locationMatch(candidate.location, existing.location);
    similarity += locationSim * 0.3;

    if (similarity >= 0.85) {
      matches.push({ event: existing, similarity });
    }
  }
  return matches.sort((a, b) => b.similarity - a.similarity).map(m => m.event);
}
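The levenshteinSimilarity function used above is referenced but not defined in this doc. A self-contained sketch, normalizing edit distance by the longer string's length to get a 0..1 score:

```typescript
// Hypothetical sketch of levenshteinSimilarity: classic dynamic-programming
// edit distance, normalized to a 0..1 similarity.
function levenshtein(a: string, b: string): number {
  // dp[i][j] = edit distance between a[..i] and b[..j]
  const dp: number[][] = Array.from(
    { length: a.length + 1 },
    (_, i) => [i, ...Array(b.length).fill(0)]
  );
  for (let j = 1; j <= b.length; j++) dp[0][j] = j;
  for (let i = 1; i <= a.length; i++) {
    for (let j = 1; j <= b.length; j++) {
      dp[i][j] = Math.min(
        dp[i - 1][j] + 1, // deletion
        dp[i][j - 1] + 1, // insertion
        dp[i - 1][j - 1] + (a[i - 1] === b[j - 1] ? 0 : 1) // substitution
      );
    }
  }
  return dp[a.length][b.length];
}

function levenshteinSimilarity(a: string, b: string): number {
  if (!a.length && !b.length) return 1;
  return 1 - levenshtein(a, b) / Math.max(a.length, b.length);
}
```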
Duplicate Resolution
Policy:
- If similarity >= 95%: Skip (exact duplicate)
- If similarity >= 85%: Merge metadata (keep richer version)
- If similarity < 85%: Treat as separate event
if (similarity >= 0.95) {
  logger.info('Skipping exact duplicate', { candidateId, existingId });
  return { action: 'skip', existingEvent: existing };
}
if (similarity >= 0.85) {
  logger.info('Merging duplicate with richer metadata', { candidateId, existingId });
  return {
    action: 'merge',
    mergedEvent: mergeEvents(candidate, existing)
  };
}
return { action: 'create' };
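The mergeEvents call above implements the "keep richer version" policy. A minimal sketch of what that could mean field by field (prefer the defined value, and the longer of two strings; the real merge logic may differ):

```typescript
// Hypothetical sketch of mergeEvents implementing "keep the richer version":
// for each field, prefer the defined value, and the longer of two strings.
type EventLike = Record<string, unknown>;

function richer(existingValue: unknown, candidateValue: unknown): unknown {
  if (existingValue == null) return candidateValue;
  if (candidateValue == null) return existingValue;
  if (typeof existingValue === 'string' && typeof candidateValue === 'string') {
    return candidateValue.length > existingValue.length ? candidateValue : existingValue;
  }
  return existingValue; // keep the stored value for non-string fields
}

function mergeEvents(candidate: EventLike, existing: EventLike): EventLike {
  const merged: EventLike = { ...existing };
  for (const key of Object.keys(candidate)) {
    merged[key] = richer(existing[key], candidate[key]);
  }
  return merged;
}
```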
Auto-Publish Thresholds
const QUALITY_THRESHOLDS = {
  AUTO_PUBLISH: 75, // >= 75: published automatically
  MANUAL_REVIEW: 50, // 50-74: goes to moderation queue
  REJECT: 50, // < 50: rejected
} as const;

function determineAction(qualityScore: number): 'publish' | 'moderate' | 'reject' {
  if (qualityScore >= QUALITY_THRESHOLDS.AUTO_PUBLISH) {
    return 'publish';
  }
  if (qualityScore >= QUALITY_THRESHOLDS.MANUAL_REVIEW) {
    return 'moderate';
  }
  return 'reject';
}
Thresholds can be customized per source in ingestion_config.quality_threshold.
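Resolving the effective threshold for a source could look like this (field names follow the example source config above; the helper name is illustrative):

```typescript
// Hypothetical helper resolving a per-source quality threshold, falling
// back to the global default when ingestion_config omits it.
const DEFAULT_AUTO_PUBLISH = 75;

interface SourceConfig {
  ingestion_config?: { quality_threshold?: number };
}

function resolveThreshold(source: SourceConfig): number {
  return source.ingestion_config?.quality_threshold ?? DEFAULT_AUTO_PUBLISH;
}
```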
Moderation Queue
Admin Dashboard: /admin/ingestion/moderation
Features:
- View pending events sorted by quality score
- See duplicate matches with similarity scores
- Bulk approve/reject actions
- Edit event details before publishing
- View ingestion history and error logs
Queue Schema:
interface ModerationQueueEntry {
  id: string;
  event_id: string; // Temporary event in staging
  source_id: string;
  quality_score: number;
  deduplication_matches: Array<{
    event_id: string;
    similarity: number;
  }>;
  flagged_reasons: string[];
  created_at: string;
  reviewed_at?: string;
  reviewer_id?: string;
  decision?: 'approved' | 'rejected' | 'merged';
}
Workflow:
- Moderator reviews event details
- Checks for duplicates
- Edits metadata if needed
- Approves → Moves to events table
- Rejects → Archives to rejected_events
- Merges → Combines with existing event
Race Condition Safety
Problem: Multiple cron instances running simultaneously
Solution: Distributed locking with Redis
import { Redis } from '@upstash/redis';
import { randomUUID } from 'crypto';

const redis = Redis.fromEnv();
const LOCK_KEY = 'ingestion:lock';
const LOCK_TTL = 300; // 5 minutes

async function runIngestion() {
  // Try to acquire lock with a unique token so we only release our own lock
  const token = randomUUID();
  const acquired = await redis.set(LOCK_KEY, token, {
    nx: true, // Only set if not exists
    ex: LOCK_TTL
  });
  if (!acquired) {
    logger.warn('Another ingestion is running, skipping');
    return { status: 'skipped', reason: 'lock_held' };
  }
  try {
    // Run ingestion
    await ingestFromAllSources();
  } finally {
    // Release the lock only if we still own it; an unconditional DEL could
    // delete a lock another instance acquired after our TTL expired
    await redis.eval(
      'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) end',
      [LOCK_KEY],
      [token]
    );
  }
}
Error Handling
Retry Strategy
Exponential backoff for transient failures:
import pLimit from 'p-limit';

const limit = pLimit(3); // Max 3 concurrent requests

async function fetchWithRetry(
  url: string,
  maxRetries = 3
): Promise<Response> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      // fetch has no `timeout` option; abort via AbortSignal instead
      return await fetch(url, { signal: AbortSignal.timeout(10_000) });
    } catch (error) {
      lastError = error;
      const delay = Math.min(1000 * 2 ** attempt, 10000);
      await sleep(delay);
    }
  }
  throw lastError;
}
Error Logging
Sentry Integration:
import * as Sentry from '@sentry/nextjs';

try {
  await ingestFromSource(source);
} catch (error) {
  Sentry.captureException(error, {
    tags: {
      source_id: source.id,
      source_type: source.source_type
    },
    extra: {
      source_url: source.url,
      last_success: source.last_ingested_at
    }
  });
  // Mark source as errored
  await markSourceErrored(
    source.id,
    error instanceof Error ? error.message : String(error)
  );
}
Environment Variables
# Required
CRON_SECRET=... # Vercel cron authentication
SUPABASE_SERVICE_ROLE_KEY=... # Database write access
# Optional
INGESTION_VERIFY_SPEAKERS=true # Verify speaker URLs exist (default: true)
INGESTION_BATCH_SIZE=100 # Events per batch (default: 100)
INGESTION_TIMEOUT=300000 # Max runtime in ms (default: 5 minutes)
Benchmarks:
- RSS feed (100 items): ~3 seconds
- API source (500 events): ~15 seconds (with rate limiting)
- ICS calendar (50 events): ~2 seconds
- Full ingestion (10 sources): ~60 seconds
Optimizations:
- Parallel source processing: Process multiple sources concurrently
- Batch inserts: Insert 100 events at a time
- Cached deduplication: Keep in-memory index of recent events
- Incremental ingestion: Track last_ingested_at per source
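The incremental strategy above can be sketched as filtering out items older than the source's last successful run (the helper name and item shape are illustrative):

```typescript
// Hypothetical sketch of incremental ingestion: keep only items published
// after the source's last_ingested_at watermark.
interface DatedItem {
  publishedAt: string; // ISO 8601
}

function newItemsSince<T extends DatedItem>(
  items: T[],
  lastIngestedAt: string | null
): T[] {
  if (!lastIngestedAt) return items; // first run: take everything
  const watermark = Date.parse(lastIngestedAt);
  return items.filter(item => Date.parse(item.publishedAt) > watermark);
}
```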
Monitoring
Metrics Tracked:
- Events ingested per source
- Quality score distribution
- Duplicate detection rate
- Moderation queue size
- Ingestion errors by source
- Processing time per source
Dashboard: PostHog/Sentry dashboards show:
- Ingestion success rate (target: > 95%)
- Average quality score (target: > 70)
- Moderation queue backlog (target: < 50 events)
Manual Testing
# Test ingestion locally
npm run ingest
# Verify ingestion setup
npm run verify-ingestion
# Analyze ingestion data
npm run analyze-ingestion
Always test ingestion against the staging environment first. Production ingestion can insert hundreds of events.
Next Steps
Data Model
Understand the database schema for events
Recommendation System
See how ingested events are scored and ranked