Overview
Twikit provides two approaches for monitoring tweets in real-time:
- Polling method: Periodically check for new tweets from a user
- Streaming API: Subscribe to real-time events like tweet engagement updates
Both methods are useful for different scenarios, from monitoring specific accounts to tracking tweet metrics in real-time.
The polling method checks for new tweets at regular intervals. This is ideal for monitoring when a specific user posts:
import asyncio
from typing import NoReturn

from twikit import Client, Tweet

client = Client()

USER_ID = '44196397'
CHECK_INTERVAL = 60 * 5  # Check every 5 minutes


def callback(tweet: Tweet) -> None:
    """Handle a newly detected tweet."""
    print(f'New tweet posted: {tweet.text}')


async def get_latest_tweet() -> Tweet:
    """Return the monitored user's most recent tweet (Replies timeline)."""
    tweets = await client.get_user_tweets(USER_ID, 'Replies')
    return tweets[0]


async def main() -> NoReturn:
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )

    # Remember the newest tweet so the loop below can detect changes.
    before_tweet = await get_latest_tweet()
    while True:
        await asyncio.sleep(CHECK_INTERVAL)
        latest_tweet = await get_latest_tweet()
        # A genuinely new tweet is a different object with a newer timestamp.
        is_new = (
            latest_tweet != before_tweet
            and before_tweet.created_at_datetime < latest_tweet.created_at_datetime
        )
        if is_new:
            callback(latest_tweet)
        before_tweet = latest_tweet


asyncio.run(main())
This polling approach is from Twikit’s official examples and provides a reliable way to detect new tweets without using the streaming API.
Complete polling example
Here’s the actual code from Twikit’s listen_for_new_tweets.py example:
import asyncio
from typing import NoReturn

from twikit import Client, Tweet

# Login credentials — replace with real values.
AUTH_INFO_1 = '...'
AUTH_INFO_2 = '...'
PASSWORD = '...'

client = Client()

USER_ID = '44196397'     # Account to monitor
CHECK_INTERVAL = 60 * 5  # Poll every 5 minutes


def callback(tweet: Tweet) -> None:
    """Called whenever a new tweet is detected."""
    print(f'New tweet posted : {tweet.text}')


async def get_latest_tweet() -> Tweet:
    """Return the monitored user's most recent tweet (Replies timeline)."""
    return (await client.get_user_tweets(USER_ID, 'Replies'))[0]


async def main() -> NoReturn:
    await client.login(
        auth_info_1=AUTH_INFO_1,
        auth_info_2=AUTH_INFO_2,
        password=PASSWORD
    )

    before_tweet = await get_latest_tweet()
    while True:
        await asyncio.sleep(CHECK_INTERVAL)
        latest_tweet = await get_latest_tweet()
        # Require both a different tweet object and a strictly newer
        # timestamp to avoid false positives (e.g. a deleted tweet).
        if (
            before_tweet != latest_tweet and
            before_tweet.created_at_datetime < latest_tweet.created_at_datetime
        ):
            # BUG FIX: the original called the builtin `callable(latest_tweet)`,
            # which merely returns a bool and silently discards it — the
            # handler `callback(...)` was intended and is what runs here.
            callback(latest_tweet)
        before_tweet = latest_tweet


asyncio.run(main())
How polling works
Let’s break down the polling mechanism:
Setting up the monitor
# User ID to monitor
USER_ID = '44196397'

# Check interval (in seconds)
CHECK_INTERVAL = 60 * 5  # 5 minutes


# Callback function when new tweet is found
def callback(tweet: Tweet) -> None:
    print(f'New tweet: {tweet.text}')
    # Add your custom logic here


async def get_latest_tweet() -> Tweet:
    # Get user's tweets (can use 'Tweets', 'Replies', etc.)
    timeline = await client.get_user_tweets(USER_ID, 'Replies')
    # First entry is the most recent
    return timeline[0]
