Skip to main content

Overview

Streaming allows you to receive API responses incrementally as they’re generated, rather than waiting for the complete response. This provides:
  • Lower perceived latency - Display partial results immediately
  • Better user experience - Show progress for long-running generations
  • Real-time feedback - Process data as it arrives
The SDK uses Server-Sent Events (SSE) to deliver streaming responses.

Basic Streaming

Synchronous Streaming

from openai import OpenAI

client = OpenAI()

# Request a streamed chat completion; with stream=True the SDK returns
# an iterator of chunks instead of a single complete response.
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a story about a robot."}],
    stream=True,
)

# Echo each incremental piece of content as soon as it arrives.
for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content is not None:
        print(delta.content, end="")

Asynchronous Streaming

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def main():
    """Stream a chat completion asynchronously, printing deltas as they arrive."""
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Write a story about a robot."}],
        stream=True,
    )

    # AsyncStream supports `async for`, yielding the same chunk objects
    # as the synchronous client.
    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content is not None:
            print(delta.content, end="")

asyncio.run(main())

Stream Object

Streaming responses return a Stream (sync) or AsyncStream (async) object:
from openai import OpenAI

client = OpenAI()

# With stream=True the call returns a Stream object rather than a
# completed ChatCompletion.
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

# Stream implements the iterator protocol, so it can be consumed directly.
for chunk in stream:
    print(chunk)

# The raw HTTP response is exposed for inspecting headers and the like.
print(stream.response.headers)

Context Manager

Use a context manager to ensure proper cleanup:
from openai import OpenAI

client = OpenAI()

# Entering the stream as a context manager guarantees the underlying
# HTTP connection is released, even if iteration stops early.
with client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
) as stream:
    for chunk in stream:
        piece = chunk.choices[0].delta.content
        if piece:
            print(piece, end="")
# Stream is automatically closed

Stream Chunks

Each chunk in a stream contains:
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Count to 5"}],
    stream=True,
)

# Inspect the metadata and delta carried by every streamed chunk.
for chunk in stream:
    print(f"Chunk ID: {chunk.id}")
    print(f"Model: {chunk.model}")
    print(f"Created: {chunk.created}")

    # Delta contains incremental content
    delta = chunk.choices[0].delta

    if delta.content is not None:
        print(f"Content: {delta.content}")

    # Check for tool calls — the current API field for streamed calls.
    if delta.tool_calls:
        print(f"Tool calls: {delta.tool_calls}")

    # `function_call` is the deprecated legacy field; kept for completeness.
    if delta.function_call:
        print(f"Function: {delta.function_call}")

    # finish_reason is None until the final chunk for this choice.
    if chunk.choices[0].finish_reason:
        print(f"Finish: {chunk.choices[0].finish_reason}")

Accumulating Responses

Build the complete response from stream chunks:
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Explain Python"}],
    stream=True,
)

# Collect every content delta while echoing it to the terminal, then
# join the pieces once at the end instead of concatenating repeatedly.
parts = []
for chunk in stream:
    piece = chunk.choices[0].delta.content
    if piece is not None:
        parts.append(piece)
        print(piece, end="")

full_response = "".join(parts)
print(f"\n\nFull response: {full_response}")

Function Calling with Streaming

Stream function calls as they’re generated:
from openai import OpenAI
import json

client = OpenAI()

# Tool (function) schema advertised to the model.
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"},
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
                },
                "required": ["location"],
            },
        },
    }
]

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
    tools=tools,
    stream=True,
)

# When `tools` is passed, streamed calls arrive on `delta.tool_calls`
# (NOT the deprecated `delta.function_call`, which stays None here).
# The name typically arrives in a single chunk while the JSON arguments
# are spread across many chunks, so both must be accumulated.
function_name = ""
function_args = ""

for chunk in stream:
    delta = chunk.choices[0].delta

    if delta.tool_calls:
        for tool_call in delta.tool_calls:
            if tool_call.function.name:
                function_name += tool_call.function.name
            if tool_call.function.arguments:
                function_args += tool_call.function.arguments

if function_name:
    print(f"Function: {function_name}")
    # Parse only after the stream completes — partial JSON would raise.
    print(f"Arguments: {json.loads(function_args)}")

Error Handling

Handle errors during streaming:
import openai
from openai import OpenAI

client = OpenAI()

# Pre-bind so the cleanup step can test it without probing locals().
stream = None
try:
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True,
    )

    for chunk in stream:
        text = chunk.choices[0].delta.content
        if text:
            print(text, end="")

except openai.APIConnectionError as e:
    print(f"Connection error: {e}")
except openai.APIStatusError as e:
    print(f"API error: {e.status_code} - {e.message}")
except Exception as e:
    print(f"Unexpected error: {e}")
finally:
    # Ensure stream is closed
    if stream is not None:
        stream.close()

Streaming with Async Error Handling

import asyncio
import openai
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def stream_with_retry(prompt: str, max_retries: int = 3):
    """Stream a chat completion, retrying on connection errors.

    Retries up to ``max_retries`` times with exponential backoff
    (1s, 2s, 4s, ...), re-raising once attempts are exhausted.
    """
    for attempt in range(max_retries):
        try:
            stream = await client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
            )

            async for chunk in stream:
                text = chunk.choices[0].delta.content
                if text:
                    print(text, end="")

            return  # Success

        except openai.APIConnectionError as e:
            print(f"\nConnection error (attempt {attempt + 1}): {e}")
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

asyncio.run(stream_with_retry("Tell me a joke"))

Manual Stream Control

Manually control stream iteration:
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Count to 100"}],
    stream=True,
)

# Consume at most the first 10 chunks, then release the connection
# explicitly before abandoning the iterator.
for index, chunk in enumerate(stream):
    if index >= 10:
        stream.close()
        break

    text = chunk.choices[0].delta.content
    if text:
        print(text, end="")

Responses API Streaming

Stream responses using the Responses API:
from openai import OpenAI

client = OpenAI()

# The Responses API also accepts stream=True; it yields typed events
# rather than chat-completion chunks.
stream = client.responses.create(
    model="gpt-5.2",
    input="Write a haiku about coding",
    stream=True,
)

for event in stream:
    print(event)

Server-Sent Events Format

The SDK automatically handles SSE parsing. Each event contains:
  • event - Event type (if specified)
  • data - JSON payload
  • id - Event ID (if specified)
  • retry - Retry timeout in milliseconds (if specified)
The stream automatically closes when it receives a [DONE] message or when the response is complete.

Best Practices

  • Always close streams properly using context managers or explicit .close() calls
  • Handle errors during streaming - connections can drop mid-stream
  • Don’t assume chunks arrive in a specific size or timing
  • Accumulate function call arguments before parsing as JSON
  • Use async streaming for high-concurrency workloads

Performance Considerations

  • Buffering - The SDK buffers SSE data automatically, no manual buffering needed
  • Backpressure - Slow consumers may cause buffering in the underlying HTTP connection
  • Connection pooling - Streaming requests hold connections longer, consider connection limits
  • Timeouts - Streaming respects timeout settings, consider increasing for long streams

Build docs developers (and LLMs) love