Skip to main content

Overview

Streaming allows you to receive chat completion responses incrementally as they are generated, rather than waiting for the complete response.

Endpoint

POST /v1/chat/completions
Set stream: true in the request body to enable streaming.

Request

Streaming Parameters

stream
boolean
required
Set to true to enable streaming
stream_options
object
Additional streaming options. Setting "include_usage": true makes the server send one extra final chunk containing token usage statistics for the whole request; that chunk's choices array is empty.
{
  "include_usage": true
}
All other parameters are identical to the Chat Completions endpoint.

Response Format

Streamed responses are sent as Server-Sent Events (SSE):
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"role":"assistant","content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":"!"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]

Chunk Object

id
string
Unique identifier for the completion
object
string
Object type, always chat.completion.chunk
created
integer
Unix timestamp of creation
model
string
The model used
choices
array
Array of streaming choices
index
integer
Choice index
delta
object
Incremental message content
role
string
Role (only in first chunk)
content
string
Incremental content
tool_calls
array
Incremental tool calls
finish_reason
string
Reason the stream finished, e.g. stop, length, or tool_calls; null in every chunk except the final chunk for each choice

Examples

Basic Streaming Request

# Minimal streaming request: identical to a non-streaming chat completion
# call except for "stream": true in the JSON body. The response arrives as
# Server-Sent Events (data: {...} lines) instead of a single JSON object.
curl http://localhost:8787/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "x-portkey-provider: openai" \
  -H "x-portkey-api-key: sk-..." \
  -d '{
    "model": "gpt-4o-mini",
    "messages": [{"role": "user", "content": "Write a haiku about recursion"}],
    "stream": true
  }'

Python Streaming Example

from portkey_ai import Portkey

# Client configured to route requests to the OpenAI provider.
client = Portkey(
    provider="openai",
    Authorization="sk-..."
)

# With stream=True the call returns an iterator of chat.completion.chunk
# objects instead of a single response.
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Write a story about AI"}],
    stream=True
)

for chunk in stream:
    # Guard against chunks whose `choices` list is empty (e.g. usage-only
    # chunks) before indexing, then skip empty/None deltas.
    if chunk.choices and chunk.choices[0].delta.content:
        # flush=True so each fragment appears immediately in the terminal.
        print(chunk.choices[0].delta.content, end="", flush=True)

JavaScript Streaming Example

import Portkey from 'portkey-ai';

// Client configured to route requests to the OpenAI provider.
const client = new Portkey({
  provider: 'openai',
  Authorization: 'sk-...'
});

// With stream: true the SDK returns an async iterable of
// chat.completion.chunk objects.
const stream = await client.chat.completions.create({
  model: 'gpt-4o-mini',
  messages: [{role: 'user', content: 'Write a story about AI'}],
  stream: true
});

for await (const chunk of stream) {
  // Optional chaining guards against chunks with an empty choices array
  // or a delta with no content field; fall back to an empty string.
  const content = chunk.choices[0]?.delta?.content || '';
  process.stdout.write(content);
}

OpenAI SDK Streaming

from openai import OpenAI

# Point the official OpenAI SDK at the local gateway; provider routing and
# the upstream API key are supplied via gateway-specific headers.
client = OpenAI(
    base_url="http://localhost:8787/v1",
    default_headers={
        "x-portkey-provider": "openai",
        "x-portkey-api-key": "sk-..."
    }
)

stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Tell me a joke"}],
    stream=True
)

for chunk in stream:
    # Guard against chunks whose `choices` list is empty before indexing;
    # also skip deltas whose content is None (e.g. the role-only first
    # chunk and the finish_reason-only last chunk).
    if chunk.choices and chunk.choices[0].delta.content is not None:
        # flush=True so output appears incrementally rather than buffered.
        print(chunk.choices[0].delta.content, end="", flush=True)

Streaming with Usage Information

from portkey_ai import Portkey

# Client configured to route requests to the OpenAI provider.
client = Portkey(
    provider="openai",
    Authorization="sk-..."
)

stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
    stream_options={"include_usage": True}
)

for chunk in stream:
    # IMPORTANT: with include_usage enabled, the final chunk carries the
    # `usage` object and an EMPTY `choices` list — indexing
    # chunk.choices[0] unconditionally raises IndexError on that chunk,
    # so check `chunk.choices` first.
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

    # Usage statistics arrive only in the final chunk.
    if getattr(chunk, 'usage', None):
        print(f"\n\nTokens used: {chunk.usage.total_tokens}")

Streaming with Function Calling

from portkey_ai import Portkey

# Client configured to route requests to the OpenAI provider.
client = Portkey(
    provider="openai",
    Authorization="sk-..."
)

# Single tool definition: a weather lookup taking a required location.
tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get weather in a location",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string"}
            },
            "required": ["location"]
        }
    }
}]

stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "What's the weather in Boston?"}],
    tools=tools,
    stream=True
)

for chunk in stream:
    # Skip any chunk whose `choices` list is empty before indexing into it.
    if not chunk.choices:
        continue
    delta = chunk.choices[0].delta

    # Tool calls stream incrementally: the function name appears in the
    # first tool-call chunk; the JSON arguments arrive as string fragments
    # spread across subsequent chunks.
    if delta.tool_calls:
        for tool_call in delta.tool_calls:
            if tool_call.function.name:
                print(f"Calling: {tool_call.function.name}")
            if tool_call.function.arguments:
                print(f"Args: {tool_call.function.arguments}")

    # Plain text content (if the model answers directly instead).
    if delta.content:
        print(delta.content, end="")

Error Handling

Handle errors during streaming:
from portkey_ai import Portkey

# Client configured to route requests to the OpenAI provider.
client = Portkey(
    provider="openai",
    Authorization="sk-..."
)

try:
    # Errors can surface both when the request is created and while
    # iterating the stream, so both stages sit inside the try block.
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True
    )
    
    # Print each incremental content fragment as it arrives.
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")
            
except Exception as e:
    # Broad catch keeps the example short; production code should catch
    # the SDK's specific exception types and add retry/backoff for
    # transient connection failures.
    print(f"Streaming error: {e}")

Best Practices

  1. Buffer Management: Process chunks as they arrive to provide real-time feedback
  2. Error Recovery: Implement proper error handling for connection issues
  3. Token Counting: Use stream_options.include_usage to track token usage
  4. Connection Timeout: Set appropriate timeouts for long-running streams
  5. UI Updates: Update your UI incrementally for better user experience

Build docs developers (and LLMs) love