Skip to main content
Twikit is an asynchronous library that uses Python’s async/await syntax. Understanding async programming is essential for using Twikit effectively.

Why Twikit is async-only

Twikit is built on asynchronous I/O to provide several advantages:
  • Better performance: Handle multiple API requests concurrently
  • Non-blocking operations: Your program continues running while waiting for API responses
  • Efficient resource usage: Use fewer system resources than synchronous alternatives
  • Scalability: Process hundreds of requests without spawning multiple threads
All Twikit client methods are coroutines and must be called using await. Forgetting to use await will return a coroutine object instead of the actual result.

Basic async/await syntax

Running async code

Use asyncio.run() to execute async functions:
import asyncio
from twikit import Client

# One shared client instance is used by every coroutine below.
client = Client('en-US')

async def main():
    """Log in, search for recent tweets, and print each one's text."""
    await client.login(
        auth_info_1='username',
        password='password'
    )

    results = await client.search_tweet('python', 'Latest')
    for result in results:
        print(result.text)

# Entry point: start the event loop exactly once.
asyncio.run(main())
asyncio.run() should only be called once in your program, typically at the entry point. Don’t call it inside other async functions.

The await keyword

Use await to call async functions:
# Correct: Using await
user = await client.get_user_by_screen_name('elonmusk')
print(user.name)  # "Elon Musk"

# Wrong: Forgetting await
# Calling a coroutine function only creates a coroutine object; no request is
# sent, and Python emits a "was never awaited" RuntimeWarning when it is
# garbage-collected.
user = client.get_user_by_screen_name('elonmusk')  # Returns a coroutine object!
print(user)  # <coroutine object get_user_by_screen_name at 0x...>

Async function definition

Define your own async functions with async def:
async def get_user_info(screen_name):
    """Look up a user by screen name, print a profile summary, return the user."""
    user = await client.get_user_by_screen_name(screen_name)
    for line in (
        f'Name: {user.name}',
        f'Followers: {user.followers_count}',
        f'Bio: {user.description}',
    ):
        print(line)
    return user

# Call it with await
user = await get_user_info('elonmusk')

Running multiple requests concurrently

One of the biggest advantages of async is running multiple operations at the same time.

Using asyncio.gather()

Run multiple coroutines concurrently:
import asyncio

async def fetch_multiple_users():
    """Fetch three user profiles concurrently and print a summary of each."""
    handles = ('elonmusk', 'BillGates', 'jeffbezos')
    # gather() runs all three lookups on the event loop at the same time.
    users = await asyncio.gather(
        *(client.get_user_by_screen_name(h) for h in handles)
    )

    for user in users:
        print(f'{user.name}: {user.followers_count} followers')

    return users

# Much faster than fetching one at a time!
users = await fetch_multiple_users()
asyncio.gather() is perfect when you need to fetch multiple independent resources. It can be 5-10x faster than sequential requests.

Sequential vs concurrent comparison

import time

# Sequential: Slow (3-5 seconds)
# Sequential: Slow (3-5 seconds)
async def fetch_sequential():
    """Fetch three users one after another, timing the whole run."""
    start = time.time()

    # Each await completes fully before the next request is even started.
    results = [
        await client.get_user_by_screen_name('user1'),
        await client.get_user_by_screen_name('user2'),
        await client.get_user_by_screen_name('user3'),
    ]

    print(f'Sequential: {time.time() - start:.2f}s')
    return results

# Concurrent: Fast (1-2 seconds)
# Concurrent: Fast (1-2 seconds)
async def fetch_concurrent():
    """Fetch three users at the same time, timing the whole run."""
    start = time.time()

    # All three requests are in flight simultaneously.
    lookups = [
        client.get_user_by_screen_name(name)
        for name in ('user1', 'user2', 'user3')
    ]
    users = await asyncio.gather(*lookups)

    print(f'Concurrent: {time.time() - start:.2f}s')
    return users

Using asyncio.create_task()

Create tasks for more control over concurrent execution:
async def process_multiple_queries():
    """Start three search tasks, do other work, then await their results."""
    # create_task() schedules each search immediately on the running loop.
    tasks = [
        asyncio.create_task(client.search_tweet(q, 'Latest'))
        for q in ('python', 'javascript', 'rust')
    ]

    # The searches are already running in the background at this point.
    print('Fetching tweets...')

    # Await each task in order; all three may already be finished.
    results = [await t for t in tasks]
    return tuple(results)

AsyncGenerator patterns for pagination

Some Twikit methods return AsyncGenerator for streaming large result sets.

Basic AsyncGenerator usage

# get_user_tweets_by_id returns an AsyncGenerator
# An AsyncGenerator must be consumed with `async for` — awaiting it directly
# does not work; items are produced lazily as they arrive.
async for tweet in client.get_user_tweets_by_id('44196397', 'Tweets'):
    print(tweet.text)
    
    # Process each tweet as it's fetched
    if tweet.favorite_count > 1000:
        print(f'Popular tweet: {tweet.favorite_count} likes')

