This guide explains how data flows through the system, from raw Twitter archive to actionable deletion candidates.

Overview

The tool processes data in two distinct pipelines:

Extract pipeline

Transforms Twitter’s complex JSON archive into a simplified CSV format.

Input: Twitter archive JSON

Location: data/tweets/tweets.json
Format:
tweets.json
[
  {
    "tweet": {
      "id_str": "1234567890",
      "full_text": "This is my tweet content",
      "created_at": "2023-01-15T10:30:00Z",
      "retweet_count": 5,
      "favorite_count": 12
      // ... many other fields
    }
  },
  {
    "tweet": {
      "id_str": "9876543210",
      "full_text": "Another tweet here"
      // ...
    }
  }
]
Twitter archives contain 50+ fields per tweet. We only extract what we need: id_str and full_text.

Step 1: Parse JSON

Component: JSONParser (storage.py:35)
parser = JSONParser(settings.tweets_archive_path)
tweets = parser.parse()
Processing:
  1. Load entire JSON file into memory
  2. Iterate through array of tweet objects
  3. Extract id_str and full_text from each tweet object
  4. Create immutable Tweet objects
Output: List[Tweet]
[
    Tweet(id="1234567890", content="This is my tweet content"),
    Tweet(id="9876543210", content="Another tweet here"),
    # ...
]
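A minimal sketch of this parsing step. The names JSONParser and Tweet follow the document, but the exact implementation in storage.py may differ:

```python
import json
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)  # frozen=True makes Tweet immutable
class Tweet:
    id: str
    content: str


class JSONParser:
    def __init__(self, path):
        self.path = Path(path)

    def parse(self) -> list[Tweet]:
        # Load the entire archive into memory, then keep only the
        # two fields the pipeline needs: id_str and full_text.
        with self.path.open(encoding="utf-8") as f:
            raw = json.load(f)
        return [
            Tweet(id=item["tweet"]["id_str"], content=item["tweet"]["full_text"])
            for item in raw
        ]
```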

Step 2: Write to CSV

Component: CSVWriter (storage.py:131)
with CSVWriter(settings.transformed_tweets_path) as writer:
    writer.write_tweets(tweets)
Processing:
  1. Create output directory if needed
  2. Open CSV file for writing
  3. Write header row: id,text
  4. Write one row per tweet
  5. Close file (automatic via context manager)
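The steps above can be sketched as a small context manager. This is an illustration of the behavior described, not the exact CSVWriter in storage.py:

```python
import csv
from pathlib import Path


class CSVWriter:
    def __init__(self, path):
        self.path = Path(path)

    def __enter__(self):
        # Create the output directory if needed, then open the file.
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._file = self.path.open("w", newline="", encoding="utf-8")
        self._writer = csv.writer(self._file)
        return self

    def write_tweets(self, tweets):
        self._writer.writerow(["id", "text"])  # header row
        for tweet in tweets:
            self._writer.writerow([tweet.id, tweet.content])

    def __exit__(self, exc_type, exc, tb):
        self._file.close()  # closed automatically by the with-block
        return False
```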

Output: Transformed CSV

Location: data/tweets/transformed/tweets.csv
Format:
tweets.csv
id,text
1234567890,"This is my tweet content"
9876543210,"Another tweet here"
Why this transformation?
  • Simpler format (2 columns vs 50+ fields)
  • Human-readable (can inspect in Excel)
  • Faster to parse for analysis
  • Smaller file size

Visual flow

(Diagram: tweets.json → JSONParser → List[Tweet] → CSVWriter → transformed tweets.csv)

Analysis pipeline

Processes tweets through Gemini AI and identifies deletion candidates.

Input: Transformed CSV

Location: data/tweets/transformed/tweets.csv
Same format as the extract pipeline's output.

Step 1: Load tweets and checkpoint

Component: CSVParser + Checkpoint (storage.py:61, storage.py:84)
parser = CSVParser(settings.transformed_tweets_path)
tweets = parser.parse()

with Checkpoint(settings.checkpoint_path) as checkpoint:
    start_index = checkpoint.load()
Processing:
  1. Parse entire CSV into List[Tweet]
  2. Load checkpoint (0 if first run, or saved index)
  3. Skip already-processed tweets
Example:
# First run
start_index = 0  # Process from beginning

# After interruption at tweet 100
start_index = 100  # Resume from tweet 100
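A sketch of what Checkpoint might look like, assuming the checkpoint file holds a single plain-text integer (as shown later in this guide); the real class lives in storage.py:

```python
from pathlib import Path


class Checkpoint:
    """Persists the next tweet index so an interrupted run can resume."""

    def __init__(self, path):
        self.path = Path(path)

    def __enter__(self):
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # "a+" creates the file on first run and allows read + overwrite.
        self._file = self.path.open("a+", encoding="utf-8")
        return self

    def load(self) -> int:
        self._file.seek(0)
        text = self._file.read().strip()
        return int(text) if text else 0  # 0 on first run

    def save(self, index: int) -> None:
        self._file.seek(0)
        self._file.truncate()       # clear existing content
        self._file.write(str(index))
        self._file.flush()          # push straight to disk

    def __exit__(self, exc_type, exc, tb):
        self._file.close()
        return False
```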

Step 2: Batch processing loop

Component: Application.analyze_tweets() (application.py:64)
for i in range(start_index, len(tweets), settings.batch_size):
    batch = tweets[i:i + settings.batch_size]

    for tweet in batch:
        ...  # analyze each tweet (filtering, AI call, result writing)

    checkpoint.save(i + len(batch))
Batching behavior:
Tweets 0-9 (batch_size=10)
batch = tweets[0:10]
# Process 10 tweets
checkpoint.save(10)
Important: Checkpoint updates only after full batch completes. If interrupted mid-batch, those tweets will be re-processed on resume.
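To make the re-processing window concrete (the indices here are illustrative):

```python
batch_size = 10
checkpoint_index = 30   # last fully completed batch ended at tweet 30

# A crash at tweet 37 loses the in-flight batch: on restart the loop
# resumes from the checkpoint, so tweets 30-36 are analyzed again.
crash_at = 37
reprocessed = list(range(checkpoint_index, crash_at))
print(reprocessed)  # [30, 31, 32, 33, 34, 35, 36]
```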

Step 3: Filter retweets

Component: _is_retweet() (application.py:125)
for tweet in batch:
    if _is_retweet(tweet):
        continue  # Skip retweets
    
    result = self.analyzer.analyze(tweet)
Retweet detection:
def _is_retweet(tweet) -> bool:
    return tweet.content.startswith("RT @")
Examples:
# Retweet (skipped)
Tweet(id="123", content="RT @someone: Great point!")

# Original tweet (analyzed)
Tweet(id="456", content="Here's my original thought")

Step 4: AI analysis

Component: Gemini.analyze() (analyzer.py:71)
result = self.analyzer.analyze(tweet)
Processing:
  1. Rate limiting — enforce a minimum delay since the last request (default: 1 second):

elapsed = time.time() - self.last_request_time
if elapsed < 1.0:
    time.sleep(1.0 - elapsed)

  2. Build prompt — construct the prompt from the tweet content and deletion criteria:

prompt = self._build_prompt(tweet)

  3. Call Gemini API — send the prompt to Gemini, requesting a JSON response:

response = self.client.models.generate_content(
    model=self.model,
    contents=prompt,
    config={"response_mime_type": "application/json"}
)

  4. Parse response — extract the decision from the JSON response:

data = json.loads(response.text)
decision = Decision(data["decision"].upper())

  5. Create result — return an AnalysisResult with the tweet URL and decision:

return AnalysisResult(
    tweet_url=settings.tweet_url(tweet.id),
    decision=decision
)
API Request:
{
  "model": "gemini-2.5-flash",
  "contents": "You are evaluating tweets...\n\nTweet: 'This is problematic content'...",
  "config": {
    "response_mime_type": "application/json"
  }
}
API Response:
{
  "decision": "DELETE",
  "reason": "Contains unprofessional language"
}
Converted to:
AnalysisResult(
    tweet_url="https://x.com/username/status/1234567890",
    decision=Decision.DELETE
)
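Decision and AnalysisResult might be defined along these lines (a sketch; the field names follow this document, not necessarily the source):

```python
from dataclasses import dataclass
from enum import Enum


class Decision(Enum):
    DELETE = "DELETE"
    KEEP = "KEEP"


@dataclass(frozen=True)
class AnalysisResult:
    tweet_url: str
    decision: Decision
```

With an enum like this, `Decision(data["decision"].upper())` maps the API's JSON string onto a member and raises ValueError on anything unexpected.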

Step 5: Write results

Component: CSVWriter.write_result() (storage.py:172)
if result.decision == Decision.DELETE:
    writer.write_result(result)
Only DELETE decisions are written to results. KEEP decisions are silently skipped. This keeps the output focused on actionable items.
Processing:
  1. Check if decision is DELETE
  2. If yes, write row to results CSV
  3. Flush to disk immediately (ensures no data loss)
  4. If no, skip (don’t write KEEP decisions)
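A sketch of this writing step. The class name ResultsWriter is hypothetical; in the tool it is a method on CSVWriter in storage.py:

```python
import csv
from pathlib import Path


class ResultsWriter:
    """Illustrative results writer; header and columns follow the document."""

    def __init__(self, path):
        self.path = Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._file = self.path.open("w", newline="", encoding="utf-8")
        self._writer = csv.writer(self._file)
        self._writer.writerow(["tweet_url", "deleted"])

    def write_result(self, result):
        # Only DELETE decisions reach this method; the "deleted" column
        # starts as false so users can track manual deletion progress.
        self._writer.writerow([result.tweet_url, "false"])
        self._file.flush()  # flush immediately so a crash loses nothing

    def close(self):
        self._file.close()
```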

Step 6: Update checkpoint

Component: Checkpoint.save() (storage.py:121)
checkpoint.save(i + len(batch))
Processing:
  1. Seek to beginning of checkpoint file
  2. Truncate (clear existing content)
  3. Write new index
  4. Flush to disk
Checkpoint file:
30
A single integer giving the next tweet index to process.

Output: Results CSV

Location: data/tweets/processed/results.csv
Format:
results.csv
tweet_url,deleted
https://x.com/username/status/1234567890,false
https://x.com/username/status/9876543210,false
Fields:
  • tweet_url: Direct link to tweet (clickable in spreadsheets)
  • deleted: Manual tracking column (initialized to false)
Allows users to manually track deletion progress:
  1. Open results.csv in Excel/Google Sheets
  2. Click tweet URL to review
  3. Delete tweet on Twitter
  4. Mark deleted as true in spreadsheet
  5. Track completion progress

Visual flow

(Diagram: transformed tweets.csv → CSVParser → batch loop → retweet filter → Gemini analysis → results.csv + checkpoint)

Data transformations

Summary of how data structure changes through the pipeline:
Input format from Twitter:
{
  "tweet": {
    "id_str": "1234567890",
    "full_text": "Tweet content",
    "created_at": "...",
    "retweet_count": 5,
    // 50+ other fields
  }
}
Size: ~5-10 KB per tweet (large)

Transformed CSV row:
1234567890,"Tweet content"
Size: ~200 bytes per tweet (small)

Results CSV row (DELETE candidates only):
https://x.com/username/status/1234567890,false

Error handling flow

How errors propagate through the system. Example:
try:
    result = gemini.analyze(tweet)
except RateLimitError:
    # Retry up to 3 times with exponential backoff;
    # if all retries still fail, re-raise to the caller
    ...
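The retry-with-backoff behavior could be sketched like this. RateLimitError, the retry count, and the helper name are illustrative, not the tool's exact implementation:

```python
import time


class RateLimitError(Exception):
    """Illustrative stand-in for the API's rate-limit exception."""


def analyze_with_retry(analyze, tweet, retries=3, base_delay=1.0):
    # Retry up to `retries` times, doubling the delay each attempt;
    # if every attempt fails, re-raise to the caller.
    for attempt in range(retries):
        try:
            return analyze(tweet)
        except RateLimitError:
            if attempt == retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```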

Performance characteristics

Time analysis

For 5,000 tweets with default settings:
1

Extract

~1-2 seconds
  • Parse JSON: O(n)
  • Write CSV: O(n)
  • Memory-bound operation
2

Analysis

~1.5-2 hours
  • 5,000 tweets × 1 second rate-limit delay = 5,000 seconds (~83 minutes)
  • Plus API latency (~500 ms per request), roughly 2,500 more seconds
  • Network-bound operation
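The arithmetic behind the estimate, stated as code:

```python
tweets = 5_000
rate_limit_delay = 1.0   # seconds enforced between requests
api_latency = 0.5        # approximate seconds per request

total_seconds = tweets * (rate_limit_delay + api_latency)
hours = total_seconds / 3600
print(f"{total_seconds:.0f} s ≈ {hours:.1f} hours")  # 7500 s ≈ 2.1 hours
```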

Memory usage

Extract pipeline — O(n), where n = number of tweets:
tweets = parser.parse()  # all tweets in memory
writer.write_tweets(tweets)  # still in memory
For 50,000 tweets × ~200 bytes ≈ 10 MB

Analysis loop — O(batch_size) per iteration (the full tweet list remains loaded, but each working batch is a small slice):
for i in range(start_index, len(tweets), batch_size):
    batch = tweets[i:i+batch_size]  # slice of 10 tweets per iteration
For batch_size=10 × ~200 bytes ≈ 2 KB per batch

Next steps

Component details

Deep dive into each module’s implementation

Design decisions

Understand why the system works this way
