Skip to main content

Overview

The Twitter Knowledge Base system provides two key components:
  1. TweetKnowledgeBase: Vector-based storage and retrieval of tweets using ChromaDB
  2. TwitterState: SQLite-based tracking of interactions (replies, reposts) to avoid duplicates

TweetKnowledgeBase

The TweetKnowledgeBase class stores tweets in ChromaDB and retrieves them by semantic search, using sentence-transformer embeddings.

Initialization

from twitter_agent.twitter_knowledge_base import TweetKnowledgeBase
import chromadb
from sentence_transformers import SentenceTransformer
import os

class TweetKnowledgeBase:
    """Semantic tweet store backed by a persistent ChromaDB collection."""

    def __init__(self, collection_name: str = "twitter_knowledge"):
        """Open (or create) the named collection with a sentence-transformer embedder.

        Args:
            collection_name: Name of the ChromaDB collection to open or create.
        """
        print_system("Initializing TweetKnowledgeBase...")

        # Make sure <module dir>/../data/chroma_db exists.
        # NOTE(review): the client below persists to "./chroma_db" (relative to
        # the working directory), not to this directory -- confirm which
        # location is intended.
        module_dir = os.path.dirname(__file__)
        os.makedirs(
            os.path.join(module_dir, "..", "data", "chroma_db"),
            exist_ok=True,
        )

        # Persistent ChromaDB client.
        self.client = chromadb.PersistentClient(path="./chroma_db")

        # Model used to embed tweet text and queries.
        self.embedding_model = SentenceTransformer('all-mpnet-base-v2')

        class EmbeddingFunction:
            """Callable adapter that embeds a batch of strings with the model."""

            def __init__(self, model):
                self.model = model

            def __call__(self, input: List[str]) -> List[List[float]]:
                # The encoded batch is handed back as plain nested lists.
                return self.model.encode(input).tolist()

        # Open the collection, creating it on first use.
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            embedding_function=EmbeddingFunction(self.embedding_model)
        )
From twitter_knowledge_base.py:22-54

Embedding Model

The knowledge base uses the all-mpnet-base-v2 model from SentenceTransformers for high-quality semantic embeddings:
  • Model: all-mpnet-base-v2
  • Dimensions: 768
  • Performance: Excellent for semantic similarity tasks
  • Speed: Optimized for production use

Adding Tweets

Add tweets to the knowledge base:
def add_tweets(self, tweets: List[Tweet]):
    """Add tweets to the knowledge base.

    Each tweet's text is stored as the document, keyed by the tweet ID, with
    the author ID and creation timestamp kept as metadata.

    Args:
        tweets: Tweets to store; each must expose ``id``, ``text``,
            ``author_id`` and ``created_at``. May be any iterable.
    """
    # Materialize once: the input is traversed several times below, which
    # would silently misalign documents/ids for a one-shot iterator.
    batch = list(tweets)

    # Nothing to store: skip the call rather than send an empty batch.
    if not batch:
        return

    documents = [tweet.text for tweet in batch]
    # Collection IDs are strings; str() is a no-op for IDs that already are.
    ids = [str(tweet.id) for tweet in batch]
    metadata = [
        {
            "author_id": tweet.author_id,
            "created_at": tweet.created_at,
        }
        for tweet in batch
    ]

    self.collection.add(
        documents=documents,
        ids=ids,
        metadatas=metadata
    )
From twitter_knowledge_base.py:56-72

Querying the Knowledge Base

