Thunder is a high-performance in-memory post store that provides sub-millisecond access to recent posts from accounts a user follows (in-network content). It consumes post create/delete events from Kafka in real time, maintains per-user post timelines, and serves as the primary source of in-network candidates for the For You feed.
Why In-Memory?
Thunder keeps recent posts in RAM to achieve sub-millisecond lookup latency, which is critical for the For You feed’s performance budget.
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;

pub struct PostStore {
    /// Full post data indexed by post_id
    posts: Arc<DashMap<i64, LightPost>>,
    /// Original posts (non-reply, non-retweet) by user
    original_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
    /// Replies and retweets by user
    secondary_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
    /// Video posts by user (separate for video feed)
    video_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
    /// Deleted post IDs
    deleted_posts: Arc<DashMap<i64, bool>>,
    /// Retention period in seconds
    retention_seconds: u64,
    /// Request timeout for iteration
    request_timeout: Duration,
}
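As a rough illustration of how per-user timelines like these could serve a read request, the sketch below gathers the newest posts for a list of followed authors. It is not Thunder's actual read path: `recent_in_network`, its parameters, and the simplified `TinyPost` are hypothetical, and a plain `HashMap` stands in for `DashMap` so the example needs no external crates.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical stand-in for the document's `TinyPost`: an id and a timestamp.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct TinyPost {
    pub post_id: i64,
    pub created_at: i64,
}

/// Single-threaded stand-in for one per-user timeline map
/// (assumption: `HashMap` replaces `DashMap` for this sketch only).
pub type Timelines = HashMap<i64, VecDeque<TinyPost>>;

/// Take up to `per_author` of the newest posts from each followed author,
/// then return the `limit` newest overall, newest first.
pub fn recent_in_network(
    timelines: &Timelines,
    following: &[i64],
    per_author: usize,
    limit: usize,
) -> Vec<TinyPost> {
    let mut out: Vec<TinyPost> = Vec::new();
    for author_id in following {
        if let Some(posts) = timelines.get(author_id) {
            // Timelines are appended oldest-first, so the newest posts sit at the back.
            out.extend(posts.iter().rev().take(per_author).copied());
        }
    }
    // Newest first; ties are broken by post_id so the order is deterministic.
    out.sort_unstable_by(|a, b| (b.created_at, b.post_id).cmp(&(a.created_at, a.post_id)));
    out.truncate(limit);
    out
}
```

Authors with no entry in the map are skipped, so a following list can safely include accounts that have not posted within the retention window.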
// Requires at module level: use std::time::{SystemTime, UNIX_EPOCH};
pub fn insert_posts(&self, mut posts: Vec<LightPost>) {
    let current_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs() as i64;

    // Filter: keep only posts created strictly in the past and within the retention period
    posts.retain(|p| {
        p.created_at < current_time
            && current_time - p.created_at <= (self.retention_seconds as i64)
    });

    // Sort by creation time so each batch is appended oldest-first
    posts.sort_unstable_by_key(|p| p.created_at);

    for post in posts {
        let post_id = post.post_id;
        let author_id = post.author_id;
        let created_at = post.created_at;
        let is_original = !post.is_reply && !post.is_retweet;
        // Read this flag before `post` is moved into the map below
        let has_video = post.has_video;

        // Store full post data
        self.posts.insert(post_id, post);

        // Add to appropriate user timeline
        // (`clone()` assumes TinyPost implements Clone, because it may be pushed twice)
        let tiny_post = TinyPost::new(post_id, created_at);
        if is_original {
            self.original_posts_by_user
                .entry(author_id)
                .or_default()
                .push_back(tiny_post.clone());
        } else {
            self.secondary_posts_by_user
                .entry(author_id)
                .or_default()
                .push_back(tiny_post.clone());
        }

        // Add to video timeline if applicable
        if has_video {
            self.video_posts_by_user
                .entry(author_id)
                .or_default()
                .push_back(tiny_post);
        }
    }
}
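The filter-then-sort step at the top of `insert_posts` can be looked at in isolation. The sketch below restates it over bare `(post_id, created_at)` pairs; `Stamp`, `within_retention`, and `prepare_batch` are hypothetical names introduced only for illustration, and the overflow-safe conversions are an addition that the original closure does not have.

```rust
/// Hypothetical minimal post record: `(post_id, created_at)` in Unix seconds.
pub type Stamp = (i64, i64);

/// True when a post created at `created_at` should be kept at time `now`.
/// Mirrors the two conditions of the `retain` closure: the post must be
/// strictly in the past and no older than the retention period.
pub fn within_retention(created_at: i64, now: i64, retention_seconds: u64) -> bool {
    // Clamp instead of wrapping if the configured retention exceeds i64::MAX.
    let limit = i64::try_from(retention_seconds).unwrap_or(i64::MAX);
    created_at < now && now.saturating_sub(created_at) <= limit
}

/// Drop out-of-window posts, then order the survivors oldest-first.
pub fn prepare_batch(mut batch: Vec<Stamp>, now: i64, retention_seconds: u64) -> Vec<Stamp> {
    batch.retain(|&(_, created_at)| within_retention(created_at, now, retention_seconds));
    batch.sort_unstable_by_key(|&(_, created_at)| created_at);
    batch
}
```

With `now = 200` and a 60-second retention, a post stamped 150 is kept, while posts stamped 100 (too old), 200 (not strictly in the past), and 250 (future-dated) are dropped.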
Thunder automatically removes old posts to maintain memory bounds:
// Requires at module level: use std::time::{SystemTime, UNIX_EPOCH};
pub async fn trim_old_posts(&self) -> usize {
    let current_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    let mut total_trimmed = 0;

    // This excerpt walks the original-post timelines
    for mut entry in self.original_posts_by_user.iter_mut() {
        let user_posts = entry.value_mut();

        // Remove posts older than retention period (oldest first)
        while let Some(oldest_post) = user_posts.front() {
            // saturating_sub avoids u64 underflow if a timestamp is ahead of the clock
            let age = current_time.saturating_sub(oldest_post.created_at as u64);
            if age > self.retention_seconds {
                let trimmed_post = user_posts.pop_front().unwrap();
                self.posts.remove(&trimmed_post.post_id);
                total_trimmed += 1;
            } else {
                break; // Posts are sorted by time, so the rest are newer
            }
        }
    }

    total_trimmed
}
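The front-popping loop is the core of the trim, and it can be exercised on a single timeline without any of the store's machinery. The following is a minimal sketch under that simplification: `trim_front` is a hypothetical helper, and timelines are reduced to `(post_id, created_at)` pairs.

```rust
use std::collections::VecDeque;

/// Pop expired entries from the front of a timeline of `(post_id, created_at)`
/// pairs ordered oldest-first, returning the ids that were removed.
pub fn trim_front(
    timeline: &mut VecDeque<(i64, i64)>,
    now: u64,
    retention_seconds: u64,
) -> Vec<i64> {
    let mut removed = Vec::new();
    while let Some(&(post_id, created_at)) = timeline.front() {
        // Negative timestamps are clamped to 0; future timestamps yield age 0.
        let age = now.saturating_sub(created_at.max(0) as u64);
        if age <= retention_seconds {
            break; // Everything behind this entry is at least as new.
        }
        timeline.pop_front();
        removed.push(post_id);
    }
    removed
}
```

The early `break` is only correct while each deque stays ordered oldest-first. `insert_posts` sorts within a batch, so that ordering holds as long as later batches do not carry posts older than ones already stored.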
// Store statistics
POST_STORE_USER_COUNT                          // Number of users with posts
POST_STORE_TOTAL_POSTS                         // Total posts in memory
POST_STORE_DELETED_POSTS                       // Deleted post count

// Request metrics
GET_IN_NETWORK_POSTS_COUNT                     // Posts returned per request
GET_IN_NETWORK_POSTS_DURATION                  // Request latency
GET_IN_NETWORK_POSTS_FOLLOWING_SIZE            // Following list size

// Post quality metrics
GET_IN_NETWORK_POSTS_FOUND_FRESHNESS_SECONDS   // Time since most recent post
GET_IN_NETWORK_POSTS_FOUND_UNIQUE_AUTHORS      // Author diversity
GET_IN_NETWORK_POSTS_FOUND_REPLY_RATIO         // Reply vs original ratio
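To make the three post quality metrics concrete, here is one way such values could be derived from a result set. This is a sketch, not how Thunder computes them: `FoundPost`, `FoundStats`, and `found_stats` are hypothetical, and the reply ratio is assumed to mean replies divided by all returned posts, which the metric name alone does not confirm.

```rust
use std::collections::HashSet;

/// Hypothetical, trimmed-down view of a returned post.
pub struct FoundPost {
    pub author_id: i64,
    pub created_at: i64, // Unix seconds
    pub is_reply: bool,
}

/// Hypothetical container for the three quality values.
pub struct FoundStats {
    /// Seconds since the newest returned post; `None` for an empty result.
    pub freshness_seconds: Option<i64>,
    /// Number of distinct authors in the result.
    pub unique_authors: usize,
    /// Assumption: replies / all returned posts (0.0 for an empty result).
    pub reply_ratio: f64,
}

pub fn found_stats(posts: &[FoundPost], now: i64) -> FoundStats {
    let newest = posts.iter().map(|p| p.created_at).max();
    let unique_authors = posts
        .iter()
        .map(|p| p.author_id)
        .collect::<HashSet<_>>()
        .len();
    let replies = posts.iter().filter(|p| p.is_reply).count();
    let reply_ratio = if posts.is_empty() {
        0.0
    } else {
        replies as f64 / posts.len() as f64
    };
    FoundStats {
        // Clamp at 0 in case the newest post is stamped ahead of `now`.
        freshness_seconds: newest.map(|t| now.saturating_sub(t).max(0)),
        unique_authors,
        reply_ratio,
    }
}
```

A result that is fresh but dominated by one author or by replies would show up as a low unique-author count or a high reply ratio, which is presumably why these are tracked alongside latency.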