
Overview

Streaming allows you to receive LLM responses incrementally as they’re generated, rather than waiting for the complete response. This significantly improves perceived latency and enables real-time user interfaces like ChatGPT. The Gateway supports streaming for all compatible providers with automatic format transformation to OpenAI’s Server-Sent Events (SSE) format.

How It Works

When streaming is enabled:
  1. Request is sent to the provider with stream: true
  2. Provider begins generating the response
  3. Gateway receives chunks as they’re generated
  4. Chunks are transformed to OpenAI format (if needed)
  5. Transformed chunks are streamed to your application
  6. Final chunk signals completion
The Gateway handles different streaming formats (SSE, JSON lines, AWS event streams) and normalizes them to OpenAI’s format, making it easy to switch providers.
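Step 4 is the heart of the normalization. A minimal sketch of what such a transform might look like for a simplified Anthropic-style event shape (the real transform covers many more event types and fields):

```python
import time

def to_openai_chunk(provider_event, chunk_id, model):
    """Map a simplified Anthropic-style streaming event onto an
    OpenAI chat.completion.chunk dict. Unrecognized events are dropped."""
    if provider_event.get("type") == "content_block_delta":
        delta = {"content": provider_event["delta"]["text"]}
        finish_reason = None
    elif provider_event.get("type") == "message_stop":
        delta = {}
        finish_reason = "stop"
    else:
        return None  # e.g. ping / metadata events
    return {
        "id": chunk_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
    }
```

Because every provider's events are funneled through a transform like this, the client only ever sees one chunk shape.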

Configuration

Basic Streaming

Simply set stream: true in your request:
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Tell me a story"}],
    stream=True
)
No special Gateway configuration needed.

Streaming with Configs

Combine streaming with other features:
{
  "retry": { "attempts": 2 },
  "request_timeout": 60000,
  "strategy": { "mode": "fallback" },
  "targets": [
    {"provider": "openai", "api_key": "sk-***"},
    {"provider": "anthropic", "api_key": "sk-ant-***"}
  ]
}
Retries and fallbacks work seamlessly with streaming. If a stream fails mid-way, the Gateway can retry or fallback automatically.

Usage Examples

from portkey_ai import Portkey

client = Portkey(
    api_key="PORTKEY_API_KEY",
    provider="openai",
    Authorization="sk-***"
)

# Streaming chat completion
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a poem about AI"}],
    stream=True
)

# Process chunks as they arrive
for chunk in response:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

Stream Format

OpenAI Format (Default)

The Gateway outputs streams in OpenAI’s SSE format:
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]
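A client reading this raw SSE format only needs to split out the `data:` lines, decode the JSON, and stop at the `[DONE]` sentinel. A minimal sketch:

```python
import json

def parse_sse_stream(lines):
    """Yield decoded chunk dicts from OpenAI-style SSE lines.
    Blank keep-alive lines are skipped; [DONE] ends the stream."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        if payload == "[DONE]":
            return
        yield json.loads(payload)
```

SDKs do this parsing for you; it is only needed when consuming the stream over raw HTTP.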

Chunk Structure

interface StreamChunk {
  id: string;
  object: "chat.completion.chunk";
  created: number;
  model: string;
  choices: Array<{
    index: number;
    delta: {
      role?: string;
      content?: string;
      function_call?: {  // legacy; newer models emit tool_calls
        name?: string;
        arguments?: string;
      };
      tool_calls?: Array<{
        index: number;
        id?: string;
        function?: {
          name?: string;
          arguments?: string;
        };
      }>;
    };
    finish_reason: string | null;
  }>;
}
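To reconstruct the full message, a client folds each chunk's delta into an accumulator. A sketch, using plain dicts in place of SDK objects:

```python
def accumulate_deltas(chunks):
    """Fold streamed chunk deltas into the final message dict,
    returning (message, finish_reason)."""
    message = {"role": None, "content": ""}
    finish_reason = None
    for chunk in chunks:
        choice = chunk["choices"][0]
        delta = choice["delta"]
        if "role" in delta:
            message["role"] = delta["role"]  # sent once, in the first chunk
        if delta.get("content"):
            message["content"] += delta["content"]
        if choice["finish_reason"]:
            finish_reason = choice["finish_reason"]
    return message, finish_reason
```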

Provider Support

Streaming Providers

All major providers support streaming:
Provider        Native Format    Gateway Transforms
OpenAI          SSE              No
Anthropic       SSE              Yes
Azure OpenAI    SSE              No
Google Gemini   JSON             Yes
Cohere          JSON             Yes
AWS Bedrock     Event Stream     Yes
Groq            SSE              No
Together AI     SSE              Yes
Mistral         SSE              Yes
The Gateway automatically transforms all streaming formats to OpenAI-compatible SSE, so you can switch providers without changing client code.

Implementation Details

Stream Processing

From src/handlers/streamHandler.ts:
export async function* readStream(
  reader: ReadableStreamDefaultReader,
  splitPattern: SplitPatternType,
  transformFunction: Function | undefined,
  isSleepTimeRequired: boolean,
  fallbackChunkId: string,
  strictOpenAiCompliance: boolean,
  gatewayRequest: Params
) {
  let buffer = '';
  const decoder = new TextDecoder();
  const streamState = {};

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    while (buffer.split(splitPattern).length > 1) {
      const parts = buffer.split(splitPattern);
      const lastPart = parts.pop() ?? '';
      
      for (const part of parts) {
        if (part.length > 0) {
          if (transformFunction) {
            const transformedChunk = transformFunction(
              part,
              fallbackChunkId,
              streamState,
              strictOpenAiCompliance,
              gatewayRequest
            );
            if (transformedChunk !== undefined) {
              yield transformedChunk;
            }
          } else {
            yield part + splitPattern;
          }
        }
      }

      buffer = lastPart;
    }
  }
}

Provider-Specific Handling

AWS Bedrock Event Streams

Bedrock uses binary event streams that require special parsing:
export async function* readAWSStream(
  reader: ReadableStreamDefaultReader,
  transformFunction: Function | undefined,
  fallbackChunkId: string,
  strictOpenAiCompliance: boolean,
  gatewayRequest: Params
) {
  let buffer = new Uint8Array();
  let expectedLength = 0;
  const streamState = {};

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer = concatenateUint8Arrays(buffer, value);

    // Each event-stream message begins with a 4-byte big-endian
    // total-length prelude; wait until the whole message is buffered.
    while (buffer.length >= 4) {
      expectedLength = new DataView(
        buffer.buffer,
        buffer.byteOffset
      ).getUint32(0);
      if (buffer.length < expectedLength) break;

      const data = buffer.subarray(0, expectedLength);
      buffer = buffer.subarray(expectedLength);

      const payload = getPayloadFromAWSChunk(data);

      if (transformFunction) {
        const transformedChunk = transformFunction(
          payload,
          fallbackChunkId,
          streamState,
          strictOpenAiCompliance,
          gatewayRequest
        );
        yield transformedChunk;
      }
    }
  }
}

Advanced Streaming Patterns

Function Calling with Streaming

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "parameters": {"type": "object", "properties": {}}
        }
    }],
    stream=True
)

for chunk in response:
    delta = chunk.choices[0].delta

    # Tool calls stream incrementally: the name arrives first,
    # then the arguments in fragments
    if delta.tool_calls:
        for tool_call in delta.tool_calls:
            if tool_call.function.name:
                print(f"Function: {tool_call.function.name}")
            if tool_call.function.arguments:
                print(tool_call.function.arguments, end="")

    # Check for content
    if delta.content:
        print(delta.content, end="")
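Streamed tool-call arguments arrive as string fragments that only parse as JSON once the stream finishes. A sketch of accumulating them, using plain dicts in place of SDK delta objects:

```python
import json

def collect_tool_call(deltas):
    """Join streamed tool-call fragments into (name, parsed_arguments).
    `deltas` mimics the chunk.choices[0].delta.tool_calls[0] objects."""
    name = None
    arg_fragments = []
    for d in deltas:
        fn = d.get("function", {})
        if fn.get("name"):
            name = fn["name"]
        if fn.get("arguments"):
            arg_fragments.append(fn["arguments"])
    # Arguments are only valid JSON after the final fragment
    return name, json.loads("".join(arg_fragments))
```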

Streaming with Metadata

client = Portkey(
    api_key="PORTKEY_API_KEY",
    provider="openai",
    Authorization="sk-***",
    metadata={
        "user_id": "user_123",
        "session_id": "sess_456"
    }
)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True
)

for chunk in response:
    # Metadata is logged for each chunk
    print(chunk.choices[0].delta.content, end="")

Multi-Provider Streaming with Fallback

client = Portkey(
    api_key="PORTKEY_API_KEY",
    config={
        "strategy": {"mode": "fallback"},
        "targets": [
            {"provider": "openai", "api_key": "sk-***"},
            {"provider": "anthropic", "api_key": "sk-ant-***"},
            {"provider": "google", "api_key": "***"}
        ]
    }
)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Tell me a story"}],
    stream=True
)

for chunk in response:
    # If OpenAI fails, automatically falls back to Anthropic or Google
    # Format remains consistent across all providers
    print(chunk.choices[0].delta.content, end="")

Error Handling

Handling Stream Errors

try:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True
    )
    
    for chunk in response:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")
            
except Exception as e:
    print(f"Stream error: {e}")
    # Handle error (retry, fallback, etc.)

Mid-Stream Failures

The Gateway handles mid-stream failures:
{
  "retry": { "attempts": 2 },
  "strategy": { "mode": "fallback" },
  "targets": [
    {"provider": "openai", "api_key": "sk-***"},
    {"provider": "anthropic", "api_key": "sk-ant-***"}
  ]
}
If a stream fails:
  1. Gateway attempts retry with same provider
  2. Falls back to next provider if retry fails
  3. New stream starts from beginning
When a stream fails mid-way and falls back, the response starts over from the beginning. Your application should handle partial responses appropriately.
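One way to cope with restart-from-beginning is to buffer each attempt and only commit a fully completed response. A sketch, where `make_stream` is a hypothetical factory that opens a fresh stream of content strings:

```python
def stream_with_restart(make_stream, max_attempts=2):
    """Consume a stream, discarding partial output if it fails mid-way
    so that only a complete response is ever returned."""
    for attempt in range(max_attempts):
        parts = []
        try:
            for content in make_stream():
                parts.append(content)
            return "".join(parts)  # commit only a complete response
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # drop the partial buffer; the new stream starts over
```

In a UI you would likewise clear any provisionally rendered text before re-rendering the restarted stream.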

Performance Optimization

Latency Optimization

# Azure OpenAI has slight delays between chunks
client = Portkey(
    api_key="PORTKEY_API_KEY",
    provider="azure-openai",
    # Gateway adds 1ms sleep between chunks for Azure
    custom_host="https://your-resource.openai.azure.com"
)
The Gateway automatically handles provider-specific timing requirements.

Chunk Buffering

For better UI rendering, buffer small chunks:
let buffer = '';
const BUFFER_SIZE = 10;

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) {
    buffer += content;
    
    if (buffer.length >= BUFFER_SIZE) {
      updateUI(buffer);
      buffer = '';
    }
  }
}

if (buffer) {
  updateUI(buffer);  // Flush remaining
}

Best Practices

Always check for finish_reason to detect stream completion:
if chunk.choices[0].finish_reason == "stop":
    print("\nStream completed successfully")
Streaming responses keep the connection open longer than regular requests, so set a higher timeout:
{"request_timeout": 60000}
Streams can fail mid-way. Implement client-side retry:
max_retries = 3
for attempt in range(max_retries):
    try:
        # A stream can only be consumed once, so recreate it each attempt
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": "Hello"}],
            stream=True,
        )
        for chunk in response:
            process(chunk)
        break
    except Exception:
        if attempt == max_retries - 1:
            raise
Buffer small chunks before updating UI to reduce render overhead and improve perceived performance.
Track metrics like time-to-first-chunk and total stream duration to optimize user experience.
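A minimal sketch of capturing both metrics around any chunk iterable (simulated content strings here):

```python
import time

def measure_stream(chunks):
    """Collect streamed content while recording time-to-first-chunk
    and total stream duration (both in seconds)."""
    start = time.monotonic()
    ttfc = None
    out = []
    for content in chunks:
        if ttfc is None:
            ttfc = time.monotonic() - start  # first chunk arrived
        out.append(content)
    total = time.monotonic() - start
    return "".join(out), ttfc, total
```

Time-to-first-chunk is what users perceive as responsiveness; total duration matters for timeout tuning.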

Related Pages

  Realtime APIs: WebSocket-based realtime APIs
  Timeouts: Configure streaming timeouts
  Fallbacks: Fallback on stream failures
  Retries: Retry failed streams
