Overview
The Source trait defines how candidates are fetched from retrieval systems. All enabled sources run in parallel, and their results are combined before the candidates continue through the pipeline.
Trait Definition
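```rust
use std::any::Any;
use tonic::async_trait;
// Signatures only: enable and name have default implementations (see Methods).
#[async_trait]
pub trait Source<Q, C>: Any + Send + Sync
where
    Q: Clone + Send + Sync + 'static,
    C: Clone + Send + Sync + 'static,
{
    /// Whether this source runs for `query` (default: `true`).
    fn enable(&self, query: &Q) -> bool;
    /// Fetches candidates; failures are reported as descriptive strings.
    async fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>;
    /// Stable name for logging and metrics (default: derived from the struct name).
    fn name(&self) -> &'static str;
}
```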
Type Parameters
Q: the query type that carries request context and parameters. Constraints: Clone + Send + Sync + 'static
C: the candidate type that represents a single content item. Constraints: Clone + Send + Sync + 'static
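As an illustration of these bounds, the sketch below defines a hypothetical query type (FeedQuery) and candidate type (FeedCandidate) and checks them against Clone + Send + Sync + 'static at compile time. Neither type belongs to the pipeline; they only show what kind of type qualifies (owned, thread-safe data with no borrowed references).

```rust
use std::collections::HashMap;

// Hypothetical query type: owns all its data, so it is Clone + Send + Sync + 'static.
#[derive(Clone, Debug, Default)]
struct FeedQuery {
    user_id: i64,
    in_network_only: bool,
    params: HashMap<String, String>,
}

// Hypothetical candidate type: one content item.
#[derive(Clone, Debug, Default, PartialEq)]
struct FeedCandidate {
    post_id: i64,
    author_id: u64,
}

// Compiles only if T satisfies the bounds Source<Q, C> places on Q and C.
fn assert_source_bounds<T: Clone + Send + Sync + 'static>() {}

fn main() {
    assert_source_bounds::<FeedQuery>();
    assert_source_bounds::<FeedCandidate>();
    // A type holding an Rc<T> would be rejected (Rc is neither Send nor Sync),
    // and a type borrowing data, such as struct Q<'a>(&'a str), fails 'static.
    println!("FeedQuery and FeedCandidate satisfy the Source bounds");
}
```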
Methods
enable
Determines whether this source should run for the given query.
fn enable(&self, query: &Q) -> bool
query: reference to the query object.
Returns true if this source should fetch candidates, or false to skip it. The default implementation returns true.
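A minimal synchronous sketch of enable-based gating, assuming a hypothetical Gated trait and Query type that keep only the enable and name parts of Source (the async get_candidates is left out). InNetworkSource relies on the default enable, while OutOfNetworkSource overrides it in the same way as the PhoenixSource example below.

```rust
// Hypothetical query with a single routing flag.
#[derive(Clone, Default)]
struct Query {
    in_network_only: bool,
}

// Hypothetical trait keeping only the gating part of Source.
trait Gated {
    // Mirrors the documented default: run unless told otherwise.
    fn enable(&self, _query: &Query) -> bool {
        true
    }
    fn name(&self) -> &'static str;
}

struct InNetworkSource;
struct OutOfNetworkSource;

impl Gated for InNetworkSource {
    // Uses the default enable(), so it always runs.
    fn name(&self) -> &'static str {
        "InNetworkSource"
    }
}

impl Gated for OutOfNetworkSource {
    // Skips requests that ask for in-network content only.
    fn enable(&self, query: &Query) -> bool {
        !query.in_network_only
    }
    fn name(&self) -> &'static str {
        "OutOfNetworkSource"
    }
}

// Returns the names of the sources that would run for this query.
fn enabled_sources(sources: &[Box<dyn Gated>], query: &Query) -> Vec<&'static str> {
    sources
        .iter()
        .filter(|s| s.enable(query))
        .map(|s| s.name())
        .collect()
}

fn main() {
    let sources: Vec<Box<dyn Gated>> = vec![Box::new(InNetworkSource), Box::new(OutOfNetworkSource)];
    let all = enabled_sources(&sources, &Query { in_network_only: false });
    let in_network = enabled_sources(&sources, &Query { in_network_only: true });
    println!("{:?} {:?}", all, in_network);
}
```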
get_candidates
Fetches candidates from the source's retrieval system.
async fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>
query: reference to the query object containing the request parameters.
Returns a vector of candidates on success, or a descriptive error message on failure.
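The sketch below shows the error-handling convention for get_candidates under a few assumptions: Query, Candidate, RecentPostsSource and fetch are hypothetical; the trait uses native async fn in traits (Rust 1.75+) rather than #[async_trait] so it needs no external crates; and a tiny block_on executor stands in for a real async runtime. Missing inputs and backend failures both become String errors prefixed with the source name.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal single-future executor: parks the thread until the future wakes it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

#[derive(Clone)]
struct Query {
    user_id: i64,
    // Hypothetical stand-in for a required input such as user_action_sequence.
    action_sequence: Option<Vec<u64>>,
}

#[derive(Clone, Debug, PartialEq)]
struct Candidate {
    post_id: i64,
}

// Native async fn in a trait; the example implementation on this page uses #[async_trait].
trait Source {
    async fn get_candidates(&self, query: &Query) -> Result<Vec<Candidate>, String>;
}

// Hypothetical backend call with its own error type.
async fn fetch(user_id: i64, sequence: &[u64]) -> Result<Vec<i64>, &'static str> {
    if user_id <= 0 {
        return Err("invalid user id");
    }
    Ok(sequence.iter().map(|&id| id as i64).collect())
}

struct RecentPostsSource;

impl Source for RecentPostsSource {
    async fn get_candidates(&self, query: &Query) -> Result<Vec<Candidate>, String> {
        // A missing required input becomes an error prefixed with the source name.
        let sequence = query
            .action_sequence
            .as_ref()
            .ok_or_else(|| "RecentPostsSource: missing action_sequence".to_string())?;
        // Backend errors are wrapped the same way via map_err.
        let ids = fetch(query.user_id, sequence)
            .await
            .map_err(|e| format!("RecentPostsSource: {}", e))?;
        Ok(ids.into_iter().map(|post_id| Candidate { post_id }).collect())
    }
}

fn main() {
    let query = Query { user_id: 7, action_sequence: Some(vec![1, 2]) };
    println!("{:?}", block_on(RecentPostsSource.get_candidates(&query)));
}
```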
name
Returns a stable name for logging and metrics.
fn name(&self) -> &'static str
Returns a short type name derived from the implementing struct. The default implementation extracts the struct name from the implementing type's full type name.
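One way to build such a default without runtime reflection (which Rust does not have) is to post-process std::any::type_name, which returns the type's full path as a &'static str. The short_type_name helper and Named trait below are a hypothetical sketch, not the pipeline's own code. The exact output of type_name is not guaranteed by the standard library, so treat the result as best-effort naming for logs and metrics.

```rust
use std::any::type_name;

// Hypothetical helper: "my_crate::sources::PhoenixSource" -> "PhoenixSource".
// Generic arguments are dropped first, so "a::Wrapper<b::Inner>" -> "Wrapper".
fn short_type_name(full: &'static str) -> &'static str {
    let without_generics = full.split('<').next().unwrap_or(full);
    without_generics.rsplit("::").next().unwrap_or(without_generics)
}

// Hypothetical trait showing only the name() default.
trait Named {
    // Derives the name from the implementing type; works for unsized Self too.
    fn name(&self) -> &'static str {
        short_type_name(type_name::<Self>())
    }
}

struct PhoenixSource;
impl Named for PhoenixSource {}

struct Wrapper<T>(T);
impl<T> Named for Wrapper<T> {}

fn main() {
    println!("{}", PhoenixSource.name());
    println!("{}", Wrapper(PhoenixSource).name());
}
```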
Example Implementation
Here’s a real example from the X For You feed that retrieves candidates from the Phoenix recommendation system:
```rust
use std::sync::Arc;

use tonic::async_trait;
use xai_candidate_pipeline::source::Source;
// PhoenixRetrievalClient, ScoredPostsQuery, PostCandidate, ServedType and
// PHOENIX_MAX_RESULTS are defined elsewhere in the For You feed codebase (not shown).

pub struct PhoenixSource {
    pub phoenix_retrieval_client: Arc<dyn PhoenixRetrievalClient + Send + Sync>,
}

#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for PhoenixSource {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        // Only run for out-of-network requests
        !query.in_network_only
    }

    async fn get_candidates(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        let user_id = query.user_id as u64;
        // The retrieval request needs the user's action sequence; fail if it is absent.
        let sequence = query
            .user_action_sequence
            .as_ref()
            .ok_or_else(|| "PhoenixSource: missing user_action_sequence".to_string())?;

        let response = self
            .phoenix_retrieval_client
            .retrieve(user_id, sequence.clone(), PHOENIX_MAX_RESULTS)
            .await
            .map_err(|e| format!("PhoenixSource: {}", e))?;

        // Flatten the top-k groups, skip empty entries, and map each hit to a PostCandidate.
        let candidates: Vec<PostCandidate> = response
            .top_k_candidates
            .into_iter()
            .flat_map(|scored_candidates| scored_candidates.candidates)
            .filter_map(|scored_candidate| scored_candidate.candidate)
            .map(|tweet_info| PostCandidate {
                tweet_id: tweet_info.tweet_id as i64,
                author_id: tweet_info.author_id,
                served_type: Some(ServedType::ForYouPhoenixRetrieval),
                ..Default::default()
            })
            .collect();

        Ok(candidates)
    }
}
```
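Because PhoenixSource receives its client as an Arc<dyn PhoenixRetrievalClient + Send + Sync>, it can be unit-tested with a canned client. The self-contained sketch below mirrors its flat_map/filter_map conversion using hypothetical, simplified response types (RetrievalResponse, ScoredCandidates, ScoredCandidate, TweetInfo) and a hypothetical synchronous RetrievalClient trait; the real client call is async and its response types are not shown on this page.

```rust
use std::sync::Arc;

// Hypothetical, simplified mirrors of the retrieval response shape.
struct TweetInfo { tweet_id: u64, author_id: u64 }
struct ScoredCandidate { candidate: Option<TweetInfo> }
struct ScoredCandidates { candidates: Vec<ScoredCandidate> }
struct RetrievalResponse { top_k_candidates: Vec<ScoredCandidates> }

#[derive(Debug, PartialEq)]
struct PostCandidate { tweet_id: i64, author_id: u64 }

// Hypothetical client trait (synchronous to keep the sketch dependency-free).
trait RetrievalClient: Send + Sync {
    fn retrieve(&self, user_id: u64, max_results: usize) -> Result<RetrievalResponse, String>;
}

struct RetrievalSource {
    client: Arc<dyn RetrievalClient>,
}

impl RetrievalSource {
    fn get_candidates(&self, user_id: i64) -> Result<Vec<PostCandidate>, String> {
        let response = self
            .client
            .retrieve(user_id as u64, 100)
            .map_err(|e| format!("RetrievalSource: {}", e))?;
        // Flatten the per-group lists, then drop entries whose candidate is None.
        Ok(response
            .top_k_candidates
            .into_iter()
            .flat_map(|group| group.candidates)
            .filter_map(|scored| scored.candidate)
            .map(|t| PostCandidate { tweet_id: t.tweet_id as i64, author_id: t.author_id })
            .collect())
    }
}

// Canned client: two groups, one of which contains an empty slot.
struct MockClient;
impl RetrievalClient for MockClient {
    fn retrieve(&self, _user_id: u64, _max_results: usize) -> Result<RetrievalResponse, String> {
        Ok(RetrievalResponse {
            top_k_candidates: vec![
                ScoredCandidates { candidates: vec![
                    ScoredCandidate { candidate: Some(TweetInfo { tweet_id: 10, author_id: 1 }) },
                    ScoredCandidate { candidate: None },
                ] },
                ScoredCandidates { candidates: vec![
                    ScoredCandidate { candidate: Some(TweetInfo { tweet_id: 20, author_id: 2 }) },
                ] },
            ],
        })
    }
}

// Client that always fails, to exercise the error path.
struct FailingClient;
impl RetrievalClient for FailingClient {
    fn retrieve(&self, _user_id: u64, _max_results: usize) -> Result<RetrievalResponse, String> {
        Err("deadline exceeded".to_string())
    }
}

fn main() {
    let source = RetrievalSource { client: Arc::new(MockClient) };
    println!("{:?}", source.get_candidates(42));
}
```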
Usage Notes
- Multiple sources can be configured to run in parallel
- Results from all enabled sources are combined before hydration
- Sources should return candidates quickly; expensive operations should be deferred to hydrators
- The enable method allows dynamic source selection based on query parameters
- Error messages should be descriptive for debugging and monitoring
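These notes describe a fan-out/fan-in flow: every enabled source runs concurrently and the results are concatenated. The sketch below illustrates that flow with scoped OS threads (std::thread::scope) and a synchronous stand-in for Source; this is a swapped-in technique, since the actual pipeline runs async sources, and its merge and error policy are not documented here. In particular, reporting a failed source and continuing with the others is an assumption of this sketch, as are the StaticSource and Query types.

```rust
use std::thread;

#[derive(Clone)]
struct Query {
    in_network_only: bool,
}

#[derive(Clone, Debug, PartialEq)]
struct Candidate {
    id: u64,
    source: &'static str,
}

// Synchronous stand-in for Source; the real get_candidates is async.
trait Source: Send + Sync {
    fn enable(&self, _query: &Query) -> bool {
        true
    }
    fn get_candidates(&self, query: &Query) -> Result<Vec<Candidate>, String>;
    fn name(&self) -> &'static str;
}

// Hypothetical fixed-output source used for illustration.
struct StaticSource {
    name: &'static str,
    ids: Vec<u64>,
    out_of_network: bool,
    fail: bool,
}

impl Source for StaticSource {
    fn enable(&self, query: &Query) -> bool {
        !(self.out_of_network && query.in_network_only)
    }
    fn get_candidates(&self, _query: &Query) -> Result<Vec<Candidate>, String> {
        if self.fail {
            return Err(format!("{}: backend unavailable", self.name()));
        }
        Ok(self.ids.iter().map(|&id| Candidate { id, source: self.name() }).collect())
    }
    fn name(&self) -> &'static str {
        self.name
    }
}

// Runs every enabled source on its own scoped thread, then concatenates the
// results in source order. Failed sources are collected as errors and skipped.
fn fetch_all(sources: &[Box<dyn Source>], query: &Query) -> (Vec<Candidate>, Vec<String>) {
    let results: Vec<Result<Vec<Candidate>, String>> = thread::scope(|s| {
        // Spawn all threads first so the sources overlap in time.
        let handles: Vec<_> = sources
            .iter()
            .filter(|src| src.enable(query))
            .map(|src| s.spawn(move || src.get_candidates(query)))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("source thread panicked"))
            .collect::<Vec<_>>()
    });
    let mut candidates = Vec::new();
    let mut errors = Vec::new();
    for result in results {
        match result {
            Ok(batch) => candidates.extend(batch),
            Err(e) => errors.push(e),
        }
    }
    (candidates, errors)
}

fn main() {
    let sources: Vec<Box<dyn Source>> = vec![
        Box::new(StaticSource { name: "InNetwork", ids: vec![1, 2], out_of_network: false, fail: false }),
        Box::new(StaticSource { name: "OutOfNetwork", ids: vec![3], out_of_network: true, fail: false }),
    ];
    let (candidates, errors) = fetch_all(&sources, &Query { in_network_only: true });
    println!("{:?} {:?}", candidates, errors);
}
```

Collecting the handles before joining is what makes the sources overlap: joining each handle inside the same iterator chain would run them one after another.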
See Also