Skip to main content

Overview

Streaming allows you to receive and display tokens as they are generated, providing a more responsive user experience. This is particularly useful for chat applications and interactive interfaces where users expect immediate feedback.

Why Use Streaming?

Better UX

Users see responses immediately, not after full generation

Perceived Speed

Feels faster even if total time is the same

Early Termination

Stop generation early if needed

Real-time Feedback

Perfect for chatbots and assistants

Basic Streaming

Qwen chat models expose a chat_stream method (available when the model is loaded with trust_remote_code=True) that yields the cumulative response text as it grows:
# Load the chat model, tokenizer, and generation settings, then stream one reply.
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig

MODEL_ID = "Qwen/Qwen-7B-Chat"

tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    device_map="auto",
    trust_remote_code=True,
).eval()
config = GenerationConfig.from_pretrained(MODEL_ID, trust_remote_code=True)

query = "请给我讲一个有趣的故事"
# Each yielded value is the full response so far; \r repaints the same line.
for response in model.chat_stream(tokenizer, query, history=None, generation_config=config):
    print(f"\rQwen: {response}", end="", flush=True)

print()  # finish with a newline once generation completes

CLI Demo Implementation

Here’s the complete streaming implementation from the official CLI demo:
cli_demo.py
import argparse
import os
import platform
import shutil
from copy import deepcopy

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig
from transformers.trainer_utils import set_seed

DEFAULT_CKPT_PATH = 'Qwen/Qwen-7B-Chat'

def _load_model_tokenizer(args):
    """Load tokenizer, model (eval mode), and generation config.

    Reads the checkpoint location from ``args.checkpoint_path`` and pins
    the model to CPU when ``args.cpu_only`` is set; otherwise device
    placement is automatic. Returns ``(model, tokenizer, config)``.
    """
    ckpt = args.checkpoint_path

    tokenizer = AutoTokenizer.from_pretrained(
        ckpt,
        trust_remote_code=True,
        resume_download=True,
    )

    device_map = "cpu" if args.cpu_only else "auto"

    model = AutoModelForCausalLM.from_pretrained(
        ckpt,
        device_map=device_map,
        trust_remote_code=True,
        resume_download=True,
    ).eval()

    config = GenerationConfig.from_pretrained(
        ckpt,
        trust_remote_code=True,
        resume_download=True,
    )

    return model, tokenizer, config

def _clear_screen():
    """Clear the terminal using the platform-appropriate shell command.

    Fix: the original called ``os.system`` without ``os`` being imported
    anywhere in the script, raising NameError on first use (now imported
    at the top of the file).
    """
    # Windows ships `cls`; POSIX terminals use `clear`.
    os.system("cls" if platform.system() == "Windows" else "clear")

def main():
    """Run the interactive command-line streaming chat loop.

    Parses CLI flags, loads the model, then repeatedly reads a query,
    streams the reply while redrawing the screen per chunk, and records
    each finished exchange in the conversation history.

    Fixes: the original deep-copied ``model.generation_config`` into an
    ``orig_gen_config`` local that was never used; the welcome banner did
    not tell users about the ``:quit`` command needed to exit the loop.
    """
    parser = argparse.ArgumentParser(
        description='QWen-Chat command-line interactive chat demo.')
    parser.add_argument(
        "-c", "--checkpoint-path",
        type=str,
        default=DEFAULT_CKPT_PATH,
        help="Checkpoint name or path, default to %(default)r"
    )
    parser.add_argument(
        "-s", "--seed",
        type=int,
        default=1234,
        help="Random seed"
    )
    parser.add_argument(
        "--cpu-only",
        action="store_true",
        help="Run demo with CPU only"
    )
    args = parser.parse_args()

    history = []
    model, tokenizer, config = _load_model_tokenizer(args)

    _clear_screen()
    print("Welcome to Qwen-Chat! Type your message to start chatting, or :quit to exit.")

    while True:
        query = input("\nUser> ").strip()

        if not query:
            print('[ERROR] Query is empty')
            continue

        if query == ":quit":
            break

        # Re-seed every turn so each generation is reproducible.
        set_seed(args.seed)
        response = ''
        try:
            for response in model.chat_stream(
                tokenizer,
                query,
                history=history,
                generation_config=config
            ):
                # Redraw the whole exchange so the reply grows in place.
                _clear_screen()
                print(f"\nUser: {query}")
                print(f"\nQwen-Chat: {response}")
        except KeyboardInterrupt:
            # Discard the partial reply; it is not added to history.
            print('\n[WARNING] Generation interrupted')
            continue

        history.append((query, response))

