Overview
Streaming allows you to receive and display tokens as they are generated, providing a more responsive user experience. This is particularly useful for chat applications and interactive interfaces where users expect immediate feedback.

Why Use Streaming?
Better UX
Users see responses immediately, not after full generation
Perceived Speed
Feels faster even if total time is the same
Early Termination
Stop generation early if needed
Real-time Feedback
Perfect for chatbots and assistants
Basic Streaming
Qwen provides the `chat_stream` method for streaming responses:
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig

# trust_remote_code is required because Qwen's tokenizer/model classes
# live inside the checkpoint repository itself.
tokenizer = AutoTokenizer.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    trust_remote_code=True
)
# device_map="auto" lets Accelerate place model layers on available devices.
model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    device_map="auto",
    trust_remote_code=True
).eval()
config = GenerationConfig.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    trust_remote_code=True
)

# Stream responses
# chat_stream yields the cumulative response text after each step, so the
# "\r" rewrites the current line with a longer snapshot each time.
query = "请给我讲一个有趣的故事"
for response in model.chat_stream(tokenizer, query, history=None, generation_config=config):
    print(f"\rQwen: {response}", end="", flush=True)
print()  # New line after completion
CLI Demo Implementation
Here’s the complete streaming implementation from the official CLI demo (cli_demo.py):
import argparse
import os
import platform
import shutil
from copy import deepcopy

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig
from transformers.trainer_utils import set_seed
DEFAULT_CKPT_PATH = 'Qwen/Qwen-7B-Chat'
def _load_model_tokenizer(args):
    """Load the tokenizer, model, and generation config for a checkpoint.

    Returns a ``(model, tokenizer, config)`` tuple. The model is pinned to
    the CPU when ``args.cpu_only`` is set; otherwise device placement is
    chosen automatically.
    """
    ckpt = args.checkpoint_path

    tokenizer = AutoTokenizer.from_pretrained(
        ckpt,
        trust_remote_code=True,
        resume_download=True,
    )

    device_map = "cpu" if args.cpu_only else "auto"

    model = AutoModelForCausalLM.from_pretrained(
        ckpt,
        device_map=device_map,
        trust_remote_code=True,
        resume_download=True,
    ).eval()

    config = GenerationConfig.from_pretrained(
        ckpt,
        trust_remote_code=True,
        resume_download=True,
    )
    return model, tokenizer, config
def _clear_screen():
if platform.system() == "Windows":
os.system("cls")
else:
os.system("clear")
def main():
    """Run an interactive command-line chat loop against a Qwen chat model.

    Reads user queries from stdin, streams the model's response to the
    terminal, and records each (query, response) pair in the history.
    Type ":quit" to exit.
    """
    parser = argparse.ArgumentParser(
        description='QWen-Chat command-line interactive chat demo.')
    parser.add_argument(
        "-c", "--checkpoint-path",
        type=str,
        default=DEFAULT_CKPT_PATH,
        help="Checkpoint name or path, default to %(default)r"
    )
    parser.add_argument(
        "-s", "--seed",
        type=int,
        default=1234,
        help="Random seed"
    )
    parser.add_argument(
        "--cpu-only",
        action="store_true",
        help="Run demo with CPU only"
    )
    args = parser.parse_args()

    history, response = [], ''
    model, tokenizer, config = _load_model_tokenizer(args)
    # NOTE: the original also took a deepcopy of model.generation_config
    # here but never used it; the unused local has been removed.

    _clear_screen()
    print("Welcome to Qwen-Chat! Type your message to start chatting.")

    while True:
        query = input("\nUser> ").strip()
        if not query:
            print('[ERROR] Query is empty')
            continue
        if query == ":quit":
            break

        # Re-seed before every generation so identical queries reproduce
        # identical responses across turns.
        set_seed(args.seed)
        try:
            for response in model.chat_stream(
                tokenizer,
                query,
                history=history,
                generation_config=config
            ):
                # Redraw the screen with the partial response each chunk.
                _clear_screen()
                print(f"\nUser: {query}")
                print(f"\nQwen-Chat: {response}")
        except KeyboardInterrupt:
            # Skip recording this turn when the user aborts generation.
            print('\n[WARNING] Generation interrupted')
            continue
        history.append((query, response))


if __name__ == "__main__":
    main()
The CLI demo uses `chat_stream` to provide a smooth, real-time chat experience. See cli_demo.py:198 in the source code.

Web Demo with Gradio
Here’s how streaming works in a web interface (web_demo.py):
import gradio as gr
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig
def _load_model_tokenizer(checkpoint_path, cpu_only=False):
    """Load tokenizer, model, and generation config from *checkpoint_path*.

    When *cpu_only* is true the model is kept on the CPU; otherwise
    device placement is delegated to Accelerate.
    """
    tokenizer = AutoTokenizer.from_pretrained(
        checkpoint_path,
        trust_remote_code=True,
        resume_download=True,
    )

    if cpu_only:
        device_map = "cpu"
    else:
        device_map = "auto"

    model = AutoModelForCausalLM.from_pretrained(
        checkpoint_path,
        device_map=device_map,
        trust_remote_code=True,
        resume_download=True,
    ).eval()

    config = GenerationConfig.from_pretrained(
        checkpoint_path,
        trust_remote_code=True,
        resume_download=True,
    )
    return model, tokenizer, config
