## Overview
Streaming allows you to receive API responses incrementally as they’re generated, rather than waiting for the complete response. This provides:
- Lower perceived latency - Display partial results immediately
- Better user experience - Show progress for long-running generations
- Real-time feedback - Process data as it arrives
The SDK uses Server-Sent Events (SSE) to deliver streaming responses.
## Basic Streaming

### Synchronous Streaming
```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a story about a robot."}],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="")
```
### Asynchronous Streaming
```python
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def main():
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Write a story about a robot."}],
        stream=True,
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="")

asyncio.run(main())
```
## Stream Object
Streaming responses return a `Stream` (sync) or `AsyncStream` (async) object:
```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

# Stream is an iterator
for chunk in stream:
    print(chunk)

# Access the underlying HTTP response
print(stream.response.headers)
```
## Context Manager
Use a context manager to ensure proper cleanup:
```python
from openai import OpenAI

client = OpenAI()

with client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
) as stream:
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")

# Stream is automatically closed
```
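On exit, the context manager simply calls `close()` on the stream. A minimal stand-in class makes that behavior concrete (`FakeStream` is illustrative only, not part of the SDK):

```python
class FakeStream:
    """Minimal stand-in for the SDK's Stream object, used only to
    illustrate context-manager semantics: __exit__ calls close()."""

    def __init__(self, chunks):
        self._chunks = iter(chunks)
        self.closed = False

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._chunks)  # StopIteration ends the loop

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()   # runs even if the loop body raised
        return False   # never swallow exceptions

    def close(self):
        self.closed = True


stream = FakeStream(["Hello", ", ", "world"])
with stream:
    text = "".join(stream)

print(text)           # Hello, world
print(stream.closed)  # True
```

Because cleanup happens in `__exit__`, the stream is closed even when iteration is interrupted by an exception.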
## Stream Chunks
Each chunk in a stream contains:
```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Count to 5"}],
    stream=True,
)

for chunk in stream:
    print(f"Chunk ID: {chunk.id}")
    print(f"Model: {chunk.model}")
    print(f"Created: {chunk.created}")

    # Delta contains incremental content
    delta = chunk.choices[0].delta
    if delta.content is not None:
        print(f"Content: {delta.content}")

    # Check for tool calls
    if delta.tool_calls:
        print(f"Tool calls: {delta.tool_calls}")

    # Check finish reason
    if chunk.choices[0].finish_reason:
        print(f"Finish: {chunk.choices[0].finish_reason}")
```
## Accumulating Responses
Build the complete response from stream chunks:
```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Explain Python"}],
    stream=True,
)

full_response = ""
for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        content = chunk.choices[0].delta.content
        full_response += content
        print(content, end="")

print(f"\n\nFull response: {full_response}")
```
## Function Calling with Streaming
Stream tool calls as they're generated. When `tools` is passed, the fragments arrive on `delta.tool_calls` (not the legacy `delta.function_call`), so accumulate the argument string and parse it as JSON only once the stream ends:

```python
import json

from openai import OpenAI

client = OpenAI()

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"},
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
                },
                "required": ["location"],
            },
        },
    }
]

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
    tools=tools,
    stream=True,
)

function_name = ""
function_args = ""

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.tool_calls:
        tool_call = delta.tool_calls[0]
        if tool_call.function.name:
            function_name += tool_call.function.name
        if tool_call.function.arguments:
            function_args += tool_call.function.arguments

if function_name:
    print(f"Function: {function_name}")
    print(f"Arguments: {json.loads(function_args)}")
```
## Error Handling
Handle errors during streaming:
```python
import openai
from openai import OpenAI

client = OpenAI()

try:
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True,
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")
except openai.APIConnectionError as e:
    print(f"Connection error: {e}")
except openai.APIStatusError as e:
    print(f"API error: {e.status_code} - {e.message}")
except Exception as e:
    print(f"Unexpected error: {e}")
finally:
    # Ensure stream is closed
    if "stream" in locals():
        stream.close()
```
### Streaming with Async Error Handling
```python
import asyncio

import openai
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def stream_with_retry(prompt: str, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            stream = await client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
            )
            async for chunk in stream:
                if chunk.choices[0].delta.content:
                    print(chunk.choices[0].delta.content, end="")
            return  # Success
        except openai.APIConnectionError as e:
            print(f"\nConnection error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise

asyncio.run(stream_with_retry("Tell me a joke"))
```
## Manual Stream Control
Manually control stream iteration:
```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Count to 100"}],
    stream=True,
)

# Get first 10 chunks only
for i, chunk in enumerate(stream):
    if i >= 10:
        stream.close()
        break
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
```
## Responses API Streaming
Stream responses using the Responses API:
```python
from openai import OpenAI

client = OpenAI()

stream = client.responses.create(
    model="gpt-5.2",
    input="Write a haiku about coding",
    stream=True,
)

for event in stream:
    print(event)
```
The SDK automatically handles SSE parsing. Each event contains:
- `event` - Event type (if specified)
- `data` - JSON payload
- `id` - Event ID (if specified)
- `retry` - Retry timeout in milliseconds (if specified)
The stream automatically closes when it receives a [DONE] message or when the response is complete.
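For intuition, here is how those fields appear on the wire. This is a simplified sketch of SSE parsing, not the SDK's actual implementation:

```python
def parse_sse(raw: str):
    """Parse a block of SSE text into a list of event dicts.

    Field names follow the SSE format: event, data, id, retry.
    A blank line dispatches the event accumulated so far.
    """
    events = []
    current = {"event": None, "data": [], "id": None, "retry": None}
    for line in raw.splitlines():
        if line == "":  # blank line dispatches the accumulated event
            if current["data"]:
                events.append({
                    "event": current["event"],
                    "data": "\n".join(current["data"]),
                    "id": current["id"],
                    "retry": current["retry"],
                })
            current = {"event": None, "data": [], "id": None, "retry": None}
        elif line.startswith(":"):  # comment line, ignored
            continue
        elif ":" in line:
            field, _, value = line.partition(":")
            value = value.lstrip(" ")
            if field == "data":
                current["data"].append(value)  # multiple data lines join with \n
            elif field == "event":
                current["event"] = value
            elif field == "id":
                current["id"] = value
            elif field == "retry":
                current["retry"] = int(value)
    return events


raw = (
    "event: message\n"
    'data: {"text": "Hi"}\n'
    "\n"
    "data: [DONE]\n"
    "\n"
)
for evt in parse_sse(raw):
    print(evt["event"], evt["data"])
# message {"text": "Hi"}
# None [DONE]
```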
## Best Practices
- Always close streams properly using context managers or explicit `.close()` calls
- Handle errors during streaming - connections can drop mid-stream
- Don't assume chunks arrive in a specific size or timing
- Accumulate function call arguments before parsing as JSON
- Use async streaming for high-concurrency workloads
- Buffering - the SDK buffers SSE data automatically; no manual buffering is needed
- Backpressure - slow consumers may cause buffering in the underlying HTTP connection
- Connection pooling - streaming requests hold connections longer, so consider connection limits
- Timeouts - streaming respects timeout settings; consider increasing them for long streams
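The "accumulate before parsing" rule can be sketched as a small helper. The `(index, name, arguments)` tuples below are a simplified, hypothetical stand-in for the `delta.tool_calls` fragments a real stream yields:

```python
import json

def accumulate_tool_calls(fragments):
    """Merge streamed tool-call fragments into complete calls.

    Each fragment is an (index, name, arguments) tuple -- a simplified
    stand-in for delta.tool_calls entries -- where name and arguments
    may be empty or partial strings.
    """
    calls = {}
    for index, name, arguments in fragments:
        entry = calls.setdefault(index, {"name": "", "arguments": ""})
        entry["name"] += name or ""
        entry["arguments"] += arguments or ""

    # Only now is each arguments string complete, valid JSON.
    return {
        index: {"name": c["name"], "arguments": json.loads(c["arguments"])}
        for index, c in calls.items()
    }


fragments = [
    (0, "get_weather", ""),
    (0, "", '{"locat'),
    (0, "", 'ion": "Tokyo"}'),
]
print(accumulate_tool_calls(fragments))
# {0: {'name': 'get_weather', 'arguments': {'location': 'Tokyo'}}}
```

Parsing each fragment as it arrives would fail: `'{"locat'` is not valid JSON on its own, which is why the arguments must be concatenated first.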