Skip to main content

Overview

Streaming allows you to receive LLM responses incrementally as they’re generated, providing a better user experience. LiteLLM supports streaming across 100+ providers with a consistent interface.

Quick Start

from litellm import completion

# Request a streamed completion; chunks arrive while the model generates.
stream = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a story about AI"}],
    stream=True,
)

# Print each text delta the moment it arrives.
for part in stream:
    if (text := part.choices[0].delta.content):
        print(text, end="")

Streaming Across Providers

from litellm import completion

# The same streaming call works unchanged across the supported providers.
resp = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

for piece in resp:
    delta_text = piece.choices[0].delta.content
    if delta_text:
        print(delta_text, end="")

Async Streaming

import asyncio
from litellm import acompletion

async def stream_response():
    """Stream a completion asynchronously, printing each delta as it arrives."""
    stream = await acompletion(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Write a story"}],
        stream=True,
    )
    async for part in stream:
        text = part.choices[0].delta.content
        if text:
            print(text, end="")

asyncio.run(stream_response())

Streaming with Function Calls

from litellm import completion

# JSON-schema description of the single tool the model is allowed to call.
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"},
                },
                "required": ["location"],
            },
        },
    }
]

response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in NYC?"}],
    tools=tools,
    stream=True,
)

for chunk in response:
    delta = chunk.choices[0].delta
    if delta.tool_calls:
        # Tool-call name and arguments stream incrementally across chunks.
        for call in delta.tool_calls:
            if call.function.name:
                print(f"\nCalling: {call.function.name}")
            if call.function.arguments:
                print(call.function.arguments, end="")
    elif delta.content:
        # Plain text content when the model is not calling a tool.
        print(delta.content, end="")

Streaming with Reasoning

Handle reasoning content separately from final answers.
from litellm import completion

response = completion(
    model="openai/o1",
    messages=[{"role": "user", "content": "Solve this problem..."}],
    stream=True,
)

for chunk in response:
    delta = chunk.choices[0].delta
    # Reasoning tokens arrive in a dedicated delta field, separate from the answer.
    if delta.reasoning_content:
        print("[Thinking]", delta.reasoning_content, end="")
    elif delta.content:
        print(delta.content, end="")

Complete Response from Stream

Build a complete response object from streaming chunks.
from litellm import completion, stream_chunk_builder

# Keep a reference to the request messages: stream_chunk_builder needs them
# to compute accurate prompt-token usage for the rebuilt response.
messages = [{"role": "user", "content": "Hello!"}]

response = completion(
    model="gpt-4o",
    messages=messages,
    stream=True,
)

# Collect every chunk so the complete response can be reconstructed afterwards.
chunks = []
for chunk in response:
    chunks.append(chunk)
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

# Build complete response (a non-streaming ModelResponse) from the chunks.
complete_response = stream_chunk_builder(chunks, messages=messages)
print("\n\nUsage:", complete_response.usage)
print("Model:", complete_response.model)

Error Handling

from litellm import completion
from litellm.exceptions import APIError, Timeout

try:
    stream = completion(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True,
        timeout=30,
    )
    # Errors can surface mid-iteration, so the consuming loop stays inside
    # the try block as well.
    for chunk in stream:
        text = chunk.choices[0].delta.content
        if text:
            print(text, end="")

except Timeout:
    print("Stream timeout")
except APIError as e:
    print(f"API error: {e}")
except Exception as e:
    print(f"Stream error: {e}")

Streaming Usage Tracking

from litellm import completion
import litellm

# Enable cost tracking in streaming
litellm.include_cost_in_streaming_usage = True

response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
    # Ask the provider to emit token usage in a final chunk; OpenAI-style
    # streams include no usage data at all without this option.
    stream_options={"include_usage": True},
)

