
Overview

Hydrators enrich candidates and queries with additional data by calling external services. The hydrators in a stage run concurrently to keep latency low, and hydration can happen at more than one stage of the pipeline.

Hydrator Types

There are two types of hydrators:

Query Hydrators

Enrich the query with user features, experiment flags, and configuration

Candidate Hydrators

Enrich candidates with author info, media metadata, and additional features

Candidate Hydrator Trait

The Hydrator trait defines the interface for enriching candidates:
candidate-pipeline/hydrator.rs
#[async_trait]
pub trait Hydrator<Q, C>: Any + Send + Sync
where
    Q: Clone + Send + Sync + 'static,
    C: Clone + Send + Sync + 'static,
{
    /// Decide if this hydrator should run for the given query
    fn enable(&self, _query: &Q) -> bool {
        true
    }

    /// Hydrate candidates by performing async operations.
    /// Returns candidates with this hydrator's fields populated.
    ///
    /// IMPORTANT: The returned vector must have the same candidates in the same order as the input.
    /// Dropping candidates in a hydrator is not allowed - use a filter stage instead.
    async fn hydrate(&self, query: &Q, candidates: &[C]) -> Result<Vec<C>, String>;

    /// Update a single candidate with the hydrated fields.
    /// Only the fields this hydrator is responsible for should be copied.
    fn update(&self, candidate: &mut C, hydrated: C);

    /// Update all candidates with the hydrated fields from `hydrated`.
    /// Default implementation iterates and calls `update` for each pair.
    fn update_all(&self, candidates: &mut [C], hydrated: Vec<C>) {
        for (c, h) in candidates.iter_mut().zip(hydrated) {
            self.update(c, h);
        }
    }

    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }
}
The hydrate method must return exactly one hydrated candidate per input candidate, in input order. Hydrators must not drop or reorder candidates; to drop candidates, use a filter instead.
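The split between hydrate and update can be illustrated with a small sketch. It assumes a simplified synchronous stand-in for the trait so that it needs no async runtime; `Candidate`, `SyncHydrator`, and `LongTextHydrator` are hypothetical names, not part of the pipeline crate.

```rust
// Simplified, synchronous stand-in for the async Hydrator trait (assumption:
// the real trait is async and generic over the query and candidate types).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Candidate {
    pub id: u64,
    pub text_len: Option<usize>,
    pub is_long: Option<bool>,
}

pub trait SyncHydrator {
    /// Returns one hydrated value per input candidate, in input order.
    fn hydrate(&self, candidates: &[Candidate]) -> Result<Vec<Candidate>, String>;

    /// Copies only the fields this hydrator owns.
    fn update(&self, candidate: &mut Candidate, hydrated: Candidate);

    /// Pairs candidates with hydrated values by position.
    fn update_all(&self, candidates: &mut [Candidate], hydrated: Vec<Candidate>) {
        for (c, h) in candidates.iter_mut().zip(hydrated) {
            self.update(c, h);
        }
    }
}

/// Hypothetical hydrator that owns a single field, `is_long`.
pub struct LongTextHydrator {
    pub threshold: usize,
}

impl SyncHydrator for LongTextHydrator {
    fn hydrate(&self, candidates: &[Candidate]) -> Result<Vec<Candidate>, String> {
        Ok(candidates
            .iter()
            .map(|c| Candidate {
                is_long: c.text_len.map(|n| n > self.threshold),
                // Every field this hydrator does not own stays at its default.
                ..Default::default()
            })
            .collect())
    }

    fn update(&self, candidate: &mut Candidate, hydrated: Candidate) {
        candidate.is_long = hydrated.is_long;
    }
}
```

Because update_all pairs values by position, a hydrated vector that is shorter or reordered would silently attach data to the wrong candidate, which is why the ordering rule matters.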

Query Hydrator Trait

The QueryHydrator trait defines the interface for enriching queries:
candidate-pipeline/query_hydrator.rs
#[async_trait]
pub trait QueryHydrator<Q>: Any + Send + Sync
where
    Q: Clone + Send + Sync + 'static,
{
    /// Decide if this query hydrator should run for the given query
    fn enable(&self, _query: &Q) -> bool {
        true
    }

    /// Hydrate the query by performing async operations.
    /// Returns a new query with this hydrator's fields populated.
    async fn hydrate(&self, query: &Q) -> Result<Q, String>;

    /// Update the query with the hydrated fields.
    /// Only the fields this hydrator is responsible for should be copied.
    fn update(&self, query: &mut Q, hydrated: Q);

    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }
}
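A minimal sketch of how a query hydrator might be applied, assuming a synchronous simplification of the trait. `Query`, `SyncQueryHydrator`, `FollowsHydrator`, and `apply` are hypothetical names used only for illustration, and the in-memory map stands in for an external service.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, Default, PartialEq)]
pub struct Query {
    pub user_id: u64,
    pub request_id: String,
    pub followed_user_ids: Vec<u64>,
}

// Synchronous stand-in for the async QueryHydrator trait (assumption).
pub trait SyncQueryHydrator {
    fn enable(&self, _query: &Query) -> bool {
        true
    }
    fn hydrate(&self, query: &Query) -> Result<Query, String>;
    fn update(&self, query: &mut Query, hydrated: Query);
}

/// Hypothetical hydrator that owns `followed_user_ids`.
pub struct FollowsHydrator {
    pub follows: HashMap<u64, Vec<u64>>,
}

impl SyncQueryHydrator for FollowsHydrator {
    // Skip the lookup entirely when there is no user (id 0 means logged out here).
    fn enable(&self, query: &Query) -> bool {
        query.user_id != 0
    }

    fn hydrate(&self, query: &Query) -> Result<Query, String> {
        let followed = self
            .follows
            .get(&query.user_id)
            .cloned()
            .ok_or_else(|| format!("no follow data for user {}", query.user_id))?;
        Ok(Query {
            followed_user_ids: followed,
            ..Default::default()
        })
    }

    fn update(&self, query: &mut Query, hydrated: Query) {
        query.followed_user_ids = hydrated.followed_user_ids;
    }
}

/// Runs enable, hydrate, and update in order. Returns Ok(false) if the hydrator was skipped.
pub fn apply(hydrator: &dyn SyncQueryHydrator, query: &mut Query) -> Result<bool, String> {
    if !hydrator.enable(query) {
        return Ok(false);
    }
    let hydrated = hydrator.hydrate(query)?;
    hydrator.update(query, hydrated);
    Ok(true)
}
```

Returning a fresh query from hydrate and copying fields in update keeps hydrate free of mutable access, so several hydrators can read the same query at once.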

Gizmoduck Hydrator

The Gizmoduck hydrator enriches candidates with author profile information:
home-mixer/candidate_hydrators/gizmoduck_hydrator.rs
pub struct GizmoduckCandidateHydrator {
    pub gizmoduck_client: Arc<dyn GizmoduckClient + Send + Sync>,
}

#[async_trait]
impl Hydrator<ScoredPostsQuery, PostCandidate> for GizmoduckCandidateHydrator {
    async fn hydrate(
        &self,
        _query: &ScoredPostsQuery,
        candidates: &[PostCandidate],
    ) -> Result<Vec<PostCandidate>, String> {
        let client = &self.gizmoduck_client;

        // Collect unique user IDs to fetch
        let author_ids: Vec<_> = candidates.iter().map(|c| c.author_id).collect();
        let author_ids: Vec<_> = author_ids.iter().map(|&x| x as i64).collect();
        let retweet_user_ids: Vec<_> = candidates.iter().map(|c| c.retweeted_user_id).collect();
        let retweet_user_ids: Vec<_> = retweet_user_ids.iter().flatten().collect();
        let retweet_user_ids: Vec<_> = retweet_user_ids.iter().map(|&&x| x as i64).collect();

        let mut user_ids_to_fetch = Vec::with_capacity(author_ids.len() + retweet_user_ids.len());
        user_ids_to_fetch.extend(author_ids);
        user_ids_to_fetch.extend(retweet_user_ids);
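        // Vec::dedup only removes consecutive duplicates, so sort first
        user_ids_to_fetch.sort_unstable();
        user_ids_to_fetch.dedup();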

        // Batch fetch all users
        let users = client.get_users(user_ids_to_fetch).await;
        let users = users.map_err(|e| e.to_string())?;

        // Hydrate each candidate
        let mut hydrated_candidates = Vec::with_capacity(candidates.len());
        for candidate in candidates {
            let user = users
                .get(&(candidate.author_id as i64))
                .and_then(|user| user.as_ref());
            let user_counts = user.and_then(|user| user.user.as_ref().map(|u| &u.counts));
            let user_profile = user.and_then(|user| user.user.as_ref().map(|u| &u.profile));

            let author_followers_count: Option<i32> =
                user_counts.map(|x| x.followers_count).map(|x| x as i32);
            let author_screen_name: Option<String> = user_profile.map(|x| x.screen_name.clone());

            let retweet_user = candidate
                .retweeted_user_id
                .and_then(|retweeted_user_id| users.get(&(retweeted_user_id as i64)))
                .and_then(|user| user.as_ref());
            let retweet_profile =
                retweet_user.and_then(|user| user.user.as_ref().map(|u| &u.profile));
            let retweeted_screen_name: Option<String> =
                retweet_profile.map(|x| x.screen_name.clone());

            let hydrated = PostCandidate {
                author_followers_count,
                author_screen_name,
                retweeted_screen_name,
                ..Default::default()
            };
            hydrated_candidates.push(hydrated);
        }

        Ok(hydrated_candidates)
    }

    fn update(&self, candidate: &mut PostCandidate, hydrated: PostCandidate) {
        candidate.author_followers_count = hydrated.author_followers_count;
        candidate.author_screen_name = hydrated.author_screen_name;
        candidate.retweeted_screen_name = hydrated.retweeted_screen_name;
    }
}

Gizmoduck Features

  • Fetches user profile data for authors and retweeters
  • Batch fetches to minimize RPC calls
  • Deduplicates user IDs before fetching
  • Hydrates: author_followers_count, author_screen_name, retweeted_screen_name
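One detail worth keeping in mind when deduplicating IDs: `Vec::dedup` removes only consecutive repeated elements, so full deduplication needs a sort first or a set. The helpers below are a standalone sketch with hypothetical names (`unique_sorted`, `unique_in_order`).

```rust
use std::collections::HashSet;

/// Sorts, then removes duplicates. Output order is ascending.
pub fn unique_sorted(mut ids: Vec<i64>) -> Vec<i64> {
    ids.sort_unstable();
    ids.dedup();
    ids
}

/// Removes duplicates while keeping the first occurrence of each ID in input order.
pub fn unique_in_order(ids: &[i64]) -> Vec<i64> {
    let mut seen = HashSet::new();
    // HashSet::insert returns false when the value was already present.
    ids.iter().copied().filter(|id| seen.insert(*id)).collect()
}
```

Either variant is fine when the response is looked up by ID afterwards, since the order of the request list then does not matter.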

Video Duration Hydrator

The Video Duration hydrator enriches candidates with video metadata:
home-mixer/candidate_hydrators/video_duration_candidate_hydrator.rs
pub struct VideoDurationCandidateHydrator {
    pub tes_client: Arc<dyn TESClient + Send + Sync>,
}

#[async_trait]
impl Hydrator<ScoredPostsQuery, PostCandidate> for VideoDurationCandidateHydrator {
    async fn hydrate(
        &self,
        _query: &ScoredPostsQuery,
        candidates: &[PostCandidate],
    ) -> Result<Vec<PostCandidate>, String> {
        let client = &self.tes_client;
        let tweet_ids = candidates.iter().map(|c| c.tweet_id).collect::<Vec<_>>();

        let post_features = client.get_tweet_media_entities(tweet_ids.clone()).await;
        let post_features = post_features.map_err(|e| e.to_string())?;

        let mut hydrated_candidates = Vec::with_capacity(candidates.len());
        for tweet_id in tweet_ids {
            let post_features = post_features.get(&tweet_id);
            let media_entities = post_features.and_then(|x| x.as_ref());

            let video_duration_ms = media_entities.and_then(|entities| {
                entities.iter().find_map(|entity| {
                    if let Some(MediaInfo::VideoInfo(video_info)) = &entity.media_info {
                        Some(video_info.duration_millis)
                    } else {
                        None
                    }
                })
            });

            let hydrated = PostCandidate {
                video_duration_ms,
                ..Default::default()
            };
            hydrated_candidates.push(hydrated);
        }

        Ok(hydrated_candidates)
    }

    fn update(&self, candidate: &mut PostCandidate, hydrated: PostCandidate) {
        candidate.video_duration_ms = hydrated.video_duration_ms;
    }
}

Video Duration Features

  • Fetches media entities from Tweet Entity Service (TES)
  • Extracts video duration for video posts
  • Used by scorers to apply video-specific weights
  • Hydrates: video_duration_ms
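The extraction step can be tried in isolation. This sketch assumes simplified, hypothetical `MediaInfo` and `MediaEntity` types (the real TES types are not shown in this page) and takes the duration of the first video entity, leaving posts without video as `None`.

```rust
// Hypothetical, simplified media types for illustration only.
#[derive(Debug, Clone, PartialEq)]
pub enum MediaInfo {
    ImageInfo { width: u32, height: u32 },
    VideoInfo { duration_millis: i32 },
}

pub struct MediaEntity {
    pub media_info: Option<MediaInfo>,
}

/// Returns the duration of the first video entity, or None if there is no video.
pub fn first_video_duration_ms(entities: &[MediaEntity]) -> Option<i32> {
    entities.iter().find_map(|entity| match &entity.media_info {
        Some(MediaInfo::VideoInfo { duration_millis }) => Some(*duration_millis),
        _ => None,
    })
}
```

`find_map` stops at the first match, so a post with several videos reports only the first one's duration.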

Available Hydrators

The home-mixer service includes these candidate hydrators:

CoreDataCandidateHydrator

Hydrates core tweet data (text, media, metrics)

GizmoduckCandidateHydrator

Hydrates author profile information

InNetworkCandidateHydrator

Hydrates social graph features

SubscriptionHydrator

Hydrates subscription/premium status

VFCandidateHydrator

Hydrates visibility filtering results

VideoDurationCandidateHydrator

Hydrates video duration metadata

Parallel Execution

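Within a stage, all enabled hydrators are started against the same input candidates and awaited together with join_all; their results are then applied one hydrator at a time: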
candidate-pipeline/candidate_pipeline.rs
async fn run_hydrators(
    &self,
    query: &Q,
    mut candidates: Vec<C>,
    hydrators: &[Box<dyn Hydrator<Q, C>>],
    stage: PipelineStage,
) -> Vec<C> {
    let request_id = query.request_id().to_string();
    let hydrators: Vec<_> = hydrators.iter().filter(|h| h.enable(query)).collect();
    let expected_len = candidates.len();
    let hydrate_futures = hydrators.iter().map(|h| h.hydrate(query, &candidates));
    let results = join_all(hydrate_futures).await;
    
    for (hydrator, result) in hydrators.iter().zip(results) {
        match result {
            Ok(hydrated) => {
                if hydrated.len() == expected_len {
                    hydrator.update_all(&mut candidates, hydrated);
                } else {
                    warn!(
                        "request_id={} stage={:?} component={} skipped: length_mismatch expected={} got={}",
                        request_id, stage, hydrator.name(), expected_len, hydrated.len()
                    );
                }
            }
            Err(err) => {
                error!(
                    "request_id={} stage={:?} component={} failed: {}",
                    request_id, stage, hydrator.name(), err
                );
            }
        }
    }
    candidates
}

Error Handling

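If a hydrator returns an error or a vector whose length differs from the input, the problem is logged and that hydrator’s update is skipped, so the fields it owns keep their previous values. Results from the other hydrators are still applied, so a partial failure doesn’t break the pipeline.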
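The isolation behaviour can be sketched without an async runtime. The version below is a synchronous simplification: plain function pointers replace trait objects, a sequential map replaces join_all, and skipped steps are collected into a vector instead of being logged. All names (`Step`, `run_steps`, and the example hydrators) are hypothetical.

```rust
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Candidate {
    pub id: u64,
    pub a: Option<u32>,
    pub b: Option<u32>,
}

type HydrateFn = fn(&[Candidate]) -> Result<Vec<Candidate>, String>;
type UpdateFn = fn(&mut Candidate, Candidate);

pub struct Step {
    pub name: &'static str,
    pub hydrate: HydrateFn,
    pub update: UpdateFn,
}

/// Applies every step whose result is Ok and has the expected length.
/// Returns the candidates plus one message per skipped step.
pub fn run_steps(mut candidates: Vec<Candidate>, steps: &[Step]) -> (Vec<Candidate>, Vec<String>) {
    let expected_len = candidates.len();
    // Every step sees the same input snapshot; no step sees another step's output.
    let results: Vec<_> = steps.iter().map(|s| (s.hydrate)(&candidates)).collect();
    let mut skipped = Vec::new();
    for (step, result) in steps.iter().zip(results) {
        match result {
            Ok(hydrated) if hydrated.len() == expected_len => {
                for (c, h) in candidates.iter_mut().zip(hydrated) {
                    (step.update)(c, h);
                }
            }
            Ok(hydrated) => skipped.push(format!(
                "{}: length_mismatch expected={} got={}",
                step.name, expected_len, hydrated.len()
            )),
            Err(err) => skipped.push(format!("{}: failed: {}", step.name, err)),
        }
    }
    (candidates, skipped)
}

// Example hydrators: one succeeds, one errors, one returns too few items.
pub fn hydrate_a(cs: &[Candidate]) -> Result<Vec<Candidate>, String> {
    Ok(cs.iter()
        .map(|c| Candidate { a: Some(c.id as u32 * 10), ..Default::default() })
        .collect())
}
pub fn update_a(c: &mut Candidate, h: Candidate) {
    c.a = h.a;
}
pub fn hydrate_b_fails(_cs: &[Candidate]) -> Result<Vec<Candidate>, String> {
    Err("upstream timeout".to_string())
}
pub fn hydrate_b_short(cs: &[Candidate]) -> Result<Vec<Candidate>, String> {
    Ok(cs.iter()
        .skip(1)
        .map(|_| Candidate { b: Some(1), ..Default::default() })
        .collect())
}
pub fn update_b(c: &mut Candidate, h: Candidate) {
    c.b = h.b;
}
```

With these three steps, field `a` is populated for every candidate while `b` stays `None`, and the candidate list itself is returned intact.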

Best Practices

Always batch external calls to minimize RPC overhead:
let tweet_ids: Vec<_> = candidates.iter().map(|c| c.tweet_id).collect();
let results = client.get_tweets(tweet_ids).await;

The hydrated vector must match the input order and length:
let mut hydrated_candidates = Vec::with_capacity(candidates.len());
for candidate in candidates {
    // Build exactly one hydrated value per candidate, in input order
    let hydrated = PostCandidate { ..Default::default() };
    hydrated_candidates.push(hydrated);
}

Only update fields this hydrator is responsible for:
fn update(&self, candidate: &mut PostCandidate, hydrated: PostCandidate) {
    candidate.video_duration_ms = hydrated.video_duration_ms;
    // Don't touch other fields
}

Use Default::default() for fields you don’t hydrate:
PostCandidate {
    video_duration_ms,
    ..Default::default()
}
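These practices combine naturally when the batched response is keyed by ID: iterate the input candidates, not the response, so order and length are preserved even when some IDs are missing. The sketch below assumes a cut-down, hypothetical `PostCandidate` with two fields and a plain `HashMap` standing in for the service response.

```rust
use std::collections::HashMap;

// Hypothetical two-field version of PostCandidate for illustration.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct PostCandidate {
    pub tweet_id: i64,
    pub video_duration_ms: Option<i32>,
}

/// Builds one hydrated value per input candidate, in input order.
/// IDs missing from the batch response yield None; they are never dropped.
pub fn hydrate_from_batch(
    candidates: &[PostCandidate],
    durations: &HashMap<i64, i32>,
) -> Vec<PostCandidate> {
    candidates
        .iter()
        .map(|c| PostCandidate {
            video_duration_ms: durations.get(&c.tweet_id).copied(),
            // Fields this hydrator does not own stay at their defaults.
            ..Default::default()
        })
        .collect()
}
```

Iterating the response map instead would lose both guarantees, because map iteration order is unspecified and missing IDs would shorten the output.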

Hydration Stages

Hydrators can run at two points in the pipeline:
1. Pre-Selection Hydration

Runs after candidate fetching, before filtering and scoring. Used for features needed for filtering/scoring.

2. Post-Selection Hydration

Runs after selection, only on candidates that will be returned. Used for expensive features only needed for display.

Use post-selection hydration for expensive operations that only need to run on the final selected candidates (e.g., fetching full tweet content, high-resolution images).
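The cost difference between the two stages can be seen in a toy pipeline. This is a sketch under simplifying assumptions: filtering, scoring, and selection are collapsed into a single sort-and-truncate, a `HashMap` stands in for a feature service, and all names (`Candidate`, `HydrationCounts`, `run_pipeline`) are hypothetical.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;

#[derive(Clone, Debug, Default, PartialEq)]
pub struct Candidate {
    pub id: u64,
    pub quality: Option<f64>,
    pub display_text: Option<String>,
}

#[derive(Debug, Default, PartialEq)]
pub struct HydrationCounts {
    pub pre_selection: usize,
    pub post_selection: usize,
}

pub fn run_pipeline(
    mut candidates: Vec<Candidate>,
    features: &HashMap<u64, f64>,
    top_k: usize,
) -> (Vec<Candidate>, HydrationCounts) {
    let mut counts = HydrationCounts::default();

    // Pre-selection hydration: features needed for ranking, for every candidate.
    for c in candidates.iter_mut() {
        c.quality = features.get(&c.id).copied();
        counts.pre_selection += 1;
    }

    // Selection: highest quality first (None sorts last), keep the top_k.
    candidates.sort_by(|a, b| b.quality.partial_cmp(&a.quality).unwrap_or(Ordering::Equal));
    candidates.truncate(top_k);

    // Post-selection hydration: display-only data, for survivors only.
    for c in candidates.iter_mut() {
        c.display_text = Some(format!("post {}", c.id));
        counts.post_selection += 1;
    }

    (candidates, counts)
}
```

The counters make the trade-off concrete: the pre-selection step touches every fetched candidate, while the post-selection step touches at most `top_k`.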

Pipeline Framework

Understand the overall pipeline execution flow

Sources

Learn how candidates are fetched

Filters

Filter candidates after hydration

Scorers

Score candidates using hydrated features
