
Overview

Streaming lets you receive response content incrementally as it is generated, so users see output sooner on long-running requests. The Responses API supports several streaming patterns: full event streams, text-only streams, streamed tool call arguments, and resumable background streams.

Basic Streaming

The simplest way to stream a response is to call the responses.stream method and handle each event as it arrives:
#!/usr/bin/env ruby
# frozen_string_literal: true
# typed: strict

require_relative "../../lib/openai"

client = OpenAI::Client.new

stream = client.responses.stream(
  input: "Write a haiku about OpenAI.",
  model: "gpt-4o-2024-08-06"
)

stream.each do |event|
  case event
  when OpenAI::Streaming::ResponseTextDeltaEvent
    print(event.delta)
  when OpenAI::Streaming::ResponseTextDoneEvent
    puts("\n--------------------------")
  when OpenAI::Streaming::ResponseCompletedEvent
    puts("Response completed! (response id: #{event.response.id})")
  end
end

Key Events

  • ResponseTextDeltaEvent: Contains incremental text content in event.delta
  • ResponseTextDoneEvent: Fired when text generation is complete
  • ResponseCompletedEvent: The final event containing the full response object
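
The dispatch pattern for these three events can be tried without the SDK or a network call. The following is a minimal sketch, not SDK code: TextDelta, TextDone, and Completed are hypothetical stand-ins for the OpenAI::Streaming classes listed above, and summarize_events is an illustrative helper name.

```ruby
# Stand-in event classes so the sketch runs offline.
# In real code these would be the OpenAI::Streaming::* classes shown above.
TextDelta = Struct.new(:delta)
TextDone  = Struct.new(:text)
# The real completed event exposes event.response.id; the stand-in flattens it.
Completed = Struct.new(:response_id)

# Walks any enumerable of events and returns a summary hash.
def summarize_events(events)
  summary = { text: +"", done: false, response_id: nil }
  events.each do |event|
    case event
    when TextDelta then summary[:text] << event.delta
    when TextDone  then summary[:done] = true
    when Completed then summary[:response_id] = event.response_id
    end
  end
  summary
end

SAMPLE_EVENTS = [
  TextDelta.new("Hello, "),
  TextDelta.new("world"),
  TextDone.new("Hello, world"),
  Completed.new("resp_123")
].freeze

p summarize_events(SAMPLE_EVENTS)
```

Because summarize_events only relies on each and on class matching in case/when, the same shape should carry over to a real stream once the stand-in classes are swapped for the SDK's event classes.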

Simplified Text Streaming

For cases where you only need the text content, use the .text helper method:
#!/usr/bin/env ruby
# frozen_string_literal: true
# typed: strong

require_relative "../../lib/openai"

client = OpenAI::Client.new

stream = client.responses.stream(
  input: "Write a haiku about OpenAI.",
  model: "gpt-4o-2024-08-06"
)

stream.text.each do |text|
  print(text)
end

puts

# Get all of the text that was streamed with .get_output_text
puts "Character count: #{stream.get_output_text.length}"
The .text method provides a simplified iterator that yields only text deltas, filtering out other event types automatically.
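
To make the filtering behavior concrete, here is a rough sketch of how a text-only iterator could be built on top of a mixed event stream. It is an illustration of the idea, not the SDK's implementation: FakeTextDelta, FakeOtherEvent, and text_only are hypothetical names.

```ruby
# Stand-in event types; the real stream yields OpenAI::Streaming::* events.
FakeTextDelta  = Struct.new(:delta)
FakeOtherEvent = Struct.new(:type)

# Returns a lazy enumerator that yields only the text fragments,
# skipping every event that is not a text delta.
def text_only(events)
  events.lazy.grep(FakeTextDelta).map(&:delta)
end

MIXED_EVENTS = [
  FakeOtherEvent.new("response.created"),
  FakeTextDelta.new("An old "),
  FakeTextDelta.new("pond"),
  FakeOtherEvent.new("response.completed")
].freeze

text_only(MIXED_EVENTS).each { |fragment| print(fragment) }
puts
```

The lazy enumerator means fragments are yielded as the underlying events arrive, rather than after the whole stream has been collected.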

Streaming with Tools

Stream function call arguments as they’re generated:
#!/usr/bin/env ruby
# frozen_string_literal: true
# typed: true

require_relative "../../lib/openai"

class DynamicValue < OpenAI::BaseModel
  required :column_name, String
end

class Condition < OpenAI::BaseModel
  required :column, String
  required :operator, OpenAI::EnumOf[:eq, :gt, :lt, :le, :ge, :ne]
  required :value, OpenAI::UnionOf[String, Integer, DynamicValue]
end

# you can assign `OpenAI::{...}` schema specifiers to a constant
Columns = OpenAI::EnumOf[
  :id,
  :status,
  :expected_delivery_date,
  :delivered_at,
  :shipped_at,
  :ordered_at,
  :canceled_at
]

class Query < OpenAI::BaseModel
  required :table_name, OpenAI::EnumOf[:orders, :customers, :products]
  required :columns, OpenAI::ArrayOf[Columns]
  required :conditions, OpenAI::ArrayOf[Condition]
  required :order_by, OpenAI::EnumOf[:asc, :desc]
end

client = OpenAI::Client.new

stream = client.responses.stream(
  model: "gpt-4o-2024-08-06",
  input: "look up all my orders in november of last year that were fulfilled but not delivered on time",
  tools: [Query]
)

stream.each do |event|
  case event
  when OpenAI::Streaming::ResponseFunctionCallArgumentsDeltaEvent
    puts("delta: #{event.delta}")
    puts("snapshot: #{event.snapshot}")
  end
end

response = stream.get_final_response

puts
puts("----- parsed outputs from final response -----")
response
  .output
  .each do |output|
    case output
    when OpenAI::Models::Responses::ResponseFunctionToolCall
      # parsed is an instance of `Query`
      pp(output.parsed)
    end
  end

Tool Streaming Events

  • ResponseFunctionCallArgumentsDeltaEvent: Fired for each fragment of function call arguments. It contains:
    • event.delta: The incremental JSON string fragment
    • event.snapshot: The accumulated JSON so far
The snapshot field is particularly useful for displaying partial function arguments to users as they’re being generated.
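
The relationship between delta and snapshot can be sketched offline. This example assumes that each snapshot is simply the concatenation of all deltas received so far, and that a partial snapshot is usually not valid JSON until the last fragment arrives. FRAGMENTS, snapshots, and try_parse are hypothetical names used only for illustration.

```ruby
require "json"

# Hypothetical argument fragments, as they might arrive in successive delta events.
FRAGMENTS = ['{"table_name":', '"orders",', '"order_by":"asc"}'].freeze

# Returns the running snapshot after each fragment.
def snapshots(fragments)
  buffer = +""
  fragments.map { |fragment| (buffer << fragment).dup }
end

# Parses a snapshot, or returns nil while the JSON is still incomplete.
def try_parse(snapshot)
  JSON.parse(snapshot)
rescue JSON::ParserError
  nil
end

snapshots(FRAGMENTS).each do |snapshot|
  parsed = try_parse(snapshot)
  puts(parsed ? "complete: #{parsed}" : "partial:  #{snapshot}")
end
```

In a user interface, the partial snapshots would typically be shown as raw text, with structured rendering reserved for the point where parsing succeeds.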

Resuming Streams from Previous Responses

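You can stop consuming a streaming response and resume it later. To resume, pass previous_response_id with the ID of the original response and starting_after with the sequence number of the last event you processed: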
1. Create a background streaming response

Start a stream with background: true to allow it to continue processing server-side:
stream = client.responses.stream(
  model: "o4-mini",
  input: "Tell me a short story about a robot learning to paint.",
  instructions: "You are a creative storyteller.",
  background: true
)
2. Capture the response ID and last sequence number

Process events until you want to pause, capturing the response ID and last sequence number:
events = []
response_id = ""

stream.each do |event|
  events << event
  
  case event
  when OpenAI::Models::Responses::ResponseCreatedEvent
    response_id = event.response.id if response_id.empty?
  end

  # Simulate stopping after a few events
  if events.length >= 5
    break
  end
end

last_sequence = events.last.sequence_number
3. Resume the stream

Create a new stream using the saved response ID and the last sequence number:
resumed_stream = client.responses.stream(
  previous_response_id: response_id,
  starting_after: last_sequence
)

resumed_stream.each do |event|
  # Process events from where you left off
  puts "Event: #{event.type} (seq: #{event.sequence_number})"
end
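
The effect of starting_after can be modeled with a small offline sketch. It assumes that a resumed stream delivers only events whose sequence number is greater than starting_after, which matches the check made at the end of the complete resuming example. SeqEvent, SERVER_LOG, and replay_after are hypothetical names, and the event types in the log are sample data.

```ruby
SeqEvent = Struct.new(:type, :sequence_number)

# Stand-in for the events a server has recorded for one background response.
SERVER_LOG = [
  SeqEvent.new("response.created", 0),
  SeqEvent.new("response.in_progress", 1),
  SeqEvent.new("response.output_text.delta", 2),
  SeqEvent.new("response.output_text.delta", 3),
  SeqEvent.new("response.output_text.done", 4),
  SeqEvent.new("response.completed", 5)
].freeze

# Returns the events a client would still need after seeing `starting_after`.
def replay_after(log, starting_after)
  log.select { |event| event.sequence_number > starting_after }
end

replay_after(SERVER_LOG, 2).each do |event|
  puts "Event: #{event.type} (seq: #{event.sequence_number})"
end
```

Under this assumption, passing the sequence number of the last processed event yields neither duplicates nor gaps.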

Resuming with Structured Outputs

When resuming a stream that uses structured outputs, you must pass the same text parameter:
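# MathResponse is a structured-output model class defined elsewhere in your code.

# Initial stream
stream = client.responses.stream(
  input: "solve 8x + 31 = 2",
  model: "gpt-4o-2024-08-06",
  text: MathResponse,
  background: true
)

# response_id and last_sequence are captured while consuming the initial
# stream, as in step 2 above.

# Resume with the same text parameter
resumed_stream = client.responses.stream(
  previous_response_id: response_id,
  starting_after: last_sequence,
  text: MathResponse  # Required to access parsed outputs
)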
The background: true option lets the response continue processing on the server after you stop consuming events, which is what makes the pause-and-resume pattern possible.
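
If the process that resumes the stream is not the one that started it, the response ID and last sequence number need to be stored somewhere in between. The following is one possible sketch using a JSON file. save_cursor, load_cursor, and the file name are hypothetical and not part of the SDK, and the ID and sequence number are sample values.

```ruby
require "json"
require "tmpdir"

# Writes the two values needed to resume a stream to a JSON file.
def save_cursor(path, response_id:, last_sequence:)
  payload = { "response_id" => response_id, "last_sequence" => last_sequence }
  File.write(path, JSON.generate(payload))
end

# Reads the file back and returns a hash keyed by the parameter names
# that the resume call expects.
def load_cursor(path)
  data = JSON.parse(File.read(path))
  {
    previous_response_id: data.fetch("response_id"),
    starting_after: data.fetch("last_sequence")
  }
end

cursor_path = File.join(Dir.mktmpdir, "stream_cursor.json")
save_cursor(cursor_path, response_id: "resp_123", last_sequence: 4)
p load_cursor(cursor_path)
```

Since the returned hash uses the same keys as the resume parameters, it could be splatted into the call, for example client.responses.stream(**load_cursor(cursor_path)), along with the text parameter when structured outputs are in use.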

Helper Methods

get_final_response()

Retrieve the complete response object after streaming completes:
response = stream.get_final_response

get_output_text()

Get all accumulated text from a completed stream:
full_text = stream.get_output_text

Complete Resuming Example

#!/usr/bin/env ruby
# frozen_string_literal: true

require_relative "../../lib/openai"

# This example demonstrates how to resume a streaming response.

client = OpenAI::Client.new

begin
  puts "----- resuming stream from a previous response -----"

  # Request 1: Create a new streaming response with background=true
  puts "Creating a new streaming response..."
  stream = client.responses.stream(
    model: "o4-mini",
    input: "Tell me a short story about a robot learning to paint.",
    instructions: "You are a creative storyteller.",
    background: true
  )

  events = []
  response_id = ""

  stream.each do |event|
    events << event
    puts "Event from initial stream: #{event.type} (seq: #{event.sequence_number})"
    case event
    when OpenAI::Models::Responses::ResponseCreatedEvent
      response_id = event.response.id if response_id.empty?
      puts("Captured response ID: #{response_id}")
    end

    # Simulate stopping after a few events
    if events.length >= 5
      puts "Terminating after #{events.length} events"
      break
    end
  end

  puts "Collected #{events.length} events"
  puts "Response ID: #{response_id}"
  puts "Last event sequence number: #{events.last.sequence_number}.\n"

  # Give the background response some time to process more events.
  puts "Waiting a moment for the background response to progress...\n"
  sleep(3)

  # Request 2: Resume the stream using the captured response_id.
  puts
  puts "Resuming stream from sequence #{events.last.sequence_number}..."

  resumed_stream = client.responses.stream(
    previous_response_id: response_id,
    starting_after: events.last.sequence_number
  )

  resumed_events = []
  resumed_stream.each do |event|
    resumed_events << event
    puts "Event from resumed stream: #{event.type} (seq: #{event.sequence_number})"
    # Stop when we get the completed event or collect enough events.
    if event.is_a?(OpenAI::Models::Responses::ResponseCompletedEvent)
      puts "Response completed!"
      break
    end

    break if resumed_events.length >= 10
  end

  puts "Collected #{resumed_events.length} additional events"

  # Show that we properly resumed from where we left off.
  if resumed_events.any?
    first_resumed_event = resumed_events.first
    last_initial_event = events.last
    puts "First resumed event sequence: #{first_resumed_event.sequence_number}"
    puts "Should be greater than last initial event: #{last_initial_event.sequence_number}"
  end
end
