
Overview

The Source trait defines how candidate content is fetched from various retrieval systems. Sources run in parallel and their results are combined before continuing through the pipeline.

Trait Definition

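#[async_trait]
pub trait Source<Q, C>: Any + Send + Sync
where
    Q: Clone + Send + Sync + 'static,
    C: Clone + Send + Sync + 'static,
{
    // Optional: the default implementation returns true.
    fn enable(&self, query: &Q) -> bool;
    async fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>;
    // Optional: the default implementation derives a short name from the implementing struct.
    fn name(&self) -> &'static str;
}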

Type Parameters

Q (generic)
The query type that carries request context and parameters.
Constraints: Clone + Send + Sync + 'static

C (generic)
The candidate type that represents a single content item.
Constraints: Clone + Send + Sync + 'static
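Both parameters must be safe to clone and share across threads. As a minimal sketch (ExampleQuery, ExampleCandidate and assert_pipeline_types are hypothetical names, not part of the library), plain structs with owned fields satisfy the bounds, and a generic function with the same where-clause turns a violation into a compile error:

```rust
// Hypothetical query and candidate types; field names are illustrative only.
#[derive(Clone, Debug, Default)]
pub struct ExampleQuery {
    pub user_id: i64,
    pub in_network_only: bool,
}

#[derive(Clone, Debug, Default, PartialEq)]
pub struct ExampleCandidate {
    pub tweet_id: i64,
    pub author_id: u64,
}

// Hypothetical helper: compiles only if Q and C meet the same bounds
// that Source<Q, C> requires. A type holding an Rc<_> (not Send/Sync)
// or a RefCell<_> (not Sync) would be rejected at compile time.
pub fn assert_pipeline_types<Q, C>() -> bool
where
    Q: Clone + Send + Sync + 'static,
    C: Clone + Send + Sync + 'static,
{
    true
}
```

Because the check happens at compile time, the function body is trivial; the useful part is that the call only type-checks for conforming types.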

Methods

enable

fn enable(&self, query: &Q) -> bool

Determines whether this source should run for the given query.

Parameters
  • query (&Q): Reference to the query object.

Returns
  • bool: true if this source should fetch candidates, false to skip it. The default implementation returns true.
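The gating behavior can be sketched synchronously. This assumes a simplified, hypothetical Gate trait that mirrors only enable and name (the pipeline's real scheduling code is not shown on this page); AlwaysOnSource and OutOfNetworkSource are illustrative names, and the in_network_only check copies the one used by PhoenixSource:

```rust
// Hypothetical query type mirroring the `in_network_only` flag.
pub struct Query {
    pub in_network_only: bool,
}

// Simplified stand-in for the `enable`/`name` part of `Source`.
pub trait Gate {
    fn name(&self) -> &'static str;
    // Mirrors the documented default: run unless overridden.
    fn enable(&self, _query: &Query) -> bool {
        true
    }
}

pub struct AlwaysOnSource;
pub struct OutOfNetworkSource;

impl Gate for AlwaysOnSource {
    fn name(&self) -> &'static str {
        "AlwaysOnSource"
    }
    // Keeps the default `enable`, so it runs for every query.
}

impl Gate for OutOfNetworkSource {
    fn name(&self) -> &'static str {
        "OutOfNetworkSource"
    }
    fn enable(&self, query: &Query) -> bool {
        // Skip when the caller asked for in-network content only.
        !query.in_network_only
    }
}

// Returns the names of the sources that would run for `query`, in order.
pub fn enabled_sources(sources: &[&dyn Gate], query: &Query) -> Vec<&'static str> {
    sources
        .iter()
        .filter(|s| s.enable(query))
        .map(|s| s.name())
        .collect()
}
```

Keeping the decision in enable lets the caller skip a source before any retrieval work starts, rather than having get_candidates return an empty list.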

get_candidates

async fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>

Fetches candidates from the source’s retrieval system.

Parameters
  • query (&Q): Reference to the query object containing request parameters.

Returns
  • Result<Vec<C>, String>: The fetched candidates on success, or a descriptive error message on failure.
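A minimal, std-only sketch of implementing an async get_candidates and driving it to completion. It assumes native async fn in traits (Rust 1.75+) instead of the #[async_trait] macro and a tiny thread-parking block_on instead of a real runtime such as Tokio; FetchCandidates, LimitQuery and FixedSource are hypothetical names, and the "Name: message" error prefix copies the convention used by PhoenixSource:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical, simplified stand-in for `Source::get_candidates`.
trait FetchCandidates<Q, C> {
    async fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>;
}

// Hypothetical query: how many candidates the caller wants.
pub struct LimitQuery {
    pub limit: usize,
}

// Hypothetical source backed by an in-memory list instead of an RPC client.
pub struct FixedSource {
    pub ids: Vec<i64>,
}

impl FetchCandidates<LimitQuery, i64> for FixedSource {
    async fn get_candidates(&self, query: &LimitQuery) -> Result<Vec<i64>, String> {
        if query.limit == 0 {
            // Prefix errors with the source name so they are traceable in logs.
            return Err("FixedSource: limit must be positive".to_string());
        }
        Ok(self.ids.iter().copied().take(query.limit).collect())
    }
}

// Minimal executor: polls the future, parking the thread until it is woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

In a real service the async runtime replaces block_on, and the trait object is usually shared behind an Arc, which is why the library's trait goes through #[async_trait].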

name

fn name(&self) -> &'static str

Returns a stable name for logging and metrics.

Returns
  • &'static str: A short name derived from the implementing struct. The default implementation extracts the struct name via reflection.
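One plausible way to build such a default is std::any::type_name, which returns the fully qualified type path as a &'static str. This is a sketch under that assumption, not the library's actual code; short_type_name, Named and DemoSource are hypothetical names. Note that the standard library documents type_name's exact output as unspecified across compiler versions:

```rust
use std::any::type_name;

// Derives a short, human-readable name from a type's full path,
// e.g. "my_crate::sources::DemoSource" -> "DemoSource".
pub fn short_type_name<T: ?Sized>() -> &'static str {
    let full = type_name::<T>();
    // Drop generic arguments first so "a::Wrapper<b::Inner>" yields "Wrapper".
    let base = full.split('<').next().unwrap_or(full);
    // Keep only the last path segment.
    base.rsplit("::").next().unwrap_or(base)
}

// Hypothetical trait showing how a default `name` could be provided.
pub trait Named {
    fn name(&self) -> &'static str {
        short_type_name::<Self>()
    }
}

pub struct DemoSource;
impl Named for DemoSource {}
```

Every slice is taken from the &'static str returned by type_name, so the result can be returned as &'static str without allocating.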

Example Implementation

Here’s a real example from the X For You feed that retrieves candidates from the Phoenix recommendation system:
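use std::sync::Arc;

use tonic::async_trait;
use xai_candidate_pipeline::source::Source;
// ScoredPostsQuery, PostCandidate, ServedType, PhoenixRetrievalClient and
// PHOENIX_MAX_RESULTS are assumed to be in scope from the surrounding crate (not shown).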

pub struct PhoenixSource {
    pub phoenix_retrieval_client: Arc<dyn PhoenixRetrievalClient + Send + Sync>,
}

#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for PhoenixSource {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        // Only run for out-of-network requests
        !query.in_network_only
    }

    async fn get_candidates(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        let user_id = query.user_id as u64;

        // Retrieval requires the user's action sequence; fail fast if it is missing.
        let sequence = query
            .user_action_sequence
            .as_ref()
            .ok_or_else(|| "PhoenixSource: missing user_action_sequence".to_string())?;

        let response = self
            .phoenix_retrieval_client
            .retrieve(user_id, sequence.clone(), PHOENIX_MAX_RESULTS)
            .await
            .map_err(|e| format!("PhoenixSource: {}", e))?;

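        // Flatten the groups of scored candidates into PostCandidates,
        // skipping entries that carry no candidate payload.
        let candidates: Vec<PostCandidate> = response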
            .top_k_candidates
            .into_iter()
            .flat_map(|scored_candidates| scored_candidates.candidates)
            .filter_map(|scored_candidate| scored_candidate.candidate)
            .map(|tweet_info| PostCandidate {
                tweet_id: tweet_info.tweet_id as i64,
                author_id: tweet_info.author_id,
                served_type: Some(ServedType::ForYouPhoenixRetrieval),
                ..Default::default()
            })
            .collect();

        Ok(candidates)
    }
}
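The flat_map/filter_map chain is the core of the conversion. The sketch below isolates it with hypothetical stand-in types that mirror only the fields the example reads (the real response and candidate types come from the Phoenix client and the For You crate and likely carry more fields; score is a made-up placeholder for fields filled later in the pipeline):

```rust
// Hypothetical stand-ins shaped like the fields PhoenixSource reads.
pub struct TweetInfo {
    pub tweet_id: u64,
    pub author_id: u64,
}

pub struct ScoredCandidate {
    pub candidate: Option<TweetInfo>,
}

pub struct ScoredCandidates {
    pub candidates: Vec<ScoredCandidate>,
}

pub struct RetrievalResponse {
    pub top_k_candidates: Vec<ScoredCandidates>,
}

#[derive(Debug, PartialEq)]
pub enum ServedType {
    ForYouPhoenixRetrieval,
}

#[derive(Debug, Default, PartialEq)]
pub struct PostCandidate {
    pub tweet_id: i64,
    pub author_id: u64,
    pub served_type: Option<ServedType>,
    // Placeholder for fields filled by later stages, hence `..Default::default()`.
    pub score: Option<f64>,
}

pub fn to_post_candidates(response: RetrievalResponse) -> Vec<PostCandidate> {
    response
        .top_k_candidates
        .into_iter()
        // Each group holds several scored candidates; flatten them into one stream.
        .flat_map(|group| group.candidates)
        // Entries without a payload are skipped rather than treated as errors.
        .filter_map(|scored| scored.candidate)
        .map(|info| PostCandidate {
            tweet_id: info.tweet_id as i64,
            author_id: info.author_id,
            served_type: Some(ServedType::ForYouPhoenixRetrieval),
            ..Default::default()
        })
        .collect()
}
```

Order is preserved: candidates appear group by group, in the order retrieval returned them.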

Usage Notes

  • Multiple sources can be configured to run in parallel
  • Results from all enabled sources are combined before hydration
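  • Keep sources fast: return lightweight candidates (PhoenixSource, for example, fills only tweet_id, author_id and served_type) and defer expensive enrichment to hydrators
  • The enable method allows dynamic source selection based on query parameters
  • Make error messages descriptive, for example by prefixing them with the source name as in "PhoenixSource: …", so failures are easy to trace in logs and metrics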
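The fan-out-and-combine behavior described in these notes can be sketched with scoped threads. This is an illustration under stated assumptions, not the pipeline's scheduler: the real trait is async and runs on an async runtime, whereas the hypothetical SyncSource trait here is synchronous, uses a u64 user id as the query, and lets one failing source be reported without discarding the others:

```rust
use std::thread;

// Hypothetical, synchronous stand-in for `Source` (the real trait is async).
pub trait SyncSource: Send + Sync {
    fn name(&self) -> &'static str;
    fn enable(&self, _query: &u64) -> bool {
        true
    }
    fn get_candidates(&self, query: &u64) -> Result<Vec<u64>, String>;
}

// Runs every enabled source on its own scoped thread, then concatenates the
// successful results in source order. Failures are collected, tagged with the
// source name, instead of aborting the whole request.
pub fn fetch_all(sources: &[&dyn SyncSource], query: &u64) -> (Vec<u64>, Vec<String>) {
    let results: Vec<(&'static str, Result<Vec<u64>, String>)> = thread::scope(|s| {
        let handles: Vec<_> = sources
            .iter()
            .filter(|src| src.enable(query))
            .map(|src| s.spawn(move || (src.name(), src.get_candidates(query))))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("source thread panicked"))
            .collect()
    });

    let mut candidates = Vec::new();
    let mut errors = Vec::new();
    for (name, result) in results {
        match result {
            Ok(mut batch) => candidates.append(&mut batch),
            Err(e) => errors.push(format!("{name} failed: {e}")),
        }
    }
    (candidates, errors)
}
```

Joining the handles in spawn order keeps the merged output deterministic even though the sources finish in arbitrary order.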