Comparison loop
# Store the initial latest tweet
before_tweet = await get_latest_tweet()
while True:
# Wait for the check interval
await asyncio.sleep(CHECK_INTERVAL)
# Get current latest tweet
latest_tweet = await get_latest_tweet()
# Compare tweets
if (
before_tweet != latest_tweet and
before_tweet.created_at_datetime < latest_tweet.created_at_datetime
):
callback(latest_tweet)
# Update reference tweet
before_tweet = latest_tweet
The comparison checks both that tweets are different objects and that the new tweet is actually newer by timestamp to avoid false positives.
Streaming API for engagement
For real-time tweet engagement metrics (likes, retweets, views), use the streaming API:
import asyncio

from twikit import Client
from twikit.streaming import Topic

client = Client()


async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )

    # Monitor engagement for specific tweet
    tweet_id = '1234567890'
    topics = {Topic.tweet_engagement(tweet_id)}
    session = await client.get_streaming_session(topics)

    # Each payload may carry an engagement update for the subscribed tweet.
    async for topic, payload in session:
        engagement = payload.tweet_engagement
        if engagement:
            print(f'Likes: {engagement.like_count}')
            print(f'Retweets: {engagement.retweet_count}')
            print(f'Replies: {engagement.reply_count}')
            print(f'Views: {engagement.view_count}')


asyncio.run(main())
You can monitor engagement for multiple tweets simultaneously:
import asyncio

from twikit import Client
from twikit.streaming import Topic

client = Client()


async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )

    # Monitor multiple tweets
    tweet_ids = ['1234567890', '0987654321', '1111111111']
    topics = {Topic.tweet_engagement(tid) for tid in tweet_ids}

    session = await client.get_streaming_session(topics)

    async for topic, payload in session:
        engagement = payload.tweet_engagement
        if engagement:
            # Extract tweet ID from topic
            tweet_id = topic.split('/')[-1]
            print(f'Tweet {tweet_id} engagement update:')
            print(f' Likes: {engagement.like_count}')
            print(f' Retweets: {engagement.retweet_count}')


asyncio.run(main())
Custom polling intervals
Adjust the check interval based on your needs:
import asyncio

from twikit import Client, Tweet

client = Client()

# Different interval options
CHECK_EVERY_MINUTE = 60
CHECK_EVERY_5_MINUTES = 60 * 5
CHECK_EVERY_HOUR = 60 * 60


async def monitor_user(user_id: str, interval: int):
    """Monitor a user for new tweets."""
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )

    # Seed the comparison with the current newest tweet.
    last_tweet = (await client.get_user_tweets(user_id, 'Tweets'))[0]

    while True:
        await asyncio.sleep(interval)
        current_tweet = (await client.get_user_tweets(user_id, 'Tweets'))[0]
        # New tweet = different object AND strictly newer timestamp.
        newer = (
            current_tweet != last_tweet
            and last_tweet.created_at_datetime < current_tweet.created_at_datetime
        )
        if newer:
            print(f'New tweet from {user_id}: {current_tweet.text}')
        last_tweet = current_tweet


async def main():
    # Monitor with 5-minute intervals
    await monitor_user('44196397', CHECK_EVERY_5_MINUTES)


asyncio.run(main())
Very short polling intervals may hit rate limits. For frequent checks, consider using intervals of 2-5 minutes or longer.
Perform actions when new tweets are detected:
import asyncio

from twikit import Client, Tweet

client = Client()


async def process_new_tweet(tweet: Tweet):
    """Process newly detected tweets."""
    print(f'New tweet from @{tweet.user.screen_name}')
    print(f'Text: {tweet.text}')
    print(f'Posted at: {tweet.created_at}')

    # Like the tweet
    await tweet.favorite()
    # Retweet it
    await tweet.retweet()
    # Reply to it
    await tweet.reply('Great tweet!')


async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )

    user_id = '44196397'
    check_interval = 60 * 5

    # Remember the newest tweet so the loop can detect a change.
    last_tweet = (await client.get_user_tweets(user_id, 'Tweets'))[0]

    while True:
        await asyncio.sleep(check_interval)
        current_tweet = (await client.get_user_tweets(user_id, 'Tweets'))[0]
        if (
            current_tweet != last_tweet
            and last_tweet.created_at_datetime < current_tweet.created_at_datetime
        ):
            await process_new_tweet(current_tweet)
        last_tweet = current_tweet