Converting AsyncGenerator to list

# Collect all tweets into a list
tweets = []
async for tweet in client.get_user_tweets_by_id('44196397', 'Tweets'):
    tweets.append(tweet)
    
    # Limit to 100 tweets
    # `break` abandons the generator early, so no further items are pulled
    # once the limit is reached.
    if len(tweets) >= 100:
        break

print(f'Collected {len(tweets)} tweets')

AsyncGenerator with limit

async def get_limited_tweets(user_id, limit=50):
    """Stream a user's timeline, stopping once *limit* tweets are collected."""
    collected = []
    async for tweet in client.get_user_tweets_by_id(user_id, 'Tweets'):
        collected.append(tweet)
        # Abandon the generator as soon as we have enough.
        if len(collected) >= limit:
            break
    return collected

tweets = await get_limited_tweets('44196397', limit=100)

Processing in batches

async def process_tweets_in_batches(user_id, batch_size=20):
    """Stream a user's tweets and hand them to process_batch in groups."""
    pending = []

    async for tweet in client.get_user_tweets_by_id(user_id, 'Tweets'):
        pending.append(tweet)

        # Flush as soon as a full batch has accumulated.
        if len(pending) >= batch_size:
            await process_batch(pending)
            pending = []

    # Flush the final, possibly short, batch.
    if pending:
        await process_batch(pending)

async def process_batch(tweets):
    """Print the text of every tweet in *tweets*."""
    for item in tweets:
        print(item.text)

Result objects and pagination

Many methods return Result objects that support pagination:
# Initial request
tweets = await client.search_tweet('python', 'Latest', count=20)

# Iterate through first batch
for tweet in tweets:
    print(tweet.text)

# Get next page
# .next() issues a fresh request using the cursor carried by the Result.
more_tweets = await tweets.next()
for tweet in more_tweets:
    print(tweet.text)

# Get previous page
previous_tweets = await tweets.previous()
for tweet in previous_tweets:
    print(tweet.text)

Paginate through all results

async def get_all_search_results(query, max_pages=5):
    """Collect up to *max_pages* pages of search results for *query*."""
    page = await client.search_tweet(query, 'Latest')
    collected = list(page)

    pages_fetched = 1
    # Keep paging while a cursor remains and the page budget is not spent.
    while pages_fetched < max_pages and page.next_cursor:
        page = await page.next()
        collected.extend(page)
        pages_fetched += 1

    return collected

tweets = await get_all_search_results('python', max_pages=10)
print(f'Found {len(tweets)} tweets')

Common async pitfalls and solutions

Pitfall 1: Forgetting await

# Wrong: Forgetting await
# The call returns immediately with an unscheduled coroutine object;
# no network request is made.
user = client.get_user_by_screen_name('username')  # Returns coroutine!
print(user.name)  # AttributeError: 'coroutine' object has no attribute 'name'

# Correct: Using await
user = await client.get_user_by_screen_name('username')
print(user.name)  # Works!

Pitfall 2: Using asyncio.run() inside async function

# Wrong: asyncio.run() inside async function
async def bad_function():
    # asyncio.run() tries to start a new event loop, but one is already
    # running here — hence the RuntimeError.
    asyncio.run(client.search_tweet('query', 'Latest'))  # RuntimeError!

# Correct: Use await directly
async def good_function():
    tweets = await client.search_tweet('query', 'Latest')
    return tweets

Pitfall 3: Not handling exceptions in concurrent tasks

# Problem: One failed task crashes everything
async def risky_concurrent():
    # By default gather() propagates the first exception raised by any
    # awaitable, and the results of the others are discarded.
    await asyncio.gather(
        client.get_user_by_screen_name('user1'),
        client.get_user_by_screen_name('nonexistent'),  # This fails!
        client.get_user_by_screen_name('user2')
    )

# Solution: Use return_exceptions=True
async def safe_concurrent():
    results = await asyncio.gather(
        client.get_user_by_screen_name('user1'),
        client.get_user_by_screen_name('nonexistent'),
        client.get_user_by_screen_name('user2'),
        return_exceptions=True  # Exceptions returned as values
    )
    
    # gather() preserves input order, so index i identifies which request
    # produced each outcome.
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f'Task {i} failed: {result}')
        else:
            print(f'Task {i} succeeded: {result.name}')

Pitfall 4: Blocking the event loop

import time

# Wrong: Blocking the event loop
async def blocking_function():
    user = await client.get_user_by_screen_name('username')
    # time.sleep() suspends the whole thread, so every other coroutine on
    # this event loop is frozen for the full 10 seconds.
    time.sleep(10)  # This blocks everything!
    return user

# Correct: Use async sleep
async def non_blocking_function():
    user = await client.get_user_by_screen_name('username')
    # asyncio.sleep() yields control back to the loop while waiting.
    await asyncio.sleep(10)  # This allows other tasks to run
    return user

