Twikit is built on asynchronous I/O to provide several advantages:
Better performance: Handle multiple API requests concurrently
Non-blocking operations: Your program continues running while waiting for API responses
Efficient resource usage: Use fewer system resources than synchronous alternatives
Scalability: Process hundreds of requests without spawning multiple threads
All Twikit client methods are coroutines and must be called using await. Calling a coroutine without await returns a coroutine object instead of the actual result.
import asyncioasync def fetch_multiple_users(): """Fetch multiple users concurrently.""" # These run at the same time! users = await asyncio.gather( client.get_user_by_screen_name('elonmusk'), client.get_user_by_screen_name('BillGates'), client.get_user_by_screen_name('jeffbezos') ) for user in users: print(f'{user.name}: {user.followers_count} followers') return users# Much faster than fetching one at a time!users = await fetch_multiple_users()
asyncio.gather() is perfect when you need to fetch multiple independent resources. It can be 5-10x faster than sequential requests.
# get_user_tweets_by_id returns an AsyncGeneratorasync for tweet in client.get_user_tweets_by_id('44196397', 'Tweets'): print(tweet.text) # Process each tweet as it's fetched if tweet.favorite_count > 1000: print(f'Popular tweet: {tweet.favorite_count} likes')
# Collect all tweets into a listtweets = []async for tweet in client.get_user_tweets_by_id('44196397', 'Tweets'): tweets.append(tweet) # Limit to 100 tweets if len(tweets) >= 100: breakprint(f'Collected {len(tweets)} tweets')
async def get_limited_tweets(user_id, limit=50): """Fetch a limited number of tweets using AsyncGenerator.""" tweets = [] async for tweet in client.get_user_tweets_by_id(user_id, 'Tweets'): tweets.append(tweet) if len(tweets) >= limit: break return tweetstweets = await get_limited_tweets('44196397', limit=100)
async def process_tweets_in_batches(user_id, batch_size=20):
    """Stream a user's tweets and hand them off in fixed-size batches.

    Keeps at most `batch_size` tweets in memory at once.
    """
    pending = []
    async for tweet in client.get_user_tweets_by_id(user_id, 'Tweets'):
        pending.append(tweet)
        if len(pending) >= batch_size:
            await process_batch(pending)
            pending = []
    # Flush whatever is left over after the stream ends.
    if pending:
        await process_batch(pending)


async def process_batch(tweets):
    """Handle one batch of tweets (here: just print their text)."""
    for tweet in tweets:
        print(tweet.text)
Many methods return Result objects that support pagination:
# Initial requesttweets = await client.search_tweet('python', 'Latest', count=20)# Iterate through first batchfor tweet in tweets: print(tweet.text)# Get next pagemore_tweets = await tweets.next()for tweet in more_tweets: print(tweet.text)# Get previous pageprevious_tweets = await tweets.previous()for tweet in previous_tweets: print(tweet.text)
import time


# Wrong: time.sleep blocks the whole event loop.
async def blocking_function():
    user = await client.get_user_by_screen_name('username')
    time.sleep(10)  # This blocks everything!
    return user


# Correct: asyncio.sleep yields control back to the loop.
async def non_blocking_function():
    user = await client.get_user_by_screen_name('username')
    await asyncio.sleep(10)  # This allows other tasks to run
    return user
import asynciofrom twikit.errors import TooManyRequestsasync def rate_limited_gather(*coros, max_concurrent=5): """Run coroutines with concurrency limit.""" semaphore = asyncio.Semaphore(max_concurrent) async def limited_coro(coro): async with semaphore: return await coro return await asyncio.gather(*[limited_coro(c) for c in coros])# Fetch 50 users with max 5 concurrent requestsuser_ids = [f'{i}' for i in range(50)]coros = [client.get_user_by_id(uid) for uid in user_ids]users = await rate_limited_gather(*coros, max_concurrent=5)
async def monitor_user_tweets(user_id, interval=60):
    """Poll a user's tweets forever, printing ones not seen before.

    Sleeps `interval` seconds between polls; errors are logged and the
    loop keeps running so the monitor survives transient failures.
    """
    seen_tweets = set()
    while True:
        try:
            # get_user_tweets_by_id returns an AsyncGenerator (see the
            # streaming examples above), so it must be iterated with
            # `async for` — awaiting it directly would fail.
            async for tweet in client.get_user_tweets_by_id(user_id, 'Tweets'):
                if tweet.id not in seen_tweets:
                    print(f'New tweet: {tweet.text}')
                    seen_tweets.add(tweet.id)
            await asyncio.sleep(interval)
        except Exception as e:
            print(f'Error: {e}')
            await asyncio.sleep(interval)


async def main():
    # Start the monitor as a background task.
    monitor_task = asyncio.create_task(monitor_user_tweets('44196397'))
    # Do other work while it runs.
    await other_operations()
    # Cancel the monitor when done.
    monitor_task.cancel()
Jupyter notebooks require special handling for async code:
# In Jupyter, you can use await directly at the top levelfrom twikit import Clientclient = Client('en-US')await client.login( auth_info_1='username', password='password')tweets = await client.search_tweet('python', 'Latest')for tweet in tweets: print(tweet.text)