
Real-time Streaming

Streaming allows you to receive AI responses in real time as they are generated, providing a better user experience for long-form content.

How Streaming Works

Cencori uses Server-Sent Events (SSE) to stream responses from AI providers. Each chunk is sent as a data: event with a JSON payload.

Stream Format

Each chunk follows this format:
{
  "delta": "text chunk",
  "finish_reason": "stop" | "length" | "tool_calls" | null
}
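On the wire, each chunk arrives as a line of the form data: {...} followed by a blank line, with a final data: [DONE] sentinel. A minimal sketch of parsing one such line (the parseChunkLine name and StreamChunk type are illustrative, not part of the SDK):

```typescript
// Shape of a single streamed chunk, mirroring the format above
interface StreamChunk {
  delta: string;
  finish_reason: 'stop' | 'length' | 'tool_calls' | null;
}

// Parse one SSE line into a chunk; returns null for non-data
// lines and for the terminating [DONE] sentinel.
function parseChunkLine(line: string): StreamChunk | null {
  if (!line.startsWith('data: ')) return null;
  const payload = line.slice('data: '.length);
  if (payload === '[DONE]') return null;
  return JSON.parse(payload) as StreamChunk;
}
```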

Basic Streaming

import { Cencori } from '@cencori/sdk';

const cencori = new Cencori({
  apiKey: process.env.CENCORI_API_KEY
});

async function streamChat() {
  const stream = await cencori.ai.chatStream({
    messages: [
      { role: 'user', content: 'Tell me a long story' }
    ],
    model: 'gpt-4o',
    stream: true
  });

  for await (const chunk of stream) {
    process.stdout.write(chunk.delta);
    
    if (chunk.finishReason) {
      console.log('\nStream finished:', chunk.finishReason);
    }
  }
}

Next.js App Router Example

Here’s how streaming is implemented in the Cencori platform:
app/api/ai/chat/route.ts
if (stream === true) {
  const encoder = new TextEncoder();

  const customReadable = new ReadableStream({
    async start(controller) {
      try {
        let fullContent = '';

        for await (const chunk of provider.stream(chatRequest)) {
          // Accumulate content
          fullContent += chunk.delta;

          // Real-time security scanning
          const outputSecurity = checkOutputSecurity(fullContent, {
            inputText,
            conversationHistory: unifiedMessages
          });

          if (!outputSecurity.safe) {
            controller.enqueue(
              encoder.encode(
                `data: ${JSON.stringify({ 
                  error: 'Response blocked' 
                })}\n\n`
              )
            );
            controller.close();
            return;
          }

          // Send chunk to client
          controller.enqueue(
            encoder.encode(
              `data: ${JSON.stringify({ 
                delta: chunk.delta,
                finish_reason: chunk.finishReason 
              })}\n\n`
            )
          );

          if (chunk.finishReason) {
            controller.enqueue(encoder.encode('data: [DONE]\n\n'));
          }
        }
        controller.close();
      } catch (error) {
        // In TypeScript the catch parameter is unknown, so narrow it
        const message =
          error instanceof Error ? error.message : 'Stream error';
        controller.enqueue(
          encoder.encode(
            `data: ${JSON.stringify({ 
              error: message 
            })}\n\n`
          )
        );
        controller.close();
      }
    },
  });

  return new Response(customReadable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}

Client-Side Streaming

React Hook Example

hooks/use-stream.ts
import { useState } from 'react';

export function useStreamingChat() {
  const [content, setContent] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);
  const [error, setError] = useState<string | null>(null);

  const streamChat = async (messages: Array<{role: string; content: string}>) => {
    setIsStreaming(true);
    setContent('');
    setError(null);

    try {
      const response = await fetch('/api/ai/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ messages, stream: true })
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      if (!reader) throw new Error('No reader');

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // { stream: true } preserves multi-byte characters split across reads
        const chunk = decoder.decode(value, { stream: true });
        const lines = chunk.split('\n');

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;
          
          const data = line.slice(6);
          if (data === '[DONE]') continue;

          try {
            const parsed = JSON.parse(data);
            if (parsed.error) {
              setError(parsed.error);
              break;
            }
            setContent(prev => prev + parsed.delta);
          } catch (e) {
            // Skip invalid JSON
          }
        }
      }
    } catch (err) {
      setError(err instanceof Error ? err.message : 'Stream error');
    } finally {
      setIsStreaming(false);
    }
  };

  return { content, isStreaming, error, streamChat };
}
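One caveat in the hook above: a single reader.read() call is not guaranteed to end on a line boundary, so an SSE event can arrive split across two network chunks. A small buffering helper that only emits complete lines (the SSELineBuffer name is illustrative, not part of the SDK):

```typescript
// Accumulates raw text chunks and yields only complete lines,
// holding any trailing partial line until the next push.
class SSELineBuffer {
  private buffer = '';

  push(chunk: string): string[] {
    this.buffer += chunk;
    const lines = this.buffer.split('\n');
    // The last element is either '' (chunk ended on a newline)
    // or an incomplete line; keep it for the next push.
    this.buffer = lines.pop() ?? '';
    return lines;
  }
}
```

In the hook, each decoded chunk would be pushed through the buffer before the data: parsing loop, so a JSON payload cut in half by the network is reassembled before JSON.parse sees it.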

Streaming with Fallback

Cencori automatically handles failover during streaming:
lib/providers/router.ts
async function* tryStreamWithFallback() {
  let actualProvider = providerName;
  let usedFallback = false;

  // Try primary provider
  if (!(await isCircuitOpen(providerName))) {
    for (let attempt = 0; attempt < maxRetries; attempt++) {
      try {
        const streamGen = provider.stream(chatRequest);
        for await (const chunk of streamGen) {
          yield { ...chunk, actualProvider, usedFallback };
        }
        await recordSuccess(providerName);
        return;
      } catch (error) {
        if (isNonRetryableError(error)) throw error;
        await recordFailure(providerName);
      }
    }
  }

  // Try fallback providers
  if (enableFallback) {
    const fallbackChain = getFallbackChain(providerName);
    
    for (const fallbackName of fallbackChain) {
      if (await isCircuitOpen(fallbackName)) continue;

      try {
        const fallbackModel = getFallbackModel(model, fallbackName);
        actualProvider = fallbackName;
        usedFallback = true;

        // Resolve the provider instance from its name before streaming
        const fallbackProvider = getProvider(fallbackName);
        const streamGen = fallbackProvider.stream({
          ...chatRequest,
          model: fallbackModel
        });

        for await (const chunk of streamGen) {
          yield { ...chunk, actualProvider, usedFallback };
        }

        await recordSuccess(fallbackName);
        return;
      } catch (error) {
        await recordFailure(fallbackName);
        console.warn(`Fallback ${fallbackName} failed`, error);
      }
    }
    }
  }

  throw new Error('All providers failed');
}

Error Handling

  1. Detect Errors in Stream Check each chunk for error fields:
    if (parsed.error) {
      console.error('Stream error:', parsed.error);
      // Stop processing
      break;
    }
    
  2. Handle Network Failures Wrap stream processing in try-catch:
    try {
      for await (const chunk of stream) {
        // Process chunk
      }
    } catch (error) {
      if (error instanceof Error && error.name === 'AbortError') {
        console.log('Stream cancelled');
      } else {
        console.error('Stream failed:', error);
      }
    }
    
  3. Implement Timeout Add timeout to prevent hanging:
    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), 30000);
    
    try {
      const response = await fetch('/api/ai/chat', {
        signal: controller.signal,
        // ...
      });
    } finally {
      clearTimeout(timeout);
    }
    

Best Practices

  • Buffer Management: Process chunks immediately to avoid memory buildup
  • Security Scanning: Implement real-time content filtering during streaming
  • User Feedback: Show loading states and progress indicators
  • Cancellation: Provide abort functionality for long-running streams
  • Error Recovery: Gracefully handle stream interruptions
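The cancellation practice above can be sketched with an AbortController: pass its signal to fetch so the browser tears down the connection when abort() is called (the startStream and cancelStream names are illustrative, not part of the SDK):

```typescript
// Holds the controller for the in-flight stream so the UI can cancel it.
let activeStream: AbortController | null = null;

function startStream(url: string): AbortController {
  const controller = new AbortController();
  activeStream = controller;
  // fetch(url, { signal: controller.signal }) would go here;
  // aborting the signal rejects the fetch with an AbortError,
  // which the error-handling pattern above distinguishes from failures.
  return controller;
}

function cancelStream(): boolean {
  if (!activeStream || activeStream.signal.aborted) return false;
  activeStream.abort();
  return true;
}
```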

Performance Tips

  1. Use Smaller Models: smaller models such as gemini-2.0-flash typically stream faster than larger ones like gpt-4
  2. Reduce max_tokens: Limit response length for faster completion
  3. Client-Side Buffering: Batch small chunks before rendering
  4. Connection Pooling: Reuse HTTP connections for multiple streams
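Tip 3 can be sketched as a small batcher that coalesces deltas and flushes once a size threshold is reached, cutting down on re-renders (the ChunkBatcher name and threshold are illustrative, not part of the SDK):

```typescript
// Coalesces small deltas and invokes onFlush once the buffered
// text reaches minChars, instead of rendering every tiny chunk.
class ChunkBatcher {
  private pending = '';

  constructor(
    private onFlush: (text: string) => void,
    private minChars = 32
  ) {}

  add(delta: string): void {
    this.pending += delta;
    if (this.pending.length >= this.minChars) this.flush();
  }

  // Call once the stream ends to emit any remainder.
  flush(): void {
    if (!this.pending) return;
    this.onFlush(this.pending);
    this.pending = '';
  }
}
```

In the React hook, onFlush would call setContent(prev => prev + text) in place of the per-chunk update, and flush() would run after the read loop ends.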
