Skip to main content

Overview

The TikTokLive client provides three methods for establishing connections to livestreams, each suited for different use cases:
  • run() - Thread-blocking connection (simplest, for standalone scripts)
  • connect() - Future-blocking connection (for async contexts)
  • start() - Non-blocking connection (for advanced use cases)
All three methods support the same connection options and event handling.

Connection Methods

run() - Thread-Blocking

The simplest way to connect. Blocks the current thread until the connection ends.
from TikTokLive import TikTokLiveClient
from TikTokLive.events import CommentEvent

client = TikTokLiveClient(unique_id="@username")

@client.on(CommentEvent)
async def on_comment(event: CommentEvent):
    print(f"{event.user_info.nickname}: {event.comment}")

if __name__ == "__main__":
    # Blocks here until disconnected
    client.run()
    print("Stream ended")
Method Signature:
def run(**kwargs) -> Task
Use Cases:
  • Simple standalone scripts
  • Single-purpose bots
  • Command-line tools
  • When you want the main thread to wait

connect() - Future-Blocking

Blocks the current async context but allows other coroutines to run. Use in async functions.
import asyncio
from TikTokLive import TikTokLiveClient
from TikTokLive.events import ConnectEvent

client = TikTokLiveClient(unique_id="@username")

@client.on(ConnectEvent)
async def on_connect(event: ConnectEvent):
    print(f"Connected to @{event.unique_id}")

async def main():
    # Other async work can happen here
    print("Starting connection...")
    
    # Blocks this coroutine until disconnected
    await client.connect()
    
    print("Connection ended")

if __name__ == "__main__":
    asyncio.run(main())
Method Signature:
async def connect(
    callback: Optional[
        Union[
            Callable[[None], None],
            Callable[[None], Coroutine[None, None, None]],
            Coroutine[None, None, None],
        ]
    ] = None,
    **kwargs
) -> Task
callback
Callable | Coroutine
default:"None"
Optional function or coroutine to run immediately after connection is established.
Use Cases:
  • Async applications
  • Web servers (FastAPI, aiohttp)
  • When you need async/await control
  • Part of a larger async workflow

start() - Non-Blocking

Starts the connection without blocking. Returns a Task that you manage.
import asyncio
from TikTokLive import TikTokLiveClient
from TikTokLive.events import CommentEvent

client = TikTokLiveClient(unique_id="@username")

@client.on(CommentEvent)
async def on_comment(event: CommentEvent):
    print(f"{event.user_info.nickname}: {event.comment}")

async def main():
    # Start connection in background
    task = await client.start()
    
    # Do other work while connected
    while client.connected:
        print("Doing other work...")
        await asyncio.sleep(10)
    
    # Wait for task to complete
    await task

if __name__ == "__main__":
    asyncio.run(main())
Method Signature:
async def start(
    *,
    process_connect_events: bool = True,
    compress_ws_events: bool = True,
    fetch_room_info: bool = False,
    fetch_gift_info: bool = False,
    fetch_live_check: bool = True,
    room_id: Optional[int] = None,
) -> Task
Use Cases:
  • Multi-client applications
  • When you need fine-grained control
  • Monitoring multiple streams
  • Background services

Connection Options

All three connection methods accept the same keyword arguments:

Basic Options

process_connect_events
bool
default:"True"
Whether to process the initial batch of events sent when joining the room.
# Skip initial events (join messages, etc.)
client.run(process_connect_events=False)
compress_ws_events
bool
default:"True"
Whether to use gzip compression for WebSocket messages. Recommended to keep enabled.
client.run(compress_ws_events=True)
fetch_live_check
bool
default:"True"
Whether to verify the user is live before connecting. Almost always keep this enabled.
# Connect even if user might not be live (not recommended)
client.run(fetch_live_check=False)

Optional Metadata

fetch_room_info
bool
default:"False"
Fetch detailed room information on connection. Accessible via client.room_info.
client.run(fetch_room_info=True)
# Later access via client.room_info
fetch_gift_info
bool
default:"False"
Fetch gift information on connection. Accessible via client.gift_info.
client.run(fetch_gift_info=True)
# Later access via client.gift_info

Advanced Options