Pitfall 5: Creating client in wrong scope

# Wrong: Creating client inside loop
async def inefficient():
    for i in range(10):
        client = Client('en-US')  # Creates new client each time!
        # Logging in on every iteration repeats the most expensive step and
        # presumably risks tripping rate limits — reuse one session instead.
        await client.login(...)
        tweets = await client.search_tweet('query', 'Latest')

# Correct: Reuse client
client = Client('en-US')

async def efficient():
    # Authenticate once; the session is reused for every search below.
    await client.login(...)
    
    for i in range(10):
        tweets = await client.search_tweet(f'query_{i}', 'Latest')

Advanced patterns

Rate-limited concurrent requests

import asyncio
from twikit.errors import TooManyRequests

async def rate_limited_gather(*coros, max_concurrent=5):
    """Await *coros* concurrently, but never more than *max_concurrent* at once."""
    gate = asyncio.Semaphore(max_concurrent)

    async def guarded(coro):
        # Each coroutine waits for a free semaphore slot before starting.
        async with gate:
            return await coro

    # Results come back in the same order the coroutines were passed in.
    return await asyncio.gather(*map(guarded, coros))

# Fetch 50 users with max 5 concurrent requests
user_ids = [f'{i}' for i in range(50)]
coros = [client.get_user_by_id(uid) for uid in user_ids]
users = await rate_limited_gather(*coros, max_concurrent=5)

Async context manager

class TwitterSession:
    """Async context manager that logs a client in on entry and out on exit."""

    def __init__(self, username, password):
        # Credentials are stored until login happens in __aenter__.
        self.username = username
        self.password = password
        self.client = Client('en-US')

    async def __aenter__(self):
        """Authenticate and hand the ready client to the ``with`` body."""
        await self.client.login(
            auth_info_1=self.username,
            password=self.password
        )
        return self.client

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Always log out, even if the body raised an exception."""
        await self.client.logout()

# Usage
async with TwitterSession('username', 'password') as client:
    tweets = await client.search_tweet('python', 'Latest')
    for tweet in tweets:
        print(tweet.text)

Background tasks

async def monitor_user_tweets(user_id, interval=60):
    """Poll a user's timeline forever, printing each tweet the first time it is seen.

    Args:
        user_id: ID of the account to watch.
        interval: Seconds to wait between polls; also used as the back-off
            delay after an error.
    """
    seen_tweets = set()

    while True:
        try:
            # get_user_tweets_by_id returns an AsyncGenerator (see the
            # pagination section above), so it must be consumed with
            # `async for` — awaiting it and looping with a plain `for`
            # would fail.
            async for tweet in client.get_user_tweets_by_id(user_id, 'Tweets'):
                if tweet.id not in seen_tweets:
                    print(f'New tweet: {tweet.text}')
                    seen_tweets.add(tweet.id)

            await asyncio.sleep(interval)
        except Exception as e:
            # Best-effort monitor: log the failure and retry after a delay.
            print(f'Error: {e}')
            await asyncio.sleep(interval)

async def main():
    """Run a background tweet monitor alongside other foreground work."""
    # The monitor starts running as soon as the task is created.
    watcher = asyncio.create_task(monitor_user_tweets('44196397'))

    # Foreground work proceeds while the monitor polls in the background.
    await other_operations()

    # Stop the monitor once the foreground work is finished.
    watcher.cancel()

Queue-based processing

async def producer(queue, user_ids):
    """Push each user ID onto *queue*, then a None sentinel to signal completion."""
    for uid in user_ids:
        await queue.put(uid)
    # Consumers interpret None as "no more work".
    await queue.put(None)  # Sentinel value

async def consumer(queue):
    """Pull user IDs off *queue* and process each one until the None sentinel."""
    # The walrus keeps reading until the producer's sentinel arrives.
    while (user_id := await queue.get()) is not None:
        try:
            user = await client.get_user_by_id(user_id)
            print(f'Processed: {user.name}')
        except Exception as e:
            # Report the failure but keep consuming the rest of the queue.
            print(f'Error processing {user_id}: {e}')

async def main():
    """Feed user IDs from a producer to a consumer through a shared queue."""
    work_queue = asyncio.Queue()
    ids = ['123', '456', '789']

    # Producer and consumer run concurrently, connected only by the queue.
    await asyncio.gather(
        producer(work_queue, ids),
        consumer(work_queue)
    )

Using Twikit in Jupyter notebooks

Jupyter notebooks require special handling for async code:
# In Jupyter, you can use await directly at the top level
from twikit import Client

client = Client('en-US')
await client.login(
    auth_info_1='username',
    password='password'
)

tweets = await client.search_tweet('python', 'Latest')
for tweet in tweets:
    print(tweet.text)

Rate limits

Understand rate limits for concurrent requests

Error handling

Handle errors in async code

Build docs developers (and LLMs) love