
Streaming

Streaming delivers AI-generated content incrementally as it’s produced, so your application can start showing output before the complete response is available.

Basic Text Streaming

Stream text as it’s generated:
import { genkit } from 'genkit';
import { googleAI } from '@genkit-ai/google-genai';

const ai = genkit({ plugins: [googleAI()] });

// Option 1: pass a callback that runs once per chunk
await ai.generate({
  model: googleAI.model('gemini-2.5-flash'),
  prompt: 'Write a short story about a robot learning to paint',
  onChunk: (chunk) => {
    process.stdout.write(chunk.text);
  },
});

// Option 2: iterate over the chunks of a streaming call
const { stream } = ai.generateStream({
  model: googleAI.model('gemini-2.5-flash'),
  prompt: 'Write a short story about a robot learning to paint',
});
for await (const chunk of stream) {
  process.stdout.write(chunk.text);
}

Streaming Flows

Create flows that stream responses:
genkit.DefineStreamingFlow(g, "streamStory",
    func(ctx context.Context, topic string, send core.StreamCallback[string]) (string, error) {
        stream := genkit.GenerateStream(ctx, g,
            ai.WithModelName("googleai/gemini-2.5-flash"),
            ai.WithPrompt("Write a story about %s", topic),
        )

        for result, err := range stream {
            if err != nil {
                return "", err
            }
            if result.Done {
                return result.Response.Text(), nil
            }
            send(ctx, result.Chunk.Text())
        }
        return "", nil
    },
)

Streaming Structured Data

Stream type-safe JSON objects as they’re being generated:
type Ingredient struct {
    Name   string `json:"name"`
    Amount string `json:"amount"`
}

type Recipe struct {
    Title       string        `json:"title"`
    Ingredients []*Ingredient `json:"ingredients"`
}

stream := genkit.GenerateDataStream[*Recipe](ctx, g,
    ai.WithModelName("googleai/gemini-2.5-flash"),
    ai.WithPrompt("Create a recipe for spaghetti carbonara."),
)

for result, err := range stream {
    if err != nil {
        log.Fatal(err)
    }
    if result.Done {
        fmt.Printf("\nComplete recipe: %s\n", result.Output.Title)
        break
    }
    // Access partial data as it streams in
    if result.Chunk != nil && len(result.Chunk.Ingredients) > 0 {
        fmt.Printf("Found ingredient: %s\n", result.Chunk.Ingredients[0].Name)
    }
}
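
The loop above reports the first ingredient on every chunk. If each chunk carries the cumulative partial object (an assumption about the stream's behavior, not something stated here), you can report each ingredient once by tracking how many have already been seen. The following is a minimal sketch with simulated chunks; `newIngredients` is a hypothetical helper.

```go
package main

import "fmt"

type Ingredient struct {
	Name   string `json:"name"`
	Amount string `json:"amount"`
}

type Recipe struct {
	Title       string        `json:"title"`
	Ingredients []*Ingredient `json:"ingredients"`
}

// newIngredients returns the ingredients in chunk that were not present in
// earlier chunks, given how many have already been seen. It assumes chunks
// are cumulative, so earlier items keep their positions.
func newIngredients(chunk *Recipe, seen int) []*Ingredient {
	if chunk == nil || len(chunk.Ingredients) <= seen {
		return nil
	}
	return chunk.Ingredients[seen:]
}

func main() {
	// Simulated partial objects, standing in for result.Chunk values.
	chunks := []*Recipe{
		{Title: "Spaghetti"},
		{Title: "Spaghetti Carbonara", Ingredients: []*Ingredient{{Name: "spaghetti"}}},
		{Title: "Spaghetti Carbonara", Ingredients: []*Ingredient{{Name: "spaghetti"}, {Name: "eggs"}}},
	}

	seen := 0
	for _, chunk := range chunks {
		for _, ing := range newIngredients(chunk, seen) {
			fmt.Printf("Found ingredient: %s\n", ing.Name)
		}
		if chunk != nil && len(chunk.Ingredients) > seen {
			seen = len(chunk.Ingredients)
		}
	}
}
```

The most recent item in a partial object may itself be incomplete, so a stricter variant could hold back the last element until the next one appears or the stream is done.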

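Streaming Flow with Structured Output

Combine both patterns to stream partial structured objects from a flow. JokeRequest and Joke are application-defined structs; JokeRequest has a Topic field: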
genkit.DefineStreamingFlow(g, "structuredJokesFlow",
    func(ctx context.Context, input JokeRequest, sendChunk core.StreamCallback[*Joke]) (*Joke, error) {
        stream := genkit.GenerateDataStream[*Joke](ctx, g,
            ai.WithModelName("googleai/gemini-2.5-flash"),
            ai.WithPrompt("Share a long joke about %s.", input.Topic),
        )

        for result, err := range stream {
            if err != nil {
                return nil, fmt.Errorf("could not generate joke: %w", err)
            }
            if result.Done {
                return result.Output, nil
            }
            sendChunk(ctx, result.Chunk)
        }

        return nil, nil
    })

Server-Sent Events (SSE)

When serving flows over HTTP, Genkit automatically streams responses using Server-Sent Events:
import (
    "log"
    "net/http"
)

mux := http.NewServeMux()
for _, flow := range genkit.ListFlows(g) {
    mux.HandleFunc("POST /"+flow.Name(), genkit.Handler(flow))
}
log.Fatal(http.ListenAndServe(":8080", mux))

Each chunk arrives as a separate data: event with a JSON payload:
data: {"message":"Once upon"}

data: {"message":" a time"}

data: {"message":" there was"}
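
On the client side, events in this shape can be read line by line. The sketch below uses only the standard library and assumes every payload is a JSON object with a string "message" field, as in the sample above; other payload shapes are ignored. `readMessages` and `readMessagesString` are hypothetical helper names.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// event mirrors the payload shape shown above; only "message" is decoded.
type event struct {
	Message string `json:"message"`
}

// readMessages scans an SSE body line by line and returns the non-empty
// "message" value of every data: event. Blank lines and other SSE fields
// are skipped.
func readMessages(body io.Reader) ([]string, error) {
	var msgs []string
	sc := bufio.NewScanner(body)
	for sc.Scan() {
		payload, ok := strings.CutPrefix(sc.Text(), "data:")
		if !ok {
			continue
		}
		var ev event
		if err := json.Unmarshal([]byte(strings.TrimSpace(payload)), &ev); err != nil {
			return msgs, fmt.Errorf("decode event: %w", err)
		}
		if ev.Message != "" {
			msgs = append(msgs, ev.Message)
		}
	}
	return msgs, sc.Err()
}

// readMessagesString is a convenience wrapper for in-memory input.
func readMessagesString(s string) ([]string, error) {
	return readMessages(strings.NewReader(s))
}

func main() {
	body := "data: {\"message\":\"Once upon\"}\n\ndata: {\"message\":\" a time\"}\n\n"
	msgs, err := readMessagesString(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(strings.Join(msgs, "")) // prints "Once upon a time"
}
```

With a real HTTP response, pass resp.Body to readMessages. Note that bufio.Scanner rejects lines longer than 64 KiB by default, so very large events would need a bigger buffer.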

Passthrough Streaming

Pass streaming chunks directly from the model to the client:
genkit.DefineStreamingFlow(g, "streamingJokesFlow",
    func(ctx context.Context, input string, sendChunk ai.ModelStreamCallback) (string, error) {
        if input == "" {
            input = "airplane food"
        }

        resp, err := genkit.Generate(ctx, g,
            ai.WithModelName("googleai/gemini-2.5-flash"),
            ai.WithPrompt("Share a joke about %s.", input),
            ai.WithStreaming(sendChunk),
        )
        if err != nil {
            return "", fmt.Errorf("could not generate joke: %w", err)
        }

        return resp.Text(), nil
    },
)

Durable Streaming (Experimental)

Allow clients to reconnect to in-progress or completed streams:
import (
    "time"

    "github.com/firebase/genkit/go/core/x/streaming"
)

mux.HandleFunc("POST /myFlow", genkit.Handler(myStreamingFlow,
    genkit.WithStreamManager(streaming.NewInMemoryStreamManager(
        streaming.WithTTL(10*time.Minute),
    )),
))

Clients receive a stream ID in the X-Genkit-Stream-Id response header and can reconnect to replay buffered chunks. See the durable-streaming sample for a complete example.
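
A client needs to capture that header before it can reconnect. The sketch below shows only that step, using the standard library; how the ID is presented on reconnect is left to the durable-streaming sample. `streamID` is a hypothetical helper and "abc123" is a made-up value.

```go
package main

import (
	"fmt"
	"net/http"
)

// streamIDHeader is the response header named above.
const streamIDHeader = "X-Genkit-Stream-Id"

// streamID extracts the stream ID from response headers. The boolean is
// false when the server did not send one.
func streamID(h http.Header) (string, bool) {
	id := h.Get(streamIDHeader)
	return id, id != ""
}

func main() {
	// Simulated response headers; in real code use resp.Header.
	h := http.Header{}
	h.Set(streamIDHeader, "abc123") // made-up ID for illustration

	if id, ok := streamID(h); ok {
		fmt.Println("save for reconnect:", id)
	}
}
```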

Best Practices

Use Streaming for Long Responses

Stream responses when generating long-form content to improve perceived performance:
  • Stories, articles, or essays
  • Detailed explanations
  • Code generation
  • Multi-paragraph summaries

Handle Errors Gracefully

Always check for errors in streaming loops:
for result, err := range stream {
    if err != nil {
        log.Printf("Stream error: %v", err)
        return "", err
    }
    // Process chunk
}

Consider Network Conditions

Streaming works best with stable connections. For unreliable networks, consider:
  • Using durable streaming with reconnection support
  • Buffering chunks before sending to the client
  • Falling back to non-streaming for small responses
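
The buffering option can be sketched as a small wrapper that coalesces chunks and forwards them only once a size threshold is reached. This is a minimal illustration using the standard library; `chunkBuffer` is a hypothetical helper, not a Genkit type, and the threshold of 10 bytes is arbitrary.

```go
package main

import (
	"fmt"
	"strings"
)

// chunkBuffer coalesces small text chunks and forwards them to send once at
// least minBytes have accumulated.
type chunkBuffer struct {
	minBytes int
	send     func(string) error
	buf      strings.Builder
}

// Add appends a chunk and flushes when the threshold is reached.
func (b *chunkBuffer) Add(chunk string) error {
	b.buf.WriteString(chunk)
	if b.buf.Len() >= b.minBytes {
		return b.Flush()
	}
	return nil
}

// Flush sends any buffered text. Call it once after the stream ends so the
// tail is not lost.
func (b *chunkBuffer) Flush() error {
	if b.buf.Len() == 0 {
		return nil
	}
	out := b.buf.String()
	b.buf.Reset()
	return b.send(out)
}

func main() {
	b := &chunkBuffer{minBytes: 10, send: func(s string) error {
		fmt.Printf("send %q\n", s)
		return nil
	}}
	for _, c := range []string{"Once", " upon", " a", " time", "."} {
		if err := b.Add(c); err != nil {
			panic(err)
		}
	}
	if err := b.Flush(); err != nil {
		panic(err)
	}
}
```

Inside a streaming flow, send would wrap the flow's stream callback. Larger thresholds mean fewer network writes at the cost of higher latency before the first visible output.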
