Pipeline Architecture
The For You feed operates as a multi-stage pipeline that processes each user request through a series of coordinated steps. Understanding this flow is essential for working with the system.
Each request flows through the Home Mixer orchestration layer, which coordinates all pipeline stages from candidate retrieval to final selection.
Request Flow
When a user requests their For You feed, here’s what happens:
Query Hydration
Fetch user context including engagement history and metadata:

async fn hydrate_query(&self, query: Q) -> Q {
    let hydrators: Vec<_> = self
        .query_hydrators()
        .iter()
        .filter(|h| h.enable(&query))
        .collect();
    let hydrate_futures = hydrators.iter().map(|h| h.hydrate(&query));
    let results = join_all(hydrate_futures).await;
    // Merge results into query...
}
Candidate Sourcing
Retrieve candidates from Thunder (in-network) and Phoenix (out-of-network) in parallel
Candidate Hydration
Enrich candidates with post metadata, author info, and media details
Pre-Scoring Filters
Remove ineligible candidates (duplicates, old posts, blocked authors, etc.)
Scoring
Apply ML predictions and combine into final relevance scores
Selection
Sort by score and select top K candidates
Post-Selection Processing
Final visibility filtering and deduplication
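To make the ordering of these stages concrete, here is a toy, synchronous sketch of the funnel. All types, the 7-day threshold, and the stand-in scoring function are illustrative only, not the real Home Mixer API:

```rust
#[derive(Clone, Debug)]
pub struct Candidate {
    pub id: u64,
    pub age_days: u32,
    pub score: f64,
}

// Toy pipeline: filter -> score -> select, in the order described above.
pub fn run_pipeline(mut candidates: Vec<Candidate>, max_results: usize) -> Vec<Candidate> {
    // Pre-scoring filters: drop ineligible candidates (AgeFilter analogue)
    candidates.retain(|c| c.age_days <= 7);
    // Scoring: a stand-in for the ML predictions (fresher posts score higher)
    for c in &mut candidates {
        c.score = 1.0 / (1.0 + c.age_days as f64);
    }
    // Selection: sort by score descending and take the top K
    candidates.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
    candidates.truncate(max_results);
    candidates
}
```

The real pipeline runs sourcing and hydration concurrently and scores with ML models, but the shape of the data flow is the same.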
Stage 1: Query Hydration
Before retrieving any content, the system needs to understand who you are and what you’re interested in.
User Context Collection

Engagement History
Your recent engagement sequence is fetched:
Posts you liked
Replies you wrote
Reposts you made
Posts you clicked
Authors you followed
Content you marked as “not interested”
This history becomes the input to Phoenix’s transformer model.

User Metadata

Additional context is collected:
Following list (which accounts you follow)
Muted keywords and accounts
Blocked accounts
Language preferences
Country/region
Subscription status
Query hydrators run in parallel for performance. The framework uses join_all to await multiple futures concurrently.
Stage 2: Candidate Sourcing
Candidates are retrieved from two independent sources simultaneously:
Thunder: In-Network Source
// Thunder maintains in-memory stores per user
pub struct ThunderSource {
    // Recent posts from followed accounts
    in_network_store: InMemoryPostStore,
}

impl Source<Query, Candidate> for ThunderSource {
    async fn get_candidates(&self, query: &Query) -> Result<Vec<Candidate>> {
        // Sub-millisecond lookup
        self.in_network_store.get_posts_for_user(query.viewer_id)
    }
}
How Thunder Works:
Kafka Ingestion: Consumes real-time post create/delete events from Kafka topics
In-Memory Storage: Maintains per-user stores for instant lookups (< 1ms)
Auto-Trimming: Removes posts older than the retention period to manage memory
Post Types: Separate stores for original posts, replies, reposts, and videos
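The ingestion-plus-trimming pattern can be sketched with a per-user map of timestamped posts. The struct layout, field names, and method signatures below are assumptions for illustration, not the real Thunder internals:

```rust
use std::collections::{HashMap, VecDeque};

// Illustrative per-user in-memory post store with retention trimming.
struct InMemoryPostStore {
    // user_id -> (post_id, timestamp), oldest first
    posts_by_user: HashMap<u64, VecDeque<(u64, u64)>>,
    retention_secs: u64,
}

impl InMemoryPostStore {
    fn new(retention_secs: u64) -> Self {
        Self { posts_by_user: HashMap::new(), retention_secs }
    }

    // Called on a post-create event from the Kafka consumer
    fn insert(&mut self, user_id: u64, post_id: u64, ts: u64) {
        self.posts_by_user.entry(user_id).or_default().push_back((post_id, ts));
    }

    // Auto-trimming: drop posts older than the retention window
    fn trim(&mut self, now: u64) {
        for posts in self.posts_by_user.values_mut() {
            while let Some(&(_, ts)) = posts.front() {
                if now.saturating_sub(ts) > self.retention_secs {
                    posts.pop_front();
                } else {
                    break;
                }
            }
        }
    }

    // Lookup is a single HashMap read, hence the sub-millisecond latency
    fn get_posts_for_user(&self, user_id: u64) -> Vec<u64> {
        self.posts_by_user
            .get(&user_id)
            .map(|p| p.iter().map(|&(id, _)| id).collect())
            .unwrap_or_default()
    }
}
```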
Phoenix Retrieval: Out-of-Network Source
Phoenix uses a two-tower architecture to find relevant content from millions of posts:
┌─────────────────────────────────────────────────────────────┐
│ PHOENIX RETRIEVAL │
├─────────────────────────────────────────────────────────────┤
│ │
│ USER TOWER CANDIDATE TOWER │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ User Context │ │ All Posts │ │
│ │ + History │ │ in Corpus │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Transformer │ │ Transformer │ │
│ │ Encoder │ │ Encoder │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ User Embed │ │ Post Embeds │ │
│ │ [1, D] │ │ [N, D] │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ └──────────┬──────────────────┘ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Dot Product │ │
│ │ Similarity Search │ │
│ └─────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Top-K Candidates │ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
The two towers are trained jointly but can encode users and items independently, enabling efficient pre-computation of item embeddings.
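Once item embeddings are precomputed, retrieval reduces to a dot-product top-K search. Here is a brute-force sketch of that step (real systems use an approximate nearest-neighbor index over the corpus rather than a linear scan):

```rust
// Dot product between two embedding vectors of equal dimension
fn dot(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| x * y).sum()
}

// Score the user embedding against precomputed post embeddings and
// return the IDs of the top-k most similar posts.
fn top_k(user_embed: &[f32], post_embeds: &[(u64, Vec<f32>)], k: usize) -> Vec<u64> {
    let mut scored: Vec<(u64, f32)> = post_embeds
        .iter()
        .map(|(id, e)| (*id, dot(user_embed, e)))
        .collect();
    scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
    scored.into_iter().take(k).map(|(id, _)| id).collect()
}
```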
Parallel Execution
Both sources run simultaneously:
// From candidate_pipeline.rs:126
async fn fetch_candidates(&self, query: &Q) -> Vec<C> {
    let sources: Vec<_> = self.sources().iter().filter(|s| s.enable(query)).collect();
    let source_futures = sources.iter().map(|s| s.get_candidates(query));
    let results = join_all(source_futures).await; // Parallel execution
    let mut collected = Vec::new();
    for (source, result) in sources.iter().zip(results) {
        match result {
            Ok(mut candidates) => {
                collected.append(&mut candidates);
            }
            Err(err) => {
                error!("Source {} failed: {}", source.name(), err);
            }
        }
    }
    collected
}
The pipeline continues even if one source fails, ensuring resilience. Errors are logged but don’t block the entire request.
Stage 3: Candidate Hydration
Once we have candidate IDs, we need to enrich them with full metadata:
Fetches essential post information:
Post text content
Media URLs (photos, videos)
Timestamps
Engagement counts (likes, reposts, replies)
Conversation thread structure
Author Hydrator (Gizmoduck)
Loads author profile data:
Username and display name
Verification status
Profile picture
Bio and location
For video posts, the pipeline fetches:
Video duration
Thumbnail URL
Encoding quality options
Access eligibility is also determined:
Is this paywalled content?
Does the viewer have access?
Subscription tier requirements
Parallel Hydration:
// From candidate_pipeline.rs:160
async fn hydrate(&self, query: &Q, candidates: Vec<C>) -> Vec<C> {
    let hydrators: Vec<_> = self.hydrators().iter().filter(|h| h.enable(query)).collect();
    let hydrate_futures = hydrators.iter().map(|h| h.hydrate(query, &candidates));
    let results = join_all(hydrate_futures).await; // All hydrators run in parallel
    // Results are merged into candidates...
}
Stage 4: Pre-Scoring Filters
Before spending compute on ML scoring, we remove candidates that shouldn’t be shown:
DropDuplicatesFilter: Removes duplicate post IDs from the candidate set
AgeFilter: Removes posts older than a configurable threshold (e.g., 7 days)
SelfpostFilter: Removes the user’s own posts from their feed
AuthorSocialgraphFilter: Removes posts from blocked or muted accounts
MutedKeywordFilter: Removes posts containing the user’s muted keywords
PreviouslySeenPostsFilter: Removes posts the user has already viewed
RepostDeduplicationFilter: Deduplicates multiple reposts of the same content
IneligibleSubscriptionFilter: Removes paywalled content the user can’t access
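Each filter splits its input into kept and removed candidates. A minimal sketch of one such filter in that shape (the Post and FilterResult types here are illustrative, not the framework's real definitions):

```rust
struct Post {
    id: u64,
    age_days: u32,
}

// Filters return both halves so removals can be logged and inspected
struct FilterResult {
    kept: Vec<Post>,
    removed: Vec<Post>,
}

// AgeFilter analogue: drop posts older than max_age_days
fn age_filter(candidates: Vec<Post>, max_age_days: u32) -> FilterResult {
    let (kept, removed): (Vec<Post>, Vec<Post>) = candidates
        .into_iter()
        .partition(|p| p.age_days <= max_age_days);
    FilterResult { kept, removed }
}
```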
Sequential Execution:
Unlike sources and hydrators, filters run sequentially because each filter’s output affects the next:
// From candidate_pipeline.rs:220
async fn filter(&self, query: &Q, candidates: Vec<C>) -> Result<(Vec<C>, Vec<C>)> {
    let mut candidates = candidates;
    let mut all_removed = Vec::new();
    for filter in self.filters().iter().filter(|f| f.enable(query)) {
        let result = filter.filter(query, candidates).await?;
        candidates = result.kept;
        all_removed.extend(result.removed);
    }
    Ok((candidates, all_removed))
}
Stage 5: Scoring
Now comes the ML magic. Scoring happens in multiple sequential steps:
Phoenix Scorer
Calls the Phoenix ranking model to get engagement predictions:

# Phoenix predicts probabilities for each action
ranking_output = model.predict(
    user_context=engagement_history,
    candidates=candidate_posts,
)
# Output shape: [batch_size, num_candidates, num_actions]
Weighted Scorer
Combines predictions into a single relevance score:

// Weighted combination of action probabilities
final_score =
    0.5  * P(favorite) +
    1.0  * P(reply) +
    1.0  * P(repost) +
    0.3  * P(click) +
    2.0  * P(follow_author) -
    0.8  * P(not_interested) -
    5.0  * P(block_author) -
    3.0  * P(mute_author) -
    10.0 * P(report)
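Written out as a plain function, the combination is just a signed dot product of weights and predicted probabilities. The ActionProbs struct and its field names are illustrative; the weights mirror the example values above:

```rust
// Predicted probabilities for each user action, one candidate at a time
struct ActionProbs {
    favorite: f64,
    reply: f64,
    repost: f64,
    click: f64,
    follow_author: f64,
    not_interested: f64,
    block_author: f64,
    mute_author: f64,
    report: f64,
}

// Positive actions add to the score; negative signals subtract, with
// severe signals (block, report) weighted heavily.
fn weighted_score(p: &ActionProbs) -> f64 {
    0.5 * p.favorite + 1.0 * p.reply + 1.0 * p.repost + 0.3 * p.click
        + 2.0 * p.follow_author
        - 0.8 * p.not_interested
        - 5.0 * p.block_author
        - 3.0 * p.mute_author
        - 10.0 * p.report
}
```

Note how a small predicted report probability can cancel out a much larger predicted favorite probability, which is the intended asymmetry.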
Author Diversity Scorer
Attenuates scores for repeated authors to ensure feed diversity:

// Each additional post from the same author gets penalized
if author_seen_count > 0 {
    score *= diversity_discount_factor.powi(author_seen_count);
}
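Applied over a score-ordered candidate list, the discount compounds geometrically for each repeat appearance of an author. A small sketch (the tuple representation and the 0.5 discount factor in the test are illustrative):

```rust
use std::collections::HashMap;

// candidates: (author_id, score) pairs, already ordered by score.
// The first post from each author keeps its score; each subsequent
// post from the same author is multiplied by discount^seen_count.
fn apply_author_diversity(candidates: &mut [(u64, f64)], discount: f64) {
    let mut seen: HashMap<u64, i32> = HashMap::new();
    for (author_id, score) in candidates.iter_mut() {
        let count = seen.entry(*author_id).or_insert(0);
        if *count > 0 {
            *score *= discount.powi(*count);
        }
        *count += 1;
    }
}
```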
Out-of-Network Scorer
Applies adjustments specifically for discovered content
Scorers run sequentially because later scorers may depend on scores computed by earlier ones.
Stage 6: Selection
The selector sorts candidates by final score and selects the top K:
// Simple score-based selection
fn select(&self, query: &Q, mut candidates: Vec<C>) -> Vec<C> {
    candidates.sort_by(|a, b| {
        b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal)
    });
    candidates.truncate(query.max_results);
    candidates
}
Stage 7: Post-Selection Processing
Final validation before serving to the user:
Visibility Filtering
Removes posts that are:
Deleted or suspended
Marked as spam
Containing violence or gore
Age-restricted (for underage users)
Geoblocked in the user’s region
Thread Deduplication
If multiple posts in a conversation thread made the cut, only the most relevant branch is kept to avoid redundancy.
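One simple way to implement that deduplication is to keep only the highest-scoring candidate per conversation. The tuple layout here is illustrative, not the real candidate type:

```rust
use std::collections::HashMap;

// candidates: (post_id, conversation_id, score) triples.
// Keeps the best-scoring post per conversation, then re-sorts by score.
fn dedupe_threads(candidates: Vec<(u64, u64, f64)>) -> Vec<(u64, u64, f64)> {
    let mut best: HashMap<u64, (u64, u64, f64)> = HashMap::new();
    for c in candidates {
        let replace = match best.get(&c.1) {
            Some(prev) => c.2 > prev.2,
            None => true,
        };
        if replace {
            best.insert(c.1, c);
        }
    }
    let mut out: Vec<_> = best.into_values().collect();
    out.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap());
    out
}
```

A fuller version would keep the whole winning branch of the thread rather than a single post, but the per-conversation grouping is the core idea.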
The Home Mixer server returns scored posts via gRPC:
// From server.rs:25
impl ScoredPostsService for HomeMixerServer {
    async fn get_scored_posts(
        &self,
        request: Request<ScoredPostsQuery>,
    ) -> Result<Response<ScoredPostsResponse>, Status> {
        let proto_query = request.into_inner();
        let query = ScoredPostsQuery::new(
            proto_query.viewer_id,
            proto_query.client_app_id,
            proto_query.country_code,
            // ...
        );
        let pipeline_result = self.phx_candidate_pipeline.execute(query).await;
        let scored_posts: Vec<ScoredPost> = pipeline_result
            .selected_candidates
            .into_iter()
            .map(|candidate| ScoredPost {
                tweet_id: candidate.tweet_id,
                author_id: candidate.author_id,
                score: candidate.score.unwrap_or(0.0),
                in_network: candidate.in_network.unwrap_or(false),
                // ...
            })
            .collect();
        Ok(Response::new(ScoredPostsResponse { scored_posts }))
    }
}
Latency: p50 ~50ms, p99 ~200ms (end-to-end request processing)
Throughput: 10,000+ requests per second per instance
Candidate Volume: Millions → 1,000s → 50 (Retrieval → Scoring → Selection)
Error Handling
The pipeline is designed to be resilient:
Graceful Degradation: If a source, hydrator, or scorer fails, the pipeline continues with the remaining components. Only critical failures (like query hydration) will abort the request.
// Example error handling in source fetching
match result {
    Ok(mut candidates) => {
        collected.append(&mut candidates);
    }
    Err(err) => {
        error!("Source {} failed: {}", source.name(), err);
        // Pipeline continues with other sources
    }
}
Next Steps
Architecture Details: Dive deep into the technical architecture and design patterns
Phoenix ML Models: Learn about the retrieval and ranking models
Candidate Pipeline Framework: Understand the composable pipeline framework
Customizing Scoring: Adjust weights and add custom scorers