
Overview

Streaming allows you to receive chat completion responses incrementally as they are generated, rather than waiting for the entire response to complete. This is particularly useful for creating responsive user experiences.

Basic Streaming

Set stream=True to enable streaming:
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-5.2",
    messages=[{"role": "user", "content": "Tell me a story about a unicorn."}],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="")

Stream Response Format

When streaming is enabled, the API returns a stream of ChatCompletionChunk objects instead of a single ChatCompletion object.

ChatCompletionChunk Structure

{
    "id": "chatcmpl-123",
    "object": "chat.completion.chunk",
    "created": 1677652288,
    "model": "gpt-5.2",
    "choices": [
        {
            "index": 0,
            "delta": {
                "role": "assistant",
                "content": "Hello"
            },
            "finish_reason": null
        }
    ]
}

id (string)
A unique identifier for the chat completion. Each chunk has the same ID.

object (string)
The object type, which is always chat.completion.chunk.

created (integer)
The Unix timestamp (in seconds) of when the chat completion was created. Each chunk has the same timestamp.

model (string)
The model used to generate the completion.

choices (array)
A list of chat completion choices. Can contain more than one element if n is greater than 1.
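
When n is greater than 1, deltas for the different choices interleave in a single stream, so each choice's text has to be accumulated under its own index. A minimal sketch of that bookkeeping, using plain dicts in place of the real ChatCompletionChunk objects:

```python
from collections import defaultdict

# Mocked chunks standing in for ChatCompletionChunk objects: each carries
# a choice index and a content delta.
mock_chunks = [
    {"index": 0, "content": "Hello"},
    {"index": 1, "content": "Hi"},
    {"index": 0, "content": " world"},
    {"index": 1, "content": " there"},
]

# Accumulate each choice's text separately, keyed by its index.
buffers = defaultdict(str)
for chunk in mock_chunks:
    if chunk["content"] is not None:
        buffers[chunk["index"]] += chunk["content"]

print(dict(buffers))  # {0: 'Hello world', 1: 'Hi there'}
```

With real chunks the same loop reads `chunk.choices[0].index` and `chunk.choices[0].delta.content` instead of the dict keys.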

Async Streaming

Use AsyncOpenAI for async streaming:
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def main():
    stream = await client.chat.completions.create(
        model="gpt-5.2",
        messages=[{"role": "user", "content": "Tell me a story about a unicorn."}],
        stream=True
    )
    
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="")

asyncio.run(main())

Stream Options

You can configure streaming behavior with stream_options:
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
    stream_options={"include_usage": True}
)

for chunk in stream:
    # With include_usage, the final usage chunk arrives with an empty
    # choices list, so guard before indexing into it
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

    # The last chunk will include usage statistics
    if chunk.usage:
        print(f"\n\nTokens used: {chunk.usage.total_tokens}")

Stream Options Parameters

include_usage (boolean, default: false)
If set to true, the final chunk will include a usage field with token usage statistics for the entire request. This final chunk arrives with an empty choices list. Note: if the stream is interrupted or cancelled, you may not receive the final usage chunk.
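
The guarded access pattern can be sketched over mocked chunk data; the dicts below mirror the shapes of ChatCompletionChunk, including the usage-only final chunk with empty choices:

```python
# Mocked stream: two content chunks, then a final usage-only chunk whose
# "choices" list is empty (this is how the real API delivers usage).
mock_stream = [
    {"choices": [{"delta": {"content": "Hi"}}], "usage": None},
    {"choices": [{"delta": {"content": "!"}}], "usage": None},
    {"choices": [], "usage": {"total_tokens": 12}},
]

text, total_tokens = "", None
for chunk in mock_stream:
    # Guard against the empty choices list on the usage chunk
    if chunk["choices"] and chunk["choices"][0]["delta"].get("content"):
        text += chunk["choices"][0]["delta"]["content"]
    if chunk["usage"]:
        total_tokens = chunk["usage"]["total_tokens"]

print(text, total_tokens)  # Hi! 12
```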

Streaming with Function Calling

When streaming with tool calls, the function information is sent incrementally:
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"},
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
                },
                "required": ["location"]
            }
        }
    }
]

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in Boston?"}],
    tools=tools,
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.tool_calls:
        for tool_call in chunk.choices[0].delta.tool_calls:
            if tool_call.function.name:
                print(f"Calling function: {tool_call.function.name}")
            if tool_call.function.arguments:
                print(f"Arguments chunk: {tool_call.function.arguments}", end="")

Accumulating Tool Call Data

Since tool calls are sent incrementally, you need to accumulate the chunks:
import json
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in Boston?"}],
    tools=tools,
    stream=True
)

# Accumulate tool call information
tool_calls = {}

for chunk in stream:
    if chunk.choices[0].delta.tool_calls:
        for tool_call_delta in chunk.choices[0].delta.tool_calls:
            index = tool_call_delta.index
            
            # Initialize tool call if not seen before
            if index not in tool_calls:
                tool_calls[index] = {
                    "id": tool_call_delta.id,
                    "function": {
                        "name": "",
                        "arguments": ""
                    }
                }
            
            # Accumulate function name and arguments
            if tool_call_delta.function.name:
                tool_calls[index]["function"]["name"] += tool_call_delta.function.name
            if tool_call_delta.function.arguments:
                tool_calls[index]["function"]["arguments"] += tool_call_delta.function.arguments

# Parse completed tool calls
for idx, tool_call in tool_calls.items():
    function_name = tool_call["function"]["name"]
    function_args = json.loads(tool_call["function"]["arguments"])
    print(f"Function: {function_name}")
    print(f"Arguments: {function_args}")
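
Once a tool call is fully accumulated, the usual next step is to run the tool locally and send the result back to the model as a "tool" message. A minimal sketch, assuming a hypothetical local get_weather implementation (the follow-up request that would carry tool_message back is omitted):

```python
import json

# Hypothetical local implementation of the get_weather tool declared above
def get_weather(location, unit="celsius"):
    return f"Sunny in {location} (22 {unit})"

# Map tool names to local callables
available_functions = {"get_weather": get_weather}

# A tool call as it would look after accumulation from the stream
tool_call = {
    "id": "call_abc123",
    "function": {
        "name": "get_weather",
        "arguments": '{"location": "Boston", "unit": "celsius"}',
    },
}

# Dispatch: look up the function and parse the accumulated JSON arguments
func = available_functions[tool_call["function"]["name"]]
args = json.loads(tool_call["function"]["arguments"])
result = func(**args)

# The result goes back as a "tool" message on the next request,
# linked by tool_call_id
tool_message = {
    "role": "tool",
    "tool_call_id": tool_call["id"],
    "content": result,
}
print(tool_message)
```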

Handling Finish Reasons

The finish_reason field is null in every chunk except the last, where it reports why the stream ended:
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
    
    # Check if stream is complete
    if chunk.choices[0].finish_reason is not None:
        print(f"\n\nStream completed. Reason: {chunk.choices[0].finish_reason}")
Possible finish reasons:
  • stop - Model reached a natural stopping point
  • length - Maximum token limit reached
  • tool_calls - Model called a tool
  • content_filter - Content was filtered
  • function_call - (Deprecated) Model called a function
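
The pattern above can be sketched end-to-end over mocked data; the tuples below stand in for the real chunk objects, with the final chunk carrying an empty delta and a non-null finish reason:

```python
# Mocked stream: (content_delta, finish_reason) pairs standing in for the
# real ChatCompletionChunk objects.
mock_stream = [
    ("Hel", None),
    ("lo!", None),
    (None, "stop"),  # final chunk: empty delta, non-null finish_reason
]

collected, finish_reason = [], None
for content, reason in mock_stream:
    if content is not None:
        collected.append(content)
    if reason is not None:
        finish_reason = reason

text = "".join(collected)
print(text, finish_reason)  # Hello! stop
```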

Error Handling

Handle errors during streaming:
from openai import OpenAI, APIError

client = OpenAI()

try:
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True
    )
    
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")
except APIError as e:
    print(f"\nError occurred: {e}")
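
Beyond catching APIError, a stream that drops mid-iteration usually has to be re-requested from scratch. A sketch of a generic retry-with-backoff wrapper: make_stream would wrap the client.chat.completions.create call; the flaky generator below only simulates a dropped connection for demonstration.

```python
import time

def stream_with_retry(make_stream, max_retries=3, base_delay=1.0):
    """Retry a streaming call with exponential backoff.

    make_stream: zero-arg callable that starts a fresh stream, e.g. a
    lambda wrapping client.chat.completions.create(..., stream=True).
    """
    for attempt in range(max_retries):
        try:
            # Consume the whole stream; on failure, re-request from scratch
            return list(make_stream())
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)

# Demo with a generator that fails twice, then succeeds
attempts = {"n": 0}
def flaky_stream():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("dropped")
    yield "Hello"
    yield " world"

chunks = stream_with_retry(flaky_stream, base_delay=0.0)
print("".join(chunks))  # Hello world
```

Note that retrying re-sends the full request, so you pay for the dropped attempt's tokens; partial output from a failed attempt should be discarded rather than concatenated.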

Streaming with Context Manager

Use a context manager to ensure proper cleanup:
from openai import OpenAI

client = OpenAI()

with client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True
) as stream:
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")

Complete Example: Streaming Chat UI

Here’s a complete example of building a streaming chat interface:
from openai import OpenAI
import sys

client = OpenAI()

def stream_chat_completion(messages):
    """Stream a chat completion and print it in real-time."""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
        stream_options={"include_usage": True}
    )
    
    collected_content = ""
    
    for chunk in stream:
        # Extract content from the chunk
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            collected_content += content
            print(content, end="", flush=True)
        
        # Check for completion
        if chunk.choices and chunk.choices[0].finish_reason:
            print()  # New line
        
        # Print usage stats if available
        if chunk.usage:
            print(f"\n[Tokens: {chunk.usage.total_tokens}]")
    
    return collected_content

def main():
    messages = [
        {"role": "system", "content": "You are a helpful assistant."}
    ]
    
    print("Chat started. Type 'quit' to exit.\n")
    
    while True:
        # Get user input
        user_input = input("You: ")
        
        if user_input.lower() == "quit":
            break
        
        # Add user message to conversation
        messages.append({"role": "user", "content": user_input})
        
        # Stream assistant response
        print("\nAssistant: ", end="", flush=True)
        assistant_content = stream_chat_completion(messages)
        
        # Add assistant response to conversation
        messages.append({"role": "assistant", "content": assistant_content})
        print()  # New line

if __name__ == "__main__":
    main()

Streaming Best Practices

1. Always Handle Incomplete Chunks

Not every chunk will contain content. Always check before accessing:
for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="")

2. Accumulate Complete Messages

For multi-turn conversations, accumulate the full response:
collected_messages = []
for chunk in stream:
    if chunk.choices[0].delta.content:
        collected_messages.append(chunk.choices[0].delta.content)

full_message = "".join(collected_messages)

3. Use Flush for Real-time Display

When printing to stdout, use flush=True for immediate display:
print(content, end="", flush=True)

4. Handle Connection Issues

Always wrap streaming in try-except blocks:
try:
    for chunk in stream:
        # Process chunk
        pass
except Exception as e:
    print(f"Stream interrupted: {e}")

5. Request Usage Stats When Needed

If you need token counts, enable include_usage:
stream_options={"include_usage": True}

Comparison: Streaming vs Non-Streaming

# Wait for complete response
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a poem"}]
)

# Get full content at once
print(response.choices[0].message.content)
Non-streaming pros:
  • Simpler to implement
  • Full message available immediately
  • Easier to handle errors
Non-streaming cons:
  • Higher perceived latency
  • No feedback until complete