asyncio.run(main())
Monitoring multiple users
Track tweets from multiple accounts:
import asyncio

from twikit import Client, Tweet

client = Client()


async def monitor_single_user(user_id: str, check_interval: int):
    """Monitor a single user for new tweets."""
    last_tweet = (await client.get_user_tweets(user_id, 'Tweets'))[0]

    while True:
        await asyncio.sleep(check_interval)
        current_tweet = (await client.get_user_tweets(user_id, 'Tweets'))[0]
        has_new_tweet = (
            current_tweet != last_tweet
            and last_tweet.created_at_datetime < current_tweet.created_at_datetime
        )
        if has_new_tweet:
            # Resolve the screen name only when there is something to report.
            user = await client.get_user_by_id(user_id)
            print(f'New tweet from @{user.screen_name}: {current_tweet.text}')
        last_tweet = current_tweet


async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )

    # List of user IDs to monitor
    user_ids = ['44196397', '1234567890', '0987654321']
    check_interval = 60 * 5

    # Run one monitoring coroutine per user, all concurrently.
    await asyncio.gather(*(
        monitor_single_user(user_id, check_interval)
        for user_id in user_ids
    ))


asyncio.run(main())
Error handling
Add robust error handling for long-running monitors:
import asyncio

from twikit import Client, Tweet

client = Client()


async def main():
    """Long-running monitor with error handling and retry back-off."""
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )

    user_id = '44196397'
    check_interval = 60 * 5
    last_tweet = None

    while True:
        try:
            # Get latest tweet
            tweets = await client.get_user_tweets(user_id, 'Tweets')
            current_tweet = tweets[0]

            # Check if this is first run
            if last_tweet is None:
                last_tweet = current_tweet
                print('Monitoring started')
                # BUG FIX: the original `continue`d here, which skipped the
                # sleep at the bottom of the loop and immediately re-fetched
                # the timeline (rate-limit risk). Falling through is safe:
                # the comparison below is False because last_tweet and
                # current_tweet are the same object.

            # Compare tweets
            if (
                last_tweet != current_tweet and
                last_tweet.created_at_datetime < current_tweet.created_at_datetime
            ):
                print(f'New tweet: {current_tweet.text}')
            last_tweet = current_tweet
        except Exception as e:
            # Network hiccups / rate limits: log, back off briefly, retry.
            print(f'Error occurred: {e}')
            print('Retrying in 30 seconds...')
            await asyncio.sleep(30)
            continue

        await asyncio.sleep(check_interval)


asyncio.run(main())
Streaming session auto-reconnect
For streaming API, enable automatic reconnection:
import asyncio

from twikit import Client
from twikit.streaming import Topic

client = Client()


async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )

    tweet_id = '1234567890'
    topics = {Topic.tweet_engagement(tweet_id)}

    # Enable auto_reconnect so the session survives dropped connections.
    session = await client.get_streaming_session(
        topics,
        auto_reconnect=True
    )

    async for topic, payload in session:
        if payload.config:
            print('Streaming session started')
        engagement = payload.tweet_engagement
        if engagement:
            print('Engagement update:')
            print(f' Likes: {engagement.like_count}')
            print(f' Retweets: {engagement.retweet_count}')


asyncio.run(main())
The streaming session will automatically reconnect if the connection is lost when auto_reconnect=True is set.
Key points
Choose monitoring method
Use polling for new tweets or streaming API for real-time engagement metrics
Set appropriate intervals
Balance between responsiveness and rate limits (5 minutes is a good default)
Compare timestamps
Check both tweet equality and timestamps to avoid false positives
Handle errors gracefully
Wrap monitoring loops in try/except blocks for long-running stability
Use concurrent monitoring
Monitor multiple users with asyncio.gather() for efficient parallel execution