Skip to main content

Overview

Twikit provides two approaches for monitoring tweets in real-time:
  1. Polling method: Periodically check for new tweets from a user
  2. Streaming API: Subscribe to real-time events like tweet engagement updates
Both methods are useful for different scenarios, from monitoring specific accounts to tracking tweet metrics in real-time.

Polling for new tweets

The polling method checks for new tweets at regular intervals. This is ideal for monitoring when a specific user posts:
import asyncio
from typing import NoReturn
from twikit import Client, Tweet

client = Client()

USER_ID = '44196397'
CHECK_INTERVAL = 60 * 5  # Check every 5 minutes


def callback(tweet: Tweet) -> None:
    print(f'New tweet posted: {tweet.text}')


async def get_latest_tweet() -> Tweet:
    return (await client.get_user_tweets(USER_ID, 'Replies'))[0]


async def main() -> NoReturn:
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )
    
    before_tweet = await get_latest_tweet()
    
    while True:
        await asyncio.sleep(CHECK_INTERVAL)
        latest_tweet = await get_latest_tweet()
        
        if (
            before_tweet != latest_tweet and
            before_tweet.created_at_datetime < latest_tweet.created_at_datetime
        ):
            callback(latest_tweet)
        
        before_tweet = latest_tweet

asyncio.run(main())
This polling approach is from Twikit’s official examples and provides a reliable way to detect new tweets without using the streaming API.

Complete polling example

Here’s the actual code from Twikit’s listen_for_new_tweets.py example:
import asyncio
from typing import NoReturn

from twikit import Client, Tweet

AUTH_INFO_1 = '...'
AUTH_INFO_2 = '...'
PASSWORD = '...'

client = Client()

USER_ID = '44196397'
CHECK_INTERVAL = 60 * 5


def callback(tweet: Tweet) -> None:
    print(f'New tweet posted : {tweet.text}')


async def get_latest_tweet() -> Tweet:
    return (await client.get_user_tweets(USER_ID, 'Replies'))[0]


async def main() -> NoReturn:
    await client.login(
        auth_info_1=AUTH_INFO_1,
        auth_info_2=AUTH_INFO_2,
        password=PASSWORD
    )
    
    before_tweet = await get_latest_tweet()

    while True:
        await asyncio.sleep(CHECK_INTERVAL)
        latest_tweet = await get_latest_tweet()
        if (
            before_tweet != latest_tweet and
            before_tweet.created_at_datetime < latest_tweet.created_at_datetime
        ):
            callable(latest_tweet)
        before_tweet = latest_tweet

asyncio.run(main())

How polling works

Let’s break down the polling mechanism:

Setting up the monitor

# User ID to monitor
USER_ID = '44196397'

# Check interval (in seconds)
CHECK_INTERVAL = 60 * 5  # 5 minutes

# Callback function when new tweet is found
def callback(tweet: Tweet) -> None:
    print(f'New tweet: {tweet.text}')
    # Add your custom logic here

Fetching latest tweet

async def get_latest_tweet() -> Tweet:
    # Get user's tweets (can use 'Tweets', 'Replies', etc.)
    tweets = await client.get_user_tweets(USER_ID, 'Replies')
    return tweets[0]  # Return most recent

Comparison loop

# Store the initial latest tweet
before_tweet = await get_latest_tweet()

while True:
    # Wait for the check interval
    await asyncio.sleep(CHECK_INTERVAL)
    
    # Get current latest tweet
    latest_tweet = await get_latest_tweet()
    
    # Compare tweets
    if (
        before_tweet != latest_tweet and
        before_tweet.created_at_datetime < latest_tweet.created_at_datetime
    ):
        callback(latest_tweet)
    
    # Update reference tweet
    before_tweet = latest_tweet
The comparison checks both that tweets are different objects and that the new tweet is actually newer by timestamp to avoid false positives.

Streaming API for engagement

For real-time tweet engagement metrics (likes, retweets, views), use the streaming API:
import asyncio
from twikit import Client
from twikit.streaming import Topic

client = Client()

async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )
    
    # Monitor engagement for specific tweet
    tweet_id = '1234567890'
    topics = {Topic.tweet_engagement(tweet_id)}
    
    streaming_session = await client.get_streaming_session(topics)
    
    async for topic, payload in streaming_session:
        if payload.tweet_engagement:
            print(f'Likes: {payload.tweet_engagement.like_count}')
            print(f'Retweets: {payload.tweet_engagement.retweet_count}')
            print(f'Replies: {payload.tweet_engagement.reply_count}')
            print(f'Views: {payload.tweet_engagement.view_count}')

asyncio.run(main())

Monitoring multiple tweets

You can monitor engagement for multiple tweets simultaneously:
import asyncio
from twikit import Client
from twikit.streaming import Topic

client = Client()

async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )
    
    # Monitor multiple tweets
    tweet_ids = ['1234567890', '0987654321', '1111111111']
    
    topics = {
        Topic.tweet_engagement(tweet_id)
        for tweet_id in tweet_ids
    }
    
    streaming_session = await client.get_streaming_session(topics)
    
    async for topic, payload in streaming_session:
        if payload.tweet_engagement:
            # Extract tweet ID from topic
            tweet_id = topic.split('/')[-1]
            print(f'Tweet {tweet_id} engagement update:')
            print(f'  Likes: {payload.tweet_engagement.like_count}')
            print(f'  Retweets: {payload.tweet_engagement.retweet_count}')

asyncio.run(main())

Custom polling intervals

Adjust the check interval based on your needs:
import asyncio
from twikit import Client, Tweet

client = Client()