room_id
Optional[int]
default:"None"
Override room ID to connect directly without scraping. Useful for scaling.
# Connect with known room ID (bypasses HTML scraping)
client.run(room_id=7318817831388130078)
When providing a room_id directly, the client skips HTML scraping. This is faster and avoids potential rate limits, but you need to obtain the room ID through other means first.

Connection Examples

Basic Connection with Metadata

from TikTokLive import TikTokLiveClient
from TikTokLive.events import ConnectEvent

client = TikTokLiveClient(unique_id="@username")

@client.on(ConnectEvent)
async def on_connect(event: ConnectEvent):
    print(f"Connected to {event.unique_id}")
    print(f"Room info: {client.room_info}")
    print(f"Gift info: {client.gift_info}")

if __name__ == "__main__":
    client.run(
        fetch_room_info=True,
        fetch_gift_info=True,
    )

Using Callback with connect()

import asyncio
from TikTokLive import TikTokLiveClient

client = TikTokLiveClient(unique_id="@username")

async def on_connected():
    """Called immediately after connection is established"""
    print("Connection successful!")
    print(f"Connected to room: {client.room_id}")

async def main():
    await client.connect(callback=on_connected)

if __name__ == "__main__":
    asyncio.run(main())

Multiple Clients with start()

import asyncio
from TikTokLive import TikTokLiveClient
from TikTokLive.events import CommentEvent

async def monitor_stream(username: str):
    """Monitor a single stream"""
    client = TikTokLiveClient(unique_id=username)
    
    @client.on(CommentEvent)
    async def on_comment(event: CommentEvent):
        print(f"[{username}] {event.user_info.nickname}: {event.comment}")
    
    try:
        task = await client.start()
        await task
    except Exception as e:
        print(f"Error monitoring {username}: {e}")

async def main():
    # Monitor multiple streams simultaneously
    await asyncio.gather(
        monitor_stream("user1"),
        monitor_stream("user2"),
        monitor_stream("user3"),
    )

if __name__ == "__main__":
    asyncio.run(main())

Disconnection

Graceful Disconnect

Manually disconnect from a stream:
async def main():
    client = TikTokLiveClient(unique_id="@username")
    task = await client.start()
    
    # Run for 60 seconds
    await asyncio.sleep(60)
    
    # Disconnect gracefully
    await client.disconnect()
    print("Disconnected")
Method Signature:
async def disconnect(close_client: bool = False) -> None
close_client
bool
default:"False"
Whether to also close the HTTP client. Set to True if you don’t plan to reconnect.

Cleanup Resources

Close the HTTP client when you’re done:
async def main():
    client = TikTokLiveClient(unique_id="@username")
    
    try:
        await client.connect()
    finally:
        # Clean up resources
        await client.close()
Method Signature:
async def close() -> None
Call close() when you’re completely done with the client and won’t reconnect. This releases HTTP client resources.

Connection Lifecycle Events

Monitor connection state changes using lifecycle events:
from TikTokLive import TikTokLiveClient
from TikTokLive.events import (
    ConnectEvent,
    DisconnectEvent,
    LiveEndEvent,
    LivePauseEvent,
    LiveUnpauseEvent,
)

client = TikTokLiveClient(unique_id="@username")

@client.on(ConnectEvent)
async def on_connect(event: ConnectEvent):
    print(f"✅ Connected to @{event.unique_id}")
    print(f"📍 Room ID: {event.room_id}")

@client.on(DisconnectEvent)
async def on_disconnect(event: DisconnectEvent):
    print("❌ Disconnected from stream")

@client.on(LiveEndEvent)
async def on_live_end(event: LiveEndEvent):
    print("🛑 Stream has ended")
    # Client automatically disconnects

@client.on(LivePauseEvent)
async def on_pause(event: LivePauseEvent):
    print("⏸️ Stream paused")

@client.on(LiveUnpauseEvent)
async def on_unpause(event: LiveUnpauseEvent):
    print("▶️ Stream resumed")

if __name__ == "__main__":
    client.run()
When a LiveEndEvent occurs, the client automatically calls disconnect() for you.

Error Handling

Common Exceptions

from TikTokLive import TikTokLiveClient
from TikTokLive.client.errors import (
    AlreadyConnectedError,
    UserOfflineError,
    UserNotFoundError,
)
import asyncio

