Skip to main content
Flows orchestrate multi-step AI tasks with automatic tracing. See genkit.DefineFlow() and genkit.DefineStreamingFlow() for full API documentation.

Quick Reference

// Non-streaming flow
// Registers a flow named "myFlow" on g; the handler takes a string input and
// returns a string output or an error.
myFlow := genkit.DefineFlow(g, "myFlow",
    func(ctx context.Context, input string) (string, error) {
        // Flow implementation
        // NOTE: output is a placeholder for whatever value the implementation computes.
        return output, nil
    },
)

// Invoke the flow with a concrete input; check err before using result.
result, err := myFlow.Run(ctx, "input")
// Streaming flow
// Registers a flow named "streamFlow" that takes an int, streams int chunks
// through the stream callback, and finally returns a string.
streamFlow := genkit.DefineStreamingFlow(g, "streamFlow",
    func(ctx context.Context, input int, stream func(context.Context, int) error) (string, error) {
        // Emit 0..input-1 as chunks; abort the flow if the callback reports an error.
        for i := 0; i < input; i++ {
            if err := stream(ctx, i); err != nil {
                return "", err
            }
        }
        return "done", nil
    },
)

// Stream yields (value, error) pairs. Always take the error: when the flow
// fails, the value is not usable and dereferencing it would panic.
for result, err := range streamFlow.Stream(ctx, 5) {
    if err != nil {
        fmt.Println("stream error:", err)
        break
    }
    if result.Done {
        // Final value returned by the flow.
        fmt.Println(result.Output)
    } else {
        // Intermediate chunk passed to the stream callback.
        fmt.Println(result.Stream)
    }
}

Flow Methods

Run()

// Runs the flow with the given input; check err before using result.
result, err := flow.Run(ctx, input)

Stream()

// Stream yields (value, error) pairs; a non-nil err means the flow failed
// and result must not be used.
for result, err := range flow.Stream(ctx, input) {
    if err != nil {
        // Handle the error and stop consuming the stream.
        break
    }
    // Process streaming results: result.Stream holds a chunk until
    // result.Done is true, at which point result.Output holds the final value.
    _ = result
}

Tracing with genkit.Run()

// Registers a flow named "tracedFlow" whose work is split into two named
// steps via genkit.Run so that each step shows up separately in the trace.
genkit.DefineFlow(g, "tracedFlow",
    func(ctx context.Context, input string) (string, error) {
        // Traced step 1: upper-case the input.
        processed, err := genkit.Run(ctx, "process", func() (string, error) {
            return strings.ToUpper(input), nil
        })
        if err != nil {
            return "", err
        }

        // Traced step 2: use the processed text as the model prompt.
        result, err := genkit.Run(ctx, "generate", func() (string, error) {
            resp, err := genkit.Generate(ctx, g,
                ai.WithPrompt(processed),
            )
            if err != nil {
                return "", err
            }
            return resp.Text(), nil
        })
        // Propagate a failed generation instead of returning an empty
        // result with a nil error.
        if err != nil {
            return "", err
        }

        return result, nil
    },
)

Build docs developers (and LLMs) love