
Overview

The Candidate Pipeline is a composable, trait-based framework for building recommendation systems in Rust. It provides a structured approach to fetching, enriching, filtering, scoring, and selecting candidates through a series of well-defined stages.

Core Trait

The CandidatePipeline trait is the main abstraction that defines the entire pipeline execution flow:
candidate-pipeline/candidate_pipeline.rs
use std::sync::Arc;

use async_trait::async_trait;

#[async_trait]
pub trait CandidatePipeline<Q, C>: Send + Sync
where
    Q: HasRequestId + Clone + Send + Sync + 'static,
    C: Clone + Send + Sync + 'static,
{
    // Components for each stage, listed in execution order.
    fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<Q>>];
    fn sources(&self) -> &[Box<dyn Source<Q, C>>];
    fn hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
    fn filters(&self) -> &[Box<dyn Filter<Q, C>>];
    fn scorers(&self) -> &[Box<dyn Scorer<Q, C>>];
    fn selector(&self) -> &dyn Selector<Q, C>;
    fn post_selection_hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
    fn post_selection_filters(&self) -> &[Box<dyn Filter<Q, C>>];
    fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect<Q, C>>>>;

    // Maximum number of candidates in the final result.
    fn result_size(&self) -> usize;

    // Runs every stage for one query and returns the combined result.
    async fn execute(&self, query: Q) -> PipelineResult<Q, C>;
}
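To illustrate the division of labor in the trait (implementors expose their component lists, and the orchestration lives in a single execute method), here is a heavily simplified, synchronous sketch. Filter, Pipeline, MinValue and DemoPipeline are hypothetical stand-ins rather than the framework's types: the real traits are async, generic over both Q and C, and cover every stage. The sketch also assumes execute is written once and shared by all implementors, for example as a default method.

```rust
/// Hypothetical, synchronous stand-in for the framework's async Filter trait.
pub trait Filter<C>: Send + Sync {
    fn keep(&self, candidate: &C) -> bool;
}

/// Hypothetical, cut-down pipeline trait with a single stage.
pub trait Pipeline<C> {
    // Implementors only expose their components and configuration...
    fn filters(&self) -> &[Box<dyn Filter<C>>];
    fn result_size(&self) -> usize;

    // ...and inherit the orchestration as a provided (default) method.
    fn execute(&self, candidates: Vec<C>) -> Vec<C> {
        let mut kept = candidates;
        // Filters run one after another, each seeing the previous output.
        for filter in self.filters() {
            kept.retain(|c| filter.keep(c));
        }
        kept.truncate(self.result_size());
        kept
    }
}

/// Example filter: keeps values greater than or equal to the threshold.
pub struct MinValue(pub i32);

impl Filter<i32> for MinValue {
    fn keep(&self, candidate: &i32) -> bool {
        *candidate >= self.0
    }
}

/// Example pipeline that owns its components and lends them out as slices.
pub struct DemoPipeline {
    pub filters: Vec<Box<dyn Filter<i32>>>,
    pub result_size: usize,
}

impl Pipeline<i32> for DemoPipeline {
    fn filters(&self) -> &[Box<dyn Filter<i32>>] {
        &self.filters
    }

    fn result_size(&self) -> usize {
        self.result_size
    }
}
```

With a MinValue(3) filter and a result_size of 2, this DemoPipeline turns [5, 1, 4, 3, 2] into [5, 4].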

Pipeline Stages

The pipeline executes in the following order:
1. Query Hydration: All query hydrators run in parallel to enrich the query with additional data (user features, experiment flags, etc.).
2. Candidate Fetching: All sources run in parallel to retrieve candidate posts from different systems (Thunder, Phoenix, etc.).
3. Candidate Hydration: All hydrators run in parallel to enrich candidates with additional features (author info, video duration, etc.).
4. Filtering: Filters run sequentially to remove unwanted candidates (age, blocked authors, etc.).
5. Scoring: Scorers run sequentially to compute prediction scores and ranking signals.
6. Selection: The selector sorts candidates by score and truncates the list.
7. Post-Selection Hydration: Additional hydrators run in parallel on the selected candidates.
8. Post-Selection Filtering: Additional filters run sequentially on the selected candidates.
9. Side Effects: Side effects (logging, metrics, etc.) run asynchronously without blocking the response.
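The selection stage (step 6) can be sketched as a stable sort by descending score followed by truncation. This is a minimal sketch assuming a hypothetical Candidate type that stores its score as Option<f64>, with unscored candidates (for example, after a scorer failure) ranked last; the framework's actual Selector trait and its tie-breaking rules may differ.

```rust
/// Hypothetical candidate type; the framework itself is generic over `C`.
#[derive(Clone, Debug, PartialEq)]
pub struct Candidate {
    pub id: u64,
    pub score: Option<f64>,
}

/// Orders candidates by descending score and keeps the first `k`.
/// Unscored candidates are treated as negative infinity, so they sort
/// after any finite score.
pub fn select_top_k(mut candidates: Vec<Candidate>, k: usize) -> Vec<Candidate> {
    // sort_by is stable: candidates with equal scores keep their input order.
    candidates.sort_by(|a, b| {
        let score_a = a.score.unwrap_or(f64::NEG_INFINITY);
        let score_b = b.score.unwrap_or(f64::NEG_INFINITY);
        // Comparing b to a yields descending order; total_cmp also orders NaN.
        score_b.total_cmp(&score_a)
    });
    candidates.truncate(k);
    candidates
}
```

For scores 0.2, none, 0.9 and 0.5 (ids 1 to 4), selecting the top two returns ids 3 and 4.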

Pipeline Stages Enum

candidate-pipeline/candidate_pipeline.rs
#[derive(Copy, Clone, Debug)]
pub enum PipelineStage {
    QueryHydrator,
    Source,
    Hydrator,
    PostSelectionHydrator,
    Filter,
    PostSelectionFilter,
    Scorer,
}
This enum is used for logging and metrics to identify which stage a component belongs to.
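As a rough illustration of how a PipelineStage value might tag logs and metrics, the sketch below derives labels from the enum's Debug output. The enum is copied from above; stage_label and failure_message are hypothetical helpers, and the key=value log format is an assumption rather than the framework's actual output.

```rust
#[derive(Copy, Clone, Debug)]
pub enum PipelineStage {
    QueryHydrator,
    Source,
    Hydrator,
    PostSelectionHydrator,
    Filter,
    PostSelectionFilter,
    Scorer,
}

/// Hypothetical metric key, e.g. "Filter.age_filter".
pub fn stage_label(stage: PipelineStage, component: &str) -> String {
    // The derived Debug impl prints the variant name.
    format!("{:?}.{}", stage, component)
}

/// Hypothetical structured log line for a component failure.
pub fn failure_message(
    request_id: &str,
    stage: PipelineStage,
    component: &str,
    error: &str,
) -> String {
    format!("request_id={request_id} stage={stage:?} component={component} error={error}")
}
```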

Error Handling

Failures are contained within each stage: a failing component is logged and its output is discarded, so a single bad component does not fail the request.
If a query hydrator fails, the error is logged and the pipeline continues with the original query data.
If a source fails, the error is logged and the pipeline continues with candidates from the other sources.
If a hydrator fails or returns a different number of candidates than it received, the error is logged and the candidates remain unmodified.
If a filter fails, the error is logged and the original candidate list is restored (no candidates are filtered out).
If a scorer fails or returns a different number of results than there are candidates, the error is logged and the candidates remain unscored.
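The filter and scorer fallback rules can be sketched as follows. FilterFn, Scored, run_filter_or_restore and apply_scores are hypothetical names, failures are modeled as Result<_, String>, and eprintln! stands in for whatever logging the framework uses; the real components are async trait objects.

```rust
/// Hypothetical filter signature: partitions candidates into (kept, removed), or fails.
pub type FilterFn<C> = fn(Vec<C>) -> Result<(Vec<C>, Vec<C>), String>;

/// Runs one filter. On failure the error is logged and the untouched input is
/// returned as "kept", so the failing filter removes nothing.
pub fn run_filter_or_restore<C: Clone>(filter: FilterFn<C>, candidates: Vec<C>) -> (Vec<C>, Vec<C>) {
    // Keep a copy because the filter consumes its input.
    let backup = candidates.clone();
    match filter(candidates) {
        Ok(partition) => partition,
        Err(e) => {
            eprintln!("filter failed, restoring candidates: {e}");
            (backup, Vec::new())
        }
    }
}

/// Hypothetical candidate with an optional score.
#[derive(Clone, Debug, PartialEq)]
pub struct Scored {
    pub id: u64,
    pub score: Option<f64>,
}

/// Writes scores back only when the scorer produced exactly one per candidate;
/// otherwise the error is logged and the candidates are left unchanged.
pub fn apply_scores(candidates: &mut [Scored], scores: Result<Vec<f64>, String>) {
    match scores {
        Ok(values) if values.len() == candidates.len() => {
            for (candidate, value) in candidates.iter_mut().zip(values) {
                candidate.score = Some(value);
            }
        }
        Ok(values) => eprintln!(
            "scorer returned {} scores for {} candidates",
            values.len(),
            candidates.len()
        ),
        Err(e) => eprintln!("scorer failed: {e}"),
    }
}
```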

Execution Flow

The main execution method orchestrates the entire pipeline:
candidate-pipeline/candidate_pipeline.rs
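async fn execute(&self, query: Q) -> PipelineResult<Q, C> {
    // 1. Query hydration: query hydrators enrich the query in parallel.
    let hydrated_query = self.hydrate_query(query).await;

    // 2. Candidate fetching: all sources are queried in parallel.
    let candidates = self.fetch_candidates(&hydrated_query).await;

    // 3. Candidate hydration: attach features to each candidate.
    let hydrated_candidates = self.hydrate(&hydrated_query, candidates).await;

    // 4. Filtering runs on a clone so the unfiltered list can still be
    //    returned as `retrieved_candidates`.
    let (kept_candidates, mut filtered_candidates) = self
        .filter(&hydrated_query, hydrated_candidates.clone())
        .await;

    // 5. Scoring: scorers run sequentially over the surviving candidates.
    let scored_candidates = self.score(&hydrated_query, kept_candidates).await;

    // 6. Selection: sort by score and truncate (synchronous, no await).
    let selected_candidates = self.select(&hydrated_query, scored_candidates);

    // 7. Post-selection hydration on the smaller, selected set.
    let post_selection_hydrated_candidates = self
        .hydrate_post_selection(&hydrated_query, selected_candidates)
        .await;

    // 8. Post-selection filtering; removed candidates are recorded as well.
    let (mut final_candidates, post_selection_filtered_candidates) = self
        .filter_post_selection(&hydrated_query, post_selection_hydrated_candidates)
        .await;
    filtered_candidates.extend(post_selection_filtered_candidates);

    // Enforce the configured result size on the final list.
    final_candidates.truncate(self.result_size());

    // 9. Side effects: the query is shared via Arc with the side-effect input,
    //    and run_side_effects is not awaited, so it does not block the response.
    let arc_hydrated_query = Arc::new(hydrated_query);
    let input = Arc::new(SideEffectInput {
        query: arc_hydrated_query.clone(),
        selected_candidates: final_candidates.clone(),
    });
    self.run_side_effects(input);

    PipelineResult {
        retrieved_candidates: hydrated_candidates,
        filtered_candidates,
        selected_candidates: final_candidates,
        query: arc_hydrated_query,
    }
}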
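Because run_side_effects is called without .await, execute returns while side effects may still be running. A std-only sketch of that fire-and-forget behavior follows, assuming a background thread in place of whatever async task spawning the framework actually uses. The SideEffect trait and its run method, and the CountSelected example, are hypothetical; SideEffectInput only mirrors the two fields built in execute.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Mirrors the two fields built in `execute` above.
pub struct SideEffectInput<Q, C> {
    pub query: Arc<Q>,
    pub selected_candidates: Vec<C>,
}

/// Hypothetical synchronous side-effect interface.
pub trait SideEffect<Q, C>: Send + Sync {
    fn run(&self, input: &SideEffectInput<Q, C>);
}

/// Starts all side effects on a background thread and returns at once.
/// The handle is returned only so a test can wait on it; a
/// fire-and-forget caller would simply drop it.
pub fn run_side_effects<Q, C>(
    effects: Arc<Vec<Box<dyn SideEffect<Q, C>>>>,
    input: Arc<SideEffectInput<Q, C>>,
) -> JoinHandle<()>
where
    Q: Send + Sync + 'static,
    C: Send + Sync + 'static,
{
    thread::spawn(move || {
        for effect in effects.iter() {
            effect.run(&input);
        }
    })
}

/// Example side effect: records how many candidates were selected.
pub struct CountSelected(pub Arc<Mutex<Vec<usize>>>);

impl<Q, C> SideEffect<Q, C> for CountSelected {
    fn run(&self, input: &SideEffectInput<Q, C>) {
        self.0.lock().unwrap().push(input.selected_candidates.len());
    }
}
```

Wrapping both the side-effect list and the input in Arc lets the background work outlive the call to execute without copying the candidates again.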

Pipeline Result

The pipeline returns a PipelineResult containing:
candidate-pipeline/candidate_pipeline.rs
pub struct PipelineResult<Q, C> {
    pub retrieved_candidates: Vec<C>,
    pub filtered_candidates: Vec<C>,
    pub selected_candidates: Vec<C>,
    pub query: Arc<Q>,
}
retrieved_candidates (Vec<C>): all candidates after hydration, before filtering
filtered_candidates (Vec<C>): candidates removed by the filters or the post-selection filters
selected_candidates (Vec<C>): final ranked candidates returned to the user
query (Arc<Q>): the hydrated query object

HasRequestId Trait

All query types must implement the HasRequestId trait for logging and tracing:
candidate-pipeline/candidate_pipeline.rs
pub trait HasRequestId {
    fn request_id(&self) -> &str;
}
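Implementing HasRequestId is usually a one-line accessor. FeedQuery and its fields are hypothetical; assert_query_bounds is a small compile-time check (also hypothetical) that the type meets the Q bounds from the CandidatePipeline trait.

```rust
pub trait HasRequestId {
    fn request_id(&self) -> &str;
}

/// Hypothetical query type; fields other than `request_id` are illustrative.
#[derive(Clone, Debug)]
pub struct FeedQuery {
    pub request_id: String,
    pub user_id: u64,
    pub experiment_flags: Vec<String>,
}

impl HasRequestId for FeedQuery {
    fn request_id(&self) -> &str {
        &self.request_id
    }
}

/// Compiles only if `Q` satisfies the pipeline's query bounds.
pub fn assert_query_bounds<Q: HasRequestId + Clone + Send + Sync + 'static>() {}
```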

Parallel vs Sequential Execution

Parallel stages (query hydrators, sources, hydrators) use join_all to run all components concurrently, so a stage takes roughly as long as its slowest component rather than the sum of all of them. Sequential stages (filters, scorers) run components one at a time, allowing each component to see the results of the previous one.
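The difference between the two execution modes can be sketched with the standard library alone. Here std::thread::scope stands in for join_all over async futures, and Stage, run_parallel and run_sequential are hypothetical. How the framework merges the outputs of parallel hydrators is not described here, so run_parallel simply returns each component's output in component order, as join_all does.

```rust
use std::thread;

/// Hypothetical component type: takes the current candidate ids, returns new ones.
pub type Stage = Box<dyn Fn(&[i32]) -> Vec<i32> + Sync>;

/// Parallel stage: every component receives the same input and runs on its
/// own scoped thread. Outputs come back in component order.
pub fn run_parallel(components: &[Stage], input: &[i32]) -> Vec<Vec<i32>> {
    thread::scope(|s| {
        // Spawn everything first so the components actually overlap.
        let handles: Vec<_> = components
            .iter()
            .map(|component| s.spawn(move || component(input)))
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().expect("component panicked"))
            .collect()
    })
}

/// Sequential stage: each component receives the previous component's output.
pub fn run_sequential(components: &[Stage], input: Vec<i32>) -> Vec<i32> {
    components
        .iter()
        .fold(input, |current, component| component(current.as_slice()))
}
```

Scoped threads let the components borrow the input without 'static bounds; in an async service, join_all gets the same overlap without dedicating an OS thread to each component.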

Sources: learn about candidate sources (Thunder, Phoenix)
Hydrators: enrich candidates and queries with features
Filters: remove unwanted candidates from the pipeline
Scorers: compute prediction scores for ranking