async def safe_connect(username: str):
    client = TikTokLiveClient(unique_id=username)
    
    try:
        await client.connect()
    except AlreadyConnectedError:
        print("Client is already connected!")
    except UserOfflineError:
        print(f"@{username} is not currently live")
    except UserNotFoundError:
        print(f"User @{username} not found")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        if client.connected:
            await client.disconnect(close_client=True)

if __name__ == "__main__":
    asyncio.run(safe_connect("username"))

Reconnection Logic

import asyncio
from TikTokLive import TikTokLiveClient
from TikTokLive.client.errors import UserOfflineError

async def connect_with_retry(username: str, max_retries: int = 5):
    client = TikTokLiveClient(unique_id=username)
    
    for attempt in range(max_retries):
        try:
            print(f"Connection attempt {attempt + 1}...")
            await client.connect()
            break  # Success
        except UserOfflineError:
            print(f"@{username} is offline. Retrying in 30 seconds...")
            await asyncio.sleep(30)
        except Exception as e:
            print(f"Error: {e}. Retrying in 10 seconds...")
            await asyncio.sleep(10)
    else:
        print(f"Failed to connect after {max_retries} attempts")
    
    # Cleanup
    if client.connected:
        await client.disconnect(close_client=True)

if __name__ == "__main__":
    asyncio.run(connect_with_retry("username"))

Best Practices

Use run() for Scripts

For simple standalone scripts, use run() - it’s the simplest approach.

Always Handle Errors

Wrap connections in try-except blocks to handle offline users and connection failures.

Clean Up Resources

Call close() when completely done to release HTTP client resources.

Monitor Connection State

Use client.connected to check connection status before performing actions.

Checking Connection State

if client.connected:
    print("Client is connected")
    print(f"Room ID: {client.room_id}")
else:
    print("Client is not connected")

Automatic Reconnection

from TikTokLive import TikTokLiveClient
from TikTokLive.events import DisconnectEvent
import asyncio

client = TikTokLiveClient(unique_id="@username")

@client.on(DisconnectEvent)
async def on_disconnect(event: DisconnectEvent):
    print("Disconnected. Attempting to reconnect in 10 seconds...")
    await asyncio.sleep(10)
    
    try:
        await client.connect()
    except Exception as e:
        print(f"Reconnection failed: {e}")

if __name__ == "__main__":
    client.run()

Quick Reference

MethodBlockingUse Case
run()ThreadSimple scripts, CLI tools
connect()Async contextAsync applications, part of async workflow
start()Non-blockingMultiple clients, background tasks
OptionDefaultDescription
process_connect_eventsTrueProcess initial room events
compress_ws_eventsTrueEnable gzip compression
fetch_live_checkTrueVerify user is live
fetch_room_infoFalseFetch room metadata
fetch_gift_infoFalseFetch gift metadata
room_idNoneDirect room ID (skip scraping)

Complete Example

from TikTokLive import TikTokLiveClient
from TikTokLive.events import ConnectEvent, DisconnectEvent, LiveEndEvent
from TikTokLive.client.errors import (
    AlreadyConnectedError,
    UserOfflineError,
    UserNotFoundError,
)
import asyncio

client = TikTokLiveClient(unique_id="@username")

@client.on(ConnectEvent)
async def on_connect(event: ConnectEvent):
    print(f"✅ Connected to @{event.unique_id}")
    print(f"📍 Room: {event.room_id}")
    if client.room_info:
        print(f"👥 Viewers: {client.room_info.get('user_count', 'N/A')}")

@client.on(DisconnectEvent)
async def on_disconnect(event: DisconnectEvent):
    print("❌ Disconnected")

@client.on(LiveEndEvent)
async def on_live_end(event: LiveEndEvent):
    print("🛑 Stream ended")

async def main():
    try:
        print("Starting connection...")
        await client.connect(
            fetch_room_info=True,
            fetch_gift_info=True,
        )
    except UserOfflineError:
        print("User is not live")
    except UserNotFoundError:
        print("User not found")
    except AlreadyConnectedError:
        print("Already connected")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        if client.connected:
            await client.disconnect(close_client=True)

if __name__ == "__main__":
    asyncio.run(main())

Next Steps

Event Handling

Learn how to handle events

Events Overview

Explore all available events

Build docs developers (and LLMs) love