Overview
The TikTokLive client provides three methods for establishing connections to livestreams, each suited for different use cases:
run() - Thread-blocking connection (simplest, for standalone scripts)
connect() - Future-blocking connection (for async contexts)
start() - Non-blocking connection (for advanced use cases)
All three methods support the same connection options and event handling.
Connection Methods
run() - Thread-Blocking
The simplest way to connect. Blocks the current thread until the connection ends.
from TikTokLive import TikTokLiveClient
from TikTokLive.events import CommentEvent
client = TikTokLiveClient( unique_id = "@username" )
@client.on (CommentEvent)
async def on_comment ( event : CommentEvent):
print ( f " { event.user_info.nickname } : { event.comment } " )
if __name__ == "__main__" :
# Blocks here until disconnected
client.run()
print ( "Stream ended" )
Method Signature:
def run ( ** kwargs ) -> Task
Use Cases:
Simple standalone scripts
Single-purpose bots
Command-line tools
When you want the main thread to wait
connect() - Future-Blocking
Blocks the current async context but allows other coroutines to run. Use in async functions.
import asyncio
from TikTokLive import TikTokLiveClient
from TikTokLive.events import ConnectEvent
client = TikTokLiveClient( unique_id = "@username" )
@client.on (ConnectEvent)
async def on_connect ( event : ConnectEvent):
print ( f "Connected to @ { event.unique_id } " )
async def main ():
# Other async work can happen here
print ( "Starting connection..." )
# Blocks this coroutine until disconnected
await client.connect()
print ( "Connection ended" )
if __name__ == "__main__" :
asyncio.run(main())
Method Signature:
async def connect (
callback : Optional[
Union[
Callable[[ None ], None ],
Callable[[ None ], Coroutine[ None , None , None ]],
Coroutine[ None , None , None ],
]
] = None ,
** kwargs
) -> Task
callback
Callable | Coroutine
default: "None"
Optional function or coroutine to run immediately after connection is established.
Use Cases:
Async applications
Web servers (FastAPI, aiohttp)
When you need async/await control
Part of a larger async workflow
start() - Non-Blocking
Starts the connection without blocking. Returns a Task that you manage.
import asyncio
from TikTokLive import TikTokLiveClient
from TikTokLive.events import CommentEvent
client = TikTokLiveClient( unique_id = "@username" )
@client.on (CommentEvent)
async def on_comment ( event : CommentEvent):
print ( f " { event.user_info.nickname } : { event.comment } " )
async def main ():
# Start connection in background
task = await client.start()
# Do other work while connected
while client.connected:
print ( "Doing other work..." )
await asyncio.sleep( 10 )
# Wait for task to complete
await task
if __name__ == "__main__" :
asyncio.run(main())
Method Signature:
async def start (
* ,
process_connect_events : bool = True ,
compress_ws_events : bool = True ,
fetch_room_info : bool = False ,
fetch_gift_info : bool = False ,
fetch_live_check : bool = True ,
room_id : Optional[ int ] = None ,
) -> Task
Use Cases:
Multi-client applications
When you need fine-grained control
Monitoring multiple streams
Background services
Connection Options
All three connection methods accept the same keyword arguments:
Basic Options
Whether to process the initial batch of events sent when joining the room. # Skip initial events (join messages, etc.)
client.run( process_connect_events = False )
Whether to use gzip compression for WebSocket messages. Recommended to keep enabled. client.run( compress_ws_events = True )
Whether to verify the user is live before connecting. Almost always keep this enabled. # Connect even if user might not be live (not recommended)
client.run( fetch_live_check = False )
Fetch detailed room information on connection. Accessible via client.room_info. client.run( fetch_room_info = True )
# Later access via client.room_info
Fetch gift information on connection. Accessible via client.gift_info. client.run( fetch_gift_info = True )
# Later access via client.gift_info
Advanced Options
room_id
Optional[int]
default: "None"
Override room ID to connect directly without scraping. Useful for scaling. # Connect with known room ID (bypasses HTML scraping)
client.run( room_id = 7318817831388130078 )
When providing a room_id directly, the client skips HTML scraping. This is faster and avoids potential rate limits, but you need to obtain the room ID through other means first.
Connection Examples
from TikTokLive import TikTokLiveClient
from TikTokLive.events import ConnectEvent
client = TikTokLiveClient( unique_id = "@username" )
@client.on (ConnectEvent)
async def on_connect ( event : ConnectEvent):
print ( f "Connected to { event.unique_id } " )
print ( f "Room info: { client.room_info } " )
print ( f "Gift info: { client.gift_info } " )
if __name__ == "__main__" :
client.run(
fetch_room_info = True ,
fetch_gift_info = True ,
)
Using Callback with connect()
import asyncio
from TikTokLive import TikTokLiveClient
client = TikTokLiveClient( unique_id = "@username" )
async def on_connected ():
"""Called immediately after connection is established"""
print ( "Connection successful!" )
print ( f "Connected to room: { client.room_id } " )
async def main ():
await client.connect( callback = on_connected)
if __name__ == "__main__" :
asyncio.run(main())
Multiple Clients with start()
import asyncio
from TikTokLive import TikTokLiveClient
from TikTokLive.events import CommentEvent
async def monitor_stream ( username : str ):
"""Monitor a single stream"""
client = TikTokLiveClient( unique_id = username)
@client.on (CommentEvent)
async def on_comment ( event : CommentEvent):
print ( f "[ { username } ] { event.user_info.nickname } : { event.comment } " )
try :
task = await client.start()
await task
except Exception as e:
print ( f "Error monitoring { username } : { e } " )
async def main ():
# Monitor multiple streams simultaneously
await asyncio.gather(
monitor_stream( "user1" ),
monitor_stream( "user2" ),
monitor_stream( "user3" ),
)
if __name__ == "__main__" :
asyncio.run(main())
Disconnection
Graceful Disconnect
Manually disconnect from a stream:
async def main ():
client = TikTokLiveClient( unique_id = "@username" )
task = await client.start()
# Run for 60 seconds
await asyncio.sleep( 60 )
# Disconnect gracefully
await client.disconnect()
print ( "Disconnected" )
Method Signature:
async def disconnect ( close_client : bool = False ) -> None
Whether to also close the HTTP client. Set to True if you don’t plan to reconnect.
Cleanup Resources
Close the HTTP client when you’re done:
async def main ():
client = TikTokLiveClient( unique_id = "@username" )
try :
await client.connect()
finally :
# Clean up resources
await client.close()
Method Signature:
async def close () -> None
Call close() when you’re completely done with the client and won’t reconnect. This releases HTTP client resources.
Connection Lifecycle Events
Monitor connection state changes using lifecycle events:
from TikTokLive import TikTokLiveClient
from TikTokLive.events import (
ConnectEvent,
DisconnectEvent,
LiveEndEvent,
LivePauseEvent,
LiveUnpauseEvent,
)
client = TikTokLiveClient( unique_id = "@username" )
@client.on (ConnectEvent)
async def on_connect ( event : ConnectEvent):
print ( f "✅ Connected to @ { event.unique_id } " )
print ( f "📍 Room ID: { event.room_id } " )
@client.on (DisconnectEvent)
async def on_disconnect ( event : DisconnectEvent):
print ( "❌ Disconnected from stream" )
@client.on (LiveEndEvent)
async def on_live_end ( event : LiveEndEvent):
print ( "🛑 Stream has ended" )
# Client automatically disconnects
@client.on (LivePauseEvent)
async def on_pause ( event : LivePauseEvent):
print ( "⏸️ Stream paused" )
@client.on (LiveUnpauseEvent)
async def on_unpause ( event : LiveUnpauseEvent):
print ( "▶️ Stream resumed" )
if __name__ == "__main__" :
client.run()
When a LiveEndEvent occurs, the client automatically calls disconnect() for you.
Error Handling
Common Exceptions
from TikTokLive import TikTokLiveClient
from TikTokLive.client.errors import (
AlreadyConnectedError,
UserOfflineError,
UserNotFoundError,
)
import asyncio
async def safe_connect ( username : str ):
client = TikTokLiveClient( unique_id = username)
try :
await client.connect()
except AlreadyConnectedError:
print ( "Client is already connected!" )
except UserOfflineError:
print ( f "@ { username } is not currently live" )
except UserNotFoundError:
print ( f "User @ { username } not found" )
except Exception as e:
print ( f "Unexpected error: { e } " )
finally :
if client.connected:
await client.disconnect( close_client = True )
if __name__ == "__main__" :
asyncio.run(safe_connect( "username" ))
Reconnection Logic
import asyncio
from TikTokLive import TikTokLiveClient
from TikTokLive.client.errors import UserOfflineError
async def connect_with_retry ( username : str , max_retries : int = 5 ):
client = TikTokLiveClient( unique_id = username)
for attempt in range (max_retries):
try :
print ( f "Connection attempt { attempt + 1 } ..." )
await client.connect()
break # Success
except UserOfflineError:
print ( f "@ { username } is offline. Retrying in 30 seconds..." )
await asyncio.sleep( 30 )
except Exception as e:
print ( f "Error: { e } . Retrying in 10 seconds..." )
await asyncio.sleep( 10 )
else :
print ( f "Failed to connect after { max_retries } attempts" )
# Cleanup
if client.connected:
await client.disconnect( close_client = True )
if __name__ == "__main__" :
asyncio.run(connect_with_retry( "username" ))
Best Practices
Use run() for Scripts For simple standalone scripts, use run() - it’s the simplest approach.
Always Handle Errors Wrap connections in try-except blocks to handle offline users and connection failures.
Clean Up Resources Call close() when completely done to release HTTP client resources.
Monitor Connection State Use client.connected to check connection status before performing actions.
Checking Connection State
if client.connected:
print ( "Client is connected" )
print ( f "Room ID: { client.room_id } " )
else :
print ( "Client is not connected" )
Automatic Reconnection
from TikTokLive import TikTokLiveClient
from TikTokLive.events import DisconnectEvent
import asyncio
client = TikTokLiveClient( unique_id = "@username" )
@client.on (DisconnectEvent)
async def on_disconnect ( event : DisconnectEvent):
print ( "Disconnected. Attempting to reconnect in 10 seconds..." )
await asyncio.sleep( 10 )
try :
await client.connect()
except Exception as e:
print ( f "Reconnection failed: { e } " )
if __name__ == "__main__" :
client.run()
Quick Reference
Method Blocking Use Case run()Thread Simple scripts, CLI tools connect()Async context Async applications, part of async workflow start()Non-blocking Multiple clients, background tasks
Option Default Description process_connect_eventsTrueProcess initial room events compress_ws_eventsTrueEnable gzip compression fetch_live_checkTrueVerify user is live fetch_room_infoFalseFetch room metadata fetch_gift_infoFalseFetch gift metadata room_idNoneDirect room ID (skip scraping)
Complete Example
from TikTokLive import TikTokLiveClient
from TikTokLive.events import ConnectEvent, DisconnectEvent, LiveEndEvent
from TikTokLive.client.errors import (
AlreadyConnectedError,
UserOfflineError,
UserNotFoundError,
)
import asyncio
client = TikTokLiveClient( unique_id = "@username" )
@client.on (ConnectEvent)
async def on_connect ( event : ConnectEvent):
print ( f "✅ Connected to @ { event.unique_id } " )
print ( f "📍 Room: { event.room_id } " )
if client.room_info:
print ( f "👥 Viewers: { client.room_info.get( 'user_count' , 'N/A' ) } " )
@client.on (DisconnectEvent)
async def on_disconnect ( event : DisconnectEvent):
print ( "❌ Disconnected" )
@client.on (LiveEndEvent)
async def on_live_end ( event : LiveEndEvent):
print ( "🛑 Stream ended" )
async def main ():
try :
print ( "Starting connection..." )
await client.connect(
fetch_room_info = True ,
fetch_gift_info = True ,
)
except UserOfflineError:
print ( "User is not live" )
except UserNotFoundError:
print ( "User not found" )
except AlreadyConnectedError:
print ( "Already connected" )
except Exception as e:
print ( f "Error: { e } " )
finally :
if client.connected:
await client.disconnect( close_client = True )
if __name__ == "__main__" :
asyncio.run(main())
Next Steps
Event Handling Learn how to handle events
Events Overview Explore all available events