Overview
Streaming allows you to receive LLM responses incrementally as they're generated, rather than waiting for the complete response. This significantly reduces perceived latency and enables real-time user interfaces like ChatGPT's.
The Gateway supports streaming for all compatible providers with automatic format transformation to OpenAI’s Server-Sent Events (SSE) format.
How It Works
When streaming is enabled:
1. The request is sent to the provider with stream: true
2. The provider begins generating the response
3. The Gateway receives chunks as they're generated
4. Chunks are transformed to OpenAI format (if needed)
5. Transformed chunks are streamed to your application
6. A final chunk signals completion
The Gateway handles different streaming formats (SSE, JSON lines, AWS event streams) and normalizes them to OpenAI’s format, making it easy to switch providers.
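As an illustration, a transform step of this kind wraps a provider's raw delta in an OpenAI-style chunk before emitting it as an SSE line. The payload shape and function name below are hypothetical, not the Gateway's actual transform code:

```python
import json

def to_openai_sse(provider_chunk: dict, chunk_id: str, model: str) -> str:
    """Wrap a hypothetical provider delta ({"text": ...}) in an
    OpenAI-style chat.completion.chunk, serialized as one SSE line."""
    openai_chunk = {
        "id": chunk_id,
        "object": "chat.completion.chunk",
        "model": model,
        "choices": [
            {"index": 0, "delta": {"content": provider_chunk.get("text", "")},
             "finish_reason": None}
        ],
    }
    return f"data: {json.dumps(openai_chunk)}\n\n"

sse_line = to_openai_sse({"text": "Hello"}, "chatcmpl-123", "gemini-1.5-pro")
```

Because every provider's chunks are normalized this way, client code only ever needs to understand the OpenAI chunk shape.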
Configuration
Basic Streaming
Simply set stream: true in your request:
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Tell me a story"}],
    stream=True
)
No special Gateway configuration is needed.
Streaming with Configs
Combine streaming with other features:
{
  "retry": { "attempts": 2 },
  "request_timeout": 60000,
  "strategy": { "mode": "fallback" },
  "targets": [
    { "provider": "openai", "api_key": "sk-***" },
    { "provider": "anthropic", "api_key": "sk-ant-***" }
  ]
}
Retries and fallbacks work seamlessly with streaming. If a stream fails mid-way, the Gateway can retry or fall back automatically.
Usage Examples
from portkey_ai import Portkey

client = Portkey(
    api_key="PORTKEY_API_KEY",
    provider="openai",
    Authorization="sk-***"
)

# Streaming chat completion
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a poem about AI"}],
    stream=True
)

# Process chunks as they arrive
for chunk in response:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
The Gateway outputs streams in OpenAI’s SSE format:
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]
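On the receiving side, these lines can be parsed back into chunk objects by splitting on the `data: ` prefix and stopping at the `[DONE]` sentinel. A minimal client-side sketch (most SDKs do this for you):

```python
import json

def parse_sse_stream(raw: str):
    """Yield parsed chunk dicts from OpenAI-style SSE text,
    stopping at the [DONE] sentinel."""
    for line in raw.splitlines():
        line = line.strip()
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        if payload == "[DONE]":
            return
        yield json.loads(payload)

raw = (
    'data: {"choices":[{"index":0,"delta":{"content":"Hi"},"finish_reason":null}]}\n\n'
    "data: [DONE]\n\n"
)
chunks = list(parse_sse_stream(raw))
```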
Chunk Structure
interface StreamChunk {
  id: string;
  object: "chat.completion.chunk";
  created: number;
  model: string;
  choices: Array<{
    index: number;
    delta: {
      role?: string;
      content?: string;
      function_call?: {
        name?: string;
        arguments?: string;
      };
    };
    finish_reason: string | null;
  }>;
}
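A client can fold the `delta` fields of successive chunks into a complete assistant message. A minimal sketch, operating on plain dicts shaped like the interface above:

```python
def accumulate(chunks):
    """Fold streamed deltas into one complete assistant message."""
    message = {"role": None, "content": ""}
    for chunk in chunks:
        delta = chunk["choices"][0]["delta"]
        if "role" in delta:
            message["role"] = delta["role"]
        if delta.get("content"):
            message["content"] += delta["content"]
    return message

msg = accumulate([
    {"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}]},
    {"choices": [{"index": 0, "delta": {"content": "Hel"}, "finish_reason": None}]},
    {"choices": [{"index": 0, "delta": {"content": "lo"}, "finish_reason": "stop"}]},
])
```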
Provider Support
Streaming Providers
All major providers support streaming:
| Provider | Streaming | Native Format | Gateway Transforms |
| --- | --- | --- | --- |
| OpenAI | ✅ | SSE | No |
| Anthropic | ✅ | SSE | Yes |
| Azure OpenAI | ✅ | SSE | No |
| Google Gemini | ✅ | JSON | Yes |
| Cohere | ✅ | JSON | Yes |
| AWS Bedrock | ✅ | Event Stream | Yes |
| Groq | ✅ | SSE | No |
| Together AI | ✅ | SSE | Yes |
| Mistral | ✅ | SSE | Yes |
The Gateway automatically transforms all streaming formats to OpenAI-compatible SSE, so you can switch providers without changing client code.
Implementation Details
Stream Processing
From src/handlers/streamHandler.ts:
export async function* readStream(
  reader: ReadableStreamDefaultReader,
  splitPattern: SplitPatternType,
  transformFunction: Function | undefined,
  isSleepTimeRequired: boolean,
  fallbackChunkId: string,
  strictOpenAiCompliance: boolean,
  gatewayRequest: Params
) {
  let buffer = '';
  const decoder = new TextDecoder();
  const streamState = {};
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    while (buffer.split(splitPattern).length > 1) {
      const parts = buffer.split(splitPattern);
      const lastPart = parts.pop() ?? '';
      for (const part of parts) {
        if (part.length > 0) {
          if (transformFunction) {
            const transformedChunk = transformFunction(
              part,
              fallbackChunkId,
              streamState,
              strictOpenAiCompliance,
              gatewayRequest
            );
            if (transformedChunk !== undefined) {
              yield transformedChunk;
            }
          } else {
            yield part + splitPattern;
          }
        }
      }
      buffer = lastPart;
    }
  }
}
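The core buffering idea above, accumulate raw network pieces and hold back the trailing partial event until more bytes arrive, can be sketched in Python independently of the Gateway codebase (the function name is illustrative):

```python
def split_stream(pieces, split_pattern="\n\n"):
    """Re-assemble arbitrary network pieces into complete events.

    Mirrors the buffering pattern in readStream: split on the event
    delimiter and keep the last (possibly incomplete) part buffered."""
    buffer = ""
    for piece in pieces:
        buffer += piece
        parts = buffer.split(split_pattern)
        buffer = parts.pop()  # trailing part may be incomplete
        for part in parts:
            if part:
                yield part

# An event boundary can land anywhere, even mid-token:
events = list(split_stream(["data: a\n\nda", "ta: b\n\n"]))
```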
Provider-Specific Handling
AWS Bedrock Event Streams
Bedrock uses binary event streams that require special parsing:
export async function* readAWSStream(
  reader: ReadableStreamDefaultReader,
  transformFunction: Function | undefined,
  fallbackChunkId: string,
  strictOpenAiCompliance: boolean,
  gatewayRequest: Params
) {
  let buffer = new Uint8Array();
  let expectedLength = 0;
  const streamState = {};
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer = concatenateUint8Arrays(buffer, value);
    // expectedLength is read from each frame's length prelude
    // (parsing elided in this excerpt)
    while (buffer.length >= expectedLength && buffer.length !== 0) {
      const data = buffer.subarray(0, expectedLength);
      buffer = buffer.subarray(expectedLength);
      const payload = getPayloadFromAWSChunk(data);
      if (transformFunction) {
        const transformedChunk = transformFunction(
          payload,
          fallbackChunkId,
          streamState,
          strictOpenAiCompliance,
          gatewayRequest
        );
        yield transformedChunk;
      }
    }
  }
}
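For reference, the binary framing that getPayloadFromAWSChunk deals with follows the AWS event-stream encoding: a 4-byte big-endian total length, a 4-byte headers length, a 4-byte prelude CRC, then headers, payload, and a 4-byte message CRC. A simplified Python sketch of extracting one frame's payload (CRC validation omitted, names illustrative):

```python
import struct

def parse_event_frame(frame: bytes) -> bytes:
    """Extract the payload from one AWS event-stream frame.

    Layout: total_length (4B BE) | headers_length (4B BE) |
    prelude CRC (4B) | headers | payload | message CRC (4B).
    CRC checks are skipped in this sketch."""
    total_length, headers_length = struct.unpack(">II", frame[:8])
    payload_start = 12 + headers_length
    payload_end = total_length - 4
    return frame[payload_start:payload_end]

# Build a minimal frame with no headers and dummy CRCs:
payload = b'{"text":"hi"}'
frame = (
    struct.pack(">II", 12 + len(payload) + 4, 0)  # total and headers lengths
    + b"\x00" * 4                                  # prelude CRC (dummy)
    + payload
    + b"\x00" * 4                                  # message CRC (dummy)
)
extracted = parse_event_frame(frame)
```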
Advanced Streaming Patterns
Function Calling with Streaming
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "parameters": {"type": "object", "properties": {}}
        }
    }],
    stream=True
)

for chunk in response:
    delta = chunk.choices[0].delta

    # Check for function call
    if delta.function_call:
        print(f"Function: {delta.function_call.name}")
        print(f"Args: {delta.function_call.arguments}")

    # Check for content
    if delta.content:
        print(delta.content, end="")
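Because function-call arguments arrive as string fragments spread over many chunks, clients typically concatenate them before parsing the JSON. A minimal sketch (the helper name is illustrative):

```python
import json

def collect_function_call(chunks):
    """Concatenate streamed function_call fragments into a complete call."""
    name, args = None, ""
    for chunk in chunks:
        fc = chunk["choices"][0]["delta"].get("function_call")
        if not fc:
            continue
        if fc.get("name"):
            name = fc["name"]
        if fc.get("arguments"):
            args += fc["arguments"]
    return name, json.loads(args) if args else None

# The JSON arguments string can be split at arbitrary points:
call = collect_function_call([
    {"choices": [{"delta": {"function_call": {"name": "get_weather", "arguments": '{"cit'}}}]},
    {"choices": [{"delta": {"function_call": {"arguments": 'y": "Paris"}'}}}]},
])
```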
Streaming with Metadata

client = Portkey(
    api_key="PORTKEY_API_KEY",
    provider="openai",
    Authorization="sk-***",
    metadata={
        "user_id": "user_123",
        "session_id": "sess_456"
    }
)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True
)

for chunk in response:
    # Metadata is logged for each chunk
    print(chunk.choices[0].delta.content, end="")
Multi-Provider Streaming with Fallback
client = Portkey(
    api_key="PORTKEY_API_KEY",
    config={
        "strategy": {"mode": "fallback"},
        "targets": [
            {"provider": "openai", "api_key": "sk-***"},
            {"provider": "anthropic", "api_key": "sk-ant-***"},
            {"provider": "google", "api_key": "***"}
        ]
    }
)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Tell me a story"}],
    stream=True
)

for chunk in response:
    # If OpenAI fails, the Gateway automatically falls back to Anthropic or Google
    # The format remains consistent across all providers
    print(chunk.choices[0].delta.content, end="")
Error Handling
Handling Stream Errors
try:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True
    )
    for chunk in response:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")
except Exception as e:
    print(f"Stream error: {e}")
    # Handle the error (retry, fall back, etc.)
Mid-Stream Failures
The Gateway handles mid-stream failures:
{
  "retry": { "attempts": 2 },
  "strategy": { "mode": "fallback" },
  "targets": [
    { "provider": "openai", "api_key": "sk-***" },
    { "provider": "anthropic", "api_key": "sk-ant-***" }
  ]
}
If a stream fails:
1. The Gateway attempts a retry with the same provider
2. If retries are exhausted, it falls back to the next provider
3. The new stream starts from the beginning
When a stream fails mid-way and falls back, the response starts over from the beginning. Your application should handle partial responses appropriately.
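One way to handle this on the client is to discard any partial output whenever a retried stream starts over. A sketch with a hypothetical stream factory (make_stream is an assumed callable that opens a fresh stream and yields content strings):

```python
def stream_with_restart(make_stream, max_attempts=2):
    """Client-side retry that discards partial output on failure.

    Because a retried stream starts from the beginning, the accumulated
    text is reset on every attempt rather than appended to."""
    for attempt in range(max_attempts):
        text = ""  # discard partial output from any failed attempt
        try:
            for content in make_stream():
                text += content
            return text
        except Exception:
            if attempt == max_attempts - 1:
                raise

# Simulated stream: fails mid-way once, then succeeds
attempts = {"n": 0}

def flaky():
    attempts["n"] += 1
    if attempts["n"] == 1:
        yield "par"
        raise RuntimeError("mid-stream failure")
    yield "full "
    yield "story"

result = stream_with_restart(flaky)
```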
Latency Optimization
# Azure OpenAI has slight delays between chunks
client = Portkey(
    api_key="PORTKEY_API_KEY",
    provider="azure-openai",
    # The Gateway adds a 1ms sleep between chunks for Azure
    custom_host="https://your-resource.openai.azure.com"
)
The Gateway automatically handles provider-specific timing requirements.
Chunk Buffering
For better UI rendering, buffer small chunks:
let buffer = '';
const BUFFER_SIZE = 10;

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) {
    buffer += content;
    if (buffer.length >= BUFFER_SIZE) {
      updateUI(buffer);
      buffer = '';
    }
  }
}
if (buffer) {
  updateUI(buffer); // Flush remaining content
}
Best Practices
Always check finish_reason to detect stream completion:

if chunk.choices[0].finish_reason == "stop":
    print("\nStream completed successfully")

Streaming responses take longer to complete, so set higher timeouts: { "request_timeout": 60000 }

Streams can fail mid-way, so implement client-side retry, re-creating the stream on each attempt:

max_retries = 3
for attempt in range(max_retries):
    try:
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            stream=True
        )
        for chunk in stream:
            process(chunk)
        break
    except Exception:
        if attempt == max_retries - 1:
            raise
Buffer small chunks before updating UI to reduce render overhead and improve perceived performance.
Monitor Stream Performance
Track metrics like time-to-first-chunk and total stream duration to optimize user experience.
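A minimal sketch of collecting those two metrics client-side (the helper name is illustrative):

```python
import time

def measure_stream(chunks_iter):
    """Record time-to-first-chunk and total duration for a stream."""
    start = time.monotonic()
    first_chunk_at = None
    count = 0
    for _ in chunks_iter:
        if first_chunk_at is None:
            first_chunk_at = time.monotonic()
        count += 1
    return {
        "ttfc_ms": (first_chunk_at - start) * 1000 if first_chunk_at else None,
        "total_ms": (time.monotonic() - start) * 1000,
        "chunks": count,
    }

metrics = measure_stream(iter(["a", "b", "c"]))
```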
Realtime APIs: WebSocket-based realtime APIs
Timeouts: Configure streaming timeouts
Fallbacks: Fall back on stream failures
Retries: Retry failed streams