
Streaming Responses

The OpenAI Python SDK supports streaming responses using Server-Sent Events (SSE). This lets you receive model output in real time as it is generated, rather than waiting for the entire response to complete.

Basic Streaming

To enable streaming, set the stream parameter to True when calling client.responses.create():
from openai import OpenAI

client = OpenAI()

stream = client.responses.create(
    model="gpt-5.2",
    input="Write a one-sentence bedtime story about a unicorn.",
    stream=True,
)

for event in stream:
    print(event)

Stream Events

When streaming is enabled, the API returns a Stream[ResponseStreamEvent] object. Each event in the stream represents a different type of update:

Event Types

The stream emits various event types to communicate the progress of the response:
  • response.created - Initial event when the response starts
  • response.in_progress - Response is being generated
  • response.output_item.added - A new output item was added
  • response.content_part.added - A new content part was added
  • response.output_text.delta - Text content delta (incremental update)
  • response.output_text.done - Text content is complete
  • response.content_part.done - A content part is complete
  • response.output_item.done - An output item is complete
  • response.completed - Response generation is complete
  • response.failed - Response generation failed
  • error - An error occurred (note: no response. prefix)
  • response.incomplete - Response ended early (e.g., the token limit was reached)
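
Of these, response.completed, response.failed, and response.incomplete are terminal: once one arrives, no further events follow. A minimal sketch of an exit check, with plain strings standing in for the SDK's typed event objects:

```python
# Terminal event types: after any of these, the stream is finished.
TERMINAL_EVENTS = {
    "response.completed",
    "response.failed",
    "response.incomplete",
}

def is_terminal(event_type: str) -> bool:
    """Return True when an event type signals the end of the stream."""
    return event_type in TERMINAL_EVENTS

# Dispatch over a simulated sequence of event types:
events = ["response.created", "response.in_progress", "response.completed"]
seen = []
for etype in events:
    seen.append(etype)
    if is_terminal(etype):
        break

print(seen[-1])  # → response.completed
```

In a real loop the check would be `is_terminal(event.type)` on each streamed event.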

Tool Call Events

When using tools, additional events are emitted:
  • response.function_call_arguments.delta - Function call arguments delta
  • response.function_call_arguments.done - Function call arguments complete
  • response.web_search_call.searching - Web search in progress
  • response.web_search_call.completed - Web search completed
  • response.file_search_call.searching - File search in progress
  • response.file_search_call.completed - File search completed
  • response.code_interpreter_call.interpreting - Code interpreter running
  • response.code_interpreter_call_code.delta - Code delta
  • response.code_interpreter_call_code.done - Code complete
  • response.code_interpreter_call.completed - Code interpreter completed
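
Function call arguments arrive as JSON fragments, so a common consumer pattern is to buffer the deltas and parse once the done event fires. A sketch using plain (type, delta) tuples to simulate what a real stream would yield:

```python
import json
from typing import Iterable, Tuple

def collect_function_arguments(events: Iterable[Tuple[str, str]]) -> dict:
    """Concatenate streamed argument fragments, then parse the full JSON."""
    buffer = []
    for event_type, delta in events:
        if event_type == "response.function_call_arguments.delta":
            buffer.append(delta)
        elif event_type == "response.function_call_arguments.done":
            break
    return json.loads("".join(buffer))

# Simulated deltas as they might arrive over the wire:
simulated = [
    ("response.function_call_arguments.delta", '{"city": "Par'),
    ("response.function_call_arguments.delta", 'is", "unit": "celsius"}'),
    ("response.function_call_arguments.done", ""),
]
print(collect_function_arguments(simulated))  # {'city': 'Paris', 'unit': 'celsius'}
```

The key point is that individual deltas are not valid JSON on their own; only the concatenation of all fragments parses cleanly.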

Reasoning Events

For reasoning models (o-series and gpt-5):
  • response.reasoning_text.delta - Reasoning text delta
  • response.reasoning_text.done - Reasoning text complete
  • response.reasoning_summary_part.added - Reasoning summary part added
  • response.reasoning_summary_text.delta - Reasoning summary text delta
  • response.reasoning_summary_text.done - Reasoning summary text complete
  • response.reasoning_summary_part.done - Reasoning summary part complete
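
Because reasoning summaries stream alongside the answer text, consumers usually keep separate buffers for each. A minimal sketch with simulated (type, delta) tuples standing in for SDK events:

```python
from typing import Iterable, Tuple

def split_stream(events: Iterable[Tuple[str, str]]) -> Tuple[str, str]:
    """Accumulate reasoning-summary text and answer text into separate buffers."""
    summary, answer = [], []
    for event_type, delta in events:
        if event_type == "response.reasoning_summary_text.delta":
            summary.append(delta)
        elif event_type == "response.output_text.delta":
            answer.append(delta)
    return "".join(summary), "".join(answer)

simulated = [
    ("response.reasoning_summary_text.delta", "Weighing two approaches... "),
    ("response.reasoning_summary_text.delta", "picking the simpler one."),
    ("response.output_text.delta", "The answer "),
    ("response.output_text.delta", "is 42."),
]
summary, answer = split_stream(simulated)
print(answer)  # → The answer is 42.
```

This keeps the summary available for logging or display without mixing it into the user-visible answer.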

Async Streaming

The async client uses the same interface:
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def main():
    stream = await client.responses.create(
        model="gpt-5.2",
        input="Write a one-sentence bedtime story about a unicorn.",
        stream=True,
    )

    async for event in stream:
        print(event)

asyncio.run(main())

Processing Text Deltas

Here’s an example of processing only the text content as it streams:
from openai import OpenAI

client = OpenAI()

stream = client.responses.create(
    model="gpt-5.2",
    input="Write a short poem about Python programming.",
    stream=True,
)

for event in stream:
    if event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)
    elif event.type == "response.output_text.done":
        print()  # New line when done

Handling Different Event Types

from openai import OpenAI

client = OpenAI()

stream = client.responses.create(
    model="gpt-5.2",
    input="What's the weather like today?",
    stream=True,
)

for event in stream:
    if event.type == "response.created":
        print(f"Response started with ID: {event.response.id}")
    
    elif event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)
    
    elif event.type == "response.output_text.done":
        print()  # New line
    
    elif event.type == "response.completed":
        print(f"\nResponse completed at {event.response.completed_at}")
        print(f"Total tokens used: {event.response.usage.total_tokens}")
    
    elif event.type == "response.failed":
        print(f"Response failed: {event.response.error}")
    
    elif event.type == "error":
        print(f"Error: {event.message}")

Token Usage

The final response.completed event carries the full Response object, including token usage, so no extra configuration is needed to track costs:
from openai import OpenAI

client = OpenAI()

stream = client.responses.create(
    model="gpt-5.2",
    input="Explain quantum computing.",
    stream=True,
)

for event in stream:
    if event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)
    elif event.type == "response.completed":
        print(f"\n\nTokens used: {event.response.usage.total_tokens}")

Streaming with Tools

from openai import OpenAI

client = OpenAI()

stream = client.responses.create(
    model="gpt-5.2",
    input="Search the web for recent news about AI.",
    tools=[{"type": "web_search"}],
    stream=True,
)

for event in stream:
    if event.type == "response.web_search_call.searching":
        print("Searching the web...")
    
    elif event.type == "response.web_search_call.completed":
        print("Web search completed")
    
    elif event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)
    
    elif event.type == "response.completed":
        print("\nResponse complete!")

Error Handling

from openai import OpenAI
import openai

client = OpenAI()

try:
    stream = client.responses.create(
        model="gpt-5.2",
        input="Tell me a joke.",
        stream=True,
    )
    
    for event in stream:
        if event.type == "response.output_text.delta":
            print(event.delta, end="", flush=True)
        elif event.type == "error":
            print(f"\nStream error: {event.message}")
            break

except openai.APIConnectionError as e:
    print("The server could not be reached")
    print(e.__cause__)  # an underlying Exception
except openai.RateLimitError as e:
    print("A 429 status code was received; we should back off a bit.")
except openai.APIStatusError as e:
    print("Another non-200-range status code was received")
    print(e.status_code)
    print(e.response)
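
Transient failures such as APIConnectionError or RateLimitError are often worth retrying with exponential backoff before giving up. A sketch of the pattern (the retry wrapper is an assumption for illustration, not SDK behavior; the client also has a built-in max_retries option for request-level retries):

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff: base * 2**attempt seconds, clamped to cap."""
    return min(base * (2 ** attempt), cap)

def with_retries(
    make_stream: Callable[[], T],
    retryable: Tuple[Type[BaseException], ...],
    max_attempts: int = 4,
) -> T:
    """Call make_stream, sleeping between retryable failures."""
    for attempt in range(max_attempts):
        try:
            return make_stream()
        except retryable:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error
            time.sleep(backoff_delay(attempt))
    raise RuntimeError("unreachable")

# Delay schedule for the first five retries:
print([backoff_delay(n) for n in range(5)])  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

In real code you would pass `lambda: client.responses.create(..., stream=True)` as make_stream and `(openai.APIConnectionError, openai.RateLimitError)` as retryable.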

Best Practices

  1. Use flush=True - When printing text deltas, use flush=True to ensure immediate output:
    print(event.delta, end="", flush=True)
    
  2. Handle all event types - Make sure to handle different event types gracefully, especially error events.
  3. Close streams properly - When using context managers or manual stream handling, ensure streams are properly closed to free up resources.
  4. Monitor token usage - Read event.response.usage from the final response.completed event to track costs.
  5. Implement timeouts - Set appropriate timeouts to prevent indefinite waiting:
    stream = client.responses.create(
        model="gpt-5.2",
        input="Hello",
        stream=True,
        timeout=30.0,  # 30 second timeout
    )
    

Return Type

When stream=True, the method returns:
  • Sync: Stream[ResponseStreamEvent]
  • Async: AsyncStream[ResponseStreamEvent]
When stream=False or omitted, the method returns:
  • Response - The complete response object
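
This split return type is typically expressed with typing.overload on a Literal-typed stream flag. A simplified sketch of the pattern (a toy create with stand-in classes, not the SDK's actual signatures):

```python
from typing import Iterator, Literal, Union, overload

class Response:
    """Stand-in for the complete response object."""

class Stream:
    """Stand-in for Stream[ResponseStreamEvent]."""
    def __iter__(self) -> Iterator[str]:
        return iter(())

@overload
def create(*, stream: Literal[True]) -> Stream: ...
@overload
def create(*, stream: Literal[False] = False) -> Response: ...

def create(*, stream: bool = False) -> Union[Stream, Response]:
    # Type checkers narrow the return type from the literal value of stream.
    return Stream() if stream else Response()

print(type(create(stream=True)).__name__)   # → Stream
print(type(create()).__name__)              # → Response
```

This is why a type checker knows to allow `for event in stream` only when stream=True was passed.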
