Overview
Streaming allows you to receive LLM responses incrementally as they’re generated, providing a better user experience. LiteLLM supports streaming across 100+ providers with a consistent interface.
Quick Start
from litellm import completion

# Request a streamed completion; the response is an iterator of chunks.
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a story about AI"}],
    stream=True,
)

# Print each token fragment as soon as it arrives.
for chunk in response:
    text = chunk.choices[0].delta.content
    if text:
        print(text, end="")
Streaming Across Providers
OpenAI
Anthropic
Cohere
Groq
from litellm import completion

# OpenAI example: stream a completion and echo content deltas.
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

for chunk in response:
    piece = chunk.choices[0].delta.content
    if piece:
        print(piece, end="")
# Anthropic example: identical call shape, different model string.
response = completion(
    model="anthropic/claude-3.5-sonnet",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

for chunk in response:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="")
# Cohere example: same streaming interface via the provider-prefixed model.
response = completion(
    model="cohere/command-r-plus",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

for chunk in response:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="")
# Groq excels at fast streaming
response = completion(
    model="groq/llama-3.3-70b-versatile",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

for chunk in response:
    fragment = chunk.choices[0].delta.content
    if fragment:
        print(fragment, end="")
Async Streaming
import asyncio

from litellm import acompletion


async def stream_response():
    """Stream a completion asynchronously, printing tokens as they arrive."""
    response = await acompletion(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Write a story"}],
        stream=True,
    )
    # acompletion(stream=True) yields chunks via an async iterator.
    async for chunk in response:
        token = chunk.choices[0].delta.content
        if token:
            print(token, end="")


asyncio.run(stream_response())
Streaming with Function Calls
from litellm import completion

# Tool (function) definitions advertised to the model.
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    }
]

response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in NYC?"}],
    tools=tools,
    stream=True,
)

for chunk in response:
    delta = chunk.choices[0].delta
    # Tool-call deltas: the function name typically arrives first, then the
    # JSON arguments stream incrementally as partial strings.
    if delta.tool_calls:
        for tool_call in delta.tool_calls:
            if tool_call.function.name:
                # NOTE: original had `f " \n Calling: ... "` — a SyntaxError
                # (space between the f prefix and the quote); fixed here.
                print(f"\nCalling: {tool_call.function.name}")
            if tool_call.function.arguments:
                print(tool_call.function.arguments, end="")
    # Plain assistant text content.
    elif delta.content:
        print(delta.content, end="")
Streaming with Reasoning
Handle reasoning content separately from final answers.
from litellm import completion

# Stream from a reasoning model; deltas may carry reasoning text separately
# from the final answer.
response = completion(
    model="openai/o1",
    messages=[{"role": "user", "content": "Solve this problem..."}],
    stream=True,
)

for chunk in response:
    delta = chunk.choices[0].delta
    if delta.reasoning_content:
        # Intermediate "thinking" tokens.
        print("[Thinking]", delta.reasoning_content, end="")
    elif delta.content:
        # Final-answer tokens.
        print(delta.content, end="")
Complete Response from Stream
Build a complete response object from streaming chunks.
from litellm import completion, stream_chunk_builder

response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

# Collect every chunk while also printing content as it streams.
chunks = []
for chunk in response:
    chunks.append(chunk)
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

# Reassemble the chunks into a complete, non-streaming-style response
# object so usage and metadata are available in one place.
# (Fixed scraping-injected spaces inside the "\n\nUsage:" literal.)
complete_response = stream_chunk_builder(chunks)
print("\n\nUsage:", complete_response.usage)
print("Model:", complete_response.model)
Error Handling
from litellm import completion
from litellm.exceptions import APIError, Timeout

try:
    response = completion(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True,
        timeout=30,
    )
    # Errors can surface mid-stream, so iteration stays inside the try.
    for chunk in response:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")
except Timeout:
    print("Stream timeout")
except APIError as e:
    # Fixed: original `f "API error: { e } "` was a SyntaxError (space
    # between the f prefix and the quote).
    print(f"API error: {e}")
