Overview
The Candidate Pipeline is a flexible, extensible framework for building multi-stage recommendation systems. It provides a clean abstraction for composing complex pipelines from reusable components.
The framework powers Home Mixer and can be used to build any candidate retrieval and ranking system.
Design Philosophy: Separate pipeline execution and monitoring from business logic, so engineers can focus on building individual components rather than orchestration.
Core Concepts
A CandidatePipeline orchestrates the flow of candidates through multiple stages:
┌────────────────────────────────────────────────────────────┐
│                     CANDIDATE PIPELINE                     │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  Query                                                     │
│    ↓                                                       │
│  QueryHydrators (parallel)                                 │
│    ↓                                                       │
│  Sources (parallel) → Candidates                           │
│    ↓                                                       │
│  Hydrators (parallel)                                      │
│    ↓                                                       │
│  Filters (sequential)                                      │
│    ↓                                                       │
│  Scorers (sequential)                                      │
│    ↓                                                       │
│  Selector                                                  │
│    ↓                                                       │
│  Post-Selection Hydrators (parallel)                       │
│    ↓                                                       │
│  Post-Selection Filters (sequential)                       │
│    ↓                                                       │
│  SideEffects (async)                                       │
│    ↓                                                       │
│  Results                                                   │
│                                                            │
└────────────────────────────────────────────────────────────┘
Pipeline Stages
The framework defines traits for each pipeline stage:
QueryHydrator
Enriches the query with additional context before retrieving candidates.
candidate-pipeline/query_hydrator.rs
#[async_trait]
pub trait QueryHydrator<Q>: Send + Sync {
    /// Name for logging/debugging
    fn name(&self) -> &str;
    /// Whether this hydrator should run for this query
    fn enable(&self, query: &Q) -> bool { true }
    /// Fetch additional data for the query
    async fn hydrate(&self, query: &Q) -> Result<Box<dyn Any + Send>, String>;
    /// Update query with hydrated data
    fn update(&self, query: &mut Q, hydrated: Box<dyn Any + Send>);
}
Example: Fetch the user’s engagement history before candidate retrieval
pub struct UserActionSeqQueryHydrator {
    uas_fetcher: Arc<UserActionSequenceFetcher>,
}
#[async_trait]
impl QueryHydrator<ScoredPostsQuery> for UserActionSeqQueryHydrator {
    fn name(&self) -> &str {
        "UserActionSeqQueryHydrator"
    }
    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<Box<dyn Any + Send>, String> {
        let sequence = self.uas_fetcher
            .fetch_user_action_sequence(query.user_id)
            .await
            // Assumes the fetcher's error type implements Display
            .map_err(|e| e.to_string())?;
        Ok(Box::new(sequence))
    }
    fn update(&self, query: &mut ScoredPostsQuery, hydrated: Box<dyn Any + Send>) {
        // Ignore the payload if it is not the expected type
        if let Ok(sequence) = hydrated.downcast::<UserActionSequence>() {
            query.user_action_sequence = Some(*sequence);
        }
    }
}
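The hydrate/update split means the framework has to call `enable`, then `hydrate`, then hand the type-erased payload back to `update`. The following is a minimal synchronous sketch of that sequence using only the standard library. `DemoQuery`, `SyncQueryHydrator`, `RecentActionsHydrator`, and `apply_query_hydrator` are hypothetical names invented for illustration, and the real trait is async, so treat this as an outline of the control flow rather than the framework's runner.

```rust
use std::any::Any;

/// Hypothetical query type used only for this sketch.
#[derive(Debug, Default)]
pub struct DemoQuery {
    pub user_id: u64,
    pub recent_actions: Option<Vec<String>>,
}

/// Synchronous stand-in for the async `QueryHydrator` trait.
pub trait SyncQueryHydrator<Q> {
    fn name(&self) -> &str;
    fn enable(&self, _query: &Q) -> bool {
        true
    }
    fn hydrate(&self, query: &Q) -> Result<Box<dyn Any + Send>, String>;
    fn update(&self, query: &mut Q, hydrated: Box<dyn Any + Send>);
}

pub struct RecentActionsHydrator;

impl SyncQueryHydrator<DemoQuery> for RecentActionsHydrator {
    fn name(&self) -> &str {
        "RecentActionsHydrator"
    }

    fn enable(&self, query: &DemoQuery) -> bool {
        // Assumption for this sketch: user_id 0 means a logged-out request.
        query.user_id != 0
    }

    fn hydrate(&self, query: &DemoQuery) -> Result<Box<dyn Any + Send>, String> {
        // A real hydrator would call a remote service here.
        let actions = vec![format!("user {} liked post 1", query.user_id)];
        Ok(Box::new(actions))
    }

    fn update(&self, query: &mut DemoQuery, hydrated: Box<dyn Any + Send>) {
        // The downcast fails (and is ignored) if the payload type is unexpected.
        if let Ok(actions) = hydrated.downcast::<Vec<String>>() {
            query.recent_actions = Some(*actions);
        }
    }
}

/// Runs one hydrator: check `enable`, then `hydrate`, then `update`.
/// Returns false when the hydrator was skipped or failed.
pub fn apply_query_hydrator<Q, H>(hydrator: &H, query: &mut Q) -> bool
where
    H: SyncQueryHydrator<Q> + ?Sized,
{
    if !hydrator.enable(query) {
        return false;
    }
    match hydrator.hydrate(query) {
        Ok(hydrated) => {
            hydrator.update(query, hydrated);
            true
        }
        Err(err) => {
            eprintln!("{} failed: {}", hydrator.name(), err);
            false
        }
    }
}
```

Because `update` silently ignores a failed downcast, a mismatch between the type boxed in `hydrate` and the type named in `update` shows up as a missing field, not as an error.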
Source
Retrieves candidate items from a data source.
candidate-pipeline/source.rs
#[async_trait]
pub trait Source<Q, C>: Send + Sync {
    fn name(&self) -> &str;
    fn enable(&self, query: &Q) -> bool { true }
    /// Fetch candidates from this source
    async fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>;
}
Example: Retrieve posts from Thunder
pub struct ThunderSource {
    pub thunder_client: Arc<ThunderClient>,
}
#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for ThunderSource {
    fn name(&self) -> &str {
        "ThunderSource"
    }
    async fn get_candidates(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        let response = self.thunder_client
            // Borrow following_ids (assumes the client accepts a reference):
            // the list cannot be moved out of the shared &query
            .get_in_network_posts(query.user_id, &query.following_ids)
            .await
            // Assumes the client's error type implements Display
            .map_err(|e| e.to_string())?;
        Ok(response.posts.into_iter()
            .map(PostCandidate::from)
            .collect())
    }
}
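Each source returns its own `Result`, and the pipeline diagram shows all sources feeding a single candidate list. A sketch of one way to merge those per-source results while keeping whatever succeeded is shown below. `merge_source_results` is a hypothetical helper, not part of the framework, and it takes already-awaited results so it can stay synchronous.

```rust
/// Merges per-source results into one candidate list.
/// Candidates from successful sources are kept in input order;
/// the names of failed sources are returned separately.
pub fn merge_source_results<C>(
    results: Vec<(&str, Result<Vec<C>, String>)>,
) -> (Vec<C>, Vec<String>) {
    let mut candidates = Vec::new();
    let mut failed = Vec::new();
    for (name, result) in results {
        match result {
            Ok(mut batch) => candidates.append(&mut batch),
            Err(err) => {
                // Log and continue: one failing source should not empty the feed.
                eprintln!("Source {} failed: {}", name, err);
                failed.push(name.to_string());
            }
        }
    }
    (candidates, failed)
}
```

Returning the failed names alongside the candidates lets the caller record per-source success rates without inspecting logs.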
Hydrator
Enriches candidates with additional features.
candidate-pipeline/hydrator.rs
#[async_trait]
pub trait Hydrator<Q, C>: Send + Sync {
    fn name(&self) -> &str;
    fn enable(&self, query: &Q) -> bool { true }
    /// Hydrate all candidates
    async fn hydrate(&self, query: &Q, candidates: &[C])
        -> Result<Vec<Box<dyn Any + Send>>, String>;
    /// Update a single candidate with hydrated data
    fn update(&self, candidate: &mut C, hydrated: Box<dyn Any + Send>);
    /// Update all candidates (default implementation calls update() for each)
    fn update_all(&self, candidates: &mut [C], hydrated: Vec<Box<dyn Any + Send>>) {
        for (candidate, h) in candidates.iter_mut().zip(hydrated) {
            self.update(candidate, h);
        }
    }
}
Example: Fetch author information
pub struct GizmoduckCandidateHydrator {
    gizmoduck_client: Arc<GizmoduckClient>,
}
#[async_trait]
impl Hydrator<ScoredPostsQuery, PostCandidate> for GizmoduckCandidateHydrator {
    fn name(&self) -> &str {
        "GizmoduckCandidateHydrator"
    }
    async fn hydrate(&self, _query: &ScoredPostsQuery, candidates: &[PostCandidate])
        -> Result<Vec<Box<dyn Any + Send>>, String> {
        let author_ids: Vec<u64> = candidates.iter()
            .map(|c| c.author_id)
            .collect();
        // update_all() zips results with candidates, so this assumes get_users
        // returns exactly one user per requested id, in request order
        let users = self.gizmoduck_client
            .get_users(author_ids)
            .await
            // Assumes the client's error type implements Display
            .map_err(|e| e.to_string())?;
        Ok(users.into_iter()
            .map(|u| Box::new(u) as Box<dyn Any + Send>)
            .collect())
    }
    fn update(&self, candidate: &mut PostCandidate, hydrated: Box<dyn Any + Send>) {
        if let Ok(user) = hydrated.downcast::<User>() {
            candidate.author_screen_name = Some(user.screen_name);
            candidate.is_verified = Some(user.is_verified);
        }
    }
}
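The default `update_all` pairs candidates with hydrated values by position using `zip`, which stops at the shorter of the two sequences. If a backing service returns users in a different order, or omits some, positional pairing would attach data to the wrong candidate. The sketch below shows one way to realign results by id before boxing them. `DemoUser` and `align_by_author` are hypothetical names, and whether the real client needs this step is not stated in the source.

```rust
use std::collections::HashMap;

/// Hypothetical user record for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct DemoUser {
    pub id: u64,
    pub screen_name: String,
}

/// Reorders fetched users to line up with `author_ids`, one slot per candidate.
/// Missing users become `None`, so positions still match when zipped.
pub fn align_by_author(author_ids: &[u64], users: Vec<DemoUser>) -> Vec<Option<DemoUser>> {
    let by_id: HashMap<u64, DemoUser> = users.into_iter().map(|u| (u.id, u)).collect();
    author_ids.iter().map(|id| by_id.get(id).cloned()).collect()
}
```

With this shape, `update` would downcast to `Option<DemoUser>` and leave the candidate untouched on `None`.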
Filter
Removes candidates that shouldn’t be shown to the user.
candidate-pipeline/filter.rs
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}
#[async_trait]
pub trait Filter<Q, C>: Send + Sync {
    fn name(&self) -> &str;
    fn enable(&self, query: &Q) -> bool { true }
    /// Filter candidates into kept and removed
    async fn filter(&self, query: &Q, candidates: Vec<C>)
        -> Result<FilterResult<C>, String>;
}
Example: Remove the user’s own posts
pub struct SelfTweetFilter;
#[async_trait]
impl Filter<ScoredPostsQuery, PostCandidate> for SelfTweetFilter {
    fn name(&self) -> &str {
        "SelfTweetFilter"
    }
    async fn filter(&self, query: &ScoredPostsQuery, candidates: Vec<PostCandidate>)
        -> Result<FilterResult<PostCandidate>, String> {
        // Keep posts written by anyone other than the requesting user
        let (kept, removed): (Vec<_>, Vec<_>) = candidates.into_iter()
            .partition(|c| c.author_id != query.user_id);
        Ok(FilterResult { kept, removed })
    }
}
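The same partition pattern covers most predicate-style filters. As a second illustration, here is a sketch of an age cutoff similar in spirit to the `AgeFilter` used later in the pipeline assembly; that filter's real implementation is not shown in this document, so `DemoPost`, its `age` field, and `filter_by_age` are assumptions. `FilterResult` mirrors the struct defined above.

```rust
use std::time::Duration;

/// Hypothetical candidate carrying a precomputed age.
#[derive(Debug, Clone, PartialEq)]
pub struct DemoPost {
    pub id: u64,
    pub age: Duration,
}

/// Mirrors the `FilterResult` struct from the Filter trait definition.
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

/// Keeps posts whose age is at most `max_age`; everything else is removed.
pub fn filter_by_age(candidates: Vec<DemoPost>, max_age: Duration) -> FilterResult<DemoPost> {
    let (kept, removed): (Vec<_>, Vec<_>) =
        candidates.into_iter().partition(|c| c.age <= max_age);
    FilterResult { kept, removed }
}
```

Returning `removed` as well as `kept` is what makes per-filter counts available for logging.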
Scorer
Computes scores for ranking candidates.
candidate-pipeline/scorer.rs
#[async_trait]
pub trait Scorer<Q, C>: Send + Sync {
    fn name(&self) -> &str;
    fn enable(&self, query: &Q) -> bool { true }
    /// Compute scores for all candidates
    async fn score(&self, query: &Q, candidates: &[C])
        -> Result<Vec<Box<dyn Any + Send>>, String>;
    /// Update a candidate with its score
    fn update(&self, candidate: &mut C, scored: Box<dyn Any + Send>);
    /// Update all candidates (default implementation calls update() for each)
    fn update_all(&self, candidates: &mut [C], scored: Vec<Box<dyn Any + Send>>) {
        for (candidate, s) in candidates.iter_mut().zip(scored) {
            self.update(candidate, s);
        }
    }
}
Example: Phoenix ML scorer
pub struct PhoenixScorer {
    phoenix_client: Arc<PhoenixPredictionClient>,
}
#[async_trait]
impl Scorer<ScoredPostsQuery, PostCandidate> for PhoenixScorer {
    fn name(&self) -> &str {
        "PhoenixScorer"
    }
    async fn score(&self, query: &ScoredPostsQuery, candidates: &[PostCandidate])
        -> Result<Vec<Box<dyn Any + Send>>, String> {
        let predictions = self.phoenix_client
            .predict(query.user_id, candidates)
            .await
            // Assumes the client's error type implements Display
            .map_err(|e| e.to_string())?;
        Ok(predictions.into_iter()
            .map(|p| Box::new(p) as Box<dyn Any + Send>)
            .collect())
    }
    fn update(&self, candidate: &mut PostCandidate, scored: Box<dyn Any + Send>) {
        if let Ok(predictions) = scored.downcast::<PhoenixScores>() {
            candidate.phoenix_scores = *predictions;
        }
    }
}
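Because scorers run one after another, a later scorer can read fields written by an earlier one. The pipeline assembly later in this document lists a `WeightedScorer` after `PhoenixScorer`, but its implementation is not shown. The sketch below assumes it is a simple linear combination of predicted engagement probabilities. `DemoScores`, `DemoWeights`, their fields, and `weighted_score` are hypothetical.

```rust
/// Hypothetical per-candidate predictions written by an earlier scorer.
#[derive(Debug, Clone, Copy, Default)]
pub struct DemoScores {
    pub like: f64,
    pub reply: f64,
    pub repost: f64,
}

/// Hypothetical weights; real values would come from configuration.
#[derive(Debug, Clone, Copy)]
pub struct DemoWeights {
    pub like: f64,
    pub reply: f64,
    pub repost: f64,
}

/// Collapses several predictions into one ranking score (assumed linear form).
pub fn weighted_score(scores: &DemoScores, weights: &DemoWeights) -> f64 {
    scores.like * weights.like + scores.reply * weights.reply + scores.repost * weights.repost
}
```

A scorer built this way needs no network call, so its `score` method can compute the value directly from the candidate and box it for `update`.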
Selector
Sorts and selects top candidates.
candidate-pipeline/selector.rs
pub trait Selector<Q, C>: Send + Sync {
    fn name(&self) -> &str;
    fn enable(&self, query: &Q) -> bool { true }
    /// Select and reorder candidates
    fn select(&self, query: &Q, candidates: Vec<C>) -> Vec<C>;
}
Example: Select top K by score
use std::cmp::Ordering;
pub struct TopKScoreSelector;
impl Selector<ScoredPostsQuery, PostCandidate> for TopKScoreSelector {
    fn name(&self) -> &str {
        "TopKScoreSelector"
    }
    fn select(&self, query: &ScoredPostsQuery, mut candidates: Vec<PostCandidate>)
        -> Vec<PostCandidate> {
        // Sort by score descending; incomparable scores (NaN) are treated as equal
        candidates.sort_by(|a, b| {
            b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal)
        });
        // Take top K
        candidates.truncate(query.max_results);
        candidates
    }
}
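Treating NaN as equal to every other score does not give a total order, and the standard library documents that `sort_by` may panic or leave elements in an unspecified order in that case. A sketch of a NaN-safe alternative follows, using `f64::total_cmp` and mapping NaN to negative infinity so that it ranks last. The `(id, score)` tuple and `top_k_by_score` are stand-ins for illustration, not the framework's types.

```rust
/// Returns the `k` highest-scoring `(id, score)` pairs, best first.
/// NaN scores are ranked below every real score.
pub fn top_k_by_score(mut scored: Vec<(u64, f64)>, k: usize) -> Vec<(u64, f64)> {
    // Map NaN to negative infinity so the comparison below is a total order
    // in which NaN can never outrank a real score.
    let key = |s: f64| if s.is_nan() { f64::NEG_INFINITY } else { s };
    // Descending order: compare b against a. The sort is stable, so ties
    // keep their original relative order.
    scored.sort_by(|a, b| key(b.1).total_cmp(&key(a.1)));
    // truncate is a no-op when k exceeds the current length.
    scored.truncate(k);
    scored
}
```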
SideEffect
Runs asynchronous actions that don’t affect the pipeline result.
candidate-pipeline/side_effect.rs
pub struct SideEffectInput<Q, C> {
    pub query: Arc<Q>,
    pub selected_candidates: Vec<C>,
}
#[async_trait]
pub trait SideEffect<Q, C>: Send + Sync {
    fn name(&self) -> &str;
    fn enable(&self, query: Arc<Q>) -> bool { true }
    /// Execute side effect asynchronously
    async fn run(&self, input: Arc<SideEffectInput<Q, C>>) -> Result<(), String>;
}
Example: Cache request info
pub struct CacheRequestInfoSideEffect {
    strato_client: Arc<StratoClient>,
}
#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for CacheRequestInfoSideEffect {
    fn name(&self) -> &str {
        "CacheRequestInfoSideEffect"
    }
    async fn run(&self, input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>)
        -> Result<(), String> {
        let request_info = RequestInfo {
            user_id: input.query.user_id,
            served_tweet_ids: input.selected_candidates.iter()
                .map(|c| c.tweet_id)
                .collect(),
            timestamp: Instant::now(),
        };
        self.strato_client
            .cache_request_info(request_info)
            .await
            // Assumes the client's error type implements Display
            .map_err(|e| e.to_string())?;
        Ok(())
    }
}
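Side effects receive their input behind an `Arc` so the work can outlive the request that produced it. The source does not show how the framework schedules them; in an async runtime this would typically be a spawned task. The sketch below uses a plain OS thread and a channel to show the fire-and-forget shape with only the standard library. `DemoSideEffectInput` and `run_side_effect_detached` are hypothetical.

```rust
use std::sync::{mpsc, Arc};
use std::thread;

/// Hypothetical, simplified stand-in for `SideEffectInput`.
pub struct DemoSideEffectInput {
    pub user_id: u64,
    pub served_ids: Vec<u64>,
}

/// Starts the side effect on a background thread and returns immediately.
/// The caller may drop the handle; the response never waits on it.
pub fn run_side_effect_detached(
    input: Arc<DemoSideEffectInput>,
    sink: mpsc::Sender<String>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let line = format!("user={} served={}", input.user_id, input.served_ids.len());
        // A send error only means nobody is listening; it is deliberately ignored
        // because side effects must not affect the pipeline result.
        let _ = sink.send(line);
    })
}
```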
Pipeline Execution
The CandidatePipeline trait orchestrates all stages:
candidate-pipeline/candidate_pipeline.rs
#[async_trait]
pub trait CandidatePipeline<Q, C>: Send + Sync {
    // Component accessors
    fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<Q>>];
    fn sources(&self) -> &[Box<dyn Source<Q, C>>];
    fn hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
    fn filters(&self) -> &[Box<dyn Filter<Q, C>>];
    fn scorers(&self) -> &[Box<dyn Scorer<Q, C>>];
    fn selector(&self) -> &dyn Selector<Q, C>;
    fn post_selection_hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
    fn post_selection_filters(&self) -> &[Box<dyn Filter<Q, C>>];
    fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect<Q, C>>>>;
    fn result_size(&self) -> usize;
    /// Execute the complete pipeline
    async fn execute(&self, query: Q) -> PipelineResult<Q, C> {
        // 1. Hydrate query
        let hydrated_query = self.hydrate_query(query).await;
        // 2. Fetch candidates from all sources (parallel)
        let candidates = self.fetch_candidates(&hydrated_query).await;
        // 3. Hydrate candidates (parallel)
        let hydrated_candidates = self.hydrate(&hydrated_query, candidates).await;
        // 4. Filter candidates (sequential)
        let (kept_candidates, filtered) = self
            .filter(&hydrated_query, hydrated_candidates)
            .await;
        // 5. Score candidates (sequential)
        let scored_candidates = self.score(&hydrated_query, kept_candidates).await;
        // 6. Select top candidates
        let selected_candidates = self.select(&hydrated_query, scored_candidates);
        // 7. Post-selection hydration and filtering
        let final_candidates = self
            .hydrate_post_selection(&hydrated_query, selected_candidates)
            .await;
        let (final_candidates, post_filtered) = self
            .filter_post_selection(&hydrated_query, final_candidates)
            .await;
        // 8. Run side effects asynchronously
        self.run_side_effects(Arc::new(SideEffectInput {
            query: Arc::new(hydrated_query),
            selected_candidates: final_candidates.clone(),
        }));
        PipelineResult {
            selected_candidates: final_candidates,
            // ...
        }
    }
}
Parallel vs Sequential Execution
Parallel Stages
Query Hydrators, Sources, and Hydrators run in parallel:
let hydrate_futures = hydrators.iter().map(|h| h.hydrate(query, candidates));
let results = join_all(hydrate_futures).await;
This maximizes throughput for independent operations.
Sequential Stages
Filters and Scorers run sequentially:
for filter in filters {
    let result = filter.filter(query, candidates).await?;
    candidates = result.kept;
}
This ensures each stage operates on the output of the previous stage.
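The two execution styles can be contrasted without an async runtime. The sketch below uses `std::thread::scope` as a stand-in for `join_all` and a plain loop for the sequential chain. Candidates are reduced to `u64` ids, and `run_parallel`, `run_sequential`, `is_even`, and `above_two` are hypothetical helpers written for this illustration.

```rust
use std::thread;

/// Runs independent tasks concurrently and returns their outputs in input order.
pub fn run_parallel<F>(tasks: Vec<F>) -> Vec<Vec<u64>>
where
    F: FnOnce() -> Vec<u64> + Send,
{
    thread::scope(|s| {
        // Spawn everything first so the tasks overlap, then join in order.
        let handles: Vec<_> = tasks.into_iter().map(|t| s.spawn(t)).collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("task panicked"))
            .collect()
    })
}

/// Applies filters one after another; each sees only what the previous one kept.
pub fn run_sequential(mut candidates: Vec<u64>, filters: &[fn(&u64) -> bool]) -> Vec<u64> {
    for keep in filters {
        candidates.retain(|c| keep(c));
    }
    candidates
}

/// Example predicates for the sequential chain.
pub fn is_even(c: &u64) -> bool {
    c % 2 == 0
}

pub fn above_two(c: &u64) -> bool {
    *c > 2
}
```

Parallel tasks only read shared input and produce separate outputs, which is why they can overlap. The sequential chain mutates one list in place, so reordering or overlapping its steps would change what later filters see.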
Error Handling
Component failures are logged but do not abort the pipeline. Each stage degrades as follows:
Sources: Partial results from the sources that succeeded are used
Hydrators: A failed hydration skips the update; candidates proceed unhydrated
Filters: A failed filter falls back to the backup (pre-filter) candidates
Scorers: A failed scorer skips the update; candidates keep their existing scores
match hydrator.hydrate(query, candidates).await {
    Ok(hydrated) => hydrator.update_all(&mut candidates, hydrated),
    Err(err) => {
        error!("Hydrator {} failed: {}", hydrator.name(), err);
        // Continue with non-hydrated candidates
    }
}
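Filters need slightly different handling from hydrators because `filter` takes ownership of the candidate list, so there is nothing left to continue with if it fails. A sketch of the backup approach follows, assuming the framework clones the list before calling the filter. `apply_filter_with_backup` is a hypothetical helper, the filter is modeled as a synchronous closure, and `FilterResult` mirrors the struct from the Filter section.

```rust
/// Mirrors the `FilterResult` struct from the Filter trait definition.
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

/// Applies one filter. On error, logs and returns the untouched pre-filter list.
pub fn apply_filter_with_backup<C: Clone>(
    name: &str,
    candidates: Vec<C>,
    filter: impl FnOnce(Vec<C>) -> Result<FilterResult<C>, String>,
) -> Vec<C> {
    // The filter consumes its input, so keep a copy to fall back on.
    let backup = candidates.clone();
    match filter(candidates) {
        Ok(result) => result.kept,
        Err(err) => {
            eprintln!("Filter {} failed: {}", name, err);
            backup
        }
    }
}
```

The clone costs memory on every filter call, which is the price of being able to fail open. A filter that must fail closed, such as a safety filter, would need a different policy than the one sketched here.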
Building a Pipeline
Example: Building the Phoenix Candidate Pipeline for Home Mixer
pub struct PhoenixCandidatePipeline {
    query_hydrators: Vec<Box<dyn QueryHydrator<ScoredPostsQuery>>>,
    sources: Vec<Box<dyn Source<ScoredPostsQuery, PostCandidate>>>,
    hydrators: Vec<Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>>,
    filters: Vec<Box<dyn Filter<ScoredPostsQuery, PostCandidate>>>,
    scorers: Vec<Box<dyn Scorer<ScoredPostsQuery, PostCandidate>>>,
    selector: TopKScoreSelector,
    // ...
}
impl PhoenixCandidatePipeline {
    pub fn new() -> Self {
        // Assemble components. The explicit Vec<Box<dyn ...>> annotations are
        // required so that boxes of different concrete types coerce to trait objects.
        let query_hydrators: Vec<Box<dyn QueryHydrator<ScoredPostsQuery>>> = vec![
            Box::new(UserActionSeqQueryHydrator::new()),
            Box::new(UserFeaturesQueryHydrator::new()),
        ];
        let sources: Vec<Box<dyn Source<ScoredPostsQuery, PostCandidate>>> = vec![
            Box::new(PhoenixSource::new()),
            Box::new(ThunderSource::new()),
        ];
        let hydrators: Vec<Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>> = vec![
            Box::new(CoreDataHydrator::new()),
            Box::new(GizmoduckCandidateHydrator::new()),
        ];
        // Filters run in this order; each sees the previous filter's output
        let filters: Vec<Box<dyn Filter<ScoredPostsQuery, PostCandidate>>> = vec![
            Box::new(DropDuplicatesFilter),
            Box::new(AgeFilter::new(Duration::from_secs(86400 * 2))), // 2 days
            Box::new(SelfTweetFilter),
        ];
        let scorers: Vec<Box<dyn Scorer<ScoredPostsQuery, PostCandidate>>> = vec![
            Box::new(PhoenixScorer::new()),
            Box::new(WeightedScorer),
        ];
        PhoenixCandidatePipeline {
            query_hydrators,
            sources,
            hydrators,
            filters,
            scorers,
            selector: TopKScoreSelector,
            // ...
        }
    }
}
// Implement the CandidatePipeline trait
impl CandidatePipeline<ScoredPostsQuery, PostCandidate> for PhoenixCandidatePipeline {
    fn sources(&self) -> &[Box<dyn Source<ScoredPostsQuery, PostCandidate>>] {
        &self.sources
    }
    fn filters(&self) -> &[Box<dyn Filter<ScoredPostsQuery, PostCandidate>>] {
        &self.filters
    }
    // ... implement other methods
}
Monitoring and Logging
The framework automatically logs each stage:
INFO request_id=abc123 stage=Source component=ThunderSource fetched 1000 candidates
INFO request_id=abc123 stage=Source component=PhoenixSource fetched 500 candidates
INFO request_id=abc123 stage=Filter kept 1200, removed 300
INFO request_id=abc123 stage=Scorer component=PhoenixScorer completed
Stage timings and component success rates are tracked via metrics.
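How the framework records timings is not shown here. As a rough illustration, a stage can be wrapped in a helper that measures wall-clock time with `std::time::Instant`, and a log line can be built in the same key=value style as the sample output. Both `timed` and `source_log_line` are hypothetical helpers, not framework APIs.

```rust
use std::time::{Duration, Instant};

/// Runs `stage` and returns its output together with the elapsed wall-clock time.
pub fn timed<T>(stage: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let out = stage();
    (out, start.elapsed())
}

/// Formats a Source-stage log line in the key=value style of the sample output.
pub fn source_log_line(request_id: &str, component: &str, fetched: usize) -> String {
    format!(
        "INFO request_id={} stage=Source component={} fetched {} candidates",
        request_id, component, fetched
    )
}
```

Keeping the measurement in a wrapper rather than inside each component is consistent with the design philosophy stated at the top: components hold business logic, and the framework owns monitoring.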
Benefits
Composability: Build complex pipelines from simple, reusable components
Testability: Each component can be tested independently
Observability: Built-in logging and metrics for every stage
Flexibility: Components can be enabled or disabled per request
See Also
Home Mixer: Production implementation using Candidate Pipeline
Phoenix: ML components integrated as Sources and Scorers