def predict(query, chatbot, task_history, model, tokenizer, config):
    """Stream the model's reply into the Gradio chatbot, yielding updates."""
    print(f"User: {query}")

    # Open a new chat turn with an empty assistant message.
    chatbot.append((query, ""))
    full_response = ""

    for partial in model.chat_stream(
        tokenizer,
        query,
        history=task_history,
        generation_config=config
    ):
        # Replace the last turn with the latest cumulative text and hand
        # the list back to Gradio; the UI re-renders on every yield.
        chatbot[-1] = (query, partial)
        yield chatbot
        full_response = partial

    print(f"Qwen-Chat: {full_response}")
    task_history.append((query, full_response))
def launch_demo(checkpoint_path="Qwen/Qwen-7B-Chat"):
    """Launch a minimal Gradio chat UI that streams Qwen responses."""
    model, tokenizer, config = _load_model_tokenizer(checkpoint_path)

    with gr.Blocks() as demo:
        gr.Markdown("# Qwen-Chat Streaming Demo")
        chatbot = gr.Chatbot(label='Qwen-Chat')
        query = gr.Textbox(lines=2, label='Input')
        # Per-session conversation history: list of (query, response) pairs.
        task_history = gr.State([])
        submit_btn = gr.Button("🚀 Submit")
        # predict is a generator, so each yield pushes a partial update to
        # the chatbot component.
        submit_btn.click(
            predict,
            [query, chatbot, task_history, gr.State(model), gr.State(tokenizer), gr.State(config)],
            [chatbot],
            show_progress=True
        )

    # queue() enables incremental updates from generator-based handlers.
    demo.queue().launch()

if __name__ == '__main__':
    launch_demo()
The web demo uses streaming with `yield` to update the Gradio interface progressively. See web_demo.py:124 in the source.

Streaming with Custom Display
Create custom display logic for different use cases:
- Simple Terminal
- With Timestamps
- Save to File
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig

# Standard Qwen loading boilerplate; trust_remote_code pulls the model's
# custom classes from the checkpoint repository.
tokenizer = AutoTokenizer.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    trust_remote_code=True
)
model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    device_map="auto",
    trust_remote_code=True
).eval()
config = GenerationConfig.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    trust_remote_code=True
)

query = "Tell me a story"
print("Assistant: ", end="", flush=True)
# Each yield is the full response so far; "\r" redraws the line in place.
for response in model.chat_stream(tokenizer, query, history=None, generation_config=config):
    print(f"\rAssistant: {response}", end="", flush=True)
print()  # Newline
import time

from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig

tokenizer = AutoTokenizer.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    trust_remote_code=True
)
model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    device_map="auto",
    trust_remote_code=True
).eval()
config = GenerationConfig.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    trust_remote_code=True
)

query = "Explain quantum computing"
start_time = time.time()
token_count = 0
print("Assistant: ", end="", flush=True)
# chat_stream yields the cumulative text; re-encoding it tracks the running
# token count (re-tokenizes the whole text each step — fine for a demo,
# wasteful for very long outputs).
for response in model.chat_stream(tokenizer, query, history=None, generation_config=config):
    print(f"\rAssistant: {response}", end="", flush=True)
    token_count = len(tokenizer.encode(response))
elapsed = time.time() - start_time
# Throughput is approximate: elapsed also includes display/tokenization time.
print(f"\n\n[Generated {token_count} tokens in {elapsed:.2f}s ({token_count/elapsed:.1f} tokens/s)]")
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig

tokenizer = AutoTokenizer.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    trust_remote_code=True
)
model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    device_map="auto",
    trust_remote_code=True
).eval()
config = GenerationConfig.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    trust_remote_code=True
)

query = "Write a poem about AI"

with open("output.txt", "w", encoding="utf-8") as f:
    f.write(f"Query: {query}\n\n")
    f.write("Response: ")

    # chat_stream yields the cumulative response on every step. Only show
    # progress inside the loop — writing each snapshot to the file would
    # duplicate the text many times over.
    response = ""
    for response in model.chat_stream(
        tokenizer, query, history=None, generation_config=config
    ):
        # Display progress
        print(f"\rGenerating... {len(response)} chars", end="", flush=True)

    # Write final response (once, after streaming completes)
    f.write(response)
    f.write("\n")

print(f"\nSaved to output.txt")
Handling Interruptions
Gracefully handle user interruptions:

import signal
import sys

from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig

# Global flag for interruption
interrupted = False

def signal_handler(sig, frame):
    """Mark the stream for termination on SIGINT (Ctrl+C)."""
    global interrupted
    interrupted = True
    print("\n[Interrupting generation...]")

