Overview

Thunder is a high-performance in-memory post store that provides sub-millisecond access to recent posts from accounts a user follows (in-network content). It consumes post create/delete events from Kafka in real time, maintains a timeline of recent posts for each author, and serves as the primary source of in-network candidates for the For You feed.
Why In-Memory? Thunder keeps recent posts in RAM to achieve sub-millisecond lookup latency, which is critical for the For You feed’s performance budget.

Architecture

┌─────────────────────────────────────────────────────────────┐
│                      THUNDER SYSTEM                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐                                            │
│  │   Kafka     │                                            │
│  │  Post Events│                                            │
│  └──────┬──────┘                                            │
│         │ Create/Delete Events                             │
│         ▼                                                   │
│  ┌──────────────────────────────────────┐                  │
│  │     Event Listener (Kafka Consumer)  │                  │
│  └──────────────┬───────────────────────┘                  │
│                 │                                           │
│                 ▼                                           │
│  ┌──────────────────────────────────────┐                  │
│  │          PostStore                   │                  │
│  │  ┌────────────────────────────────┐  │                  │
│  │  │ posts: Map[post_id, LightPost] │  │                  │
│  │  └────────────────────────────────┘  │                  │
│  │  ┌────────────────────────────────┐  │                  │
│  │  │ original_posts_by_user:        │  │                  │
│  │  │ Map[user_id, VecDeque<TinyPost>]│  │                  │
│  │  └────────────────────────────────┘  │                  │
│  │  ┌────────────────────────────────┐  │                  │
│  │  │ secondary_posts_by_user:       │  │                  │
│  │  │ Map[user_id, VecDeque<TinyPost>]│  │                  │
│  │  └────────────────────────────────┘  │                  │
│  │  ┌────────────────────────────────┐  │                  │
│  │  │ video_posts_by_user:           │  │                  │
│  │  │ Map[user_id, VecDeque<TinyPost>]│  │                  │
│  │  └────────────────────────────────┘  │                  │
│  └──────────────┬───────────────────────┘                  │
│                 │                                           │
│                 ▼                                           │
│  ┌──────────────────────────────────────┐                  │
│  │    gRPC Service (InNetworkPosts)     │                  │
│  └──────────────┬───────────────────────┘                  │
│                 │                                           │
└─────────────────┼───────────────────────────────────────────┘
                  │
                  ▼
         ┌────────────────┐
         │  Home Mixer    │
         └────────────────┘

PostStore: Core Data Structure

The PostStore is Thunder’s central component, managing posts in memory with efficient lookup and automatic trimming.

Data Model

thunder/posts/post_store.rs
pub struct PostStore {
    /// Full post data indexed by post_id
    posts: Arc<DashMap<i64, LightPost>>,
    
    /// Original posts (non-reply, non-retweet) by user
    original_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
    
    /// Replies and retweets by user
    secondary_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
    
    /// Video posts by user (separate for video feed)
    video_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
    
    /// Deleted post IDs
    deleted_posts: Arc<DashMap<i64, bool>>,
    
    /// Retention period in seconds
    retention_seconds: u64,
    
    /// Request timeout for iteration
    request_timeout: Duration,
}

TinyPost: Memory-Efficient Reference

To minimize memory overhead, user timelines store minimal references:
pub struct TinyPost {
    pub post_id: i64,
    pub created_at: i64,  // Unix timestamp
}
Full post data (LightPost) is stored separately in the posts map, allowing multiple timelines to reference the same post without duplication.
Memory Optimization: Storing only post ID + timestamp in timelines reduces memory usage by ~90% compared to storing full post objects.
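
The split between timeline references and full records can be sketched as follows. This is a minimal illustration, not Thunder's code: the `LightPost` fields are hypothetical (the source does not list them), a standard `HashMap` stands in for `DashMap`, and `resolve` and `tiny_post_size` are helper names introduced only for this example.

```rust
use std::collections::{HashMap, VecDeque};
use std::mem::size_of;

/// Timeline entry: two 8-byte integers, as in the struct above.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct TinyPost {
    pub post_id: i64,
    pub created_at: i64,
}

/// Hypothetical stand-in for the full post record; the real field set is not shown in the source.
#[derive(Clone, Debug)]
pub struct LightPost {
    pub post_id: i64,
    pub author_id: i64,
    pub created_at: i64,
    pub is_reply: bool,
    pub is_retweet: bool,
    pub has_video: bool,
}

/// Size in bytes of one timeline entry.
pub fn tiny_post_size() -> usize {
    size_of::<TinyPost>()
}

/// Resolves timeline references against the shared `posts` map,
/// skipping ids whose full record is no longer present.
pub fn resolve<'a>(
    timeline: &VecDeque<TinyPost>,
    posts: &'a HashMap<i64, LightPost>,
) -> Vec<&'a LightPost> {
    timeline
        .iter()
        .filter_map(|tiny| posts.get(&tiny.post_id))
        .collect()
}
```

Each timeline entry costs 16 bytes regardless of how large the full record is, and a video post that appears in two timelines still has only one full record in `posts`. A reference whose record has been removed simply resolves to nothing.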

Real-Time Ingestion

Thunder consumes post events from Kafka and updates the in-memory store:

Post Creation

thunder/posts/post_store.rs
pub fn insert_posts(&self, mut posts: Vec<LightPost>) {
    let current_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs() as i64;
    
    // Filter: only keep posts within retention period
    posts.retain(|p| {
        p.created_at < current_time
            && current_time - p.created_at <= (self.retention_seconds as i64)
    });
    
    // Sort by creation time
    posts.sort_unstable_by_key(|p| p.created_at);
    
    for post in posts {
        // Copy out the fields needed below before `post` is moved into the map
        let post_id = post.post_id;
        let author_id = post.author_id;
        let created_at = post.created_at;
        let is_original = !post.is_reply && !post.is_retweet;
        let has_video = post.has_video;
        
        // Store full post data (this moves `post`)
        self.posts.insert(post_id, post);
        
        // Add to appropriate user timeline
        let tiny_post = TinyPost::new(post_id, created_at);
        
        if is_original {
            self.original_posts_by_user
                .entry(author_id)
                .or_default()
                .push_back(tiny_post);
        } else {
            self.secondary_posts_by_user
                .entry(author_id)
                .or_default()
                .push_back(tiny_post);
        }
        
        // Add to video timeline if applicable (a fresh reference, since
        // `tiny_post` was already moved into a timeline above)
        if has_video {
            self.video_posts_by_user
                .entry(author_id)
                .or_default()
                .push_back(TinyPost::new(post_id, created_at));
        }
    }
}
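
The retention check at the top of `insert_posts` can be pulled out as a pure function to make its edge cases easier to see. The sketch below is illustrative only: `within_retention` and `admit` are hypothetical helper names, and timestamps are plain `i64` seconds as in the listing above.

```rust
/// Returns true when a post is strictly in the past and no older than
/// `retention_seconds`, mirroring the `retain` predicate in `insert_posts`.
pub fn within_retention(created_at: i64, now: i64, retention_seconds: u64) -> bool {
    created_at < now && now - created_at <= retention_seconds as i64
}

/// Drops out-of-window timestamps and returns the rest oldest-first,
/// the order in which `insert_posts` appends to the timelines.
pub fn admit(mut created: Vec<i64>, now: i64, retention_seconds: u64) -> Vec<i64> {
    created.retain(|&c| within_retention(c, now, retention_seconds));
    created.sort_unstable();
    created
}
```

Under this predicate a post stamped in the same second as `now` is rejected by the strict `<`, and a post exactly `retention_seconds` old is still accepted. Sorting oldest-first before appending is what lets trimming stop at the first entry that is still inside the window.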

Post Deletion

pub fn mark_as_deleted(&self, posts: Vec<TweetDeleteEvent>) {
    for post in posts {
        self.posts.remove(&post.post_id);
        self.deleted_posts.insert(post.post_id, true);
        
        // Track deletion event
        self.original_posts_by_user
            .entry(DELETE_EVENT_KEY)
            .or_default()
            .push_back(TinyPost {
                post_id: post.post_id,
                created_at: post.deleted_at,
            });
    }
}

Serving In-Network Posts

Thunder exposes a gRPC service that Home Mixer calls to retrieve posts:
thunder/thunder_service.rs
pub struct ThunderServiceImpl {
    post_store: Arc<PostStore>,
    strato_client: Arc<StratoClient>,
    request_semaphore: Arc<Semaphore>,
}

impl InNetworkPostsService for ThunderServiceImpl {
    async fn get_in_network_posts(
        &self,
        request: Request<GetInNetworkPostsRequest>,
    ) -> Result<Response<GetInNetworkPostsResponse>, Status> {
        let req = request.into_inner();
        
        // Shed load: fail fast if no permit is available instead of queueing
        let _permit = self
            .request_semaphore
            .try_acquire()
            .map_err(|_| Status::resource_exhausted("too many concurrent requests"))?;
        
        // Fetch following list if not provided
        let following_user_ids = if req.following_user_ids.is_empty() {
            self.strato_client
                .fetch_following_list(req.user_id, MAX_INPUT_LIST_SIZE)
                .await?
        } else {
            req.following_user_ids
        };
        
        // Get posts from PostStore
        let all_posts = self.post_store.get_all_posts_by_users(
            &following_user_ids,
            &req.exclude_tweet_ids,
            Instant::now(),
            req.user_id,
        );
        
        // Score and sort by recency
        let scored_posts = score_recent(all_posts, req.max_results);
        
        Ok(Response::new(GetInNetworkPostsResponse { 
            posts: scored_posts 
        }))
    }
}
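
The fail-fast permit check at the top of the handler can be illustrated without any async runtime. The sketch below is an assumption-laden stand-in, not the service's code: `RequestLimiter` and `Permit` are hypothetical types built on a standard atomic counter, substituted here for the `Semaphore` used above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical std-only stand-in for the request semaphore.
pub struct RequestLimiter {
    in_flight: AtomicUsize,
    max_in_flight: usize,
}

/// RAII guard: the slot is released when the permit is dropped.
pub struct Permit<'a> {
    limiter: &'a RequestLimiter,
}

impl RequestLimiter {
    pub fn new(max_in_flight: usize) -> Self {
        Self { in_flight: AtomicUsize::new(0), max_in_flight }
    }

    /// Non-blocking: returns `None` immediately when all slots are taken.
    pub fn try_acquire(&self) -> Option<Permit<'_>> {
        let mut current = self.in_flight.load(Ordering::Relaxed);
        loop {
            if current >= self.max_in_flight {
                return None;
            }
            // Claim a slot only if no other thread changed the counter meanwhile.
            match self.in_flight.compare_exchange_weak(
                current,
                current + 1,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => return Some(Permit { limiter: self }),
                Err(actual) => current = actual,
            }
        }
    }

    /// Number of permits currently held.
    pub fn in_flight(&self) -> usize {
        self.in_flight.load(Ordering::Relaxed)
    }
}

impl Drop for Permit<'_> {
    fn drop(&mut self) {
        self.limiter.in_flight.fetch_sub(1, Ordering::AcqRel);
    }
}
```

Binding the permit to a local (as `_permit` is in the handler) keeps the slot occupied for the whole request and releases it on every exit path, including early returns through `?`.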

Post Retrieval Flow

1. Validate Request: Check semaphore permits to prevent overload.
2. Fetch Following List: Get the list of accounts the user follows (if not provided).
3. Lookup Posts: For each followed user, scan their timeline and collect posts.
4. Filter and Deduplicate: Remove excluded posts and deleted posts, and apply reply filtering logic.
5. Sort by Recency: Order posts by creation timestamp (newest first).
6. Truncate: Return the top K posts (typically 1000).
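
Steps 3 through 6 of the flow above can be sketched over plain standard-library collections. This is a simplified illustration under stated assumptions: `collect_recent` is a hypothetical function, `HashMap` and `HashSet` stand in for the concurrent maps, a single per-author cap replaces the separate scan and result limits, and reply filtering and deduplication are left out.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

#[derive(Clone, Copy, Debug, PartialEq)]
pub struct TinyPost {
    pub post_id: i64,
    pub created_at: i64,
}

/// Collects the newest surviving posts of each followed author, then
/// orders the merged set newest-first and keeps at most `max_results`.
pub fn collect_recent(
    timelines: &HashMap<i64, VecDeque<TinyPost>>,
    following: &[i64],
    excluded: &HashSet<i64>,
    deleted: &HashSet<i64>,
    per_author_cap: usize,
    max_results: usize,
) -> Vec<TinyPost> {
    let mut out: Vec<TinyPost> = Vec::new();
    for author in following {
        // Step 3: timelines are stored oldest-first, so iterate in reverse.
        if let Some(timeline) = timelines.get(author) {
            out.extend(
                timeline
                    .iter()
                    .rev()
                    // Step 4: drop excluded and deleted posts.
                    .filter(|p| !excluded.contains(&p.post_id) && !deleted.contains(&p.post_id))
                    .take(per_author_cap)
                    .copied(),
            );
        }
    }
    // Step 5: newest first; ties broken by post_id so the order is deterministic.
    out.sort_unstable_by(|a, b| {
        b.created_at
            .cmp(&a.created_at)
            .then(b.post_id.cmp(&a.post_id))
    });
    // Step 6: keep the top K.
    out.truncate(max_results);
    out
}
```

Authors with no timeline entry are skipped silently, which matches the common case of a followed account that has not posted inside the retention window.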

Efficient Post Lookup

thunder/posts/post_store.rs
pub fn get_all_posts_by_users(
    &self,
    user_ids: &[i64],
    exclude_tweet_ids: &HashSet<i64>,
    start_time: Instant,
    request_user_id: i64,
) -> Vec<LightPost> {
    let mut all_posts = Vec::new();
    
    for user_id in user_ids {
        // Timeout protection
        if !self.request_timeout.is_zero() && start_time.elapsed() >= self.request_timeout {
            break;
        }
        
        if let Some(user_posts_ref) = self.original_posts_by_user.get(user_id) {
            let user_posts = user_posts_ref.value();
            
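            // Scan recent posts (reverse order = newest first)
            let posts_iter = user_posts.iter()
                .rev()
                .filter(|tiny_post| !exclude_tweet_ids.contains(&tiny_post.post_id))
                .take(MAX_TINY_POSTS_PER_USER_SCAN)
                // Resolve each reference to its full record and clone it out, so the
                // map guard is released right away (assumes LightPost: Clone)
                .filter_map(|tiny_post| {
                    self.posts
                        .get(&tiny_post.post_id)
                        .map(|entry| entry.value().clone())
                })
                .filter(|post| !self.deleted_posts.contains_key(&post.post_id))
                .take(MAX_ORIGINAL_POSTS_PER_AUTHOR);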
            
            all_posts.extend(posts_iter);
        }
    }
    
    all_posts
}
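Performance: Each timeline lookup is an O(1) hash-map access, and each scan is O(N), where N is the number of non-excluded entries examined and is capped by MAX_TINY_POSTS_PER_USER_SCAN. A request over F followed users therefore costs O(F × N) in the worst case, further bounded by the request timeout.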

Automatic Post Trimming

Thunder automatically removes old posts to maintain memory bounds:
pub async fn trim_old_posts(&self) -> usize {
    let current_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    
    let mut total_trimmed = 0;
    
    for mut entry in self.original_posts_by_user.iter_mut() {
        let user_posts = entry.value_mut();
        
        // Remove posts older than retention period (oldest first)
        while let Some(oldest_post) = user_posts.front() {
            // saturating_sub avoids u64 underflow if a timestamp is ahead of the clock
            if current_time.saturating_sub(oldest_post.created_at as u64) > self.retention_seconds {
                let trimmed_post = user_posts.pop_front().unwrap();
                self.posts.remove(&trimmed_post.post_id);
                total_trimmed += 1;
            } else {
                break;  // Posts are sorted by time
            }
        }
    }
    
    total_trimmed
}
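
The front-trimming loop can be exercised in isolation on a single timeline. The sketch below is a minimal illustration, with `trim_timeline` as a hypothetical helper: it returns the removed post ids so a caller could evict the matching full records, and it uses `saturating_sub` so a timestamp ahead of the clock counts as age zero.

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy, Debug, PartialEq)]
pub struct TinyPost {
    pub post_id: i64,
    pub created_at: i64,
}

/// Pops expired entries from the front of a timeline that is ordered
/// oldest-first and returns the ids of the removed posts.
pub fn trim_timeline(
    timeline: &mut VecDeque<TinyPost>,
    now: u64,
    retention_seconds: u64,
) -> Vec<i64> {
    let mut removed = Vec::new();
    while let Some(oldest) = timeline.front() {
        // Negative timestamps are clamped to 0 before the unsigned conversion.
        let age = now.saturating_sub(oldest.created_at.max(0) as u64);
        if age <= retention_seconds {
            break; // Everything behind this entry is newer, so stop here.
        }
        removed.push(oldest.post_id);
        timeline.pop_front();
    }
    removed
}
```

Because entries are appended in creation order, the loop touches only the expired prefix plus one surviving entry, so the cost of a trim pass is proportional to what is actually removed.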

Background Tasks

Thunder runs periodic background tasks:
// Log statistics every 5 seconds
post_store.start_stats_logger();

// Auto-trim old posts every 30 minutes
post_store.start_auto_trim(30);

Configuration

Key parameters controlling Thunder’s behavior:
// Retention period: how long to keep posts in memory
pub const RETENTION_SECONDS: u64 = 2 * 24 * 60 * 60;  // 2 days

// Per-user post limits
pub const MAX_ORIGINAL_POSTS_PER_AUTHOR: usize = 50;
pub const MAX_REPLY_POSTS_PER_AUTHOR: usize = 10;
pub const MAX_VIDEO_POSTS_PER_AUTHOR: usize = 20;

// Scan limits (prevent excessive iteration)
pub const MAX_TINY_POSTS_PER_USER_SCAN: usize = 200;

// Request limits
pub const MAX_INPUT_LIST_SIZE: usize = 10000;  // Max following list size
pub const MAX_POSTS_TO_RETURN: usize = 1000;
pub const MAX_VIDEOS_TO_RETURN: usize = 200;
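
To see how these limits combine, the worst-case work per request can be computed directly from the constants. The helper functions below are hypothetical and cover only the original-post timeline; the bound on examined entries ignores entries skipped by the exclusion filter, and the request timeout can cut a scan short before any of these bounds is reached.

```rust
pub const MAX_ORIGINAL_POSTS_PER_AUTHOR: usize = 50;
pub const MAX_TINY_POSTS_PER_USER_SCAN: usize = 200;
pub const MAX_INPUT_LIST_SIZE: usize = 10000;
pub const MAX_POSTS_TO_RETURN: usize = 1000;

/// Upper bound on timeline entries examined for one request.
pub fn max_entries_scanned(following: usize) -> usize {
    following.min(MAX_INPUT_LIST_SIZE) * MAX_TINY_POSTS_PER_USER_SCAN
}

/// Upper bound on original-post candidates collected before truncation.
pub fn max_candidates(following: usize) -> usize {
    following.min(MAX_INPUT_LIST_SIZE) * MAX_ORIGINAL_POSTS_PER_AUTHOR
}

/// Upper bound on posts actually returned to the caller.
pub fn max_returned(following: usize) -> usize {
    max_candidates(following).min(MAX_POSTS_TO_RETURN)
}
```

For a following list of 500 accounts this gives at most 100,000 entries examined and 25,000 candidates, of which at most 1,000 are returned. At the 10,000-account input cap the scan bound rises to 2,000,000 entries, which is where the request timeout matters most.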

Integration with Home Mixer

home-mixer/sources/thunder_source.rs
pub struct ThunderSource {
    pub thunder_client: Arc<ThunderClient>,
}

impl Source<ScoredPostsQuery, PostCandidate> for ThunderSource {
    async fn get_candidates(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>> {
        let request = GetInNetworkPostsRequest {
            user_id: query.user_id,
            following_user_ids: query.user_features.followed_user_ids.clone(),
            max_results: THUNDER_MAX_RESULTS,
            exclude_tweet_ids: vec![],
            is_video_request: false,
            // ...
        };
        
        let response = self.thunder_client
            .get_in_network_posts(request)
            .await?;
        
        let candidates = response.posts
            .into_iter()
            .map(|post| PostCandidate {
                tweet_id: post.post_id,
                author_id: post.author_id,
                served_type: Some(ServedType::ForYouInNetwork),
                // ...
            })
            .collect();
        
        Ok(candidates)
    }
}

Metrics and Monitoring

Thunder emits comprehensive metrics:
// Store statistics
POST_STORE_USER_COUNT         // Number of users with posts
POST_STORE_TOTAL_POSTS        // Total posts in memory
POST_STORE_DELETED_POSTS      // Deleted post count

// Request metrics
GET_IN_NETWORK_POSTS_COUNT           // Posts returned per request
GET_IN_NETWORK_POSTS_DURATION        // Request latency
GET_IN_NETWORK_POSTS_FOLLOWING_SIZE  // Following list size

// Post quality metrics
GET_IN_NETWORK_POSTS_FOUND_FRESHNESS_SECONDS  // Time since most recent post
GET_IN_NETWORK_POSTS_FOUND_UNIQUE_AUTHORS     // Author diversity
GET_IN_NETWORK_POSTS_FOUND_REPLY_RATIO        // Reply vs original ratio
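
One plausible way to derive the three post quality values from a result set is sketched below. The exact definitions Thunder uses are not given in the source, so everything here is an assumption: `PostSummary`, `QualityStats`, and `quality_stats` are hypothetical names, freshness is taken as the age of the newest post, and reply ratio as replies divided by total posts.

```rust
use std::collections::HashSet;

/// Hypothetical minimal view of a returned post.
pub struct PostSummary {
    pub author_id: i64,
    pub created_at: i64,
    pub is_reply: bool,
}

#[derive(Debug, PartialEq)]
pub struct QualityStats {
    pub freshness_seconds: i64,
    pub unique_authors: usize,
    pub reply_ratio: f64,
}

/// Computes the three values for one response; `None` for an empty result.
pub fn quality_stats(posts: &[PostSummary], now: i64) -> Option<QualityStats> {
    // `max()` yields None on an empty slice, which short-circuits here.
    let newest = posts.iter().map(|p| p.created_at).max()?;
    let unique_authors = posts
        .iter()
        .map(|p| p.author_id)
        .collect::<HashSet<_>>()
        .len();
    let replies = posts.iter().filter(|p| p.is_reply).count();
    Some(QualityStats {
        freshness_seconds: (now - newest).max(0),
        unique_authors,
        reply_ratio: replies as f64 / posts.len() as f64,
    })
}
```

Returning `None` for an empty result keeps the ratio well defined and lets the caller decide whether to skip the metric or record a sentinel value.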

Performance Characteristics

Typical Latencies
  • Post lookup: < 1ms for typical following list (500 users)
  • Post insertion: < 0.1ms per post
  • Memory usage: ~2-5GB for 2-day retention window
  • Throughput: 10,000+ RPS per instance

Scaling

Thunder can be horizontally scaled by sharding users across multiple instances:
Hash(user_id) % num_instances → Thunder Instance
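
The routing rule above can be sketched with any stable hash. The source does not say which hash function Thunder uses, so the example below picks 64-bit FNV-1a purely for illustration; `fnv1a_64` and `shard_for` are hypothetical names.

```rust
/// 64-bit FNV-1a over the little-endian bytes of the id.
/// The choice of hash is an assumption made for this sketch.
pub fn fnv1a_64(user_id: i64) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = OFFSET_BASIS;
    for byte in user_id.to_le_bytes() {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(PRIME);
    }
    hash
}

/// Maps a user id to an instance index in `0..num_instances`.
/// Returns `None` when there are no instances to route to.
pub fn shard_for(user_id: i64, num_instances: u64) -> Option<u64> {
    if num_instances == 0 {
        return None;
    }
    Some(fnv1a_64(user_id) % num_instances)
}
```

A fixed, explicitly specified hash matters here because every caller must agree on the mapping. With plain modulo routing, changing `num_instances` remaps most user ids to a different instance, so resizing the fleet means repopulating the stores.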

Related Components

  • Home Mixer: Orchestration layer that uses Thunder as the in-network candidate source
  • Phoenix: Complements Thunder with out-of-network content discovery
  • Candidate Pipeline: Framework that integrates Thunder into the overall recommendation flow

Example Request

grpcurl -d '{
  "user_id": 12345,
  "following_user_ids": [111, 222, 333],
  "max_results": 1000,
  "exclude_tweet_ids": [],
  "is_video_request": false
}' \
  thunder.example.com:443 \
  xai.InNetworkPostsService/GetInNetworkPosts