if __name__ == "__main__":
    main()
The CLI demo uses chat_stream to provide a smooth, real-time chat experience; because each yielded value is the cumulative response, the screen is simply redrawn on every chunk. See cli_demo.py in the Qwen source code for the full version.

Web Demo with Gradio

Here’s how streaming works in a web interface:
web_demo.py
import gradio as gr
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig

def _load_model_tokenizer(checkpoint_path, cpu_only=False):
    """Load tokenizer, model (eval mode), and generation config from a checkpoint.

    When ``cpu_only`` is true the model is forced onto CPU; otherwise
    device placement is automatic. Returns ``(model, tokenizer, config)``.
    """
    tokenizer = AutoTokenizer.from_pretrained(
        checkpoint_path,
        trust_remote_code=True,
        resume_download=True,
    )

    if cpu_only:
        device_map = "cpu"
    else:
        device_map = "auto"

    model = AutoModelForCausalLM.from_pretrained(
        checkpoint_path,
        device_map=device_map,
        trust_remote_code=True,
        resume_download=True,
    ).eval()

    config = GenerationConfig.from_pretrained(
        checkpoint_path,
        trust_remote_code=True,
        resume_download=True,
    )

    return model, tokenizer, config

def predict(query, chatbot, task_history, model, tokenizer, config):
    """Generator that streams the model's reply into the Gradio chatbot.

    Appends a placeholder row for the new turn, then yields the chatbot
    list after each streamed chunk so the UI repaints progressively.
    The finished exchange is appended to ``task_history``.
    """
    print(f"User: {query}")
    chatbot.append((query, ""))

    latest = ""
    stream = model.chat_stream(
        tokenizer,
        query,
        history=task_history,
        generation_config=config,
    )
    for latest in stream:
        # Overwrite the placeholder row with the cumulative reply so far.
        chatbot[-1] = (query, latest)
        yield chatbot

    print(f"Qwen-Chat: {latest}")
    task_history.append((query, latest))

def launch_demo(checkpoint_path="Qwen/Qwen-7B-Chat"):
    """Assemble and serve the Gradio streaming chat interface."""
    model, tokenizer, config = _load_model_tokenizer(checkpoint_path)

    with gr.Blocks() as demo:
        gr.Markdown("# Qwen-Chat Streaming Demo")

        chat_box = gr.Chatbot(label='Qwen-Chat')
        user_input = gr.Textbox(lines=2, label='Input')
        turn_history = gr.State([])
        send_btn = gr.Button("🚀 Submit")

        # Wrap the loaded objects in gr.State so predict receives them per call.
        send_btn.click(
            predict,
            [user_input, chat_box, turn_history,
             gr.State(model), gr.State(tokenizer), gr.State(config)],
            [chat_box],
            show_progress=True,
        )

    # queue() enables generator-based (streaming) event handlers.
    demo.queue().launch()

if __name__ == '__main__':
    launch_demo()
The web demo streams by making predict a generator: each yield hands the updated chatbot list back to Gradio, which repaints the interface progressively. See web_demo.py in the source.

Streaming with Custom Display

Create custom display logic for different use cases:
# Custom streaming display: overwrite a single console line as chunks arrive.
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig

CHECKPOINT = "Qwen/Qwen-7B-Chat"

tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    CHECKPOINT,
    device_map="auto",
    trust_remote_code=True,
).eval()
config = GenerationConfig.from_pretrained(CHECKPOINT, trust_remote_code=True)

query = "Tell me a story"
print("Assistant: ", end="", flush=True)

# Each yielded value is the full response so far; \r repaints the line.
for response in model.chat_stream(tokenizer, query, history=None, generation_config=config):
    print(f"\rAssistant: {response}", end="", flush=True)

print()  # Newline

Handling Interruptions

Gracefully handle user interruptions:
# Cooperative interruption: Ctrl-C sets a flag that the streaming loop checks.
import signal
import sys
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig

interrupted = False  # set by the SIGINT handler, read by the stream loop

def signal_handler(sig, frame):
    """Mark the stream for cooperative shutdown on Ctrl-C."""
    global interrupted
    interrupted = True
    print("\n[Interrupting generation...]")

signal.signal(signal.SIGINT, signal_handler)

MODEL_NAME = "Qwen/Qwen-7B-Chat"

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",
    trust_remote_code=True,
).eval()
config = GenerationConfig.from_pretrained(MODEL_NAME, trust_remote_code=True)

query = "Write a very long story"
last_response = ""

