
Overview

Twikit’s streaming API lets you monitor direct messages in real time and respond to them automatically. This is useful for building chatbots, automated customer support, or auto-reply systems.
Auto-reply systems should be used responsibly. Make sure to comply with Twitter’s automation rules and avoid spamming users.

How it works

The DM auto-reply system uses Twikit’s streaming functionality to:
  1. Establish a streaming session for DM updates
  2. Monitor incoming messages from a specific user or conversation
  3. Automatically send a reply when new messages are detected
  4. Filter out messages sent by your own account to avoid loops
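
The four steps above can be sketched without a network connection. The snippet below is a minimal simulation, not Twikit code: `FakeDMUpdate`, `FakePayload`, `fake_stream`, and `run_auto_reply` are hypothetical stand-ins that only mirror the `(topic, payload)` shape and the `dm_update.user_id` field used in the examples on this page. Instead of sending anything, it records the replies the loop would send.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List, Optional, Tuple

MY_USER_ID = '111'  # hypothetical ID of the bot's own account


@dataclass
class FakeDMUpdate:
    # Mirrors the two fields the examples read from payload.dm_update
    conversation_id: str
    user_id: str


@dataclass
class FakePayload:
    dm_update: Optional[FakeDMUpdate] = None


async def fake_stream() -> AsyncIterator[Tuple[str, FakePayload]]:
    """Stand-in for a streaming session: yields (topic, payload) pairs."""
    topic = 'example-topic'  # placeholder label, not a real topic string
    yield topic, FakePayload()  # event that carries no DM update
    yield topic, FakePayload(FakeDMUpdate('111-222', '222'))  # partner's message
    yield topic, FakePayload(FakeDMUpdate('111-222', MY_USER_ID))  # our own message


async def run_auto_reply(stream, my_user_id: str, reply_text: str) -> List[Tuple[str, str]]:
    """Return the (recipient_id, text) pairs the loop would send."""
    sent = []
    async for _topic, payload in stream:
        if not payload.dm_update:
            continue  # not a DM update
        if payload.dm_update.user_id == my_user_id:
            continue  # skip our own messages to avoid a reply loop
        sent.append((payload.dm_update.user_id, reply_text))
    return sent


sent_replies = asyncio.run(
    run_auto_reply(fake_stream(), MY_USER_ID, 'Thank you for your message!')
)
print(sent_replies)
```

Only the partner's message produces a reply; the empty event and the bot's own message are skipped.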

Basic auto-reply setup

Here’s a simple example that responds to all DMs from a specific user:
import asyncio
from twikit import Client
from twikit.streaming import Topic

client = Client()

async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )
    
    # User ID to monitor
    partner_user_id = '1234567890'
    your_user_id = await client.user_id()
    
    # Create conversation ID
    conversation_id = f'{your_user_id}-{partner_user_id}'
    
    # Set up streaming topic
    topics = {Topic.dm_update(conversation_id)}
    streaming_session = await client.get_streaming_session(topics)
    
    # Listen for messages
    async for topic, payload in streaming_session:
        if payload.dm_update:
            # Ignore your own messages
            if payload.dm_update.user_id == your_user_id:
                continue
            
            # Send auto-reply
            await client.send_dm(
                payload.dm_update.user_id,
                'Thank you for your message!'
            )

asyncio.run(main())

Complete working example

This is the actual code from Twikit’s examples for DM auto-reply:
import asyncio
import os

from twikit import Client
from twikit.streaming import Topic

AUTH_INFO_1 = ''
AUTH_INFO_2 = ''
PASSWORD = ''

client = Client()


async def main():
    # Login with cookies if available
    if os.path.exists('cookies.json'):
        client.load_cookies('cookies.json')
    else:
        await client.login(
            auth_info_1=AUTH_INFO_1,
            auth_info_2=AUTH_INFO_2,
            password=PASSWORD
        )
        client.save_cookies('cookies.json')

    # User ID of the DM partner to stream
    user_id = '1752362966203469824'
    reply_message = 'Hello'

    # Set up streaming topics
    topics = {
        Topic.dm_update(f'{await client.user_id()}-{user_id}')
    }
    streaming_session = await client.get_streaming_session(topics)

    # Listen for DM updates
    async for topic, payload in streaming_session:
        if payload.dm_update:
            # Ignore messages sent by yourself
            if await client.user_id() == payload.dm_update.user_id:
                continue
            # Send auto-reply
            await client.send_dm(payload.dm_update.user_id, reply_message)

asyncio.run(main())

Understanding the streaming loop

Let’s break down how the streaming session works:

Setting up topics

# Get your user ID
your_user_id = await client.user_id()

# Partner user ID (who you're chatting with)
partner_user_id = '1234567890'

# Create conversation ID
conversation_id = f'{your_user_id}-{partner_user_id}'

# Subscribe to DM updates
topics = {Topic.dm_update(conversation_id)}
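
If the conversation ID is built in several places, a small helper keeps the format consistent. This is a sketch: `make_conversation_id` is a hypothetical name, and it simply follows the `your_id-partner_id` order used in the examples on this page, with a basic check that both IDs are numeric strings.

```python
def make_conversation_id(your_user_id: str, partner_user_id: str) -> str:
    """Join two user IDs in the 'your_id-partner_id' form used above."""
    for value in (your_user_id, partner_user_id):
        # User IDs in the examples are strings of ASCII digits
        if not (value.isascii() and value.isdigit()):
            raise ValueError(f'expected a numeric user ID, got {value!r}')
    return f'{your_user_id}-{partner_user_id}'


print(make_conversation_id('42', '1234567890'))  # prints 42-1234567890
```

The result can be passed to `Topic.dm_update()` exactly as in the snippet above.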

Creating streaming session

# Get streaming session
streaming_session = await client.get_streaming_session(topics)

# Iterate over events
async for topic, payload in streaming_session:
    # Handle events here
    pass

Processing DM updates

async for topic, payload in streaming_session:
    if payload.dm_update:
        # Check who sent the message
        sender_id = payload.dm_update.user_id
        conversation_id = payload.dm_update.conversation_id
        
        # Ignore your own messages
        if sender_id == await client.user_id():
            continue
        
        # Send reply
        await client.send_dm(sender_id, 'Your reply here')

Always filter out your own messages by checking the user_id to prevent infinite reply loops.

Cookie-based authentication

The complete working example uses cookie-based authentication to avoid repeated logins:
import os
from twikit import Client

client = Client()

async def main():
    # Load cookies if they exist
    if os.path.exists('cookies.json'):
        client.load_cookies('cookies.json')
    else:
        # Login and save cookies
        await client.login(
            auth_info_1='USERNAME',
            auth_info_2='EMAIL',
            password='PASSWORD'
        )
        client.save_cookies('cookies.json')
Using cookies allows your bot to maintain sessions without re-authenticating every time it starts, which is more efficient and reduces the risk of triggering security checks.

Customizing auto-replies

You can create more sophisticated auto-reply logic:
import asyncio
from twikit import Client
from twikit.streaming import Topic

client = Client()

async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )
    
    partner_user_id = '1234567890'
    your_user_id = await client.user_id()
    conversation_id = f'{your_user_id}-{partner_user_id}'
    
    topics = {Topic.dm_update(conversation_id)}
    streaming_session = await client.get_streaming_session(topics)
    
    async for topic, payload in streaming_session:
        if payload.dm_update:
            if payload.dm_update.user_id == your_user_id:
                continue
            
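            # The examples read only user_id and conversation_id from the
            # payload, so fetch the DM history with the sender and take the
            # first entry (assumed here to be the most recent message)
            user = await client.get_user_by_id(payload.dm_update.user_id)
            messages = await user.get_dm_history()
            latest_message = next(iter(messages), None)
            if latest_message is None:
                continue  # nothing to reply to
            text = latest_message.text.lower()
            
            # Pick a reply based on keywords in the message
            if 'hello' in text:
                reply = 'Hi there! How can I help you?'
            elif 'help' in text:
                reply = 'I am here to assist you!'
            else:
                reply = 'Thanks for your message!'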
            
            await client.send_dm(payload.dm_update.user_id, reply)

asyncio.run(main())
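
As the number of keywords grows, the `if`/`elif` chain can be moved into a small rule table. The sketch below is one possible arrangement, not part of Twikit: `REPLY_RULES`, `DEFAULT_REPLY`, and `choose_reply` are hypothetical names, and the replies are the ones from the example above. Rules are checked in order, so the first matching keyword wins.

```python
# (keyword, reply) pairs, checked in order; the first match wins
REPLY_RULES = [
    ('hello', 'Hi there! How can I help you?'),
    ('help', 'I am here to assist you!'),
]
DEFAULT_REPLY = 'Thanks for your message!'


def choose_reply(text: str) -> str:
    """Return the reply for the first keyword found in the text."""
    lowered = text.lower()
    for keyword, reply in REPLY_RULES:
        if keyword in lowered:
            return reply
    return DEFAULT_REPLY


print(choose_reply('Hello, is anyone there?'))
print(choose_reply('I need HELP with my order'))
print(choose_reply('ok'))
```

Inside the streaming loop, the call would be `reply = choose_reply(latest_message.text)`. Keeping the logic in a plain function also makes it easy to test without a live session.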

Monitoring multiple conversations

You can monitor DMs from multiple users simultaneously:
import asyncio
from twikit import Client
from twikit.streaming import Topic

client = Client()

async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )
    
    your_user_id = await client.user_id()
    
    # Multiple users to monitor
    partner_user_ids = ['1234567890', '0987654321', '1111111111']
    
    # Create topics for all conversations
    topics = {
        Topic.dm_update(f'{your_user_id}-{user_id}')
        for user_id in partner_user_ids
    }
    
    streaming_session = await client.get_streaming_session(topics)
    
    async for topic, payload in streaming_session:
        if payload.dm_update:
            if payload.dm_update.user_id == your_user_id:
                continue
            
            await client.send_dm(
                payload.dm_update.user_id,
                'Thank you for reaching out!'
            )

asyncio.run(main())
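
When several partners are monitored, each one may need a different reply. A minimal sketch, assuming replies are keyed by the sender's user ID (the value read from `payload.dm_update.user_id`): `REPLIES_BY_PARTNER`, `conversation_ids`, and `reply_for` are hypothetical names, and the IDs and reply texts are placeholders.

```python
from typing import Iterable, Set

# Placeholder partner IDs mapped to the reply each one should receive
REPLIES_BY_PARTNER = {
    '1234567890': 'Thanks! The support team will reply shortly.',
    '0987654321': 'Got it, I will get back to you soon.',
}
DEFAULT_REPLY = 'Thank you for reaching out!'


def conversation_ids(your_user_id: str, partner_ids: Iterable[str]) -> Set[str]:
    """Build one 'your_id-partner_id' conversation ID per partner."""
    return {f'{your_user_id}-{partner_id}' for partner_id in partner_ids}


def reply_for(sender_id: str) -> str:
    """Look up the reply for a sender, falling back to the default."""
    return REPLIES_BY_PARTNER.get(sender_id, DEFAULT_REPLY)


print(sorted(conversation_ids('42', REPLIES_BY_PARTNER)))
print(reply_for('0987654321'))
print(reply_for('1111111111'))
```

Each conversation ID would then be wrapped in `Topic.dm_update()` to build the topic set, and `reply_for(payload.dm_update.user_id)` would replace the fixed reply string in the loop.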

Rate limit considerations

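When implementing auto-reply systems, be aware of rate limits. The example below caps replies with a simple counter: after 50 replies it pauses for an hour and then resets the count. The value 50 is a placeholder, not a documented limit, so choose a number that suits your use case: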
import asyncio
from twikit import Client
from twikit.streaming import Topic

client = Client()
message_count = 0
max_messages_per_hour = 50

async def main():
    global message_count
    
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )
    
    partner_user_id = '1234567890'
    your_user_id = await client.user_id()
    conversation_id = f'{your_user_id}-{partner_user_id}'
    
    topics = {Topic.dm_update(conversation_id)}
    streaming_session = await client.get_streaming_session(topics)
    
    async for topic, payload in streaming_session:
        if payload.dm_update:
            if payload.dm_update.user_id == your_user_id:
                continue
            
            # Check rate limit
            if message_count >= max_messages_per_hour:
                print('Rate limit reached, waiting...')
                await asyncio.sleep(3600)  # Wait 1 hour
                message_count = 0
            
            await client.send_dm(
                payload.dm_update.user_id,
                'Thank you for your message!'
            )
            message_count += 1

asyncio.run(main())
Sending too many automated messages can result in account restrictions. Implement proper rate limiting and avoid spamming users.

Error handling and reconnection

Add error handling to keep your bot running:
import asyncio
from twikit import Client
from twikit.streaming import Topic

client = Client()

async def main():
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )
    
    partner_user_id = '1234567890'
    your_user_id = await client.user_id()
    conversation_id = f'{your_user_id}-{partner_user_id}'
    
    while True:
        try:
            topics = {Topic.dm_update(conversation_id)}
            streaming_session = await client.get_streaming_session(topics)
            
            async for topic, payload in streaming_session:
                if payload.dm_update:
                    if payload.dm_update.user_id == your_user_id:
                        continue
                    
                    await client.send_dm(
                        payload.dm_update.user_id,
                        'Thank you for your message!'
                    )
        except Exception as e:
            print(f'Error occurred: {e}')
            print('Reconnecting in 5 seconds...')
            await asyncio.sleep(5)

asyncio.run(main())
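
The example above retries every 5 seconds indefinitely. If the failure persists, a growing delay puts less pressure on the service. The generator below is a sketch of exponential backoff with a cap; `backoff_delays` and its default values are assumptions chosen for illustration, and jitter is left out to keep the output deterministic.

```python
import itertools
from typing import Iterator


def backoff_delays(base: float = 5.0, factor: float = 2.0,
                   max_delay: float = 300.0) -> Iterator[float]:
    """Yield reconnect delays that grow by `factor` up to `max_delay`."""
    delay = base
    while True:
        yield min(delay, max_delay)
        delay = min(delay * factor, max_delay)


first_delays = list(itertools.islice(backoff_delays(), 8))
print(first_delays)  # [5.0, 10.0, 20.0, 40.0, 80.0, 160.0, 300.0, 300.0]
```

In the reconnect loop, the `except` branch would call `await asyncio.sleep(next(delays))`, and a fresh `delays = backoff_delays()` would be created after a successful connection so that the next failure starts again from the base delay.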

Key points

  1. Set up a streaming session: use Topic.dm_update() to subscribe to DM updates for specific conversations.
  2. Filter your own messages: always check the sender’s user ID to avoid replying to your own messages.
  3. Use cookie authentication: save and load cookies to maintain sessions without repeated logins.
  4. Implement rate limiting: track message counts and add delays to respect Twitter’s rate limits.
  5. Add error handling: wrap streaming loops in try/except blocks to handle disconnections gracefully.
