Overview
Streaming allows you to receive tokens as they are generated, providing a better user experience for real-time applications like chatbots.
Basic Usage
from litellm import completion

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a short story"}],
    stream=True,  # return an iterator of chunks instead of a full ModelResponse
)

# Each chunk carries an incremental delta; content can be None or empty
# (e.g. the role-only first chunk), so guard before printing.
for chunk in response:
    if chunk.choices[0].delta.content:
        # flush=True so tokens appear immediately instead of sitting in the
        # stdout buffer -- matches the other examples in this guide.
        print(chunk.choices[0].delta.content, end="", flush=True)
Streaming Parameters
stream: Set to True to enable streaming. When enabled, the function returns a CustomStreamWrapper instead of a ModelResponse.
stream_options: Additional streaming options. Currently supports:
include_usage: If True, includes token usage in the final chunk
stream_options={"include_usage": True}
Each chunk in the stream is a ModelResponseStream object:
class ModelResponseStream:
    """One streamed chunk of a completion response."""
    id: str  # response id, shared by all chunks of one stream
    choices: List[StreamingChoices]
    created: int  # creation timestamp (Unix seconds)
    model: str  # model that produced the chunk
    system_fingerprint: Optional[str]

class StreamingChoices:
    """A single choice within a streamed chunk."""
    finish_reason: Optional[str]  # set only on the choice's final chunk
    index: int  # position of this choice in the choices list
    delta: Delta  # the incremental payload for this chunk

class Delta:
    """Incremental content delivered by one streaming chunk."""
    content: Optional[str]  # text fragment; may be None (e.g. role-only chunk)
    role: Optional[str]  # usually present only on the first chunk
    tool_calls: Optional[List[ChatCompletionDeltaToolCall]]  # tool-call fragments
Examples
Basic Streaming
from litellm import completion
response = completion(
model="gpt-4",
messages=[{"role": "user", "content": "Count from 1 to 10"}],
stream=True
)
for chunk in response:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
print() # New line after streaming completes
from litellm import completion

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Explain AI"}],
    stream=True,
    stream_options={"include_usage": True},
)

for chunk in response:
    # With include_usage, the final chunk carries the usage stats and has an
    # EMPTY choices list -- indexing choices[0] unconditionally would raise
    # IndexError on that chunk, so guard on chunk.choices first.
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
    # Usage info is in the last chunk
    usage = getattr(chunk, "usage", None)
    if usage:
        print(f"\n\nTokens used: {usage.total_tokens}")
Building Complete Response from Stream
from litellm import completion

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
)

# Collect fragments as they arrive and assemble the full text once at the end.
pieces = []
for chunk in response:
    fragment = chunk.choices[0].delta.content
    if fragment:
        pieces.append(fragment)
        print(fragment, end="", flush=True)

complete_response = "".join(pieces)
print(f"\n\nComplete response: {complete_response}")
Handling Finish Reasons
from litellm import completion

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Explain quantum physics"}],
    stream=True,
    max_tokens=100,  # Will likely hit token limit
)

for chunk in response:
    choice = chunk.choices[0]
    if choice.delta.content:
        print(choice.delta.content, end="", flush=True)
    # finish_reason is populated only on the stream's final chunk.
    # Possible values: 'stop', 'length', 'content_filter', 'tool_calls'
    if choice.finish_reason:
        print(f"\n\nFinished: {choice.finish_reason}")
Streaming with Different Providers
from litellm import completion

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Tell me a joke"}],
    stream=True,
)

# The same iteration loop works regardless of the underlying provider.
for chunk in response:
    text = chunk.choices[0].delta.content
    if text:
        print(text, end="")
Streaming with Function Calls
from litellm import completion
import json

# Tool schema advertised to the model.
tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the weather for a location",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string"},
            },
        },
    },
}]

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "What's the weather in Paris?"}],
    tools=tools,
    stream=True,
)

# The tool-call name and its JSON arguments arrive in fragments across
# chunks; accumulate them until finish_reason signals the call is complete.
function_name = ""
function_args = ""
for chunk in response:
    choice = chunk.choices[0]
    if choice.delta.tool_calls:
        for call in choice.delta.tool_calls:
            function_name += call.function.name or ""
            function_args += call.function.arguments or ""
    if choice.finish_reason == "tool_calls":
        print(f"Function: {function_name}")
        print(f"Arguments: {json.loads(function_args)}")
Error Handling
from litellm import completion
from litellm.exceptions import APIError, Timeout

try:
    response = completion(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True,
        timeout=10.0,  # seconds before Timeout is raised
    )
    # Errors can surface mid-iteration, not just on the initial call,
    # so the consuming loop sits inside the try block too.
    for chunk in response:
        fragment = chunk.choices[0].delta.content
        if fragment:
            print(fragment, end="")
except Timeout:
    print("Request timed out during streaming")
except APIError as e:
    print(f"API error occurred: {e}")
except Exception as e:
    print(f"Unexpected error during streaming: {e}")
Advanced Usage
Custom Stream Processing
from litellm import completion
import time

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a poem"}],
    stream=True,
)

# Track chunk throughput while echoing tokens to the terminal.
started = time.time()
chunk_count = 0
total_content = ""
for chunk in response:
    chunk_count += 1  # count every chunk, even content-less ones
    fragment = chunk.choices[0].delta.content
    if fragment:
        total_content += fragment
        print(fragment, end="", flush=True)

elapsed = time.time() - started
print(f"\n\nReceived {chunk_count} chunks in {elapsed:.2f}s")
print(f"Total content length: {len(total_content)} characters")
Streaming with Token Counting
from litellm import completion
import tiktoken

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Explain machine learning"}],
    stream=True,
)

# Tokenizer matching the model, used to count the output locally.
encoding = tiktoken.encoding_for_model("gpt-4")

parts = []
for chunk in response:
    fragment = chunk.choices[0].delta.content
    if fragment:
        parts.append(fragment)
        print(fragment, end="", flush=True)

complete_response = "".join(parts)
token_count = len(encoding.encode(complete_response))
print(f"\n\nGenerated approximately {token_count} tokens")
Cancelling a Stream
from litellm import completion

response = completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a very long story"}],
    stream=True,
)

max_chunks = 10
chunk_count = 0
for chunk in response:
    fragment = chunk.choices[0].delta.content
    if fragment:
        print(fragment, end="", flush=True)
        # Only content-bearing chunks count toward the cutoff.
        chunk_count += 1
        if chunk_count >= max_chunks:
            print("\n\nStopping early...")
            break  # Stream automatically closes on iteration exit
Best Practices
- Always handle empty content: check that delta.content exists before using it
- Use flush=True: ensure output appears immediately with print(..., flush=True)
- Monitor finish_reason: check finish_reason to understand why streaming ended
- Handle errors gracefully: Wrap streaming in try-except blocks
- Close streams properly: The stream auto-closes when iteration completes or breaks
- Latency: Streaming reduces time-to-first-token, improving perceived responsiveness
- Memory: Streaming uses less memory than buffering the complete response
- Connection: Keep connections alive during streaming - consider timeout settings
- Rate limits: Streaming counts toward rate limits the same as non-streaming