Overview
Filters remove unwanted candidates from the pipeline. They run sequentially after candidate hydration (and again after selection), so each filter receives only the candidates kept by the filters before it. Filters partition candidates into “kept” and “removed” sets.
Filter Trait
The Filter trait defines the interface for filtering candidates:
candidate-pipeline/filter.rs
use std::any::{type_name_of_val, Any};

use async_trait::async_trait;

pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

#[async_trait]
pub trait Filter<Q, C>: Any + Send + Sync
where
    Q: Clone + Send + Sync + 'static,
    C: Clone + Send + Sync + 'static,
{
    /// Decide if this filter should run for the given query.
    fn enable(&self, _query: &Q) -> bool {
        true
    }

    /// Filter candidates by evaluating each against some criteria.
    /// Returns a FilterResult containing kept candidates (which continue to the next stage)
    /// and removed candidates (which are excluded from further processing).
    async fn filter(&self, query: &Q, candidates: Vec<C>) -> Result<FilterResult<C>, String>;

    /// Returns a stable name for logging/metrics.
    fn name(&self) -> &'static str {
        util::short_type_name(type_name_of_val(self))
    }
}
enable
fn(&self, &Q) -> bool
Optional gating logic to conditionally enable or disable the filter based on query parameters. Returns true by default.
filter
async fn(&self, &Q, Vec<C>) -> Result<FilterResult<C>, String>
The main filtering method. Partitions candidates into kept and removed sets.
name
fn(&self) -> &'static str
Returns the filter name for logging and metrics. Auto-generated from the type name by default.
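To make the contract concrete without an async runtime, here is a minimal synchronous sketch that mirrors the trait using only the standard library. `SyncFilter`, `MinScoreFilter`, `Query`, `Candidate`, and `short_type_name` are hypothetical stand-ins (the real `util::short_type_name` is not shown on this page). The default `name` keeps the last path segment of `std::any::type_name`, which is one plausible way to derive a name; note that the standard library does not guarantee the exact format of `type_name`.

```rust
use std::any::type_name;

/// Mirrors the documented result type: each input candidate lands in exactly one set.
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

/// Hypothetical stand-in for `util::short_type_name`: keeps the last path segment,
/// e.g. "my_crate::filters::MinScoreFilter" -> "MinScoreFilter".
/// (Generic type names would need more careful parsing.)
fn short_type_name(full: &'static str) -> &'static str {
    full.rsplit("::").next().unwrap_or(full)
}

/// Synchronous simplification of `Filter` (no async_trait, no Send/Sync bounds).
pub trait SyncFilter<Q, C> {
    /// Gate: run by default.
    fn enable(&self, _query: &Q) -> bool {
        true
    }

    fn filter(&self, query: &Q, candidates: Vec<C>) -> Result<FilterResult<C>, String>;

    /// Default name derived from the implementing type.
    fn name(&self) -> &'static str {
        short_type_name(type_name::<Self>())
    }
}

/// Hypothetical query and candidate types.
pub struct Query {
    pub min_score: f64,
}

#[derive(Clone, Debug, PartialEq)]
pub struct Candidate {
    pub tweet_id: i64,
    pub score: f64,
}

/// Example filter: keeps candidates whose score meets the query's threshold.
pub struct MinScoreFilter;

impl SyncFilter<Query, Candidate> for MinScoreFilter {
    fn filter(
        &self,
        query: &Query,
        candidates: Vec<Candidate>,
    ) -> Result<FilterResult<Candidate>, String> {
        let (kept, removed): (Vec<_>, Vec<_>) = candidates
            .into_iter()
            .partition(|c| c.score >= query.min_score);
        Ok(FilterResult { kept, removed })
    }
}
```

Only `filter` must be implemented; `enable` and `name` fall back to the defaults, just as in the async trait.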
Age Filter
The Age filter removes tweets older than a specified duration:
home-mixer/filters/age_filter.rs
pub struct AgeFilter {
    pub max_age: Duration,
}

impl AgeFilter {
    pub fn new(max_age: Duration) -> Self {
        Self { max_age }
    }

    fn is_within_age(&self, tweet_id: i64) -> bool {
        snowflake::duration_since_creation_opt(tweet_id)
            .map(|age| age <= self.max_age)
            .unwrap_or(false)
    }
}

#[async_trait]
impl Filter<ScoredPostsQuery, PostCandidate> for AgeFilter {
    async fn filter(
        &self,
        _query: &ScoredPostsQuery,
        candidates: Vec<PostCandidate>,
    ) -> Result<FilterResult<PostCandidate>, String> {
        let (kept, removed): (Vec<_>, Vec<_>) = candidates
            .into_iter()
            .partition(|c| self.is_within_age(c.tweet_id));
        Ok(FilterResult { kept, removed })
    }
}
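The Age filter uses Twitter’s Snowflake ID format to extract the creation timestamp from the tweet ID itself, so it needs no external service calls. If no age can be derived from an ID, is_within_age returns false (the unwrap_or(false) branch) and the candidate is removed.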
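The decoding step can be sketched as follows, assuming the standard Twitter snowflake layout: a millisecond timestamp stored above bit 22, counted from the custom epoch 1288834974657 (2010-11-04T01:42:54.657Z). The helper names are hypothetical; the real `snowflake::duration_since_creation_opt` is not shown on this page and may differ. Taking `now_ms` as a parameter keeps the core logic deterministic and easy to test.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Twitter snowflake epoch (2010-11-04T01:42:54.657Z) in Unix milliseconds.
const TWEPOCH_MS: u64 = 1_288_834_974_657;

/// Creation time in Unix milliseconds. The low 22 bits (worker and sequence)
/// are shifted away; negative IDs are not valid snowflakes.
fn creation_time_ms(tweet_id: i64) -> Option<u64> {
    if tweet_id < 0 {
        return None;
    }
    Some(((tweet_id as u64) >> 22) + TWEPOCH_MS)
}

/// Age relative to `now_ms`; `None` if the ID is invalid or appears to be from the future.
fn age_at(tweet_id: i64, now_ms: u64) -> Option<Duration> {
    let created_ms = creation_time_ms(tweet_id)?;
    now_ms.checked_sub(created_ms).map(Duration::from_millis)
}

/// Mirrors the logic of `AgeFilter::is_within_age`: undecodable IDs count as too old.
fn is_within_age(tweet_id: i64, max_age: Duration, now_ms: u64) -> bool {
    age_at(tweet_id, now_ms)
        .map(|age| age <= max_age)
        .unwrap_or(false)
}

/// Convenience wrapper using the wall clock.
fn age_now(tweet_id: i64) -> Option<Duration> {
    let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).ok()?.as_millis() as u64;
    age_at(tweet_id, now_ms)
}
```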
Self Tweet Filter
The Self Tweet filter removes tweets where the author is the viewer:
home-mixer/filters/self_tweet_filter.rs
pub struct SelfTweetFilter;

#[async_trait]
impl Filter<ScoredPostsQuery, PostCandidate> for SelfTweetFilter {
    async fn filter(
        &self,
        query: &ScoredPostsQuery,
        candidates: Vec<PostCandidate>,
    ) -> Result<FilterResult<PostCandidate>, String> {
        let viewer_id = query.user_id as u64;
        let (kept, removed): (Vec<_>, Vec<_>) = candidates
            .into_iter()
            .partition(|c| c.author_id != viewer_id);
        Ok(FilterResult { kept, removed })
    }
}
This filter prevents users from seeing their own tweets in their For You feed, which improves content diversity.
Author Socialgraph Filter
The Author Socialgraph filter removes candidates from blocked or muted authors:
home-mixer/filters/author_socialgraph_filter.rs
pub struct AuthorSocialgraphFilter;

#[async_trait]
impl Filter<ScoredPostsQuery, PostCandidate> for AuthorSocialgraphFilter {
    async fn filter(
        &self,
        query: &ScoredPostsQuery,
        candidates: Vec<PostCandidate>,
    ) -> Result<FilterResult<PostCandidate>, String> {
        // Borrow the lists from the query; cloning them is unnecessary.
        let viewer_blocked_user_ids = &query.user_features.blocked_user_ids;
        let viewer_muted_user_ids = &query.user_features.muted_user_ids;

        // Nothing to filter: return every candidate unchanged.
        if viewer_blocked_user_ids.is_empty() && viewer_muted_user_ids.is_empty() {
            return Ok(FilterResult {
                kept: candidates,
                removed: Vec::new(),
            });
        }

        let mut kept: Vec<PostCandidate> = Vec::new();
        let mut removed: Vec<PostCandidate> = Vec::new();
        for candidate in candidates {
            let author_id = candidate.author_id as i64;
            let muted = viewer_muted_user_ids.contains(&author_id);
            let blocked = viewer_blocked_user_ids.contains(&author_id);
            if muted || blocked {
                removed.push(candidate);
            } else {
                kept.push(candidate);
            }
        }
        Ok(FilterResult { kept, removed })
    }
}
This filter enforces the viewer’s block and mute lists, so posts by those authors are removed from the candidate set. Honoring these settings is a critical safety and user-experience requirement.
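If the block and mute lists are plain vectors (their type is not shown here), each `contains` call scans the whole list. A hedged alternative is to merge both lists into one `HashSet` up front, as in this std-only sketch; `Candidate`, `FilterResult`, and `filter_blocked_and_muted` are hypothetical simplifications of the types above. If the lists are already hash sets, the conversion is unnecessary.

```rust
use std::collections::HashSet;

/// Hypothetical, trimmed-down candidate.
#[derive(Debug, Clone)]
pub struct Candidate {
    pub tweet_id: i64,
    pub author_id: u64,
}

pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

/// Removes candidates whose author is blocked or muted.
/// One HashSet gives O(1) lookups instead of a linear scan per list.
fn filter_blocked_and_muted(
    candidates: Vec<Candidate>,
    blocked_user_ids: &[i64],
    muted_user_ids: &[i64],
) -> FilterResult<Candidate> {
    // Same short-circuit as the real filter.
    if blocked_user_ids.is_empty() && muted_user_ids.is_empty() {
        return FilterResult { kept: candidates, removed: Vec::new() };
    }
    let excluded: HashSet<i64> = blocked_user_ids
        .iter()
        .chain(muted_user_ids)
        .copied()
        .collect();
    // partition puts predicate-true items first, so the excluded authors come first.
    let (removed, kept): (Vec<_>, Vec<_>) = candidates
        .into_iter()
        .partition(|c| excluded.contains(&(c.author_id as i64)));
    FilterResult { kept, removed }
}
```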
Visibility Filtering (VF) Filter
The VF filter applies visibility filtering rules based on safety results:
home-mixer/filters/vf_filter.rs
pub struct VFFilter;

#[async_trait]
impl Filter<ScoredPostsQuery, PostCandidate> for VFFilter {
    async fn filter(
        &self,
        _query: &ScoredPostsQuery,
        candidates: Vec<PostCandidate>,
    ) -> Result<FilterResult<PostCandidate>, String> {
        let (removed, kept): (Vec<_>, Vec<_>) = candidates
            .into_iter()
            .partition(|c| should_drop(&c.visibility_reason));
        Ok(FilterResult { kept, removed })
    }
}

fn should_drop(reason: &Option<FilteredReason>) -> bool {
    match reason {
        Some(FilteredReason::SafetyResult(safety_result)) => {
            matches!(safety_result.action, Action::Drop(_))
        }
        Some(_) => true,
        None => false,
    }
}
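The VF filter enforces content safety policies. A candidate is removed when its visibility_reason holds a safety result whose action is Drop, or holds any other FilteredReason; candidates with no visibility reason are kept. Because partition returns the predicate-true group first, the result is destructured as (removed, kept). The filter makes no service calls of its own: it relies on the visibility_reason field being hydrated earlier in the pipeline.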
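These rules can be exercised in isolation with simplified stand-in types. The enums and structs below are hypothetical reductions of the real `FilteredReason`, `SafetyResult`, and `Action` types (for example, `Action::Allow` and `FilteredReason::Other` are invented for illustration); only `should_drop` is copied from the filter above.

```rust
/// Hypothetical, simplified stand-ins for the real safety types.
#[derive(Debug, Clone, PartialEq)]
pub enum Action {
    Allow,
    Drop(String), // drop, with a reason label
}

#[derive(Debug, Clone, PartialEq)]
pub struct SafetyResult {
    pub action: Action,
}

#[derive(Debug, Clone, PartialEq)]
pub enum FilteredReason {
    SafetyResult(SafetyResult),
    Other(String),
}

#[derive(Debug, Clone)]
pub struct Candidate {
    pub tweet_id: i64,
    pub visibility_reason: Option<FilteredReason>,
}

/// Same matching rules as the VF filter.
fn should_drop(reason: &Option<FilteredReason>) -> bool {
    match reason {
        Some(FilteredReason::SafetyResult(safety_result)) => {
            matches!(safety_result.action, Action::Drop(_))
        }
        Some(_) => true,
        None => false,
    }
}

/// Returns (kept, removed); the predicate-true group from partition is the removed set.
fn vf_partition(candidates: Vec<Candidate>) -> (Vec<Candidate>, Vec<Candidate>) {
    let (removed, kept): (Vec<_>, Vec<_>) = candidates
        .into_iter()
        .partition(|c| should_drop(&c.visibility_reason));
    (kept, removed)
}
```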
Available Filters
The home-mixer service includes these filters:
AgeFilter: Removes tweets older than a specified duration
AuthorSocialgraphFilter: Removes tweets from blocked or muted authors
CoreDataHydrationFilter: Removes tweets missing core data
DedupConversationFilter: Removes duplicate conversation threads
DropDuplicatesFilter: Removes exact duplicate tweets
IneligibleSubscriptionFilter: Removes subscription content for non-subscribers
MutedKeywordFilter: Removes tweets matching muted keywords
PreviouslySeenPostsFilter: Removes tweets the user has already seen
PreviouslyServedPostsFilter: Removes tweets already served in recent requests
RetweetDeduplicationFilter: Deduplicates retweets of the same original tweet
SelfTweetFilter: Removes the user’s own tweets
VFFilter: Applies visibility filtering safety rules
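The deduplication filters are not shown on this page. As an illustration only, here is one hypothetical way an exact-duplicate filter like DropDuplicatesFilter could work: keep the first candidate seen for each tweet ID and remove later copies. This is a sketch under that assumption, not the service's implementation.

```rust
use std::collections::HashSet;

/// Hypothetical, trimmed-down candidate.
#[derive(Debug, Clone)]
pub struct Candidate {
    pub tweet_id: i64,
}

/// Returns (kept, removed). `HashSet::insert` returns true only the first time
/// an ID is seen, so the first copy is kept and later copies are removed.
fn drop_duplicates(candidates: Vec<Candidate>) -> (Vec<Candidate>, Vec<Candidate>) {
    let mut seen = HashSet::new();
    candidates.into_iter().partition(|c| seen.insert(c.tweet_id))
}
```

Because `partition` accepts an `FnMut` closure and visits items in order, the mutable `seen` set is enough to make the result depend on position, which is exactly what "keep the first occurrence" requires.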
Sequential Execution
Filters run sequentially, allowing each filter to see the results of previous filters:
candidate-pipeline/candidate_pipeline.rs
async fn run_filters(
    &self,
    query: &Q,
    mut candidates: Vec<C>,
    filters: &[Box<dyn Filter<Q, C>>],
    stage: PipelineStage,
) -> (Vec<C>, Vec<C>) {
    let request_id = query.request_id().to_string();
    let mut all_removed = Vec::new();
    for filter in filters.iter().filter(|f| f.enable(query)) {
        let backup = candidates.clone();
        match filter.filter(query, candidates).await {
            Ok(result) => {
                candidates = result.kept;
                all_removed.extend(result.removed);
            }
            Err(err) => {
                error!(
                    "request_id={} stage={:?} component={} failed: {}",
                    request_id, stage, filter.name(), err
                );
                candidates = backup;
            }
        }
    }
    info!(
        "request_id={} stage={:?} kept {}, removed {}",
        request_id, stage, candidates.len(), all_removed.len()
    );
    (candidates, all_removed)
}
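Because filter takes ownership of the candidate list, run_filters clones it into backup before each call. If the filter returns an error, the error is logged with the request ID, stage, and filter name, the backup is restored, and the remaining filters still run. A failing filter therefore never removes valid candidates.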
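The fail-open loop can be reproduced in a synchronous, std-only sketch. `SyncFilter`, `EvenOnly`, and `AlwaysFails` are hypothetical, the query and stage parameters are dropped for brevity, and `eprintln!` stands in for the `error!` logging macro used above.

```rust
pub struct FilterResult<C> {
    pub kept: Vec<C>,
    pub removed: Vec<C>,
}

/// Hypothetical synchronous filter trait (no query, no async).
pub trait SyncFilter<C> {
    fn enable(&self) -> bool {
        true
    }
    fn name(&self) -> &'static str;
    fn filter(&self, candidates: Vec<C>) -> Result<FilterResult<C>, String>;
}

/// Runs enabled filters in order. On error, restores the pre-filter list (fail open).
fn run_filters<C: Clone + 'static>(
    mut candidates: Vec<C>,
    filters: &[Box<dyn SyncFilter<C>>],
) -> (Vec<C>, Vec<C>) {
    let mut all_removed = Vec::new();
    for filter in filters.iter().filter(|f| f.enable()) {
        // `filter` consumes the vector, so keep a copy to restore on failure.
        let backup = candidates.clone();
        match filter.filter(candidates) {
            Ok(result) => {
                candidates = result.kept;
                all_removed.extend(result.removed);
            }
            Err(err) => {
                eprintln!("component={} failed: {}", filter.name(), err);
                candidates = backup;
            }
        }
    }
    (candidates, all_removed)
}

/// Keeps even numbers.
struct EvenOnly;
impl SyncFilter<i32> for EvenOnly {
    fn name(&self) -> &'static str {
        "EvenOnly"
    }
    fn filter(&self, c: Vec<i32>) -> Result<FilterResult<i32>, String> {
        let (kept, removed): (Vec<_>, Vec<_>) = c.into_iter().partition(|x| x % 2 == 0);
        Ok(FilterResult { kept, removed })
    }
}

/// Simulates a filter whose dependency is down.
struct AlwaysFails;
impl SyncFilter<i32> for AlwaysFails {
    fn name(&self) -> &'static str {
        "AlwaysFails"
    }
    fn filter(&self, _c: Vec<i32>) -> Result<FilterResult<i32>, String> {
        Err("backend unavailable".to_string())
    }
}
```

Running `AlwaysFails` before `EvenOnly` on `[1, 2, 3, 4]` logs one error and still yields kept `[2, 4]` and removed `[1, 3]`: the failure is contained to its own step.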
Filter Stages
Filters can run at two points in the pipeline:
Pre-Selection Filtering
Runs after candidate hydration, before scoring. Used for removing invalid or unwanted candidates.
Post-Selection Filtering
Runs after selection, on candidates that will be returned. Used for final safety checks.
Error Handling
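Filters fail open: a filter that returns an error removes nothing, rather than risking the removal of every candidate. The trade-off is that the failing filter’s criterion is not applied to that request, so a failure in a safety-related filter lets candidates through unfiltered.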
Best Practices
Use Rust’s partition method for clean, functional filtering:
let (kept, removed): (Vec<_>, Vec<_>) = candidates
    .into_iter()
    .partition(|c| should_keep(c));
Short-circuit when filtering isn’t needed:
if viewer_blocked_user_ids.is_empty() && viewer_muted_user_ids.is_empty() {
    return Ok(FilterResult {
        kept: candidates,
        removed: Vec::new(),
    });
}
Include meaningful context in error messages:
Err(format!("AgeFilter: invalid snowflake id {}", tweet_id))
Use the stats macro for automatic metrics:
#[xai_stats_macro::receive_stats]
async fn filter(...) -> Result<FilterResult<PostCandidate>, String> {
    // ...
}
Partition Example
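The partition pattern is idiomatic Rust for filtering. Because partition can collect into any collection that implements Default and Extend, the result needs a type annotation such as (Vec<_>, Vec<_>):
// `threshold`, `viewer_id`, `age`, and `max_age` are assumed to be in scope.
// Simple partition by condition
let (kept, removed): (Vec<_>, Vec<_>) = candidates
    .into_iter()
    .partition(|c| c.score > threshold);
// Partition with complex logic
let (kept, removed): (Vec<_>, Vec<_>) = candidates
    .into_iter()
    .partition(|c| {
        let is_valid = c.author_id != viewer_id;
        let is_recent = age(c.tweet_id) < max_age;
        is_valid && is_recent
    });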
Filters run sequentially and process all candidates, so keep filter logic fast. For expensive operations (like external service calls), hydrate the data earlier in the pipeline and filter based on hydrated fields.
Conditional Enabling
Filters can be conditionally enabled based on query parameters:
fn enable(&self, query: &ScoredPostsQuery) -> bool {
    query.enable_age_filter
}
This allows dynamic filter selection based on:
Query parameters
Experiment and A/B test assignments
Feature flags
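One hedged way to wire these sources into `enable` is to resolve flags and experiment assignments upstream into a parameter map on the query, then read them with a default. The `params` field, the `flag` helper, the `Gated` trait, and the `enable_age_filter` key are all hypothetical; the real `ScoredPostsQuery` may expose these differently (the example above reads a plain boolean field).

```rust
use std::collections::HashMap;

/// Hypothetical query carrying flags and experiment assignments resolved upstream.
pub struct Query {
    pub params: HashMap<String, bool>,
}

impl Query {
    /// Reads a boolean param, falling back to `default` when it is absent.
    pub fn flag(&self, key: &str, default: bool) -> bool {
        self.params.get(key).copied().unwrap_or(default)
    }
}

/// Only the gating half of the filter trait, for illustration.
pub trait Gated {
    fn enable(&self, query: &Query) -> bool;
}

pub struct AgeFilterGate;

impl Gated for AgeFilterGate {
    fn enable(&self, query: &Query) -> bool {
        // Hypothetical key; the filter stays on unless the param turns it off.
        query.flag("enable_age_filter", true)
    }
}
```

Defaulting to `true` means a missing or misconfigured param leaves the filter running, which matches the trait's own default for `enable`.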
Related Pages
Pipeline Framework: Understand the overall pipeline execution flow
Hydrators: Hydrate features used by filters
Scorers: Score candidates after filtering
Visibility Filtering: Learn about content safety policies