
Overview

Sources are the entry point for candidates in the pipeline. Each source fetches raw candidates from a retrieval system or service, and all enabled sources run concurrently to keep fetch latency low.

Source Trait

The Source trait defines the interface for fetching candidates:
candidate-pipeline/source.rs
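use std::any::{type_name_of_val, Any};

use async_trait::async_trait;

#[async_trait]
pub trait Source<Q, C>: Any + Send + Sync
where
    Q: Clone + Send + Sync + 'static,
    C: Clone + Send + Sync + 'static,
{
    /// Decide if this source should run for the given query (defaults to always)
    fn enable(&self, _query: &Q) -> bool {
        true
    }

    /// Fetch candidates for the query; an Err is logged and the pipeline continues
    async fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>;

    /// Short type name used in logs and metrics
    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }
}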
  • enable (fn(&self, &Q) -> bool): Optional gating logic that enables or disables the source based on the query. Returns true by default.
  • get_candidates (async fn(&self, &Q) -> Result<Vec<C>, String>): The main method that fetches candidates. Returns a vector of candidates or an error message.
  • name (fn(&self) -> &'static str): Returns the source name used in logs and metrics. By default it is derived from the type name.
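As a rough, dependency-free sketch of how the three methods fit together, the version below drops async and the Any + Send + Sync bounds (so it runs without async_trait or a runtime) and replaces util::short_type_name with a hypothetical short_type_name helper that strips generic arguments and keeps the last path segment. Query and StaticSource are illustrative names, not part of the pipeline.

```rust
use std::any::type_name;

/// Hypothetical stand-in for `util::short_type_name`:
/// drops generic arguments, then keeps the last path segment.
fn short_type_name(full: &'static str) -> &'static str {
    let base = full.split('<').next().unwrap_or(full);
    base.rsplit("::").next().unwrap_or(base)
}

/// Synchronous, simplified mirror of the `Source` trait (the real one is async).
trait Source<Q, C> {
    /// Default gate: always run.
    fn enable(&self, _query: &Q) -> bool {
        true
    }

    fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>;

    /// Default name derived from the implementing type.
    fn name(&self) -> &'static str {
        short_type_name(type_name::<Self>())
    }
}

/// Hypothetical query type used only in this sketch.
struct Query {
    user_id: u64,
}

/// Toy source: returns its fixed ids offset by the user id.
struct StaticSource {
    ids: Vec<u64>,
}

impl Source<Query, u64> for StaticSource {
    fn get_candidates(&self, query: &Query) -> Result<Vec<u64>, String> {
        Ok(self.ids.iter().map(|id| id + query.user_id).collect())
    }
}
```

Only get_candidates has to be written; enable and name come from the defaults. Note that std::any::type_name output is documented as best-effort, so a derived name is convenient for logs but should not be treated as a stable identifier.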

Thunder Source

The Thunder source fetches in-network (following) posts from the Thunder service:
home-mixer/sources/thunder_source.rs
pub struct ThunderSource {
    pub thunder_client: Arc<ThunderClient>,
}

#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for ThunderSource {
    async fn get_candidates(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        let cluster = ThunderCluster::Amp;
        let channel = self
            .thunder_client
            .get_random_channel(cluster)
            .ok_or_else(|| "ThunderSource: no available channel".to_string())?;

        let mut client = InNetworkPostsServiceClient::new(channel.clone());
        let following_list = &query.user_features.followed_user_ids;
        let request = GetInNetworkPostsRequest {
            user_id: query.user_id as u64,
            following_user_ids: following_list.iter().map(|&id| id as u64).collect(),
            max_results: p::THUNDER_MAX_RESULTS,
            exclude_tweet_ids: vec![],
            algorithm: "default".to_string(),
            debug: false,
            is_video_request: false,
        };

        let response = client
            .get_in_network_posts(request)
            .await
            .map_err(|e| format!("ThunderSource: {}", e))?;

        let candidates: Vec<PostCandidate> = response
            .into_inner()
            .posts
            .into_iter()
            .map(|post| PostCandidate {
                tweet_id: post.post_id,
                author_id: post.author_id as u64,
                in_reply_to_tweet_id: post.in_reply_to_post_id.and_then(|id| u64::try_from(id).ok()),
                served_type: Some(pb::ServedType::ForYouInNetwork),
                ..Default::default()
            })
            .collect();

        Ok(candidates)
    }
}
Thunder provides in-network posts from accounts the user follows. This is the “Following” content in the For You feed.

Thunder Features

  • Fetches posts from accounts the user follows
  • Uses the following list from query.user_features.followed_user_ids
  • Returns up to THUNDER_MAX_RESULTS candidates
  • Marks candidates with ServedType::ForYouInNetwork
  • Includes reply chain information (ancestors)
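The candidate mapping in ThunderSource can be isolated as a pure function. This sketch assumes simplified InNetworkPost and PostCandidate structs and a one-variant ServedType enum, with field types guessed from the casts in the code above. It shows that a negative or missing in_reply_to_post_id becomes None, while every candidate is tagged ForYouInNetwork.

```rust
/// Hypothetical stand-in for `pb::ServedType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServedType {
    ForYouInNetwork,
}

/// Simplified shape of a post returned by Thunder (field types assumed).
struct InNetworkPost {
    post_id: i64,
    author_id: i64,
    in_reply_to_post_id: Option<i64>,
}

/// Simplified candidate holding only the fields this sketch fills in.
#[derive(Debug, PartialEq)]
struct PostCandidate {
    tweet_id: i64,
    author_id: u64,
    in_reply_to_tweet_id: Option<u64>,
    served_type: Option<ServedType>,
}

/// Mirrors the `.map(...)` in `ThunderSource::get_candidates`.
/// A negative reply id fails `u64::try_from` and is dropped to `None`.
fn to_candidates(posts: Vec<InNetworkPost>) -> Vec<PostCandidate> {
    posts
        .into_iter()
        .map(|post| PostCandidate {
            tweet_id: post.post_id,
            author_id: post.author_id as u64,
            in_reply_to_tweet_id: post.in_reply_to_post_id.and_then(|id| u64::try_from(id).ok()),
            served_type: Some(ServedType::ForYouInNetwork),
        })
        .collect()
}
```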

Phoenix Source

The Phoenix source fetches out-of-network (discovery) posts from the Phoenix retrieval system:
home-mixer/sources/phoenix_source.rs
pub struct PhoenixSource {
    pub phoenix_retrieval_client: Arc<dyn PhoenixRetrievalClient + Send + Sync>,
}

#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for PhoenixSource {
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        !query.in_network_only
    }

    async fn get_candidates(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        let user_id = query.user_id as u64;

        let sequence = query
            .user_action_sequence
            .as_ref()
            .ok_or_else(|| "PhoenixSource: missing user_action_sequence".to_string())?;

        let response = self
            .phoenix_retrieval_client
            .retrieve(user_id, sequence.clone(), p::PHOENIX_MAX_RESULTS)
            .await
            .map_err(|e| format!("PhoenixSource: {}", e))?;

        let candidates: Vec<PostCandidate> = response
            .top_k_candidates
            .into_iter()
            .flat_map(|scored_candidates| scored_candidates.candidates)
            .filter_map(|scored_candidate| scored_candidate.candidate)
            .map(|tweet_info| PostCandidate {
                tweet_id: tweet_info.tweet_id as i64,
                author_id: tweet_info.author_id,
                in_reply_to_tweet_id: Some(tweet_info.in_reply_to_tweet_id),
                served_type: Some(pb::ServedType::ForYouPhoenixRetrieval),
                ..Default::default()
            })
            .collect();

        Ok(candidates)
    }
}
Phoenix provides out-of-network posts from accounts the user doesn’t follow. This is the “Recommended” content in the For You feed.

Phoenix Features

  • Fetches posts from accounts the user doesn’t follow
  • Uses the user action sequence for personalization
  • Conditionally disabled when query.in_network_only is true
  • Returns up to PHOENIX_MAX_RESULTS candidates
  • Marks candidates with ServedType::ForYouPhoenixRetrieval
  • Requires user_action_sequence to be present in the query
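Phoenix returns candidates grouped into nested top-k lists. The sketch below, built on hypothetical, simplified response and query types, reproduces the enable gate, the required-field check, and the flat_map / filter_map flattening that discards entries without a candidate. Unlike the real source, it takes the response as an argument instead of calling the retrieval client.

```rust
/// Simplified, hypothetical shapes of the Phoenix retrieval response.
struct TweetInfo {
    tweet_id: u64,
    author_id: u64,
}
struct ScoredCandidate {
    candidate: Option<TweetInfo>,
}
struct ScoredCandidates {
    candidates: Vec<ScoredCandidate>,
}

/// Hypothetical query with only the fields PhoenixSource reads.
struct Query {
    in_network_only: bool,
    user_action_sequence: Option<Vec<u64>>,
}

/// Mirrors `PhoenixSource::enable`: skip discovery for in-network-only requests.
fn enable(query: &Query) -> bool {
    !query.in_network_only
}

/// Validates the required sequence, then flattens the top-k groups,
/// dropping entries whose `candidate` is `None`. Yields (tweet_id, author_id).
fn flatten_top_k(query: &Query, top_k: Vec<ScoredCandidates>) -> Result<Vec<(i64, u64)>, String> {
    let _sequence = query
        .user_action_sequence
        .as_ref()
        .ok_or_else(|| "PhoenixSource: missing user_action_sequence".to_string())?;

    Ok(top_k
        .into_iter()
        .flat_map(|group| group.candidates)
        .filter_map(|scored| scored.candidate)
        .map(|info| (info.tweet_id as i64, info.author_id))
        .collect())
}
```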

Parallel Execution

Enabled sources are fetched concurrently with join_all, and their results are merged in source order:
candidate-pipeline/candidate_pipeline.rs
async fn fetch_candidates(&self, query: &Q) -> Vec<C> {
    let request_id = query.request_id().to_string();
    let sources: Vec<_> = self.sources().iter().filter(|s| s.enable(query)).collect();
    let source_futures = sources.iter().map(|s| s.get_candidates(query));
    let results = join_all(source_futures).await;

    let mut collected = Vec::new();
    for (source, result) in sources.iter().zip(results) {
        match result {
            Ok(mut candidates) => {
                info!(
                    "request_id={} stage={:?} component={} fetched {} candidates",
                    request_id,
                    PipelineStage::Source,
                    source.name(),
                    candidates.len()
                );
                collected.append(&mut candidates);
            }
            Err(err) => {
                error!(
                    "request_id={} stage={:?} component={} failed: {}",
                    request_id,
                    PipelineStage::Source,
                    source.name(),
                    err
                );
            }
        }
    }
    collected
}
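The pipeline relies on join_all from an async runtime. As a dependency-free illustration of the same fan-out, the sketch below swaps in std::thread::scope (threads instead of futures) with a simplified synchronous Source trait, a query reduced to a bare u64, and a hypothetical Fixed test source. Like join_all, it returns results in the same order as the enabled sources.

```rust
use std::thread;

/// Simplified synchronous stand-in for the async `Source` trait.
trait Source: Sync {
    fn name(&self) -> &'static str;
    fn enable(&self, _query: &u64) -> bool {
        true
    }
    fn get_candidates(&self, query: &u64) -> Result<Vec<u64>, String>;
}

/// Hypothetical test source with a canned result and an on/off gate.
struct Fixed {
    name: &'static str,
    enabled: bool,
    result: Result<Vec<u64>, String>,
}

impl Source for Fixed {
    fn name(&self) -> &'static str {
        self.name
    }
    fn enable(&self, _query: &u64) -> bool {
        self.enabled
    }
    fn get_candidates(&self, _query: &u64) -> Result<Vec<u64>, String> {
        self.result.clone()
    }
}

/// Filters out disabled sources, runs the rest on scoped threads,
/// and pairs each result with its source name, preserving source order.
fn fetch_all(sources: &[Box<dyn Source>], query: &u64) -> Vec<(&'static str, Result<Vec<u64>, String>)> {
    let enabled: Vec<&Box<dyn Source>> = sources.iter().filter(|s| s.enable(query)).collect();
    thread::scope(|scope| {
        let handles: Vec<_> = enabled
            .iter()
            .map(|s| scope.spawn(move || s.get_candidates(query)))
            .collect();
        enabled
            .iter()
            .zip(handles)
            .map(|(s, h)| (s.name(), h.join().expect("source thread panicked")))
            .collect()
    })
}
```

Threads are only a stand-in here: for I/O-bound gRPC calls, awaiting futures on one task, as fetch_candidates does, avoids dedicating a thread to each source.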

Error Handling

If a source returns an error, fetch_candidates logs it with the request ID and source name, and the pipeline continues with the candidates from the other sources. A partial failure therefore degrades the feed instead of breaking it.
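The merge loop can be reduced to a small function. This sketch assumes results arrive as (name, Result) pairs and uses a hypothetical Merged struct; failures are recorded rather than propagated, so the remaining candidates still reach later stages.

```rust
/// Hypothetical merge outcome: combined candidates plus the names of failed sources.
struct Merged<C> {
    candidates: Vec<C>,
    failed: Vec<&'static str>,
}

/// Appends successful results in source order and records failures
/// instead of returning early, so one bad source cannot empty the feed.
fn merge<C>(results: Vec<(&'static str, Result<Vec<C>, String>)>) -> Merged<C> {
    let mut merged = Merged { candidates: Vec::new(), failed: Vec::new() };
    for (name, result) in results {
        match result {
            Ok(mut candidates) => merged.candidates.append(&mut candidates),
            Err(err) => {
                // The pipeline logs with `error!`; stderr stands in for the logger here.
                eprintln!("component={} failed: {}", name, err);
                merged.failed.push(name);
            }
        }
    }
    merged
}
```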

Conditional Enabling

Sources can be conditionally enabled based on query parameters:
fn enable(&self, query: &ScoredPostsQuery) -> bool {
    !query.in_network_only
}
This allows sources to be selected dynamically from information carried by the query, such as:
  • Query parameters (for example, in_network_only)
  • Experiment and A/B test assignments
  • Feature flags
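Because enable sees the whole query, gates can be combined. This sketch uses a hypothetical Query with a feature_flags set and a made-up ExperimentalSource; the flag name is purely illustrative. The source runs only for out-of-network requests with the flag turned on.

```rust
use std::collections::HashSet;

/// Hypothetical query carrying the kinds of state a source might gate on.
struct Query {
    in_network_only: bool,
    feature_flags: HashSet<String>,
}

/// Hypothetical source used only to illustrate combined gating.
struct ExperimentalSource;

impl ExperimentalSource {
    /// Requires an out-of-network request AND the (made-up) flag.
    fn enable(&self, query: &Query) -> bool {
        !query.in_network_only && query.feature_flags.contains("enable_experimental_source")
    }
}
```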

Source Configuration

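Each source's result limit is a constant, referenced as p::THUNDER_MAX_RESULTS and p::PHOENIX_MAX_RESULTS in the code above. For example: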
const THUNDER_MAX_RESULTS: i32 = 800;
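ThunderSource passes THUNDER_MAX_RESULTS to the service and relies on it to honor the limit. If you also want a client-side guarantee, a hypothetical guard like cap_results below converts the i32 limit to usize (treating negative values as zero) and truncates the list. This is not part of the code shown on this page.

```rust
/// Value from the configuration above.
const THUNDER_MAX_RESULTS: i32 = 800;

/// Hypothetical client-side cap: negative limits become 0 via `try_from`,
/// and any excess candidates beyond the limit are dropped.
fn cap_results<C>(mut candidates: Vec<C>, max_results: i32) -> Vec<C> {
    let limit = usize::try_from(max_results).unwrap_or(0);
    candidates.truncate(limit);
    candidates
}
```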

Best Practices

Always prefix error messages with the source name for easier debugging:
.map_err(|e| format!("ThunderSource: {}", e))
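To avoid repeating the format! call in every source, the prefixing can be factored into a helper. with_source is a hypothetical name; it accepts any error type that implements Display.

```rust
use std::fmt::Display;

/// Hypothetical helper: turns any displayable error into "SourceName: cause".
fn with_source<T, E: Display>(source: &str, result: Result<T, E>) -> Result<T, String> {
    result.map_err(|e| format!("{}: {}", source, e))
}
```

Inside a source, passing self.name() as the prefix keeps error messages aligned with the component names used in logs.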
Validate required query fields before making external calls:
let sequence = query
    .user_action_sequence
    .as_ref()
    .ok_or_else(|| "PhoenixSource: missing user_action_sequence".to_string())?;
Always set the served_type field to track candidate provenance:
served_type: Some(pb::ServedType::ForYouPhoenixRetrieval)
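Provenance tags pay off when counting candidates by origin, for example while debugging or emitting metrics. The sketch below uses a stand-in ServedType with only the two variants named on this page and a simplified PostCandidate. Untagged candidates are counted under None, which makes a source that forgot to set served_type easy to spot.

```rust
use std::collections::BTreeMap;

/// Stand-in for `pb::ServedType` with the two variants named on this page.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum ServedType {
    ForYouInNetwork,
    ForYouPhoenixRetrieval,
}

/// Simplified candidate carrying only the provenance tag.
struct PostCandidate {
    served_type: Option<ServedType>,
}

/// Counts candidates per provenance tag; `None` collects untagged ones.
fn count_by_served_type(candidates: &[PostCandidate]) -> BTreeMap<Option<ServedType>, usize> {
    let mut counts = BTreeMap::new();
    for c in candidates {
        *counts.entry(c.served_type).or_insert(0) += 1;
    }
    counts
}
```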
Annotate get_candidates with the receive_stats attribute macro for automatic metrics collection:
#[xai_stats_macro::receive_stats]
async fn get_candidates(...) -> Result<Vec<PostCandidate>, String> {
    // ...
}
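This page does not say what receive_stats records. Purely as an assumption about the kind of metrics involved, a hand-rolled wrapper that times a call and counts calls, failures, and returned candidates could look like this. SourceStats is hypothetical and is not the macro's implementation.

```rust
use std::time::{Duration, Instant};

/// Hypothetical per-source counters (not what the macro necessarily records).
#[derive(Debug, Default)]
struct SourceStats {
    calls: u64,
    failures: u64,
    candidates: u64,
    total_latency: Duration,
}

impl SourceStats {
    /// Times `f`, records success/failure and candidate count, then returns its result unchanged.
    fn record<C>(&mut self, f: impl FnOnce() -> Result<Vec<C>, String>) -> Result<Vec<C>, String> {
        let start = Instant::now();
        let result = f();
        self.total_latency += start.elapsed();
        self.calls += 1;
        match &result {
            Ok(candidates) => self.candidates += candidates.len() as u64,
            Err(_) => self.failures += 1,
        }
        result
    }
}
```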

Related pages:
  • Pipeline Framework: Understand the overall pipeline execution flow
  • Hydrators: Enrich candidates after they’re fetched
  • Phoenix Retrieval: Deep dive into the Phoenix retrieval system
  • Thunder Service: Thunder in-network service details