except Exception as e:
    # Catch-all boundary for anything else raised during streaming.
    print(f"Stream error: {e}")
Streaming Usage Tracking
from litellm import completion
import litellm

# Enable cost tracking in streaming
litellm.include_cost_in_streaming_usage = True

response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

for chunk in response:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
    # Usage is attached to the final chunk of the stream.
    if chunk.usage:
        # Fixed: original f-strings had a space between the f prefix and
        # the quote (SyntaxError) plus garbled spaces inside the literals.
        print(f"\n\nTokens: {chunk.usage.total_tokens}")
        if hasattr(chunk.usage, 'completion_tokens_details'):
            # NOTE(review): this prints completion_tokens_details, which
            # looks like a token breakdown rather than a dollar amount —
            # the "Cost: $" label may be misleading; confirm against the
            # litellm usage object.
            print(f"Cost: ${chunk.usage.completion_tokens_details}")
Fake Streaming
Some providers or feature combinations don’t support native streaming. In those cases LiteLLM simulates streaming: it fetches the complete response and then yields it to you in chunks, so your streaming code works unchanged.
from litellm import completion

# LiteLLM automatically enables fake streaming when needed
response = completion(
    model="groq/llama-3.3-70b-versatile",
    messages=[{"role": "user", "content": "Hello!"}],
    response_format={"type": "json_object"},  # Not supported with streaming
    stream=True,  # LiteLLM will fake stream the response
)

for chunk in response:
    part = chunk.choices[0].delta.content
    if part:
        print(part, end="")
LiteLLM automatically detects when fake streaming is needed and handles it transparently.
Stream Timeout Configuration
from litellm import completion
import litellm

# Set max streaming duration (in seconds)
litellm.LITELLM_MAX_STREAMING_DURATION_SECONDS = 300  # 5 minutes

response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Very long task..."}],
    stream=True,
)

for chunk in response:
    body = chunk.choices[0].delta.content
    if body:
        print(body, end="")
OpenAI SDK Compatibility
Use LiteLLM with OpenAI’s SDK streaming patterns.
import openai
from litellm import completion

# LiteLLM returns OpenAI-compatible stream objects
response = completion(
    model="anthropic/claude-3.5-sonnet",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

# Works with OpenAI SDK patterns
for chunk in response:
    content = chunk.choices[0].delta.content
    if content is not None and content != "":
        print(content, end="")
Chunk Structure
Understanding the streaming chunk format:
from litellm import completion

response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

# Dump each streamed chunk's fields to show the OpenAI-compatible shape.
# Fixed: every f-string below had a space between the f prefix and the
# quote (SyntaxError); nested indentation inside the literals reconstructed
# from the apparent original layout.
for chunk in response:
    print("Chunk structure:")
    print(f"  ID: {chunk.id}")
    print(f"  Model: {chunk.model}")
    print(f"  Created: {chunk.created}")
    for choice in chunk.choices:
        print(f"  Choice {choice.index}:")
        print(f"    Delta: {choice.delta}")
        print(f"    Finish Reason: {choice.finish_reason}")
    # Usage, when present, rides on the final chunk.
    if chunk.usage:
        print(f"  Usage: {chunk.usage}")
Provider-Specific Notes
Native streaming support
Returns usage in final chunk
Supports all features (tools, vision, audio)
Native streaming support
Reasoning content in separate delta field
Supports prompt caching in streaming
Ultra-fast streaming performance
May use fake streaming for JSON schema
Excellent for real-time applications
Native streaming support
Citations available in stream
Tool calls streamed incrementally
Local streaming
No network latency
Thinking content in separate field
Best Practices
Always use streaming for user-facing applications
Show typing indicators or placeholders
Handle partial tool call arguments gracefully
Implement timeout handling
Log partial responses before errors
Provide fallback to non-streaming on errors
Enable include_cost_in_streaming_usage
Save final chunks for usage data
Build complete response for detailed metrics