Search for relevant tweets using semantic search:
def query_knowledge_base(self, query: str, n_results: int = 10) -> List[Dict]:
    """Query the knowledge base for relevant tweets.

    Args:
        query: Free-text search string.
        n_results: Maximum number of tweets to request from the collection.

    Returns:
        A list of dicts with ``text``, ``metadata`` (with ``created_at``
        reformatted as ``YYYY-MM-DD HH:MM:SS UTC``) and ``relevance_score``,
        sorted by descending relevance. An empty list when nothing matches
        or when the query fails (the error is reported via ``print_error``).
    """
    try:
        print_system(f"Querying knowledge base with: {query}")

        results = self.collection.query(
            query_texts=[query],
            n_results=n_results
        )

        if not results['documents'][0]:
            print_system("No results found in knowledge base")
            return []

        formatted_results = []
        for doc, metadata, distance in zip(
            results['documents'][0],
            results['metadatas'][0],
            results['distances'][0]
        ):
            metadata = metadata or {}
            raw_created_at = metadata.get('created_at')
            try:
                # Accept a trailing "Z" by rewriting it as an explicit offset.
                created_at = datetime.fromisoformat(raw_created_at.replace('Z', '+00:00'))
                formatted_date = created_at.strftime('%Y-%m-%d %H:%M:%S UTC')
            except (AttributeError, ValueError):
                # A missing or malformed timestamp on one hit must not discard
                # the whole result set; fall back to the stored value.
                formatted_date = "unknown" if raw_created_at is None else str(raw_created_at)

            formatted_results.append({
                "text": doc,
                "metadata": {
                    **metadata,
                    "created_at": formatted_date
                },
                # NOTE(review): 1 - distance is a true similarity only for a
                # cosine-distance collection -- confirm the collection's metric.
                "relevance_score": 1 - distance
            })

        # Most relevant first.
        formatted_results.sort(key=lambda x: x['relevance_score'], reverse=True)

        print_system(f"Found {len(formatted_results)} relevant tweets")
        return formatted_results

    except Exception as e:
        print_error(f"Error querying knowledge base: {e}")
        return []
From twitter_knowledge_base.py:74-118

Formatting Results

Format query results into a readable string:
def format_query_results(self, results: List[Dict]) -> str:
    """Render query results as readable text, one entry per tweet.

    Args:
        results: Entries with ``text``, ``relevance_score`` and a ``metadata``
            dict containing ``created_at``.

    Returns:
        The entries joined by a ``---`` separator line, or a fixed
        "nothing found" message when ``results`` is empty.
    """
    if not results:
        return "No relevant tweets found in knowledge base."

    return "\n---\n".join(
        f"Tweet from {entry['metadata']['created_at']}\n"
        f"Relevance: {entry['relevance_score']:.2f}\n"
        f"Content: {entry['text']}\n"
        for entry in results
    )
From twitter_knowledge_base.py:120-133

Collection Statistics

Get statistics about the knowledge base:
def get_collection_stats(self) -> Dict:
    """Get statistics about the knowledge base collection.

    Returns:
        A dict with ``count`` (number of stored tweets) and ``last_update``
        (the newest ``created_at`` among stored tweets, or the current time
        when no timestamp is available). On failure the error is reported
        via ``print_error`` and ``count`` is 0.
    """
    try:
        count = self.collection.count()

        # Only metadata is needed here; avoid loading every document body.
        records = self.collection.get(include=["metadatas"])

        # Skip entries without metadata or without a timestamp so one
        # incomplete record does not turn the whole call into an error.
        timestamps = [
            m["created_at"]
            for m in (records.get("metadatas") or [])
            if m and m.get("created_at")
        ]

        last_update = None
        if timestamps:
            # Newest timestamp string, parsed with "Z" as an explicit offset.
            last_update = datetime.fromisoformat(max(timestamps).replace('Z', '+00:00'))

        print_system(f"Knowledge base contains {count} tweets")
        return {
            "count": count,
            "last_update": last_update or datetime.now()
        }
    except Exception as e:
        print_error(f"Error getting collection stats: {str(e)}")
        return {"count": 0, "last_update": datetime.now()}
From twitter_knowledge_base.py:135-153

Clearing the Collection

Clear all tweets from the knowledge base:
def clear_collection(self) -> bool:
    """Remove every tweet from the knowledge base.

    Returns:
        True when the collection was cleared or was already empty; False
        when an error occurred (reported via ``print_error``).
    """
    try:
        print_system("Clearing knowledge base collection...")
        stored_ids = self.collection.get()["ids"]

        # Deletion is only attempted when there is something to delete.
        if not stored_ids:
            print_system("Knowledge base is already empty")
            return True

        self.collection.delete(ids=stored_ids)
        print_system("Knowledge base cleared successfully")
        return True
    except Exception as e:
        print_error(f"Error clearing knowledge base: {str(e)}")
        return False
From twitter_knowledge_base.py:155-168

Updating Knowledge Base from KOLs

