Overview

The Candidate Pipeline is a flexible, extensible framework for building multi-stage recommendation systems. It provides a clean abstraction for composing complex pipelines from reusable components. The framework powers Home Mixer and is general enough to be reused for other candidate retrieval and ranking systems.
Design Philosophy: Separate pipeline execution and monitoring from business logic, enabling engineers to focus on building individual components rather than orchestration.

Core Concepts

A CandidatePipeline orchestrates the flow of candidates through multiple stages:
┌────────────────────────────────────────────────────────────┐
│                     CANDIDATE PIPELINE                     │
├────────────────────────────────────────────────────────────┤
│                                                            │
│    Query                                                   │
│    ↓                                                       │
│    QueryHydrators (parallel)                               │
│    ↓                                                       │
│    Sources (parallel) → Candidates                         │
│    ↓                                                       │
│    Hydrators (parallel)                                    │
│    ↓                                                       │
│    Filters (sequential)                                    │
│    ↓                                                       │
│    Scorers (sequential)                                    │
│    ↓                                                       │
│    Selector                                                │
│    ↓                                                       │
│    Post-Selection Hydrators (parallel)                     │
│    ↓                                                       │
│    Post-Selection Filters (sequential)                     │
│    ↓                                                       │
│    SideEffects (async)                                     │
│    ↓                                                       │
│    Results                                                 │
│                                                            │
└────────────────────────────────────────────────────────────┘

Pipeline Stages

The framework defines traits for each pipeline stage:

QueryHydrator

Enriches the query with additional context before retrieving candidates.
candidate-pipeline/query_hydrator.rs
use std::any::Any;
use async_trait::async_trait;

#[async_trait]
pub trait QueryHydrator<Q>: Send + Sync {
    /// Name for logging/debugging
    fn name(&self) -> &str;
    
    /// Whether this hydrator should run for this query (enabled by default)
    fn enable(&self, _query: &Q) -> bool { true }
    
    /// Fetch additional data for the query
    async fn hydrate(&self, query: &Q) -> Result<Box<dyn Any + Send>, String>;
    
    /// Update query with hydrated data
    fn update(&self, query: &mut Q, hydrated: Box<dyn Any + Send>);
}
Example: Fetch user’s engagement history before candidate retrieval
pub struct UserActionSeqQueryHydrator {
    uas_fetcher: Arc<UserActionSequenceFetcher>,
}

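#[async_trait]
impl QueryHydrator<ScoredPostsQuery> for UserActionSeqQueryHydrator {
    fn name(&self) -> &str {
        "UserActionSeqQueryHydrator"
    }

    async fn hydrate(&self, query: &ScoredPostsQuery) -> Result<Box<dyn Any + Send>, String> {
        let sequence = self.uas_fetcher
            .fetch_user_action_sequence(query.user_id)
            .await
            // The trait's error type is String; assumes the fetcher's error implements Display.
            .map_err(|e| e.to_string())?;
        Ok(Box::new(sequence))
    }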
    
    fn update(&self, query: &mut ScoredPostsQuery, hydrated: Box<dyn Any + Send>) {
        if let Ok(sequence) = hydrated.downcast::<UserActionSequence>() {
            query.user_action_sequence = Some(*sequence);
        }
    }
}
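The `hydrate`/`update` split relies on type erasure: `hydrate` returns its data as `Box<dyn Any + Send>`, and `update` has to downcast it back to the concrete type. The following is a minimal synchronous sketch of that round trip, assuming a hypothetical simplified `Query` type and a fabricated history instead of the real fetcher (no `async_trait`). It shows that a payload of the wrong type is ignored rather than causing a panic.

```rust
use std::any::Any;

/// Hypothetical, simplified query type (not the real `ScoredPostsQuery`).
#[derive(Debug, Default)]
pub struct Query {
    pub user_id: u64,
    pub user_action_sequence: Option<Vec<u64>>,
}

/// Hypothetical stand-in for the fetched engagement history.
pub struct UserActionSequence(pub Vec<u64>);

/// Stand-in for `hydrate`: fabricates a three-item history derived from the
/// user id instead of calling a real fetcher.
pub fn hydrate(query: &Query) -> Result<Box<dyn Any + Send>, String> {
    let base = query.user_id * 10;
    Ok(Box::new(UserActionSequence((base..base + 3).collect())))
}

/// Stand-in for `update`: downcast back to the concrete type.
/// A payload of any other type is ignored and the query is left untouched.
pub fn update(query: &mut Query, hydrated: Box<dyn Any + Send>) {
    if let Ok(sequence) = hydrated.downcast::<UserActionSequence>() {
        let UserActionSequence(ids) = *sequence;
        query.user_action_sequence = Some(ids);
    }
}
```

Because a failed downcast is silent, a hydrator whose `hydrate` and `update` disagree on the payload type will never update the query; a unit test of the round trip catches that early.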

Source

Retrieves candidate items from a data source.
candidate-pipeline/source.rs
use async_trait::async_trait;

#[async_trait]
pub trait Source<Q, C>: Send + Sync {
    fn name(&self) -> &str;
    fn enable(&self, _query: &Q) -> bool { true }
    
    /// Fetch candidates from this source
    async fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>;
}
Example: Retrieve posts from Thunder
pub struct ThunderSource {
    pub thunder_client: Arc<ThunderClient>,
}

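#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for ThunderSource {
    fn name(&self) -> &str {
        "ThunderSource"
    }

    async fn get_candidates(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        // `query` is borrowed, so pass a copy of the follow list instead of moving it out.
        let response = self.thunder_client
            .get_in_network_posts(query.user_id, query.following_ids.clone())
            .await
            .map_err(|e| e.to_string())?;
        
        Ok(response.posts.into_iter()
            .map(PostCandidate::from)
            .collect())
    }
}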
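When several sources are configured, their outputs are combined into one candidate list, and a source that fails contributes nothing instead of failing the request. A minimal synchronous sketch of that fan-in, assuming a hypothetical `SimpleSource` trait with `u64` candidate ids and `String` errors; the real framework runs sources concurrently and reports errors through its own logging.

```rust
/// Hypothetical, synchronous simplification of the `Source` trait.
pub trait SimpleSource {
    fn name(&self) -> &str;
    fn get_candidates(&self, user_id: u64) -> Result<Vec<u64>, String>;
}

/// Hypothetical source that always returns a fixed list.
pub struct FixedSource {
    pub name: String,
    pub ids: Vec<u64>,
}

impl SimpleSource for FixedSource {
    fn name(&self) -> &str {
        &self.name
    }
    fn get_candidates(&self, _user_id: u64) -> Result<Vec<u64>, String> {
        Ok(self.ids.clone())
    }
}

/// Hypothetical source that always fails.
pub struct FailingSource;

impl SimpleSource for FailingSource {
    fn name(&self) -> &str {
        "FailingSource"
    }
    fn get_candidates(&self, _user_id: u64) -> Result<Vec<u64>, String> {
        Err("backend timeout".to_string())
    }
}

/// Concatenate candidates from every source in order; failed sources are
/// reported on stderr and skipped, so partial results are still returned.
pub fn fetch_candidates(sources: &[Box<dyn SimpleSource>], user_id: u64) -> Vec<u64> {
    let mut all = Vec::new();
    for source in sources {
        match source.get_candidates(user_id) {
            Ok(mut candidates) => all.append(&mut candidates),
            Err(err) => eprintln!("Source {} failed: {}", source.name(), err),
        }
    }
    all
}
```

Concatenation can produce the same post twice when sources overlap, which is one reason a deduplicating filter typically runs early in the filter chain.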

Hydrator

Enriches candidates with additional features.
candidate-pipeline/hydrator.rs
use std::any::Any;
use async_trait::async_trait;

#[async_trait]
pub trait Hydrator<Q, C>: Send + Sync {
    fn name(&self) -> &str;
    fn enable(&self, _query: &Q) -> bool { true }
    
    /// Hydrate all candidates
    async fn hydrate(&self, query: &Q, candidates: &[C]) 
        -> Result<Vec<Box<dyn Any + Send>>, String>;
    
    /// Update a single candidate with hydrated data
    fn update(&self, candidate: &mut C, hydrated: Box<dyn Any + Send>);
    
    /// Update all candidates (default implementation calls update() for each)
    fn update_all(&self, candidates: &mut [C], hydrated: Vec<Box<dyn Any + Send>>) {
        for (candidate, h) in candidates.iter_mut().zip(hydrated) {
            self.update(candidate, h);
        }
    }
}
Example: Fetch author information
pub struct GizmoduckCandidateHydrator {
    gizmoduck_client: Arc<GizmoduckClient>,
}

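#[async_trait]
impl Hydrator<ScoredPostsQuery, PostCandidate> for GizmoduckCandidateHydrator {
    fn name(&self) -> &str {
        "GizmoduckCandidateHydrator"
    }

    async fn hydrate(&self, _query: &ScoredPostsQuery, candidates: &[PostCandidate]) 
        -> Result<Vec<Box<dyn Any + Send>>, String> {
        
        let author_ids: Vec<u64> = candidates.iter()
            .map(|c| c.author_id)
            .collect();
        
        let users = self.gizmoduck_client
            .get_users(author_ids)
            .await
            .map_err(|e| e.to_string())?;
        
        // update_all() pairs results with candidates by position, so this assumes
        // get_users returns exactly one User per requested id, in request order.
        Ok(users.into_iter()
            .map(|u| Box::new(u) as Box<dyn Any + Send>)
            .collect())
    }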
    
    fn update(&self, candidate: &mut PostCandidate, hydrated: Box<dyn Any + Send>) {
        if let Ok(user) = hydrated.downcast::<User>() {
            candidate.author_screen_name = Some(user.screen_name);
            candidate.is_verified = Some(user.is_verified);
        }
    }
}
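The default `update_all` pairs candidates and hydrated values by position with `zip`, so `hydrate` must return exactly one value per candidate, in candidate order. Whether a given backend preserves order is not stated in this section; if a batch response can be unordered or incomplete, re-aligning it by id keeps every slot matched to the right candidate. A minimal sketch, assuming hypothetical simplified `User` and `Candidate` types:

```rust
use std::collections::HashMap;

/// Hypothetical, simplified user record returned by a batch lookup.
#[derive(Clone, Debug, PartialEq)]
pub struct User {
    pub id: u64,
    pub screen_name: String,
}

/// Hypothetical, simplified candidate.
#[derive(Debug, Default)]
pub struct Candidate {
    pub author_id: u64,
    pub author_screen_name: Option<String>,
}

/// Re-order an unordered batch response so that slot i belongs to candidates[i].
/// Authors missing from the response map to None instead of shifting later slots.
pub fn align_to_candidates(candidates: &[Candidate], users: Vec<User>) -> Vec<Option<User>> {
    let by_id: HashMap<u64, User> = users.into_iter().map(|u| (u.id, u)).collect();
    candidates
        .iter()
        .map(|c| by_id.get(&c.author_id).cloned())
        .collect()
}

/// Positional update, mirroring the default `update_all`.
pub fn update_all(candidates: &mut [Candidate], aligned: Vec<Option<User>>) {
    for (candidate, user) in candidates.iter_mut().zip(aligned) {
        if let Some(user) = user {
            candidate.author_screen_name = Some(user.screen_name);
        }
    }
}
```

Without the re-alignment step, a single missing user would shift every later value onto the wrong candidate.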

Filter

Removes candidates that shouldn’t be shown to the user.
candidate-pipeline/filter.rs
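use async_trait::async_trait;

/// Output of a filter: candidates that pass and candidates that were dropped
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

#[async_trait]
pub trait Filter<Q, C>: Send + Sync {
    fn name(&self) -> &str;
    fn enable(&self, _query: &Q) -> bool { true }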
    
    /// Filter candidates into kept and removed
    async fn filter(&self, query: &Q, candidates: Vec<C>) 
        -> Result<FilterResult<C>, String>;
}
Example: Remove user’s own posts
pub struct SelfTweetFilter;

#[async_trait]
impl Filter<ScoredPostsQuery, PostCandidate> for SelfTweetFilter {
    fn name(&self) -> &str {
        "SelfTweetFilter"
    }

    async fn filter(&self, query: &ScoredPostsQuery, candidates: Vec<PostCandidate>) 
        -> Result<FilterResult<PostCandidate>, String> {
        
        let (kept, removed): (Vec<_>, Vec<_>) = candidates.into_iter()
            .partition(|c| c.author_id != query.user_id);
        
        Ok(FilterResult { kept, removed })
    }
}
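The pipeline assembled later on this page also lists a `DropDuplicatesFilter`, whose code is not shown. Because several sources can return the same post, one plausible approach keeps the first occurrence of each id and moves later copies to `removed`. The following is a hypothetical synchronous sketch of that idea with a simplified candidate type; it is not the Home Mixer implementation.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified candidate.
#[derive(Debug, Clone, PartialEq)]
pub struct Candidate {
    pub tweet_id: u64,
    pub author_id: u64,
}

#[derive(Debug)]
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

/// Hypothetical deduplication filter: keep the first occurrence of each
/// tweet_id, send later duplicates to `removed`, and preserve input order.
pub fn drop_duplicates(candidates: Vec<Candidate>) -> FilterResult<Candidate> {
    let mut seen: HashSet<u64> = HashSet::new();
    // `insert` returns true only the first time an id is seen.
    let (kept, removed): (Vec<Candidate>, Vec<Candidate>) = candidates
        .into_iter()
        .partition(|c| seen.insert(c.tweet_id));
    FilterResult { kept, removed }
}
```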

Scorer

Computes scores for ranking candidates.
candidate-pipeline/scorer.rs
use std::any::Any;
use async_trait::async_trait;

#[async_trait]
pub trait Scorer<Q, C>: Send + Sync {
    fn name(&self) -> &str;
    fn enable(&self, _query: &Q) -> bool { true }
    
    /// Compute scores for all candidates
    async fn score(&self, query: &Q, candidates: &[C]) 
        -> Result<Vec<Box<dyn Any + Send>>, String>;
    
    /// Update a candidate with its score
    fn update(&self, candidate: &mut C, scored: Box<dyn Any + Send>);
    
    /// Update all candidates (default implementation calls update() for each)
    fn update_all(&self, candidates: &mut [C], scored: Vec<Box<dyn Any + Send>>) {
        for (candidate, s) in candidates.iter_mut().zip(scored) {
            self.update(candidate, s);
        }
    }
}
Example: Phoenix ML scorer
pub struct PhoenixScorer {
    phoenix_client: Arc<PhoenixPredictionClient>,
}

#[async_trait]
impl Scorer<ScoredPostsQuery, PostCandidate> for PhoenixScorer {
    fn name(&self) -> &str {
        "PhoenixScorer"
    }

    async fn score(&self, query: &ScoredPostsQuery, candidates: &[PostCandidate]) 
        -> Result<Vec<Box<dyn Any + Send>>, String> {
        
        let predictions = self.phoenix_client
            .predict(query.user_id, candidates)
            .await
            .map_err(|e| e.to_string())?;
        
        Ok(predictions.into_iter()
            .map(|p| Box::new(p) as Box<dyn Any + Send>)
            .collect())
    }
    
    fn update(&self, candidate: &mut PostCandidate, scored: Box<dyn Any + Send>) {
        if let Ok(predictions) = scored.downcast::<PhoenixScores>() {
            candidate.phoenix_scores = *predictions;
        }
    }
}
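Scorers run one after another, so a later scorer can read values that an earlier one wrote. The pipeline assembled later on this page lists `PhoenixScorer` followed by `WeightedScorer`, but `WeightedScorer` is not shown. The following hypothetical sketch illustrates the pattern: a second stage combines per-action probabilities from a first stage into one `score`. The action names, the `Option` fields, and the weights are all illustrative assumptions, not production values.

```rust
/// Hypothetical per-action predictions written by an earlier model scorer.
#[derive(Debug, Default, Clone)]
pub struct PhoenixScores {
    pub p_like: f64,
    pub p_reply: f64,
    pub p_repost: f64,
}

/// Hypothetical, simplified candidate.
#[derive(Debug, Default, Clone)]
pub struct Candidate {
    pub phoenix_scores: Option<PhoenixScores>,
    pub score: Option<f64>,
}

// Illustrative weights only (assumptions, not production values).
const W_LIKE: f64 = 1.0;
const W_REPLY: f64 = 2.0;
const W_REPOST: f64 = 3.0;

/// Second-stage scorer: reads `phoenix_scores` set by an earlier stage and
/// writes a weighted sum. Candidates without predictions keep their old score.
pub fn weighted_score(candidates: &mut [Candidate]) {
    for c in candidates.iter_mut() {
        if let Some(p) = &c.phoenix_scores {
            c.score = Some(W_LIKE * p.p_like + W_REPLY * p.p_reply + W_REPOST * p.p_repost);
        }
    }
}
```

Swapping the order of the two stages would leave `score` unset, which is why scorer order in the pipeline definition matters.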

Selector

Sorts and selects top candidates.
candidate-pipeline/selector.rs
pub trait Selector<Q, C>: Send + Sync {
    fn name(&self) -> &str;
    fn enable(&self, _query: &Q) -> bool { true }
    
    /// Select and reorder candidates
    fn select(&self, query: &Q, candidates: Vec<C>) -> Vec<C>;
}
Example: Select top K by score
use std::cmp::Ordering;

pub struct TopKScoreSelector;

impl Selector<ScoredPostsQuery, PostCandidate> for TopKScoreSelector {
    fn name(&self) -> &str {
        "TopKScoreSelector"
    }

    fn select(&self, query: &ScoredPostsQuery, mut candidates: Vec<PostCandidate>) 
        -> Vec<PostCandidate> {
        
        // Sort by score descending
        candidates.sort_by(|a, b| {
            b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal)
        });
        
        // Take top K
        candidates.truncate(query.max_results);
        candidates
    }
}
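The comparator `partial_cmp(...).unwrap_or(Ordering::Equal)` treats a NaN score as equal to every other score, which is not a consistent total order, so the resulting order is unspecified when NaN appears. If scores are `Option<f64>`, one alternative is to treat NaN like a missing score and sort with `f64::total_cmp`. A minimal sketch, assuming a hypothetical candidate type with an `Option<f64>` score:

```rust
use std::cmp::Ordering;

/// Hypothetical, simplified candidate.
#[derive(Debug, Clone, PartialEq)]
pub struct Candidate {
    pub tweet_id: u64,
    pub score: Option<f64>,
}

/// Treat NaN like a missing score so it can never outrank a real score.
fn valid_score(c: &Candidate) -> Option<f64> {
    c.score.filter(|s| !s.is_nan())
}

/// Total order: scored candidates by descending score, then unscored ones.
fn by_score_desc(a: &Candidate, b: &Candidate) -> Ordering {
    match (valid_score(a), valid_score(b)) {
        (Some(x), Some(y)) => y.total_cmp(&x),
        (Some(_), None) => Ordering::Less,
        (None, Some(_)) => Ordering::Greater,
        (None, None) => Ordering::Equal,
    }
}

/// Sort by score (stable, so ties keep their input order) and keep the top `k`.
pub fn select_top_k(mut candidates: Vec<Candidate>, k: usize) -> Vec<Candidate> {
    candidates.sort_by(by_score_desc);
    candidates.truncate(k);
    candidates
}
```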

SideEffect

Runs asynchronous actions that don’t affect the pipeline result.
candidate-pipeline/side_effect.rs
use std::sync::Arc;
use async_trait::async_trait;

pub struct SideEffectInput<Q, C> {
    pub query: Arc<Q>,
    pub selected_candidates: Vec<C>,
}

#[async_trait]
pub trait SideEffect<Q, C>: Send + Sync {
    fn name(&self) -> &str;
    fn enable(&self, _query: Arc<Q>) -> bool { true }
    
    /// Execute side effect asynchronously
    async fn run(&self, input: Arc<SideEffectInput<Q, C>>) -> Result<(), String>;
}
Example: Cache request info
pub struct CacheRequestInfoSideEffect {
    strato_client: Arc<StratoClient>,
}

#[async_trait]
impl SideEffect<ScoredPostsQuery, PostCandidate> for CacheRequestInfoSideEffect {
    fn name(&self) -> &str {
        "CacheRequestInfoSideEffect"
    }

    async fn run(&self, input: Arc<SideEffectInput<ScoredPostsQuery, PostCandidate>>) 
        -> Result<(), String> {
        
        let request_info = RequestInfo {
            user_id: input.query.user_id,
            served_tweet_ids: input.selected_candidates.iter()
                .map(|c| c.tweet_id)
                .collect(),
            timestamp: Instant::now(),
        };
        
        self.strato_client
            .cache_request_info(request_info)
            .await
            .map_err(|e| e.to_string())?;
        Ok(())
    }
}
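In the `execute` method shown in the next section, `run_side_effects` is called without being awaited, so the response does not wait on work such as caching. The spawning mechanism itself is not shown on this page; the following sketch approximates the idea with `std::thread::spawn` and a shared `Arc` input, using hypothetical simplified types and an in-memory stand-in for the remote cache. It returns the join handles only so the example can be tested; a fire-and-forget caller would simply drop them.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical, simplified side-effect input.
pub struct SideEffectInput {
    pub user_id: u64,
    pub selected_ids: Vec<u64>,
}

/// Hypothetical synchronous side effect; errors are reported, never propagated.
pub trait SideEffect: Send + Sync {
    fn name(&self) -> &str;
    fn run(&self, input: &SideEffectInput) -> Result<(), String>;
}

/// Records served ids in an in-memory vector (stand-in for a remote store).
pub struct CacheServedIds {
    pub cache: Arc<Mutex<Vec<(u64, Vec<u64>)>>>,
}

impl SideEffect for CacheServedIds {
    fn name(&self) -> &str {
        "CacheServedIds"
    }
    fn run(&self, input: &SideEffectInput) -> Result<(), String> {
        let mut cache = self.cache.lock().map_err(|e| e.to_string())?;
        cache.push((input.user_id, input.selected_ids.clone()));
        Ok(())
    }
}

/// Start every side effect on its own thread and return immediately.
pub fn run_side_effects(
    side_effects: &[Arc<dyn SideEffect>],
    input: Arc<SideEffectInput>,
) -> Vec<JoinHandle<()>> {
    side_effects
        .iter()
        .map(|effect| {
            let effect = Arc::clone(effect);
            let input = Arc::clone(&input);
            thread::spawn(move || {
                if let Err(err) = effect.run(&input) {
                    eprintln!("SideEffect {} failed: {}", effect.name(), err);
                }
            })
        })
        .collect()
}
```

Sharing the input through an `Arc` lets every side effect read the same selected candidates without cloning them once per side effect.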

Pipeline Execution

The CandidatePipeline trait orchestrates all stages:
candidate-pipeline/candidate_pipeline.rs
use std::sync::Arc;
use async_trait::async_trait;

#[async_trait]
pub trait CandidatePipeline<Q, C>: Send + Sync {
    // Component accessors
    fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<Q>>];
    fn sources(&self) -> &[Box<dyn Source<Q, C>>];
    fn hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
    fn filters(&self) -> &[Box<dyn Filter<Q, C>>];
    fn scorers(&self) -> &[Box<dyn Scorer<Q, C>>];
    fn selector(&self) -> &dyn Selector<Q, C>;
    fn post_selection_hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
    fn post_selection_filters(&self) -> &[Box<dyn Filter<Q, C>>];
    fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect<Q, C>>>>;
    fn result_size(&self) -> usize;
    
    /// Execute the complete pipeline
    async fn execute(&self, query: Q) -> PipelineResult<Q, C> {
        // 1. Hydrate query
        let hydrated_query = self.hydrate_query(query).await;
        
        // 2. Fetch candidates from all sources (parallel)
        let candidates = self.fetch_candidates(&hydrated_query).await;
        
        // 3. Hydrate candidates (parallel)
        let hydrated_candidates = self.hydrate(&hydrated_query, candidates).await;
        
        // 4. Filter candidates (sequential)
        let (kept_candidates, filtered) = self
            .filter(&hydrated_query, hydrated_candidates)
            .await;
        
        // 5. Score candidates (sequential)
        let scored_candidates = self.score(&hydrated_query, kept_candidates).await;
        
        // 6. Select top candidates
        let selected_candidates = self.select(&hydrated_query, scored_candidates);
        
        // 7. Post-selection hydration and filtering
        let final_candidates = self
            .hydrate_post_selection(&hydrated_query, selected_candidates)
            .await;
        let (final_candidates, post_filtered) = self
            .filter_post_selection(&hydrated_query, final_candidates)
            .await;
        
        // 8. Run side effects asynchronously
        self.run_side_effects(Arc::new(SideEffectInput {
            query: Arc::new(hydrated_query),
            selected_candidates: final_candidates.clone(),
        }));
        
        PipelineResult {
            selected_candidates: final_candidates,
            // ...
        }
    }
}
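Without async execution, error handling, and logging, the control flow of `execute` reduces to a chain of stage calls. The following synchronous sketch makes the ordering concrete with a hypothetical `MiniPipeline` built from plain function pointers; query hydration, candidate hydration, the post-selection stages, and side effects are omitted. Filters and scorers are applied in list order, each seeing the previous one's output, before a single selection step.

```rust
/// Hypothetical, simplified candidate.
#[derive(Debug, Clone, PartialEq)]
pub struct Candidate {
    pub id: u64,
    pub score: f64,
}

pub type SourceFn = fn() -> Vec<Candidate>;
pub type FilterFn = fn(Vec<Candidate>) -> Vec<Candidate>;
pub type ScorerFn = fn(&mut [Candidate]);

/// Hypothetical miniature pipeline (not the framework's trait).
pub struct MiniPipeline {
    pub sources: Vec<SourceFn>,
    pub filters: Vec<FilterFn>,
    pub scorers: Vec<ScorerFn>,
    pub result_size: usize,
}

impl MiniPipeline {
    pub fn execute(&self) -> Vec<Candidate> {
        // Sources: concatenate every source's output.
        let mut candidates: Vec<Candidate> =
            self.sources.iter().flat_map(|source| source()).collect();
        // Filters: sequential; each sees only the survivors of the previous one.
        for filter in &self.filters {
            candidates = filter(candidates);
        }
        // Scorers: sequential; later scorers can read earlier scores.
        for scorer in &self.scorers {
            scorer(candidates.as_mut_slice());
        }
        // Selector: sort by descending score and keep the top `result_size`.
        candidates.sort_by(|a, b| b.score.total_cmp(&a.score));
        candidates.truncate(self.result_size);
        candidates
    }
}
```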

Parallel vs Sequential Execution

Query Hydrators, Sources, and Hydrators run in parallel:
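use futures::future::join_all;

// Start every hydrator's fetch, then await them all concurrently.
let hydrate_futures = hydrators.iter().map(|h| h.hydrate(query, &candidates));
let results = join_all(hydrate_futures).await;
Because these components are independent of one another, running them concurrently makes a stage take roughly as long as its slowest component rather than the sum of all of them. Filters and Scorers run sequentially because each one consumes the previous one's output: a filter only sees the candidates that earlier filters kept, and a scorer can build on scores written by an earlier one.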
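Running hydrators in parallel is safe because `hydrate` only borrows the candidates immutably; the mutating `update_all` calls can then be applied afterward, one hydrator at a time. The following sketch mirrors that two-phase pattern with `std::thread::scope` standing in for `join_all`, using a hypothetical hydrator made of two function pointers and a simplified candidate type.

```rust
use std::thread;

/// Hypothetical, simplified candidate.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Candidate {
    pub id: u64,
    pub like_count: Option<u64>,
    pub reply_count: Option<u64>,
}

/// Hypothetical hydrator: a read-only batch fetch plus a per-candidate update.
pub struct Hydrator {
    pub fetch: fn(&[Candidate]) -> Vec<u64>,
    pub update: fn(&mut Candidate, u64),
}

pub fn hydrate_all(hydrators: &[Hydrator], candidates: &mut [Candidate]) {
    // Phase 1: every fetch runs on its own scoped thread, sharing `&candidates`.
    let shared: &[Candidate] = &*candidates;
    let results: Vec<Vec<u64>> = thread::scope(|s| {
        let handles: Vec<_> = hydrators
            .iter()
            .map(|h| s.spawn(move || (h.fetch)(shared)))
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().expect("hydrator thread panicked"))
            .collect()
    });

    // Phase 2: apply updates one hydrator at a time, now with exclusive access.
    for (h, values) in hydrators.iter().zip(results) {
        for (candidate, value) in candidates.iter_mut().zip(values) {
            (h.update)(candidate, value);
        }
    }
}
```

Splitting fetch from update is what lets the borrow checker accept the concurrency: no thread ever holds a mutable reference while others are reading.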

Error Handling

The framework provides graceful error handling:
  • Component failures are logged but don’t crash the pipeline
  • Sources: Partial results from successful sources are used
  • Hydrators: A failed hydration skips its update; candidates proceed without that data
  • Filters: A failed filter falls back to a backup of its input (the pre-filter candidates), so it removes nothing
  • Scorers: A failed scorer skips its update; candidates keep their existing scores
match hydrator.hydrate(query, candidates).await {
    Ok(hydrated) => hydrator.update_all(&mut candidates, hydrated),
    Err(err) => {
        error!("Hydrator {} failed: {}", hydrator.name(), err);
        // Continue with non-hydrated candidates
    }
}
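Because `Filter::filter` takes the candidate `Vec` by value, restoring the pre-filter candidates after a failure requires keeping a copy before the call. A minimal synchronous sketch of that backup strategy, assuming `Clone` candidates and a hypothetical function-pointer filter signature; the framework's own mechanism is not shown on this page.

```rust
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

/// Hypothetical synchronous filter signature.
pub type FilterFn<C> = fn(Vec<C>) -> Result<FilterResult<C>, String>;

/// Run filters in order and return (kept, removed). If a filter fails, log the
/// error and continue with a copy of its input, so the failed filter removes nothing.
pub fn run_filters<C: Clone>(
    filters: &[(&str, FilterFn<C>)],
    mut candidates: Vec<C>,
) -> (Vec<C>, Vec<C>) {
    let mut removed_all = Vec::new();
    for (name, filter) in filters {
        // `filter` takes ownership of the candidates, so keep a backup first.
        let backup = candidates.clone();
        match filter(candidates) {
            Ok(result) => {
                candidates = result.kept;
                removed_all.extend(result.removed);
            }
            Err(err) => {
                eprintln!("Filter {} failed: {}", name, err);
                candidates = backup;
            }
        }
    }
    (candidates, removed_all)
}
```

The trade-off is one clone of the candidate list per filter; an implementation could avoid it with a signature that returns the input on error, at the cost of a more complex trait.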

Building a Pipeline

Example: Building the Phoenix Candidate Pipeline for Home Mixer
pub struct PhoenixCandidatePipeline {
    query_hydrators: Vec<Box<dyn QueryHydrator<ScoredPostsQuery>>>,
    sources: Vec<Box<dyn Source<ScoredPostsQuery, PostCandidate>>>,
    hydrators: Vec<Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>>,
    filters: Vec<Box<dyn Filter<ScoredPostsQuery, PostCandidate>>>,
    scorers: Vec<Box<dyn Scorer<ScoredPostsQuery, PostCandidate>>>,
    selector: TopKScoreSelector,
    // ...
}

impl PhoenixCandidatePipeline {
    pub fn new() -> Self {
        // Assemble components
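        // Each Vec is annotated with its trait-object type so that different
        // concrete components can be stored side by side.
        let query_hydrators: Vec<Box<dyn QueryHydrator<ScoredPostsQuery>>> = vec![
            Box::new(UserActionSeqQueryHydrator::new()),
            Box::new(UserFeaturesQueryHydrator::new()),
        ];
        
        let sources: Vec<Box<dyn Source<ScoredPostsQuery, PostCandidate>>> = vec![
            Box::new(PhoenixSource::new()),
            Box::new(ThunderSource::new()),
        ];
        
        let hydrators: Vec<Box<dyn Hydrator<ScoredPostsQuery, PostCandidate>>> = vec![
            Box::new(CoreDataHydrator::new()),
            Box::new(GizmoduckCandidateHydrator::new()),
        ];
        
        let filters: Vec<Box<dyn Filter<ScoredPostsQuery, PostCandidate>>> = vec![
            Box::new(DropDuplicatesFilter),
            Box::new(AgeFilter::new(Duration::from_secs(86400 * 2))), // 2 days
            Box::new(SelfTweetFilter),
        ];
        
        let scorers: Vec<Box<dyn Scorer<ScoredPostsQuery, PostCandidate>>> = vec![
            Box::new(PhoenixScorer::new()),
            Box::new(WeightedScorer),
        ];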
        
        PhoenixCandidatePipeline {
            query_hydrators,
            sources,
            hydrators,
            filters,
            scorers,
            selector: TopKScoreSelector,
            // ...
        }
    }
}

// Implement the CandidatePipeline trait
impl CandidatePipeline<ScoredPostsQuery, PostCandidate> for PhoenixCandidatePipeline {
    fn sources(&self) -> &[Box<dyn Source<ScoredPostsQuery, PostCandidate>>] {
        &self.sources
    }
    
    fn filters(&self) -> &[Box<dyn Filter<ScoredPostsQuery, PostCandidate>>] {
        &self.filters
    }
    
    // ... implement other methods
}
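Each component's `enable` method lets a single pipeline instance switch components on or off per request. The following sketch shows that gating step with a simplified synchronous trait; the `in_network_only` query flag and the two source names are hypothetical, chosen only to illustrate a per-request toggle.

```rust
/// Hypothetical, simplified query.
pub struct Query {
    pub user_id: u64,
    pub in_network_only: bool, // hypothetical request flag
}

/// Hypothetical minimal component trait with the same `enable` default as above.
pub trait Component {
    fn name(&self) -> &str;
    fn enable(&self, _query: &Query) -> bool {
        true
    }
}

pub struct InNetworkSource;

impl Component for InNetworkSource {
    fn name(&self) -> &str {
        "InNetworkSource"
    }
}

pub struct OutOfNetworkSource;

impl Component for OutOfNetworkSource {
    fn name(&self) -> &str {
        "OutOfNetworkSource"
    }
    // Disabled when the request asks for in-network posts only.
    fn enable(&self, query: &Query) -> bool {
        !query.in_network_only
    }
}

/// Names of the components that would run for this query.
pub fn enabled_names<'a>(components: &'a [Box<dyn Component>], query: &Query) -> Vec<&'a str> {
    components
        .iter()
        .filter(|c| c.enable(query))
        .map(|c| c.name())
        .collect()
}
```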

Monitoring and Logging

The framework automatically logs each stage:
INFO request_id=abc123 stage=Source component=ThunderSource fetched 1000 candidates
INFO request_id=abc123 stage=Source component=PhoenixSource fetched 500 candidates
INFO request_id=abc123 stage=Filter kept 1200, removed 300
INFO request_id=abc123 stage=Scorer component=PhoenixScorer completed
Stage timings and component success rates are tracked via metrics.
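Stage timing amounts to reading a monotonic clock around each stage call. The framework's metrics API is not shown on this page, so this sketch uses `std::time::Instant` with a hypothetical `timed_stage` helper that also formats a log line in the style of the examples above.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: run `f`, then return its output, the elapsed time,
/// and a log line in the `request_id=... stage=... component=...` style.
pub fn timed_stage<T>(
    request_id: &str,
    stage: &str,
    component: &str,
    f: impl FnOnce() -> T,
) -> (T, Duration, String) {
    let start = Instant::now();
    let out = f();
    let elapsed = start.elapsed();
    let line = format!(
        "INFO request_id={} stage={} component={} completed in {} ms",
        request_id,
        stage,
        component,
        elapsed.as_millis()
    );
    (out, elapsed, line)
}
```

Returning the `Duration` alongside the log line lets the caller feed the same measurement into a metrics histogram without timing the stage twice.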

Benefits

Composability

Build complex pipelines from simple, reusable components

Testability

Each component can be tested independently

Observability

Built-in logging and metrics for every stage

Flexibility

Components can be enabled/disabled per request

Home Mixer

Production implementation using Candidate Pipeline

Phoenix

ML components integrated as Sources and Scorers
