Overview
Sources are the entry point for candidates in the pipeline. They fetch raw candidates from various retrieval systems and services. All enabled sources run concurrently, so a request waits only as long as its slowest source.
Source Trait
The Source trait defines the interface for fetching candidates:
candidate-pipeline/source.rs
#[async_trait]
pub trait Source<Q, C>: Any + Send + Sync
where
    Q: Clone + Send + Sync + 'static,
    C: Clone + Send + Sync + 'static,
{
    /// Decide if this source should run for the given query
    fn enable(&self, _query: &Q) -> bool {
        true
    }
    /// Fetch candidates for the query, or return an error message
    async fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>;
    /// Name used in logs and metrics; defaults to the short type name
    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }
}
enable
fn(&self, &Q) -> bool
Optional gating logic that enables or disables the source based on query parameters. Returns true by default.
get_candidates
async fn(&self, &Q) -> Result<Vec<C>, String>
The main method that fetches candidates. Returns a vector of candidates or an error message.
name
fn(&self) -> &'static str
Returns the source name for logging and metrics. Auto-generated from the type name by default.
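As a rough illustration of the three methods above, here is a minimal sketch of a source that serves candidates from memory. It is synchronous and uses only the standard library so that it can run on its own, whereas the real trait is async via #[async_trait]. StaticSource, DemoQuery, and this short_type_name are hypothetical stand-ins; the real util::short_type_name may behave differently.

```rust
use std::any::type_name;

/// Hypothetical stand-in for `util::short_type_name`: keeps the text after the
/// last `::`. Generic types are not handled specially in this sketch.
fn short_type_name(full: &'static str) -> &'static str {
    full.rsplit("::").next().unwrap_or(full)
}

/// Simplified, synchronous variant of the trait (the real one is async).
trait Source<Q, C> {
    /// Gate: return false to skip this source for the query.
    fn enable(&self, _query: &Q) -> bool {
        true
    }

    /// Fetch candidates or report an error message.
    fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>;

    /// Defaults to the implementing type's short name.
    fn name(&self) -> &'static str {
        short_type_name(type_name::<Self>())
    }
}

/// Hypothetical query type.
struct DemoQuery {
    max_results: usize,
}

/// Hypothetical source that returns ids held in memory.
struct StaticSource {
    ids: Vec<u64>,
}

impl Source<DemoQuery, u64> for StaticSource {
    // `enable` and `name` are inherited from the trait defaults.
    fn get_candidates(&self, query: &DemoQuery) -> Result<Vec<u64>, String> {
        if self.ids.is_empty() {
            // Prefix the error with the source name, as the real sources do.
            return Err(format!("{}: no ids configured", self.name()));
        }
        Ok(self.ids.iter().copied().take(query.max_results).collect())
    }
}
```

Only get_candidates has to be written by hand; overriding enable or name is optional.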
Thunder Source
The Thunder source fetches in-network (following) posts from the Thunder service:
home-mixer/sources/thunder_source.rs
pub struct ThunderSource {
    pub thunder_client: Arc<ThunderClient>,
}
#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for ThunderSource {
    async fn get_candidates(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        let cluster = ThunderCluster::Amp;
        let channel = self
            .thunder_client
            .get_random_channel(cluster)
            .ok_or_else(|| "ThunderSource: no available channel".to_string())?;
        let mut client = InNetworkPostsServiceClient::new(channel.clone());
        let following_list = &query.user_features.followed_user_ids;
        let request = GetInNetworkPostsRequest {
            user_id: query.user_id as u64,
            following_user_ids: following_list.iter().map(|&id| id as u64).collect(),
            max_results: p::THUNDER_MAX_RESULTS,
            exclude_tweet_ids: vec![],
            algorithm: "default".to_string(),
            debug: false,
            is_video_request: false,
        };
        let response = client
            .get_in_network_posts(request)
            .await
            .map_err(|e| format!("ThunderSource: {}", e))?;
        let candidates: Vec<PostCandidate> = response
            .into_inner()
            .posts
            .into_iter()
            .map(|post| PostCandidate {
                tweet_id: post.post_id,
                author_id: post.author_id as u64,
                in_reply_to_tweet_id: post.in_reply_to_post_id.and_then(|id| u64::try_from(id).ok()),
                served_type: Some(pb::ServedType::ForYouInNetwork),
                ..Default::default()
            })
            .collect();
        Ok(candidates)
    }
}
Thunder provides in-network posts from accounts the user follows. This is the “Following” content in the For You feed.
Thunder Features
Fetches posts from accounts the user follows
Uses the following list from query.user_features.followed_user_ids
Returns up to THUNDER_MAX_RESULTS candidates
Marks candidates with ServedType::ForYouInNetwork
Includes reply chain information (ancestors)
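The Thunder excerpt converts ids in two ways: plain `as u64` casts for user and author ids, and `u64::try_from(..).ok()` for the reply id. The two differ for negative values, as this small sketch shows. It assumes the incoming fields are signed 64-bit integers, which the excerpt suggests but does not state; reply_id and cast_id are hypothetical helpers.

```rust
use std::convert::TryFrom;

/// Hypothetical helper mirroring the `and_then(|id| u64::try_from(id).ok())`
/// step. Assumes the incoming field is an `Option<i64>`.
fn reply_id(raw: Option<i64>) -> Option<u64> {
    // A negative id fails the conversion and becomes `None`.
    raw.and_then(|id| u64::try_from(id).ok())
}

/// An `as` cast never fails: a negative input wraps to a large unsigned value.
fn cast_id(raw: i64) -> u64 {
    raw as u64
}
```

With try_from, an invalid reply id is dropped rather than turned into a bogus id, while the `as` casts silently wrap.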
Phoenix Source
The Phoenix source fetches out-of-network (discovery) posts from the Phoenix retrieval system:
home-mixer/sources/phoenix_source.rs
pub struct PhoenixSource {
    pub phoenix_retrieval_client: Arc<dyn PhoenixRetrievalClient + Send + Sync>,
}
#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for PhoenixSource {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        !query.in_network_only
    }
    async fn get_candidates(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        let user_id = query.user_id as u64;
        let sequence = query
            .user_action_sequence
            .as_ref()
            .ok_or_else(|| "PhoenixSource: missing user_action_sequence".to_string())?;
        let response = self
            .phoenix_retrieval_client
            .retrieve(user_id, sequence.clone(), p::PHOENIX_MAX_RESULTS)
            .await
            .map_err(|e| format!("PhoenixSource: {}", e))?;
        let candidates: Vec<PostCandidate> = response
            .top_k_candidates
            .into_iter()
            .flat_map(|scored_candidates| scored_candidates.candidates)
            .filter_map(|scored_candidate| scored_candidate.candidate)
            .map(|tweet_info| PostCandidate {
                tweet_id: tweet_info.tweet_id as i64,
                author_id: tweet_info.author_id,
                in_reply_to_tweet_id: Some(tweet_info.in_reply_to_tweet_id),
                served_type: Some(pb::ServedType::ForYouPhoenixRetrieval),
                ..Default::default()
            })
            .collect();
        Ok(candidates)
    }
}
Phoenix provides out-of-network posts from accounts the user doesn’t follow. This is the “Recommended” content in the For You feed.
Phoenix Features
Fetches posts from accounts the user doesn’t follow
Uses the user action sequence for personalization
Conditionally disabled when query.in_network_only is true
Returns up to PHOENIX_MAX_RESULTS candidates
Marks candidates with ServedType::ForYouPhoenixRetrieval
Requires user_action_sequence to be present in the query
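The flattening step in get_candidates (flat_map over groups, then filter_map over optional payloads) can be tried in isolation. The sketch below uses hypothetical structs whose shapes are inferred from the field accesses in the excerpt; the real response types may differ.

```rust
/// Hypothetical payload type; only two fields are modeled here.
#[derive(Debug, Clone, PartialEq)]
struct TweetInfo {
    tweet_id: u64,
    author_id: u64,
}

/// Hypothetical wrapper whose payload may be absent.
struct ScoredCandidate {
    candidate: Option<TweetInfo>,
}

/// Hypothetical group of scored candidates.
struct ScoredCandidates {
    candidates: Vec<ScoredCandidate>,
}

/// Hypothetical response holding several groups.
struct RetrievalResponse {
    top_k_candidates: Vec<ScoredCandidates>,
}

/// Flattens all groups into one list, skipping entries without a payload.
fn flatten(response: RetrievalResponse) -> Vec<TweetInfo> {
    response
        .top_k_candidates
        .into_iter()
        .flat_map(|group| group.candidates)
        .filter_map(|scored| scored.candidate)
        .collect()
}
```

Group order and in-group order are preserved, and entries whose candidate is None are dropped without raising an error.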
Parallel Execution
Enabled sources are started together and awaited as a group with join_all, so their fetches overlap:
candidate-pipeline/candidate_pipeline.rs
async fn fetch_candidates(&self, query: &Q) -> Vec<C> {
    let request_id = query.request_id().to_string();
    let sources: Vec<_> = self.sources().iter().filter(|s| s.enable(query)).collect();
    let source_futures = sources.iter().map(|s| s.get_candidates(query));
    let results = join_all(source_futures).await;
    let mut collected = Vec::new();
    for (source, result) in sources.iter().zip(results) {
        match result {
            Ok(mut candidates) => {
                info!(
                    "request_id={} stage={:?} component={} fetched {} candidates",
                    request_id,
                    PipelineStage::Source,
                    source.name(),
                    candidates.len()
                );
                collected.append(&mut candidates);
            }
            Err(err) => {
                error!(
                    "request_id={} stage={:?} component={} failed: {}",
                    request_id,
                    PipelineStage::Source,
                    source.name(),
                    err
                );
            }
        }
    }
    collected
}
Error Handling
If a source fails, the error is logged but the pipeline continues with candidates from other sources. This ensures partial failures don’t break the entire feed.
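To see the log-and-continue behavior in runnable form, the following sketch fans out to several sources and merges whatever succeeds. It swaps the async join_all for std::thread::scope so that it needs only the standard library; DemoSource and its fields are hypothetical. Like join_all, it returns results in source order.

```rust
use std::thread;

/// Hypothetical source: a name, an enable flag, and a fetch function.
struct DemoSource {
    name: &'static str,
    enabled: bool,
    fetch: fn() -> Result<Vec<u64>, String>,
}

/// Runs every enabled source on its own thread and merges the successes.
/// Returns the merged candidates plus the error lines that were logged.
fn fetch_candidates(sources: &[DemoSource]) -> (Vec<u64>, Vec<String>) {
    let enabled: Vec<&DemoSource> = sources.iter().filter(|s| s.enabled).collect();

    // Results come back in the same order as `enabled`.
    let results: Vec<Result<Vec<u64>, String>> = thread::scope(|scope| {
        let handles: Vec<_> = enabled
            .iter()
            .map(|&s| scope.spawn(move || (s.fetch)()))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().unwrap_or_else(|_| Err("panicked".to_string())))
            .collect()
    });

    let mut collected = Vec::new();
    let mut errors = Vec::new();
    for (source, result) in enabled.iter().zip(results) {
        match result {
            Ok(mut candidates) => collected.append(&mut candidates),
            Err(err) => {
                // A failing source is recorded and skipped; the rest still count.
                let line = format!("component={} failed: {}", source.name, err);
                eprintln!("{}", line);
                errors.push(line);
            }
        }
    }
    (collected, errors)
}
```

A disabled source is never started, and a failing one contributes nothing to the merged list but does not discard what the other sources returned.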
Conditional Enabling
Sources can be conditionally enabled based on query parameters:
fn enable(&self, query: &ScoredPostsQuery) -> bool {
    !query.in_network_only
}
This allows dynamic source selection based on:
User experiments/flags
Query parameters
A/B test assignments
Feature flags
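A gate can combine several of these signals. The sketch below is one possible shape, assuming a hypothetical GateQuery that carries an in_network_only field (as ScoredPostsQuery does) plus a hypothetical set of feature flag names. Neither the flag set nor the flag name comes from the original code.

```rust
use std::collections::HashSet;

/// Hypothetical query: `in_network_only` mirrors the real field, while
/// `feature_flags` is an assumption made for this example.
struct GateQuery {
    in_network_only: bool,
    feature_flags: HashSet<String>,
}

/// Hypothetical `enable`-style gate for an out-of-network source.
/// The source runs unless the request is in-network only or a kill-switch
/// flag (an invented name) is set.
fn out_of_network_enabled(query: &GateQuery) -> bool {
    !query.in_network_only && !query.feature_flags.contains("disable_out_of_network")
}
```

Keeping the gate a pure function of the query makes it cheap to evaluate before any network call is made.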
Source Configuration
Typical source configuration:
Thunder Configuration
const THUNDER_MAX_RESULTS: i32 = 800;
Phoenix Configuration
The Phoenix limit is the p::PHOENIX_MAX_RESULTS constant passed to retrieve.
Best Practices
Always prefix error messages with the source name for easier debugging:
.map_err(|e| format!("ThunderSource: {}", e))
Validate required query fields before making external calls:
let sequence = query
    .user_action_sequence
    .as_ref()
    .ok_or_else(|| "PhoenixSource: missing user_action_sequence".to_string())?;
Use the stats macro for automatic metrics collection:
#[xai_stats_macro::receive_stats]
async fn get_candidates(...) -> Result<Vec<PostCandidate>, String> {
    // ...
}
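The first two practices combine naturally: validate inputs with ok_or_else, then prefix downstream errors with map_err. A minimal synchronous sketch, assuming a hypothetical ActionQuery, a hypothetical source name DemoSource, and a stub retrieve function standing in for an external call:

```rust
/// Hypothetical query with one optional, required-by-this-source field.
struct ActionQuery {
    user_action_sequence: Option<Vec<u64>>,
}

/// Stub for an external call; it fails on an empty sequence.
fn retrieve(sequence: &[u64]) -> Result<Vec<u64>, String> {
    if sequence.is_empty() {
        Err("empty sequence".to_string())
    } else {
        Ok(sequence.iter().map(|id| id + 1000).collect())
    }
}

fn get_candidates(query: &ActionQuery) -> Result<Vec<u64>, String> {
    // Validate the required field before calling out.
    let sequence = query
        .user_action_sequence
        .as_ref()
        .ok_or_else(|| "DemoSource: missing user_action_sequence".to_string())?;

    // Prefix any downstream error with the source name.
    retrieve(sequence).map_err(|e| format!("DemoSource: {}", e))
}
```

Both failure paths produce a message that starts with the source name, so a log line can be traced to its source without extra context.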