The update_knowledge_base function clears the existing collection and then repopulates the knowledge base with recent tweets from a random sample of key opinion leaders (KOLs):
async def update_knowledge_base(
    twitter_client: TwitterClient, 
    knowledge_base, 
    kol_list: List[Dict]
):
    """Update the knowledge base with recent tweets from top KOLs.

    Clears the knowledge base, then fetches recent tweets for a random sample
    of KOLs and stores them, pausing between requests.

    Args:
        twitter_client: Client exposing an awaitable
            ``get_user_tweets(user_id=..., max_results=...)``.
        knowledge_base: Object exposing ``clear_collection()`` and
            ``add_tweets(tweets)``.
        kol_list: Dicts describing KOLs; entries without a ``user_id`` are
            ignored.
    """
    TOP_KOLS = 5
    TWEETS_PER_KOL = 15
    REQUEST_DELAY = 5

    # Only KOLs with a user ID can be fetched.
    valid_kols = [kol for kol in kol_list if kol.get('user_id')]

    # Select random sample of KOLs
    selected_kols = random.sample(valid_kols, min(TOP_KOLS, len(valid_kols)))

    # Clear existing knowledge base
    knowledge_base.clear_collection()

    # Process each selected KOL
    for position, kol in enumerate(selected_kols):
        # Pause between requests only; no delay before the first or after
        # the last one.
        if position:
            await asyncio.sleep(REQUEST_DELAY)

        kol_tweets = await twitter_client.get_user_tweets(
            user_id=kol['user_id'],
            max_results=TWEETS_PER_KOL
        )

        if kol_tweets:
            knowledge_base.add_tweets(kol_tweets)
From twitter_knowledge_base.py:170-330

KOL List Format

The KOL list should be a list of dictionaries with username and user_id:
[
  {
    "username": "vitalikbuterin",
    "user_id": "295218901"
  },
  {
    "username": "elonmusk",
    "user_id": "44196397"
  }
]

TwitterState

The TwitterState class tracks interactions to prevent duplicate replies and reposts.

Initialization

from twitter_agent.twitter_state import TwitterState
import sqlite3
import os
from datetime import datetime, timedelta

class TwitterState:
    """In-memory interaction state with a SQLite database behind it."""

    def __init__(self):
        """Start with empty state, resolve the database name and initialize it."""
        # Nothing is known yet about the account, mentions or rate window.
        self.account_id = self.last_mention_id = None
        self.last_check_time = self.reset_time = None
        self.mentions_count = 0

        # Generate DB name based on character file
        self.db_name = self._get_db_name()
        self._init_db()
From twitter_state.py:10-19

Database Schema

