Streaming allows you to receive chat completion responses incrementally as they are generated, rather than waiting for the entire response to complete. This is particularly useful for creating responsive user experiences.
from openai import OpenAI

client = OpenAI()

# Requesting with stream=True returns an iterator of chunks instead of a
# single completed response.
stream = client.chat.completions.create(
    model="gpt-5.2",
    messages=[{"role": "user", "content": "Tell me a story about a unicorn."}],
    stream=True,
)

# Print each text delta the moment it arrives.
for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="")
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()


async def main():
    """Stream a completion asynchronously, printing deltas as they arrive."""
    stream = await client.chat.completions.create(
        model="gpt-5.2",
        messages=[{"role": "user", "content": "Tell me a story about a unicorn."}],
        stream=True,
    )
    # The async client yields chunks via `async for`.
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="")


asyncio.run(main())
You can configure streaming behavior with stream_options:
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
    stream_options={"include_usage": True},
)

for chunk in stream:
    # Process chunks. With include_usage=True, the final chunk carries only
    # usage statistics and has an EMPTY `choices` list — indexing
    # chunk.choices[0] unconditionally would raise IndexError there, so
    # check that `choices` is non-empty first.
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
    # The last chunk will include usage statistics
    if chunk.usage:
        print(f"\n\nTokens used: {chunk.usage.total_tokens}")
If set to true, the final chunk will include a usage field with token usage statistics for the entire request. That final chunk contains only usage data — its choices list is empty — so guard any chunk.choices[0] access accordingly. Note: If the stream is interrupted or cancelled, you may not receive the final usage chunk.
Since tool calls are sent incrementally, you need to accumulate the chunks:
import json

from openai import OpenAI

client = OpenAI()

# The original snippet referenced `tools` without defining it (a NameError
# as written). Define an example tool schema so the snippet is self-contained.
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather for a city.",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string", "description": "City name"},
                },
                "required": ["location"],
            },
        },
    }
]

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in Boston?"}],
    tools=tools,
    stream=True,
)

# Accumulate tool call information: tool calls stream in fragments, keyed by
# each delta's `index`, and the name/arguments strings must be concatenated.
tool_calls = {}

for chunk in stream:
    if chunk.choices[0].delta.tool_calls:
        for tool_call_delta in chunk.choices[0].delta.tool_calls:
            index = tool_call_delta.index

            # Initialize tool call if not seen before. The `id` arrives only
            # on the first fragment for a given index.
            if index not in tool_calls:
                tool_calls[index] = {
                    "id": tool_call_delta.id,
                    "function": {"name": "", "arguments": ""},
                }

            # Accumulate function name and arguments
            if tool_call_delta.function.name:
                tool_calls[index]["function"]["name"] += tool_call_delta.function.name
            if tool_call_delta.function.arguments:
                tool_calls[index]["function"]["arguments"] += tool_call_delta.function.arguments

# Parse completed tool calls — arguments are only valid JSON once the
# stream has finished delivering all fragments.
for idx, tool_call in tool_calls.items():
    function_name = tool_call["function"]["name"]
    function_args = json.loads(tool_call["function"]["arguments"])
    print(f"Function: {function_name}")
    print(f"Arguments: {function_args}")
The finish_reason field is null on every chunk except the final chunk for each choice, so checking it for a non-null value tells you when the stream is complete:
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

for chunk in stream:
    choice = chunk.choices[0]
    if choice.delta.content:
        print(choice.delta.content, end="")
    # Check if stream is complete: finish_reason stays None on every chunk
    # until the final one.
    if choice.finish_reason is not None:
        print(f"\n\nStream completed. Reason: {choice.finish_reason}")
Possible finish reasons:
stop - Model reached a natural stopping point
length - Maximum token limit reached
tool_calls - Model called a tool
content_filter - Content was filtered
function_call - (Deprecated) Model called a function
from openai import OpenAI

client = OpenAI()

# Using the stream as a context manager guarantees the underlying HTTP
# connection is closed when the block exits, even on early break or error.
with client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
) as stream:
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")
Here’s a complete example of building a streaming chat interface:
import sys

from openai import OpenAI

client = OpenAI()


def stream_chat_completion(messages):
    """Stream a chat completion and print it in real-time."""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
        stream_options={"include_usage": True},
    )

    collected_content = ""
    for chunk in stream:
        # Extract content from the chunk. The usage-only final chunk has an
        # empty choices list, hence the `chunk.choices and ...` guard.
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            collected_content += content
            print(content, end="", flush=True)

        # Check for completion
        if chunk.choices and chunk.choices[0].finish_reason:
            print()  # New line

        # Print usage stats if available
        if chunk.usage:
            print(f"\n[Tokens: {chunk.usage.total_tokens}]")

    return collected_content


def main():
    """Run a simple REPL-style chat loop that preserves conversation history."""
    messages = [
        {"role": "system", "content": "You are a helpful assistant."}
    ]

    print("Chat started. Type 'quit' to exit.\n")

    while True:
        # Get user input
        user_input = input("You: ")
        if user_input.lower() == "quit":
            break

        # Add user message to conversation
        messages.append({"role": "user", "content": user_input})

        # Stream assistant response
        print("\nAssistant: ", end="", flush=True)
        assistant_content = stream_chat_completion(messages)

        # Add assistant response to conversation so the model keeps context
        messages.append({"role": "assistant", "content": assistant_content})
        print()  # New line


if __name__ == "__main__":
    main()
# Wait for complete response
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a poem"}],
)

# Get full content at once
print(response.choices[0].message.content)
Pros:
Simpler to implement
Full message available immediately
Easier to handle errors
Cons:
Higher perceived latency
No feedback until complete
# Get response incrementally
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a poem"}],
    stream=True,
)

# Display content as it arrives
for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="")