Overview
Twikit’s streaming API allows you to monitor direct messages in real-time and automatically respond to incoming messages. This is useful for creating chatbots, automated customer support, or auto-reply systems.
Auto-reply systems should be used responsibly. Make sure to comply with Twitter’s automation rules and avoid spamming users.
How it works
The DM auto-reply system uses Twikit’s streaming functionality to:
- Establish a streaming session for DM updates
- Monitor incoming messages from a specific user or conversation
- Automatically send a reply when new messages are detected
- Filter out messages sent by your own account to avoid loops
Basic auto-reply setup
Here’s a simple example that responds to all DMs from a specific user:
import asyncio
from twikit import Client
from twikit.streaming import Topic
client = Client()


async def main():
    """Log in, stream DM updates for one conversation, and auto-reply.

    Sends a fixed reply for every DM update in the monitored conversation
    whose sender is not the logged-in account.
    """
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )

    # User ID to monitor
    partner_user_id = '1234567890'
    your_user_id = await client.user_id()

    # Create conversation ID
    conversation_id = f'{your_user_id}-{partner_user_id}'

    # Set up streaming topic
    topics = {Topic.dm_update(conversation_id)}
    streaming_session = await client.get_streaming_session(topics)

    # Listen for messages
    async for topic, payload in streaming_session:
        if payload.dm_update:
            # Ignore your own messages so a reply never triggers another reply
            if payload.dm_update.user_id == your_user_id:
                continue

            # Send auto-reply
            await client.send_dm(
                payload.dm_update.user_id,
                'Thank you for your message!'
            )

asyncio.run(main())
Complete working example
This is the actual code from Twikit’s examples for DM auto-reply:
import asyncio
import os
from twikit import Client
from twikit.streaming import Topic
AUTH_INFO_1 = ''
AUTH_INFO_2 = ''
PASSWORD = ''

client = Client()


async def main():
    """Authenticate (cookies first), then auto-reply to one DM partner."""
    # Login with cookies if available
    if os.path.exists('cookies.json'):
        client.load_cookies('cookies.json')
    else:
        await client.login(
            auth_info_1=AUTH_INFO_1,
            auth_info_2=AUTH_INFO_2,
            password=PASSWORD
        )
        client.save_cookies('cookies.json')

    # User ID of the DM partner to stream
    user_id = '1752362966203469824'
    reply_message = 'Hello'

    # Set up streaming topics
    topics = {
        Topic.dm_update(f'{await client.user_id()}-{user_id}')
    }
    streaming_session = await client.get_streaming_session(topics)

    # Listen for DM updates
    async for topic, payload in streaming_session:
        if payload.dm_update:
            # Ignore messages sent by yourself
            if await client.user_id() == payload.dm_update.user_id:
                continue

            # Send auto-reply
            await client.send_dm(payload.dm_update.user_id, reply_message)

asyncio.run(main())
Understanding the streaming loop
Let’s break down how the streaming session works:
Setting up topics
# Get the user ID of the logged-in account
your_user_id = await client.user_id()
# Partner user ID (who you're chatting with)
partner_user_id = '1234567890'
# Build the conversation ID: your ID and the partner's ID joined by a hyphen
conversation_id = f'{your_user_id}-{partner_user_id}'
# Subscribe to DM updates for that conversation (a set, so more topics can be added)
topics = {Topic.dm_update(conversation_id)}
Creating streaming session
# Get streaming session
streaming_session = await client.get_streaming_session(topics)
# Iterate over events
async for topic, payload in streaming_session:
# Handle events here
pass
Processing DM updates
async for topic, payload in streaming_session:
if payload.dm_update:
# Check who sent the message
sender_id = payload.dm_update.user_id
conversation_id = payload.dm_update.conversation_id
# Ignore your own messages
if sender_id == await client.user_id():
continue
# Send reply
await client.send_dm(sender_id, 'Your reply here')
Always filter out your own messages by checking the user_id to prevent infinite reply loops.
Cookie-based authentication
The complete example above uses cookie-based authentication to avoid repeated logins:
import os
from twikit import Client
client = Client()


async def main():
    """Authenticate, reusing the cookies in ``cookies.json`` when present."""
    # Load cookies if they exist
    if os.path.exists('cookies.json'):
        client.load_cookies('cookies.json')
    else:
        # Login and save cookies
        await client.login(
            auth_info_1='USERNAME',
            auth_info_2='EMAIL',
            password='PASSWORD'
        )
        client.save_cookies('cookies.json')
Using cookies allows your bot to maintain sessions without re-authenticating every time it starts, which is more efficient and reduces the risk of triggering security checks.
Customizing auto-replies
You can create more sophisticated auto-reply logic:
import asyncio
from twikit import Client
from twikit.streaming import Topic
client = Client()