TwitterState uses SQLite with three tables:
def _init_db(self):
    """Initialize SQLite database for state and replied tweets."""
    with sqlite3.connect(self.db_name) as conn:
        # Replied tweets table
        conn.execute('''
            CREATE TABLE IF NOT EXISTS replied_tweets (
                tweet_id TEXT PRIMARY KEY,
                replied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        # Reposted tweets table
        conn.execute('''
            CREATE TABLE IF NOT EXISTS reposted_tweets (
                tweet_id TEXT PRIMARY KEY,
                reposted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        # State table
        conn.execute('''
            CREATE TABLE IF NOT EXISTS twitter_state (
                key TEXT PRIMARY KEY,
                value TEXT
            )
        ''')
        
        conn.execute('CREATE INDEX IF NOT EXISTS idx_replied_at ON replied_tweets(replied_at)')
        conn.execute('CREATE INDEX IF NOT EXISTS idx_reposted_at ON reposted_tweets(reposted_at)')
From twitter_state.py:34-62

Tracking Replied Tweets

Check whether a tweet has already been replied to, and record new replies in the database:
def has_replied_to(self, tweet_id):
    """Check if we've already replied to this tweet.

    Args:
        tweet_id: ID to look up in the ``replied_tweets`` table.

    Returns:
        True if a row for ``tweet_id`` exists, otherwise False.
    """
    conn = sqlite3.connect(self.db_name)
    try:
        row = conn.execute(
            'SELECT 1 FROM replied_tweets WHERE tweet_id = ?', (tweet_id,)
        ).fetchone()
    finally:
        # Close explicitly; sqlite3's connection context manager does not.
        conn.close()
    return row is not None

def add_replied_tweet(self, tweet_id):
    """Add a tweet ID to the database of replied tweets.

    Uses ``INSERT OR REPLACE``, so recording the same ID again replaces the
    existing row instead of failing.

    Args:
        tweet_id: ID of the tweet that was replied to.

    Returns:
        A human-readable status message; failures are reported in the
        message rather than raised.
    """
    try:
        conn = sqlite3.connect(self.db_name)
        try:
            # "with conn" commits on success and rolls back on error.
            with conn:
                conn.execute('INSERT OR REPLACE INTO replied_tweets (tweet_id) VALUES (?)', (tweet_id,))
        finally:
            # Close explicitly; the context manager above does not.
            conn.close()
        return f"Successfully added tweet {tweet_id} to replied tweets database"
    except Exception as e:
        return f"Error adding tweet {tweet_id} to database: {str(e)}"
From twitter_state.py:96-104,106-110

Tracking Reposted Tweets

Check whether a tweet has already been reposted, and record new reposts in the database:
def has_reposted(self, tweet_id: str) -> bool:
    """Check if we have already reposted a tweet.

    Args:
        tweet_id: ID to look up in the ``reposted_tweets`` table.

    Returns:
        True if a row for ``tweet_id`` exists, otherwise False.
    """
    conn = sqlite3.connect(self.db_name)
    try:
        row = conn.execute(
            'SELECT 1 FROM reposted_tweets WHERE tweet_id = ?',
            (tweet_id,)
        ).fetchone()
    finally:
        # Close explicitly; sqlite3's connection context manager does not.
        conn.close()
    return row is not None

def add_reposted_tweet(self, tweet_id: str) -> str:
    """Add a tweet ID to the database of reposted tweets.

    Args:
        tweet_id: ID of the tweet that was reposted.

    Returns:
        A status message saying whether the repost was newly recorded or
        was already present (primary-key conflict).
    """
    conn = sqlite3.connect(self.db_name)
    try:
        # "with conn" commits on success and rolls back on error.
        with conn:
            conn.execute(
                'INSERT INTO reposted_tweets (tweet_id) VALUES (?)',
                (tweet_id,)
            )
    except sqlite3.IntegrityError:
        return f"Tweet {tweet_id} was already recorded as reposted"
    finally:
        # Close explicitly; the context manager above does not.
        conn.close()
    return f"Successfully recorded repost of tweet {tweet_id}"
From twitter_state.py:132-151

Rate Limiting

Manage mention check intervals:
# Constants
MENTION_CHECK_INTERVAL = 2 * 60  # 2 minutes
MAX_MENTIONS_PER_INTERVAL = 50

def can_check_mentions(self):
    """Tell whether the mention-check interval has elapsed.

    Returns:
        True when no check has been recorded yet, or when at least
        MENTION_CHECK_INTERVAL seconds have passed since the last one.
    """
    if self.last_check_time:
        elapsed = datetime.now() - self.last_check_time
        return elapsed.total_seconds() >= MENTION_CHECK_INTERVAL
    return True

def update_rate_limit(self):
    """Count one more mention and report whether the limit still holds.

    A new 15-minute window is opened (and the counter reset) when no window
    exists or the current one has expired.

    Returns:
        True while the count, including this call, stays within
        MAX_MENTIONS_PER_INTERVAL.
    """
    current_time = datetime.now()
    window_expired = not self.reset_time or current_time >= self.reset_time
    if window_expired:
        self.reset_time = current_time + timedelta(minutes=15)
        self.mentions_count = 0

    self.mentions_count += 1
    return self.mentions_count <= MAX_MENTIONS_PER_INTERVAL
From twitter_state.py:7-8,112-130

Persisting State

Save and load state from SQLite:
def save(self):
    """Save state to SQLite database.

    Writes ``last_mention_id``, ``last_check_time``, ``mentions_count`` and
    ``reset_time`` as key/value rows in ``twitter_state``, replacing any
    previous values. Datetimes are stored in ISO format; unset values are
    stored as NULL.
    """
    state_data = {
        'last_mention_id': self.last_mention_id,
        'last_check_time': self.last_check_time.isoformat() if self.last_check_time else None,
        'mentions_count': str(self.mentions_count),
        'reset_time': self.reset_time.isoformat() if self.reset_time else None
    }

    conn = sqlite3.connect(self.db_name)
    try:
        # "with conn" commits all rows together, or rolls back on error.
        with conn:
            conn.executemany(
                'INSERT OR REPLACE INTO twitter_state (key, value) VALUES (?, ?)',
                list(state_data.items())
            )
    finally:
        # Close explicitly; the context manager above does not.
        conn.close()

def load(self):
    """Load state from SQLite database.

    Restores every field written by ``save``: ``last_mention_id``,
    ``last_check_time``, ``mentions_count`` and ``reset_time``. Fields with
    no stored row are left untouched; NULL values restore the empty state
    (``None``, or 0 for the counter).
    """
    conn = sqlite3.connect(self.db_name)
    try:
        rows = conn.execute('SELECT key, value FROM twitter_state').fetchall()
    finally:
        # Close explicitly; sqlite3's connection context manager does not.
        conn.close()

    for key, value in rows:
        if key == 'last_mention_id':
            self.last_mention_id = value
        elif key == 'last_check_time':
            self.last_check_time = datetime.fromisoformat(value) if value else None
        elif key == 'mentions_count':
            # Stored as text; convert back to an integer counter.
            self.mentions_count = int(value) if value else 0
        elif key == 'reset_time':
            self.reset_time = datetime.fromisoformat(value) if value else None
From twitter_state.py:64-93

Using Knowledge Base as a Tool

From chatbot.py:302-308, the knowledge base is added as a LangChain tool:
# Register the knowledge-base tool only when USE_TWITTER_KNOWLEDGE_BASE is
# "true" (case-insensitive; the flag defaults to "true" when unset) and a
# knowledge base instance is available.
if os.getenv("USE_TWITTER_KNOWLEDGE_BASE", "true").lower() == "true" and knowledge_base is not None:
    tools.append(Tool(
        name="query_twitter_knowledge_base",
        description=TWITTER_KNOWLEDGE_BASE_DESCRIPTION,
        # The tool input is passed through as the query text; n_results keeps
        # its default.
        func=lambda query: knowledge_base.query_knowledge_base(query)
    ))

Environment Configuration

# Enable Twitter Knowledge Base
USE_TWITTER_KNOWLEDGE_BASE=true

# Enable reply/repost tracking
USE_TWEET_REPLY_TRACKING=true
USE_TWEET_REPOST_TRACKING=true

# Character file (determines DB name)
CHARACTER_FILE=characters/default.json

Usage Examples

Query Knowledge Base

from langchain_core.messages import HumanMessage

# NOTE(review): agent_executor is not defined in this example; presumably it
# is the agent built with the query_twitter_knowledge_base tool -- confirm.
response = agent_executor.invoke({
    "messages": [HumanMessage(
        content="What are the latest trends in DeFi according to the knowledge base?"
    )]
})

Check Before Replying

twitter_state = TwitterState()
tweet_id = "1234567890"

# Skip tweets that already have a row in the replied_tweets table.
if not twitter_state.has_replied_to(tweet_id):
    # Reply to the tweet (await requires an async context; twitter_client is
    # assumed to be created elsewhere)
    reply_result = await twitter_client.reply_to_tweet(tweet_id, "Great insight!")
    
    # Mark as replied, then persist the in-memory state fields
    twitter_state.add_replied_tweet(tweet_id)
    twitter_state.save()

Avoid Duplicate Reposts

# Uses twitter_state and tweet_id as set up in the "Check Before Replying"
# example; skips tweets already present in the reposted_tweets table.
if not twitter_state.has_reposted(tweet_id):
    # Retweet (await requires an async context)
    await twitter_client.retweet(tweet_id)
    
    # Mark as reposted, then persist the in-memory state fields
    twitter_state.add_reposted_tweet(tweet_id)
    twitter_state.save()

Best Practices

Knowledge Base Management

  • Update regularly with fresh content from KOLs
  • Clear stale data periodically
  • Monitor collection size and performance
  • Use appropriate n_results for queries (default: 10)

State Management

  • Always check state before interactions
  • Save state after each operation
  • Use character-specific databases
  • Back up state databases regularly

Rate Limiting

  • Respect MENTION_CHECK_INTERVAL
  • Monitor MAX_MENTIONS_PER_INTERVAL
  • Implement exponential backoff for errors
  • Use wait_on_rate_limit=True in TwitterClient

Troubleshooting

ChromaDB Issues

If ChromaDB fails to initialize:
# Install required packages
pip install chromadb sentence-transformers

# Clear corrupted database
rm -rf ./chroma_db

SQLite Database Locked

If you get database locked errors:
  • Ensure only one agent instance is running
  • Check file permissions on .db files
  • Close any open database connections

Missing Embeddings

If embeddings fail to generate:
# Download embedding model manually
python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('all-mpnet-base-v2')"

Next Steps

Build docs developers (and LLMs) love