
Pipeline Architecture

The For You feed operates as a multi-stage pipeline that processes each user request through a series of coordinated steps. Understanding this flow is essential for working with the system.
Each request flows through the Home Mixer orchestration layer, which coordinates all pipeline stages from candidate retrieval to final selection.

Request Flow

When a user requests their For You feed, here's what happens:

1. Query Hydration: fetch user context including engagement history and metadata.

async fn hydrate_query(&self, query: Q) -> Q {
    let hydrators: Vec<_> = self
        .query_hydrators()
        .iter()
        .filter(|h| h.enable(&query))
        .collect();
    let hydrate_futures = hydrators.iter().map(|h| h.hydrate(&query));
    let results = join_all(hydrate_futures).await;
    // Merge results into query...
}

2. Candidate Sourcing: retrieve candidates from Thunder (in-network) and Phoenix (out-of-network) in parallel.
3. Candidate Hydration: enrich candidates with post metadata, author info, and media details.
4. Pre-Scoring Filters: remove ineligible candidates (duplicates, old posts, blocked authors, and so on).
5. Scoring: apply ML predictions and combine them into final relevance scores.
6. Selection: sort by score and select the top K candidates.
7. Post-Selection Processing: apply final visibility filtering and deduplication.

Stage 1: Query Hydration

Before retrieving any content, the system needs to understand who you are and what you’re interested in.

User Context Collection

Your recent engagement sequence is fetched:
  • Posts you liked
  • Replies you wrote
  • Reposts you made
  • Posts you clicked
  • Authors you followed
  • Content you marked as “not interested”
This history becomes the input to Phoenix’s transformer model.
Query hydrators run in parallel for performance. The framework uses join_all to await multiple futures concurrently.

Stage 2: Candidate Sourcing

Candidates are retrieved from two independent sources simultaneously:

Thunder: In-Network Source

// Thunder maintains in-memory stores per user
pub struct ThunderSource {
    // Recent posts from followed accounts
    in_network_store: InMemoryPostStore,
}

impl Source<Query, Candidate> for ThunderSource {
    async fn get_candidates(&self, query: &Query) -> Result<Vec<Candidate>> {
        // Sub-millisecond lookup
        self.in_network_store.get_posts_for_user(query.viewer_id)
    }
}
How Thunder Works:
  • Kafka Ingestion: consumes real-time post create/delete events from Kafka topics.
  • In-Memory Storage: maintains per-user stores for instant lookups (< 1 ms).
  • Auto-Trimming: removes posts older than the retention period to manage memory.
  • Post Types: keeps separate stores for original posts, replies, reposts, and videos.
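The ingest-and-trim loop can be sketched as a minimal per-user store. This is a simplified illustration; the struct, field, and method names are ours, not Thunder's actual API:

```rust
use std::collections::HashMap;

/// Hypothetical sketch of a per-user post store with age-based trimming.
pub struct InMemoryPostStore {
    /// user_id -> list of (post_id, created_at_secs)
    posts_by_user: HashMap<u64, Vec<(u64, u64)>>,
    retention_secs: u64,
}

impl InMemoryPostStore {
    pub fn new(retention_secs: u64) -> Self {
        Self { posts_by_user: HashMap::new(), retention_secs }
    }

    /// Ingest a post-create event (e.g. from a Kafka consumer).
    pub fn insert(&mut self, user_id: u64, post_id: u64, created_at_secs: u64) {
        self.posts_by_user.entry(user_id).or_default().push((post_id, created_at_secs));
    }

    /// Drop posts older than the retention window.
    pub fn trim(&mut self, now_secs: u64) {
        for posts in self.posts_by_user.values_mut() {
            posts.retain(|&(_, ts)| now_secs.saturating_sub(ts) <= self.retention_secs);
        }
    }

    /// Fast lookup of a user's recent post IDs.
    pub fn get_posts_for_user(&self, user_id: u64) -> Vec<u64> {
        self.posts_by_user
            .get(&user_id)
            .map(|posts| posts.iter().map(|&(id, _)| id).collect())
            .unwrap_or_default()
    }
}
```

Because the store is a plain in-memory map keyed by user, lookups avoid any network or disk round trip, which is what makes the sub-millisecond latency plausible.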

Phoenix Retrieval: Out-of-Network Source

Phoenix uses a two-tower architecture to find relevant content from millions of posts:
┌─────────────────────────────────────────────────────────────┐
│                    PHOENIX RETRIEVAL                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   USER TOWER                     CANDIDATE TOWER            │
│   ┌──────────────┐              ┌──────────────┐           │
│   │ User Context │              │  All Posts   │           │
│   │  + History   │              │  in Corpus   │           │
│   └──────┬───────┘              └──────┬───────┘           │
│          │                             │                   │
│          ▼                             ▼                   │
│   ┌──────────────┐              ┌──────────────┐           │
│   │ Transformer  │              │ Transformer  │           │
│   │  Encoder     │              │  Encoder     │           │
│   └──────┬───────┘              └──────┬───────┘           │
│          │                             │                   │
│          ▼                             ▼                   │
│   ┌──────────────┐              ┌──────────────┐           │
│   │ User Embed   │              │ Post Embeds  │           │
│   │   [1, D]     │              │   [N, D]     │           │
│   └──────┬───────┘              └──────┬───────┘           │
│          │                             │                   │
│          └──────────┬──────────────────┘                   │
│                     ▼                                       │
│          ┌─────────────────────┐                           │
│          │  Dot Product        │                           │
│          │  Similarity Search  │                           │
│          └─────────┬───────────┘                           │
│                    │                                       │
│                    ▼                                       │
│          ┌─────────────────────┐                           │
│          │   Top-K Candidates  │                           │
│          └─────────────────────┘                           │
│                                                             │
└─────────────────────────────────────────────────────────────┘
The two towers are trained jointly but can encode users and items independently, enabling efficient pre-computation of item embeddings.
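The final similarity step reduces to dot products between one user embedding and many post embeddings. Here is a brute-force sketch, assuming embeddings are already computed; at production scale this scan would be replaced by approximate nearest-neighbor search:

```rust
/// Dot product of two equal-length embedding vectors.
fn dot(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| x * y).sum()
}

/// Score every post embedding against the user embedding and
/// return the indices of the K most similar posts.
fn top_k_candidates(user_embed: &[f32], post_embeds: &[Vec<f32>], k: usize) -> Vec<usize> {
    let mut scored: Vec<(usize, f32)> = post_embeds
        .iter()
        .enumerate()
        .map(|(i, e)| (i, dot(user_embed, e)))
        .collect();
    // Sort by similarity, highest first.
    scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
    scored.into_iter().take(k).map(|(i, _)| i).collect()
}
```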

Parallel Execution

Both sources run simultaneously:
// From candidate_pipeline.rs:126
async fn fetch_candidates(&self, query: &Q) -> Vec<C> {
    let sources: Vec<_> = self.sources().iter().filter(|s| s.enable(query)).collect();
    let source_futures = sources.iter().map(|s| s.get_candidates(query));
    let results = join_all(source_futures).await; // Parallel execution
    
    let mut collected = Vec::new();
    for (source, result) in sources.iter().zip(results) {
        match result {
            Ok(mut candidates) => {
                collected.append(&mut candidates);
            }
            Err(err) => {
                error!("Source {} failed: {}", source.name(), err);
            }
        }
    }
    collected
}
The pipeline continues even if one source fails, ensuring resilience. Errors are logged but don’t block the entire request.

Stage 3: Candidate Hydration

Once we have candidate IDs, we need to enrich them with full metadata:
Post metadata:
  • Post text content
  • Media URLs (photos, videos)
  • Timestamps
  • Engagement counts (likes, reposts, replies)
  • Conversation thread structure
Author profile data:
  • Username and display name
  • Verification status
  • Profile picture
  • Bio and location
Video details (for video posts):
  • Video duration
  • Thumbnail URL
  • Encoding quality options
Subscription access eligibility:
  • Is this paywalled content?
  • Does the viewer have access?
  • Subscription tier requirements
Parallel Hydration:
// From candidate_pipeline.rs:160
async fn hydrate(&self, query: &Q, candidates: Vec<C>) -> Vec<C> {
    let hydrators: Vec<_> = self.hydrators().iter().filter(|h| h.enable(query)).collect();
    let hydrate_futures = hydrators.iter().map(|h| h.hydrate(query, &candidates));
    let results = join_all(hydrate_futures).await; // All hydrators run in parallel
    // Results are merged into candidates...
}
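The merge step elided above could work roughly like this: each hydrator returns its fetched fields keyed by candidate ID, and the results are folded back into the candidate structs. The types and names here are hypothetical, for illustration only:

```rust
use std::collections::HashMap;

/// Illustrative candidate with two hydratable fields.
#[derive(Default, Debug, PartialEq)]
struct Candidate {
    post_id: u64,
    text: Option<String>,
    author_name: Option<String>,
}

/// Fold each hydrator's keyed output back into the candidates.
/// Each hydrator only fills the fields it owns, so merges never conflict.
fn merge_hydration(
    candidates: &mut [Candidate],
    texts: &HashMap<u64, String>,
    authors: &HashMap<u64, String>,
) {
    for c in candidates.iter_mut() {
        c.text = texts.get(&c.post_id).cloned();
        c.author_name = authors.get(&c.post_id).cloned();
    }
}
```

Keyed outputs are what make the parallel join_all safe: hydrators never mutate shared candidates concurrently, they only produce independent maps that are merged afterwards.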

Stage 4: Pre-Scoring Filters

Before spending compute on ML scoring, we remove candidates that shouldn’t be shown:

  • DropDuplicatesFilter: removes duplicate post IDs from the candidate set.
  • AgeFilter: removes posts older than a configurable threshold (e.g., 7 days).
  • SelfpostFilter: removes the user's own posts from their feed.
  • AuthorSocialgraphFilter: removes posts from blocked or muted accounts.
  • MutedKeywordFilter: removes posts containing the user's muted keywords.
  • PreviouslySeenPostsFilter: removes posts the user has already viewed.
  • RepostDeduplicationFilter: deduplicates multiple reposts of the same content.
  • IneligibleSubscriptionFilter: removes paywalled content the user can't access.
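To make the filter contract concrete, here is a minimal sketch of the kept/removed split that something like AgeFilter performs. It is a simplified stand-in, not the framework's actual Filter trait:

```rust
/// Illustrative post with the one field an age filter cares about.
struct Post {
    id: u64,
    age_secs: u64,
}

/// Split candidates into (kept, removed) by post age.
/// Every filter in the chain produces this same shape of output.
fn age_filter(candidates: Vec<Post>, max_age_secs: u64) -> (Vec<Post>, Vec<Post>) {
    candidates.into_iter().partition(|p| p.age_secs <= max_age_secs)
}
```

Returning the removed set alongside the kept set is what lets the pipeline log or debug why each candidate was dropped.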
Sequential Execution: Unlike sources and hydrators, filters run sequentially because each filter’s output affects the next:
// From candidate_pipeline.rs:220
async fn filter(&self, query: &Q, candidates: Vec<C>) -> Result<(Vec<C>, Vec<C>)> {
    let mut candidates = candidates;
    let mut all_removed = Vec::new();

    for filter in self.filters().iter().filter(|f| f.enable(query)) {
        // `?` propagates filter errors, so the function returns a Result
        let result = filter.filter(query, candidates).await?;
        candidates = result.kept;
        all_removed.extend(result.removed);
    }

    Ok((candidates, all_removed))
}

Stage 5: Scoring

Now comes the ML magic. Scoring happens in multiple sequential steps:
1. Phoenix Scorer: calls the Phoenix ranking model to get engagement predictions.
# Phoenix predicts probabilities for each action
ranking_output = model.predict(
    user_context=engagement_history,
    candidates=candidate_posts
)
# Output shape: [batch_size, num_candidates, num_actions]
2. Weighted Scorer: combines the predictions into a single relevance score.
// Weighted combination of action probabilities
final_score = 
    0.5 * P(favorite) +
    1.0 * P(reply) +
    1.0 * P(repost) +
    0.3 * P(click) +
    2.0 * P(follow_author) -
    0.8 * P(not_interested) -
    5.0 * P(block_author) -
    3.0 * P(mute_author) -
    10.0 * P(report)
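The combination is a plain weighted sum of the predicted action probabilities. As a sketch, with the example weights above hard-coded (the production weights are configurable and may differ):

```rust
/// Predicted probabilities for each user action on a candidate post.
struct ActionProbs {
    favorite: f64,
    reply: f64,
    repost: f64,
    click: f64,
    follow_author: f64,
    not_interested: f64,
    block_author: f64,
    mute_author: f64,
    report: f64,
}

/// Weighted sum: positive engagement adds to the score,
/// negative signals subtract from it.
fn weighted_score(p: &ActionProbs) -> f64 {
    0.5 * p.favorite + 1.0 * p.reply + 1.0 * p.repost + 0.3 * p.click
        + 2.0 * p.follow_author
        - 0.8 * p.not_interested
        - 5.0 * p.block_author
        - 3.0 * p.mute_author
        - 10.0 * p.report
}
```

Note the asymmetry: a predicted report outweighs a predicted reply tenfold, so the scorer is far more averse to showing likely-offensive content than it is eager to chase engagement.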
3. Author Diversity Scorer: attenuates scores for repeated authors to ensure feed diversity.
// Each additional post from same author gets penalized
if author_seen_count > 0 {
    score *= diversity_discount_factor.powi(author_seen_count);
}
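Concretely, the attenuation multiplies the score by the discount factor raised to the number of posts already taken from that author. A small self-contained sketch:

```rust
/// Attenuate a candidate's score based on how many posts from the
/// same author have already been placed ahead of it.
fn apply_author_diversity(score: f64, author_seen_count: u32, discount: f64) -> f64 {
    if author_seen_count > 0 {
        // The n-th repeat is multiplied by discount^n, compounding the penalty.
        score * discount.powi(author_seen_count as i32)
    } else {
        score
    }
}
```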
4. Out-of-Network Scorer: applies adjustments specifically for discovered content.
Scorers run sequentially because later scorers may depend on scores computed by earlier ones.

Stage 6: Selection

The selector sorts candidates by final score and selects the top K:
// Simple score-based selection
fn select(&self, query: &Q, mut candidates: Vec<C>) -> Vec<C> {
    candidates.sort_by(|a, b| {
        b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal)
    });
    candidates.truncate(query.max_results);
    candidates
}

Stage 7: Post-Selection Processing

Final validation before serving to the user:
Visibility filtering removes posts that are:
  • Deleted or suspended
  • Marked as spam
  • Violent or gory
  • Age-restricted (for underage users)
  • Geoblocked in the user's region
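The checks above reduce to a single predicate. As an illustration (field names are hypothetical):

```rust
/// Illustrative visibility flags attached to a post.
struct Visibility {
    deleted: bool,
    spam: bool,
    violent: bool,
    age_restricted: bool,
    geoblocked: bool,
}

/// A post is served only if no removal condition applies;
/// age restriction is waived for adult viewers.
fn is_visible(v: &Visibility, viewer_is_adult: bool) -> bool {
    !v.deleted && !v.spam && !v.violent && !v.geoblocked
        && (viewer_is_adult || !v.age_restricted)
}
```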

Response Format

The Home Mixer server returns scored posts via gRPC:
// From server.rs:25
impl ScoredPostsService for HomeMixerServer {
    async fn get_scored_posts(
        &self,
        request: Request<ScoredPostsQuery>,
    ) -> Result<Response<ScoredPostsResponse>, Status> {
        // Unwrap the gRPC envelope into the request message
        let proto_query = request.into_inner();
        let query = ScoredPostsQuery::new(
            proto_query.viewer_id,
            proto_query.client_app_id,
            proto_query.country_code,
            // ...
        );
        
        let pipeline_result = self.phx_candidate_pipeline.execute(query).await;
        
        let scored_posts: Vec<ScoredPost> = pipeline_result
            .selected_candidates
            .into_iter()
            .map(|candidate| ScoredPost {
                tweet_id: candidate.tweet_id,
                author_id: candidate.author_id,
                score: candidate.score.unwrap_or(0.0),
                in_network: candidate.in_network.unwrap_or(false),
                // ...
            })
            .collect();
            
        Ok(Response::new(ScoredPostsResponse { scored_posts }))
    }
}

Performance Characteristics

Latency: ~50ms p50, ~200ms p99 (end-to-end request processing)

Throughput: 10,000+ requests per second per instance

Candidate Volume: millions → 1000s → 50 (retrieval → scoring → selection)

Error Handling

The pipeline is designed to be resilient:
Graceful Degradation: If a source, hydrator, or scorer fails, the pipeline continues with the remaining components. Only critical failures (like query hydration) will abort the request.
// Example error handling in source fetching
match result {
    Ok(mut candidates) => {
        collected.append(&mut candidates);
    }
    Err(err) => {
        error!("Source {} failed: {}", source.name(), err);
        // Pipeline continues with other sources
    }
}

Next Steps

  • Architecture Details: dive deep into the technical architecture and design patterns.
  • Phoenix ML Models: learn about the retrieval and ranking models.
  • Candidate Pipeline Framework: understand the composable pipeline framework.
  • Customizing Scoring: adjust weights and add custom scorers.