# Installing this handler replaces Python's default SIGINT behavior, so
# Ctrl+C sets the flag instead of raising KeyboardInterrupt; the except
# clause below only covers interrupts delivered before this point.
signal.signal(signal.SIGINT, signal_handler)

tokenizer = AutoTokenizer.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    trust_remote_code=True
)
model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    device_map="auto",
    trust_remote_code=True
).eval()
config = GenerationConfig.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    trust_remote_code=True
)

query = "Write a very long story"
last_response = ""
try:
    for response in model.chat_stream(
        tokenizer, query, history=None, generation_config=config
    ):
        # The flag is checked between yields, so generation stops at the
        # next chunk boundary rather than mid-token.
        if interrupted:
            print("\n[Generation stopped by user]")
            break
        print(f"\rQwen: {response}", end="", flush=True)
        last_response = response
except KeyboardInterrupt:
    print("\n[Generation interrupted]")

print(f"\n\nLast response: {last_response[:100]}...")
Multi-turn Streaming Chat
Maintain conversation history with streaming:

from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig

tokenizer = AutoTokenizer.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    trust_remote_code=True
)
model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    device_map="auto",
    trust_remote_code=True
).eval()
config = GenerationConfig.from_pretrained(
    "Qwen/Qwen-7B-Chat",
    trust_remote_code=True
)

history = []
while True:
    query = input("\nYou: ").strip()
    if not query:
        continue
    if query.lower() in ['quit', 'exit']:
        break

    print("Assistant: ", end="", flush=True)
    response = ""
    try:
        # Passing history gives the model the prior turns as context.
        for response in model.chat_stream(
            tokenizer,
            query,
            history=history,
            generation_config=config
        ):
            print(f"\rAssistant: {response}", end="", flush=True)
    except KeyboardInterrupt:
        # Interrupted turn is dropped: not recorded in history.
        print("\n[Interrupted]")
        continue
    print()  # Newline
    history.append((query, response))

    # Optional: limit history length
    # Keeps the prompt bounded by retaining only the 10 most recent turns.
    if len(history) > 10:
        history = history[-10:]
print("Goodbye!")
Advanced: Streaming with Callbacks
Implement custom callbacks for token generation:

from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig
from typing import Callable
class StreamingCallback:
    """Turn cumulative streamed text into per-delta callbacks.

    ``chat_stream`` yields the full response so far at every step; this
    helper diffs consecutive snapshots and invokes ``on_token`` with only
    the newly generated text.
    """

    def __init__(self, on_token: "Callable[[str], None] | None" = None):
        # Fall back to a no-op so callers may omit the callback.
        # (Original annotated the parameter as non-optional despite the
        # None default; the annotation now reflects that None is allowed.)
        self.on_token = on_token or (lambda x: None)
        self.full_text = ""

    def __call__(self, text: str) -> None:
        # The delta is whatever extends the previously seen snapshot.
        new_text = text[len(self.full_text):]
        if new_text:
            self.on_token(new_text)
        self.full_text = text
def main():
    """Demo: stream a Qwen response through a StreamingCallback."""
    tokenizer = AutoTokenizer.from_pretrained(
        "Qwen/Qwen-7B-Chat",
        trust_remote_code=True
    )
    model = AutoModelForCausalLM.from_pretrained(
        "Qwen/Qwen-7B-Chat",
        device_map="auto",
        trust_remote_code=True
    ).eval()
    config = GenerationConfig.from_pretrained(
        "Qwen/Qwen-7B-Chat",
        trust_remote_code=True
    )

    # Define callback
    def on_new_token(token: str):
        # Print only the newly generated characters, without a newline.
        print(token, end="", flush=True)

    callback = StreamingCallback(on_token=on_new_token)

    query = "Tell me about artificial intelligence"
    print("Assistant: ", end="", flush=True)
    # Feed each cumulative snapshot to the callback, which diffs it against
    # the previous one and emits only the delta.
    for response in model.chat_stream(
        tokenizer, query, history=None, generation_config=config
    ):
        callback(response)
    print()

if __name__ == "__main__":
    main()
Performance Considerations
Latency vs Throughput
Latency vs Throughput
Streaming reduces time-to-first-token but doesn’t necessarily increase overall throughput:
- Time to First Token: Much faster with streaming
- Total Generation Time: Similar to non-streaming
- User Experience: Significantly better with streaming
Network Overhead
Network Overhead
For remote APIs, streaming may have overhead:
# Local inference: streaming is always beneficial
# Remote API: consider trade-offs
# For remote, batch smaller requests when possible
config.max_new_tokens = 512 # Reasonable limit
Memory Usage
Memory Usage
Streaming doesn’t reduce memory usage:
# Memory usage is the same for streaming and non-streaming
# To reduce memory, use quantization or smaller models
model = AutoModelForCausalLM.from_pretrained(
"Qwen/Qwen-7B-Chat-Int4", # Quantized version
device_map="auto",
trust_remote_code=True
).eval()
Next Steps
Multi-GPU Inference
Scale streaming across multiple GPUs
Batch Inference
Process multiple requests efficiently
Web Deployment
Build web interfaces with streaming
API Server
Create streaming API endpoints