try:
    stream = model.chat_stream(
        tokenizer, query, history=None, generation_config=config
    )
    for chunk in stream:
        # Check the flag between chunks so Ctrl-C stops at a clean boundary.
        if interrupted:
            print("\n[Generation stopped by user]")
            break

        print(f"\rQwen: {chunk}", end="", flush=True)
        last_response = chunk
except KeyboardInterrupt:
    # Fallback if SIGINT is delivered outside the handler's coverage.
    print("\n[Generation interrupted]")

print(f"\n\nLast response: {last_response[:100]}...")

Multi-turn Streaming Chat

Maintain conversation history with streaming:
# Multi-turn streaming chat that carries conversation history between turns.
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig

CKPT = "Qwen/Qwen-7B-Chat"

tokenizer = AutoTokenizer.from_pretrained(CKPT, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    CKPT,
    device_map="auto",
    trust_remote_code=True,
).eval()
config = GenerationConfig.from_pretrained(CKPT, trust_remote_code=True)

history = []

while True:
    query = input("\nYou: ").strip()

    if not query:
        continue
    if query.lower() in ('quit', 'exit'):
        break

    print("Assistant: ", end="", flush=True)

    response = ""
    try:
        for response in model.chat_stream(
            tokenizer,
            query,
            history=history,
            generation_config=config
        ):
            print(f"\rAssistant: {response}", end="", flush=True)
    except KeyboardInterrupt:
        # Abandon the partial turn; nothing is added to history.
        print("\n[Interrupted]")
        continue

    print()  # move past the streamed line
    history.append((query, response))

    # Bound the prompt length by keeping only the most recent 10 turns.
    if len(history) > 10:
        history = history[-10:]

print("Goodbye!")

Advanced: Streaming with Callbacks

Implement custom callbacks for token generation:
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig
from typing import Callable

class StreamingCallback:
    """Forward only the newly generated text from a cumulative stream.

    ``chat_stream`` yields the full response-so-far on every iteration;
    this callable diffs consecutive snapshots and hands just the fresh
    suffix to ``on_token``.

    Fix: the original annotated ``on_token`` as ``Callable[[str], None]``
    while defaulting it to ``None``; the annotation now reflects that the
    argument is optional (string annotation, so no extra import needed).
    """

    def __init__(self, on_token: "Callable[[str], None] | None" = None):
        # Default to a no-op so __call__ never needs a None check.
        self.on_token = on_token or (lambda x: None)
        # Last full snapshot seen; used to compute the new suffix.
        self.full_text = ""

    def __call__(self, text: str) -> None:
        # New content is whatever extends beyond the previous snapshot.
        new_text = text[len(self.full_text):]
        if new_text:
            self.on_token(new_text)
        self.full_text = text

def main():
    """Demo: stream a reply, printing only the newly generated text per chunk."""
    checkpoint = "Qwen/Qwen-7B-Chat"

    tokenizer = AutoTokenizer.from_pretrained(checkpoint, trust_remote_code=True)
    model = AutoModelForCausalLM.from_pretrained(
        checkpoint,
        device_map="auto",
        trust_remote_code=True,
    ).eval()
    config = GenerationConfig.from_pretrained(checkpoint, trust_remote_code=True)

    def on_new_token(token: str):
        # Emit each fresh fragment immediately, without buffering.
        print(token, end="", flush=True)

    callback = StreamingCallback(on_token=on_new_token)

    query = "Tell me about artificial intelligence"
    print("Assistant: ", end="", flush=True)

    for response in model.chat_stream(
        tokenizer, query, history=None, generation_config=config
    ):
        # Feed the cumulative text; the callback prints only the delta.
        callback(response)

    print()

if __name__ == "__main__":
    main()

Performance Considerations

Streaming reduces time-to-first-token but doesn’t necessarily increase overall throughput:
  • Time to First Token: Much faster with streaming
  • Total Generation Time: Similar to non-streaming
  • User Experience: Significantly better with streaming
For remote APIs, streaming may have overhead:
# Local inference: streaming is always beneficial
# Remote API: consider trade-offs

# For remote, batch smaller requests when possible
config.max_new_tokens = 512  # Reasonable limit
Streaming doesn’t reduce memory usage:
# Memory usage is the same for streaming and non-streaming
# To reduce memory, use quantization or smaller models

model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen-7B-Chat-Int4",  # Quantized version
    device_map="auto",
    trust_remote_code=True
).eval()

Next Steps

Multi-GPU Inference

Scale streaming across multiple GPUs

Batch Inference

Process multiple requests efficiently

Web Deployment

Build web interfaces with streaming

API Server

Create streaming API endpoints

Build docs developers (and LLMs) love