async def main():
    """Auto-reply to one DM partner, choosing the reply from message text.

    For each DM update from the partner, the DM history is fetched and the
    reply is picked by keyword ('hello', 'help', or a default).
    """
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )

    partner_user_id = '1234567890'
    your_user_id = await client.user_id()
    conversation_id = f'{your_user_id}-{partner_user_id}'

    topics = {Topic.dm_update(conversation_id)}
    streaming_session = await client.get_streaming_session(topics)

    async for topic, payload in streaming_session:
        if payload.dm_update:
            sender_id = payload.dm_update.user_id
            if sender_id == your_user_id:
                continue

            # Get the actual message content. Fetching the history by user ID
            # avoids a separate user lookup request for every incoming DM.
            messages = await client.get_dm_history(sender_id)
            try:
                # NOTE(review): assumes the first entry is the newest
                # message -- confirm against the twikit documentation.
                latest_message = messages[0]
            except IndexError:
                # History not available; nothing to base a reply on.
                continue

            # Lower-case once; tolerate messages without text
            text = (latest_message.text or '').lower()

            # Custom reply logic
            if 'hello' in text:
                reply = 'Hi there! How can I help you?'
            elif 'help' in text:
                reply = 'I am here to assist you!'
            else:
                reply = 'Thanks for your message!'

            await client.send_dm(sender_id, reply)

asyncio.run(main())
Monitoring multiple conversations
You can monitor DMs from multiple users simultaneously:
import asyncio
from twikit import Client
from twikit.streaming import Topic
client = Client()


async def main():
    """Auto-reply to DMs from several partners over one streaming session."""
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )

    your_user_id = await client.user_id()

    # Multiple users to monitor
    partner_user_ids = ['1234567890', '0987654321', '1111111111']

    # Create topics for all conversations (one DM-update topic per partner)
    topics = {
        Topic.dm_update(f'{your_user_id}-{user_id}')
        for user_id in partner_user_ids
    }
    streaming_session = await client.get_streaming_session(topics)

    async for topic, payload in streaming_session:
        if payload.dm_update:
            # Skip updates caused by your own replies
            if payload.dm_update.user_id == your_user_id:
                continue

            await client.send_dm(
                payload.dm_update.user_id,
                'Thank you for reaching out!'
            )

asyncio.run(main())
Rate limit considerations
When implementing auto-reply systems, be aware of rate limits:
import asyncio
from twikit import Client
from twikit.streaming import Topic
client = Client()

# Fixed-window limiter state: replies sent in the current one-hour window
message_count = 0
max_messages_per_hour = 50


async def main():
    """Auto-reply to one DM partner, sending at most 50 replies per hour.

    Replies are counted in a fixed one-hour window. When the window is full,
    the loop sleeps only for the remainder of that window; when the window
    has already elapsed, the counter is reset without waiting.
    """
    global message_count

    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )

    partner_user_id = '1234567890'
    your_user_id = await client.user_id()
    conversation_id = f'{your_user_id}-{partner_user_id}'

    topics = {Topic.dm_update(conversation_id)}
    streaming_session = await client.get_streaming_session(topics)

    # The event loop clock is monotonic, so it is safe for measuring elapsed
    # time even if the system clock is adjusted.
    loop = asyncio.get_running_loop()
    window_seconds = 3600
    window_start = loop.time()

    async for topic, payload in streaming_session:
        if payload.dm_update:
            if payload.dm_update.user_id == your_user_id:
                continue

            # Check rate limit
            elapsed = loop.time() - window_start
            if elapsed >= window_seconds:
                # The previous window is over: start a fresh one
                window_start = loop.time()
                message_count = 0
            elif message_count >= max_messages_per_hour:
                print('Rate limit reached, waiting...')
                # Wait only until the current window ends
                await asyncio.sleep(window_seconds - elapsed)
                window_start = loop.time()
                message_count = 0

            await client.send_dm(
                payload.dm_update.user_id,
                'Thank you for your message!'
            )
            message_count += 1

asyncio.run(main())
Sending too many automated messages can result in account restrictions. Implement proper rate limiting and avoid spamming users.
Error handling and reconnection
Add error handling to keep your bot running:
import asyncio
from twikit import Client
from twikit.streaming import Topic
client = Client()


async def main():
    """Auto-reply to one DM partner, reconnecting whenever the stream stops.

    The streaming loop is restarted after any exception and also after the
    stream ends without an error; a 5-second pause precedes every reconnect.
    """
    await client.login(
        auth_info_1='USERNAME',
        auth_info_2='EMAIL',
        password='PASSWORD'
    )

    partner_user_id = '1234567890'
    your_user_id = await client.user_id()
    conversation_id = f'{your_user_id}-{partner_user_id}'

    # The topic set never changes, so build it once outside the retry loop
    topics = {Topic.dm_update(conversation_id)}

    while True:
        try:
            streaming_session = await client.get_streaming_session(topics)

            async for topic, payload in streaming_session:
                if payload.dm_update:
                    if payload.dm_update.user_id == your_user_id:
                        continue

                    await client.send_dm(
                        payload.dm_update.user_id,
                        'Thank you for your message!'
                    )
        except Exception as e:
            # Top-level boundary: report the failure and fall through to retry
            print(f'Error occurred: {e}')

        # Pause before every reconnect, including after a clean stream end,
        # so the loop can never spin without a delay.
        print('Reconnecting in 5 seconds...')
        await asyncio.sleep(5)

asyncio.run(main())
Key points
Set up streaming session
Use Topic.dm_update() to subscribe to DM updates for specific conversations
Filter own messages
Always check the sender’s user ID to avoid replying to your own messages
Use cookie authentication
Save and load cookies to maintain sessions without repeated logins
Implement rate limiting
Track message counts and add delays to respect Twitter’s rate limits
Add error handling
Wrap streaming loops in try/except blocks to handle disconnections gracefully