# Different interval options
CHECK_EVERY_MINUTE = 60
CHECK_EVERY_5_MINUTES = 60 * 5
CHECK_EVERY_HOUR = 60 * 60

async def monitor_user(user_id: str, interval: int):
    """Monitor a user for new tweets."""
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )
    
    tweets = await client.get_user_tweets(user_id, 'Tweets')
    last_tweet = tweets[0]
    
    while True:
        await asyncio.sleep(interval)
        
        tweets = await client.get_user_tweets(user_id, 'Tweets')
        current_tweet = tweets[0]
        
        if (
            last_tweet != current_tweet and
            last_tweet.created_at_datetime < current_tweet.created_at_datetime
        ):
            print(f'New tweet from {user_id}: {current_tweet.text}')
        
        last_tweet = current_tweet

async def main():
    # Monitor with 5-minute intervals
    await monitor_user('44196397', CHECK_EVERY_5_MINUTES)

asyncio.run(main())
Very short polling intervals may hit rate limits. For frequent checks, consider using intervals of 2-5 minutes or longer.

Processing new tweets

Perform actions when new tweets are detected:
import asyncio
from twikit import Client, Tweet

client = Client()

async def process_new_tweet(tweet: Tweet):
    """Process newly detected tweets."""
    print(f'New tweet from @{tweet.user.screen_name}')
    print(f'Text: {tweet.text}')
    print(f'Posted at: {tweet.created_at}')
    
    # Like the tweet
    await tweet.favorite()
    
    # Retweet it
    await tweet.retweet()
    
    # Reply to it
    await tweet.reply('Great tweet!')

async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )
    
    user_id = '44196397'
    check_interval = 60 * 5
    
    tweets = await client.get_user_tweets(user_id, 'Tweets')
    last_tweet = tweets[0]
    
    while True:
        await asyncio.sleep(check_interval)
        
        tweets = await client.get_user_tweets(user_id, 'Tweets')
        current_tweet = tweets[0]
        
        if (
            last_tweet != current_tweet and
            last_tweet.created_at_datetime < current_tweet.created_at_datetime
        ):
            await process_new_tweet(current_tweet)
        
        last_tweet = current_tweet

asyncio.run(main())

Monitoring multiple users

Track tweets from multiple accounts:
import asyncio
from twikit import Client, Tweet

client = Client()

async def monitor_single_user(user_id: str, check_interval: int):
    """Monitor a single user for new tweets."""
    tweets = await client.get_user_tweets(user_id, 'Tweets')
    last_tweet = tweets[0]
    
    while True:
        await asyncio.sleep(check_interval)
        
        tweets = await client.get_user_tweets(user_id, 'Tweets')
        current_tweet = tweets[0]
        
        if (
            last_tweet != current_tweet and
            last_tweet.created_at_datetime < current_tweet.created_at_datetime
        ):
            user = await client.get_user_by_id(user_id)
            print(f'New tweet from @{user.screen_name}: {current_tweet.text}')
        
        last_tweet = current_tweet

async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )
    
    # List of user IDs to monitor
    user_ids = ['44196397', '1234567890', '0987654321']
    check_interval = 60 * 5
    
    # Create monitoring tasks for all users
    tasks = [
        monitor_single_user(user_id, check_interval)
        for user_id in user_ids
    ]
    
    # Run all monitoring tasks concurrently
    await asyncio.gather(*tasks)

asyncio.run(main())

Error handling

Add robust error handling for long-running monitors:
import asyncio
from twikit import Client, Tweet

client = Client()

async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )
    
    user_id = '44196397'
    check_interval = 60 * 5
    
    last_tweet = None
    
    while True:
        try:
            # Get latest tweet
            tweets = await client.get_user_tweets(user_id, 'Tweets')
            current_tweet = tweets[0]
            
            # Check if this is first run
            if last_tweet is None:
                last_tweet = current_tweet
                print('Monitoring started')
                continue
            
            # Compare tweets
            if (
                last_tweet != current_tweet and
                last_tweet.created_at_datetime < current_tweet.created_at_datetime
            ):
                print(f'New tweet: {current_tweet.text}')
            
            last_tweet = current_tweet
            
        except Exception as e:
            print(f'Error occurred: {e}')
            print('Retrying in 30 seconds...')
            await asyncio.sleep(30)
            continue
        
        await asyncio.sleep(check_interval)

asyncio.run(main())

Streaming session auto-reconnect

For streaming API, enable automatic reconnection:
import asyncio
from twikit import Client
from twikit.streaming import Topic

client = Client()

async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )
    
    tweet_id = '1234567890'
    topics = {Topic.tweet_engagement(tweet_id)}
    
    # Enable auto_reconnect
    streaming_session = await client.get_streaming_session(
        topics,
        auto_reconnect=True
    )
    
    async for topic, payload in streaming_session:
        if payload.config:
            print('Streaming session started')
        
        if payload.tweet_engagement:
            print(f'Engagement update:')
            print(f'  Likes: {payload.tweet_engagement.like_count}')
            print(f'  Retweets: {payload.tweet_engagement.retweet_count}')

asyncio.run(main())
The streaming session will automatically reconnect if the connection is lost when auto_reconnect=True is set.

Key points

1

Choose monitoring method

Use polling for new tweets or streaming API for real-time engagement metrics
2

Set appropriate intervals

Balance between responsiveness and rate limits (5 minutes is a good default)
3

Compare timestamps

Check both tweet equality and timestamps to avoid false positives
4

Handle errors gracefully

Wrap monitoring loops in try-catch blocks for long-running stability
5

Use concurrent monitoring

Monitor multiple users with asyncio.gather() for efficient parallel execution

Build docs developers (and LLMs) love