for chunk in response:
    # The usage-bearing final chunk carries an empty choices list, so guard
    # before indexing choices[0].
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

    # Usage is only attached to the final chunk; read it defensively since
    # intermediate chunks may not carry the attribute at all.
    usage = getattr(chunk, "usage", None)
    if usage:
        print(f"\n\nTokens: {usage.total_tokens}")
        if hasattr(usage, 'completion_tokens_details'):
            # NOTE: this field holds per-category token counts, not a dollar
            # amount — label it accordingly.
            print(f"Completion token details: {usage.completion_tokens_details}")

Fake Streaming

Some providers — or certain parameter combinations, such as JSON response formats — don’t support native streaming. In those cases LiteLLM can fake it: it requests the complete response and replays it to you as stream chunks.
from litellm import completion

# LiteLLM automatically enables fake streaming when needed
stream = completion(
    model="groq/llama-3.3-70b-versatile",
    messages=[{"role": "user", "content": "Hello!"}],
    response_format={"type": "json_object"},  # Not supported with streaming
    stream=True,  # LiteLLM will fake stream the response
)

# Consumption is identical whether the stream is native or faked.
for part in stream:
    if (text := part.choices[0].delta.content):
        print(text, end="")
LiteLLM automatically detects when fake streaming is needed and handles it transparently.

Stream Timeout Configuration

from litellm import completion
import litellm

# Cap how long any single stream may stay open (seconds).
litellm.LITELLM_MAX_STREAMING_DURATION_SECONDS = 300  # 5 minutes

stream = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Very long task..."}],
    stream=True,
)

for part in stream:
    delta_text = part.choices[0].delta.content
    if delta_text:
        print(delta_text, end="")

OpenAI SDK Compatibility

Use LiteLLM with OpenAI’s SDK streaming patterns.
import openai
from litellm import completion

# LiteLLM returns OpenAI-compatible stream objects, so code written against
# the OpenAI SDK's streaming interface works unmodified.
stream = completion(
    model="anthropic/claude-3.5-sonnet",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

# Works with OpenAI SDK patterns
for part in stream:
    if (text := part.choices[0].delta.content):
        print(text, end="")

Chunk Structure

Understanding the streaming chunk format:
from litellm import completion

response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

for chunk in response:
    # Top-level chunk metadata mirrors OpenAI's ChatCompletionChunk shape.
    print("Chunk structure:")
    print(f"  ID: {chunk.id}")
    print(f"  Model: {chunk.model}")
    print(f"  Created: {chunk.created}")

    for choice in chunk.choices:
        print(f"  Choice {choice.index}:")
        print(f"    Delta: {choice.delta}")
        print(f"    Finish Reason: {choice.finish_reason}")

    # `usage` is only populated on the final chunk, and the attribute may be
    # absent entirely unless usage reporting is enabled — read it with
    # getattr instead of direct attribute access.
    usage = getattr(chunk, "usage", None)
    if usage:
        print(f"  Usage: {usage}")

Provider-Specific Notes

  (Per-provider headings were lost in extraction; bullets grouped by likely provider — verify against the original page.)

  • OpenAI: native streaming support; returns usage in final chunk; supports all features (tools, vision, audio)
  • Anthropic: native streaming support; reasoning content in separate delta field; supports prompt caching in streaming
  • Groq: ultra-fast streaming performance; may use fake streaming for JSON schema; excellent for real-time applications
  • Perplexity: native streaming support; citations available in stream; tool calls streamed incrementally
  • Local models (e.g. Ollama): local streaming; no network latency; thinking content in separate field

Best Practices

  • Always use streaming for user-facing applications
  • Show typing indicators or placeholders
  • Handle partial tool call arguments gracefully
  • Implement timeout handling
  • Log partial responses before errors
  • Provide fallback to non-streaming on errors
  • Streaming reduces perceived latency
  • Monitor time-to-first-token (TTFT)
  • Use async for concurrent streams
  • Enable include_cost_in_streaming_usage
  • Save final chunks for usage data
  • Build complete response for detailed metrics

Build docs developers (and LLMs) love