Overview

Filters remove unwanted candidates from the pipeline. They run sequentially after candidate hydration, allowing each filter to see the results of previous filters. Filters partition candidates into “kept” and “removed” sets.

Filter Trait

The Filter trait defines the interface for filtering candidates:
candidate-pipeline/filter.rs
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

#[async_trait]
pub trait Filter<Q, C>: Any + Send + Sync
where
    Q: Clone + Send + Sync + 'static,
    C: Clone + Send + Sync + 'static,
{
    /// Decide if this filter should run for the given query
    fn enable(&self, _query: &Q) -> bool {
        true
    }

    /// Filter candidates by evaluating each against some criteria.
    /// Returns a FilterResult containing kept candidates (which continue to the next stage)
    /// and removed candidates (which are excluded from further processing).
    async fn filter(&self, query: &Q, candidates: Vec<C>) -> Result<FilterResult<C>, String>;

    /// Returns a stable name for logging/metrics.
    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }
}
  • enable: fn(&self, &Q) -> bool. Optional gating logic to conditionally enable or disable the filter based on query parameters.
  • filter: async fn(&self, &Q, Vec<C>) -> Result<FilterResult<C>, String>. The main filtering method. Partitions candidates into kept and removed sets.
  • name: fn(&self) -> &'static str. Returns the filter name for logging and metrics. Auto-generated from the type name by default.
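
The default name implementation relies on util::short_type_name, which is internal to the codebase and not shown on this page. The sketch below is one plausible way such a helper could work, assuming it keeps only the last path segment of the fully qualified type name. Both function names are illustrative stand-ins, not the actual utility.

```rust
use std::any::type_name;

/// Hypothetical stand-in for `util::short_type_name`: keeps only the final
/// path segment of a fully qualified type name.
fn short_type_name(full: &'static str) -> &'static str {
    // Drop generic arguments first so "a::Wrapper<b::Inner>" yields "Wrapper".
    let base = full.split('<').next().unwrap_or(full);
    // Keep whatever follows the last "::" separator.
    base.rsplit("::").next().unwrap_or(base)
}

/// Returns the short name of a value's concrete type.
fn name_of<T>(_value: &T) -> &'static str {
    short_type_name(type_name::<T>())
}

/// Placeholder type used only to demonstrate the helper.
struct AgeFilter;
```

Note that the exact string returned by std::any::type_name is not guaranteed by the standard library, so a filter whose name feeds dashboards or alerts may prefer to override name with a fixed literal.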

Age Filter

The Age filter removes tweets older than a specified duration:
home-mixer/filters/age_filter.rs
pub struct AgeFilter {
    pub max_age: Duration,
}

impl AgeFilter {
    pub fn new(max_age: Duration) -> Self {
        Self { max_age }
    }

    fn is_within_age(&self, tweet_id: i64) -> bool {
        snowflake::duration_since_creation_opt(tweet_id)
            .map(|age| age <= self.max_age)
            .unwrap_or(false)
    }
}

#[async_trait]
impl Filter<ScoredPostsQuery, PostCandidate> for AgeFilter {
    async fn filter(
        &self,
        _query: &ScoredPostsQuery,
        candidates: Vec<PostCandidate>,
    ) -> Result<FilterResult<PostCandidate>, String> {
        let (kept, removed): (Vec<_>, Vec<_>) = candidates
            .into_iter()
            .partition(|c| self.is_within_age(c.tweet_id));

        Ok(FilterResult { kept, removed })
    }
}
The Age filter uses Twitter’s Snowflake ID format to extract the creation timestamp from the tweet ID itself, so no external service call is needed. If no age can be derived from the ID, is_within_age returns false and the tweet is removed.
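
The snowflake::duration_since_creation_opt helper is not shown on this page. As a rough sketch of the idea, assuming the publicly documented Snowflake layout (milliseconds since the Twitter epoch 1288834974657, stored above the low 22 bits of the ID), an age check could look like the following. The function names are illustrative and are not the codebase's API.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Twitter's Snowflake epoch, in milliseconds since the Unix epoch.
const TWITTER_EPOCH_MS: u64 = 1_288_834_974_657;
/// Number of low bits (worker and sequence) below the timestamp field.
const TIMESTAMP_SHIFT: u32 = 22;

/// Extracts the creation time in Unix milliseconds; None for non-positive IDs.
fn creation_time_ms(tweet_id: i64) -> Option<u64> {
    if tweet_id <= 0 {
        return None;
    }
    Some(((tweet_id as u64) >> TIMESTAMP_SHIFT) + TWITTER_EPOCH_MS)
}

/// Age of the tweet at `now_ms`; None if the ID is invalid or lies in the future.
fn age_at(tweet_id: i64, now_ms: u64) -> Option<Duration> {
    let created = creation_time_ms(tweet_id)?;
    now_ms.checked_sub(created).map(Duration::from_millis)
}

/// Mirrors the filter's rule: an unknown age counts as "not within age".
fn is_within_age(tweet_id: i64, now_ms: u64, max_age: Duration) -> bool {
    age_at(tweet_id, now_ms)
        .map(|age| age <= max_age)
        .unwrap_or(false)
}

/// Current wall-clock time in Unix milliseconds (0 if the clock is before 1970).
fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}
```

Passing the current time in as a parameter keeps the check deterministic in tests; a caller would typically supply now_ms().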

Self Tweet Filter

The Self Tweet filter removes tweets where the author is the viewer:
home-mixer/filters/self_tweet_filter.rs
pub struct SelfTweetFilter;

#[async_trait]
impl Filter<ScoredPostsQuery, PostCandidate> for SelfTweetFilter {
    async fn filter(
        &self,
        query: &ScoredPostsQuery,
        candidates: Vec<PostCandidate>,
    ) -> Result<FilterResult<PostCandidate>, String> {
        let viewer_id = query.user_id as u64;
        let (kept, removed): (Vec<_>, Vec<_>) = candidates
            .into_iter()
            .partition(|c| c.author_id != viewer_id);

        Ok(FilterResult { kept, removed })
    }
}
This filter prevents users from seeing their own tweets in their For You feed, which improves content diversity.

Author Socialgraph Filter

The Author Socialgraph filter removes candidates from blocked or muted authors:
home-mixer/filters/author_socialgraph_filter.rs
pub struct AuthorSocialgraphFilter;

#[async_trait]
impl Filter<ScoredPostsQuery, PostCandidate> for AuthorSocialgraphFilter {
    async fn filter(
        &self,
        query: &ScoredPostsQuery,
        candidates: Vec<PostCandidate>,
    ) -> Result<FilterResult<PostCandidate>, String> {
        let viewer_blocked_user_ids = query.user_features.blocked_user_ids.clone();
        let viewer_muted_user_ids = query.user_features.muted_user_ids.clone();

        if viewer_blocked_user_ids.is_empty() && viewer_muted_user_ids.is_empty() {
            return Ok(FilterResult {
                kept: candidates,
                removed: Vec::new(),
            });
        }

        let mut kept: Vec<PostCandidate> = Vec::new();
        let mut removed: Vec<PostCandidate> = Vec::new();

        for candidate in candidates {
            let author_id = candidate.author_id as i64;
            let muted = viewer_muted_user_ids.contains(&author_id);
            let blocked = viewer_blocked_user_ids.contains(&author_id);
            if muted || blocked {
                removed.push(candidate);
            } else {
                kept.push(candidate);
            }
        }

        Ok(FilterResult { kept, removed })
    }
}
This filter respects user privacy preferences by removing content from blocked and muted authors. This is a critical safety and UX feature.
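
The listing above clones both ID lists and calls contains once per candidate. If those lists are plain vectors (their type is not shown here, so this is an assumption), each lookup is a linear scan. A possible variant, sketched below with a simplified PostCandidate and a free function instead of the Filter trait, borrows the lists and builds one HashSet up front.

```rust
use std::collections::HashSet;

/// Simplified stand-in for the real candidate type.
#[derive(Debug, Clone, PartialEq)]
struct PostCandidate {
    tweet_id: i64,
    author_id: u64,
}

/// Splits candidates into (kept, removed) using the viewer's block and mute lists.
fn filter_by_socialgraph(
    candidates: Vec<PostCandidate>,
    blocked_user_ids: &[i64],
    muted_user_ids: &[i64],
) -> (Vec<PostCandidate>, Vec<PostCandidate>) {
    // Short-circuit: nothing to exclude, so every candidate is kept.
    if blocked_user_ids.is_empty() && muted_user_ids.is_empty() {
        return (candidates, Vec::new());
    }

    // Merge both lists into one set so each author lookup is O(1) on average.
    let excluded: HashSet<i64> = blocked_user_ids
        .iter()
        .chain(muted_user_ids)
        .copied()
        .collect();

    // partition puts candidates whose author is NOT excluded in the first Vec.
    candidates
        .into_iter()
        .partition(|c| !excluded.contains(&(c.author_id as i64)))
}
```

Whether this is worthwhile depends on list sizes; for very short lists a linear scan may be just as fast.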

Visibility Filtering (VF) Filter

The VF filter applies visibility filtering rules based on safety results:
home-mixer/filters/vf_filter.rs
pub struct VFFilter;

#[async_trait]
impl Filter<ScoredPostsQuery, PostCandidate> for VFFilter {
    async fn filter(
        &self,
        _query: &ScoredPostsQuery,
        candidates: Vec<PostCandidate>,
    ) -> Result<FilterResult<PostCandidate>, String> {
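        // partition returns (matching, non-matching). should_drop marks removals,
        // so the tuple order here is (removed, kept).
        let (removed, kept): (Vec<_>, Vec<_>) = candidates
            .into_iter()
            .partition(|c| should_drop(&c.visibility_reason));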

        Ok(FilterResult { kept, removed })
    }
}

fn should_drop(reason: &Option<FilteredReason>) -> bool {
    match reason {
        Some(FilteredReason::SafetyResult(safety_result)) => {
            matches!(safety_result.action, Action::Drop(_))
        }
        Some(_) => true,
        None => false,
    }
}
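The VF filter applies content safety policies. A candidate is removed when its safety result carries a Drop action or when any other filtered reason is set; it is kept when no reason is present or when the safety action is something other than Drop. This relies on the visibility_reason field being hydrated earlier in the pipeline.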
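
To make the decision table concrete, the sketch below pairs should_drop with minimal stand-in types. The real FilteredReason and Action definitions are not shown on this page, so the Interstitial and Other variants and the String payload on Drop are assumptions made only so the example compiles.

```rust
/// Stand-in for the real safety action type.
#[allow(dead_code)]
enum Action {
    Drop(String),
    Interstitial, // hypothetical non-drop action
}

/// Stand-in for the real safety result type.
struct SafetyResult {
    action: Action,
}

/// Stand-in for the real filtered-reason type.
#[allow(dead_code)]
enum FilteredReason {
    SafetyResult(SafetyResult),
    Other, // hypothetical non-safety reason
}

/// Same logic as the listing above.
fn should_drop(reason: &Option<FilteredReason>) -> bool {
    match reason {
        // A safety result drops the candidate only when its action is Drop.
        Some(FilteredReason::SafetyResult(safety_result)) => {
            matches!(safety_result.action, Action::Drop(_))
        }
        // Any other reason drops the candidate.
        Some(_) => true,
        // No reason recorded: keep the candidate.
        None => false,
    }
}
```

Under these stand-ins, a candidate with no visibility reason or with an Interstitial safety action is kept, while a Drop action or an Other reason removes it.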

Available Filters

The home-mixer service includes these filters:

  • AgeFilter: Removes tweets older than a specified duration
  • AuthorSocialgraphFilter: Removes tweets from blocked or muted authors
  • CoreDataHydrationFilter: Removes tweets missing core data
  • DedupConversationFilter: Removes duplicate conversation threads
  • DropDuplicatesFilter: Removes exact duplicate tweets
  • IneligibleSubscriptionFilter: Removes subscription content for non-subscribers
  • MutedKeywordFilter: Removes tweets matching muted keywords
  • PreviouslySeenPostsFilter: Removes tweets the user has already seen
  • PreviouslyServedPostsFilter: Removes tweets already served in recent requests
  • RetweetDeduplicationFilter: Deduplicates retweets of the same original tweet
  • SelfTweetFilter: Removes the user’s own tweets
  • VFFilter: Applies visibility filtering safety rules

Sequential Execution

Filters run sequentially, allowing each filter to see the results of previous filters:
candidate-pipeline/candidate_pipeline.rs
async fn run_filters(
    &self,
    query: &Q,
    mut candidates: Vec<C>,
    filters: &[Box<dyn Filter<Q, C>>],
    stage: PipelineStage,
) -> (Vec<C>, Vec<C>) {
    let request_id = query.request_id().to_string();
    let mut all_removed = Vec::new();
    
    for filter in filters.iter().filter(|f| f.enable(query)) {
        let backup = candidates.clone();
        match filter.filter(query, candidates).await {
            Ok(result) => {
                candidates = result.kept;
                all_removed.extend(result.removed);
            }
            Err(err) => {
                error!(
                    "request_id={} stage={:?} component={} failed: {}",
                    request_id, stage, filter.name(), err
                );
                candidates = backup;
            }
        }
    }
    
    info!(
        "request_id={} stage={:?} kept {}, removed {}",
        request_id, stage, candidates.len(), all_removed.len()
    );
    
    (candidates, all_removed)
}
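If a filter fails, the error is logged and the candidate list is restored from the backup taken just before that filter ran. Removals made by earlier filters are preserved, and the remaining filters still run. This ensures filter failures don’t accidentally remove valid candidates.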
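
The restore-on-error behavior can be tried in isolation. The sketch below is a simplified, synchronous version of the loop, without the query, logging, or enable checks. SyncFilter, DropOdd, and AlwaysFails are hypothetical names introduced only for this example.

```rust
struct FilterResult<C> {
    kept: Vec<C>,
    removed: Vec<C>,
}

/// Simplified synchronous stand-in for the async Filter trait.
trait SyncFilter<C> {
    fn filter(&self, candidates: Vec<C>) -> Result<FilterResult<C>, String>;
}

/// Removes odd numbers.
struct DropOdd;
impl SyncFilter<u32> for DropOdd {
    fn filter(&self, candidates: Vec<u32>) -> Result<FilterResult<u32>, String> {
        let (kept, removed): (Vec<_>, Vec<_>) =
            candidates.into_iter().partition(|n| n % 2 == 0);
        Ok(FilterResult { kept, removed })
    }
}

/// Simulates a filter whose dependency is down.
struct AlwaysFails;
impl SyncFilter<u32> for AlwaysFails {
    fn filter(&self, _candidates: Vec<u32>) -> Result<FilterResult<u32>, String> {
        Err("backend unavailable".to_string())
    }
}

fn run_filters<C: Clone>(
    mut candidates: Vec<C>,
    filters: &[Box<dyn SyncFilter<C>>],
) -> (Vec<C>, Vec<C>) {
    let mut all_removed = Vec::new();
    for filter in filters {
        // Snapshot taken before the filter consumes the list.
        let backup = candidates.clone();
        match filter.filter(candidates) {
            Ok(result) => {
                candidates = result.kept;
                all_removed.extend(result.removed);
            }
            // On failure, continue with the pre-filter list.
            Err(_err) => candidates = backup,
        }
    }
    (candidates, all_removed)
}
```

Running AlwaysFails followed by DropOdd over 1, 2, 3, 4 leaves the failed filter with no effect while DropOdd still removes 1 and 3. The clone per filter is the cost of this safety net.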

Filter Stages

Filters can run at two points in the pipeline:
1. Pre-Selection Filtering: Runs after candidate hydration, before scoring. Used for removing invalid or unwanted candidates.
2. Post-Selection Filtering: Runs after selection, on candidates that will be returned. Used for final safety checks.

Error Handling

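If a filter fails, the candidate list is restored to its state before that filter ran, so the failed filter removes nothing. This “safe” error handling prevents bugs in one filter from accidentally removing all candidates. The trade-off is that candidates the failed filter would have removed pass through to the next stage.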

Best Practices

Use Rust’s partition method for clean, functional filtering:
let (kept, removed): (Vec<_>, Vec<_>) = candidates
    .into_iter()
    .partition(|c| should_keep(c));
Short-circuit when filtering isn’t needed:
if viewer_blocked_user_ids.is_empty() && viewer_muted_user_ids.is_empty() {
    return Ok(FilterResult {
        kept: candidates,
        removed: Vec::new(),
    });
}
Include meaningful context in error messages:
Err(format!("AgeFilter: invalid snowflake id {}", tweet_id))
Use the stats macro for automatic metrics:
#[xai_stats_macro::receive_stats]
async fn filter(...) -> Result<FilterResult<PostCandidate>, String> {
    // ...
}

Partition Example

The partition pattern is idiomatic Rust for filtering:
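// Simple partition by condition.
// The (Vec<_>, Vec<_>) annotation is required: partition cannot infer
// the output collection type on its own.
let (kept, removed): (Vec<_>, Vec<_>) = candidates
    .into_iter()
    .partition(|c| c.score > threshold);

// Partition with complex logic
// (age, viewer_id, and max_age are placeholders here)
let (kept, removed): (Vec<_>, Vec<_>) = candidates
    .into_iter()
    .partition(|c| {
        let is_valid = c.author_id != viewer_id;
        let is_recent = age(c.tweet_id) < max_age;
        is_valid && is_recent
    });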

Performance Considerations

Filters run sequentially and process all candidates, so keep filter logic fast. For expensive operations (like external service calls), hydrate the data earlier in the pipeline and filter based on hydrated fields.

Conditional Enabling

Filters can be conditionally enabled based on query parameters:
fn enable(&self, query: &ScoredPostsQuery) -> bool {
    query.enable_age_filter
}
This allows dynamic filter selection based on:
  • Query parameters
  • Experiment or A/B test assignments
  • Feature flags

Pipeline Framework

Understand the overall pipeline execution flow

Hydrators

Hydrate features used by filters

Scorers

Score candidates after filtering

Visibility Filtering

Learn about content safety policies
